In [2]:
import os

In [3]:
pwd

'd:\\MLOPS\\NetWork Security\\expariments'

In [4]:
# Move from the notebook folder up to the project root so that the
# `networksecurity` package imports below resolve.
# Guarded on the notebook folder name so that re-running this cell does not
# climb one more directory level each time (a bare relative chdir is not idempotent).
if os.path.basename(os.getcwd()) == 'expariments':
    os.chdir('../')

In [5]:
pwd

'd:\\MLOPS\\NetWork Security'

## Training Pipeline Config

In [6]:
import numpy as np
import pandas as pd

In [7]:
from networksecurity.constant import traning_pipline


## Config Entity

In [8]:
from datetime import datetime

class TraningPiplineConfig:
    """Top-level settings shared by the stages of the training pipeline.

    Attributes:
        pipline_name: value of ``traning_pipline.PIPELINE_NAME``.
        artifact_name: value of ``traning_pipline.ARTIFACT_DIR``.
        artifact_dir: ``artifact_name`` passed through ``os.path.join``.
    """

    def __init__(self) -> None:
        constants = traning_pipline

        self.pipline_name = constants.PIPELINE_NAME
        self.artifact_name = constants.ARTIFACT_DIR

        # Single-component join: the artifact folder name comes back unchanged.
        self.artifact_dir = os.path.join(self.artifact_name)
      
class DataValidationConfig:
    """Filesystem layout used by the data-validation stage.

    Every path is built from ``traning_pipline_config.artifact_dir`` and the
    names defined in the ``traning_pipline`` constants module.

    Attributes:
        data_validation_dir: root folder of this stage inside the artifact dir.
        valid_dir_name: folder for data that passed validation.
        invalid_dir_name: folder for data that failed validation.
        drift_report_dir: path of the drift report FILE (report folder joined
            with the report file name), despite the ``_dir`` suffix.
        valid_traning_data_store_path / valid_test_data_store_path:
            train/test files inside ``valid_dir_name``.
        invalid_traning_data_store_path / invalid_test_data_store_path:
            train/test files inside ``invalid_dir_name``.
    """

    def __init__(self,traning_pipline_config:TraningPiplineConfig) -> None:
        # <artifact_dir>/<data validation folder>
        self.data_validation_dir: str = os.path.join(
            traning_pipline_config.artifact_dir,
            traning_pipline.DATA_VALIDATION_DIR_NAME,
        )

        # Sub-folders for accepted and rejected data.
        self.valid_dir_name: str = os.path.join(
            self.data_validation_dir, traning_pipline.DATA_VALIDATION_VALID_DIR
        )
        self.invalid_dir_name: str = os.path.join(
            self.data_validation_dir, traning_pipline.DATA_VALIDATION_INVALID_DIR
        )

        # <data validation folder>/<drift report folder>/<report file name>
        self.drift_report_dir: str = os.path.join(
            self.data_validation_dir,
            traning_pipline.DATA_VALIDATION_DRIFT_REPORT_DIR,
            traning_pipline.DATA_VALIDATION_DRIFT_REPORT_FILE_NAME,
        )

        # Fix: these four paths previously all pointed at the data-ingestion
        # train/test files, so "valid" and "invalid" were the same file and
        # saving validated data overwrote the ingested data. They now live in
        # the valid/invalid folders computed above.
        self.valid_traning_data_store_path: str = os.path.join(
            self.valid_dir_name, traning_pipline.TRAIN_FILE_NAME
        )
        self.valid_test_data_store_path: str = os.path.join(
            self.valid_dir_name, traning_pipline.TEST_FILE_NAME
        )
        self.invalid_traning_data_store_path: str = os.path.join(
            self.invalid_dir_name, traning_pipline.TRAIN_FILE_NAME
        )
        self.invalid_test_data_store_path: str = os.path.join(
            self.invalid_dir_name, traning_pipline.TEST_FILE_NAME
        )

## Components Output Artifacts Entity

In [9]:
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataValidationArtifact:
    """Record describing the outcome of the data-validation stage.

    Fields are positional and their order is part of the interface.
    """

    # Overall outcome flag reported by the validation stage.
    validation_status: bool
    # Locations of the train/test files that were accepted.
    valid_train_path: str
    valid_test_path: str
    # Locations of rejected train/test files; None when nothing was rejected
    # (the validation component in this notebook always passes None).
    invalid_train_path: Optional[str]
    invalid_test_path: Optional[str]
    # Location of the drift report file.
    drift_report_path: str

