In [1]:
import os
# Step one directory up from the notebook's folder; presumably so that the
# `src.` imports and the relative artifact paths below resolve — TODO confirm.
# The move is relative, so re-running this cell climbs one more level each time.
os.chdir("../")

In [2]:

from src.movie_predictor.constants import *
import os
from box.exceptions import BoxValueError
import yaml
from src.movie_predictor import logger
import json
import joblib
from ensure import ensure_annotations
from box import ConfigBox
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd

def read_yaml(path_to_yaml: Path) -> ConfigBox:
    """Read a YAML file and return its content as a ConfigBox.

    Args:
        path_to_yaml (Path): location of the YAML file to read.

    Raises:
        ValueError: if the YAML file is empty, or if its content cannot be
            turned into a ConfigBox.
        Exception: any other error (missing file, malformed YAML, ...) is
            propagated unchanged.

    Returns:
        ConfigBox: parsed content with attribute-style access.
    """
    with open(path_to_yaml) as yaml_file:
        content = yaml.safe_load(yaml_file)

    # safe_load yields None for an empty document; report that explicitly
    # instead of relying on ConfigBox to reject it.
    if content is None:
        raise ValueError("yaml file is empty")

    try:
        config = ConfigBox(content)
    except BoxValueError as error:
        raise ValueError(
            f"yaml file: {path_to_yaml} could not be converted to a ConfigBox"
        ) from error

    # Log only once the content has actually been accepted.
    logger.info(f"yaml file: {path_to_yaml} loaded successfully")
    return config

@ensure_annotations
def create_directories(path_to_directories: list, verbose=True):
    """Create every directory named in the given list.

    Args:
        path_to_directories (list): directory paths to create; directories
            that already exist are left as they are.
        verbose (bool, optional): when true, log each path after it has been
            handled. Defaults to True.
    """
    for directory in path_to_directories:
        os.makedirs(directory, exist_ok=True)
        if not verbose:
            continue
        logger.info(f"created directory at: {directory}")

@ensure_annotations
def save_json(path: Path, data: dict):
    """Write a dictionary to disk as indented JSON.

    Args:
        path (Path): destination of the json file
        data (dict): content to serialise
    """
    with open(path, "w") as json_file:
        json.dump(data, json_file, indent=4)

    saved_message = f"json file saved at: {path}"
    logger.info(saved_message)

@ensure_annotations
def load_json(path: Path) -> ConfigBox:
    """Read a json file and wrap its content in a ConfigBox.

    Args:
        path (Path): json file to read
    Returns:
        ConfigBox: the parsed data, accessible as attributes instead of keys
    """
    with open(path) as json_file:
        parsed = json.load(json_file)

    loaded_message = f"json file loaded succesfully from: {path}"
    logger.info(loaded_message)
    return ConfigBox(parsed)

@ensure_annotations
def save_bin(data: Any, path: Path):
    """Serialise an object to a binary file with joblib.

    Args:
        data (Any): object to persist
        path (Path): destination of the binary file
    """
    joblib.dump(data, path)
    saved_message = f"binary file saved at: {path}"
    logger.info(saved_message)

@ensure_annotations
def load_bin(path: Path) -> Any:
    """Restore an object from a binary file written with joblib.

    Args:
        path (Path): binary file to read
    Returns:
        Any: whatever object the file contains
    """
    # joblib files are pickle based: only load files from a trusted source.
    restored = joblib.load(path)
    loaded_message = f"binary file loaded from: {path}"
    logger.info(loaded_message)
    return restored

@ensure_annotations
def load_data(file_path: str, schema_file_path: str) -> pd.DataFrame:
    """Load a CSV file and cast its columns to the dtypes declared in a schema.

    Args:
        file_path (str): CSV file to read.
        schema_file_path (str): YAML schema file; the mapping stored under
            DATASET_SCHEMA_COLUMNS_KEY gives the dtype of each column.

    Raises:
        ValueError: if the CSV contains columns the schema does not declare.
        Exception: read or cast errors are propagated to the caller instead of
            being handed back as a return value.

    Returns:
        pd.DataFrame: the CSV content with the schema dtypes applied.
    """
    dataset_schema = read_yaml(schema_file_path)
    schema = dataset_schema[DATASET_SCHEMA_COLUMNS_KEY]

    dataframe = pd.read_csv(file_path)

    declared_columns = set(schema.keys())
    unknown_columns = [
        column for column in dataframe.columns if column not in declared_columns
    ]
    if unknown_columns:
        error_message = "".join(
            f" \nColumn: [{column}] is not in the schema."
            for column in unknown_columns
        )
        raise ValueError(error_message)

    # astype returns a new object, so the result has to be kept for the cast
    # to have any effect.
    return dataframe.astype({column: schema[column] for column in dataframe.columns})
