<a href="https://colab.research.google.com/github/DrDimos/Dissertation/blob/main/Selected_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Library Imports

1.   NumPy
2.   TensorFlow
3.   Scikit-learn
4.   Matplotlib
5.   Time

## *NumPy Library Imports*

In [None]:
import numpy as np
from numpy import genfromtxt
from numpy import argmin, argmax

## *TensorFlow Library Imports*

In [None]:
import tensorflow as tf
from tensorflow import keras,metrics, math
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, BatchNormalization, Input, LSTM, ConvLSTM1D, Conv1D, MaxPooling1D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import Accuracy, CategoricalAccuracy, Precision, PrecisionAtRecall, Recall, RecallAtPrecision, AUC, SensitivityAtSpecificity, SpecificityAtSensitivity
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import EarlyStopping

## *Sklearn Library Imports*

In [None]:
import sklearn
from sklearn.preprocessing import label_binarize
from sklearn.metrics import auc, roc_auc_score, roc_curve
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

## *Matplotlib Library Imports*

In [None]:
import matplotlib.pyplot as plt

## Time Library Imports

In [None]:
import time

## Google Colab File Imports


In [None]:
from google.colab import files
import os

# Import Data from the respective test and train "CSV" files.

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

# Time the whole load-and-split step; the elapsed seconds are reported at the end of the cell.
loading_time_start = time.time()

# Alternative input files (judging by the file names: "Selected" features, 80/20 split, filter order 8).
# Train_fname = '/content/drive/My Drive/Train_Test_Data/DF08/Sel_Sliced/NN_Train_Set_80_20_Selected_FilterOrder_8_Sliced.csv'
# Test_fname = '/content/drive/My Drive/Train_Test_Data/DF08/Sel_Sliced/NN_Test_Set_80_20_Selected_FilterOrder_8_Sliced.csv'

Train_fname = '/content/drive/My Drive/Train_Test_Data/DF04/All_Sliced/NN_Train_Set_60_40_All_FilterOrder_4_Sliced.csv'
Test_fname = '/content/drive/My Drive/Train_Test_Data/DF04/All_Sliced/NN_Test_Set_60_40_All_FilterOrder_4_Sliced.csv'

Train_Dataset = np.genfromtxt(Train_fname, delimiter=",", dtype=np.float16)
Test_Dataset = np.genfromtxt(Test_fname, delimiter=",", dtype=np.float16)

# The last column is used as the label; every other column is an input value.
X_Train_Values, Y_Train_Labels = Train_Dataset[:, :-1], Train_Dataset[:, -1]
Y_Train_Labels_Categorical = to_categorical(Y_Train_Labels)

# The test file is cut in half by row order: first half -> validation, second half -> test.
Test_Validation_Split = len(Test_Dataset) // 2
Validation_Rows = Test_Dataset[:Test_Validation_Split]
Test_Rows = Test_Dataset[Test_Validation_Split:]

X_Validation_Values, Y_Validation_Labels = Validation_Rows[:, :-1], Validation_Rows[:, -1]
Y_Validation_Labels_Categorical = to_categorical(Y_Validation_Labels)

X_Test_Values, Y_Test_Labels = Test_Rows[:, :-1], Test_Rows[:, -1]
Y_Test_Labels_Categorical = to_categorical(Y_Test_Labels)

loading_time_end = time.time()
loading_execution_time = loading_time_end - loading_time_start

