In [1]:
import os

In [2]:
# Move the working directory one level up from wherever the kernel currently is.
# NOTE(review): this is relative to the current directory, so re-running the
# cell climbs one more level each time — run it exactly once per kernel session.
# Presumably the intent is to land in the project root so that relative config
# paths resolve; confirm against the repository layout.
os.chdir('../')
# IPython magic: echo the resulting working directory as the cell output.
%pwd

'/home/paladin/Downloads/Bixi-OD-Matrix-Prediction/Bixi-OD-Matrix-Prediction'

In [3]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable set of paths consumed by the data-transformation stage.

    Instances are frozen: attributes cannot be reassigned after construction.
    Field order is part of the positional constructor signature.
    """

    # Stage working directory and the input data file.
    root_dir: Path
    local_data_file: Path

    # Output locations for the OD-matrix train split, test split and scaler.
    local_train_od_dir: Path
    local_test_od_dir: Path
    local_scaler_od_dir: Path

    # Output locations for the tensor train split, test split and scaler.
    local_train_tensor_dir: Path
    local_test_tensor_dir: Path
    local_scaler_tensor_dir: Path

In [4]:
from timeseriesPredictor.utils import create_directories, read_yaml
from timeseriesPredictor.constants import *

In [5]:
class configurationManeger:
    """Reads the project's YAML files and hands out per-stage config objects.

    On construction the config, secret and params files are each loaded with
    ``read_yaml`` and the artifacts root named in the config is created via
    ``create_directories``.
    """

    def __init__(self,
                 config_filepath = CONFIG_FILE_PATH,
                 secret_filepath = SECRET_FILE_PATH,
                 params_filepath = PARAMS_FILE_PATH):
        """Load the three YAML files and create the artifacts root directory.

        Args:
            config_filepath: path of the main configuration file.
            secret_filepath: path of the secrets file.
            params_filepath: path of the parameters file.
        """
        self.config = read_yaml(config_filepath)
        self.secret = read_yaml(secret_filepath)
        self.params = read_yaml(params_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Assemble the ``DataTransformationConfig`` for the transformation stage.

        The stage's ``root_dir`` is created before the config object is built.
        The input file path is taken from the ``data_ingestion`` section; every
        other path comes from the ``data_transformation`` section.
        """
        stage = self.config.data_transformation
        ingestion = self.config.data_ingestion

        create_directories([stage.root_dir])

        return DataTransformationConfig(
            root_dir=stage.root_dir,
            local_data_file=ingestion.local_data_file,
            local_train_od_dir=stage.local_train_od_dir,
            local_test_od_dir=stage.local_test_od_dir,
            local_scaler_od_dir=stage.local_scaler_od_dir,
            local_train_tensor_dir=stage.local_train_tensor_dir,
            local_test_tensor_dir=stage.local_test_tensor_dir,
            local_scaler_tensor_dir=stage.local_scaler_tensor_dir,
        )

In [6]:
import sys
import os
import pandas as pd
from timeseriesPredictor.utils import OD_tensor_matrix, train_test_split, save_pickle
from timeseriesPredictor.exception import CustomException
from timeseriesPredictor.logger import logging
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from box import ConfigBox

from sklearn.preprocessing import StandardScaler

