In [1]:
import os
# Step up one directory so the kernel's working directory becomes the parent folder
# (presumably the project root that the relative config/artifact paths expect — confirm).
# NOTE: the path is relative, so every re-run of this cell moves up one more level;
# restart the kernel before executing the notebook again.
os.chdir('../')
# IPython magic: show the current working directory as this cell's output.
%pwd

'/home/paladin/Downloads/Sensor-Fault-Detection'

In [2]:
from pathlib import Path
from dataclasses import dataclass


@dataclass(frozen=True)
class DataValidationConfig:
    """Read-only bundle of settings consumed by the data-validation stage.

    Instances are frozen: assigning to a field after construction raises.
    The field order below is also the positional-argument order of the
    generated constructor, so it must not be rearranged.
    """

    # Root directory of this stage (created by ConfigurationManager).
    root_dir: Path

    # Input splits that get validated.
    train_data_file: Path
    test_data_file: Path

    # Where the splits are written when no drift is detected.
    valid_train_file: Path
    valid_test_file: Path

    # Where the splits are written when drift is detected.
    invalid_train_file: Path
    invalid_test_file: Path

    # Destination of the per-column drift report.
    drift_report_file: Path

    # Schema column lists: all columns (used for the column-count check) and
    # the numerical subset (used for the presence check).
    schema_columns: list
    schema_numerical_columns: list

    # p-value cut-off applied to the per-column two-sample KS test.
    pvalue_threshold: float


In [3]:
from sensorFaultDetection.constants import *
from sensorFaultDetection.utils import read_yaml, create_directories

In [4]:
class ConfigurationManager:
    """Load the project's YAML settings and build per-stage configuration objects.

    The config, secret, schema and params files are each read with
    ``read_yaml``; the loaded objects are accessed by attribute below.
    """

    def __init__(self,
                 config_filepath=CONFIG_FILE_PATH,
                 secret_filepath=SECRET_FILE_PATH,
                 schema_filepath=SCHEMA_FILE_PATH,
                 params_filepath=PARAMS_FILE_PATH,
                 saved_modelpath=SAVED_MODEL_PATH,
                 ):
        """Read every settings file and create the artifacts root directory.

        Args:
            config_filepath: Path of the main configuration YAML.
            secret_filepath: Path of the secrets YAML.
            schema_filepath: Path of the dataset schema YAML.
            params_filepath: Path of the parameters YAML.
            saved_modelpath: Stored unchanged on the instance.
        """
        self.config = read_yaml(config_filepath)
        self.secret = read_yaml(secret_filepath)
        self.schema = read_yaml(schema_filepath)
        self.params = read_yaml(params_filepath)
        self.saved_modelpath = saved_modelpath

        create_directories([self.config.artifacts_root])

    def get_data_validation_config(self) -> DataValidationConfig:
        """Build the data-validation settings and create its output directories.

        The parent directory of every file this stage writes (valid and
        invalid train/test splits, drift report) is created up front, so the
        later writes do not depend on the train and test files sharing a
        folder.

        Returns:
            DataValidationConfig populated from the ``data_validation`` and
            ``data_ingestion`` config sections, the schema and the params.
            Values are passed through exactly as loaded (no ``Path``
            conversion is applied).
        """
        config = self.config.data_validation

        output_files = [
            config.VALID_TRAIN_FILE,
            config.VALID_TEST_FILE,
            config.INVALID_TRAIN_FILE,
            config.INVALID_TEST_FILE,
            config.DRIFT_REPORT_FILE,
        ]
        # A bare file name has an empty dirname; there is nothing to create for it.
        parent_dirs = [
            parent
            for parent in (os.path.dirname(file_path) for file_path in output_files)
            if parent
        ]
        # dict.fromkeys drops duplicates while keeping first-seen order.
        directories = list(dict.fromkeys([config.ROOT_DIR, *parent_dirs]))
        create_directories(directories)

        data_validation_config = DataValidationConfig(
            root_dir=config.ROOT_DIR,
            train_data_file=self.config.data_ingestion.TRAIN_DATA_FILE,
            test_data_file=self.config.data_ingestion.TEST_DATA_FILE,
            valid_train_file=config.VALID_TRAIN_FILE,
            valid_test_file=config.VALID_TEST_FILE,
            invalid_train_file=config.INVALID_TRAIN_FILE,
            invalid_test_file=config.INVALID_TEST_FILE,
            drift_report_file=config.DRIFT_REPORT_FILE,
            schema_columns=self.schema.columns,
            schema_numerical_columns=self.schema.numerical_columns,
            pvalue_threshold=self.params.PVALUE_THRESHOLD,
        )

        return data_validation_config