# (caption, array) pairs, printed in this order as a quick sanity check of every split.
Shape_Report = [
  ("\n \t\t\t Train_Dataset - Shape: \t\t\t\t", Train_Dataset),
  ("\n \t\t\t Test_Dataset - Shape: \t\t\t\t\t", Test_Dataset),
  ("\n \t\t\t X_Train_Values - Shape: \t\t\t\t", X_Train_Values),
  ("\n \t\t\t Y_Train_Labels - Shape: \t\t\t\t", Y_Train_Labels),
  ("\n \t\t\t Y_Train_Labels_Categorical - Shape: \t\t\t", Y_Train_Labels_Categorical),
  ("\n \t\t\t X_Validation_Values - Shape: \t\t\t\t", X_Validation_Values),
  ("\n \t\t\t Y_Validation_Labels - Shape: \t\t\t\t", Y_Validation_Labels),
  ("\n \t\t\t Y_Validation_Labels_Categorical - Shape: \t\t", Y_Validation_Labels_Categorical),
  ("\n \t\t\t X_Test_Values - Shape: \t\t\t\t", X_Test_Values),
  ("\n \t\t\t Y_Test_Labels - Shape: \t\t\t\t", Y_Test_Labels),
  ("\n \t\t\t Y_Test_Labels_Categorical - Shape: \t\t\t", Y_Test_Labels_Categorical),
]
for shape_caption, shape_source in Shape_Report:
  print(shape_caption, shape_source.shape)
print("\n \t\t\t Loading Time in seconds: \t\t\t\t", loading_execution_time)

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).

 			 Train_Dataset - Shape: 				 (11289, 2511)

 			 Test_Dataset - Shape: 					 (7527, 2511)

 			 X_Train_Values - Shape: 				 (11289, 2510)

 			 Y_Train_Labels - Shape: 				 (11289,)

 			 Y_Train_Labels_Categorical - Shape: 			 (11289, 4)

 			 X_Validation_Values - Shape: 				 (3763, 2510)

 			 Y_Validation_Labels - Shape: 				 (3763,)

 			 Y_Validation_Labels_Categorical - Shape: 		 (3763, 4)

 			 X_Test_Values - Shape: 				 (3764, 2510)

 			 Y_Test_Labels - Shape: 				 (3764,)

 			 Y_Test_Labels_Categorical - Shape: 			 (3764, 4)

 			 Loading Time in seconds: 				 64.96667385101318


# Data Reshaping for CNN, LSTM and ConvLSTM Models

In [None]:
# Summarise each split (traces, features, label classes) and add the trailing
# channel axis that the Conv1D / LSTM layers expect: (traces, features, 1).
#
# The "categorical" counts are the width of the one-hot matrices. Counting
# np.unique() on a one-hot matrix always yields 2 (the values 0 and 1), which
# says nothing about the number of classes.

# --- Training split ---
n_traces_train, n_features_train = X_Train_Values.shape
print("\n \t\t\t Number of Train Traces: \t\t\t\t", n_traces_train)
print("\t\t\t Number of Train Features: \t\t\t\t", n_features_train)
n_output_train = len(np.unique(Y_Train_Labels))
print("\t\t\t Number of Output Train Labels: \t\t\t", n_output_train)
n_output_train_categorical = Y_Train_Labels_Categorical.shape[1]
print("\t\t\t Number of Output Categorical Train Labels: \t\t", n_output_train_categorical)

# --- Validation split ---
n_trace_val, n_features_val = X_Validation_Values.shape
print("\n \t\t\t Number of Validation Traces: \t\t\t\t", n_trace_val)
print("\t\t\t Number of Validation Features: \t\t\t", n_features_val)
n_output_val = len(np.unique(Y_Validation_Labels))
print("\t\t\t Number of Output Validation Labels: \t\t\t", n_output_val)
n_output_val_categorical = Y_Validation_Labels_Categorical.shape[1]
print("\t\t\t Number of Output Categorical Validation Labels: \t", n_output_val_categorical)

# --- Test split ---
n_trace_test, n_features_test = X_Test_Values.shape
print("\n \t\t\t Number of Test Traces: \t\t\t\t", n_trace_test)
print("\t\t\t Number of Test Features: \t\t\t\t", n_features_test)
n_output_test = len(np.unique(Y_Test_Labels))
print("\t\t\t Number of Output Test Labels: \t\t\t\t", n_output_test)
n_output_test_categorical = Y_Test_Labels_Categorical.shape[1]
print("\t\t\t Number of Output Categorical Test Labels: \t\t", n_output_test_categorical)

