#### Step 0: Defining Constants/ configurations [can be stored in seperate config.json file] 

In [37]:
# File paths
INPUT_FILE_PATH = "datasets/"
OUTPUT_FILE_PATH = "output/"

# Data Loading conditions
COLS_TO_DROP_DURING_LOAD = ['Current Price', 'Market Capitalization']
FILE_TO_EXCLUDE = 'datasets/other_metrics_final.csv'

# Preprocessing parameters
MISSING_THRESHOLD = 0.1
CORRELATION_THRESHOLD = 0.1

# Preprocessing strategies
MISSING_VALUE_STRATEGY = 'mean'  # Options: 'mean', 'median', 'most_frequent'

# Target
TARGET = 'Current Price'

# Non Essential Features to drop
COLS_TO_DROP = ['BSE Code', 'NSE Code', 'Name', 'join_key']

# Test to train data fraction
TEST_SIZE_FRACTION = 0.15

#### Step 1: Importing the libraries and logger set up

In [30]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
import logging
import os
from scipy.stats import zscore
import config
import glob
from typing import List, Tuple
from abc import ABC, abstractmethod

# Set up logging configuration
logging.basicConfig(
    filename='log_file.log',
    level=logging.INFO,    # Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format='%(asctime)s - %(levelname)s - %(message)s',  # Log message format
)

logger = logging.getLogger()

#### Step 2: Defining the Preprossors (Classes)

In [31]:
class Preprocessor(ABC):
    def __init__(self):
        pass

    @abstractmethod
    def process(self, data: pd.DataFrame, *args) -> pd.DataFrame:
        raise NotImplementedError("Subclasses must implement the 'process' method")

class DataLoader(Preprocessor):
    def __init__(self, file_path=INPUT_FILE_PATH, cols_to_drop_during_load=None, file_to_exclude=None):
        # Step 1: Initialize file pattern, columns to drop, and file exclusion
        self.file_pattern = file_path + "*"
        self.cols_to_drop_during_load = cols_to_drop_during_load
        self.file_to_exclude = file_to_exclude
        logger.info(f"DataLoader initialized with file pattern: {self.file_pattern}")

    def drop_cols(self, file_path: str, df: pd.DataFrame) -> pd.DataFrame:
        try:
            # Step 1: Drop specified columns unless the file is excluded
            if file_path != self.file_to_exclude:
                logger.info(f"Dropping columns: {self.cols_to_drop_during_load} from file: {file_path}")
                return df.drop(self.cols_to_drop_during_load, axis=1)
            else:
                logger.info(f"Skipping column drop for file: {file_path}")
                return df
        except Exception as e:
            logger.error(f"Error in drop_cols for file {file_path}: {str(e)}")

    def process(self, *args) -> List[pd.DataFrame]:
        try:
            # Step 1: Load data files matching the given pattern
            logger.info(f"Loading data from files matching pattern: {self.file_pattern}")
            file_paths = glob.glob(self.file_pattern)
            if not file_paths:
                logger.warning(f"No files found matching pattern: {self.file_pattern}")
            logger.info(f"Found files: {file_paths}")
            
            data_frames = []
            for file_path in file_paths:
                try:
                    # Process each file based on its extension
                    file_extension = file_path[file_path.find('.') + 1 :]
                    logger.info(f"Processing file: {file_path} with extension: {file_extension}")
                    
                    if file_extension == 'csv':
                        # Read CSV and apply column exclusions
                        data_frames.append(self.drop_cols(file_path=file_path, df=pd.read_csv(file_path)))
                    elif file_extension == 'xlsx':
                        # Read Excel and apply column exclusions
                        data_frames.append(self.drop_cols(file_path=file_path, df=pd.read_excel(file_path)))
                    else:
                        # Log unsupported file types
                        logger.error(f"Unsupported file type: {file_extension} for file: {file_path}")
                        raise ValueError(f"Unsupported file type: {file_extension}")
                        
                except Exception as e:
                    # Log errors for file processing failures
                    logger.error(f"Error processing file {file_path}: {str(e)}")
                    continue

            logger.info(f"Successfully loaded {len(data_frames)} data frames.")
            return data_frames

        except Exception as e:
            logger.error(f"Error in DataLoader process: {str(e)}")  
            
