# **Data Validation**

In [1]:
import os

In [2]:
# Show the kernel's current working directory before changing it.
%pwd

'/Users/rahulshelke/Documents/Data-Science/Data-Science-Projects/finance_complaint/research'

In [3]:
# Move one directory up so that later cells run from the parent of the notebook folder.
# NOTE(review): a relative chdir is not idempotent - re-running this cell moves up one more level.
os.chdir("../")

In [4]:
# Show the working directory again to confirm the effect of os.chdir.
%pwd

'/Users/rahulshelke/Documents/Data-Science/Data-Science-Projects/finance_complaint'

## **schema.py**

Defines the schema for the data (column names and data types) so that it can be loaded in a tabular format.

In [5]:
from typing import List, Dict
from pyspark.sql.types import (TimestampType, StringType, FloatType, StructType, StructField)
from pyspark.sql import DataFrame
from finance_complaint.exception import FinanceException
from finance_complaint.logger import logging as logger
import os, sys

Reading environment variables
Read Complete!


In [6]:
class FinanceDataSchema:
    """
    Central definition of the complaint dataset's column names, Spark schema,
    and the column groupings used by the pipeline (target, encoded features,
    required and unwanted columns).
    """

    def __init__(self):
        """
        Set the column names used throughout the pipeline.
        """
        self.col_company_response: str = 'company_response'
        self.col_consumer_consent_provided: str = 'consumer_consent_provided'
        self.col_submitted_via: str = 'submitted_via'
        self.col_timely: str = 'timely'
        self.col_diff_in_days: str = 'diff_in_days'
        self.col_company: str = 'company'
        self.col_issue: str = 'issue'
        self.col_product: str = 'product'
        self.col_state: str = 'state'
        self.col_zip_code: str = 'zip_code'
        self.col_consumer_disputed: str = 'consumer_disputed'
        self.col_date_sent_to_company: str = 'date_sent_to_company'
        self.col_date_received: str = 'date_received'
        self.col_complaint_id: str = 'complaint_id'
        self.col_sub_product: str = 'sub_product'
        self.col_complaint_what_happened: str = 'complaint_what_happened'
        self.col_company_public_response: str = 'company_public_response'

    @property
    def dataframe_schema(self) -> StructType:
        """
        Build the Spark schema: one StructField (name + data type) per column.

        Returns:
            StructType with string columns for the categorical/text fields and
            timestamp columns for the two date fields.

        Raises:
            FinanceException: if the schema cannot be constructed.
        """
        try:
            schema = StructType(
                [
                    StructField(self.col_company_response, StringType()),
                    StructField(self.col_consumer_consent_provided, StringType()),
                    StructField(self.col_submitted_via, StringType()),
                    StructField(self.col_timely, StringType()),
                    StructField(self.col_date_sent_to_company, TimestampType()),
                    StructField(self.col_date_received, TimestampType()),
                    StructField(self.col_company, StringType()),
                    StructField(self.col_issue, StringType()),
                    StructField(self.col_product, StringType()),
                    StructField(self.col_state, StringType()),
                    StructField(self.col_zip_code, StringType()),
                    StructField(self.col_consumer_disputed, StringType()),
                ]
            )
            return schema
        except Exception as e:
            # Raise the project exception type; the schema class itself is not
            # an exception and its constructor takes no arguments.
            raise FinanceException(e, sys)

    @property
    def target_column(self) -> str:
        """Name of the label column."""
        return self.col_consumer_disputed

    @property
    def one_hot_encoding_features(self) -> List[str]:
        """Columns listed for one-hot encoding."""
        features = [
            self.col_company_response,
            self.col_consumer_consent_provided,
            self.col_submitted_via,
        ]
        return features

    @property
    def tfidf_fetaures(self) -> List[str]:
        """
        Columns listed for TF-IDF processing.

        The property name keeps its original spelling so existing callers
        continue to work.
        """
        features = [
            self.col_issue
        ]
        return features

    @property
    def required_columns(self) -> List[str]:
        """
        Columns that must be present after validation: the target, the
        one-hot and TF-IDF feature columns, and the two date columns.
        """
        features = [self.target_column] + self.one_hot_encoding_features + self.tfidf_fetaures + \
                    [self.col_date_sent_to_company, self.col_date_received]
        return features

    @property
    def unwanted_columns(self) -> List[str]:
        """Columns that are always dropped. A new list is returned on every access."""
        features = [
            self.col_complaint_id,
            self.col_sub_product,
            self.col_complaint_what_happened,
        ]
        return features

