# Data Loader

## Imports

In [6]:
import logging as log
import numpy as np
import pandas as pd

## Implementation

In [7]:
class DataLoader(object):
    '''
    Wrap a Pandas dataframe and turn it into model-ready features.

    The featurization pipeline (see get_featurized_data) expands an optional date column,
    fills missing values, guarantees the label columns exist, one-hot encodes non-numeric
    columns, runs any registered custom featurizers and finally min-max scales float columns.
    The min/max used for scaling are remembered in state_dict so scaling can be reversed.
    '''
    # Private copy of the dataframe handed to the constructor.
    original_data: pd.DataFrame = None
    # Names of the columns to predict.
    labels_column_names: list = None
    # Name of the column holding dates, or None when there is none.
    date_column_name: str = None
    # Default ratio of rows reserved for testing by get_train_test_split.
    test_split: float = None
    # Callables executed, in registration order, near the end of the pipeline.
    custom_featurizers: list = None
    # Per-column min/max values recorded by apply_sigmoid_scale.
    state_dict: dict = None

    def __init__(self, data: pd.DataFrame, labels_column_names: list, date_column_name: str = None, test_split: float = 0.2):
        '''
        Validate the dataset description and store a private copy of the data.

        :param data: pd.DataFrame, The dataset to load. It is copied, never modified.
        :param labels_column_names: list, Names of the columns to predict. Each must exist in data.
        :param date_column_name: str, Optional name of a date column to expand into time features.
        :param test_split: float, Default ratio of rows used for testing by get_train_test_split.
        :raises ValueError: If the dataset has fewer than 2 columns, has no rows, no labels are given,
                            or a label/date column does not exist in the dataset.
        '''
        if len(data.columns) < 2:
            raise ValueError('The provided dataset should have at least 2 columns.')

        # Rows live on axis 0; axis 1 is the column count that was already checked above.
        if data.shape[0] <= 0:
            raise ValueError('The provided dataset should have at least 1 row.')

        if len(labels_column_names) < 1:
            raise ValueError('The provided labels collection should have at least 1 label (column name to predict).')

        for label_column_name in labels_column_names:
            if label_column_name not in data.columns:
                raise ValueError(f'The provided label column "{label_column_name}" does not exist in the provided dataset.')

        if date_column_name is not None and date_column_name not in data.columns:
            raise ValueError(f'The provided date column "{date_column_name}" does not exist in the provided dataset.')

        log.info('Validation of labels and date column (if provided) has been successful against the provided dataset.')

        self.original_data = data.copy()
        self.labels_column_names = labels_column_names
        self.date_column_name = date_column_name
        self.test_split = test_split
        self.custom_featurizers = list()
        self.state_dict = {}

    def __featurize_date_column__(self, data: pd.DataFrame, date_column_name: str) -> pd.DataFrame:
        '''
        Replace a date column with numeric/categorical time features derived from it.

        :param data: pd.DataFrame, The dataframe to featurize. It is not modified.
        :param date_column_name: str, The column to expand. Falls back to self.date_column_name when None.
        :return: A new dataframe without the date column and with year, month, day, hour, am_or_pm,
                 minute, day_of_year, day_of_week and quarter columns; or data itself when there is
                 no date column to process.
        '''
        effective_column_name = date_column_name if date_column_name is not None else self.date_column_name

        if effective_column_name is None:
            return data

        parsed_dates = pd.to_datetime(data[effective_column_name])
        featurized = data.drop(columns=[effective_column_name])

        prefix = effective_column_name
        featurized[f'{prefix}_year'] = parsed_dates.dt.year
        featurized[f'{prefix}_month'] = parsed_dates.dt.month
        featurized[f'{prefix}_day'] = parsed_dates.dt.day
        featurized[f'{prefix}_hour'] = parsed_dates.dt.hour
        featurized[f'{prefix}_am_or_pm'] = np.where(parsed_dates.dt.hour < 12, 'am', 'pm')
        featurized[f'{prefix}_minute'] = parsed_dates.dt.minute
        featurized[f'{prefix}_day_of_year'] = parsed_dates.dt.dayofyear
        featurized[f'{prefix}_day_of_week'] = parsed_dates.dt.dayofweek
        featurized[f'{prefix}_quarter'] = parsed_dates.dt.quarter

        log.debug(f'Extracted time series features from column "{effective_column_name}" and dropped the original column.')

        return featurized

    def __replace_missing_values__(self, data: pd.DataFrame, value: int = 0) -> pd.DataFrame:
        '''
        Return a copy of the dataframe with every missing value replaced.

        :param data: pd.DataFrame, The dataframe to fill. It is not modified.
        :param value: int, The replacement for missing values.
        '''
        filled = data.fillna(value)

        log.debug(f'Replaced all missing values with {value}.')

        return filled

    def __ensure_columns_exist__(self, data: pd.DataFrame, column_names: list, value: object) -> pd.DataFrame:
        '''
        Return a copy of the dataframe in which every requested column exists.

        :param data: pd.DataFrame, The dataframe to check. It is not modified.
        :param column_names: list, Names of the columns that must be present.
        :param value: object, The value used to fill any column that has to be created.
        '''
        completed = data.copy()

        for column_name in column_names:
            if column_name in completed.columns:
                continue

            log.debug(f'Label column "{column_name}" did not exist. Creating it with values {value}.')
            completed[column_name] = value

        return completed

    def __get_km_distance_between_coordinates__(self,
                                                lat1: float,
                                                lng1: float,
                                                lat2: float,
                                                lng2: float) -> float:
        '''
        Return the great-circle distance in kilometers between two coordinates given in degrees.

        Only NumPy ufuncs are used, so scalars as well as arrays/Series are accepted.

        Reference:
            Haversine Formula (https://en.wikipedia.org/wiki/Haversine_formula) to calculate distance between two sets of lat and long coordinates.
        '''
        earth_radius_km = 6371  # Average radius of Earth in kilometers.

        phi1 = np.radians(lat1)
        phi2 = np.radians(lat2)

        delta_phi = np.radians(lat2 - lat1)
        delta_lambda = np.radians(lng2 - lng1)

        a = np.sin(delta_phi / 2) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda / 2) ** 2
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

        return earth_radius_km * c

    def __onehot_encode_categorical_columns__(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Return a copy of the dataframe with every non-numeric column one-hot encoded.

        A column is treated as categorical when its dtype name contains neither "float" nor "int".

        :param data: pd.DataFrame, The dataframe to encode. It is not modified.
        '''
        def is_categorical(column_name) -> bool:
            dtype_name = str(data[column_name].dtype).lower()
            return 'float' not in dtype_name and 'int' not in dtype_name

        categorical_column_names: list = [c for c in data.columns if is_categorical(c)]

        if not categorical_column_names:
            return data.copy()

        encoded = pd.get_dummies(data.copy(), columns=categorical_column_names)

        log.debug(f'One-Hot encoded {len(categorical_column_names)} columns. -> {categorical_column_names}')

        return encoded

    def __determine_column_scale__(self, column_data: pd.Series) -> int:
        '''
        Return the power of 10 that shifts the first value of the column until at most one decimal digit remains.

        Only values below 1 are considered for scaling; anything else yields 1.

        :param column_data: pd.Series, The column whose first value is inspected.
        '''
        if len(column_data) == 0:
            return 1

        first_value = column_data.values[0]

        # inf/nan have no decimal representation to inspect.
        if not np.isfinite(first_value):
            return 1

        def count_decimals(number) -> int:
            # Fixed-point formatting avoids scientific notation and hides binary float noise
            # (e.g. 0.07 * 10 == 0.7000000000000001) that would otherwise never converge.
            text = f'{float(number):.12f}'.rstrip('0')
            return len(text.partition('.')[2])

        scale: int = 1
        should_scale: bool = first_value < 1 and count_decimals(first_value) > 1
        max_iterations = 15  # A double carries roughly 15-17 significant decimal digits.

        while should_scale and max_iterations > 0:
            scale *= 10
            max_iterations -= 1
            should_scale = count_decimals(first_value * scale) > 1

        return scale

    def apply_sigmoid_scale(self, data: pd.DataFrame, reverse: bool = False, only_scale_less_one_values: bool = True) -> pd.DataFrame:
        '''
        Min-max scale the float columns into [0, 1], or undo a previous scaling.

        Despite the name, the transformation is (x - min) / (max - min). When scaling, the min and
        max of every scaled column are stored in self.state_dict; when reversing, only columns with
        stored values are transformed back.

        :param data: pd.DataFrame, The dataframe to scale. It is not modified.
        :param reverse: bool, If True, map scaled values back using the stored min/max.
        :param only_scale_less_one_values: bool, If True, columns whose maximum exceeds 1 are left untouched.
        '''
        scaled: pd.DataFrame = data.copy()
        float_column_names: list = [c for c in scaled.columns if 'float' in str(scaled[c].dtype)]

        for column_name in float_column_names:
            min_key: str = f'scale.sigmoid.column.{column_name}.min'
            max_key: str = f'scale.sigmoid.column.{column_name}.max'

            if reverse:
                if min_key not in self.state_dict or max_key not in self.state_dict:
                    continue

                lower: float = self.state_dict[min_key]
                upper: float = self.state_dict[max_key]
                scaled[column_name] = scaled[column_name] * (upper - lower) + lower
                continue

            column = scaled[column_name]

            if only_scale_less_one_values and column.max() > 1:
                continue

            # A constant column would divide by zero.
            if len(column.unique()) <= 1:
                continue

            lower: float = column.min()
            upper: float = column.max()
            self.state_dict[min_key] = lower
            self.state_dict[max_key] = upper
            scaled[column_name] = (column - lower) / (upper - lower)

        return scaled

    def register_custom_featurizer(self, custom_featurizer):
        '''
        Register a custom featurizer that gets executed at the end of the pipeline.

        :param custom_featurizer: func, A featurizer function that accepts a Pandas dataframe (featurized data) and should return an augmented dataframe. Operations are not inplace.
        :raises TypeError: If custom_featurizer is not callable.
        '''
        if not callable(custom_featurizer):
            raise TypeError('The provided custom featurizer should be callable.')

        self.custom_featurizers.append(custom_featurizer)
        log.info('Registered custom featurizer.')

    def custom_featurize_distance_between_coordinates(self,
                                                      data: pd.DataFrame,
                                                      lat1_column_name: str,
                                                      lng1_column_name: str,
                                                      lat2_column_name: str,
                                                      lng2_column_name: str,
                                                      output_column_name: str) -> pd.DataFrame:
        '''
        Return a featurized dataset in which four coordinate columns are replaced by their distance in kilometers.

        :param data: pd.DataFrame, A dataframe to featurize. It is not modified.
        :param lat1_column_name: str, Name of the column holding the source latitude.
        :param lng1_column_name: str, Name of the column holding the source longitude.
        :param lat2_column_name: str, Name of the column holding the target latitude.
        :param lng2_column_name: str, Name of the column holding the target longitude.
        :param output_column_name: str, The name of the column to create and persist the calculated distance(s) to.
        '''
        featurized: pd.DataFrame = data.copy()

        # The Haversine helper is built from NumPy ufuncs, so whole columns are processed at once.
        featurized[output_column_name] = self.__get_km_distance_between_coordinates__(lat1=featurized[lat1_column_name],
                                                                                      lng1=featurized[lng1_column_name],
                                                                                      lat2=featurized[lat2_column_name],
                                                                                      lng2=featurized[lng2_column_name])
        featurized = featurized.drop(columns=[lat1_column_name, lng1_column_name, lat2_column_name, lng2_column_name])

        log.debug(f'Creating feature "{output_column_name}" for the distance from {lat1_column_name}:{lng1_column_name} -> {lat2_column_name}:{lng2_column_name}.')

        return featurized

    def get_train_test_split(self, featurized_data: pd.DataFrame = None, shuffle_data: bool = False, test_split: float = None) -> tuple:
        '''
        Return the training X & Y dataframes and the testing X & Y dataframes whose sizes respect the test ratio.

        The test rows are taken from the end of the (optionally shuffled) data.

        :param featurized_data: pd.DataFrame, A dataframe to split. When None, self.get_featurized_data() is used.
        :param shuffle_data: bool, If True, the data passed in would get shuffled prior to being split.
        :param test_split: float, The ratio of data to use for testing. When None, the ratio given to the constructor is used.
        :return: A tuple (train_x, train_y, test_x, test_y).
        '''
        ratio = test_split if test_split is not None else self.test_split

        if ratio is None:
            ratio = 0.2

        log.debug(f'Splitting data with a test portion of {ratio} and shuffling of data set to {shuffle_data}.')

        frame: pd.DataFrame = featurized_data.copy() if featurized_data is not None else self.get_featurized_data()

        if shuffle_data:
            frame = frame.sample(frac=1)

        row_count: int = frame.shape[0]
        test_row_count: int = min(max(int(row_count * ratio), 0), row_count)

        # An explicit split index is required: slicing with [:-0] / [-0:] would hand
        # every row to the test set whenever the test portion rounds down to zero.
        split_index: int = row_count - test_row_count
        train_rows: pd.DataFrame = frame.iloc[:split_index]
        test_rows: pd.DataFrame = frame.iloc[split_index:]

        feature_column_names: list = [c for c in frame.columns if c not in self.labels_column_names]

        train_x: pd.DataFrame = train_rows[feature_column_names]
        train_y: pd.DataFrame = train_rows[self.labels_column_names]

        test_x: pd.DataFrame = test_rows[feature_column_names]
        test_y: pd.DataFrame = test_rows[self.labels_column_names]

        return train_x, train_y, test_x, test_y

    def get_featurized_data(self, data: pd.DataFrame = None) -> pd.DataFrame:
        '''
        Return a featurized Pandas dataframe. This function does not perform inplace featurization.

        Steps, in order: date features, missing value replacement (0), creation of absent label
        columns (filled with -inf), one-hot encoding, custom featurizers, min-max scaling.

        :param data: pd.DataFrame, A dataframe to featurize. self.original_data is used when None.
        '''
        source: pd.DataFrame = data if data is not None else self.original_data

        featurized: pd.DataFrame = source.copy()
        featurized = self.__featurize_date_column__(data=featurized, date_column_name=self.date_column_name)
        featurized = self.__replace_missing_values__(data=featurized)
        featurized = self.__ensure_columns_exist__(data=featurized, column_names=self.labels_column_names, value=-np.inf)
        featurized = self.__onehot_encode_categorical_columns__(data=featurized)

        for custom_featurizer in self.custom_featurizers:
            featurized = custom_featurizer(featurized)

        return self.apply_sigmoid_scale(data=featurized)

    def get_featurized_data_row(self, **kwargs) -> pd.DataFrame:
        '''
        Return a featurized Pandas dataframe with a single row.

        Label columns are always filled with -inf, even when a value for them is passed in.

        :param kwargs: dict, Arguments to provide a row value for each of the columns in the dataframe used to initialize the loader.
        :raises ValueError: If a non-label column of the original dataset has no matching kwarg.
        '''
        for column_name in self.original_data.columns:
            if column_name not in kwargs and column_name not in self.labels_column_names:
                raise ValueError(f'No kwarg provided for key "{column_name}". This is required due to the column existing with this name in the dataset.')

        # Every value is wrapped in a list so the frame always has exactly one row.
        row = {key: [value] for key, value in kwargs.items()}

        for label in self.labels_column_names:
            row[label] = [-np.inf]

        # NOTE(review): the pipeline is re-run on this single row, so scaling is skipped (constant
        # columns are not scaled) and one-hot columns only reflect this row's categories - confirm
        # the consumer aligns the result with the training frame.
        return self.get_featurized_data(data=pd.DataFrame(row))