In [2]:
import os

In [3]:
%pwd

'/Users/rahulshelke/Documents/Data-Science/Data-Science-Projects/customer-segmentation/notebooks'

In [4]:
# Step up to the project root only while the kernel is still inside notebooks/.
# An unconditional chdir climbs one more level every time this cell is re-run,
# which breaks the relative paths used further down.
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir('../')

In [5]:
%pwd

'/Users/rahulshelke/Documents/Data-Science/Data-Science-Projects/customer-segmentation'

In [6]:
import json
import sys
from typing import Tuple, Union
import pandas as pd
# from evidently import Report
# from evidently.metrics import *
# from evidently.presets import *

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from pandas import DataFrame

from customer_segmentation.entity.artifact_entity import DataIngestionArtifact#, DataValidationArtifact
# from customer_segmentation.entity.config_entity import DataValidationConfig

from customer_segmentation.component.data_ingestion import DataIngestion

from customer_segmentation.exception import CustomerException
from customer_segmentation.logger import logging
from customer_segmentation.utils.main_utils import MainUtils, write_yaml_file

## config_entity.py

In [7]:
from dataclasses import dataclass

from datetime import datetime

from customer_segmentation.constant.trainin_pipeline import *

# Timestamp captured once, when this cell executes, formatted as
# month_day_year_hour_minute_second. Re-running the cell produces a new value.
TIMESTAMP: str = datetime.now().strftime("%m_%d_%Y_%H_%M_%S")

In [8]:
# DATA_VALIDATION_DRIFT_REPORT_FILE_NAME

In [9]:
@dataclass
class TrainingPieplineConfig:
    """Settings shared by the pipeline stages: pipeline name, artifact directory and run timestamp."""
    # Defaults are evaluated once, when the class body runs, from the constants
    # PIPELINE_NAME / ARTIFACT_DIR and the TIMESTAMP computed earlier.
    pipeline_name: str = PIPELINE_NAME
    # Artifact directory layout: <PIPELINE_NAME>/<ARTIFACT_DIR>/<TIMESTAMP>
    artifact_dir: str = os.path.join(PIPELINE_NAME, ARTIFACT_DIR, TIMESTAMP)
    timestamp: str = TIMESTAMP

# Single shared instance; the stage configs below read artifact_dir from it.
training_pipeline_config: TrainingPieplineConfig = TrainingPieplineConfig()


@dataclass
class DataIngestionConfig:
    """Paths and parameters used by the data ingestion stage.

    Every path default is built once, when the class body is evaluated, and is
    rooted at ``training_pipeline_config.artifact_dir``. Later defaults reuse
    earlier ones, so the field order matters.
    """

    # Root directory of this stage inside the run's artifact directory.
    data_ingestion_dir: str = os.path.join(
        training_pipeline_config.artifact_dir, DATA_INGESTION_DIR_NAME
    )
    # File written under the feature-store sub-directory.
    feature_store_file_path: str = os.path.join(
        data_ingestion_dir, DATA_INGESTION_FEATURE_STORE_DIR, FILE_NAME
    )
    # Directory that holds the train/test files.
    ingested_data_dir: str = os.path.join(
        data_ingestion_dir, DATA_INGESTION_INGESTED_DIR
    )
    # Train and test files live directly inside ingested_data_dir.
    training_file_path: str = os.path.join(ingested_data_dir, TRAIN_FILE_NAME)
    testing_file_path: str = os.path.join(ingested_data_dir, TEST_FILE_NAME)
    # Split ratio and source collection name come straight from the constants module.
    train_test_split_ratio: float = DATA_INGESTION_TRAIN_TEST_SPLIT_RATIO
    collection_name: str = DATA_INGESTION_COLLECTION_NAME