## **constant.py**

In [7]:
from dataclasses import dataclass
from datetime import datetime
import os

# Timestamp string captured once, when this cell/module is executed.
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")

@dataclass
class EnvironmentVariable:
    """Container for values read from the process environment."""
    # Has no type annotation, so @dataclass does not turn it into a field: it is a
    # plain class attribute, read from the environment once when the class is defined.
    # os.getenv returns None when MONGO_DB_URL is not set.
    mongo_db_url = os.getenv("MONGO_DB_URL")

# Module-level instance for convenient access to the environment values.
env_var = EnvironmentVariable()

## **config_entity.py**

In [8]:
from dataclasses import dataclass

In [9]:
# Sub-directory (under the pipeline artifact directory) that holds data-validation output.
DATA_VALIDATION_DIR = "data_validation"
# Name used for the files written by the validation step.
DATA_VALIDATION_FILE_NAME = "finance_complaint"
# Sub-directory of DATA_VALIDATION_DIR for data that passed validation.
DATA_VALIDATION_ACCEPTED_DATA_DIR = "accepted_data"
# Sub-directory of DATA_VALIDATION_DIR for data that was rejected.
DATA_VALIDATION_REJECTED_DATA_DIR = "rejected_data"

In [10]:
# training pipeline config
@dataclass
class TrainingPipelineConfig:
    """
    Top-level settings shared by all pipeline stages.

    Attributes:
        pipeline_name: root directory name for pipeline artifacts.
        artifact_dir: run-specific artifact directory. When not supplied it is
            derived as ``<pipeline_name>/<TIMESTAMP>``.
    """
    pipeline_name: str = "artifact"
    # Empty string means "derive from pipeline_name". Computing the default in the
    # class body would freeze it to the default pipeline_name, ignoring any
    # pipeline_name passed to the constructor.
    artifact_dir: str = ""

    def __post_init__(self) -> None:
        if not self.artifact_dir:
            self.artifact_dir = os.path.join(self.pipeline_name, TIMESTAMP)

In [11]:
class DataValidationConfig:
    """
    Resolved locations for the data-validation stage.

    Attributes:
        accepted_data_dir: directory for data that passed validation.
        rejected_data_dir: directory for data that was rejected.
        file_name: name used for the files written by this stage.
    """

    def __init__(self, training_pipeline_config: TrainingPipelineConfig) -> None:
        try:
            # Everything for this stage lives under <artifact_dir>/<DATA_VALIDATION_DIR>.
            stage_root = os.path.join(
                training_pipeline_config.artifact_dir, DATA_VALIDATION_DIR
            )
            accepted, rejected = (
                os.path.join(stage_root, sub_dir)
                for sub_dir in (
                    DATA_VALIDATION_ACCEPTED_DATA_DIR,
                    DATA_VALIDATION_REJECTED_DATA_DIR,
                )
            )
            self.accepted_data_dir = accepted
            self.rejected_data_dir = rejected
            self.file_name = DATA_VALIDATION_FILE_NAME
        except Exception as e:
            raise FinanceException(e, sys)

## **artifact_entity.py**

In [12]:
@dataclass
class DataValidationArtifact:
    """Locations produced by the data-validation step."""
    # Path the accepted (validated) data was written to.
    accepted_file_path: str
    # Directory under which rejected data is stored.
    rejected_dir: str

## **data_validation.py**

In [13]:
import os,sys
from collections import namedtuple
from typing import List, Dict

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

from finance_complaint.config.spark_manager import spark_session
from finance_complaint.entity.artifact_entity import DataIngestionArtifact
# from finance_complaint.entity.config_entity import DataValidationConfig
# from finance_complaint.entity.schema import FinanceDataSchema
from finance_complaint.exception import FinanceException
from finance_complaint.logger import logging as logger

from pyspark.sql.functions import lit
# from finance_complaint.entity.artifact_entity import DataValidationArtifact

