In [6]:
from dataclasses import dataclass
from typing import Dict, Optional
import pandas as pd

In [7]:
@dataclass
class DataValidationArtifact:
    """Result object produced by the data-validation stage.

    Attributes are documented with comments below each field.
    """

    # Outcome of validation (``True`` when the checks passed).
    validation_status: bool
    # Location of the drift report YAML; ``None`` when there is no report.
    # (A bare ``Optional`` carries no type, so the payload type is spelled out.)
    drift_report_file_path: Optional[str]
    # Validated data keyed by dataset name.
    # NOTE(review): DataValidation.initiate_data_validation currently stores a
    # single DataFrame here rather than a dict -- confirm which is intended.
    dataframes: Dict[str,pd.DataFrame]

In [1]:
import os
# Move from the notebook's folder up to the project root so that the
# `src.*` imports in later cells resolve. The guard keeps the cell
# idempotent: re-running it once already at the root (where `src/` lives)
# must not climb further up the directory tree.
if not os.path.isdir('src'):
    os.chdir('../../')
os.getcwd()

'c:\\Users\\roser\\OneDrive\\Documentos\\fint'

In [3]:
from datetime import datetime
import os
from src.constants import training_pipeline


class TrainingPipelineConfig:
    """Run-wide settings shared by every stage of the training pipeline.

    Each instance names its artifact directory after the timestamp it was
    built with: ``<ARTIFACT_DIR>/<MM_DD_YYYY_HH_MM_SS>``.
    """

    def __init__(self, timestamp=None):
        """
        Args:
            timestamp: ``datetime`` used to name this run's artifact
                directory. Defaults to the current time at construction.
        """
        # Resolve the default here rather than in the signature: a
        # ``datetime.now()`` default is evaluated only once, when the class is
        # defined, so every config created later would share that timestamp.
        if timestamp is None:
            timestamp = datetime.now()
        # '%S' is seconds; a bare 'S' would be emitted literally.
        timestamp = timestamp.strftime('%m_%d_%Y_%H_%M_%S')
        self.pipeline_name = training_pipeline.PIPELINE_NAME
        self.artifact_name = training_pipeline.ARTIFACT_DIR
        self.artifact_dir = os.path.join(self.artifact_name, timestamp)
        self.model_dir = 'final_model'
        self.timestamp: str = timestamp

class DataValidationConfig:
    """Filesystem paths used by the data-validation stage of one pipeline run.

    Everything is rooted at ``<artifact_dir>/<DATA_VALIDATION_DIR_NAME>``;
    the individual directory and file names come from the
    ``training_pipeline`` constants module.
    """

    def __init__(self,training_pipeline_config:TrainingPipelineConfig):
        consts = training_pipeline
        stage_root = os.path.join(
            training_pipeline_config.artifact_dir,
            consts.DATA_VALIDATION_DIR_NAME,
        )

        # Root folder for this stage's outputs.
        self.data_validation_dir: str = stage_root
        # Sub-directories for valid and invalid data.
        self.valid_data_dir: str = os.path.join(stage_root, consts.DATA_VALIDATION_VALID_DIR)
        self.invalid_data_dir: str = os.path.join(stage_root, consts.DATA_VALIDATION_INVALID_DIR)
        # <stage_root>/<drift report dir>/<drift report file name>
        self.drift_report_file_path: str = os.path.join(
            stage_root,
            consts.DATA_VALIDATION_DRIFT_REPORT_DIR,
            consts.DATA_VALIDATION_DRIFT_REPORT_FILE_NAME,
        )


In [None]:
from src.exception.exception import FintechException
from src.logging.logger import logging
from src.constants.training_pipeline import SCHEMA_FILE_PATH
from src.utils.common import read_yaml_file,write_yaml_file
from src.entity.artifact_entity import DataIngestionArtifact
from src.entity.config_entity import DataIngestionConfig
import pandas as pd
import sys
from scipy.stats import ks_2samp