@dataclass
class DataValidationConfig:
    """Paths used by the data validation stage.

    Every default is built once, when the class body is evaluated, and is
    rooted at ``training_pipeline_config.artifact_dir``. Later defaults reuse
    earlier ones, so the field order matters.
    """

    # Root directory of this stage inside the run's artifact directory.
    data_validation_dir: str = os.path.join(training_pipeline_config.artifact_dir, DATA_VALIDATION_DIR_NAME)
    # Sub-directories for data that passed / failed validation.
    valid_data_dir: str = os.path.join(data_validation_dir, DATA_VALIDATION_VALID_DIR)
    invalid_data_dir: str = os.path.join(data_validation_dir, DATA_VALIDATION_INVALID_DIR)
    # Valid files belong under valid_data_dir, mirroring how the invalid files
    # are placed under invalid_data_dir. They were previously joined onto
    # data_validation_dir, which left valid_data_dir unused.
    valid_train_file_path: str = os.path.join(valid_data_dir, TRAIN_FILE_NAME)
    valid_test_file_path: str = os.path.join(valid_data_dir, TEST_FILE_NAME)
    invalid_train_file_path: str = os.path.join(invalid_data_dir, TRAIN_FILE_NAME)
    invalid_test_file_path: str = os.path.join(invalid_data_dir, TEST_FILE_NAME)
    # Location of the drift report written during validation.
    drift_report_file_path: str = os.path.join(data_validation_dir,
                                               DATA_VALIDATION_DRIFT_REPORT_DIR,
                                               DATA_VALIDATION_DRIFT_REPORT_FILE_NAME
                                               )

## artifact_entity.py

In [10]:
@dataclass
class DataIngestionArtifact:
    """Paths of the train and test files that the validation step reads."""
    # NOTE(review): this redefines the DataIngestionArtifact name imported earlier
    # from customer_segmentation.entity.artifact_entity; presumably both have the
    # same fields -- confirm against the package.
    train_file_path: str  # read with pd.read_csv by DataValidation
    test_file_path: str   # read with pd.read_csv by DataValidation

@dataclass
class DataValidationArtifact:
    """Result returned by DataValidation.initiate_data_validation."""
    # True only when both schema checks pass and no dataset drift is reported.
    validation_status: bool
    # Filled from the ingestion artifact's train/test paths by DataValidation.
    valid_train_file_path: str
    valid_test_file_path: str
    # Filled from DataValidationConfig by DataValidation.
    invalid_train_file_path: str
    invalid_test_file_path: str
    drift_report_file_path: str

## data_validation.py

In [11]:
import json
import sys
from typing import Tuple, Union
import pandas as pd
# from evidently. # for data profiling
# from evidently. # for profiling data drift
from pandas import DataFrame
# from customer_segmentation.entity.config_entity import DataValidationConfig
from customer_segmentation.entity.artifact_entity import DataIngestionArtifact#, DataValidationArtifact

from customer_segmentation.exception import CustomerException
from customer_segmentation.logger import logging
from customer_segmentation.utils.main_utils import MainUtils, write_yaml_file