class DataJoiner(Preprocessor):
    def __init__(self, join_type = 'inner', on_column = 'join_key'):
        # Step 1: Initialize join type and on_column
        self.join_type = join_type
        self.on_column = on_column
        logger.info(f"DataJoiner initialized with join_type: {self.join_type}, on_column: {self.on_column}")
        
    def process(self, tables: List[pd.DataFrame]) -> pd.DataFrame:
        # Step 1: Start the join operation with the first table and additional tables
        data = tables[0]
        additional_tables = tables[1:]
        logger.info(f"Starting join operation with {len(additional_tables)} other tables.")
        for idx, table in enumerate(additional_tables):
            try:
                # Keep only the columns not in the main table, except for the on_column
                table = table[[col for col in table.columns if col not in data.columns or col == self.on_column]]
                
                # Perform the join operation on the specified column
                data = data.merge(table, how = self.join_type, on = self.on_column)
                logger.info(f"Table {idx+1} successfully joined.")
            except:
                # Log error if joining fails for any table
                logger.error(f"Error joining table {idx+1}: Error - {str(e)}")
                continue
        return data

class MissingValuePreprocessor(Preprocessor):
    def __init__(self, strategy=MISSING_VALUE_STRATEGY, missing_threshold = MISSING_THRESHOLD):
        
        # Step 1: Initialize imputer strategy, missing threshold and imputer instance
        self.imputer = SimpleImputer(strategy=strategy, keep_empty_features=True)
        self.missing_threshold = missing_threshold
        self.strategy = strategy
        logger.info(f"MissingValuePreprocessor initialized with strategy: {self.strategy}, missing_threshold: {self.missing_threshold}")

    def drop_columns_by_missing_threshold(self, df: pd.DataFrame, missing_threshold: float) -> pd.DataFrame:
        try:
            logger.info(f"Starting column drop based on missing value threshold: {missing_threshold}")
            
            # Calculate missing percentage for each column
            missing_percentages = dict(df.isna().sum() / df.shape[0])
            
            # Select columns where missing percentage is within the threshold
            cols_to_keep = [k for k, v in missing_percentages.items() if v <= missing_threshold]
            
            result_df = df[cols_to_keep]
            logger.info(f"Successfully dropped columns exceeding missing threshold")
            
            return result_df
        
        except Exception as e:
            logger.error(f"Error in drop_columns_by_missing_threshold: {str(e)}")
        
    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            logger.info("Starting missing value handling process.")
            
            # Step 1: Drop columns based on missing value threshold
            data = self.drop_columns_by_missing_threshold(df=data, missing_threshold=self.missing_threshold)
            logger.info(f"Remaining data after dropping columns: {data.shape[1]} columns.")
            
            # Step 2: Separate numerical and categorical columns
            numerical_cols = data.select_dtypes(include=['number']).columns
            categorical_cols = data.select_dtypes(exclude=['number']).columns
            
            # Step 3: Impute missing values for numerical columns
            num_df = pd.DataFrame(data=self.imputer.fit_transform(data[numerical_cols]), columns=numerical_cols)
            
            # Step 4: Handle categorical columns (no transformation shown in this snippet)
            cat_df = pd.DataFrame(data=data, columns=categorical_cols)
            
            # Step 5: Combine numerical and categorical data frames
            final_data = pd.concat([cat_df, num_df], axis=1)
            logger.info("Concatenated numerical and categorical data frames.")
            logger.info("Missing value handling process completed successfully.")
            
            return final_data
        
        except Exception as e:
            logger.error(f"Error in MissingValuePreprocessor: {str(e)}")

class ColumnRelevancePreprocessor(Preprocessor):
    def __init__(self, 
                 correlation_threshold = CORRELATION_THRESHOLD, 
                 target = TARGET
                ):
        # Step 1: Initialize correlation threshold and target variable for Column Relevance checking
        self.correlation_threshold = correlation_threshold
        self.target = target
        logger.info(f"ColumnRelevancePreprocessor initialized with Correlation Threshold: {self.correlation_threshold}, Target Column: {self.target}")

    def extract_relevant_cols_by_target_correlation(self, df: pd.DataFrame) -> List[str]:
        try:
            # Step 1: Calculate correlation with target for numerical columns
            logger.info("Calculating correlation of numerical columns with target.")
            corr_df = df.select_dtypes(include=['number']).corr()[self.target].apply(abs).reset_index()
            corr_df.rename(columns={"index": "column"}, inplace=True)
            
            # Step 2: Select columns with correlation above threshold
            logger.info(f"Filtering columns with correlation greater than threshold: {self.correlation_threshold}")
            relevant_num_cols = corr_df[corr_df[self.target].apply(lambda x: x > self.correlation_threshold)]['column'].tolist()
            logger.info(f"Relevant numerical columns based on correlation: {relevant_num_cols}")
            
            # Step 3: Get categorical columns
            cat_cols = df.select_dtypes(exclude=['number']).columns.tolist()
            
            # Step 4: Combine numerical and categorical columns
            relevant_cols = relevant_num_cols + cat_cols
            logger.info(f"Total relevant columns: {len(relevant_cols)}")
            
            return relevant_cols

        except Exception as e:
            logger.error(f"Error in extract_relevant_cols_by_target_correlation: {str(e)}")

    def process(self, data : pd.DataFrame) -> pd.DataFrame:
        try:
            logging.info('Starting process for relevant column extraction')
            # Step 1: Calling the function for selecting relevant features 
            data = data[self.extract_relevant_cols_by_target_correlation(data)]
            logging.info(f"Relevant Column extracted successfully | Total relevant col : {len(data.columns)}")
            
            return data
        
        except Exception as e:
            logger.error(f"Error in Relevant Feature extraction : {str(e)}")