## Data Validation Component

In [11]:
from networksecurity.logging.logger import logging
from networksecurity.exception.exception import CustomException
import os
import sys
import numpy as np
import pandas as pd
from typing import List
from scipy.stats import ks_2samp
from networksecurity.utils.utills import read_yaml,write_yaml_file
from networksecurity.logging.logger import logging
from networksecurity.exception.exception import CustomException
from networksecurity.entity.artifact_entity import DataIngestionArtifact
from networksecurity.constant.traning_pipline import SCHEMA_File_DIR,SCHEMA_File_NAME

In [12]:

class DataValidation:
    """Checks ingested train/test CSV files against the schema and for drift.

    The stage reads both splits, compares their column count with the schema,
    runs a per-column two-sample Kolmogorov-Smirnov test between them, writes
    a YAML drift report, and stores both splits at the configured "valid" paths.
    """

    def __init__(self, training_pipeline_config: DataValidationConfig, data_ingestion_artifacts: DataIngestionArtifact) -> None:
        """Store the inputs and load the schema file.

        Args:
            training_pipeline_config: despite its name, this is the
                ``DataValidationConfig`` for the stage (kept for keyword callers).
            data_ingestion_artifacts: artifact exposing ``trained_file_path``
                and ``test_file_path``.
        """
        self.data_ingestion_artifacts = data_ingestion_artifacts
        self.data_validation_config = training_pipeline_config
        schema_file_path = os.path.join(SCHEMA_File_DIR, SCHEMA_File_NAME)
        self.schema_config = read_yaml(schema_file_path)

    @staticmethod
    def read_data(filepath: str) -> pd.DataFrame:
        """Read a CSV file into a DataFrame.

        Raises:
            CustomException: wrapping any error raised while reading.
        """
        try:
            return pd.read_csv(filepath)
        except Exception as e:
            raise CustomException(e, sys)

    def validate_num_of_cols(self, df: pd.DataFrame) -> bool:
        """Return True when ``df`` has as many columns as the schema lists.

        Side effect: an ``'Unnamed: 0'`` column, if present, is dropped from
        ``df`` IN PLACE before counting, so the caller's frame is modified.
        Only the number of columns is compared, not their names.

        Raises:
            CustomException: wrapping any error raised during the check.
        """
        try:
            number_of_columns = len(self.schema_config['columns'])

            # Drop the 'Unnamed: 0' column when it is present.
            if 'Unnamed: 0' in df.columns:
                print('Unnamed: 0 present in dataframe')
                logging.info(f'Unnamed: 0 present in dataframe {df.columns}')
                df.drop(columns='Unnamed: 0', inplace=True)

            number_of_df_columns = len(df.columns)
            logging.info(f"Required number of columns:{number_of_columns}")
            logging.info(f"Data frame has columns:{number_of_df_columns}")
            return number_of_df_columns == number_of_columns
        except Exception as e:
            raise CustomException(e, sys)

    def detect_data_drift(self, base_df: pd.DataFrame, current_df: pd.DataFrame, threshold: float = 0.05) -> bool:
        """Compare every column of ``base_df`` with the same column of ``current_df``.

        A column is flagged as drifted when the KS-test p-value is below
        ``threshold``. The per-column results are written as YAML to
        ``data_validation_config.drift_report_dir`` (a file path).

        Returns:
            True when no column drifted, False when at least one did.

        Raises:
            CustomException: wrapping any error, including a column of
                ``base_df`` that is missing from ``current_df``.
        """
        try:
            status = True
            report = {}

            for column in base_df.columns:
                d1 = base_df[column]
                d2 = current_df[column]
                _, p_value = ks_2samp(d1, d2)
                # p-value at or above the threshold: no drift for this column.
                is_found = p_value < threshold
                if is_found:
                    status = False
                report[column] = {
                    "p_value": float(p_value),
                    "drift_status": bool(is_found),
                }

            drift_report_path = self.data_validation_config.drift_report_dir
            os.makedirs(os.path.dirname(drift_report_path), exist_ok=True)
            write_yaml_file(file_path=drift_report_path, content=report)

            logging.info('Data drift detection completed and report generated.')
            return status
        except Exception as e:
            raise CustomException(e, sys)

    def initiate_data_validation(self) -> DataValidationArtifact:
        """Run the validation stage end to end.

        Returns:
            DataValidationArtifact whose ``validation_status`` is the drift
            result (True = no drift) and whose invalid paths are None.

        Raises:
            CustomException: wrapping any failure, including a column-count
                mismatch in either split.
        """
        try:
            train_file_path = self.data_ingestion_artifacts.trained_file_path
            test_file_path = self.data_ingestion_artifacts.test_file_path

            # Read training and test data
            train_df = self.read_data(train_file_path)
            test_df = self.read_data(test_file_path)
            logging.info('Data read from training and test files completed.')

            # Validate number of columns.
            # A plain ValueError is raised here and wrapped by the handler
            # below; every other CustomException call in this file passes
            # (error, sys), so calling it with only a message was inconsistent.
            if not self.validate_num_of_cols(df=train_df):
                raise ValueError("Training data does not contain the expected columns.")
            if not self.validate_num_of_cols(df=test_df):
                raise ValueError("Testing data does not contain the expected columns.")

            # Check for data drift
            drift_status = self.detect_data_drift(base_df=train_df, current_df=test_df)
            logging.info('data dreft report created successfully')
            print('data dreft report created successfully')

            # Save validated data
            valid_train_path = self.data_validation_config.valid_traning_data_store_path
            valid_test_path = self.data_validation_config.valid_test_data_store_path
            os.makedirs(os.path.dirname(valid_train_path), exist_ok=True)
            train_df.to_csv(valid_train_path, index=False, header=True)
            os.makedirs(os.path.dirname(valid_test_path), exist_ok=True)
            test_df.to_csv(valid_test_path, index=False, header=True)

            # Create DataValidationArtifact
            data_validation_artifact = DataValidationArtifact(
                validation_status=drift_status,
                valid_train_path=valid_train_path,
                valid_test_path=valid_test_path,
                invalid_train_path=None,
                invalid_test_path=None,
                drift_report_path=self.data_validation_config.drift_report_dir
            )

            logging.info('Data validation completed successfully.')
            return data_validation_artifact
        except Exception as e:
            raise CustomException(e, sys)


