In [688]:
import pandas as pd
from pandas.core.groupby.generic import DataFrameGroupBy
import yaml
# import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt

from dython import nominal
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
# from sklearn.model_selection import train_test_split

from typing import List, Dict, Union, Any
sb.set()

# Exploratory Data Analysis Functions

## load_parameters()
- input: path of the folder that contains `parameters.yaml`
- returns: parameters for the EDA (the parsed contents of `parameters.yaml`)
- description: will load the following parameters (in dictionary format):
    - include_features
    - Clean up
    - Types of EDA
    - 
- remarks: parameter file should contain the following details

In [None]:
def load_parameters(folder_path: str) -> Dict[str, Union[List, Dict[str, str]]]:
    """Load the EDA configuration from ``<folder_path>/parameters.yaml``.

    Args:
        folder_path: Directory that contains ``parameters.yaml``.

    Returns:
        The parsed YAML document exactly as produced by ``yaml.safe_load``.

    Raises:
        OSError: If ``parameters.yaml`` cannot be opened (e.g. it does not exist).
    """
    # Use a context manager so the file handle is closed deterministically;
    # a bare open() passed to safe_load is never explicitly closed.
    with open(f"{folder_path}/parameters.yaml") as parameters_file:
        parameters: Dict[str, Union[List, Dict[str, str]]] = yaml.safe_load(parameters_file)

    return parameters

## load_dataset()
- input: parameters
- returns: dataframe with datatypes formatted 
- description: Function will do the following tasks:
    - load the dataset (in this case xxxxxxx.csv)
    - load the yaml to format the features to the appropriate datatypes (in this case data_types.yaml)
- remarks: 
    - Features that are not mentioned in data_types.yaml keep their default datatypes
    - **Have not tested datasets of different features**

In [None]:
def load_dataset(parameters: Dict[str, Any]) -> pd.DataFrame:
    """Load one or more CSV files into a single dataframe and apply configured dtypes.

    Args:
        parameters: Configuration dictionary. Two keys are read here:
            ``'dataset_location'`` (folder holding the CSV files and
            ``data_types.yaml``) and ``'datasets_to_load'`` (CSV base names,
            without the ``.csv`` extension).

    Returns:
        The concatenation of all listed CSV files (index renumbered), with every
        column named under ``data_types`` in ``data_types.yaml`` cast to the
        configured dtype. Columns not named there keep the dtype inferred by
        ``pd.read_csv``.

    Raises:
        KeyError: If a required key is missing from ``parameters`` or
            ``data_types.yaml`` has no ``data_types`` entry.
        OSError: If a CSV file or ``data_types.yaml`` cannot be opened.
    """
    folder_path: str = parameters['dataset_location']

    # Support multi dataset and load into one dataframe
    def combined_datasets(datasets: List[str]) -> pd.DataFrame:
        load_datasets: List[pd.DataFrame] = [
            pd.read_csv(f"{folder_path}/{dataset}.csv") for dataset in datasets
        ]
        return pd.concat(load_datasets, ignore_index=True)

    # assign the data types preset in the data_types.yaml
    def reassign_features_dataypes(dataset: pd.DataFrame) -> None:
        # Context manager closes the YAML file as soon as it has been parsed.
        with open(f"{folder_path}/data_types.yaml") as data_types_file:
            config: Dict[str, Dict[str, str]] = yaml.safe_load(data_types_file)
        config_datatypes: Dict[str, str] = config['data_types']

        for dataset_feature in dataset.columns.to_list():
            # Features without a configured dtype are left untouched.
            if dataset_feature not in config_datatypes:
                continue
            dataset[dataset_feature] = dataset[dataset_feature].astype(config_datatypes[dataset_feature])

    dataset = combined_datasets(parameters['datasets_to_load'])
    reassign_features_dataypes(dataset)

    return dataset

## clean_up_dataset()
- input: 
    - Dataframe
    - parameters 
- returns: Dataframe
- description: Function does the following to the dataset
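    - clean up: convert the features listed under `morph_feature_type` by extracting the first run of digits and casting it to the configured type
    - drop rows that contain NULL/NaN values (**TODO**: not implemented in the code below yet)
    - extract the features of interest (`include_features`)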
- remarks: parameter input is dictionary data that loads from parameters.yaml file which contains all the configuration required for the clean up

In [689]:
def clean_up_dataset(
        dataframe: pd.DataFrame, 
        parameters: Dict[str, Any]
    ) -> pd.DataFrame:
    """Extract the configured features and convert digit-bearing text features.

    Args:
        dataframe: Source dataset; it is not modified.
        parameters: Configuration dictionary. ``'include_features'`` (required)
            lists the columns to keep. ``'morph_feature_type'`` (optional) maps a
            column name to the dtype it should be cast to after the first run of
            digits has been extracted from its string values.

    Returns:
        A new dataframe holding only the included features, with the configured
        conversions applied and a fresh 0..n-1 index.

    Raises:
        KeyError: If ``'include_features'`` is missing or names an unknown column.
    """

    def morph_feature_type(dataset: pd.DataFrame) -> None:
        # A missing or empty 'morph_feature_type' entry means "nothing to convert".
        morph_feature_configs: Dict[str, str] = parameters.get('morph_feature_type') or {}

        for morph_feature, target_type in morph_feature_configs.items():
            # Raw string avoids the invalid "\d" escape; expand=False yields a
            # Series (one capture group) instead of a one-column DataFrame.
            dataset[morph_feature] = (
                dataset[morph_feature]
                .str.extract(r'(\d+)', expand=False)
                .astype(target_type)
            )

    ## EXTRACT FEATURE RELATED
    # create a new dataframe to extract the interested feature (set in the parameters)
    include_features_config: List[str] = parameters['include_features']
    extracted_dataset = pd.DataFrame( dataframe[include_features_config] )

    morph_feature_type(extracted_dataset)

    ## DUPLICATE RELATED
    # TODO: duplicate detection/removal (e.g. on an 'id' column) is not implemented yet.

    return extracted_dataset.reset_index(drop=True)

## get_outlier_samples()
- inputs: Dataframe
- returns: Series of outliers based on supplied dataframe
- description: identify the outliers based on the supplied dataframe
- remarks: None

In [691]:
def get_outlier_samples(dataframe: pd.DataFrame) -> pd.core.series.Series:
    """Flag values lying beyond 1.5 x IQR from the first/third quartile.

    Elsewhere in this notebook the function is called both with a single
    column (Series) and with a whole numeric DataFrame; the result is a
    boolean mask with the same shape as the input.

    Args:
        dataframe: Numeric data to inspect.

    Returns:
        Boolean mask, True where a value is below ``Q1 - 1.5*IQR`` or above
        ``Q3 + 1.5*IQR``.
    """
    first_quartile = dataframe.quantile(0.25)
    third_quartile = dataframe.quantile(0.75)
    whisker_reach = 1.5*(third_quartile-first_quartile)

    below_lower_whisker = dataframe < first_quartile-whisker_reach
    above_upper_whisker = dataframe > third_quartile+whisker_reach

    return below_lower_whisker | above_upper_whisker

## print_feature_outliers()
- inputs: Dataframe
- returns: None
- description: prints number of outliers for every numerical features/column
- remarks: None

In [692]:
def print_feature_outliers(dataframe: pd.DataFrame) -> None:
    """Print the outlier count of every int64/float64 column.

    Args:
        dataframe: Dataset to inspect; only columns of dtype ``int64`` or
            ``float64`` are considered.

    Returns:
        None. One line per column is written to stdout.
    """
    numeric_only = dataframe.select_dtypes(include=['int64', 'float64'])

    for feature_name in numeric_only.columns:
        # The mask is True for every sample flagged by get_outlier_samples().
        outlier_mask = get_outlier_samples(numeric_only[feature_name])
        outlier_total = sum(outlier_mask)
        print(f"[{feature_name}] total outliers: {outlier_total}")

## remove_outliers()
- inputs: Dataframe
- returns: Dataframe with outliers removed for every numerical feature/column 
- description: Function removes the **UNION** of outliers of the dataset. In other words, it removes every row that contains an outlier in at least one numerical feature
- remarks: None

In [693]:
def remove_outliers(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Drop every row that holds an outlier in any int64/float64 column.

    Args:
        dataframe: Dataset to filter; it is not modified.

    Returns:
        A new dataframe without the flagged rows, re-indexed from 0.
    """
    numeric_only = dataframe.select_dtypes(include=['int64', 'float64'])

    # A row is discarded as soon as one of its numeric values is flagged.
    outlier_flags = get_outlier_samples(numeric_only)
    rows_to_keep = ~outlier_flags.any(axis=1)

    return dataframe[rows_to_keep].reset_index(drop=True)

## generate_numerical_eda_visualization()
- inputs: 
    - dataframe
    - List of categorical features/columns which are numerical types (**note**: not a parameter of the current function signature)
    - plot title
- returns: None
- description: generates box, histo and violin plot for every numerical features (column) of the dataset
- remarks: None

In [None]:
def generate_numerical_eda_visualization(
        dataframe: pd.DataFrame,
        plot_title: str
    ) -> None:
    """Draw a box plot, histogram and violin plot for every int64/float64 column.

    One subplot row is created per numeric column, with three plots per row
    (box, histogram, violin). ``plot_title`` is placed above the middle plot of
    the first row.

    Args:
        dataframe: Dataset to visualise; only ``int64``/``float64`` columns are used.
        plot_title: Title shown at the top of the figure's middle column.

    Returns:
        None. If the dataset has no int64/float64 column nothing is drawn.
    """
    numerical_dataframe = dataframe.select_dtypes(include=['int64', 'float64'])
    total_features = len(numerical_dataframe.columns)

    # plt.subplots() rejects a zero-row grid, so there is nothing to draw.
    if total_features == 0:
        return

    # squeeze=False keeps `axes` two-dimensional even for a single feature,
    # so axes[row, col] indexing below is always valid.
    _, axes = plt.subplots(
        total_features, 
        3, 
        figsize=(24,4.8*total_features),
        squeeze=False
    )

    axes[0, 1].set_title(plot_title, fontsize=25)
    for row, column in enumerate(numerical_dataframe.columns):
        sb.boxplot(data=numerical_dataframe[column], orient='h', ax=axes[row,0])
        sb.histplot(data=numerical_dataframe[column], ax=axes[row,1])
        sb.violinplot(data=numerical_dataframe[column], orient='h', ax=axes[row,2])

## generate_numerical_heatmap() - MAY NEED TO REMOVE
- inputs: 
    - dataframe
    - plot title
- returns: None
- description: generates heatmap for every numerical features (column) of the dataset
- remarks: None

In [696]:
def generate_numerical_heatmap(
        dataframe: pd.DataFrame,
        plot_title: str
    ) -> None:
    """Show an annotated correlation heatmap of the int64/float64 columns.

    Args:
        dataframe: Dataset to visualise; only ``int64``/``float64`` columns are used.
        plot_title: Title of the figure.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    numeric_only = dataframe.select_dtypes(include=['int64', 'float64'])

    # Fixed colour scale of [-1, 1] matches the value range of ``DataFrame.corr()``.
    heatmap_options = {
        "vmin": -1,
        "vmax": 1,
        "linewidths": 1,
        "annot": True,
        "fmt": ".2f",
        "annot_kws": {"size": 18},
        "cmap": "RdBu",
    }

    plt.figure(figsize=(13, 13))
    plt.title(plot_title, fontsize=20)
    sb.heatmap(numeric_only.corr(), **heatmap_options)
    plt.show()

## generate_numerical_categorical_heatmap()
- inputs: dataframe
- returns: None
- description: generates a heatmap for the numerical and categorical features (columns) of the dataset.
 The heatmap is produced by calculating the correlation/strength-of-association of the features in the dataset, with both categorical and continuous features, using: 
    - Pearson's R for continuous-continuous cases 
    - Correlation Ratio for categorical-continuous cases 
    - Cramer's V or Theil's U for categorical-categorical cases
- remarks: More info on the library checkout [dython](http://shakedzy.xyz/dython/modules/nominal/)

In [None]:
def generate_numerical_categorical_heatmap(
    dataframe: pd.DataFrame
) -> None:
    """Plot the association heatmap of all non-datetime features via dython.

    Args:
        dataframe: Dataset to visualise; ``datetime64[ns]`` columns are excluded
            before the data is handed to ``nominal.associations``.

    Returns:
        None.
    """
    without_datetimes = dataframe.select_dtypes(exclude=['datetime64[ns]'])
    nominal.associations(
        dataset=without_datetimes,
        figsize=(15, 15),
        title="Correlation/Strength-of-association of features",
    )

## generate_categorical_count_visualization()
- input: 
    - Dataframe
    - parameters 
- returns: None
- description: generates a count plot for each selected categorical feature (column) of the dataset
- remarks: the parameter input is the list of feature names loaded from the parameters.yaml file, which contains all the configuration required for the categorical count visualization

In [None]:
def generate_categorical_count_visualization(
        dataframe: pd.DataFrame,
        visualization_parameters: List[str]
    ) -> None:
    """Draw one horizontal count plot per selected feature, with bar labels.

    Args:
        dataframe: Dataset to visualise.
        visualization_parameters: Names of the columns to plot.

    Returns:
        None.
    """
    selected_features = pd.DataFrame( dataframe[visualization_parameters] )

    for feature_name in selected_features.columns:
        # The figure height grows with the number of entries in value_counts().
        level_count = len(dataframe[feature_name].value_counts())
        grid = sb.catplot(y=feature_name, data=selected_features, kind="count", height=level_count)

        # Write the count next to every bar.
        for facet_axis in grid.axes.ravel():
            for bar_group in facet_axis.containers:
                facet_axis.bar_label(bar_group, label_type='edge')

        plt.title(f"Total count for all types available in '{feature_name}' feature", fontsize=20)

## generate_numerical_vs_categorical_eda_visualization()
- inputs: 
    - dataframe
    - parameters
    - plot title
- returns: None
- description: generates a box plot of a numerical feature grouped by a categorical feature, for every numerical/categorical pair given in the parameters
- remarks: parameter input is dictionary data that loads from parameters.yaml file which contains all the configuration required to generate eda visualization

In [None]:
def generate_numerical_vs_categorical_eda_visualization(
        dataframe: pd.DataFrame,
        visualization_parameters: Dict[str, str],
        plot_title: str
    ) -> None:
    """Draw one box plot per (numerical feature -> categorical feature) pair.

    Each pair gets its own subplot row. Categories on the x axis are ordered by
    the median of the numerical feature, ascending.

    Args:
        dataframe: Dataset to visualise.
        visualization_parameters: Mapping whose keys are numerical column names
            (plotted on y) and whose values are the categorical column names
            used for grouping (plotted on x).
        plot_title: Accepted for interface compatibility; not used by the
            current implementation.

    Returns:
        None. If ``visualization_parameters`` is empty nothing is drawn.
    """
    total_features = len(visualization_parameters)

    # plt.subplots() rejects a zero-row grid, so there is nothing to draw.
    if total_features == 0:
        return

    # squeeze=False keeps `axes` a 2-D array even for a single pair, so indexing
    # works for any number of rows. constrained_layout is deliberately not
    # requested: the tight_layout() call below replaces it and matplotlib warns
    # when the two are combined.
    figure, axes = plt.subplots(
        total_features, 
        1, 
        figsize=(20,10*total_features),
        squeeze=False
    )
    figure.tight_layout(pad=10.0)

    for axis, (numerical_feature, categorical_feature) in zip(axes[:, 0], visualization_parameters.items()):
        axis.set_title(
            f"{categorical_feature} boxplot based on {numerical_feature}", 
            fontdict={'fontsize': 25, 'fontweight': 'medium'}
        )

        sb_plot = sb.boxplot(
            y=numerical_feature, 
            x=categorical_feature, 
            data=dataframe, 
            # Order the categories by their median so boxes rise left to right.
            order=dataframe.groupby(categorical_feature)[numerical_feature].median().sort_values().index,
            ax=axis
        )
        sb_plot.set_xticklabels(sb_plot.get_xticklabels(), rotation=40, ha='right')

## generate_time_series_visualization()
- this should give an overview of the resale_price over a period of time
- should support categorical variables based on [max, mean, lowest] sale_price by town? flat type? flat_model? remaining lease?

links:
    - https://www.geeksforgeeks.org/creating-a-time-series-plot-with-seaborn-and-pandas/

time series based by location
minimize loss
linear regression on time series
capital gain

- inputs: 
    - dataframe
    - parameters
- returns: None
- description: 
- remarks: 

In [None]:
# def generate_time_series_visualization(dataframe: pd.DataFrame, parameters: List[Dict[str, Any]]) -> None:
def generate_time_series_visualization(dataframe: pd.DataFrame, parameters: List[Dict[str, Any]]) -> Union[None, pd.DataFrame]:
    """Plot the monthly min/mean/max ``resale_price`` of every town.

    The data is aggregated per ('month', 'town') pair and one subplot is drawn
    per town, each showing three lines (min in red, mean in blue, max in green).

    Args:
        dataframe: Dataset containing the columns ``'month'``, ``'town'`` and
            ``'resale_price'``.
        parameters: Accepted for interface compatibility; not used by the
            current implementation (the plotted columns are hard-coded).
            TODO: derive the grouping/value columns from ``parameters``.

    Returns:
        Always ``None``. If the dataset contains no town nothing is drawn.

    References:
        https://datagy.io/pandas-groupby-multiple-columns/
    """
    # Aggregate the resale price per month and town. reset_index() turns the
    # grouping keys back into ordinary columns explicitly, rather than relying
    # on as_index=False being honoured for a list of aggregations.
    price_stats_by_month_town: pd.DataFrame = (
        dataframe[['month', 'town', 'resale_price']]
        .groupby(['month', 'town'])['resale_price']
        .aggregate(['min', 'mean', 'max'])
        .reset_index()
    )

    # tolist() converts the result of unique() (possibly a Categorical) to a plain list.
    town_list: List[str] = price_stats_by_month_town['town'].unique().tolist()
    total_town: int = len(town_list)

    # plt.subplots() rejects a zero-row grid, so there is nothing to draw.
    if total_town == 0:
        return None

    stats_grouped_by_town: DataFrameGroupBy = price_stats_by_month_town.groupby('town')

    # squeeze=False keeps `axes` a 2-D array even when there is a single town.
    _, axes = plt.subplots(
        total_town,
        1,
        figsize = (20, 8*total_town),
        constrained_layout=True,
        squeeze=False
    )

    # (aggregated column / legend label, line colour)
    line_styles = (('min', 'r'), ('mean', 'b'), ('max', 'g'))

    for axis, town_name in zip(axes[:, 0], town_list):
        town_dataframe = stats_grouped_by_town.get_group(town_name)

        axis.set_title(
            f"Time series graph for {town_name} town", 
            fontdict={'fontsize': 25, 'fontweight': 'medium'}
        )
        axis.set_xlabel('time')
        axis.set_ylabel('resale_price')

        for statistic, line_color in line_styles:
            sb.lineplot(
                x = "month",
                y = statistic,
                color = line_color,
                data = town_dataframe,
                ax = axis,
                label = statistic
            )
        axis.legend(loc='best')

    return None



## generate_linear_regression
NOTE: 
- Need to find out whether it is possible to generate a linear regression with categorical variables
- The reason for this is because there is correlation between
    - resale_price vs [flat_type, flat_model]
    - remaining_lease vs [town, flat_model]
    - floor_area_sqm vs flat_type
- Need to accept list of parameters to generate different types of linear_regression lines for different response and predictors

links:
    - https://stackoverflow.com/questions/34007308/linear-regression-analysis-with-string-categorical-features-variables
    - https://investigate.ai/classification/scikit-learn-and-categorical-features/

In [None]:
# def generate_linear_regression(parameters: Dict[str, str]) -> None:
    # Extract Response and Predictors
    # y = pd.DataFrame(pkmndata["Total"])
    # X = pd.DataFrame(pkmndata[["HP", "Attack", "Defense"]])

    # Linear Regression using Train Data
    # linreg = LinearRegression()         # create the linear regression object
    # linreg.fit(X_train, y_train)        # train the linear regression model
    
    # Coefficients of the Linear Regression line
    # print('Intercept of Regression \t: b = ', linreg.intercept_)
    # print('Coefficients of Regression \t: a = ', linreg.coef_)
    # print()

    # Print the Coefficients against Predictors
    # pd.DataFrame(list(zip(X_train.columns, linreg.coef_[0])), columns = ["Predictors", "Coefficients"])


    # Predict the Total values from Predictors
    # y_train_pred = linreg.predict(X_train)
    # y_test_pred = linreg.predict(X_test)

    # # Plot the Predictions vs the True values
    # f, axes = plt.subplots(1, 2, figsize=(24, 12))
    # axes[0].scatter(y_train, y_train_pred, color = "blue")
    # axes[0].plot(y_train, y_train, 'w-', linewidth = 1)
    # axes[0].set_xlabel("True values of the Response Variable (Train)")
    # axes[0].set_ylabel("Predicted values of the Response Variable (Train)")
    # axes[1].scatter(y_test, y_test_pred, color = "green")
    # axes[1].plot(y_test, y_test, 'w-', linewidth = 1)
    # axes[1].set_xlabel("True values of the Response Variable (Test)")
    # axes[1].set_ylabel("Predicted values of the Response Variable (Test)")
    # plt.show()


    # Check the Goodness of Fit (on Train Data)
    # print("Goodness of Fit of Model \tTrain Dataset")
    # print("Explained Variance (R^2) \t:", linreg.score(X_train, y_train))
    # print("Mean Squared Error (MSE) \t:", mean_squared_error(y_train, y_train_pred))
    # print()

    # # Check the Goodness of Fit (on Test Data)
    # print("Goodness of Fit of Model \tTest Dataset")
    # print("Explained Variance (R^2) \t:", linreg.score(X_test, y_test))
    # print("Mean Squared Error (MSE) \t:", mean_squared_error(y_test, y_test_pred))
    # print()