In [None]:
cd ..

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scikitplot as skplt

import tensorflow as tf

from keras.models import Sequential
from keras.layers import Dense, Dropout

from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import accuracy_score
from sklearn.metrics import balanced_accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

from src.cv_winner import winner_model_rings
from src.damage import merge_df
from src.damage import read_damage_min_max
from src.damage import read_damage_mean

# Damage Dataset

In [None]:
rmse_url = "cv_output/ringsranking_rmse.csv"
rings_url = "cv_output/ringsranking_rings.csv"
df_ringsranking = winner_model_rings(rmse_url, rings_url)

In [None]:
rmse_url = "cv_output/pine_rmse.csv"
rings_url = "cv_output/pine_rings.csv"
df_pine = winner_model_rings(rmse_url, rings_url)

In [None]:
rmse_url = "cv_output/fur_rmse.csv"
rings_url = "cv_output/fur_rings.csv"
df_fur = winner_model_rings(rmse_url, rings_url)

In [None]:
rmse_url = "cv_output/tracy_rmse.csv"
rings_url = "cv_output/tracy_rings.csv"
df_tracy = winner_model_rings(rmse_url, rings_url)

In [None]:
rmse_url = "cv_output/data_rmse.csv"
rings_url = "cv_output/data_rings.csv"
df_data = winner_model_rings(rmse_url, rings_url)

In [None]:
#damage
damage_url = "damage_dataset/damage.csv"

#merge
df_merge = merge_df(df_ringsranking, df_pine, df_fur, df_tracy, df_data)

df_damage_min_max = read_damage_min_max(damage_url, df_merge)

df_damage_mean = read_damage_mean(damage_url, df_merge)

In [None]:
#STOP

# Config

In [None]:
training_percent = 0.9

epochs, batch_size = 50, 32
kf = KFold(n_splits=3, shuffle=True, random_state=42)

# mode = max for monitro val_accuracy and val_prc
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', 
    patience=10,
    restore_best_weights=True)

METRICS = [
    tf.keras.metrics.TruePositives(name='tp'),
    tf.keras.metrics.FalsePositives(name='fp'),
    tf.keras.metrics.TrueNegatives(name='tn'),
    tf.keras.metrics.FalseNegatives(name='fn'), 
    tf.keras.metrics.BinaryAccuracy(name='accuracy'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
    tf.keras.metrics.AUC(name='auc'),
    tf.keras.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
]

#columns to drop
columns = ['image', 'algo']

In [None]:
def heat_map_normalized(y_true, y_pred):
    
    class_names = ['Not Damage', 'Damaged']
    
    skplt.metrics.plot_confusion_matrix(y_true, y_pred,
                                        figsize=(4,3),
                                        normalize=True)
    
    # Customize axis tick labels
    plt.xticks([0,1], class_names)
    plt.yticks([0,1], class_names)
    
    plt.yticks(rotation=90)
    
    plt.xlabel('Predicted labels')
    plt.ylabel('True labels')
    plt.show()

In [None]:
def heat_map(_cm):
    # plot confusion matrix as heatmap
    labels = ['Not Damage', 'Damaged']
    
    # Set up the matplotlib figure
    plt.figure(figsize=(4, 3))
    
    # Generate a custom diverging colormap
    cmap = sns.diverging_palette(230, 20, as_cmap=True)
    
    sns.heatmap(_cm, annot=True, cmap=cmap, xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted labels')
    plt.ylabel('True labels')
    plt.show()

In [None]:
units = [32, 64, 128, 256]
activations = ['relu', 'tanh', 'sigmoid']
kernel_size = [3, 5]
learning_rate = [0.001, 0.0001, 0.00001]
optimizer = ['Adam', 'RMSprop', 'Nadam']
dropout = [0.1, 0.2, 0.3]

# Model

In [None]:
class Baseline():
    def __init__(self, _input_dim, unit, activation, kernel_size, learning_rate, optimizer, dropout, metrics=METRICS):
        # Define the model architecture
        model = Sequential()
        
        # hidden layer
        model.add(Dense(unit, activation=activation, input_dim=_input_dim))
        model.add(Dropout(dropout))
        
        # hidden layer
        model.add(Dense(unit, activation=activation))
        model.add(Dropout(dropout))
        
        # hidden layer
        model.add(Dense(unit, activation=activation))
        model.add(Dropout(dropout))
        
        # hidden layer
        model.add(Dense(unit, activation=activation))
        model.add(Dropout(dropout))
        
        # output layer
        model.add(Dense(1, activation='sigmoid'))

        if optimizer == 'Adam':
            ops = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        elif optimizer == 'RMSprop':
            ops = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)
        elif optimizer == 'Nadam':
            ops = tf.keras.optimizers.Nadam(learning_rate=learning_rate)
            
        # Compile the model
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        self.model = model
        
    def train(self, X_train, y_train, X_val, y_val, epochs, batch_size):
        history = self.model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=epochs, batch_size=batch_size, callbacks=[early_stopping], verbose=2)
        return history

    def evaluate(self, X_test, y_test):
        results = self.model.evaluate(X_test, y_test, verbose=0)
        return results

    def predict(self, X): #X_train X_test
        predictions = self.model.predict(X)
        return predictions
    
    def summary(self):
        self.model.summary()
        
    def metrics_names(self):
        return self.model.metrics_names

# Model MinMax

In [None]:
df = df_damage_min_max.drop(columns=columns)

X = df.drop(columns=['Damage'])
y = df['Damage']

# split the data into a holdout set and the rest for K-Fold cross-validation
X_train, X_holdout, y_train, y_holdout = train_test_split(X, y, train_size=training_percent, random_state=42)

In [None]:
# Initialize StandardScaler
scaler = StandardScaler()

# Fit scaler on training data
scaler.fit(X_train)

# Transform both training and test data
X_train_scaled = scaler.transform(X_train)
X_holdout_scaled = scaler.transform(X_holdout)

In [None]:
best_acc = 0.0
baseline_values = { 'units': 0, 'activation': '', 'kernel_size': 0, 'learning_rate' : 0.0, 'optimizer': '', 'dropout': 0.0 }

In [None]:
input_dim = X_train.shape[1]

In [None]:
for u in units:
    for a in activations:
        for k in kernel_size:
            for l in learning_rate:
                for o in optimizer:
                    for d in dropout:
                        print('\nUnits:', u, 'Activation:', a, 'Kernel size:', k, 'Learning rate:', l, 'Optimizer:', o, 'Dropout:', d)
                        for train_index, val_index in kf.split(X_train_scaled):
                            # split the dataset into training and validation sets for this fold
                            X_train_kf, X_val_kf = X_train_scaled[train_index], X_train_scaled[val_index]
                            y_train_kf, y_val_kf = y_train.iloc[train_index], y_train.iloc[val_index]

                            model = Baseline(input_dim, u, a, k, l, o, d)

                            history = model.train(X_train_kf, y_train_kf, X_val_kf, y_val_kf, epochs, batch_size)
                            if (history.history['val_accuracy'][-1] > best_acc):
                                best_acc = history.history['val_accuracy'][-1]
                                baseline_values = { 'units': u, 'activation': a, 'kernel_size': k, 'learning_rate' : l, 'optimizer': o, 'dropout': d }

In [None]:
print(best_acc)
print(baseline_values)

In [None]:
#STOP