In [688]:
import pandas as pd
import yaml
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
from dython import nominal


from sklearn.model_selection import train_test_split
# linear-regression
from sklearn.linear_model import LinearRegression

# decision dependencies
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix
from sklearn.tree import plot_tree

# random forest
from sklearn.ensemble import RandomForestRegressor

# Kmeans clustering
from sklearn.cluster import KMeans

# scatterplot dependencies
from sklearn.preprocessing import PowerTransformer

# embedding umap
import umap

# progress bar
from tqdm import tqdm

# Kmodes
from kmodes.kprototypes import KPrototypes

# LGBMclassifier
from lightgbm import LGBMClassifier


from typing import List, Dict, Tuple, Union, Any
# apply seaborn's default theme to every figure drawn after this point
sb.set()

# Exploratory Data Analysis Functions

## load_dataset()
- input: path to dataset folder
- returns: dataframe with datatypes formatted 
- description: Function will do the following tasks:
    - load the dataset (in this case Course_info.csv)
    - load the yaml to format the features to the appropriate datatypes (in this case data_types.yaml)
- remarks: None

In [None]:
def load_dataset(folder_path: str) -> pd.DataFrame:
    """Load Course_info.csv and cast its columns as configured in data_types.yaml.

    Args:
        folder_path: Folder that contains ``Course_info.csv`` and
            ``data_types.yaml``.

    Returns:
        The dataset in which every feature listed under the ``data_types``
        key of the YAML file has been cast to its configured dtype.
    """
    dataset: pd.DataFrame = pd.read_csv(f"{folder_path}/Course_info.csv")

    # the context manager closes the file handle even if YAML parsing fails
    with open(f"{folder_path}/data_types.yaml") as config_file:
        config: Dict[str, Dict[str, str]] = yaml.safe_load(config_file)

    # assign the data types preset in the data_types.yaml
    for feature, datatype in config['data_types'].items():
        dataset[feature] = dataset[feature].astype(datatype)

    return dataset

## load_parameters()
- input: path of yaml file
- returns: dictionary of parameters loaded from parameters.yaml
- description: will load the following parameters (in dictionary format):
    - include_features
    - Clean up
    - Types of EDA
- remarks: the parameter file (parameters.yaml) should contain the details listed in the description above

In [None]:
def load_parameters(folder_path: str) -> Dict[str, Union[List, Dict[str, str]]]:
    """Load the configuration stored in ``parameters.yaml``.

    Args:
        folder_path: Folder that contains ``parameters.yaml``.

    Returns:
        The parsed content of the YAML file.
    """
    # the context manager closes the file handle even if YAML parsing fails
    with open(f"{folder_path}/parameters.yaml") as parameters_file:
        parameters: Dict[str, Union[List, Dict[str, str]]] = yaml.safe_load(parameters_file)

    return parameters

## clean_up_dataset()
- input: 
    - Dataframe
    - parameters 
- returns: Dataframe
- description: Function does the following to the dataset
    - clean up: keep only the rows whose feature value is greater than the constraint set in the parameters
    - drop rows that contain NULL/NAN values
    - extract the features of interest
- remarks: the parameters input is a dictionary loaded from the parameters.yaml file, which contains all the configuration required for the clean up

In [689]:
def clean_up_dataset(
        dataframe: pd.DataFrame,
        parameters: Dict[str, Union[List, Dict[str, str]]]
    ) -> pd.DataFrame:
    """Filter, de-null and narrow the dataset according to ``parameters``.

    Args:
        dataframe: Dataset to clean.
        parameters: Configuration dictionary. ``parameters['clean_up']`` maps
            a feature name to a constraint; only rows whose value is strictly
            greater than the constraint are kept.
            ``parameters['include_features']`` lists the columns to return.

    Returns:
        A dataframe holding only the requested features, without rows that
        contained NULL/NAN values, re-indexed from zero.
    """
    # constraint filtering: every configured feature must exceed its constraint
    cleaned = dataframe
    for feature_name, lower_bound in parameters['clean_up'].items():
        keep_mask = cleaned[feature_name] > lower_bound
        cleaned = cleaned[keep_mask]

    # rows with a NULL/NAN in any column are dropped
    has_missing_values = cleaned.isnull().values.any()
    if has_missing_values:
        cleaned = cleaned.dropna()

    # narrow down to the features of interest
    selected = pd.DataFrame(cleaned[parameters['include_features']])
    selected = selected.reset_index(drop=True)
    return selected

## get_outlier_samples()
- inputs: Dataframe
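- returns: Boolean mask (same shape as the supplied data) that is True where a value is an outlier
- description: identify the outliers of the supplied dataframe, i.e. values more than 1.5 × IQR below the first quartile or above the third quartile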
- remarks: None

In [691]:
def get_outlier_samples(dataframe: pd.DataFrame) -> pd.core.series.Series:
    """Flag the values lying outside the 1.5 * IQR whiskers.

    Args:
        dataframe: Numerical data whose quantiles are evaluated.

    Returns:
        A boolean mask that is True where a value is below
        ``Q1 - 1.5 * IQR`` or above ``Q3 + 1.5 * IQR``.
    """
    first_quartile, third_quartile = (dataframe.quantile(q) for q in (0.25, 0.75))
    whisker_reach = 1.5 * (third_quartile - first_quartile)

    below_lower_whisker = dataframe < first_quartile - whisker_reach
    above_upper_whisker = dataframe > third_quartile + whisker_reach
    return below_lower_whisker | above_upper_whisker

## print_feature_outliers()
- inputs: Dataframe
- returns: None
- description: prints the number of outliers for every numerical feature (column)
- remarks: None

In [692]:
def print_feature_outliers(dataframe: pd.DataFrame) -> None:
    """Print the outlier count of each int64/float64 column.

    Args:
        dataframe: Dataset to inspect; columns of other dtypes are ignored.
    """
    numeric_columns = dataframe.select_dtypes(include=['int64', 'float64'])

    for column in numeric_columns.columns:
        # summing the boolean mask counts the True (outlier) entries
        outlier_mask = get_outlier_samples(numeric_columns[column])
        outliers = sum(outlier_mask)
        print(f"[{column}] total outliers: {outliers}")

## remove_outliers()
- inputs: Dataframe
- returns: Dataframe with the outliers of every numerical feature/column removed
- description: Function removes the **UNION** of the outliers of the dataset. In other words, it removes every row that contains an outlier in at least one numerical feature
- remarks: None

In [693]:
def remove_outliers(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Drop every row holding an outlier in any int64/float64 column.

    Args:
        dataframe: Dataset to filter.

    Returns:
        The rows without any outlier, re-indexed from zero. All original
        columns (numerical or not) are retained.
    """
    numeric_part = dataframe.select_dtypes(include=['int64', 'float64'])

    # a row is rejected as soon as one of its numerical values is flagged
    outlier_flags = get_outlier_samples(numeric_part)
    row_is_clean = ~outlier_flags.any(axis=1)

    return dataframe[row_is_clean].reset_index(drop=True)

## generate_numerical_eda_visualization()
- inputs: 
    - dataframe
    - plot title
- returns: None
- description: generates box, histogram and violin plots for every numerical feature (column) of the dataset
- remarks: None

In [None]:
def generate_numerical_eda_visualization(
        dataframe: pd.DataFrame,
        plot_title: str
    ) -> None:
    """Draw a box, histogram and violin plot for each int64/float64 column.

    One row of three plots is created per numerical feature. The title is
    placed above the middle plot of the first row.

    Args:
        dataframe: Dataset to visualise; columns of other dtypes are ignored.
        plot_title: Title of the figure.
    """
    numerical_dataframe = dataframe.select_dtypes(include=['int64', 'float64'])
    total_features = len(numerical_dataframe.columns)
    if total_features == 0:
        # plt.subplots() rejects a grid without rows, so there is nothing to draw
        return

    # squeeze=False keeps `axes` two-dimensional even when there is a single
    # feature; otherwise axes[row, col] indexing fails on the 1-D result
    _, axes = plt.subplots(
        total_features,
        3,
        figsize=(24, 4.8 * total_features),
        squeeze=False
    )

    axes[0, 1].set_title(plot_title, fontsize=25)
    for row, column in enumerate(numerical_dataframe.columns):
        sb.boxplot(data=numerical_dataframe[column], orient='h', ax=axes[row, 0])
        sb.histplot(data=numerical_dataframe[column], ax=axes[row, 1])
        sb.violinplot(data=numerical_dataframe[column], orient='h', ax=axes[row, 2])

## generate_categorical_count_visualization()
- input: 
    - Dataframe
    - parameters 
- returns: None
- description: generates a count plot for each selected categorical feature (column) of the dataset
- remarks: the parameters input is a dictionary loaded from the parameters.yaml file, which contains all the configuration required for the categorical count visualization

In [694]:
def generate_categorical_count_visualization(
        dataframe: pd.DataFrame,
        parameters: Dict[str, Union[List, Dict[str, str]]]
    ) -> None:
    """Draw one count plot per configured categorical feature.

    Args:
        dataframe: Dataset to visualise.
        parameters: Configuration dictionary;
            ``parameters['categorical_count_visualization']`` lists the
            features to plot.
    """
    count_features: List[str] = parameters['categorical_count_visualization']
    count_frame = pd.DataFrame(dataframe[count_features])

    for column in count_frame.columns:
        # the facet height grows with the number of entries in value_counts()
        facet_height = dataframe[column].value_counts().size
        sb.catplot(y=column, data=count_frame, kind="count", height=facet_height)
        plt.title(f"Total count for all types available in '{column}' feature", fontsize=20)

## generate_numerical_vs_categorical_eda_visualization()
- inputs: 
    - dataframe
    - parameters
    - plot title
- returns: None
- description: generates, for every configured pair, a boxplot of the numerical feature grouped by the categorical feature (column) of the dataset
- remarks: the parameters input is a dictionary loaded from the parameters.yaml file, which contains all the configuration required to generate the eda visualization

In [695]:
def generate_numerical_vs_categorical_eda_visualization(
        dataframe: pd.DataFrame,
        parameters: Dict[str, Union[List, Dict[str, str]]],
        plot_title: str
    ) -> None:
    """Draw boxplots of numerical features grouped by categorical features.

    One boxplot is stacked per configured pair; the categories are ordered by
    the ascending median of the numerical feature.

    Args:
        dataframe: Dataset to visualise.
        parameters: Configuration dictionary;
            ``parameters['numerical_vs_categorical_eda_visualization']`` maps
            a numerical feature to the categorical feature it is grouped by.
        plot_title: Accepted for interface compatibility; it is currently not
            rendered on the figure.
    """
    visualization_parameters: Dict[str, str] = parameters['numerical_vs_categorical_eda_visualization']

    total_features = len(visualization_parameters)
    if total_features == 0:
        # plt.subplots() rejects a grid without rows, so there is nothing to draw
        return

    # squeeze=False keeps `axes` two-dimensional, so a single configured pair
    # no longer yields a bare Axes object that cannot be indexed.
    # constrained_layout is not requested: tight_layout() below replaces it.
    figure, axes = plt.subplots(
        total_features,
        1,
        figsize=(20, 10 * total_features),
        squeeze=False
    )

    pairs = visualization_parameters.items()
    for row, (numerical_feature, categorical_feature) in enumerate(pairs):
        axis = axes[row, 0]
        axis.set_title(
            f"{categorical_feature} boxplot based on {numerical_feature}",
            fontdict={'fontsize': 25, 'fontweight': 'medium'}
        )

        sb.boxplot(
            y=numerical_feature,
            x=categorical_feature,
            data=dataframe,
            order=dataframe.groupby(categorical_feature)[numerical_feature].median().sort_values().index,
            ax=axis
        )
        # rotate the category labels so long names do not overlap
        plt.setp(axis.get_xticklabels(), rotation=40, ha='right')

    # computed after drawing so titles and rotated labels are taken into account
    figure.tight_layout(pad=10.0)

## generate_numerical_heatmap()
- inputs: 
    - dataframe
    - plot title
- returns: None
- description: generates a correlation heatmap of the numerical features (columns) of the dataset
- remarks: None

In [696]:
def generate_numerical_heatmap(
        dataframe: pd.DataFrame,
        plot_title: str
    ) -> None:
    """Show an annotated correlation heatmap of the int64/float64 columns.

    Args:
        dataframe: Dataset to visualise; columns of other dtypes are ignored.
        plot_title: Title of the figure.
    """
    numeric_only = dataframe.select_dtypes(include=['int64', 'float64'])

    # colour scale pinned to [-1, 1]; each cell is annotated with two decimals
    heatmap_options = {
        "vmin": -1,
        "vmax": 1,
        "linewidths": 1,
        "annot": True,
        "fmt": ".2f",
        "annot_kws": {"size": 18},
        "cmap": "RdBu",
    }

    plt.figure(figsize=(13, 13))
    plt.title(plot_title, fontsize=20)
    sb.heatmap(numeric_only.corr(), **heatmap_options)
    plt.show()

## generate_numerical_categorical_heatmap()
- inputs: dataframe
- returns: None
- description: generates a heatmap for the numerical and categorical features (columns) of the dataset.
 The heatmap is produced by calculating the correlation/strength-of-association of the features in a data-set with both categorical and continuous features using: 
    - Pearson's R for continuous-continuous cases 
    - Correlation Ratio for categorical-continuous cases 
    - Cramer's V or Theil's U for categorical-categorical cases
- remarks: More info on the library checkout [dython](http://shakedzy.xyz/dython/modules/nominal/)

In [None]:
def generate_numerical_categorical_heatmap(
    dataframe: pd.DataFrame
) -> None:
    """Plot the association heatmap of all features via dython.

    Delegates to ``nominal.associations`` with a fixed figure size and title.

    Args:
        dataframe: Dataset whose features are compared with each other.
    """
    heatmap_title = "Correlation/Strength-of-association of features"
    nominal.associations(dataset=dataframe, figsize=(13, 10), title=heatmap_title)
    

## generate_kprototype_clusters()
- inputs: 
    - dataframe
    - parameters
- returns: Dataframe including clusters generated by kprototype ML algorithm
- description: uses the kprototype algorithm to generate clusters based on the categorical and numerical features
- remarks: the parameters input is a dictionary loaded from the parameters.yaml file, which contains all the configuration required to create the kprototype clusters

In [None]:
def generate_kprototype_clusters(
        dataset: pd.DataFrame, 
        parameters: Dict[str, Union[List[str], Dict[str, Any]]]
    ) -> pd.DataFrame:
    """Cluster the configured features with K-Prototypes.

    The model is fitted on the configured categorical and numerical features,
    the cluster sizes are printed and the cost of the fitted model is plotted.

    Args:
        dataset: Dataset that contains the configured features.
        parameters: Configuration dictionary; ``parameters['kprototype_clusters']``
            must provide ``number_of_clusters``, ``number_of_iteratio``,
            ``cluster_name``, ``categorical_features`` and
            ``numerical_features``.

    Returns:
        A copy of the configured features extended by a categorical column
        (named after ``cluster_name``) that holds the cluster of each row.
    """
    # extract kprototype parameters
    kprototype_parameters: Dict[str, Any] = parameters['kprototype_clusters']
    number_of_clusters: int = kprototype_parameters['number_of_clusters']
    # NOTE: the configuration key is spelled 'number_of_iteratio' (sic); its
    # value is handed to KPrototypes as n_init
    number_of_initialisations: int = kprototype_parameters['number_of_iteratio']
    cluster_name: str = kprototype_parameters['cluster_name']
    categorical_features: List[str] = kprototype_parameters['categorical_features']
    numerical_features: List[str] = kprototype_parameters['numerical_features']
    include_features: List[str] = categorical_features + numerical_features

    # explicit copy: the cluster column added below must not touch `dataset`
    extracted_dataset = dataset[include_features].copy()

    # positions of the categorical columns, as required by KPrototypes
    categorical_features_idx = [
        index
        for index, column in enumerate(extracted_dataset.columns)
        if extracted_dataset[column].dtype == 'category'
    ]

    mark_array: np.ndarray = extracted_dataset.values
    kproto: KPrototypes = KPrototypes(
        n_clusters=number_of_clusters, 
        n_init=number_of_initialisations, 
        verbose=2
    ).fit(mark_array, categorical=categorical_features_idx)

    clusters = kproto.predict(mark_array, categorical=categorical_features_idx)
    print(f"clusters type: {type(clusters)}, \n{clusters}")
    extracted_dataset[cluster_name] = list(clusters)
    extracted_dataset[cluster_name] = extracted_dataset[cluster_name].astype('category')
    print(extracted_dataset.groupby(cluster_name).size())

    # a single model yields a single cost value: draw it with a marker at its
    # K so the point is visible and sits at the right position on the x axis
    plt.plot([number_of_clusters], [float(kproto.cost_)], marker='o')
    plt.xlabel('K')
    plt.ylabel('cost')
    plt.show()

    # NOTE: a cost curve over several values of K (one KPrototypes fit per K)
    # was tried here but took about an hour to create, so it is not computed.

    return extracted_dataset

## generate_kprototype_cost_curve()
- inputs: 
    - dataframe
    - numerical_feature: the numerical feature to associate with different type of x and y axis
- returns: Dataframe with clusters
- description: uses the kprototype algorithm to generate clusters based on categorical features
- remarks: parameter input is dictionary data that loads from parameters.yaml file which contains all the configuration required to create kprototype clusters

## generate_decision_tree_classifier()
- inputs: 
    - dataframe
    - parameters
- returns: Tuple of 
    - DecisionTreeClassifier object
    - Train dataframe for predictors
    - Test dataframe for predictors
    - Train dataframe for response
    - Test dataframe for response
- description: splits the dataset into train and test sets and creates a DecisionTreeClassifier with the configured maximum depth
- remarks: the parameters input is a dictionary loaded from the parameters.yaml file, which contains all the configuration required to create the decision tree classifier

In [None]:
def generate_decision_tree_classifier(
        dataset: pd.DataFrame,
        parameters: Dict[str, Union[List[str], Dict[str, Any]]]
) -> Tuple[
    DecisionTreeClassifier,
    pd.DataFrame, # Train dataframe for predictors (X_train)
    pd.DataFrame, # Test dataframe for predictors (X_test)
    pd.DataFrame, # Train dataframe for response (y_train)
    pd.DataFrame # Test dataframe for response (y_test)
]:
    """Split the dataset and fit a decision tree classifier on the train part.

    Args:
        dataset: Dataset that contains the configured response and predictors.
        parameters: Configuration dictionary;
            ``parameters['decision_tree_classifier']`` must provide
            ``max_depth``, ``train_test_ratio`` (forwarded as ``test_size``),
            ``response`` and ``predictors``.

    Returns:
        ``(decision_tree, X_train, X_test, y_train, y_test)`` where
        ``decision_tree`` has been fitted on ``X_train`` / ``y_train``.
    """
    # extract decision tree classifier parameters
    decision_tree_classifier_parameters: Dict[str, Any] = parameters['decision_tree_classifier']
    max_depth: int = decision_tree_classifier_parameters['max_depth']
    train_test_ratio: float = decision_tree_classifier_parameters['train_test_ratio']
    response: List[str] = decision_tree_classifier_parameters['response']
    predictors: List[str] = decision_tree_classifier_parameters['predictors']

    y = pd.DataFrame(dataset[response])
    X = pd.DataFrame(dataset[predictors])
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = train_test_ratio)

    decision_tree: DecisionTreeClassifier = DecisionTreeClassifier(max_depth = max_depth)
    # the consumers of this tuple call predict()/plot_tree() right away, which
    # raises on an unfitted estimator, so the model is trained here
    decision_tree.fit(X_train, y_train)

    return (decision_tree, X_train, X_test, y_train, y_test)

    

## generate_confusion_matrix()
- inputs: Tuple of 
    - DecisionTreeClassifier object
    - Train dataframe for predictors
    - Test dataframe for predictors
    - Train dataframe for response
    - Test dataframe for response
- returns: None
- description: Prints the classification accuracy and generates the confusion matrix for both the train and the test dataset
- remarks: None

In [None]:
def generate_confusion_matrix(
    decision_tree_output: Tuple[
        DecisionTreeClassifier,
        pd.DataFrame, # Train dataframe for predictors (X_train)
        pd.DataFrame, # Test dataframe for predictors (X_test)
        pd.DataFrame, # Train dataframe for response (y_train)
        pd.DataFrame # Test dataframe for response (y_test)
    ]
) -> None:
    """Print the accuracy and plot the confusion matrices of a decision tree.

    The train matrix is drawn on the left, the test matrix on the right.

    Args:
        decision_tree_output: ``(classifier, X_train, X_test, y_train,
            y_test)``; the classifier is used for ``predict`` and ``score``.
    """
    dectree, X_train, X_test, y_train, y_test = decision_tree_output

    train_predictions = dectree.predict(X_train)
    test_predictions = dectree.predict(X_test)

    print("Goodness of Fit of Model \tTrain Dataset")
    print("Classification Accuracy \t:", dectree.score(X_train, y_train))
    print()

    print("Goodness of Fit of Model \tTest Dataset")
    print("Classification Accuracy \t:", dectree.score(X_test, y_test))
    print()

    figure, axes = plt.subplots(1, 2, figsize=(40, 20))

    # (actual, predicted) pairs in plotting order: train first, then test
    panels = (
        (y_train, train_predictions),
        (y_test, test_predictions),
    )
    for axis, (actual, predicted) in zip(axes, panels):
        sb.heatmap(
            confusion_matrix(actual, predicted),
            annot=True,
            fmt=".0f",
            annot_kws={"size": 18},
            ax=axis
        )

## generate_decision_tree()
- inputs: 
    - Tuple of 
        - DecisionTreeClassifier object
        - Train dataframe for predictors
        - Test dataframe for predictors
        - Train dataframe for response
        - Test dataframe for response
    - parameters
- returns: None
- description: Plots the decision tree (a binary tree) and saves the figure as 'tree_high_dpi'
- remarks: None

In [None]:
def generate_decision_tree(
    decision_tree_output: Tuple[
        DecisionTreeClassifier,
        pd.DataFrame, # Train dataframe for predictors (X_train)
        pd.DataFrame, # Test dataframe for predictors (X_test)
        pd.DataFrame, # Train dataframe for response (y_train)
        pd.DataFrame # Test dataframe for response (y_test)
    ],
    parameters: Dict[str, Union[List[str], Dict[str, Any]]]
) -> None:
    """Plot the decision tree and save the figure as ``tree_high_dpi``.

    Args:
        decision_tree_output: ``(classifier, X_train, X_test, y_train,
            y_test)``; only the classifier and the column names of
            ``X_train`` are used.
        parameters: Accepted for interface compatibility; currently unused.
    """
    dectree: DecisionTreeClassifier = decision_tree_output[0]
    X_train: pd.DataFrame = decision_tree_output[1]

    # label the nodes with the classes the tree was actually fitted on rather
    # than a fixed "0".."9" list; left to plot_tree when they are unavailable
    fitted_classes = getattr(dectree, 'classes_', None)
    if fitted_classes is not None and getattr(dectree, 'n_outputs_', 1) == 1:
        class_names = [str(label) for label in fitted_classes]
    else:
        class_names = None

    plt.figure(figsize=(70, 10))
    plot_tree(
        dectree,
        filled=True,
        rounded=True,
        fontsize=10,
        # a plain list is accepted by every scikit-learn version, whereas a
        # pandas Index is rejected by releases that validate this argument
        feature_names=list(X_train.columns),
        class_names=class_names
    )
    plt.savefig('tree_high_dpi', dpi=100)