In [1]:
from dataclasses import dataclass 
from datetime import datetime
import os 
import sys 
import pandas as pd


@dataclass
class Data_Ingestion_Artifact:
    """Output of the data-ingestion stage: where the split files were written."""

    # Path of the training split file.
    train_file_path: str
    # Path of the test split file.
    test_file_path: str

@dataclass
class Data_validation_Artifact:
    """Output of the data-validation stage."""

    # Outcome flag of the validation run.
    validation_status: bool
    # Human-readable message describing the outcome.
    message_error: str
    # Location of the drift report file.
    drift_report_file_path: str

    

In [2]:
# Artifacts
# Default folder names consumed by NS_Train_Configeration below.
ARTIFACTS: str = 'artifacts'
PIPELINE_DIR: str = 'network'


@dataclass
class NS_Train_Configeration:
    """Top-level settings for a training run.

    Attributes:
        artifact_dir: Artifact folder name; defaults to ``ARTIFACTS``.
        pipeline_dir: Pipeline folder name; defaults to ``PIPELINE_DIR``.
    """

    artifact_dir: str = ARTIFACTS
    pipeline_dir: str = PIPELINE_DIR
    # No annotation, so this is a plain class attribute rather than a
    # dataclass field: it is computed once, when the class body executes,
    # and is shared by every instance.
    TIMESTAMP = datetime.now().strftime('%m_%d_%Y_%H_%M_%S')


# Shared configuration instance built from the defaults above.
train_config = NS_Train_Configeration()

In [3]:
# Folder and file names used to build the data-validation output paths.
DATA_VALIDATION_DIR: str = 'data_validation'
DATA_VALIDATION_REPORT_DIR: str = 'drift_report'
DATA_VALIDATION_REPORT_YAML: str = 'report.yaml'


class Data_validation_config:
    """Filesystem locations for data-validation outputs.

    Both values are class attributes, computed once when the class body
    executes, starting from ``train_config.artifact_dir``.
    """

    # <artifact_dir>/data_validation
    data_validation_dir = os.path.join(
        train_config.artifact_dir,
        DATA_VALIDATION_DIR,
    )
    # <artifact_dir>/data_validation/drift_report/report.yaml
    data_validation_report = os.path.join(
        data_validation_dir,
        DATA_VALIDATION_REPORT_DIR,
        DATA_VALIDATION_REPORT_YAML,
    )

In [None]:
import os
import yaml