In [7]:
class DataTransformation:
    """Turns the ingested CSV into scaled train/test arrays and pickles them.

    Two variants are produced from the same dataframe: one through the
    ``OD_matrix_pipeline`` and one through the ``tensor_matrix_pipeline``.
    For each variant a fresh ``StandardScaler`` is fitted on the train split
    only, applied to both splits, and the two scaled arrays plus the fitted
    scaler are written with ``save_pickle`` to the paths held in ``config``.
    """

    def __init__(self, config: "DataTransformationConfig"):
        """Store the stage configuration (paths for input and outputs)."""
        self.config = config

    def get_data_transformer_object(self, train_test_ratio: float = 0.75):
        """Build the OD-matrix and tensor preprocessing pipelines.

        Args:
            train_test_ratio: value forwarded to ``train_test_split`` as its
                ``train_test_ratio`` keyword in both pipelines. Defaults to
                0.75, the value previously hard-coded.

        Returns:
            ConfigBox with two entries, ``OD_matrix_pipeline`` and
            ``tensor_matrix_pipeline``.

        Raises:
            CustomException: wraps any exception raised while building.
        """
        try:
            OD_matrix_pipeline = Pipeline(
                steps=[
                    ('matrix_creation',
                     FunctionTransformer(OD_tensor_matrix)),
                    ('train_val_test_split',
                     FunctionTransformer(train_test_split,
                                         kw_args={'train_test_ratio': train_test_ratio})),
                ]
            )

            # Same steps, but the matrix builder is called with tensor=True.
            tensor_matrix_pipeline = Pipeline(
                steps=[
                    ('matrix_creation',
                     FunctionTransformer(OD_tensor_matrix, kw_args={'tensor': True})),
                    ('train_val_test_split',
                     FunctionTransformer(train_test_split,
                                         kw_args={'train_test_ratio': train_test_ratio})),
                ]
            )

            return ConfigBox({'OD_matrix_pipeline': OD_matrix_pipeline,
                              'tensor_matrix_pipeline': tensor_matrix_pipeline})

        except Exception as e:
            raise CustomException(e, sys)

    @staticmethod
    def _scale_split(scaler, matrix_train, matrix_test):
        """Fit ``scaler`` on the train array and apply it to both arrays.

        Each array is flattened to ``(n_samples, n_features)`` for the scaler
        and then restored to its own original shape. The feature axis is
        inferred with ``-1`` so every trailing dimension is included; the
        previous ``reshape(l, m*n)`` only worked when the last axis had
        length 1.

        Returns:
            Tuple ``(scaled_train, scaled_test)``.
        """
        train_shape = tuple(matrix_train.shape)
        test_shape = tuple(matrix_test.shape)

        scaled_train = scaler.fit_transform(
            matrix_train.reshape(train_shape[0], -1)
        ).reshape(train_shape)
        scaled_test = scaler.transform(
            matrix_test.reshape(test_shape[0], -1)
        ).reshape(test_shape)
        return scaled_train, scaled_test

    def _transform_scale_save(self, pipeline, df, train_path, test_path, scaler_path):
        """Run one pipeline on ``df``, standardise its splits and pickle the results."""
        matrix_train, matrix_test = pipeline.fit_transform(df)

        # A new scaler per variant: statistics come from this train split only.
        scaler = StandardScaler()
        scaled_matrix_train, scaled_matrix_test = self._scale_split(
            scaler, matrix_train, matrix_test
        )

        save_pickle(path=train_path, obj=scaled_matrix_train)
        save_pickle(path=test_path, obj=scaled_matrix_test)
        save_pickle(path=scaler_path, obj=scaler)

    def initiate_data_transformation(self):
        """Read the data file, build both variants and persist their outputs.

        If the configured data file does not exist, a message is logged and
        nothing else happens. Returns ``None`` in either case.
        """
        if not os.path.exists(self.config.local_data_file):
            logging.info(f"WARNING: {self.config.local_data_file} does not exist!")
            return

        df = pd.read_csv(self.config.local_data_file)
        logging.info('Read data is completed!')
        df['start_date'] = pd.to_datetime(df['start_date'], format='ISO8601')
        logging.info('Changing object to datetime format is completed!')

        logging.info("Obtaining preprocessing object")
        preprocessing_obj = self.get_data_transformer_object()

        logging.info("Applying OD preprocessing object on dataframe")
        self._transform_scale_save(
            preprocessing_obj.OD_matrix_pipeline,
            df,
            train_path=self.config.local_train_od_dir,
            test_path=self.config.local_test_od_dir,
            scaler_path=self.config.local_scaler_od_dir,
        )

        logging.info("Applying tensor preprocessing object on dataframe")
        self._transform_scale_save(
            preprocessing_obj.tensor_matrix_pipeline,
            df,
            train_path=self.config.local_train_tensor_dir,
            test_path=self.config.local_test_tensor_dir,
            scaler_path=self.config.local_scaler_tensor_dir,
        )
             


In [8]:
# Driver cell: run the data-transformation stage end to end.
try:
    # Load the YAML files (default paths) and build the stage's path config.
    config = configurationManeger()
    data_transformation_config = config.get_data_transformation_config()
    # Execute the stage with that config.
    data_transformation = DataTransformation(config=data_transformation_config)
    data_transformation.initiate_data_transformation()

# Any failure is re-raised as the project's CustomException, built from the
# caught exception and the sys module.
except Exception as e:
    raise CustomException(e, sys)