In [1]:
import os

# Move the working directory two levels up so that the relative paths used by
# the later cells resolve (presumably this lands on the project root — TODO confirm).
# NOTE(review): the path is relative, so re-running this cell in the same kernel
# climbs two further levels each time; run it only once per kernel session.
os.chdir("../../")
# IPython magic: shows the resulting working directory as the cell output.
%pwd

'c:\\Users\\anfe1\\OneDrive\\Escritorio\\Instaleap\\Instamarket'

In [2]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data transformation stage.

    Instances are built by ``ConfigurationManager.get_data_transformation_config``
    from the ``data_transformation`` section of the loaded configuration.

    NOTE(review): the values are passed through from the parsed configuration
    unchanged, so whether they are real ``Path`` objects or plain strings
    depends on ``read_yaml`` — confirm.
    """
    # Directory created for this stage; train.pkl / test.pkl are written here.
    root_dir: Path
    # Directory from which train.csv and test.csv are read.
    data_path: Path
    # File handed to load_object to obtain the category list for "store_id".
    stores_list_file: Path
    # File the preprocessing object is saved to.
    preprocessor_file: Path

In [3]:
from instamarket.constants import CONFIG_FILE_PATH
from instamarket.utils.common import read_yaml, create_directories

class ConfigurationManager:
    """Load the project configuration and expose per-stage config objects."""

    def __init__(self) -> None:
        """Read the configuration at ``CONFIG_FILE_PATH`` and prepare the artifacts root."""
        # Parsed once here; the getter below reads from this object.
        self.config = read_yaml(CONFIG_FILE_PATH)

        # Make sure the top-level artifacts directory is in place up front.
        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Return the settings for the data transformation stage.

        Also passes the stage's ``root_dir`` to ``create_directories`` before
        building the config object.

        Returns:
            DataTransformationConfig populated from the ``data_transformation``
            section of the loaded configuration.
        """
        section = self.config.data_transformation
        create_directories([section.root_dir])

        return DataTransformationConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
            stores_list_file=section.stores_list_file,
            preprocessor_file=section.preprocessor_file,
        )

In [9]:
import os
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from scipy.sparse import hstack

from instamarket.utils.common import save_object, load_object
from instamarket.logging import logger

