<h1 align=center>Data Ingestion</h1>

In [1]:
import os

# Move the working directory one level up so that the relative paths used in
# later cells (e.g. "configs/config.yaml") resolve from there.
# NOTE(review): presumably the notebook lives in a sub-folder of the project
# root — confirm before running from a different location.
os.chdir("../")
os.getcwd()  # shown as the cell output to confirm the new working directory

'c:\\Users\\44787\\Desktop\\mlops-pro-project'

In [2]:
import os
import sys
import pandas as pd
from sklearn.model_selection import train_test_split
from dataclasses import dataclass
from src.logger import logger
from src.exception import CustomException

In [3]:
@dataclass
class DataIngestionConfig:
    """
    Configuration for data ingestion component.

    Holds the file locations and split parameters consumed by DataIngestion.
    """
    # CSV file that is read as the raw input dataset.
    raw_data_path: str
    # Destination CSV for the training split.
    train_data_path: str
    # Destination CSV for the test split.
    test_data_path: str
    # Forwarded to train_test_split as ``test_size``.
    test_size: float = 0.2
    # Forwarded to train_test_split as ``random_state``.
    random_state: int = 42

In [4]:
class DataIngestion:
    """
    Handles data loading and splitting into train/test sets.

    The raw CSV named in the config is read with pandas, split with
    ``train_test_split`` and the two parts are written back as CSV files.
    """

    def __init__(self, config: DataIngestionConfig):
        """
        Initialize DataIngestion component.

        Args:
            config: DataIngestionConfig object with paths and parameters
        """
        self.config = config
        logger.info("Data Ingestion component initialized")

    def initiate_data_ingestion(self) -> tuple:
        """
        Load data and split into train/test sets.

        Returns:
            Tuple of (train_data_path, test_data_path)

        Raises:
            CustomException: wraps any exception raised while reading,
                splitting or writing the data.
        """
        logger.info("Starting data ingestion process")

        try:
            # Read the dataset
            logger.info(f"Reading dataset from {self.config.raw_data_path}")
            df = pd.read_csv(self.config.raw_data_path)
            logger.info(f"Dataset loaded successfully. Shape: {df.shape}")

            # Basic info logging
            logger.info(f"Columns: {list(df.columns)}")
            logger.info(f"Missing values: {df.isnull().sum().sum()}")
            logger.info(f"Duplicates: {df.duplicated().sum()}")

            # Create the output directory of BOTH files; a bare filename has
            # an empty dirname, which os.makedirs would reject.
            for output_path in (self.config.train_data_path,
                                self.config.test_data_path):
                output_dir = os.path.dirname(output_path)
                if output_dir:
                    os.makedirs(output_dir, exist_ok=True)

            # Stratify on the target column by name, so the split stays
            # correct even when 'Churn' is not the last column.
            stratify_on = df['Churn'] if 'Churn' in df.columns else None

            # Split the data
            logger.info(f"Splitting data with test_size={self.config.test_size}")
            train_set, test_set = train_test_split(
                df,
                test_size=self.config.test_size,
                random_state=self.config.random_state,
                stratify=stratify_on
            )

            logger.info(f"Train set shape: {train_set.shape}")
            logger.info(f"Test set shape: {test_set.shape}")

            # Save train and test sets
            train_set.to_csv(self.config.train_data_path, index=False, header=True)
            test_set.to_csv(self.config.test_data_path, index=False, header=True)

            logger.info("Data ingestion completed successfully")
            logger.info(f"Train data saved to: {self.config.train_data_path}")
            logger.info(f"Test data saved to: {self.config.test_data_path}")

            return (
                self.config.train_data_path,
                self.config.test_data_path
            )

        except Exception as e:
            logger.error("Error in data ingestion")
            # Chain explicitly so the original traceback is kept as the cause.
            raise CustomException(e, sys) from e

    def get_data_info(self) -> dict:
        """
        Get information about the raw dataset.

        The raw CSV is re-read from ``config.raw_data_path`` on every call.

        Returns:
            Dictionary with row/column counts, column names, per-column
            missing-value counts, duplicate-row count and dtypes as strings

        Raises:
            CustomException: wraps any exception raised while reading or
                summarising the data.
        """
        try:
            df = pd.read_csv(self.config.raw_data_path)

            return {
                'total_rows': len(df),
                'total_columns': len(df.columns),
                'columns': list(df.columns),
                'missing_values': df.isnull().sum().to_dict(),
                'duplicates': int(df.duplicated().sum()),
                'dtypes': df.dtypes.astype(str).to_dict()
            }

        except Exception as e:
            raise CustomException(e, sys) from e

