In [1]:
import os
# Display the kernel's current working directory.
%pwd

'/Users/whysocurious/Documents/MLDSAIProjects/e2e-mlops-gcp/research'

In [2]:
# Step up one directory and display the new working directory.
# Presumably this is so the relative paths used by later cells resolve from
# the parent folder — confirm against the project layout.
# NOTE(review): this cell is not idempotent; every re-run moves up one more
# level, so execute it exactly once per kernel session.
os.chdir("../")
%pwd

'/Users/whysocurious/Documents/MLDSAIProjects/e2e-mlops-gcp'

In [3]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings for the data-transformation stage.

    The ``year_*`` / ``month_*`` pairs select which monthly parquet file is
    used for each of the train, validation and test splits.
    """
    # Data set prefix used in the parquet file names (``{color}_tripdata_...``).
    color: str
    # Year and month of the file used as the training split.
    year_train: int
    month_train: int
    # Year and month of the file used as the validation split.
    year_val: int
    month_val: int
    # Year and month of the file used as the test split.
    year_test: int
    month_test: int
    # Directory the stage writes its pickled outputs to.
    # NOTE(review): annotated as Path, but the value is taken straight from
    # the loaded YAML config and is presumably a plain string — confirm.
    root_dir: Path
    # Directory that holds the input parquet files (same Path/str caveat).
    data_path: Path

In [4]:
from mlProject.constants import *
from mlProject.utils.common import read_yaml, create_directories

In [5]:
class ConfigurationManager:
    """Reads the project's YAML settings and hands out per-stage config objects."""

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH):
        """Load the config, params and schema files and create the artifacts root.

        Args:
            config_filepath: Path of the main configuration YAML.
            params_filepath: Path of the parameters YAML.
            schema_filepath: Path of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build the data-transformation settings and create the stage directory.

        Returns:
            A ``DataTransformationConfig`` combining the data-selection values
            from the ``dataDetails`` params section with the directories from
            the ``data_transformation`` config section.
        """
        stage_section = self.config.data_transformation
        data_details = self.params.dataDetails

        # The stage directory must exist before anything is written into it.
        create_directories([stage_section.root_dir])

        # Fields copied one-to-one from the params file, in constructor order.
        selection_fields = (
            "color",
            "year_train",
            "month_train",
            "year_val",
            "month_val",
            "year_test",
            "month_test",
        )
        selection = {name: getattr(data_details, name) for name in selection_fields}

        return DataTransformationConfig(
            **selection,
            root_dir=stage_section.root_dir,
            data_path=stage_section.data_path,
        )

In [6]:
import os
from mlProject import logger
import pickle
import pandas as pd


class DataTransformation:
    """Turns monthly trip parquet files into pickled ``(features, target)`` splits.

    The train, validation and test files are selected by the year/month pairs
    on ``config``; the outputs are written to ``config.root_dir`` as
    ``train.pkl``, ``val.pkl`` and ``test.pkl``.
    """

    def __init__(self, config: "DataTransformationConfig"):
        """Keep the stage settings; no I/O happens here."""
        self.config = config

    def dump_pickle(self, obj, filename: str):
        """Serialize ``obj`` to ``filename`` with pickle, replacing any existing file."""
        with open(filename, "wb") as f_out:
            pickle.dump(obj, f_out)

    def read_dataframe(self, filename: str, year: "int | None" = None, month: "int | None" = None):
        """Load one monthly parquet file and derive the modelling columns.

        Adds ``duration`` (minutes between pickup and drop-off), keeps only
        trips lasting between 1 and 60 minutes inclusive, converts the two
        location-id columns to strings (missing ids become ``'-1'``) and adds a
        ``ride_id`` of the form ``YYYY/MM_<row index>``.

        Args:
            filename: Path of the parquet file to read.
            year: Year used in the ``ride_id`` prefix. Defaults to
                ``config.year_train`` when omitted.
            month: Month used in the ``ride_id`` prefix. Defaults to
                ``config.month_train`` when omitted.

        Returns:
            The filtered DataFrame with the extra columns.
        """
        # Previously every split was stamped with the training period; callers
        # can now pass the period that matches the file being read.
        if year is None:
            year = self.config.year_train
        if month is None:
            month = self.config.month_train

        df = pd.read_parquet(filename)

        # Vectorised timedelta -> minutes conversion (no per-row Python call).
        elapsed = df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
        df['duration'] = elapsed.dt.total_seconds() / 60

        # Explicit copy so the assignments below write to an independent frame
        # instead of triggering pandas' SettingWithCopyWarning.
        df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

        categorical = ['PULocationID', 'DOLocationID']
        df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
        df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

        return df

    def preprocess(self, df: pd.DataFrame):
        """Return one feature dict per row with ``PU_DO`` and ``trip_distance``.

        Note that this adds a ``PU_DO`` column (pickup and drop-off ids joined
        by ``'_'``) to ``df`` in place. The dicts are returned as-is; no
        vectorizer is fitted or applied here.
        """
        df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
        categorical = ['PU_DO']
        numerical = ['trip_distance']
        return df[categorical + numerical].to_dict(orient='records')

    def _tripdata_path(self, year: int, month: int) -> str:
        """Build the path of one monthly parquet file inside ``config.data_path``."""
        return os.path.join(
            self.config.data_path,
            f'{self.config.color}_tripdata_{year:04d}-{month:02d}.parquet',
        )

    def process_data(self):
        """Read the three splits, build features/targets and pickle each split.

        Writes ``train.pkl``, ``val.pkl`` and ``test.pkl`` into
        ``config.root_dir``; each file holds a ``(feature_dicts, durations)``
        tuple.
        """
        cfg = self.config
        logger.info(f'reading file - {cfg.data_path}')

        periods = {
            'train': (cfg.year_train, cfg.month_train),
            'val': (cfg.year_val, cfg.month_val),
            'test': (cfg.year_test, cfg.month_test),
        }

        # Read every split up front so a missing input file fails before any
        # output is written.
        frames = {
            split: self.read_dataframe(
                self._tripdata_path(year, month), year=year, month=month
            )
            for split, (year, month) in periods.items()
        }

        # Extract the target
        target = 'duration'
        targets = {split: frame[target].values for split, frame in frames.items()}

        logger.info("preprocess data.")
        for split, frame in frames.items():
            features = self.preprocess(frame)
            self.dump_pickle(
                (features, targets[split]),
                os.path.join(cfg.root_dir, f"{split}.pkl"),
            )

        logger.info("Data transformation completed")


In [7]:
# Run the data-transformation stage end to end. Exceptions are left to
# propagate on their own: the former `except Exception as e: raise e` wrapper
# handled nothing and only added a frame to the traceback.
config = ConfigurationManager()
data_transformation_config = config.get_data_transformation_config()
data_transformation = DataTransformation(config=data_transformation_config)
data_transformation.process_data()

[2024-07-05 12:38:18,823: INFO: common: yaml file: config/config.yaml loaded successfully]
[2024-07-05 12:38:18,825: INFO: common: yaml file: params.yaml loaded successfully]
[2024-07-05 12:38:18,826: INFO: common: yaml file: schema.yaml loaded successfully]
[2024-07-05 12:38:18,827: INFO: common: created directory at: artifacts]
[2024-07-05 12:38:18,828: INFO: common: created directory at: artifacts/data_transformation]
[2024-07-05 12:38:18,829: INFO: 4052946182: reading file - artifacts/data_ingestion]
[2024-07-05 12:38:41,010: INFO: 4052946182: preprocess data.]
[2024-07-05 12:39:01,081: INFO: 4052946182: Data transformation completed]