# Reshape (traces, features) -> (traces, features, 1); the data itself is unchanged.
X_Train = X_Train_Values.reshape(n_traces_train,n_features_train,1)
print("\n \t\t\t X_Train - shape: \t\t\t\t\t", X_Train.shape)
X_Validate = X_Validation_Values.reshape(n_trace_val,n_features_val,1)
print("\n \t\t\t X_Validate - shape: \t\t\t\t\t", X_Validate.shape)
X_Test = X_Test_Values.reshape(n_trace_test,n_features_test,1)
print("\n \t\t\t X_Test - shape: \t\t\t\t\t", X_Test.shape)


 			 Number of Train Traces: 				 11289
			 Number of Train Features: 				 2510
			 Number of Output Train Labels: 			 4
			 Number of Output Categorical Train Labels: 		 2

 			 Number of Validation Traces: 				 3763
			 Number of Validation Features: 			 2510
			 Number of Output Validation Labels: 			 4
			 Number of Output Categorical Validation Labels: 	 2

 			 Number of Test Traces: 				 3764
			 Number of Test Features: 				 2510
			 Number of Output Test Labels: 				 4
			 Number of Output Categorical Test Labels: 		 2

 			 X_Train - shape: 					 (11289, 2510, 1)

 			 X_Validate - shape: 					 (3763, 2510, 1)

 			 X_Test - shape: 					 (3764, 2510, 1)


In [None]:
# Class names used for the ROC-curve legends and the per-class AUC printouts.
# NOTE(review): presumably position k corresponds to label value k in the CSV files — confirm.
Target_Names = ['Noise', 'Quake', 'Rockfall', 'Seism']
# Hyperparameter grid: the training cells fit one model for every
# (learning rate, epochs, batch size) combination of these three lists.
Learning_Rate = [1e-3, 1e-4, 1e-5]
Epochs = [10, 15, 25]
Batch_Size = [100, 128, 150]



# Early-stopping callback: monitors the 'accuracy' metric (mode='max') with a patience of 5 epochs.
# NOTE(review): 'accuracy' is the training metric, not 'val_accuracy', and this callback
# is only passed to the LSTM fit() call — the CNN fit() call does not use it. Confirm both are intended.
Early_Stop = EarlyStopping(monitor='accuracy',mode='max',verbose=1,patience=5)

# Per-run CNN results; the training cell appends one entry per grid combination.
CNN_Scores = []          # evaluate() output on the validation split
CNN_Score_Update = []    # the same scores, split into lists of strings by the score-calculation cell
Best_Scores = []         # best training accuracy seen during each fit()
Execution_Time = []      # wall-clock seconds for fit + evaluate + predict of each run
Y_Pred = []              # predict() output on the test split
# Not referenced by any other cell visible in this notebook.
CNN_Class_Y_Pred = [[]]

# Per-run LSTM results; same layout as the CNN lists above.
LSTM_Scores = []
LSTM_Score_Update = []
LSTM_Best_Scores = []
LSTM_Y_Pred = []
# Not referenced by any other cell visible in this notebook.
LSTM_Class_Y_Pred = [[]]
LSTM_Execution_Time = []


# Not referenced by any other cell visible in this notebook.
Best_Score_Update = []

# *Convolutional Neural Network (CNN)*

## Model Generator