@ensure_annotations
def get_size(path: Path) -> str:
    """Report the size of a file, rounded to whole kilobytes.

    Args:
        path (Path): file to measure
    Returns:
        str: text of the form "~ <n> KB"
    """
    size_in_bytes = os.path.getsize(path)
    kilobytes = round(size_in_bytes / 1024)
    return f"~ {kilobytes} KB"


@ensure_annotations
def save_numpy_array_data(file_path: Path, array: np.ndarray):
    """Save a numpy array to a file, creating the parent directory if needed.

    Args:
        file_path (Path): destination file.
        array (np.ndarray): data to write with ``np.save``.

    Raises:
        Exception: directory or write errors are propagated to the caller
            instead of being handed back as a return value.
    """
    # np.ndarray is the array type; np.array is only the factory function and
    # cannot be used for a type check.
    dir_path = os.path.dirname(file_path)
    # A bare file name has no directory part and makedirs("") would fail.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)
    with open(file_path, 'wb') as file_obj:
        np.save(file_obj, array)
    
@ensure_annotations
def save_object(file_path:str,obj):
    """Persist an arbitrary object with joblib, creating the parent directory if needed.

    Args:
        file_path (str): destination file.
        obj: any object joblib can serialise.

    Raises:
        Exception: directory or serialisation errors are propagated to the
            caller instead of being handed back as a return value.
    """
    dir_path = os.path.dirname(file_path)
    # A bare file name has no directory part and makedirs("") would fail.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)
    with open(file_path, "wb") as file_obj:
        joblib.dump(obj, file_obj)

In [3]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformConfig:
    """Immutable settings of the data-transformation stage.

    ConfigurationManager.get_data_transform_config fills the fields from the
    `data_transformation` section of the configuration file.
    """
    # NOTE(review): `tranfored_train_dir` and `transormed_test_dir` look like
    # misspellings of "transformed"; they are kept because callers pass and
    # read them by these exact names.
    # Root directory of the stage.
    root_dir: Path
    # Directory that receives the transformed training array.
    tranfored_train_dir: Path
    # Directory that receives the transformed test array.
    transormed_test_dir: Path
    # File in which the preprocessing object is stored.
    preprocessed_file_path: Path

In [6]:
import os
# Names and full paths of the two Excel workbooks read in the next cell.
# CONFIG_DIR is not defined in this notebook; presumably it comes from the
# star import of src.movie_predictor.constants — TODO confirm.
BUDGET_XLSX = 'budget.xlsx'
BUDGET_FILE_PATH = os.path.join(CONFIG_DIR,BUDGET_XLSX)

COMBINE_XLSX = 'combine.xlsx'
COMBINE_FILE_PATH = os.path.join(CONFIG_DIR,COMBINE_XLSX)

In [7]:
# Load the budget workbook and the 'runtime' / 'opening_theaters' sheets of the
# combined workbook.
# NOTE(review): `f3` is not referenced anywhere else in the visible notebook,
# while later code names the budget frame `df3` — possibly a typo; confirm.
f3 = pd.read_excel(BUDGET_FILE_PATH)
df1 = pd.read_excel(COMBINE_FILE_PATH,sheet_name='runtime')
df2 = pd.read_excel(COMBINE_FILE_PATH,sheet_name='opening_theaters')

In [4]:
# Exploratory version of what ConfigurationManager does below: read the main
# configuration and the validation schema, then make sure the artifacts root exists.
config = read_yaml(CONFIG_FILE_PATH)
schema = read_yaml(DATA_VALIDATION_FILE)
create_directories([config.artifacts_root])

# Section of the configuration that describes the data-transformation stage.
transform_config = config.data_transformation

In [5]:
# Display the root directory configured for the data-transformation stage.
transform_config.root_dir

'artifacts/data_transformation'

In [6]:

class ConfigurationManager:
    """Reads the project's YAML files and builds per-stage configuration objects."""

    def __init__(
        self, 
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH):
        """Load configuration, parameters and schema, then create the artifacts root.

        Args:
            config_filepath: YAML file with the main configuration.
            params_filepath: YAML file with the parameters.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(DATA_VALIDATION_FILE)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transform_config(self) -> DataTransformConfig:
        """Create the stage directories and return the data-transformation settings.

        Returns:
            DataTransformConfig: values taken from the `data_transformation`
            section of the configuration file.
        """
        section = self.config.data_transformation

        # One create_directories call per entry, in this order.
        directory_keys = (
            "root_dir",
            "transformed_train_dir",
            "transformed_test_dir",
            "preprocessed_dir",
        )
        for directory_key in directory_keys:
            create_directories([getattr(section, directory_key)])

        return DataTransformConfig(
            root_dir=section.root_dir,
            tranfored_train_dir=section.transformed_train_dir,
            transormed_test_dir=section.transformed_test_dir,
            preprocessed_file_path=section.preprocessed_object_file_name,
        )

In [7]:
# Smoke check: build the stage configuration and display it.
ConfigurationManager().get_data_transform_config()

DataTransformConfig(root_dir='artifacts/data_transformation', tranfored_train_dir='artifacts/data_transformation/train', transormed_test_dir='artifacts/data_transformation/test', preprocessed_file_path='artifacts/data_transformation/preprocessed/preprocessed.pkl')

In [8]:
from collections import namedtuple
# Result record of the ingestion stage: the train/test file paths plus a
# status flag and message.
DataIngestionArtifact = namedtuple("DataIngestionArtifact",
[ "train_file_path", "test_file_path", "is_ingested", "message"])

# Result record of the validation stage: the report file paths plus a status
# flag and message.
DataValidationArtifact = namedtuple("DataValidationArtifact",
[ "report_file_path","report_page_file_path","is_validated","message"])

In [9]:
# Key names used to look things up in the schema file. load_data reads
# DATASET_SCHEMA_COLUMNS_KEY and DataTransform.run_data_transformation reads
# TARGET_COLUMN_KEY; the other two are not used in the visible cells.
DATASET_SCHEMA_COLUMNS_KEY = "columns"
NUMERICAL_COLUMN_KEY = "numerical_columns"
CATEGORICAL_COLUMN_KEY = "categorical_columns"
TARGET_COLUMN_KEY = "target_column"

In [10]:
# Load the validation schema into a notebook-level variable
# (the name is spelled `datatset_schema` throughout this notebook).
datatset_schema = read_yaml(DATA_VALIDATION_FILE)

In [11]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
from sklearn.preprocessing import MinMaxScaler
class ReplaceCharsTransformer(BaseEstimator, TransformerMixin):
    """Strip ',' and '$' from text cells, turn the '\\N' marker into NaN and
    convert the designated columns to numbers.

    Args:
        columns: labels of the columns to clean.
        numeric_columns: labels that are additionally converted with
            ``pd.to_numeric(errors='coerce')`` when they are part of `columns`.
            Defaults to the four columns that were previously hard-coded.
    """

    def __init__(self, columns,
                 numeric_columns=('budget', 'opening_theaters', 'world_revenue', 'runtimeMinutes')):
        self.columns = columns
        self.numeric_columns = numeric_columns

    def fit(self, X, y=None):
        """Nothing is learned; present for the scikit-learn transformer protocol."""
        return self

    def transform(self, X):
        """Return a cleaned copy of X; the frame passed in is left untouched."""
        X = X.copy()
        for col in self.columns:
            # Series.map applies the cell cleaner element by element without
            # going through the deprecated DataFrame.applymap.
            cleaned = X[col].map(self._replace_chars)

            if cleaned.dtype == 'O' or cleaned.dtype == 'float':
                # Assign the result: an in-place replace on the X[col]
                # selection may only change a temporary copy.
                cleaned = cleaned.replace('\\N', np.nan)

            if col in self.numeric_columns:
                cleaned = pd.to_numeric(cleaned, errors='coerce')

            X[col] = cleaned

        return X

    def _replace_chars(self, cell):
        """Remove ',' and '$' from a string cell; return any other value unchanged."""
        if isinstance(cell, str):
            return cell.replace(',', '').replace('$', '')
        return cell



class DropDuplicatesTransformer(TransformerMixin):
    """Stateless transformer that removes duplicated rows from a DataFrame."""

    def __init__(self):
        # No configuration to store.
        pass

    def fit(self, X, y=None):
        """Nothing is learned; returns the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return X with duplicated rows dropped."""
        deduplicated = X.drop_duplicates()
        return deduplicated
    
class FillMissingBudgets(TransformerMixin):
    """Fill missing 'budget' values by looking the movie up by 'originalTitle'.

    Args:
        df3: frame with 'originalTitle' and 'budget' columns that serves as
            the lookup table.
    """

    def __init__(self, df3):
        # title -> budget; if a title occurs more than once the last row wins.
        self.d_p = dict(zip(df3['originalTitle'], df3['budget']))

    def fit(self, X, y=None):
        """Nothing is learned; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return a copy of X whose missing budgets are filled from the lookup.

        The frame passed in is not modified.
        """
        looked_up = X['originalTitle'].map(self.d_p)
        # assign returns a new frame, so the caller's data stays untouched.
        return X.assign(budget=X['budget'].fillna(looked_up))



class FillnaTransformer(BaseEstimator, TransformerMixin):
    """Fill missing values of one column from a dictionary keyed by another column.

    Args:
        column: label of the column whose missing values are filled.
        dictionary: mapping from key-column value to replacement value.
        key_column: label of the column used to look values up in
            `dictionary`. Defaults to 'tconst', the key used previously.
    """

    def __init__(self, column, dictionary, key_column='tconst'):
        self.column = column
        self.dictionary = dictionary
        self.key_column = key_column

    def fit(self, X, y=None):
        """Nothing is learned; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return a copy of X with the missing values of `column` filled.

        The frame passed in is not modified.
        """
        X = X.copy()
        replacements = X[self.key_column].map(self.dictionary)
        X[self.column] = X[self.column].fillna(replacements)
        return X


class DropColumnsTransformer(TransformerMixin):
    """Transformer that removes a fixed set of columns from a Pandas DataFrame."""

    def __init__(self, columns_to_drop):
        self.columns_to_drop = columns_to_drop

    def transform(self, X):
        """Return X without the configured columns."""
        return X.drop(columns=self.columns_to_drop)

    def fit(self, X, y=None):
        """Nothing is learned; returns the transformer itself."""
        return self

class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Encode object-dtype columns as integers ranked by label frequency.

    During fit the rarest label of each object column gets 0, the next one 1,
    and so on. During transform the labels are replaced by those ranks and
    every row that contains a missing value afterwards is dropped, which
    includes rows whose label was not seen during fit.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Learn the label -> rank mapping of every object-dtype column of X."""
        self.labels_ordered_ = {}
        for feature in X.columns:
            if X[feature].dtype != 'O':
                continue
            # A stable sort keeps equally frequent labels in a fixed order, so
            # the encoding does not depend on the sorting algorithm.
            by_frequency = X.groupby(feature).size().sort_values(kind='mergesort').index
            self.labels_ordered_[feature] = {
                label: rank for rank, label in enumerate(by_frequency)
            }
        return self

    def transform(self, X, y=None):
        """Return an encoded copy of X without rows that contain missing values.

        The frame passed in is not modified.
        """
        X = X.copy()
        for feature, labels_ordered in self.labels_ordered_.items():
            X[feature] = X[feature].map(labels_ordered)
        return X.dropna()