MongoClient(host=['ac-zciqt62-shard-00-00.uhbxanv.mongodb.net:27017', 'ac-zciqt62-shard-00-02.uhbxanv.mongodb.net:27017', 'ac-zciqt62-shard-00-01.uhbxanv.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, retrywrites=True, w='majority', appname='mlcluster', authsource='admin', replicaset='atlas-10vcjq-shard-0', tls=True, server_api=<pymongo.server_api.ServerApi object at 0x1107ab380>)


25/05/04 21:08:09 WARN Utils: Your hostname, Rahuls-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.13 instead (on interface en0)
25/05/04 21:08:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/04 21:08:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/04 21:08:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [14]:
# Name of the extra column added to rejected data to record why it was rejected.
ERROR_MESSAGE = "error_msg"
# Per-column missing-value summary: total row count, number of null rows,
# and the null share expressed as a percentage (0-100).
MissingReport = namedtuple("MissingReport",
                           ["total_row", "missing_row", "missing_percentage"])

In [15]:
class DataValidation():
    """
    Validate ingested complaint data before it moves further down the pipeline.

    ``initiate_data_validation`` reads the ingested parquet file, drops the
    unwanted and mostly-empty columns (archiving them under the rejected
    directory), checks that every required column is still present, and writes
    the remaining data to the accepted directory.
    """

    def __init__(self,
                 data_validation_config: DataValidationConfig,
                 data_ingestion_artifact: DataIngestionArtifact,
                 schema=FinanceDataSchema()
                 ):
        """
        Args:
            data_validation_config: output locations for accepted/rejected data.
            data_ingestion_artifact: result of the ingestion step; its
                ``feature_store_file_path`` is the parquet input read here.
            schema: column definitions used to decide which columns are
                unwanted and which are required.
        """
        try:
            self.data_ingestion_artifact: DataIngestionArtifact = data_ingestion_artifact
            self.data_validation_config = data_validation_config
            self.schema = schema
        except Exception as e:
            raise FinanceException(e, sys)

    def read_data(self) -> DataFrame:
        """
        Read the ingested parquet file into a Spark dataframe.

        Returns:
            Dataframe limited to 10000 rows.

        Raises:
            FinanceException: if the file cannot be read.
        """
        try:
            dataframe: DataFrame = spark_session.read.parquet(
                self.data_ingestion_artifact.feature_store_file_path
            ).limit(10000)  # development-time cap on the number of records; to be removed for deployment
            logger.info(f"Data frame is created using file: {self.data_ingestion_artifact.feature_store_file_path}")
            logger.info(f"Number of row: {dataframe.count()} and column: {len(dataframe.columns)}")
            return dataframe
        except Exception as e:
            raise FinanceException(e, sys)

    @staticmethod
    def get_missing_report(dataframe: DataFrame, ) -> Dict[str, MissingReport]:
        """
        Build a missing-value report for every column of ``dataframe``.

        Returns:
            Mapping of column name to ``MissingReport`` (total rows, null rows,
            null percentage in the range 0-100). For an empty dataframe every
            percentage is reported as 0.0.

        Raises:
            FinanceException: if any Spark operation fails.
        """
        try:
            missing_report: Dict[str, MissingReport] = dict()
            logger.info(f"Prepraing missing reports for each column")
            number_of_row = dataframe.count()

            for column in dataframe.columns:
                # Column expression instead of an interpolated SQL string, so
                # column names are not parsed as SQL text.
                missing_row = dataframe.filter(col(column).isNull()).count()
                # Guard against division by zero on an empty dataframe.
                if number_of_row:
                    missing_percentage = (missing_row * 100) / number_of_row
                else:
                    missing_percentage = 0.0
                missing_report[column] = MissingReport(
                    total_row=number_of_row,
                    missing_row=missing_row,
                    missing_percentage=missing_percentage
                )
            logger.info(f"Missing report prepared: {missing_report}")
            return missing_report
        except Exception as e:
            raise FinanceException(e, sys)

    def get_unwanted_and_high_missing_value_columns(self, dataframe: DataFrame, threshold: float = 0.2) -> List[str]:
        """
        List the columns that should be dropped (nothing is removed here).

        The result is the schema's unwanted columns plus every column whose
        share of missing values is strictly greater than ``threshold``.

        Args:
            dataframe: data to inspect.
            threshold: maximum tolerated fraction of missing values (0-1).

        Returns:
            De-duplicated column names, schema-level columns first, then
            high-missing columns in dataframe order.

        Raises:
            FinanceException: if the missing report cannot be built.
        """
        try:
            missing_report: Dict[str, MissingReport] = self.get_missing_report(dataframe=dataframe)

            unwanted_column: List[str] = self.schema.unwanted_columns
            # The report stores percentages (0-100), the threshold is a fraction.
            threshold_percentage = threshold * 100
            for column, report in missing_report.items():
                if report.missing_percentage > threshold_percentage:
                    unwanted_column.append(column)
                    logger.info(f"Missing report {column}: [{report}]")
            # Order-preserving de-duplication keeps the result deterministic.
            unwanted_column = list(dict.fromkeys(unwanted_column))

            return unwanted_column
        except Exception as e:
            raise FinanceException(e, sys)

    def drop_unwanted_columns(self, dataframe: DataFrame) -> DataFrame:
        """
        Drop unwanted columns and archive them under the rejected directory.

        The dropped columns are written (append mode, parquet) to
        ``<rejected_data_dir>/missing_data/<file_name>`` together with an
        ``ERROR_MESSAGE`` column describing why they were rejected.

        Returns:
            ``dataframe`` without the unwanted columns.

        Raises:
            FinanceException: if selecting, writing or dropping fails.
        """
        try:
            # get all unwanted columns to drop
            unwanted_columns: List = self.get_unwanted_and_high_missing_value_columns(dataframe=dataframe,)
            # log those columns
            logger.info(f"Droppping fetaures: {','.join(unwanted_columns)}")
            # Schema-level unwanted columns may be absent from this dataframe;
            # select only the ones that exist (drop() below already tolerates
            # missing names, select() does not).
            existing_columns = set(dataframe.columns)
            columns_to_archive = [column for column in unwanted_columns if column in existing_columns]
            unwanted_dataframe: DataFrame = dataframe.select(columns_to_archive)
            # tag every archived row with the rejection reason
            unwanted_dataframe = unwanted_dataframe.withColumn(ERROR_MESSAGE, lit("Contains many missing values"))

            # prepare the rejection directory on disk
            rejected_dir = os.path.join(self.data_validation_config.rejected_data_dir, "missing_data")
            os.makedirs(rejected_dir, exist_ok=True)
            # prepare the file path
            file_path = os.path.join(rejected_dir, self.data_validation_config.file_name)

            logger.info(f"writing dropped column into file: [{file_path}]")
            # write the rejected columns
            unwanted_dataframe.write.mode('append').parquet(file_path)
            # drop the unwanted columns from the original dataframe
            remaining_dataframe: DataFrame = dataframe.drop(*unwanted_columns)
            logger.info(f"Remaining columns of dataframe: [{remaining_dataframe.columns}]")
            return remaining_dataframe
        except Exception as e:
            raise FinanceException(e, sys)

    def is_required_columns_exist(self, dataframe: DataFrame):
        """
        Verify that ``dataframe`` contains every required column of the schema.

        Raises:
            FinanceException: if at least one required column is missing.
        """
        try:
            # Evaluate the property once; it builds a new list on every access.
            required_columns = self.schema.required_columns
            columns = [column for column in dataframe.columns if column in required_columns]

            if len(columns) != len(required_columns):
                raise Exception(f"Required column missing\n\
                                Expected columns: {required_columns}\n\
                                Found columns: {columns}\
                                    ")
        except Exception as e:
            raise FinanceException(e, sys)

    def initiate_data_validation(self) -> DataValidationArtifact:
        """
        Run the full validation flow and persist the accepted data.

        Returns:
            DataValidationArtifact with the accepted file path and the
            rejected-data directory.

        Raises:
            FinanceException: if any step fails.
        """
        try:
            logger.info(f"initiating data processing.")
            # 1st read the data
            dataframe: DataFrame = self.read_data()

            logger.info(f"Dropping unwanted columns")
            # 2nd drop unwanted columns
            dataframe: DataFrame = self.drop_unwanted_columns(dataframe=dataframe)

            # make sure every required column is still available
            self.is_required_columns_exist(dataframe=dataframe)
            logger.info("Saving preprocessed data.")
            # report the number of rows and columns
            print(f"Row: [{dataframe.count()}] Column: [{len(dataframe.columns)}]")
            # report expected vs. present columns
            print(f"Expected Column: {self.schema.required_columns}\nPresent Columns: {dataframe.columns}")
            # create the directory for accepted data
            os.makedirs(self.data_validation_config.accepted_data_dir, exist_ok=True)
            accepted_file_path = os.path.join(self.data_validation_config.accepted_data_dir,
                                              self.data_validation_config.file_name)

            # save the dataframe in parquet format to the accepted folder
            dataframe.write.parquet(accepted_file_path)
            artifact = DataValidationArtifact(
                accepted_file_path=accepted_file_path,
                rejected_dir=self.data_validation_config.rejected_data_dir
            )
            logger.info(f"Data validation artifact: [{artifact}]")
            return artifact
        except Exception as e:
            raise FinanceException(e, sys)


## **training.py**

In [16]:
from finance_complaint.entity.config_entity import DataIngestionConfig
from finance_complaint.components.data_ingestion import DataIngestion

In [17]:
class TrainingPipeline:
    """
    Training pipeline that runs the stages in order: data ingestion first,
    then data validation.
    """
    def __init__(self, training_pipeline_config: TrainingPipelineConfig):
        self.training_pipeline_config: TrainingPipelineConfig = training_pipeline_config

    def start_data_ingestion(self) -> DataIngestionArtifact:
        """
        Run the data-ingestion stage.

        Returns:
            The artifact produced by ``DataIngestion.initiate_data_ingestion``.

        Raises:
            FinanceException: if ingestion fails.
        """
        try:
            data_ingestion_config = DataIngestionConfig(training_pipeline_config=self.training_pipeline_config)
            data_ingestion = DataIngestion(data_ingestion_config=data_ingestion_config)
            data_ingestion_artifact = data_ingestion.initiate_data_ingestion()
            return data_ingestion_artifact

        except Exception as e:
            logger.debug(f"Error: {e}")
            raise FinanceException(e, sys)

    def start_data_validation(self, data_ingestion_artifact: DataIngestionArtifact) -> DataValidationArtifact:
        """
        Run the data-validation stage on the ingested data.

        Args:
            data_ingestion_artifact: output of the ingestion stage.

        Returns:
            The artifact produced by ``DataValidation.initiate_data_validation``.

        Raises:
            FinanceException: if validation fails.
        """
        try:
            data_validation_config = DataValidationConfig(training_pipeline_config=self.training_pipeline_config)
            data_validation = DataValidation(
                data_validation_config=data_validation_config,
                data_ingestion_artifact=data_ingestion_artifact
            )
            data_validation_artifact = data_validation.initiate_data_validation()
            return data_validation_artifact
        except Exception as e:
            raise FinanceException(e, sys)

    def start(self) -> DataValidationArtifact:
        """
        Run the whole pipeline.

        Returns:
            The artifact of the last stage (data validation), so callers can
            inspect or chain on the result instead of it being discarded.

        Raises:
            FinanceException: if any stage fails.
        """
        try:
            # run data ingestion
            data_ingestion_artifact = self.start_data_ingestion()
            # run data validation on the ingested data
            return self.start_data_validation(data_ingestion_artifact=data_ingestion_artifact)
        except Exception as e:
            raise FinanceException(e, sys)

## **train.py**

In [18]:
# Entry point: build the default pipeline configuration and run the pipeline
# (data ingestion followed by data validation).
if __name__ == "__main__":
    training_pipeline_config = TrainingPipelineConfig()
    training_pipeline = TrainingPipeline(training_pipeline_config=training_pipeline_config)
    training_pipeline.start()

                                                                                

Row: [10000] Column: [12]
Expected Column: ['consumer_disputed', 'company_response', 'consumer_consent_provided', 'submitted_via', 'issue', 'date_sent_to_company', 'date_received']
Present Columns: ['company', 'company_response', 'consumer_consent_provided', 'consumer_disputed', 'date_received', 'date_sent_to_company', 'issue', 'product', 'state', 'submitted_via', 'timely', 'zip_code']
