# <center> Deep Learning Project:

> - __Ahmed Abdelazeem__ (m20210433)
> - __Omar Jarir__ (m20201378)  
> - __Chung-Ting Huang__ (m20210437) 

***

- __The objective of this notebook is to perform hyperparameter tuning:__
- Some references:
   - https://neptune.ai/blog/keras-tuner-tuning-hyperparameters-deep-learning-model/amp
   - https://keras.io/guides/keras_tuner/getting_started/
   - https://keras.io/api/utils/serialization_utils/
   - https://www.tensorflow.org/tutorials/keras/keras_tuner
   - https://scikit-learn.org/stable/auto_examples/feature_selection/plot_feature_selection.html#sphx-glr-auto-examples-feature-selection-plot-feature-selection-py

In [None]:
# !pip install keras-tuner --upgrade
# !pip install -q -U keras-tuner
#!pip install findspark
#!pip install tensorflow-addons

In [None]:
import time
t1 = time.perf_counter()

***

- __Importing the necessary libraries 📚:__

In [None]:
import numpy as np
import random as python_random
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from tensorflow import keras 
import keras_tuner as kt
import keras.backend as K
import tensorflow_addons as tfa

In [None]:
from keras.models import Model, Sequential
from keras import layers
from keras.callbacks import EarlyStopping, ModelCheckpoint, History, ReduceLROnPlateau 
from keras import optimizers

from sklearn.model_selection import train_test_split 
from sklearn.feature_selection import SelectKBest, chi2, mutual_info_classif, f_classif 
from sklearn.utils import class_weight

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn import metrics
from sklearn.metrics import classification_report, confusion_matrix

import IPython

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
SEED = 2022

Fixing the random number seed to ensure that the results are reproducible.

In [None]:
# Seed Python's `random`, NumPy and TensorFlow with the same value so that
# repeated runs of the notebook start from the same random state.
# The former `session = K.get_session()` line was removed: the handle was never
# used anywhere in the notebook and the call belongs to the TF1 session API.
python_random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

- __Helper functions:__

In [None]:
# Function to create dataframe with metrics

def performanceMetricsDF(metricsObj, yTrain, yPredTrain, yTest, yPredTest, average='macro'):
    """Build a train-vs-test table of classification metrics.

    Parameters
    ----------
    metricsObj : module or object
        Provider of ``accuracy_score``, ``precision_score``, ``recall_score``,
        ``f1_score`` and ``roc_auc_score`` (the notebook passes
        ``sklearn.metrics``).
    yTrain, yPredTrain : array-like
        True and predicted labels of the training split.
    yTest, yPredTest : array-like
        True and predicted labels of the test split.
    average : str, default 'macro'
        Averaging mode forwarded to precision, recall and F1. Precision used
        to be computed with the scorer's default averaging while recall and F1
        used 'macro'; all three now share this value so the rows of the table
        are comparable.

    Returns
    -------
    pandas.DataFrame
        One row per measure (also used as the index) with the columns
        ``Measure``, ``Train`` and ``Test``.
    """
    measures_list = ['ACCURACY', 'PRECISION', 'RECALL', 'F1 SCORE', 'AUC']

    def _scores(yTrue, yPred):
        # Same metric order as `measures_list`.
        # AUC is computed from `yPred` exactly as given by the caller.
        return [
            metricsObj.accuracy_score(yTrue, yPred),
            metricsObj.precision_score(yTrue, yPred, average=average),
            metricsObj.recall_score(yTrue, yPred, average=average),
            metricsObj.f1_score(yTrue, yPred, average=average),
            metricsObj.roc_auc_score(yTrue, yPred),
        ]

    resultsDF = pd.DataFrame(
        {
            'Measure': measures_list,
            'Train': _scores(yTrain, yPredTrain),
            'Test': _scores(yTest, yPredTest),
        },
        index=measures_list,
    )
    return resultsDF