In [None]:
class DataValidation:
    """
    Class Name  : DataValidation
    Description : Validates the ingested train and test files. Two checks are run:
                  a column-count comparison against the schema config, and a
                  dataset drift report between the train and test data.
    """

    def __init__(self, data_ingestion_artifact: DataIngestionArtifact,
                 data_validation_config: DataValidationConfig):
        """
        Store the ingestion artifact and the validation config, and load the
        schema config through MainUtils.read_schema_config_file().
        """
        self.data_ingestion_artifact = data_ingestion_artifact
        self.data_validation_config = data_validation_config
        self.utils = MainUtils()
        self._schema_config = self.utils.read_schema_config_file()

    def validate_schema_columns(self, dataframe: DataFrame) -> bool:
        """ 
        Method Name : validate_schema_columns
        Description : Compares the number of columns in the dataframe with the number of
                      entries under "columns" in the schema config. Only the counts are
                      compared; column names and dtypes are not checked.

        Output      : True when the counts are equal, otherwise False
        On Failure  : Raises CustomerException wrapping the original error

        Version     : 0.1
        """
        try:
            status = len(dataframe.columns) == len(self._schema_config["columns"])
            logging.info(f"Is required column present [{status}]")
            return status
        except Exception as e:
            # Chain explicitly so the original error stays attached as __cause__.
            raise CustomerException(e, sys) from e
        
    def validate_dataset_schema_columns(self, train_set, test_set) -> Tuple[bool, bool]:
        """ 
        Method Name : validate_dataset_schema_columns
        Description : Runs validate_schema_columns on the train set and then on the test set

        Output      : Tuple (train_schema_status, test_schema_status) of bool values
        On Failure  : Raises CustomerException wrapping the original error
        Version     : 0.1
        """
        logging.info("Entered validate_dataset_schema_columns method of DataValidationClass")
        try:
            logging.info("Validating dataset schema columns")
            train_schema_status = self.validate_schema_columns(train_set)
            logging.info("Validated dataset schema columns on the train set")
            test_schema_status = self.validate_schema_columns(test_set)
            logging.info("Validated dataset schema columns")

            return train_schema_status, test_schema_status
        
        except Exception as e:
            raise CustomerException(e, sys) from e
        
    def detect_dataset_drift(self, reference_df: DataFrame, current_df: DataFrame)-> bool:
        """ 
        Method Name : detect_dataset_drift
        Description : Builds an Evidently Report with DataDriftPreset, runs it with
                      reference_df as reference data and current_df as current data,
                      and writes the parsed report to the configured drift report path
                      via write_yaml_file.

        Output      : The "dataset_drift" value of the first metric's result in the report
        On Failure  : Raises CustomerException wrapping the original error

        Version     : 0.1
        """
        try:
            data_drift_profile = Report([DataDriftPreset()])
            data_drift_profile.run(reference_data=reference_df, current_data=current_df)
            json_report = json.loads(data_drift_profile.json())

            # Persist the parsed report before reading values out of it.
            write_yaml_file(file_path=self.data_validation_config.drift_report_file_path, content=json_report)

            # All values read below live in the first metric's result; look it up once.
            drift_result = json_report["metrics"][0]["result"]
            n_features = drift_result["number_of_columns"]
            n_drifted_features = drift_result["number_of_drifted_columns"]

            logging.info(f"{n_drifted_features}/{n_features} drift detected.")

            return drift_result["dataset_drift"]

        except Exception as e:
            raise CustomerException(e, sys) from e
        
    @staticmethod
    def read_data(file_path) -> DataFrame:
        """Read the CSV file at file_path with pandas; errors are re-raised as CustomerException."""
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise CustomerException(e, sys) from e
        
    def initiate_data_validation(self) -> DataValidationArtifact:
        """
        Method Name :   initiate_data_validation
        Description :   Reads the ingested train and test files, runs the drift check and
                        the schema column checks, and collects the outcome in an artifact
        
        Output      :   DataValidationArtifact; validation_status is True only when both
                        schema checks are True and dataset drift is False
        On Failure  :   Raises CustomerException wrapping the original error
        
        Version     :   0.1
        """
        logging.info("Entered initiate_data_validation method of Data_Validation class")

        try:
            logging.info("Initiated data validation for the dataset")

            train_df = DataValidation.read_data(file_path=self.data_ingestion_artifact.train_file_path)
            test_df = DataValidation.read_data(file_path=self.data_ingestion_artifact.test_file_path)

            # Train data is the reference; test data is compared against it.
            drift = self.detect_dataset_drift(train_df, test_df)

            schema_train_col_status, schema_test_col_status = self.validate_dataset_schema_columns(
                train_set=train_df, test_set=test_df
            )

            logging.info(
                f"Schema train cols status is {schema_train_col_status} and schema test cols status is {schema_test_col_status}"
            )

            logging.info("Validated dataset schema columns")

            # Identity checks are deliberate: only real True/False values count.
            validation_status = (
                schema_train_col_status is True
                and schema_test_col_status is True
                and drift is False
            )

            if validation_status:
                logging.info("Dataset schema validation completed")
            else:
                # Record why the run is marked invalid instead of failing silently.
                logging.info(
                    f"Dataset validation failed: train schema [{schema_train_col_status}], "
                    f"test schema [{schema_test_col_status}], drift [{drift}]"
                )

            # The "valid" paths point at the ingested files themselves; the invalid
            # and drift-report paths come from the validation config.
            data_validation_artifact = DataValidationArtifact(
                validation_status=validation_status,
                valid_train_file_path=self.data_ingestion_artifact.train_file_path,
                valid_test_file_path=self.data_ingestion_artifact.test_file_path,
                invalid_train_file_path=self.data_validation_config.invalid_train_file_path,
                invalid_test_file_path=self.data_validation_config.invalid_test_file_path,
                drift_report_file_path=self.data_validation_config.drift_report_file_path
            )

            return data_validation_artifact
        except Exception as e:
            raise CustomerException(e, sys) from e