class ColumnDropper(Preprocessor):
    def __init__(self, cols_to_drop : List[str]= None):
        
        self.cols_to_drop = cols_to_drop
        logger.info(f"ColumnDropper initialized with Columns to drop : {self.cols_to_drop}")

    def process(self, data : pd.DataFrame) -> pd.DataFrame:
        try:
            logging.info('Column Dropping Process Started')
            # Step 1: Dropping Columns
            data = data.drop(columns = self.cols_to_drop)
            logging.info(f"Columns Dropped successfully")
            
            return data

        except Exception as e:
            logger.error(f"Error in dropping columns : {str(e)}")

class OutlierRemover(Preprocessor):
    def __init__(self):
        logger.info(f"OutlierRemover initialized")
        pass

    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            logging.info('Outlier Removal Process Started')
            
            # Step 1: Getting the Qualtiles for all columns
            Q1 = data.select_dtypes(include=['number']).quantile(0.25)
            Q3 = data.select_dtypes(include=['number']).quantile(0.75)
            IQR = Q3 - Q1

            # Step 2: Calculating the lower_bound and upper_bound
            lower_bound = Q1 - IQR * 1.5
            upper_bound = Q3 + IQR * 1.5
    
            # Step 3: Getting outlier filter condition to the numeric columns
            numeric_cols = data.select_dtypes(include=['number']).columns
            condition = ((data[numeric_cols] >= lower_bound) & (data[numeric_cols] <= upper_bound)).all(axis=1)
            
            # Step 4: Apply the same filter to both numeric and categorical columns to keep rows aligned
            df_filtered = data[condition]
            
            logging.info('Outlier Removal Process completed successfully')
            
            return df_filtered

        except Exception as e:
            logger.error(f"Error in Outlier Removal Process : {str(e)}")
            
class OneHotEncoderPreprocessor(Preprocessor):
    def __init__(self):
        logger.info(f"OneHotEncoderPreprocessor initialized")
        self.encoder = OneHotEncoder(sparse_output=False)
        logger.info(f"OneHotEncoder initialized")

    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            logging.info('One Hot Encoding Process started')
            # Step 1: Separate numerical and categorical columns
            logger.info("Separating numerical and categorical columns.")
            num_df = data.select_dtypes(include=['number']).reset_index(drop=True)
            cat_df = data.select_dtypes(exclude=['number']).reset_index(drop=True)
            logger.info(f"Identified {len(num_df.columns)} numerical and {len(cat_df.columns)} categorical columns.")
            
            # Step 2: Encode categorical columns
            logger.info("Encoding categorical columns using the encoder.")
            encoded_data = self.encoder.fit_transform(cat_df)
            encoded_columns = self.encoder.get_feature_names_out(cat_df.columns)
            
            # Step 3: Create DataFrame from encoded data
            encoded_df = pd.DataFrame(encoded_data, columns=encoded_columns)
            
            # Step 4: Concatenate numerical and encoded categorical data
            logger.info("Concatenating numerical and encoded categorical data frames.")
            final_data = pd.concat([num_df, encoded_df], axis=1)
            logger.info(f"Final data frame created with {final_data.shape[1]} columns.")
            logging.info('One Hot Encoding Process completed successfully')
            
            return final_data
        
        except Exception as e:
            logger.error(f"Error in one hot encoding process: {str(e)}")