In [5]:
import pandas as pd
from sensorFaultDetection.logger import logging
from sensorFaultDetection.utils import write_yaml_file
from scipy.stats import ks_2samp
import sys
from sensorFaultDetection.exception import CustomException

In [6]:
class DataValidation:
    """Check the ingested train/test splits against the schema and for drift.

    The stage reads both splits, verifies the column count and the presence
    of the schema's numerical columns, runs a per-column two-sample KS test
    between train and test, writes the splits to the "valid" or "invalid"
    location accordingly, and saves the drift report.
    """

    def __init__(self, config: DataValidationConfig):
        """Keep the settings (paths, schema column lists, p-value threshold)."""
        self.config = config

    def validate_number_of_columns(self, dataframe: pd.DataFrame) -> bool:
        """Tell whether ``dataframe`` has as many columns as the schema lists.

        Only the count is compared; column names are not inspected here.

        Returns:
            True when the counts match, False otherwise (a message is logged).

        Raises:
            CustomException: wrapping any error raised during the check.
        """
        try:
            expected_count = len(self.config.schema_columns)
            if len(list(dataframe)) == expected_count:
                return True
            logging.info('WARNING: Required number of columns is differnt from dataframe columns!')
            return False
        except Exception as e:
            raise CustomException(e, sys)

    def is_numerical_column_exist(self, dataframe: pd.DataFrame) -> bool:
        """Tell whether every schema numerical column is present in ``dataframe``.

        The list of missing columns is always logged (it is empty on success).

        Returns:
            True when nothing is missing, False otherwise.

        Raises:
            CustomException: wrapping any error raised during the check.
        """
        try:
            dataframe_columns = list(dataframe)
            missing_numerical_columns = [
                num_column
                for num_column in self.config.schema_numerical_columns
                if num_column not in dataframe_columns
            ]

            logging.info(f'Missing numerical columns: {missing_numerical_columns}')
            return not missing_numerical_columns
        except Exception as e:
            # Raise like the sibling methods do; merely constructing the
            # exception would swallow the error and return None.
            raise CustomException(e, sys)

    def drop_zero_std_columns(self, dataframe: pd.DataFrame):
        """Placeholder: not implemented yet; does nothing and returns None."""
        pass

    @staticmethod
    def detect_dataset_drift(base_dataframe: pd.DataFrame, current_datafrme: pd.DataFrame, threshold: float):
        """Run a two-sample KS test per column of ``base_dataframe``.

        Args:
            base_dataframe: Reference data; its columns drive the loop.
            current_datafrme: Data compared against the reference; it must
                contain every column of ``base_dataframe``.
            threshold: p-value cut-off; a column counts as drifted unless its
                p-value is greater than or equal to this value.

        Returns:
            ``(status, report)`` where ``status`` is True only if no column
            drifted and ``report`` maps each column to
            ``{'p_value': float, 'drift_status': bool}``.

        Raises:
            CustomException: wrapping any error raised during the test.
        """
        try:
            status = True
            report = {}
            for column in base_dataframe.columns:
                test_result = ks_2samp(base_dataframe[column], current_datafrme[column])
                # Expressed as a negation on purpose: a NaN p-value fails the
                # ">=" comparison and is therefore reported as drift.
                drift_found = not (test_result.pvalue >= threshold)
                if drift_found:
                    status = False

                report[column] = {
                    'p_value': float(test_result.pvalue),
                    'drift_status': drift_found,
                }
            return status, report
        except Exception as e:
            raise CustomException(e, sys)

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        """Load a CSV file into a DataFrame.

        Raises:
            CustomException: wrapping any error raised while reading.
        """
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise CustomException(e, sys)

    def initiate_data_validation(self):
        """Run the whole validation stage.

        Steps: read both splits, collect every schema violation, abort if
        any was found, otherwise run the drift check, write the splits to the
        valid or invalid location and save the drift report.

        Raises:
            Exception: with the accumulated messages when a split fails the
                column-count or numerical-column check.
            CustomException: propagated from the helper methods.
        """
        train_dataframe = self.read_data(self.config.train_data_file)
        logging.info(f"Train data is read from {self.config.train_data_file}!")
        test_dataframe = self.read_data(self.config.test_data_file)
        logging.info(f"Test data is read from {self.config.test_data_file}!")

        splits = (("Train", train_dataframe), ("Test", test_dataframe))

        # Collect all problems first so a single exception reports every one.
        # Message order: column count (train, test), then numerical columns.
        error_message = ""
        for split_name, split_dataframe in splits:
            if not self.validate_number_of_columns(dataframe=split_dataframe):
                error_message = f'{error_message} {split_name} dataframe does not contain all columns!'

        for split_name, split_dataframe in splits:
            if not self.is_numerical_column_exist(dataframe=split_dataframe):
                error_message = f'{error_message} {split_name} dataframe does not contain all numerical columns!'

        if len(error_message) > 0:
            raise Exception(error_message)

        #############################################################################################
        # Data drift check at the model-training stage:
        # Test whether the test and train datasets come from the same distribution.
        # Base dataset: train. Compared with: test.
        # Same distribution means no drift; otherwise redo the train/test split correctly.
        #
        # Data drift at the prediction stage:
        # Drift cannot be detected immediately because a single record cannot be summarized.
        # Solution: save each request in a database, then fetch all requests per hour or day.
        # Base dataset: train. Compared with: the collected batch data.
        # If the difference is large, retrain.
        #
        # Concept drift:
        # The relation between the input features and the target has changed.
        # Solution: retrain the model.
        #
        # Target drift:
        # The distribution of the target column has changed, e.g. a new category appears.
        # Solution: retrain the model.
        #############################################################################################

        status, drift_report = self.detect_dataset_drift(
            train_dataframe, test_dataframe, self.config.pvalue_threshold
        )
        if status:
            logging.info('NO data drift issue!')
            train_target = self.config.valid_train_file
            test_target = self.config.valid_test_file
        else:
            logging.info('WARNING: We faced data drift issue, check report.yaml!')
            train_target = self.config.invalid_train_file
            test_target = self.config.invalid_test_file

        train_dataframe.to_csv(train_target, index=False, header=True)
        test_dataframe.to_csv(test_target, index=False, header=True)
        logging.info(f'Train set is saved at {train_target}!')
        logging.info(f'Test set is saved at {test_target}!')

        # The report is saved in both outcomes.
        write_yaml_file(path=self.config.drift_report_file, content=drift_report, replace=True)

In [7]:
import sys
from sensorFaultDetection.exception import CustomException

In [8]:
# Run the data-validation stage end to end: load the settings, build the stage, execute it.
try:
    config = ConfigurationManager()
    data_validation_config = config.get_data_validation_config()
    data_validation = DataValidation(config=data_validation_config)
    data_validation.initiate_data_validation()

except Exception as e:
    # Raise the wrapped error; constructing it without `raise` would hide
    # every failure and make the cell look successful.
    raise CustomException(e, sys)