In [None]:
class DataValidation:
    """Clean a raw dataframe and validate it against the project schema.

    The schema is read once from ``SCHEMA_FILE_PATH`` on construction and is
    indexed as ``schema[<dataset name>]['columns']``. Every method wraps
    failures in ``FintechException``.
    """

    def __init__(self,data_ingestion_artifact:DataIngestionArtifact,
                 data_validation_config:DataValidationConfig):
        """
        Args:
            data_ingestion_artifact: Output of the ingestion stage (stored on
                the instance; not read by the methods of this class).
            data_validation_config: Supplies ``drift_report_file_path``.

        Raises:
            FintechException: If the schema file cannot be read.
        """
        try:
            self.data_ingestion_artifact=data_ingestion_artifact
            self.data_validation_config=data_validation_config
            self._schema_config= read_yaml_file(SCHEMA_FILE_PATH)
        except Exception as e:
            raise FintechException(e,sys)

    @staticmethod
    def read_data(file_path)->pd.DataFrame:
        """Read a CSV file into a DataFrame.

        Raises:
            FintechException: Wrapping whatever ``pd.read_csv`` raised.
        """
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            # Pass the cause and ``sys`` like every other handler in this class;
            # raising the bare class discarded the original error.
            raise FintechException(e,sys)

    def validate_number_of_columns(self,df,dataframe:pd.DataFrame)->bool:
        """Check that ``dataframe`` has as many columns as the schema expects.

        Args:
            df: Key into the schema selecting which dataset's column list to use.
            dataframe: Data to check.

        Returns:
            ``True`` when the column counts match, ``False`` otherwise.

        Raises:
            FintechException: If ``df`` is not in the schema or it is malformed.
        """
        try:
            number_of_columns=len(self._schema_config[df]['columns'])
            logging.info(f'Required number of columns: {number_of_columns}')
            logging.info(f'Data frame has columns:{len(dataframe.columns)}')
            # Return an explicit bool; the mismatch case used to fall through to None.
            return len(dataframe.columns) == number_of_columns
        except Exception as e:
            raise FintechException(e,sys)

    def detect_dataset_drift(self,base_df,current_df,threshold=0.5)->bool:
        """Run a two-sample Kolmogorov-Smirnov test per column and write a YAML report.

        A column is flagged as drifted when its p-value is below ``threshold``
        (a NaN p-value also counts as drift). The report maps each column of
        ``base_df`` to ``{'p_value': float, 'drift_status': bool}`` and is
        written to ``data_validation_config.drift_report_file_path``.

        Args:
            base_df: Reference data; every one of its columns is tested.
            current_df: Data to compare; must contain every column of ``base_df``.
            threshold: p-value cut-off. NOTE(review): 0.5 is unusually high
                for a significance level (0.05 is common) -- confirm.

        Returns:
            ``True`` if no column drifted, ``False`` otherwise.
        """
        try:
            status=True
            report={}
            for column in base_df:
                ks_result = ks_2samp(base_df[column], current_df[column])
                p_value = float(ks_result.pvalue)
                # Written as "not >=" so a NaN p-value still counts as drift,
                # and so the stored flag is a plain Python bool (YAML-safe).
                is_found = not (threshold <= p_value)
                if is_found:
                    status=False
                report[column] = {
                    'p_value': p_value,
                    'drift_status': is_found,
                }

            drift_report_file_path=self.data_validation_config.drift_report_file_path
            os.makedirs(os.path.dirname(drift_report_file_path),exist_ok=True)
            # The path was previously passed under the misspelled keyword
            # ``filep_path``; pass it positionally instead.
            # NOTE(review): assumes the path is write_yaml_file's first
            # parameter (as with read_yaml_file) -- confirm in src.utils.common.
            write_yaml_file(drift_report_file_path, content=report)
            return status
        except Exception as e:
            raise FintechException(e,sys)

    def clean_columns(self, df:pd.DataFrame)-> pd.DataFrame:
        """Normalise the money-formatted text columns that are present in ``df``.

        ``credit_limit``, ``yearly_income``, ``total_debt`` and ``amount`` are
        stripped of ``$`` and ``,``, parsed, zero-filled and cast to ``int``.
        ``per_capita_income`` is stripped the same way, has ``50K`` expanded to
        ``50000`` and missing values set to ``"0"``, but stays text.
        Columns that are absent are skipped.

        Returns:
            A cleaned copy; the caller's DataFrame is not modified.

        Raises:
            FintechException: E.g. when a listed column is not string-typed or
                holds a value that cannot be parsed as a number.
        """
        try:
            # Work on a copy so the caller's frame is not mutated in place.
            df = df.copy()

            def to_int_amount(series: pd.Series) -> pd.Series:
                # regex=False pins '$' and ',' as literal characters.
                # NOTE(review): astype(int) truncates cents (e.g. 14.57 -> 14)
                # -- confirm that is intended for 'amount'.
                return (
                    series.str.replace("$", "", regex=False)
                    .str.replace(",", "", regex=False)
                    .astype(float)
                    .fillna(0)
                    .astype(int)
                )

            def to_income_text(series: pd.Series) -> pd.Series:
                # NOTE(review): unlike the other money columns this is left as
                # text rather than converted to a number -- confirm.
                return (
                    series.str.replace("$", "", regex=False)
                    .str.replace(",", "", regex=False)
                    .str.replace("50K", "50000", regex=False)
                    .fillna("0")
                )

            columns_transformations = {
                "credit_limit": to_int_amount,
                "per_capita_income": to_income_text,
                "yearly_income": to_int_amount,
                "total_debt": to_int_amount,
                "amount": to_int_amount,
            }

            for col, func in columns_transformations.items():
                if col in df.columns:
                    df[col] = func(df[col])

            # NOTE(review): the transformations above already fillna(0), so
            # these dropna calls cannot remove any rows. Decide whether missing
            # values should be dropped or zero-filled.
            if 'credit_limit' in df.columns:
                df = df.dropna(subset=['credit_limit'])

            if 'amount' in df.columns:
                df = df.dropna(subset=['amount'])

            return df

        except Exception as e:
            raise FintechException(e,sys)

    def initiate_data_validation(self,df_name: str, df: pd.DataFrame) -> DataValidationArtifact:
        """Clean ``df``, check its column count against the schema and package the result.

        Dataset drift is not checked here; ``detect_dataset_drift`` has to be
        called separately. A column-count mismatch is logged as a warning and
        reported through ``validation_status``, not raised.

        Args:
            df_name: Schema key identifying the dataset.
            df: Raw data to validate.

        Returns:
            A ``DataValidationArtifact`` holding the status, the configured
            drift-report path and the cleaned DataFrame.
        """
        try:
            df= self.clean_columns(df)

            status = self.validate_number_of_columns(df_name,df)

            if status:
                logging.info('Validation columns sucessful')
            else:
                logging.warning(f'Validation columns error for {df_name}')

            # NOTE(review): DataValidationArtifact.dataframes is annotated as
            # Dict[str, DataFrame], but a single DataFrame is stored here --
            # confirm what downstream consumers expect before changing either.
            data_validation_artifact = DataValidationArtifact(
                validation_status = status,
                drift_report_file_path = self.data_validation_config.drift_report_file_path,
                dataframes = df
            )
            return data_validation_artifact

        except Exception as e:
            raise FintechException(e,sys)
        
        