class StandardizationPreprocessor(Preprocessor):
    def __init__(self):
        logger.info(f"StandardizationPreprocessor initialized")
        self.scaler = StandardScaler()
        logger.info(f"Scaler initialized")

    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            logging.info('Standardization Process Started')
            # Step 1: Separate numerical and categorical columns
            logger.info("Identifying numerical and categorical columns.")
            numeric_cols = data.select_dtypes(include=['number']).columns
            num_df = data.select_dtypes(include=['number'])
            cat_df = data.select_dtypes(exclude=['number']).reset_index(drop=True)
            logger.info(f"Found {len(numeric_cols)} numerical columns and {cat_df.shape[1]} categorical columns.")
            
            # Step 2: Scale numerical columns
            logger.info("Scaling numerical columns.")
            scaled_data = self.scaler.fit_transform(num_df)
            scaled_df = pd.DataFrame(scaled_data, columns=numeric_cols)
            logger.info("Scaling completed for numerical columns.")
            
            # Step 3: Concatenate scaled numerical and categorical data
            logger.info("Concatenating scaled numerical and categorical columns.")
            final_data = pd.concat([scaled_df, cat_df], axis=1)
            logger.info(f"Final data frame created with {final_data.shape[1]} columns.")
            logging.info('Standardization Process completed successfully')
            
            return final_data
            
        except Exception as e:
            logger.error(f"Error in data scaling process: {str(e)}")

class TrainTestSplitter(Preprocessor):
    def __init__(self, test_size = 0.2, random_state = 42, target = TARGET):
        self.test_size = test_size
        self.random_state = random_state
        self.target = target
        logger.info(f"TrainTestSplitter initialized with Test Size : {self.test_size} | Target Variable : {self.target} | Random State : {self.random_state}")

    def process(self, data: pd.DataFrame) -> Tuple[pd.DataFrame]:
        try:
            logging.info('Train Test split process started successfully')
            # Step 1: Separate independent and dependent variables
            logger.info("Separating independent and dependent variables.")
            independent_var_df = data[[x for x in data.columns if x != self.target]]
            dependent_var_df = data[[self.target]]
            logger.info(f"Independent variables: {independent_var_df.shape[1]} columns, Dependent variable: {dependent_var_df.shape[1]} columns.")
            
            # Step 2: Split the data into training and testing sets
            logger.info(f"Splitting data into training and testing sets with test size = {self.test_size}.")
            X_train, X_test, y_train, y_test = train_test_split(
                independent_var_df,
                dependent_var_df,
                test_size=self.test_size,
                random_state=self.random_state)
            logger.info(f"Data split completed. Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}.")
            
            logging.info('Train Test split process started completed successfully')
            
            # Step 3: Return the split datasets
            return X_train, X_test, y_train, y_test
        
        except Exception as e:
            logger.error(f"Error during the data splitting process: {str(e)}")

#### Step 3: Defining Preprocessor Factory

In [32]:
class PreprocessorFactory:
    @staticmethod
    def get_preprocessor(preprocessor_type: str, **kwargs) -> Preprocessor:
        try:
            # Checking for preprocessor type and return the required Preprocessor Class
            if preprocessor_type == 'data_loader':
                return DataLoader(**kwargs)
            elif preprocessor_type == 'data_joiner':
                return DataJoiner(**kwargs)
            elif preprocessor_type == 'missing_values':
                return MissingValuePreprocessor(**kwargs)
            elif preprocessor_type == 'feature_selection':
                return ColumnRelevancePreprocessor(**kwargs)
            elif preprocessor_type == 'drop_columns':
                return ColumnDropper(**kwargs)
            elif preprocessor_type == 'outlier_removal':
                return OutlierRemover(**kwargs)
            elif preprocessor_type == 'one_hot_encoder':
                return OneHotEncoderPreprocessor(**kwargs)
            elif preprocessor_type == 'feature_scaler':
                return StandardizationPreprocessor(**kwargs)
            elif preprocessor_type == 'train_test_split':
                return TrainTestSplitter(**kwargs)
                
        except Exception as e:
            raise Exception(f"Preprocessor Not Defined | Error : {str(e)}")

#### Step 4: Defining DataPreprocessingPipeline Class to execute the steps sequentially