In [None]:
# Function to plot confusion matrix - Adapted from https://github.com/DTrimarchi10/confusion_matrix/blob/master/cf_matrix.py
def make_confusion_matrix(cf,
                          group_names=None,
                          categories='auto',
                          count=True,
                          percent=True,
                          cbar=True,
                          xyticks=True,
                          xyplotlabels=True,
                          sum_stats=True,
                          figsize=None,
                          cmap='Blues',
                          title=None):
    '''
    Plot a confusion matrix as an annotated Seaborn heatmap.

    Arguments
    ---------
    cf:            confusion matrix to plot. It is used as a 2-D NumPy array
                   (``.size``, ``.shape``, ``.flatten()`` and ``cf[i, j]`` indexing).
    group_names:   List of strings that represent the labels row by row to be shown in each square.
                   Ignored unless its length equals the number of cells in ``cf``.
    categories:    List of strings containing the categories to be displayed on the x,y axis. Default is 'auto'
    count:         If True, show the raw number in the confusion matrix. Default is True.
    percent:       If True, show each cell as a percentage of all observations. Default is True.
    cbar:          If True, show the color bar. The cbar values are based off the values in the confusion matrix.
                   Default is True.
    xyticks:       If True, show x and y ticks. Default is True.
    xyplotlabels:  If True, show 'True label' and 'Predicted label' on the figure. Default is True.
    sum_stats:     If True, display summary statistics below the figure. Default is True.
    figsize:       Tuple representing the figure size. Default will be the matplotlib rcParams value.
    cmap:          Colormap of the values displayed from matplotlib.pyplot.cm. Default is 'Blues'
                   See http://matplotlib.org/examples/color/colormaps_reference.html
    title:         Title for the heatmap. Default is None.

    Returns
    -------
    None. The heatmap is drawn on a new matplotlib figure.
    '''

    def _safe_ratio(numerator, denominator):
        # Report 0.0 instead of NaN when the denominator is zero, e.g. when the
        # positive class is never predicted.
        return numerator / denominator if denominator else 0.0

    # CODE TO GENERATE TEXT INSIDE EACH SQUARE
    blanks = [''] * cf.size

    if group_names and len(group_names) == cf.size:
        group_labels = ["{}\n".format(value) for value in group_names]
    else:
        group_labels = blanks

    if count:
        group_counts = ["{0:0.0f}\n".format(value) for value in cf.flatten()]
    else:
        group_counts = blanks

    if percent:
        group_percentages = ["{0:.2%}".format(value) for value in cf.flatten() / np.sum(cf)]
    else:
        group_percentages = blanks

    box_labels = [f"{v1}{v2}{v3}".strip() for v1, v2, v3 in zip(group_labels, group_counts, group_percentages)]
    box_labels = np.asarray(box_labels).reshape(cf.shape[0], cf.shape[1])

    # CODE TO GENERATE SUMMARY STATISTICS & TEXT FOR SUMMARY STATS
    if sum_stats:
        # Accuracy is sum of diagonal divided by total observations
        accuracy = np.trace(cf) / float(np.sum(cf))

        # If it is a binary confusion matrix, show some more stats
        if len(cf) == 2:
            # Rows are true labels and columns are predicted labels, so column 1
            # holds everything predicted positive and row 1 everything truly positive.
            precision = _safe_ratio(cf[1, 1], np.sum(cf[:, 1]))
            recall = _safe_ratio(cf[1, 1], np.sum(cf[1, :]))
            f1_score = _safe_ratio(2 * precision * recall, precision + recall)
            stats_text = "\n\nAccuracy={:0.3f}\nPrecision={:0.3f}\nRecall={:0.3f}\nF1 Score={:0.3f}".format(
                accuracy, precision, recall, f1_score)
        else:
            stats_text = "\n\nAccuracy={:0.3f}".format(accuracy)
    else:
        stats_text = ""

    # SET FIGURE PARAMETERS ACCORDING TO OTHER ARGUMENTS
    if figsize is None:
        # Get default figure size if not set
        figsize = plt.rcParams.get('figure.figsize')

    if not xyticks:
        # Do not show categories if xyticks is False
        categories = False

    # MAKE THE HEATMAP VISUALIZATION
    plt.figure(figsize=figsize)
    sns.heatmap(cf, annot=box_labels, fmt="", cmap=cmap, cbar=cbar,
                xticklabels=categories, yticklabels=categories)

    if xyplotlabels:
        plt.ylabel('True label')
        plt.xlabel('Predicted label' + stats_text)
    else:
        plt.xlabel(stats_text)

    if title:
        plt.title(title)