In [None]:
def CNN_Generate_Acc_Loss_Plot(History, rate, epoch, batch):
  """Plot training vs. validation accuracy and loss for one CNN run and save the figure.

  Args:
    History: object returned by ``Model.fit``; its ``history`` dict must contain
      the keys 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    rate: learning rate of the run (used only in the output file name).
    epoch: number of epochs of the run (used only in the output file name).
    batch: batch size of the run (used only in the output file name).

  Side effects:
    Writes a PNG named ``CNN_lr_<rate>e_<epoch>b_<batch>.png`` into the
    ``dissertation_results`` folder on the mounted Drive, then shows the figure.
  """
  # A dedicated figure per call keeps curves from different runs from ever
  # sharing axes, regardless of what the pyplot "current figure" happens to be.
  fig, (acc_ax, loss_ax) = plt.subplots(2, 1)

  acc_ax.plot(History.history['accuracy'])
  acc_ax.plot(History.history['val_accuracy'])
  acc_ax.set_title('Model Accuracy')
  acc_ax.set_ylabel('Accuracy')
  acc_ax.set_xlabel('Epoch')
  acc_ax.legend(['Training', 'Validation'], loc='best')

  loss_ax.plot(History.history['loss'])
  loss_ax.plot(History.history['val_loss'])
  loss_ax.set_title('Model Loss')
  loss_ax.set_ylabel('Loss')
  loss_ax.set_xlabel('Epoch')
  loss_ax.legend(['Training', 'Validation'], loc='best')

  fig.tight_layout()
  # Save before show(): some backends discard the figure once it has been shown.
  fig.savefig('/content/drive/MyDrive/dissertation_results/CNN_lr_'+str(rate)+'e_'+str(epoch)+'b_'+str(batch)+'.png', bbox_inches='tight')
  plt.show()
  # Release the figure so that a long hyperparameter sweep does not accumulate open figures.
  plt.close(fig)
  

In [None]:
def Model_Generator(X_Train):
  """Build (but do not compile) the 1-D CNN classifier and print its summary.

  Architecture: two Conv1D(16) -> pool -> two Conv1D(32) -> pool -> Conv1D(64)
  -> pool -> Flatten -> Dense(16) -> BatchNorm -> Dense(8) -> BatchNorm
  -> Dense(4, softmax).

  Args:
    X_Train: array-like whose ``shape`` is (samples, steps, channels). Only the
      per-sample part of the shape, ``shape[1:]``, is used to size the network.

  Returns:
    The built, uncompiled ``Sequential`` model.
  """
  CNN_Model = Sequential()

  # Feature extractor: stacked same-padded convolutions, halving the length after each stage.
  CNN_Model.add(Conv1D(filters=16, kernel_size=3,  activation='relu', padding='same'))
  CNN_Model.add(Conv1D(filters=16, kernel_size=3,  activation='relu', padding='same'))
  CNN_Model.add(MaxPooling1D(pool_size=2, padding='same'))
  CNN_Model.add(Conv1D(filters=32, kernel_size=3,  activation='relu', padding='same'))
  CNN_Model.add(Conv1D(filters=32, kernel_size=3,  activation='relu', padding='same'))
  CNN_Model.add(MaxPooling1D(pool_size=2, padding='same'))
  CNN_Model.add(Conv1D(filters=64, kernel_size=3,  activation='relu', padding='same'))
  CNN_Model.add(MaxPooling1D(pool_size=2, padding='same'))

  # Classifier head ending in a 4-way softmax.
  CNN_Model.add(Flatten())
  CNN_Model.add(Dense(units=16, activation='relu'))
  CNN_Model.add(BatchNormalization())
  CNN_Model.add(Dense(units=8, activation='relu'))
  CNN_Model.add(BatchNormalization())
  CNN_Model.add(Dense(4, activation='softmax'))

  # Build with an unspecified batch dimension. Passing X_Train.shape directly
  # would record the number of training samples as the model's static batch
  # size (and show it in every row of the summary).
  CNN_Model.build((None,) + tuple(X_Train.shape[1:]))
  CNN_Model.summary()
  return CNN_Model


## Model Training