In [None]:
def create_data_ingestion_config(config_dict: dict) -> DataIngestionConfig:
    """
    Create DataIngestionConfig from the data-ingestion config section.

    Args:
        config_dict: The data-ingestion section of the configuration. Fields
            are read by attribute access, so this is presumably the
            attribute-style mapping returned by ``read_yaml`` rather than a
            plain ``dict`` — TODO confirm.

    Returns:
        DataIngestionConfig object
    """
    # All fields live directly on the section that the caller passes in;
    # there is no nested sub-section to traverse.
    return DataIngestionConfig(
        raw_data_path=config_dict.raw_data_path,
        train_data_path=config_dict.train_data_path,
        test_data_path=config_dict.test_data_path,
        test_size=config_dict.test_size,
        random_state=config_dict.random_state,
    )

In [None]:
from pathlib import Path
from yaml import safe_load
from src.utils.common import read_yaml

# Run the ingestion step end to end: load the YAML config, build the typed
# config object from its `data_ingestion` section, then split the raw data.
try:
    config_dict = read_yaml(Path("configs/config.yaml"))
    config = create_data_ingestion_config(config_dict.data_ingestion)
    data_ingestion = DataIngestion(config)
    train_data_path, test_data_path = data_ingestion.initiate_data_ingestion()
except Exception as e:
    # Log the failure, then re-raise it wrapped in the project's exception type.
    logger.error(f"Data ingestion failed: {e}")
    raise CustomException(e, sys)

[2025-11-06 18:25:30,342] INFO - ChurnPrediction - yaml file: configs\config.yaml loaded successfully
[2025-11-06 18:25:30,356] INFO - ChurnPrediction - Data Ingestion component initialized
[2025-11-06 18:25:30,357] INFO - ChurnPrediction - Starting data ingestion process
[2025-11-06 18:25:30,358] INFO - ChurnPrediction - Reading dataset from data/raw/churn_data.csv
[2025-11-06 18:25:30,426] INFO - ChurnPrediction - Dataset loaded successfully. Shape: (7043, 21)


[2025-11-06 18:25:30,429] INFO - ChurnPrediction - Columns: ['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']
[2025-11-06 18:25:30,442] INFO - ChurnPrediction - Missing values: 0
[2025-11-06 18:25:30,476] INFO - ChurnPrediction - Duplicates: 0
[2025-11-06 18:25:30,479] INFO - ChurnPrediction - Splitting data with test_size=0.2
[2025-11-06 18:25:30,504] INFO - ChurnPrediction - Train set shape: (5634, 21)
[2025-11-06 18:25:30,506] INFO - ChurnPrediction - Test set shape: (1409, 21)
[2025-11-06 18:25:30,585] INFO - ChurnPrediction - Data ingestion completed successfully
[2025-11-06 18:25:30,588] INFO - ChurnPrediction - Train data saved to: data/processed/train.csv
[2025-11-06 18:25:30,589] INFO - ChurnPrediction - Tes

<h1 align=center>Data Validation</h1>

In [14]:
import os
import sys
import pandas as pd
from typing import Dict, List
from dataclasses import dataclass
from src.logger import logger
from src.exception import CustomException
# from src.utils.common import save_json

import json
from pathlib import Path

def save_json(path, data):
    """
    Write ``data`` to ``path`` as indented JSON.

    Args:
        path: Destination file (string or Path). Missing parent directories
            are created.
        data: Any JSON-serialisable object.
    """
    path = Path(path)
    # Create the target folder first so a fresh checkout does not fail here.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)

In [15]:
@dataclass
class DataValidationConfig:
    """Configuration for data validation component."""
    report_path: str

In [22]:
import pandas as pd