## Execute Pipeline

In [13]:
from networksecurity.components.data_ingestion import DataIngestion
from networksecurity.entity.artifact_entity import DataIngestionArtifact
from networksecurity.entity.config_entity import DataIngestionConfig

In [15]:
# Run ingestion followed by validation using the classes defined above.
try:
    traning_pipline_config = TraningPiplineConfig()

    # Stage 1: data ingestion
    data_ingestion_config = DataIngestionConfig(traning_pipline_config=traning_pipline_config)
    data_ingestion = DataIngestion(data_ingestion_config=data_ingestion_config)
    data_ingestion_artifacts = data_ingestion.initiate_data_ingestion()

    # Stage 2: data validation
    data_validation_config = DataValidationConfig(traning_pipline_config=traning_pipline_config)
    data_validation = DataValidation(
        training_pipeline_config=data_validation_config,
        data_ingestion_artifacts=data_ingestion_artifacts,
    )
    # Keep the returned artifact (previously discarded) so the validation
    # status and output paths can be inspected in later cells.
    data_validation_artifact = data_validation.initiate_data_validation()
except Exception as e:
    # Chain explicitly so the original traceback stays attached as the cause.
    raise CustomException(e, sys) from e

   having_IP_Address  URL_Length  Shortining_Service  having_At_Symbol  \
0                 -1           1                   1                 1   
1                  1           1                   1                 1   
2                  1           0                   1                 1   
3                  1           0                   1                 1   
4                  1           0                  -1                 1   

   double_slash_redirecting  Prefix_Suffix  having_Sub_Domain  SSLfinal_State  \
0                        -1             -1                 -1              -1   
1                         1             -1                  0               1   
2                         1             -1                 -1              -1   
3                         1             -1                 -1              -1   
4                         1             -1                  1               1   

   Domain_registeration_length  Favicon  ...  popUpWidnow  Iframe  \