In [1]:
import os
import sys

In [2]:
%pwd

'd:\\collab_projects\\South_German_Credit_Risk\\notebook'

In [3]:
# Move from the notebook folder up to the project root so the relative paths
# used further down resolve. The guard keeps the cell idempotent: an
# unconditional chdir("../") climbs one more level every time the cell is
# re-run on a live kernel.
if os.path.basename(os.getcwd()) == "notebook":
    os.chdir("../")

In [4]:
%pwd

'd:\\collab_projects\\South_German_Credit_Risk'

### Create the Entity

In [5]:
from dataclasses import dataclass
from pathlib import Path 


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data-transformation stage."""

    # Directory the stage writes its train.csv / test.csv outputs into.
    root_dir: Path
    # CSV file the stage reads as its input dataset.
    data_path: Path

### Create ConfigurationManager

In [6]:
from South_German_Bank.constants import *
from South_German_Bank.utils.common import read_yaml, create_directories

In [7]:
class ConfigurationManager:
    """Reads the project's YAML settings and hands out per-stage config objects."""

    def __init__(
            self,
            config_filepath = CONFIG_FILE_PATH,
            params_filepath = PARAMS_FILE_PATH,
            schema_filepath = SCHEMA_FILE_PATH):
        """Load the three YAML files and create the artifacts root directory.

        Args:
            config_filepath: Location of the main configuration YAML.
            params_filepath: Location of the parameters YAML.
            schema_filepath: Location of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build the data-transformation settings from the loaded configuration.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            DataTransformationConfig whose fields are ``pathlib.Path`` objects.
        """
        section = self.config.data_transformation

        create_directories([section.root_dir])

        # The entity declares both fields as Path, so convert the raw YAML
        # values instead of passing them through unchanged.
        return DataTransformationConfig(
            root_dir=Path(section.root_dir),
            data_path=Path(section.data_path),
        )

### Create Components

In [21]:
import os
from South_German_Bank.logging import logger
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder, RobustScaler

In [19]:

class DataTransformation:
    """Preprocess the configured CSV and write train/test splits next to it.

    Typical use is ``get_data_transformation()`` followed by
    ``train_test_split()``; the second call needs the preprocessor that the
    first one builds.
    """

    def __init__(self, config: "DataTransformationConfig"):
        """Keep the stage settings; the preprocessor is built on demand.

        Args:
            config: Settings exposing ``data_path`` (input CSV) and
                ``root_dir`` (output directory).
        """
        self.config = config
        self.preprocessor = None  # populated by get_data_transformation()

    def get_data_transformation(self):
        """Build the (unfitted) preprocessing ColumnTransformer.

        Columns are assigned to a pipeline by dtype: ``object`` columns go
        through the categorical pipeline, every other column through the
        numerical one.

        Returns:
            The ColumnTransformer, which is also stored on ``self.preprocessor``.
        """
        df = pd.read_csv(self.config.data_path)

        numerical_features = df.select_dtypes(exclude="object").columns
        categorical_features = df.select_dtypes(include="object").columns

        num_pipeline = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="median")),
                ("robustscaler", RobustScaler())
            ]
        )

        cat_pipeline = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="most_frequent")),
                ("ordinalEncoder", OrdinalEncoder()),
                ("robustscaler", RobustScaler())
            ]
        )

        logger.info(f"Categorical columns: {categorical_features}")
        logger.info(f"Numerical columns: {numerical_features}")

        preprocessor = ColumnTransformer(
            transformers=[
                ("num_pipeline", num_pipeline, numerical_features),
                ("cat_pipeline", cat_pipeline, categorical_features)
            ]
        )

        self.preprocessor = preprocessor  # kept for train_test_split()
        logger.info("Data preprocessing done")
        return preprocessor

    def train_test_split(self, test_size=None, random_state=42):
        """Transform the dataset, split it and save ``train.csv`` / ``test.csv``.

        Args:
            test_size: Forwarded to scikit-learn's ``train_test_split``;
                ``None`` keeps that function's default proportion.
            random_state: Seed for the shuffle so the saved split is
                reproducible. Pass ``None`` for a different split on each run.

        Raises:
            ValueError: If ``get_data_transformation`` has not been called yet.
        """
        if self.preprocessor is None:
            raise ValueError("Preprocessor is not available. Please call get_data_transformation first.")

        data = pd.read_csv(self.config.data_path)

        # NOTE(review): the preprocessor is fitted on the full dataset before
        # the split, so imputation/scaling statistics also see the rows that
        # end up in test.csv. Fitting on the training rows only would avoid
        # that, but OrdinalEncoder then needs a policy for categories that
        # appear only in the test rows.
        transformed_data = self.preprocessor.fit_transform(data)

        # ColumnTransformer emits its output grouped by transformer (numeric
        # block first, then categorical), not in the input column order, so
        # the labels must come from the transformer spec. Labelling with
        # data.columns directly attaches the wrong name to most columns.
        produced_columns = [
            column
            for _name, _transformer, columns in self.preprocessor.transformers
            for column in columns
        ]
        transformed_df = pd.DataFrame(transformed_data, columns=produced_columns)

        # Restore the input column order so the saved files keep the same layout.
        transformed_df = transformed_df[[c for c in data.columns if c in produced_columns]]

        # Inside a method this name resolves to the module-level scikit-learn
        # function, not to this method.
        train, test = train_test_split(
            transformed_df, test_size=test_size, random_state=random_state
        )

        # Save the encoded train and test sets to CSV files
        os.makedirs(self.config.root_dir, exist_ok=True)
        train.to_csv(os.path.join(self.config.root_dir, "train.csv"), index=False)
        test.to_csv(os.path.join(self.config.root_dir, "test.csv"), index=False)

        logger.info("Splited data into training and test sets")
        logger.info(train.shape)
        logger.info(test.shape)


### Create Pipeline

In [20]:
# Run the stage end to end: load settings, build the preprocessor, then
# transform, split and save the data.
try:
    config = ConfigurationManager()
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config)
    data_transformation.get_data_transformation()
    data_transformation.train_test_split()
except Exception:
    # Record the failure (with traceback) in the project log, then re-raise
    # with a bare `raise` so the original traceback is left untouched.
    logger.exception("Data transformation stage failed")
    raise


[{asctime}s: INFO: common: yaml file: config\config.yaml loaded successfully]
[{asctime}s: INFO: common: yaml file: params.yaml loaded successfully]
[{asctime}s: INFO: common: yaml file: schema.yaml loaded successfully]
[{asctime}s: INFO: common: created directory at: artifacts]
[{asctime}s: INFO: common: created directory at: artifacts/data_transformation]


[{asctime}s: INFO: 1679601004: Categorical columns: Index(['status', 'credit_history', 'purpose', 'savings', 'employment_duration',
       'installment_rate', 'personal_status_sex', 'other_debtors',
       'present_residence', 'property', 'other_installment_plans', 'housing',
       'number_credits', 'job', 'people_liable', 'telephone', 'foreign_worker',
       'credit_risk'],
      dtype='object')]
[{asctime}s: INFO: 1679601004: Numerical columns: Index(['duration', 'amount', 'age'], dtype='object')]
[{asctime}s: INFO: 1679601004: Data preprocessing done]
[{asctime}s: INFO: 1679601004: Splited data into training and test sets]
[{asctime}s: INFO: 1679601004: (750, 21)]
[{asctime}s: INFO: 1679601004: (250, 21)]