# Load the raw dataset and preview its first rows.
df = pd.read_csv(Path("data/raw/churn_data.csv"))
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [31]:
# Print the per-column summary (non-null counts and dtypes) of the raw data.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7043 entries, 0 to 7042
Data columns (total 21 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   customerID        7043 non-null   object 
 1   gender            7043 non-null   object 
 2   SeniorCitizen     7043 non-null   int64  
 3   Partner           7043 non-null   object 
 4   Dependents        7043 non-null   object 
 5   tenure            7043 non-null   int64  
 6   PhoneService      7043 non-null   object 
 7   MultipleLines     7043 non-null   object 
 8   InternetService   7043 non-null   object 
 9   OnlineSecurity    7043 non-null   object 
 10  OnlineBackup      7043 non-null   object 
 11  DeviceProtection  7043 non-null   object 
 12  TechSupport       7043 non-null   object 
 13  StreamingTV       7043 non-null   object 
 14  StreamingMovies   7043 non-null   object 
 15  Contract          7043 non-null   object 
 16  PaperlessBilling  7043 non-null   object 


In [28]:
class DataValidation:
    """
    Validates data quality and schema compliance.

    Schema expectations for the churn dataset are declared as class-level
    constants; the methods compare a DataFrame against them and collect
    descriptive statistics into plain dictionaries.
    """

    # Expected schema for churn dataset
    EXPECTED_COLUMNS = [
        'customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
        'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
        'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
        'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
        'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn'
    ]

    # 'TotalCharges' is deliberately left out: its dtype is object, so it
    # cannot take part in the numeric checks.
    NUMERICAL_COLUMNS = ['tenure', 'MonthlyCharges']

    CATEGORICAL_COLUMNS = [
        'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService',
        'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup',
        'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies',
        'Contract', 'PaperlessBilling', 'PaymentMethod'
    ]
    TARGET_COLUMN = 'Churn'

    def __init__(self, config: DataValidationConfig):
        """
        Initialize DataValidation component.

        Args:
            config: DataValidationConfig object
        """
        self.config = config
        logger.info("Data Validation component initialized")

    def validate_schema(self, df: pd.DataFrame) -> Dict[str, object]:
        """
        Validate if dataframe matches expected schema.

        Args:
            df: DataFrame to validate

        Returns:
            Dictionary with the keys 'all_columns_present' (bool),
            'missing_columns' (list), 'extra_columns' (list),
            'numerical_dtypes_correct' (bool) and
            'numerical_dtype_details' (column -> bool)

        Raises:
            CustomException: wraps any exception raised during the checks.
        """
        try:
            actual_columns = set(df.columns)
            expected_columns = set(self.EXPECTED_COLUMNS)

            # Columns the schema requires but the frame lacks, and vice versa
            missing_columns = expected_columns - actual_columns
            extra_columns = actual_columns - expected_columns

            # Only columns that are present can be dtype-checked
            numerical_dtype_check = {
                col: pd.api.types.is_numeric_dtype(df[col])
                for col in self.NUMERICAL_COLUMNS
                if col in df.columns
            }

            validation_results = {
                'all_columns_present': not missing_columns,
                'missing_columns': list(missing_columns),
                'extra_columns': list(extra_columns),
                'numerical_dtypes_correct': all(numerical_dtype_check.values()),
                'numerical_dtype_details': numerical_dtype_check,
            }

            logger.info(f"Schema validation completed: {validation_results}")
            return validation_results

        except Exception as e:
            raise CustomException(e, sys) from e

    def validate_data_quality(self, df: pd.DataFrame) -> Dict[str, object]:
        """
        Validate data quality checks.

        Args:
            df: DataFrame to validate

        Returns:
            Dictionary with missing-value counts and percentage, duplicate
            row count and percentage, per-column numerical statistics and,
            when the target column is present, its distribution and
            min/max class ratio

        Raises:
            CustomException: wraps any exception raised during the checks.
        """
        try:
            quality_report = {}

            # Check for missing values (scan the frame once, reuse the result)
            missing_per_column = df.isnull().sum()
            total_missing = int(missing_per_column.sum())
            total_cells = df.shape[0] * df.shape[1]
            quality_report['missing_values'] = (
                missing_per_column[missing_per_column > 0].to_dict()
            )
            quality_report['total_missing'] = total_missing
            # An empty frame has no cells; report 0.0 instead of dividing by zero
            quality_report['missing_percentage'] = (
                round(total_missing / total_cells * 100, 2) if total_cells else 0.0
            )

            # Check for duplicates
            duplicate_rows = int(df.duplicated().sum())
            row_count = len(df)
            quality_report['duplicate_rows'] = duplicate_rows
            quality_report['duplicate_percentage'] = (
                round(duplicate_rows / row_count * 100, 2) if row_count else 0.0
            )

            # Check for data ranges (numerical columns)
            numerical_stats = {}
            for col in self.NUMERICAL_COLUMNS:
                if col in df.columns:
                    numerical_stats[col] = {
                        'min': float(df[col].min()),
                        'max': float(df[col].max()),
                        'mean': float(df[col].mean()),
                        'std': float(df[col].std()),
                        'negative_values': int((df[col] < 0).sum())
                    }
            quality_report['numerical_statistics'] = numerical_stats

            # Check target distribution
            if self.TARGET_COLUMN in df.columns:
                target_dist = df[self.TARGET_COLUMN].value_counts()
                quality_report['target_distribution'] = target_dist.to_dict()
                quality_report['target_balance_ratio'] = round(
                    target_dist.min() / target_dist.max(), 2
                )

            logger.info("Data quality validation completed")
            return quality_report

        except Exception as e:
            raise CustomException(e, sys) from e

    def initiate_data_validation(self, train_path: str, test_path: str) -> bool:
        """
        Perform complete data validation on train and test sets.

        Both files are loaded, checked for schema and quality, and the
        combined report is written as JSON to ``config.report_path``.

        Args:
            train_path: Path to training data
            test_path: Path to test data

        Returns:
            Boolean indicating if data passed validation, i.e. all expected
            columns are present in both sets

        Raises:
            CustomException: wraps any exception raised while loading,
                validating or saving the report.
        """
        logger.info("Starting data validation process")

        try:
            # Load data
            train_df = pd.read_csv(train_path)
            test_df = pd.read_csv(test_path)

            logger.info(f"Loaded train data: {train_df.shape}")
            logger.info(f"Loaded test data: {test_df.shape}")

            # Validate schema
            train_schema = self.validate_schema(train_df)
            test_schema = self.validate_schema(test_df)

            # Validate quality
            train_quality = self.validate_data_quality(train_df)
            test_quality = self.validate_data_quality(test_df)

            # NOTE(review): the pass criterion only looks at column presence;
            # 'numerical_dtypes_correct' is reported but does not affect it.
            validation_passed = (
                train_schema['all_columns_present'] and
                test_schema['all_columns_present']
            )

            # Compile validation report
            validation_report = {
                'train_data': {
                    'shape': train_df.shape,
                    'schema_validation': train_schema,
                    'quality_validation': train_quality
                },
                'test_data': {
                    'shape': test_df.shape,
                    'schema_validation': test_schema,
                    'quality_validation': test_quality
                },
                'validation_passed': validation_passed
            }

            # Make sure the report's folder exists before writing into it
            report_dir = os.path.dirname(self.config.report_path)
            if report_dir:
                os.makedirs(report_dir, exist_ok=True)

            # Save validation report
            save_json(self.config.report_path, validation_report)
            logger.info(f"Validation report saved to: {self.config.report_path}")

            # Log critical issues
            if not validation_passed:
                logger.warning("Data validation failed! Check the validation report.")
            else:
                logger.info("Data validation passed successfully!")

            return validation_passed

        except Exception as e:
            logger.error("Error in data validation")
            # Chain explicitly so the original traceback is kept as the cause.
            raise CustomException(e, sys) from e

In [29]:
def create_data_validation_config(config_dict: dict) -> DataValidationConfig:
    """
    Build the validation component's config from its configuration section.

    Args:
        config_dict: Configuration section exposing ``report_path``

    Returns:
        DataValidationConfig carrying that report path
    """
    report_location = config_dict.report_path
    return DataValidationConfig(report_path=report_location)

In [30]:
# Run the validation step: reload the YAML config, build the validation
# component and check the train/test files named in the ingestion section.
# `read_yaml` and `Path` are expected to be in scope from earlier cells.
try:
    config_dict = read_yaml(Path("configs/config.yaml"))
    data_validation_config = create_data_validation_config(config_dict.data_validation)
    data_validation = DataValidation(data_validation_config)
    validation_passed = data_validation.initiate_data_validation(
        config_dict.data_ingestion.train_data_path,
        config_dict.data_ingestion.test_data_path
    )
except Exception as e:
    # Log the failure, then re-raise it wrapped in the project's exception type.
    logger.error(f"Data validation failed: {e}")
    raise CustomException(e, sys)

[2025-11-06 18:39:38,973] INFO - ChurnPrediction - yaml file: configs\config.yaml loaded successfully
[2025-11-06 18:39:38,975] INFO - ChurnPrediction - Data Validation component initialized
[2025-11-06 18:39:38,977] INFO - ChurnPrediction - Starting data validation process
[2025-11-06 18:39:39,016] INFO - ChurnPrediction - Loaded train data: (5634, 21)
[2025-11-06 18:39:39,018] INFO - ChurnPrediction - Loaded test data: (1409, 21)
[2025-11-06 18:39:39,020] INFO - ChurnPrediction - Schema validation completed: {'all_columns_present': True, 'missing_columns': [], 'extra_columns': [], 'numerical_dtypes_correct': True, 'numerical_dtype_details': {'tenure': True, 'MonthlyCharges': True}}
[2025-11-06 18:39:39,020] INFO - ChurnPrediction - Schema validation completed: {'all_columns_present': True, 'missing_columns': [], 'extra_columns': [], 'numerical_dtypes_correct': True, 'numerical_dtype_details': {'tenure': True, 'MonthlyCharges': True}}
[2025-11-06 18:39:39,069] INFO - ChurnPrediction -

<h1 align=center>Data Preprocessing</h1>