In [None]:
# Function to find outliers on columns based on percentile
def removeOutliers(df, colList, lowPercentile=0.05, highPercentile=0.95, verbose=False):
    """Drop rows whose value in any listed column lies outside a percentile band.

    The low/high quantiles are computed once, on the frame as passed in, and
    the columns are then filtered one after another. A row is kept only if it
    is within ``[low quantile, high quantile]`` (both inclusive) for every
    column in ``colList``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    colList : list-like of column labels
        Columns to check.
    lowPercentile, highPercentile : float
        Quantile levels passed to ``DataFrame.quantile``.
    verbose : bool
        If True, print the table of quantile bounds.

    Returns
    -------
    pandas.DataFrame
        The filtered frame.
    """
    selected = df[colList]
    bounds = selected.quantile([lowPercentile, highPercentile])
    if verbose:
        print(bounds)

    for column in selected.columns:
        lower = bounds.loc[lowPercentile, column]
        upper = bounds.loc[highPercentile, column]
        df = df[df[column].between(lower, upper)]
    return df

In [None]:
def MutualInfoScores(X, y):
    """Plot the mutual information between each feature of X and the target y.

    Scores are divided by the largest score, labelled with the column names of
    X and shown as a bar chart sorted from highest to lowest.
    """
    scores = pd.Series(mutual_info_classif(X, y))
    scores = scores / scores.max()
    scores.index = X.columns

    ranked = scores.sort_values(ascending=False)
    ranked.plot.bar(figsize=(20, 6))
    plt.title("Feature univariate score")
    plt.ylabel('Mutual Information')
    plt.show()

In [None]:
def FTestScores(X, y):
    """Plot a univariate F-test score for each feature of X against the target y.

    The score is ``-log10`` of the p-value returned by ``f_classif``, divided
    by the largest score, labelled with the column names of X and shown as a
    bar chart sorted from highest to lowest.
    """
    _, p_values = f_classif(X, y)
    scores = pd.Series(-np.log10(p_values))
    scores = scores / scores.max()
    scores.index = X.columns

    ranked = scores.sort_values(ascending=False)
    ranked.plot.bar(figsize=(20, 6))
    plt.title("Feature univariate score")
    plt.ylabel(r"Univariate score ($-Log(p_{value})$)")
    plt.show()

In [None]:
def f1(y_true, y_pred):
    """F1 score written with Keras backend ops so it can be used as a metric.

    Predictions are rounded to hard labels, true positives, false positives
    and false negatives are summed over axis 0, and the mean of the resulting
    F1 values is returned. ``K.epsilon()`` is added to every denominator.
    """
    def _count(product):
        # Sum an element-wise product of label tensors over axis 0.
        return K.sum(K.cast(product, 'float'), axis=0)

    hard_pred = K.round(y_pred)

    true_pos = _count(y_true * hard_pred)
    false_pos = _count((1 - y_true) * hard_pred)
    false_neg = _count(y_true * (1 - hard_pred))

    precision = true_pos / (true_pos + false_pos + K.epsilon())
    recall = true_pos / (true_pos + false_neg + K.epsilon())

    score = 2 * precision * recall / (precision + recall + K.epsilon())
    # Replace any NaN entries with zero before averaging.
    score = tf.where(tf.math.is_nan(score), tf.zeros_like(score), score)
    return K.mean(score)

***

- __Loading the dataset:__

In [None]:
ds= pd.read_csv("data.csv")

In [None]:
ds.head()

In [None]:
# Checking the shape of the dataset:

ds.shape

In [None]:
# We can see that our dataset does not contain any duplicates.

ds.duplicated().sum()

In [None]:
# Checking the type of the columns:

ds.dtypes

In [None]:
# Checking that the target distribution is imbalanced.

sns.countplot(y='Bankrupt?', data = ds, palette='viridis', orient = 'h');

In [None]:
ds["Bankrupt?"].value_counts(normalize=True)

In [None]:
# MutualInfoScores(ds.drop(columns="Bankrupt?"), ds["Bankrupt?"])

In [None]:
X = ds.copy(deep=True)

y = X["Bankrupt?"]
X = X.drop(columns=["Bankrupt?"])

### Splitting the dataset:

In [None]:
# Split the dataset into train, validation and test sets.
# First 30% of the rows are held out as the validation set, then 30% of the
# remaining rows are held out as the test set. Both splits are shuffled,
# stratified on the target and use the same random seed.

x_train, x_val, y_train, y_val = train_test_split(
    X, y,
    test_size=0.3,
    shuffle=True,
    stratify=y,
    random_state=SEED,
)

x_train, x_test, y_train, y_test = train_test_split(
    x_train, y_train,
    test_size=0.3,
    shuffle=True,
    stratify=y_train,
    random_state=SEED,
)

### Dealing with outliers:

In [None]:
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

In [None]:
# x_train = removeOutliers(x_train, colList=x_train.columns , lowPercentile=0.01, highPercentile=0.99, verbose=False)
# x_val = removeOutliers(x_val, colList=x_val.columns , lowPercentile=0.01, highPercentile=0.99, verbose=False)
# x_test = removeOutliers(x_test, colList=x_test.columns , lowPercentile=0.01, highPercentile=0.99, verbose=False)

In [None]:
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

In [None]:
print(y_train.value_counts(normalize=True))
print(y_val.value_counts(normalize=True))
print(y_test.value_counts(normalize=True))

In [None]:
# selector = SelectKBest(f_classif, k=4)
# selector.fit(X_train, y_train)
# scores = -np.log10(selector.pvalues_)
# scores /= scores.max()

In [None]:
MutualInfoScores(x_train, y_train)

In [None]:
FTestScores(x_train, y_train)

- The figure above suggests selecting 24 features, because those are the ones with a score greater than 0.2. Note that the selector below is configured with `k=15`.

In [None]:
# MutualInfoScores(x_train, y_train)

In [None]:
# Univariate feature selection with the F-test, keeping k=15 features.
# The selector is fitted on the training split only.
selector = SelectKBest(score_func=f_classif, k=15)
selector.fit(x_train, y_train)

# Names of the columns the selector kept.
kept_columns = x_train.columns[selector.get_support()].tolist()

In [None]:
kept_columns

In [None]:
x_train = x_train[kept_columns]
x_val = x_val[kept_columns]
x_test = x_test[kept_columns]

# Data Pre-processing:

In [None]:
# X = ds.copy(deep=True)

# y = X["Bankrupt?"]
# X = X.drop(columns=["Bankrupt?"])

## Data Normalization:

In [None]:
# Standardise the features with StandardScaler. The scaler is fitted on the
# training split only and then applied to the training, validation and test splits.

scaler = StandardScaler()
scaler.fit(x_train)

x_train = scaler.transform(x_train)
x_val = scaler.transform(x_val)
x_test = scaler.transform(x_test)

# Hyperparameter tuning:

__Using Keras tuner:__

In [None]:
# 'balanced' class weights computed from the training labels, used to
# counter the imbalanced target during training.
# The dict keys are positions in np.unique(y_train); this presumably matches
# the labels themselves because the target is encoded as 0/1 - verify.
balanced_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train,
)
ClassWeights = {position: weight for position, weight in enumerate(balanced_weights)}
ClassWeights

In [None]:
# Stop a run once the validation F1 has not improved for 3 consecutive epochs.
# mode='max' is set explicitly: in the default 'auto' mode Keras infers the
# direction from the metric name and falls back to "lower is better" for names
# it does not recognise, so the custom `val_f1` metric would be minimised.
EarlyStop = EarlyStopping(monitor='val_f1', mode='max', patience=3, verbose=0)

CallBacksList = [EarlyStop]