In [12]:
from src.movie_predictor.entity.artifact_entity import DataTransformationArtifact

In [22]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
import pandas as pd
class DataTransform:
    """Builds the preprocessing object and applies it to the ingested train/test files."""

    def __init__(self, data_transformation: DataTransformConfig,
                 data_ingestion_artifact: DataIngestionArtifact,
                 data_validation_artifact: DataValidationArtifact) -> None:
        """Keep the stage configuration and the artifacts of the previous stages.

        Args:
            data_transformation: settings of this stage (output directories
                and preprocessing-object path).
            data_ingestion_artifact: artifact instance whose `train_file_path`
                and `test_file_path` name the CSV files to transform.
            data_validation_artifact: artifact of the validation stage; it is
                stored but not read by the methods of this class.
        """
        self.data_transformation_config = data_transformation
        self.data_ingestion_artifact = data_ingestion_artifact
        self.data_validation_artifact = data_validation_artifact

    def get_data_transformer_object(self, dataframe,
                                    budget_file_path='budget.xlsx',
                                    combine_file_path='combine.xlsx') -> ColumnTransformer:
        """Assemble the preprocessing ColumnTransformer.

        Args:
            dataframe: the input features as a DataFrame (its column labels
                are used as the column selection), or anything scikit-learn
                accepts as a ColumnTransformer column selector.
            budget_file_path: workbook with 'originalTitle' and 'budget' columns.
            combine_file_path: workbook with the sheets 'runtime' and
                'opening_theaters', both keyed by 'tconst'.

        Raises:
            Exception: read errors are propagated instead of being returned.

        Returns:
            ColumnTransformer: the unfitted preprocessing object.
        """
        budgets = pd.read_excel(budget_file_path)
        runtimes = pd.read_excel(combine_file_path, sheet_name='runtime')
        openings = pd.read_excel(combine_file_path, sheet_name='opening_theaters')
        opening_by_id = dict(zip(openings['tconst'], openings['opening_theaters']))
        runtime_by_id = dict(zip(runtimes['tconst'], runtimes['runtimeMinutes']))

        # A DataFrame itself is not a column selector; select all of its columns.
        if isinstance(dataframe, pd.DataFrame):
            selected_columns = list(dataframe.columns)
        else:
            selected_columns = dataframe

        pipeline_duplicate = Pipeline([
            ('remove_duplicates', DropDuplicatesTransformer())
        ])

        conversion_pipeline = Pipeline([
            ('replace_chars', ReplaceCharsTransformer(columns=['budget','opening_theaters','world_revenue','runtimeMinutes','genres_y','job','characters','birthYear','deathYear','knownForTitles'])),
            ('passthrough', 'passthrough')
        ])

        mpaa_missing_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent'))
        ])

        manual_nan_pipeline = Pipeline([
            ('fillna_runtime', FillnaTransformer(column='runtimeMinutes', dictionary=runtime_by_id)),
            ('fillna_opening', FillnaTransformer(column='opening_theaters', dictionary=opening_by_id)),
            ('fill_budgets', FillMissingBudgets(budgets))
        ])

        cols_drop = ['Unnamed: 0','genres_y','domestic_revenue','opening_revenue','nconst','deathYear','job','characters','birthYear','primaryProfession','knownForTitles','isAdult','titleType','tconst']
        drop_missing_pipeline = Pipeline([
            ('drop_cols', DropColumnsTransformer(columns_to_drop=cols_drop))
        ])

        encoding_scaling_pipeline = Pipeline([
            ('encoder', CategoricalEncoder()),
            ('scaler', MinMaxScaler())
        ])

        # NOTE(review): a ColumnTransformer fits every entry independently on
        # the selected columns and concatenates the outputs. The earlier draft
        # of this method applied the same pipelines one after another instead;
        # confirm which behaviour is intended before relying on the result.
        preprocessing = ColumnTransformer([
            ('pipeline_duplicate', pipeline_duplicate, selected_columns),
            ('conversion_pipeline', conversion_pipeline, selected_columns),
            ('mpaa_missing_pipeline', mpaa_missing_pipeline, selected_columns),
            ('manual_nan_pipeline', manual_nan_pipeline, selected_columns),
            ('drop_missing_pipeline', drop_missing_pipeline, selected_columns),
            ('encoding_scaling_pipeline', encoding_scaling_pipeline, selected_columns)
        ])
        return preprocessing

    def run_data_transformation(self)->DataTransformationArtifact:
        """Transform the ingested train/test files and persist the results.

        Reads both CSV files, separates the target column named in the schema,
        fits the preprocessing object on the training features, applies it to
        both sets, appends the target as the last column and saves the two
        arrays together with the fitted preprocessing object.

        Raises:
            Exception: any failure is propagated instead of being returned.

        Returns:
            DataTransformationArtifact: paths of the written files plus a
            success flag and message.
        """
        train_file_path = self.data_ingestion_artifact.train_file_path
        test_file_path = self.data_ingestion_artifact.test_file_path

        # load_data expects the path of the schema file, not its parsed content.
        schema_file_path = str(DATA_VALIDATION_FILE)
        train_df = load_data(file_path=train_file_path, schema_file_path=schema_file_path)
        test_df = load_data(file_path=test_file_path, schema_file_path=schema_file_path)

        schema = read_yaml(DATA_VALIDATION_FILE)
        target_column_name = schema[TARGET_COLUMN_KEY]

        input_feature_train_df = train_df.drop(columns=[target_column_name])
        target_feature_train_df = train_df[target_column_name]

        input_feature_test_df = test_df.drop(columns=[target_column_name])
        target_feature_test_df = test_df[target_column_name]

        # The preprocessing object has to exist before it can be fitted.
        preprocessing_obj = self.get_data_transformer_object(input_feature_train_df)
        input_feature_train_arr = preprocessing_obj.fit_transform(input_feature_train_df)
        input_feature_test_arr = preprocessing_obj.transform(input_feature_test_df)

        # NOTE(review): np.c_ requires features and target to have the same
        # number of rows; steps that drop rows would break this — confirm.
        train_arr = np.c_[input_feature_train_arr, np.array(target_feature_train_df)]
        test_arr = np.c_[input_feature_test_arr, np.array(target_feature_test_df)]

        transformed_train_dir = self.data_transformation_config.tranfored_train_dir
        transformed_test_dir = self.data_transformation_config.transormed_test_dir

        # The files are named *.npz although np.save writes a single array per file.
        train_file_name = os.path.basename(train_file_path).replace(".csv",".npz")
        test_file_name = os.path.basename(test_file_path).replace(".csv",".npz")

        transformed_train_file_path = os.path.join(transformed_train_dir, train_file_name)
        transformed_test_file_path = os.path.join(transformed_test_dir, test_file_name)

        # save_numpy_array_data is annotated with Path and checks its argument types.
        save_numpy_array_data(file_path=Path(transformed_train_file_path), array=train_arr)
        save_numpy_array_data(file_path=Path(transformed_test_file_path), array=test_arr)

        preprocessing_obj_file_path = self.data_transformation_config.preprocessed_file_path
        save_object(file_path=preprocessing_obj_file_path, obj=preprocessing_obj)

        data_transformation_artifact = DataTransformationArtifact(
            is_transformed=True,
            message="Data transformation successfull.",
            transformed_train_file_path=transformed_train_file_path,
            transformed_test_file_path=transformed_test_file_path,
            preprocessed_object_file_path=preprocessing_obj_file_path
        )
        return data_transformation_artifact