In [33]:
if __name__ == "__main__":
    # Ingestion runs first; its artifact supplies the train/test file paths to validate.
    data_ingestion_object = DataIngestion()
    data_ingestion_artifact = data_ingestion_object.initiate_data_ingestion()

    # Pass a config instance, not the class. Passing the class only worked because
    # every field has a default that is readable as a class attribute.
    data_validation_object = DataValidation(data_ingestion_artifact=data_ingestion_artifact, 
                                            data_validation_config=DataValidationConfig())
    data_validation_artifact = data_validation_object.initiate_data_validation()
    print(data_validation_artifact)

DataValidationArtifact(validation_status=True, valid_train_file_path='customer_segmentation/artifact/05_06_2025_11_39_18/data_ingestion/ingested/train.csv', valid_test_file_path='customer_segmentation/artifact/05_06_2025_11_39_18/data_ingestion/ingested/test.csv', invalid_train_file_path='customer_segmentation/artifact/05_06_2025_11_46_53/data_validation/invalid/train.csv', invalid_test_file_path='customer_segmentation/artifact/05_06_2025_11_46_53/data_validation/invalid/test.csv', drift_report_file_path='customer_segmentation/artifact/05_06_2025_11_46_53/data_validation/drift_report/report.yaml')


## Evidently

In [14]:
# python version: 3.8.20
# !pip install evidently==0.3.0

In [15]:
import evidently
evidently.__version__

'0.3.0'

## Data Drift Report Generation

In [16]:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

import pandas as pd
from sklearn.model_selection import train_test_split
import os

os.getcwd()

'/Users/rahulshelke/Documents/Data-Science/Data-Science-Projects/customer-segmentation'

In [17]:
# Path is relative to the current working directory (the project root at this point).
df = pd.read_csv("notebooks/data/marketing_campaign.csv")

In [18]:
df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,...,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,04-09-2012,58,635,...,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,08-03-2014,38,11,...,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,21-08-2013,26,426,...,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,10-02-2014,26,11,...,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,19-01-2014,94,173,...,5,0,0,0,0,0,0,3,11,0


In [19]:
df.shape

(2240, 29)

In [20]:
# Remove three columns that are not used in the drift comparison below.
# Note: df is rebound, so re-running this cell on the already-trimmed frame raises KeyError.
df = df.drop(columns=["ID", "Z_CostContact", "Z_Revenue"])

In [21]:
df.shape

(2240, 26)

In [22]:
df.columns

Index(['Year_Birth', 'Education', 'Marital_Status', 'Income', 'Kidhome',
       'Teenhome', 'Dt_Customer', 'Recency', 'MntWines', 'MntFruits',
       'MntMeatProducts', 'MntFishProducts', 'MntSweetProducts',
       'MntGoldProds', 'NumDealsPurchases', 'NumWebPurchases',
       'NumCatalogPurchases', 'NumStorePurchases', 'NumWebVisitsMonth',
       'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1',
       'AcceptedCmp2', 'Complain', 'Response'],
      dtype='object')

In [23]:
# Fix the seed so the 80/20 split, and every drift figure computed from it,
# is the same on each Restart & Run All.
train_set, test_set = train_test_split(df, test_size=0.2, random_state=42)

In [24]:
# Data drift report object configured with the DataDriftPreset metric preset.
data_drift_profile = Report([DataDriftPreset()])

In [25]:
# Compute the drift report with train_set as the reference data and test_set as
# the current data. The call's return value is not used; results are read from
# data_drift_profile afterwards.
data_drift_profile.run(reference_data=train_set, current_data=test_set)

In [26]:
type(data_drift_profile)

evidently.report.report.Report

In [27]:
# Serialise the computed report to a JSON string.
report = data_drift_profile.json()

In [28]:
# Parse the JSON string into Python objects so individual values can be looked up.
json_report = json.loads(report)

In [29]:
# Column count reported in the first metric's result.
n_features = json_report["metrics"][0]["result"]["number_of_columns"]

In [30]:
# Number of columns the first metric's result reports as drifted.
n_drifted_features = json_report["metrics"][0]["result"]["number_of_drifted_columns"]

In [31]:
# Summary line: drifted columns out of total columns.
print(f"{n_drifted_features}/{n_features} drift detected.")

1/26 drift detected.


In [32]:
# Dataset-level drift flag from the first metric's result; this is the value
# DataValidation.detect_dataset_drift returns.
drift_status = json_report["metrics"][0]["result"]["dataset_drift"]

drift_status

False