In [1]:
import os

In [2]:
%pwd

'c:\\Users\\ambig\\jupiter_notebook\\Projects\\premium-price-prediction\\research'

In [3]:
# Step up one directory from the kernel's starting folder so that the
# project-relative imports and paths used by the later cells resolve.
# NOTE(review): a relative chdir is not idempotent; re-running this cell moves
# up one more level each time, so run it exactly once per kernel session.
os.chdir("../")
%pwd

'c:\\Users\\ambig\\jupiter_notebook\\Projects\\premium-price-prediction'

In [65]:
from pathlib import Path
from dataclasses import dataclass

@dataclass
class DataPreprocessingconfig:
    """Settings consumed by the data-preprocessing stage.

    Instances are built by ``ConfigurationManager.get_data_preprocessing_config``
    from the ``data_preprocessing`` section of the loaded config.

    NOTE(review): the fields are annotated as ``Path``, but the values are
    passed through from the parsed config without conversion, so they may
    arrive as plain strings -- TODO confirm against ``read_yaml``.
    """
    # Stage directory; created when the config object is built.
    root_dir: Path
    # Stored by DataPreprocessing but not read by any method in this notebook.
    input_path: Path
    # Directory that save_data() creates before writing the split.
    output_path: Path
    # Destination CSV for the test split.
    test_filepath: Path
    # Destination CSV for the train split.
    train_filepath: Path
    # CSV read by load_data().
    raw_filepath: Path
    # Name of the column that receives the IQR clipping / outlier removal.
    preprocess_column: str

In [66]:
from src.Premium_Price_Prediction.constants import *
from src.Premium_Price_Prediction.utils.common import read_yaml , create_directories
from src.Premium_Price_Prediction import logger