In [23]:
# Build the stage configuration and the DataTransform object.
# `schema` holds a ConfigurationManager here, not a schema.
# NOTE(review): the namedtuple classes themselves are passed as artifacts, not
# instances, so attributes such as train_file_path are field descriptors
# instead of paths (see the `_tuplegetter` in the recorded outputs below).
schema = ConfigurationManager()
transform_config = schema.get_data_transform_config()
data_transform = DataTransform(data_transformation=transform_config,
                                    data_ingestion_artifact=DataIngestionArtifact,
                                    data_validation_artifact=DataValidationArtifact)
    


In [24]:
# Run the stage. The recorded output is an EnsureError because file_path was a
# `_tuplegetter` field descriptor rather than a str.
data_transform.run_data_transformation()

ensure.main.EnsureError("Argument file_path of type <class '_collections._tuplegetter'> to <function load_data at 0x7f65b90be950> does not match annotation type <class 'str'>")

In [18]:
# Debug: display the DataTransform object.
data_transform

<__main__.DataTransform at 0x7f65b8efb280>

In [26]:
# Debug: repeat the attribute lookups made inside run_data_transformation.
# Both names are bound to the classes, not to instances, so the two file-path
# lookups return the namedtuple field descriptors.
data_ingestion_artifact =  DataIngestionArtifact       
data_transformation_config = DataTransformConfig
train_file_path = data_ingestion_artifact.train_file_path
test_file_path = data_ingestion_artifact.test_file_path

In [29]:
# Debug: display the value bound above; the recorded output is a field descriptor, not a path.
train_file_path

_tuplegetter(0, 'Alias for field number 0')

In [9]:
from src.movie_predictor.config.configuration import Configuartion

In [20]:
# Reads `training_pipeline_config` from the Configuartion class itself, without
# creating an instance, and rebinds the notebook-level name `config`.
# NOTE(review): confirm whether an instance was meant to be created first.
config = Configuartion.training_pipeline_config

In [21]:
# Debug: display what `config` is bound to.
config

src.movie_predictor.config.configuration.Configuartion