In [None]:
# Grid search: train, evaluate and plot one fresh CNN per
# (learning rate, epochs, batch size) combination.
# Results are appended to CNN_Scores / Best_Scores / Y_Pred / Execution_Time in
# grid order, which is the order the score-report cell relies on.
for rate in Learning_Rate:
  for epoch in Epochs:
    for batch in Batch_Size:
      # Drop the Keras global state left behind by the previous run; without
      # this, building many models in a loop keeps consuming more memory.
      tf.keras.backend.clear_session()

      training_time_start = time.time()
      CNN_Model = Model_Generator(X_Train)
      # The metric order fixes the layout of evaluate()'s output:
      # [loss, accuracy, AUC, precision, recall].
      CNN_Model.compile(optimizer= tf.keras.optimizers.Adam(learning_rate=rate),
                        loss=tf.keras.losses.MeanSquaredError(),
                        metrics=['accuracy', tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])
      History_Model = CNN_Model.fit(x=X_Train, y=Y_Train_Labels_Categorical,
                                    epochs=epoch, batch_size=batch,
                                    validation_data=(X_Validate, Y_Validation_Labels_Categorical))
      CNN_Scores.append(CNN_Model.evaluate(X_Validate, Y_Validation_Labels_Categorical, batch_size=batch))
      # Best *training* accuracy across the epochs of this run.
      Best_Scores.append(max(History_Model.history.get('accuracy')))
      Y_Pred.append(CNN_Model.predict(X_Test))
      # The recorded time covers model construction, fit, evaluate and predict.
      training_time_end = time.time()
      training_execution_time = training_time_end-training_time_start
      Execution_Time.append(training_execution_time)

      CNN_Generate_Acc_Loss_Plot(History_Model, rate, epoch, batch)

Model: "sequential_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_35 (Conv1D)          (11289, 2510, 16)         64        
                                                                 
 conv1d_36 (Conv1D)          (11289, 2510, 16)         784       
                                                                 
 max_pooling1d_21 (MaxPoolin  (11289, 1255, 16)        0         
 g1D)                                                            
                                                                 
 conv1d_37 (Conv1D)          (11289, 1255, 32)         1568      
                                                                 
 conv1d_38 (Conv1D)          (11289, 1255, 32)         3104      
                                                                 
 max_pooling1d_22 (MaxPoolin  (11289, 628, 32)         0         
 g1D)                                                 

KeyboardInterrupt: ignored

## Score Calculations

In [None]:
# Convert every evaluate() result into a list of strings, one per score:
# [loss, accuracy, AUC, precision, recall].
# The list is rebuilt from scratch (not appended to), so re-running this cell
# cannot duplicate entries, and each value is stringified directly rather than
# by parsing the text form of the whole list.
print(len(CNN_Scores))
CNN_Score_Update = [[str(score) for score in run_scores] for run_scores in CNN_Scores]

In [None]:
# Report the validation scores of every finished CNN run.
# The grid is enumerated in the same order the training cell used. zip() stops
# at the shortest input, so an interrupted sweep reports only the runs that
# completed instead of failing with an IndexError.
CNN_Grid = [(rate, epoch, batch)
            for rate in Learning_Rate
            for epoch in Epochs
            for batch in Batch_Size]

for (rate, epoch, batch), run_scores, best_accuracy, run_seconds in zip(CNN_Grid, CNN_Score_Update, Best_Scores, Execution_Time):
  # run_scores layout: [loss (MSE), accuracy, AUC, precision, recall].
  precision_value = float(run_scores[3])
  recall_value = float(run_scores[4])
  # F1 is the harmonic mean of precision and recall; defined as 0 when both are 0.
  if (precision_value + recall_value == 0):
    CNN_F1_Score = 0
  else:
    CNN_F1_Score = 2*((precision_value*recall_value)/(precision_value+recall_value))
  print("\t\t\t CNN Scores - Learning_Rate=%f, Epoch=%d, Batch=%d \t\t\t" % (rate, epoch, batch))
  print("CNN Mean Square Error: \t\t\t", run_scores[0])
  print("CNN Validation Accuracy Score: \t\t", run_scores[1])
  print("CNN Best Training Accuracy Score: \t", best_accuracy)
  print("CNN F1 Score: \t\t\t\t", CNN_F1_Score)
  print("CNN Precision Score: \t\t\t", run_scores[3])
  print("CNN Recall Score: \t\t\t", run_scores[4])
  print("CNN AUC Score: \t\t\t\t", run_scores[2])
  print("Execution Time: \t\t\t", run_seconds)
  print("\n\n\n")