In [69]:
class ConfigurationManager:
    """Load the project's YAML files and hand out per-stage config objects."""

    def __init__(self,
                 config_filepath=CONFIG_FILE_PATH,
                 params_filepath=PARAMS_FILE_PATH,
                 schema_filepath=SCHEMA_FILE_PATH):
        """Read the config, params and schema files and create the artifacts root.

        Args:
            config_filepath: Location of the main config YAML.
            params_filepath: Location of the params YAML.
            schema_filepath: Location of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        # Every stage writes beneath this directory, so make sure it exists up front.
        create_directories([self.config.artifacts_root])

    def get_data_preprocessing_config(self) -> DataPreprocessingconfig:
        """Build the preprocessing settings from the ``data_preprocessing`` section.

        The stage's ``root_dir`` is created as a side effect.

        Returns:
            A ``DataPreprocessingconfig`` populated from the loaded config.
        """
        section = self.config.data_preprocessing
        create_directories([section.root_dir])

        return DataPreprocessingconfig(
            root_dir=section.root_dir,
            input_path=section.input_path,
            output_path=section.output_path,
            preprocess_column=section.preprocess_column,
            train_filepath=section.train_filepath,
            test_filepath=section.test_filepath,
            raw_filepath=section.raw_filepath,
        )
        
        
        

In [71]:
import os
import pandas as pd
from src.Premium_Price_Prediction import logger
from sklearn.model_selection import train_test_split


class DataPreprocessing:
    """Clean the raw dataset, treat outliers in one column and write a train/test split.

    The notebook drives it as ``load_data`` -> ``preprocess`` ->
    ``apply_iqr_treatment`` -> ``save_data``. The working frame lives in
    ``self.df`` and is mutated or replaced by each step.
    """

    def __init__(self, config: "DataPreprocessingconfig"):
        """Copy the settings this stage needs off ``config``.

        Args:
            config: Object exposing ``input_path``, ``output_path``,
                ``preprocess_column``, ``train_filepath``, ``test_filepath``
                and ``raw_filepath``.
        """
        self.input_path: str = config.input_path
        self.output_path: str = config.output_path
        self.preprocess_column: str = config.preprocess_column
        self.train_filepath: str = config.train_filepath
        self.test_filepath: str = config.test_filepath
        self.raw_filepath: str = config.raw_filepath
        # Populated by load_data(); stays None until then.
        self.df: "pd.DataFrame | None" = None

    def load_data(self) -> None:
        """Read the CSV at ``raw_filepath`` into ``self.df``.

        Raises:
            Exception: Whatever ``pd.read_csv`` raises, re-raised after logging.
        """
        try:
            self.df = pd.read_csv(self.raw_filepath)
            logger.info(f"Loaded data from {self.raw_filepath} with shape {self.df.shape}")
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise

    def preprocess(self) -> pd.DataFrame:
        """Drop missing values, standardize column names and drop duplicate rows.

        Loads the raw data first if it has not been loaded yet.

        Returns:
            The cleaned frame (the same object kept in ``self.df``).

        Raises:
            Exception: Any failure from the individual steps, re-raised after logging.
        """
        try:
            if self.df is None:
                self.load_data()

            self.handle_missing_values()
            self.standardize_column_names()
            self.remove_duplicates()

            return self.df

        except Exception as e:
            logger.error(f"Error during preprocessing: {str(e)}")
            raise

    def handle_missing_values(self) -> None:
        """Drop every row of ``self.df`` that contains a missing value."""
        missing_values = self.df.isna().sum().sum()
        logger.info(f"Total missing values before dropping: {missing_values}")
        # In place on purpose: this object owns the frame, and rebinding to a
        # filtered result can make later column assignments emit
        # SettingWithCopyWarning on some pandas versions.
        self.df.dropna(inplace=True)
        logger.info(f"Dropped missing values; remaining rows: {self.df.shape[0]}")

    def standardize_column_names(self) -> None:
        """Replace spaces with underscores and lowercase all column names."""
        self.df.columns = self.df.columns.str.replace(" ", "_").str.lower()
        logger.info("Standardized column names")

    def remove_duplicates(self) -> None:
        """Drop fully duplicated rows from ``self.df`` and log how many were removed."""
        duplicates = self.df.duplicated().sum()
        if duplicates > 0:
            # In place for the same reason as in handle_missing_values().
            self.df.drop_duplicates(inplace=True)
            logger.info(f"Dropped {duplicates} duplicate rows; remaining rows: {self.df.shape[0]}")
        else:
            logger.info("No duplicate rows found")

    def save_data(self) -> None:
        """Split ``self.df`` 80/20 and write the parts to the train/test CSV paths.

        ``output_path`` and the parent directories of both target files are
        created if they do not exist yet.

        Raises:
            ValueError: If no data has been loaded (``self.df`` is None).
            Exception: Any failure while splitting or writing, re-raised after logging.
        """
        try:
            if self.df is None:
                raise ValueError(
                    "No data to save: call load_data() or preprocess() before save_data()."
                )

            os.makedirs(self.output_path, exist_ok=True)
            # The split files need not live directly under output_path, so make
            # sure their own parent directories exist as well.
            for target in (self.train_filepath, self.test_filepath):
                parent = os.path.dirname(target)
                if parent:
                    os.makedirs(parent, exist_ok=True)

            logger.info("Train-test split initiated")
            # Fixed seed keeps the split reproducible between runs.
            train_set, test_set = train_test_split(self.df, test_size=0.2, random_state=42)

            train_set.to_csv(self.train_filepath, index=False, header=True)
            test_set.to_csv(self.test_filepath, index=False, header=True)
            logger.info("Data saved to train and test files successfully")
        except Exception as e:
            logger.error(f"Error saving data: {str(e)}")
            raise

    def iqr_bounds(self, column_name: str) -> tuple:
        """Return the ``(lower, upper)`` 1.5 * IQR fences for ``column_name``.

        Args:
            column_name: Column of ``self.df`` to compute the quartiles on.

        Returns:
            ``(q1 - 1.5 * iqr, q3 + 1.5 * iqr)``.
        """
        q1 = self.df[column_name].quantile(0.25)
        q3 = self.df[column_name].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return lower_bound, upper_bound

    def apply_iqr_treatment(self) -> pd.DataFrame:
        """Clip ``preprocess_column`` to its IQR fences and apply the follow-up cleanup.

        After clipping, rows above the 99.9th percentile are removed, the
        ``smoking_status`` labels are unified, and, when a ``dataset`` column
        exists, only its ``'train'`` rows are kept and the column is dropped.
        Loads the raw data first if it has not been loaded yet.

        Returns:
            The treated frame (the same object kept in ``self.df``).

        Raises:
            KeyError: If ``preprocess_column`` is not a column of the frame.
            Exception: Any other failure, re-raised after logging.
        """
        try:
            if self.df is None:
                self.load_data()

            # Fail early with a clear message; the handler below logs it once.
            if self.preprocess_column not in self.df.columns:
                raise KeyError(f"Column '{self.preprocess_column}' does not exist in the DataFrame.")

            lower, upper = self.iqr_bounds(self.preprocess_column)
            self.df[self.preprocess_column] = self.df[self.preprocess_column].clip(lower=lower, upper=upper)
            logger.info(f"IQR treatment applied on column '{self.preprocess_column}' with bounds {lower} and {upper}")

            self.remove_extreme_outliers()

            # Standardize 'smoking_status' values if present
            self.standardize_smoking_status()

            # Keep only the training rows and drop the now-constant 'dataset' column
            if 'dataset' in self.df.columns:
                self.df = self.df[self.df['dataset'] == 'train'].drop(columns=['dataset'])

            return self.df

        except KeyError as ke:
            logger.error(f"KeyError: {str(ke)}")
            raise
        except Exception as e:
            logger.error(f"Error during IQR treatment: {str(e)}")
            raise

    def remove_extreme_outliers(self) -> None:
        """Drop rows whose ``preprocess_column`` value exceeds its 99.9th percentile.

        NOTE(review): ``apply_iqr_treatment`` calls this after clipping the
        column, so the percentile is computed on the clipped values -- confirm
        that this ordering is intended.
        """
        column = self.df[self.preprocess_column]
        quantile_threshold = column.quantile(0.999)
        extreme_outliers = int((column > quantile_threshold).sum())
        # .copy() detaches the result from the parent frame so that later column
        # assignments do not raise SettingWithCopyWarning.
        self.df = self.df[column <= quantile_threshold].copy()
        logger.info(f"Removed {extreme_outliers} extreme outliers beyond 99.9th percentile for '{self.preprocess_column}'")

    def standardize_smoking_status(self) -> None:
        """Map the non-smoking label variants in ``smoking_status`` to ``'No Smoking'``.

        Does nothing when the column is absent.
        """
        if 'smoking_status' in self.df.columns:
            self.df['smoking_status'] = self.df['smoking_status'].replace({
                'Not Smoking': 'No Smoking',
                'Does Not Smoke': 'No Smoking',
                'Smoking=0': 'No Smoking'
            })
            logger.info("Standardized values in 'smoking_status' column to unify non-smoking labels")


# Main execution block


In [72]:
# Run the preprocessing stage end to end: build the config, load the raw CSV,
# clean it, treat outliers in the configured column, then write the train/test split.
try:
    config = ConfigurationManager()
    data_preprocessing_config = config.get_data_preprocessing_config()
    data_preprocessing = DataPreprocessing(data_preprocessing_config)
    data_preprocessing.load_data()
    data_preprocessing.preprocess()
    data_preprocessing.apply_iqr_treatment()
    data_preprocessing.save_data()

except Exception as e:
    # Log and re-raise so that a failed stage shows its traceback and halts a
    # "Run All", instead of printing a message and looking like a successful cell.
    logger.error(f"Data preprocessing stage failed: {e}")
    raise


[2024-11-04 12:51:55,070: INFO: common: 30] YAML file : config\config.yaml loaded successfully
[2024-11-04 12:51:55,070: INFO: common: 30] YAML file : params.yaml loaded successfully
[2024-11-04 12:51:55,075: INFO: common: 30] YAML file : schema.yaml loaded successfully
[2024-11-04 12:51:55,078: INFO: common: 50] Directory artifacts created successfully.
[2024-11-04 12:51:55,080: INFO: common: 50] Directory artifacts/data_preprocessing created successfully.
[2024-11-04 12:51:55,183: INFO: 2727307769: 22] Loaded data from artifacts/data_ingestion/raw.csv with shape (50000, 13)
[2024-11-04 12:51:55,210: INFO: 2727307769: 46] Total missing values before dropping: 26
[2024-11-04 12:51:55,233: INFO: 2727307769: 48] Dropped missing values; remaining rows: 49976
[2024-11-04 12:51:55,233: INFO: 2727307769: 53] Standardized column names
[2024-11-04 12:51:55,278: INFO: 2727307769: 62] No duplicate rows found
[2024-11-04 12:51:55,282: INFO: 2727307769: 100] IQR treatment applied on column 'income