In [None]:
class MyHyperModel(kt.HyperModel):
    """KerasTuner search space for a three-hidden-layer binary classifier.

    ``build`` tunes the layer widths, activation, kernel initializer, learning
    rate, optional dropout after each hidden layer and the loss (binary
    cross-entropy or binary focal cross-entropy with a tunable gamma).
    ``fit`` additionally tunes the batch size, per-epoch shuffling and an
    optional normalization step.

    The class reads the module-level ``x_train`` (for the input dimension),
    ``ClassWeights`` and the ``f1`` metric function.
    """

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``."""
        # The registration order of the hyperparameters is kept stable on purpose.
        hp_units_1 = hp.Choice("Layer 1", [64, 128, 256])
        hp_units_2 = hp.Choice("Layer 2", [16, 32])
        hp_units_3 = hp.Choice("Layer 3", [4, 8, 16])
        hp_activation = hp.Choice("activation", ["relu", "tanh"])
        hp_kernel_initializer = hp.Choice("kernel_initializer", ["glorot_uniform", "glorot_normal"])
        hp_learning_rate = hp.Float("learning_rate", 1e-4, 1e-2, sampling="log", default=1e-3)
        hp_loss = hp.Choice("loss_type", ["BC", "FL"])
        # Gamma is only used by the focal loss but stays registered
        # unconditionally, so `best_hps.get("Gamma")` is always available.
        hp_gamma = hp.Choice("Gamma", [0.0, 0.5, 1.0, 2.0])

        model = Sequential()

        # Only the first hidden layer carries an L2 penalty.
        model.add(layers.Dense(units=hp_units_1,
                               kernel_initializer=hp_kernel_initializer,
                               kernel_regularizer=tf.keras.regularizers.L2(0.1),
                               activation=hp_activation,
                               input_dim=x_train.shape[-1]))

        # Tune whether to use dropout after each hidden layer.
        if hp.Boolean("Dropout 1"):
            model.add(layers.Dropout(rate=0.5))

        model.add(layers.Dense(units=hp_units_2,
                               kernel_initializer=hp_kernel_initializer,
                               activation=hp_activation))

        if hp.Boolean("Dropout 2"):
            model.add(layers.Dropout(rate=0.5))

        model.add(layers.Dense(units=hp_units_3,
                               kernel_initializer=hp_kernel_initializer,
                               activation=hp_activation))

        if hp.Boolean("Dropout 3"):
            model.add(layers.Dropout(rate=0.5))

        # Single sigmoid unit: the model outputs a probability for class 1.
        model.add(layers.Dense(units=1,
                               kernel_initializer=hp_kernel_initializer,
                               activation="sigmoid"))

        if hp_loss == "FL":
            loss = tf.keras.losses.BinaryFocalCrossentropy(gamma=hp_gamma)
        else:
            loss = tf.keras.losses.BinaryCrossentropy()

        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
            loss=loss,
            metrics=[
                # BinaryAccuracy thresholds the sigmoid output before comparing;
                # plain Accuracy tests probabilities for exact equality with the
                # labels. The name is kept as "accuracy" so logs stay unchanged.
                tf.keras.metrics.BinaryAccuracy(name="accuracy"),
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Recall(),
                f1,
            ],
        )

        return model

    def fit(self, hp, model, x, y, validation_data=None, **kwargs):
        """Train ``model`` for one trial with tuned batch size and shuffling.

        Extra keyword arguments are forwarded to ``model.fit``. Returns
        whatever ``model.fit`` returns.
        """
        hp_batch_size = hp.Choice("batch_size", [16, 32, 64])

        if hp.Boolean("normalize"):
            # NOTE(review): this Normalization layer is never adapted to the
            # data and is not applied to `validation_data` - confirm intent.
            x = layers.Normalization()(x)
        return model.fit(
            x,
            y,
            # Tune whether to shuffle the data in each epoch.
            shuffle=hp.Boolean("shuffle"),
            validation_data=validation_data,
            batch_size=hp_batch_size,
            class_weight=ClassWeights,
            **kwargs,
        )

In [None]:
# Bayesian-optimisation tuner over MyHyperModel that maximises `val_f1`,
# limited to 100 trials and seeded with SEED.
tuner = kt.BayesianOptimization(
    hypermodel=MyHyperModel(),
    objective=kt.Objective("val_f1", direction="max"),
    max_trials=100,
    seed=SEED,
    directory='Deep_learning_project',
    project_name="Default_predictions",
    overwrite=True,
)

In [None]:
# Run the search: each trial trains on the training split and is evaluated on
# the validation split, for at most 100 epochs (early stopping may end it sooner).
# `use_multiprocessing` was dropped because Keras documents it as applying to
# generator / Sequence inputs only, whereas in-memory arrays are passed here;
# `initial_epoch=0` was dropped because it is the default.
tuner.search(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=100,
    verbose=2,
    callbacks=CallBacksList,
)

In [None]:
# Get the optimal hyperparameters: take the single best set of hyperparameters
# found by the tuner and print every tuned value by its registered name.
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The hyperparameter search is complete. The optimal hyper parameters are:
The first layer: {best_hps.get("Layer 1")}. \n
The second layer: {best_hps.get("Layer 2")}. \n
The third layer: {best_hps.get("Layer 3")}. \n
The activation function: {best_hps.get("activation")}. \n
The kernel initializer: {best_hps.get("kernel_initializer")}. \n
The loss function: {best_hps.get("loss_type")}. \n  
The learning rate: {best_hps.get("learning_rate")}. \n
The batch size: {best_hps.get("batch_size")}. \n
Normalize the data: {best_hps.get("normalize")}. \n
Dropout 1: {best_hps.get("Dropout 1")}. \n
Dropout 2: {best_hps.get("Dropout 2")}. \n
Dropout 3: {best_hps.get("Dropout 3")}. \n
Shuffle: {best_hps.get("shuffle")}. \n
Gamma: {best_hps.get("Gamma")}. \n
""")