## ROC Curves Plot

In [None]:
# One-vs-rest ROC curves on the test split: one figure per trained CNN, one curve per class.
Y_Test_Labels_Bin = label_binarize(Y_Test_Labels, classes=[0.0, 1.0, 2.0, 3.0])
Num_Classes = Y_Test_Labels_Bin.shape[1]
# Per-class line widths and colours (indexed by class); also used by the LSTM ROC cell.
lw_list = [4, 3, 2, 1]
color_list = ['red','blue','yellow','green']

# Scratch dictionaries keyed by class index; overwritten for every model.
fpr, tpr, thres_r, roc_auc = {}, {}, {}, {}

for k, model_predictions in enumerate(Y_Pred):
  for i in range(Num_Classes):
    class_truth = Y_Test_Labels_Bin[:, i]
    class_scores = model_predictions[:, i]
    fpr[i], tpr[i], thres_r[i] = roc_curve(class_truth, class_scores)
    plt.plot(fpr[i], tpr[i], color=color_list[i], lw=lw_list[i])
  # Chance-level diagonal, drawn after the class curves so the legend labels line up with the classes.
  plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
  plt.xlim([0.0, 1.0])
  plt.ylim([0.0, 1.05])
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver Operating Characteristic Curves')
  plt.legend(Target_Names)
  plt.gcf()
  roc_figure_path = '/content/drive/MyDrive/dissertation_results/CNN_roc_Y_Pred'+str(k)+'.png'
  plt.savefig(roc_figure_path, bbox_inches='tight')
  plt.show()



In [None]:
# Print the one-vs-rest AUC of every class, one group of lines per trained CNN.
for k, model_predictions in enumerate(Y_Pred):
  for i in range(Num_Classes):
    class_truth = Y_Test_Labels_Bin[:, i]
    class_scores = model_predictions[:, i]
    fpr[i], tpr[i], thres_r[i] = roc_curve(class_truth, class_scores)
    class_auc = auc(fpr[i], tpr[i])
    print('AUC for Class {}: {}'.format(Target_Names[i], class_auc))
  print("\n")

# *Long Short-Term Memory Neural Network (LSTM)*

## Model Generator

In [None]:
def LSTM_Generate_Acc_Loss_Plot(History, rate, epoch, batch):
  """Plot training vs. validation accuracy and loss for one LSTM run and save the figure.

  Args:
    History: object returned by ``Model.fit``; its ``history`` dict must contain
      the keys 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    rate: learning rate of the run (used only in the output file name).
    epoch: number of epochs of the run (used only in the output file name).
    batch: batch size of the run (used only in the output file name).

  Side effects:
    Writes a PNG named ``LSTM_lr_<rate>e_<epoch>b_<batch>.png`` into the
    ``dissertation_results`` folder on the mounted Drive, then shows the figure.
  """
  # A dedicated figure per call keeps curves from different runs from ever
  # sharing axes, regardless of what the pyplot "current figure" happens to be.
  fig, (acc_ax, loss_ax) = plt.subplots(2, 1)

  acc_ax.plot(History.history['accuracy'])
  acc_ax.plot(History.history['val_accuracy'])
  acc_ax.set_title('Model Accuracy')
  acc_ax.set_ylabel('Accuracy')
  acc_ax.set_xlabel('Epoch')
  acc_ax.legend(['Training', 'Validation'], loc='best')

  loss_ax.plot(History.history['loss'])
  loss_ax.plot(History.history['val_loss'])
  loss_ax.set_title('Model Loss')
  loss_ax.set_ylabel('Loss')
  loss_ax.set_xlabel('Epoch')
  loss_ax.legend(['Training', 'Validation'], loc='best')

  fig.tight_layout()
  # Save before show(): some backends discard the figure once it has been shown.
  fig.savefig('/content/drive/MyDrive/dissertation_results/LSTM_lr_'+str(rate)+'e_'+str(epoch)+'b_'+str(batch)+'.png', bbox_inches='tight')
  plt.show()
  # Release the figure so that a long hyperparameter sweep does not accumulate open figures.
  plt.close(fig)