class DataTransformation:
    """Build the feature preprocessor and apply it to the train/test CSV splits.

    The stage reads ``train.csv`` and ``test.csv`` from ``config.data_path``,
    fits the preprocessor on the training features only, applies the fitted
    preprocessor to the test features, appends the target columns and saves
    the results as ``train.pkl`` / ``test.pkl`` under ``config.root_dir``.
    """

    def __init__(self, config: DataTransformationConfig) -> None:
        """Store the stage configuration.

        Args:
            config: Paths used by this stage (input data, stores list,
                preprocessor file and output directory).
        """
        self.config = config

    def get_data_transformer_obj(self):
        """Build the (unfitted) preprocessing ``ColumnTransformer``.

        The numerical column is standard-scaled. The categorical columns are
        one-hot encoded against fixed category lists (so the encoded width does
        not depend on which values happen to appear in the data) and then
        scaled without centering.

        Returns:
            The unfitted ``ColumnTransformer``. It is also saved to
            ``config.preprocessor_file``; that snapshot is unfitted and is
            overwritten with the fitted object by
            ``initiate_data_transformation``.
        """
        numerical_columns = ["optimal_total_time"]
        categorical_columns = [
            "store_id",
            "optimal_start_day", "optimal_start_hour", "optimal_start_minute", "optimal_start_weekday",
            "optimal_end_day", "optimal_end_hour", "optimal_end_minute", "optimal_end_weekday",
        ]

        num_pipeline = Pipeline(
            steps=[
                ("scaler", StandardScaler()),
            ]
        )
        logger.info("Numerical columns standard scaling completed")

        # Category lists must line up one-to-one with `categorical_columns`:
        # the store list first, then day / hour / minute / weekday for the
        # start timestamp and the same four again for the end timestamp.
        stores = load_object(self.config.stores_list_file)
        datetime_categories = [
            list(range(1, 32)),  # day
            list(range(0, 24)),  # hour
            list(range(0, 60)),  # minute
            list(range(0, 7)),   # weekday
        ]
        categories = [stores, *datetime_categories, *datetime_categories]
        cat_pipeline = Pipeline(
            steps=[
                ("one_hot_encoder", OneHotEncoder(categories=categories)),
                # with_mean=False: the scaler is told not to center the encoded features.
                ("scaler", StandardScaler(with_mean=False)),
            ]
        )
        logger.info("Categorical columns encoding completed")

        preprocessor = ColumnTransformer(
            [
                ("num_pipeline", num_pipeline, numerical_columns),
                ("cat_pipeline", cat_pipeline, categorical_columns),
            ]
        )

        logger.info("Save preprocessing object")
        save_object(self.config.preprocessor_file, preprocessor)

        return preprocessor

    @staticmethod
    def _targets_as_2d(targets):
        """Return the target values as a 2-D array with one column per target."""
        values = np.asarray(targets)
        if values.ndim == 1:
            # A single target name selects a Series; make it a column so it
            # can be appended next to the feature matrix.
            values = values.reshape(-1, 1)
        return values

    def initiate_data_transformation(self, preprocessing_obj, target_column_name):
        """Fit the preprocessor on the training split and transform both splits.

        Args:
            preprocessing_obj: Transformer exposing ``fit_transform`` and
                ``transform`` (as returned by ``get_data_transformer_obj``).
            target_column_name: Target column name, or list of names, to
                separate from the input features.

        Side effects:
            Overwrites ``config.preprocessor_file`` with the fitted
            preprocessor and writes ``train.pkl`` / ``test.pkl`` (features
            followed by the target columns) under ``config.root_dir``.
        """
        logger.info("Read train & test data as dataframe")
        train_df = pd.read_csv(os.path.join(self.config.data_path, "train.csv"))
        test_df = pd.read_csv(os.path.join(self.config.data_path, "test.csv"))

        input_feature_train_df = train_df.drop(columns=target_column_name)
        target_feature_train_df = train_df[target_column_name]

        input_feature_test_df = test_df.drop(columns=target_column_name)
        target_feature_test_df = test_df[target_column_name]

        logger.info("Applying preprocessing object on training and testing dataframes")
        input_feature_train_arr = preprocessing_obj.fit_transform(input_feature_train_df)
        # Only transform the test split: re-fitting here would scale it with its
        # own statistics, making it inconsistent with the training features.
        input_feature_test_arr = preprocessing_obj.transform(input_feature_test_df)

        # Persist the preprocessor now that it is fitted, so the saved artifact
        # can be reused to transform new data.
        save_object(self.config.preprocessor_file, preprocessing_obj)

        train_arr = hstack([input_feature_train_arr, self._targets_as_2d(target_feature_train_df)])
        test_arr = hstack([input_feature_test_arr, self._targets_as_2d(target_feature_test_df)])

        save_object(os.path.join(self.config.root_dir, "train.pkl"), train_arr)
        save_object(os.path.join(self.config.root_dir, "test.pkl"), test_arr)

    def convert(self):
        """Run the full stage: build the preprocessor, then transform and save the splits."""
        preprocessing_obj = self.get_data_transformer_obj()
        self.initiate_data_transformation(preprocessing_obj, ["start_delay", "end_delay"])
        logger.info("Transformed dataset saved")

In [10]:
# Run the data transformation stage end to end: load the configuration,
# build the stage object and produce the transformed train/test artifacts.
try:
    config = ConfigurationManager()
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config)
    data_transformation.convert()
except Exception:
    # Record the failure through the project logger, then re-raise with a bare
    # `raise` so the original exception and traceback propagate unchanged.
    logger.exception("Data transformation stage failed")
    raise

[2024-04-20 15:10:07,830] 29 common - INFO - yaml file config\config.yml loaded successfully
[2024-04-20 15:10:07,830] 47 common - INFO - Created directory at: artifacts
[2024-04-20 15:10:07,839] 47 common - INFO - Created directory at: artifacts/data_transformation
[2024-04-20 15:10:07,840] 34 232841500 - INFO - Numerical columns standard scaling completed
[2024-04-20 15:10:07,842] 45 232841500 - INFO - Categorical columns encoding completed
[2024-04-20 15:10:07,844] 54 232841500 - INFO - Save preprocessing object
[2024-04-20 15:10:07,851] 61 232841500 - INFO - Read train & test data as dataframe
[2024-04-20 15:10:08,376] 71 232841500 - INFO - Applying preprocessing object on training and testing dataframes
[2024-04-20 15:10:09,146] 84 232841500 - INFO - Transformed dataset saved