In [None]:
# Build a new model from the best hyperparameters via MyHyperModel.build.
# The model is compiled by `build` but has not been fitted at this point.
best_model_params = MyHyperModel().build(best_hps)
best_model_params

In [None]:
# Train the rebuilt model on the training split, validating on the validation
# split. A stray backslash before `x_train` (a syntax error) was removed.
# Batch size and shuffling are read from `best_hps` instead of being
# hard-coded, so the retraining uses the settings the tuner selected.
best_model_params.fit(
    x_train,
    y_train,
    epochs=100,
    shuffle=best_hps.get("shuffle"),
    validation_data=(x_val, y_val),
    batch_size=best_hps.get("batch_size"),
    class_weight=ClassWeights,
    callbacks=CallBacksList,
)

In [None]:
best_model_params.summary()

In [None]:
y_pred_train = best_model_params.predict(x_train)
y_pred_test = best_model_params.predict(x_test)

In [None]:
y_pred_train = (y_pred_train>0.5).astype(int)
y_pred_test = (y_pred_test>0.5).astype(int)

In [None]:
# Showing the confusion matrix

cm = metrics.confusion_matrix(y_test, y_pred_test)
labels = ['True Neg','False Pos','False Neg','True Pos']
categories = ['0', '1']
make_confusion_matrix(cm, group_names=labels, categories=categories, cmap='Blues')

In [None]:
# Showing the classification report:

TargetNames=["No Default", "Default"]
print(classification_report(y_test, y_pred_test, target_names = TargetNames))

In [None]:
# Showing the results performance.

resultsDF = performanceMetricsDF(metrics, y_train, y_pred_train, y_test, y_pred_test)
resultsDF

In [None]:
# NOTE THE DIFFERENCE BETWEEN THESE TWO MODELS: ONE IS REBUILT AND RETRAINED FROM THE BEST
# HYPERPARAMETERS (BEST_HPS), THE OTHER IS RETURNED BY TUNER.GET_BEST_MODELS. THIS IS A WEIRD SITUATION.

In [None]:
# Take the top-ranked model returned by the tuner and show its architecture.
# NOTE(review): unlike `best_model_params`, this model is not retrained in the
# notebook; presumably it carries the weights saved during the search - confirm.
best_model = tuner.get_best_models(1)[0]
best_model.summary()

In [None]:
y_pred_train = best_model.predict(x_train)
y_pred_test = best_model.predict(x_test)

In [None]:
y_pred_train = (y_pred_train>0.5).astype(int)
y_pred_test = (y_pred_test>0.5).astype(int)

In [None]:
# Showing the confusion matrix

cm = metrics.confusion_matrix(y_test, y_pred_test)
labels = ['True Neg','False Pos','False Neg','True Pos']
categories = ['0', '1']
make_confusion_matrix(cm, group_names=labels, categories=categories, cmap='Blues')

In [None]:
# Showing the classification report:

TargetNames=["No Default", "Default"]
print(classification_report(y_test, y_pred_test, target_names = TargetNames))

In [None]:
# Showing the results performance.

resultsDF = performanceMetricsDF(metrics, y_train, y_pred_train, y_test, y_pred_test)
resultsDF

***

In [None]:
t2 = time.perf_counter()
print('Time taken to run in minutes:',(t2-t1)/60.0)