In [None]:

def LSTM_Model_Generator(X_Train):
  """Build (but do not compile) the LSTM classifier and print its summary.

  Architecture: LSTM(64, tanh, returning the full sequence) -> Dropout(0.2)
  -> Flatten -> BatchNorm -> Dense(4, softmax).

  Args:
    X_Train: array-like whose ``shape`` is (samples, steps, channels). Only the
      per-sample part of the shape is used to size the network.

  Returns:
    The built, uncompiled ``Sequential`` model.
  """
  sample_shape = (X_Train.shape[1], X_Train.shape[2])

  LSTM_Model = Sequential()
  LSTM_Model.add(LSTM(units=64, activation='tanh', return_sequences=True, input_shape=sample_shape))
  LSTM_Model.add(Dropout(0.2))
  LSTM_Model.add(Flatten())
  LSTM_Model.add(BatchNormalization())
  LSTM_Model.add(Dense(units=4,  activation='softmax'))

  # Build with an unspecified batch dimension. Passing X_Train.shape directly
  # would present the number of training samples as the model's batch size.
  LSTM_Model.build((None,) + sample_shape)
  LSTM_Model.summary()
  return LSTM_Model

## Model Training

In [None]:
# Grid search: train, evaluate and plot one fresh LSTM per
# (learning rate, epochs, batch size) combination.
# Results are appended to LSTM_Scores / LSTM_Best_Scores / LSTM_Y_Pred /
# LSTM_Execution_Time in grid order, which the score-report cell relies on.
for rate in Learning_Rate:
  for epoch in Epochs:
    for batch in Batch_Size:
      # Drop the Keras global state left behind by the previous run; without
      # this, building many models in a loop keeps consuming more memory.
      tf.keras.backend.clear_session()

      training_time_start = time.time()
      LSTM_Model = LSTM_Model_Generator(X_Train)
      # The metric order fixes the layout of evaluate()'s output:
      # [loss, accuracy, AUC, precision, recall].
      LSTM_Model.compile(optimizer= tf.keras.optimizers.Adam(learning_rate=rate),
                        loss=tf.keras.losses.MeanSquaredError(),
                        metrics=['accuracy', tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])
      # Early_Stop may end a run before `epoch` epochs have been completed.
      LSTM_History = LSTM_Model.fit(x=X_Train, y=Y_Train_Labels_Categorical,
                                    epochs=epoch, batch_size=batch,
                                    validation_data=(X_Validate, Y_Validation_Labels_Categorical),
                                    callbacks=[Early_Stop])
      LSTM_Scores.append(LSTM_Model.evaluate(X_Validate, Y_Validation_Labels_Categorical, batch_size=batch))
      # Best *training* accuracy across the epochs of this run.
      LSTM_Best_Scores.append(max(LSTM_History.history.get('accuracy')))
      LSTM_Y_Pred.append(LSTM_Model.predict(X_Test))
      # The recorded time covers model construction, fit, evaluate and predict.
      training_time_end = time.time()
      training_execution_time = training_time_end-training_time_start
      LSTM_Execution_Time.append(training_execution_time)
      LSTM_Generate_Acc_Loss_Plot(LSTM_History, rate, epoch, batch)

## Score Calculations

In [None]:
# Convert every evaluate() result into a list of strings, one per score:
# [loss, accuracy, AUC, precision, recall].
# The list is rebuilt from scratch (not appended to), so re-running this cell
# cannot duplicate entries, and each value is stringified directly rather than
# by parsing the text form of the whole list.
print(len(LSTM_Scores))
LSTM_Score_Update = [[str(score) for score in run_scores] for run_scores in LSTM_Scores]