def read_yaml_file(file_path):
    """Load a YAML file and return its parsed content.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        The object produced by ``yaml.safe_load`` for the file.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    with open(file_path, 'rb') as file:
        # Return the parsed document; without this every caller gets None.
        return yaml.safe_load(file)
    


def write_yaml_file(file_path: str, content: object, replace: bool = False) -> None:
    """Serialize ``content`` to ``file_path`` with ``yaml.dump``.

    Args:
        file_path: Destination file. Missing parent directories are created.
        content: Object to serialize.
        replace: When True, an existing file at ``file_path`` is removed
            before writing. The file is opened in ``"w"`` mode either way,
            so existing content is overwritten regardless of this flag.
    """
    if replace and os.path.exists(file_path):
        os.remove(file_path)
    # A bare file name has an empty dirname, and os.makedirs('') raises,
    # so only create the parent when there is one.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_path, "w") as file:
        yaml.dump(content, file)
    


In [None]:
from Network_Security.logging.logger import logging
from Network_Security.constant import SEHEMA_FILE_PATH
from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection
from sklearn.model_selection import train_test_split
from Network_Security.components.data_ingestion import DataIngestion
import json



class Data_validation:
    """Check ingested train/test data against the schema, then check drift.

    The schema is loaded from ``SEHEMA_FILE_PATH`` when the object is created.
    ``init_data_validation`` is the entry point: it runs the column checks on
    both splits and, only if they all pass, writes a drift report.
    """

    def __init__(self,
                 data_ingestion_artifact: Data_Ingestion_Artifact = Data_Ingestion_Artifact,
                 data_validation_config: Data_validation_config = Data_validation_config):
        """Store the stage inputs and load the schema file.

        Args:
            data_ingestion_artifact: Provides ``train_file_path`` and
                ``test_file_path``.
            data_validation_config: Provides ``data_validation_report``.

        The class-object defaults are kept only so the signature stays
        compatible with existing callers; pass real objects.
        """
        self.data_ingestion_artifact = data_ingestion_artifact
        self.data_validation_config = data_validation_config
        self._sehema_yaml = read_yaml_file(file_path=SEHEMA_FILE_PATH)

    def valid_no_columns(self, dataframe: pd.DataFrame) -> bool:
        """Return True when the frame has the number of columns the schema expects."""
        expected = self._sehema_yaml['columns']
        # NOTE(review): the schema layout is not visible here. 'columns' may
        # hold a count or a collection of column entries, so handle both
        # instead of comparing an int against a collection.
        expected_count = expected if isinstance(expected, int) else len(expected)
        return len(dataframe.columns) == expected_count

    def Is_column_exists(self, dataframe: pd.DataFrame = pd.DataFrame) -> bool:
        """Return True when every schema-listed column is present in the frame.

        Reads the ``numeric_columns`` and ``categorical_columns`` schema
        entries and logs the missing names of each group once. The default
        value is kept only for signature compatibility; a real DataFrame is
        required.
        """
        # `or []` covers a schema key that is present but empty (parsed as None).
        missing_num_columns = [
            column for column in (self._sehema_yaml['numeric_columns'] or [])
            if column not in dataframe.columns
        ]
        if missing_num_columns:
            logging.info(f'Missing numeric column {missing_num_columns}')

        missing_cat_columns = [
            column for column in (self._sehema_yaml['categorical_columns'] or [])
            if column not in dataframe.columns
        ]
        if missing_cat_columns:
            logging.info(f'Missing categorical column {missing_cat_columns}')

        # Return a real bool; a one-element list would always be truthy.
        return not (missing_num_columns or missing_cat_columns)

    @staticmethod
    def read_data(dataframe) -> pd.DataFrame:
        """Read a CSV into a DataFrame.

        Despite its name, ``dataframe`` is whatever ``pd.read_csv`` accepts
        as its first argument (e.g. a file path).
        """
        return pd.read_csv(dataframe)

    def detect_dataset_drift(self, reference_df: pd.DataFrame, current_df: pd.DataFrame):
        """Compute a data-drift profile, save it, and return the drift flag.

        The report is written to ``data_validation_config.data_validation_report``.

        Returns:
            The ``dataset_drift`` value from the report metrics.
        """
        # Profile takes its sections as a list via the `sections` argument.
        detect_drift_profile = Profile(sections=[DataDriftProfileSection()])
        detect_drift_profile.calculate(reference_df, current_df)
        json_report = json.loads(detect_drift_profile.json())

        write_yaml_file(file_path=self.data_validation_config.data_validation_report,
                        content=json_report)

        # Key names follow the evidently data-drift profile JSON.
        metrics = json_report['data_drift']['data']['metrics']
        n_feature = metrics['n_features']
        n_drift_feature = metrics['n_drifted_features']
        logging.info(f"{n_drift_feature}/{n_feature} drift detected.")
        return metrics['dataset_drift']

    def _schema_errors(self, dataframe: pd.DataFrame) -> list:
        """Return one error message per failed schema check for this frame."""
        errors = []
        if not self.valid_no_columns(dataframe):
            errors.append('Error: Column Mismatch')
        if not self.Is_column_exists(dataframe):
            errors.append('Error: Column Mismatch')
        return errors

    def init_data_validation(self) -> Data_validation_Artifact:
        """Run the schema checks on both splits, then the drift check.

        Returns:
            A ``Data_validation_Artifact`` whose ``validation_status`` is True
            when no schema check failed. ``message_error`` holds the drift
            outcome in that case, or the joined schema errors otherwise.
        """
        # NOTE(review): DataIngestion.read_data is defined outside this file;
        # it is assumed to return (train_df, test_df) for two paths. This
        # class also has its own single-path read_data. Confirm which is meant.
        train_data, test_data = DataIngestion.read_data(
            self.data_ingestion_artifact.train_file_path,
            self.data_ingestion_artifact.test_file_path)

        # Check each split against the schema (train first, then test).
        valid_message_error = []
        for frame in (train_data, test_data):
            valid_message_error.extend(self._schema_errors(frame))

        # drift_detect
        validation_status = len(valid_message_error) == 0
        if validation_status:
            drift_found = self.detect_dataset_drift(train_data, test_data)
            message_error = 'Drift detected' if drift_found else 'Drift not detected'
        else:
            logging.info(f'valid_message_error{valid_message_error}')
            # The artifact declares message_error as str, so join the messages.
            message_error = '; '.join(valid_message_error)

        data_validation_Artifact = Data_validation_Artifact(
            validation_status=validation_status,
            message_error=message_error,
            drift_report_file_path=self.data_validation_config.data_validation_report
        )
        return data_validation_Artifact


 

In [None]:
class Training_Pipeline:
    """Run the pipeline stages; only data validation is wired up so far."""

    def __init__(self):
        """Create the configuration object used by the validation stage."""
    #   self.data_ingestion_config = Data_ingestion_Config()
        # start_data_validation reads `data_validation_config`, so store it
        # under that name; keep the old attribute name as an alias.
        self.data_validation_config = Data_validation_config()
        self.validation_config = self.data_validation_config


    # def start_data_ingestion(self)->Data_Ingestion_Artifact:
    #     data_ingestion = Data_Ingestion(ingestion_config=self.data_ingestion_config)
    #     data_ingestion_artifacet = data_ingestion.init_data_ingestion()
    #     return data_ingestion_artifacet

    def start_data_validation(self, data_ingestion_artifacet: Data_Ingestion_Artifact) -> Data_validation_Artifact:
        """Run data validation on the ingestion output and return its artifact.

        Args:
            data_ingestion_artifacet: Artifact produced by data ingestion.
        """
        # Data_validation's keyword is `data_ingestion_artifact`, and its
        # entry point is `init_data_validation`.
        data_valid = Data_validation(data_ingestion_artifact=data_ingestion_artifacet,
                                     data_validation_config=self.data_validation_config)
        data_validation_Artifact = data_valid.init_data_validation()
        return data_validation_Artifact