In [33]:
class DataPreprocessingPipeline:
    def __init__(self):
        logging.info('Data Processing Pipeline Initialized')
        self.steps = []

    def add_step(self, step: Preprocessor):
        try:
            # Step 1: Adding Step to Pipeline
            self.steps.append(step)
        except Exception as e:
            logging.error(f'Error in adding step to pipeline: {str(e)}')

    def execute(self, initial_data: pd.DataFrame = None) -> Tuple[pd.DataFrame]:
        try:
            logging.info('Pipeline Execution Started')
            # Step 1: Giving the initial data as None to get the process started
            data = initial_data
            for step in self.steps:
                data = step.process(data)

            logging.info('Pipeline Execution Completed Successfully.')
            
            return data
            
        except Exception as e:
            logging.error(f'Error occured in Pipeline Execution : {str(e)}')
            print(e)

#### Step 5: Execution of Data Preprocessing Pipeline

In [34]:
# Step 1: Instantiating the DataPreprocessingPipeline Class
pipeline = DataPreprocessingPipeline()

# Step 2: Adding the steps to Pipeline with required parameters
pipeline.add_step(PreprocessorFactory.get_preprocessor(
    'data_loader', 
    file_path = INPUT_FILE_PATH,
    cols_to_drop_during_load = COLS_TO_DROP_DURING_LOAD,
    file_to_exclude = FILE_TO_EXCLUDE
))
pipeline.add_step(PreprocessorFactory.get_preprocessor('data_joiner', join_type = 'inner', on_column = 'join_key'))
pipeline.add_step(PreprocessorFactory.get_preprocessor('drop_columns', cols_to_drop = COLS_TO_DROP))
pipeline.add_step(PreprocessorFactory.get_preprocessor(
    'missing_values', 
    strategy = MISSING_VALUE_STRATEGY,
    missing_threshold = MISSING_THRESHOLD
))
pipeline.add_step(PreprocessorFactory.get_preprocessor(
    'feature_selection', 
    correlation_threshold = CORRELATION_THRESHOLD, 
    target = TARGET
))
pipeline.add_step(PreprocessorFactory.get_preprocessor('outlier_removal'))
pipeline.add_step(PreprocessorFactory.get_preprocessor('feature_scaler'))
pipeline.add_step(PreprocessorFactory.get_preprocessor('one_hot_encoder'))
pipeline.add_step(PreprocessorFactory.get_preprocessor(
    'train_test_split', 
    test_size = TEST_SIZE_FRACTION, 
    random_state = 234,
    target = TARGET
))

# Step 3: Pipeline Execution
processed_data = pipeline.execute()

# Step 4: Extrating the Training and Test data for ML algorithm
X_train, X_test, y_train, y_test = processed_data

#### Step 6: Checking the shape of dfs for train and test

In [35]:
X_train.shape,X_test.shape,y_train.shape,y_test.shape

((2230, 103), (394, 103), (2230, 1), (394, 1))

In [36]:
X_train

Unnamed: 0,52w Index,Down from 52w high,From 52w high,NCAVPS,Market Capitalization,Piotroski score,Public holding,FII holding,DII holding,EPS,...,Industry_Textiles - Jute - Yarn / Products,Industry_Textiles - Manmade,Industry_Textiles - Processing,Industry_Textiles - Products,Industry_Textiles - Spinning - Synthetic / Blended,Industry_Trading,Industry_Transmisson Line Towers / Equipment,Industry_Transport - Airlines,Industry_Travel Agencies,Industry_Tyres
619,-0.085094,-0.484143,0.475923,-0.218499,-0.498459,0.489071,-0.537281,-0.348204,-0.356160,-1.384755,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
297,-0.310741,-0.244670,0.274773,-0.087416,-0.182986,-1.070882,0.252610,-0.348204,-0.332548,-0.409315,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
930,-1.061394,1.367919,-1.334428,-0.234026,-0.498677,-0.030914,1.122269,-0.348204,-0.356160,-0.473489,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
899,-0.504421,1.149911,-1.133278,-0.757461,2.204389,2.049024,0.509909,-0.095686,1.473717,-0.422150,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1565,0.827273,-0.508963,0.475923,-0.568451,-0.420512,1.009055,-0.488100,-0.348204,-0.356160,0.862927,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1185,0.924302,-1.052306,1.079373,-0.799861,-0.519241,-0.550898,2.522719,-0.348204,-0.356160,-1.084743,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
863,-0.047110,-0.147405,0.140673,-0.536800,5.968943,0.489071,-1.342765,4.757057,-0.297131,-0.390063,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
324,-0.034323,0.397280,-0.395727,-0.621601,-0.212452,0.489071,1.779611,-0.337225,-0.356160,-0.436589,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
1951,-0.501413,-0.066909,0.073623,-0.712075,-0.492030,-1.070882,0.266404,-0.348204,-0.356160,-0.507180,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