In [None]:
# Report the validation scores of every finished LSTM run.
# The grid is enumerated in the same order the training cell used. zip() stops
# at the shortest input, so an interrupted sweep reports only the runs that
# completed instead of failing with an IndexError.
LSTM_Grid = [(rate, epoch, batch)
             for rate in Learning_Rate
             for epoch in Epochs
             for batch in Batch_Size]

for (rate, epoch, batch), run_scores, best_accuracy, run_seconds in zip(LSTM_Grid, LSTM_Score_Update, LSTM_Best_Scores, LSTM_Execution_Time):
  # run_scores layout: [loss (MSE), accuracy, AUC, precision, recall].
  precision_value = float(run_scores[3])
  recall_value = float(run_scores[4])
  # F1 is the harmonic mean of precision and recall; defined as 0 when both are 0.
  if (precision_value + recall_value == 0):
    LSTM_F1_Score = 0
  else:
    LSTM_F1_Score = 2*((precision_value*recall_value)/(precision_value+recall_value))
  print("\t\t\t LSTM Scores - Learning_Rate=%f, Epoch=%d, Batch=%d" % (rate, epoch, batch))
  print("LSTM Mean Square Error: \t\t\t", run_scores[0])
  print("LSTM Accuracy Score: \t\t\t\t", run_scores[1])
  print("LSTM Best Training Accuracy Score: \t\t", best_accuracy)
  print("LSTM F1 Score: \t\t\t\t\t", LSTM_F1_Score)
  print("LSTM Precision Score: \t\t\t\t", run_scores[3])
  print("LSTM Recall Score: \t\t\t\t", run_scores[4])
  print("LSTM AUC Score: \t\t\t\t", run_scores[2])
  print("LSTM Execution Time: \t\t\t\t", run_seconds)
  print("\n\n\n")

## ROC Curves Plot

In [None]:
# One-vs-rest ROC curves on the test split: one figure per trained LSTM, one curve per class.
# Line widths and colours come from lw_list / color_list, defined in the CNN ROC cell.
Y_Test_Labels_Bin = label_binarize(Y_Test_Labels, classes=[0.0, 1.0, 2.0, 3.0])
Num_Classes = Y_Test_Labels_Bin.shape[1]

# Scratch dictionaries keyed by class index; overwritten for every model.
fpr, tpr, thres_r, roc_auc = {}, {}, {}, {}

for k, model_predictions in enumerate(LSTM_Y_Pred):
  for i in range(Num_Classes):
    class_truth = Y_Test_Labels_Bin[:, i]
    class_scores = model_predictions[:, i]
    fpr[i], tpr[i], thres_r[i] = roc_curve(class_truth, class_scores)
    plt.plot(fpr[i], tpr[i], color=color_list[i], lw=lw_list[i])

  # Chance-level diagonal, drawn after the class curves so the legend labels line up with the classes.
  plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
  plt.xlim([0.0, 1.0])
  plt.ylim([0.0, 1.05])
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver Operating Characteristic Curves')
  plt.legend(Target_Names)
  plt.gcf()
  roc_figure_path = '/content/drive/MyDrive/dissertation_results/LSTM_roc_Y_Pred'+str(k)+'.png'
  plt.savefig(roc_figure_path, bbox_inches='tight')
  plt.show()

In [None]:
# Print the one-vs-rest AUC of every class, one group of lines per trained LSTM.
for k, model_predictions in enumerate(LSTM_Y_Pred):
  for i in range(Num_Classes):
    class_truth = Y_Test_Labels_Bin[:, i]
    class_scores = model_predictions[:, i]
    fpr[i], tpr[i], thres_r[i] = roc_curve(class_truth, class_scores)
    class_auc = auc(fpr[i], tpr[i])
    print('LSTM AUC for Class {}: {}'.format(Target_Names[i], class_auc))
  print("\n")