# Concolutional Neural Network

## Importation librairies

In [None]:
import numpy as np

from tensorflow.keras.utils import normalize, plot_model
from tensorflow.keras.layers import Input, Conv1D, Activation, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras import models, callbacks

import matplotlib.pyplot as plt
import time
from sklearn.metrics import confusion_matrix, roc_curve, auc
import seaborn as sns

## Data import and display

In [None]:
# Data recovery
train_data = np.loadtxt('data/ECG200_TRAIN.tsv', delimiter='\t')
test_data = np.loadtxt('data/ECG200_TEST.tsv', delimiter='\t')

# Show first time series
print("health:", train_data[0,0])
print(train_data[0,1:])

# Show data dimensions
print("Dimensions des données d'entrainnement:", train_data.shape)
print("Dimensions des données de test:", test_data.shape)

# Show first 5 time series
nb_series_a_afficher = 5
plt.figure(figsize=(27, 7))
for i in range(nb_series_a_afficher):
    serie_temporelle = train_data[i, 1:]
    etat_sante = train_data[i, 0]
    
    plage_temps = range(len(serie_temporelle))
    
    plt.subplot(2, nb_series_a_afficher, i+1)
    plt.plot(plage_temps, serie_temporelle, label='Série temporelle')
    plt.xlabel('Temps')
    plt.ylabel('Valeurs')
    plt.title(f'Série temporelle {i+1} (Santé: {"Bon" if etat_sante == -1 else "Mauvais"})')
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.show()



## Data processing

In [None]:
# Normalizer classes
train_data[train_data[:, 0] == -1, 0] = 0
test_data[test_data[:, 0] == -1, 0] = 0

# Separation of training and testing data
X_train, y_train = train_data[:, 1:], train_data[:, 0]
X_test, y_test = test_data[:, 1:], test_data[:, 0]

# Normalize time series between 0 and 1 independently of each other
X_train = normalize(X_train, axis=1)
X_test = normalize(X_test, axis=1)

# Display first data
print("health:", y_train[0])
print(X_train[0])
print("Dimensions des données d'entrainnement:", (X_train.shape, y_train.shape))


## Choice of Hyperparameters

❒ Layer hyperparameters: kernel size, dropout, method of activation of hidden layers, method of activation of the final layer, etc.

❒ The model compilation hyperparameters: optimizer, loss, learning rate, etc.

❒ Model execution hyperparameters: batch size, number of epochs, etc.

In [None]:
# Layers hyperparameters
filters = 3
kernel_size = 2
stride = 1
padding = 'same'
hidden_activation = 'relu'
final_activation = 'sigmoid' 
pool_size = 1
dropout_rate = 0.3
nb_classes = 1

# Compil hyperparameters
"""learning_rate = 0.001 # choose learning rate pour l'optimiseur adam"""                        
optimizer_algo = 'adam'
cost_function = 'binary_crossentropy'

# Execution hyperparameters
mini_batch_size = 16
nb_epochs = 1500
percentage_of_train_as_validation = 0.2

## Creating the model

In [None]:
# Input
input_shape = (96, 1)
input_layer = Input(input_shape)

# Hidden block
conv_layer_1_1 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding)(input_layer)
relu_layer_1_1 = Activation(hidden_activation)(conv_layer_1_1)
conv_layer_1_2 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding)(relu_layer_1_1)
relu_layer_1_2 = Activation(hidden_activation)(conv_layer_1_2)
pooling_layer_1 = MaxPooling1D(pool_size=pool_size,padding=padding)(relu_layer_1_2)
dropout_layer_1_1 = Dropout(rate=dropout_rate)(relu_layer_1_1)

# Output
flattened_layer = Flatten()(pooling_layer_1)
dropout_flattened = Dropout(rate=dropout_rate)(flattened_layer)
output_layer = Dense(units=nb_classes, activation=final_activation)(dropout_flattened)

model_cnn = models.Model(inputs=input_layer, outputs=output_layer)

## Model compilation

In [None]:
model_cnn.compile(loss=cost_function,optimizer=optimizer_algo, metrics=['accuracy'])

model_cnn.summary()

## Model training

In [None]:
# Specify the model checkpoint (to save the best model for each epoch)
model_checkpoint = callbacks.ModelCheckpoint('best_model_CNN.keras', monitor='val_loss', save_best_only=True)

# Start training
history = model_cnn.fit(X_train, y_train,
                    batch_size=mini_batch_size, 
                    epochs=nb_epochs,
                    validation_split=percentage_of_train_as_validation,
                    verbose=False,
                    callbacks=[model_checkpoint])


## Model evaluation

In [None]:
# Load the best validation's model
best_model_cnn = models.load_model('best_model_CNN.keras')


# Evaluate the model on train and test set
train_loss, train_accuracy = best_model_cnn.evaluate(X_train, y_train)
print(f"Test Loss: {train_loss}")
print(f"Test Accuracy: {train_accuracy}")

test_loss, test_accuracy = best_model_cnn.evaluate(X_test, y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

### Graph

In [None]:
# Plot training and validation accuracy
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()
plt.ylim(0, 1)
plt.show()

# Plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.ylim(0, 1)
plt.show()

### Confusion matrix

In [None]:
# Make predictions about the test set
y_pred = model_cnn.predict(X_test)

# Convert y_pred probabilities into binary classes
y_pred_classes = (y_pred > 0.5).astype(int).flatten()

# Display the confusion matrix as a heatmap
conf_matrix = confusion_matrix(y_test, y_pred_classes)
class_names = ['Positif', 'Négatif']
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, cmap="Blues", fmt='g', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()


## Search for hyperparameters
by grid search and random search (to do: with cross-validation)

look hyperparameter_search_CNN


## Best result

In [None]:
# Hyperparametres
filters = 5
kernel_size = 3
stride = 1
padding = 'same'
use_bias = False
hidden_activation = 'relu'
final_activation = 'sigmoid' 
pool_size = 2
dropout_rate = 0.2
nb_classes = 1
optimizer_algo = 'adam'
cost_function = 'binary_crossentropy'
mini_batch_size = 16
nb_epochs = 1000
percentage_of_train_as_validation = 0.2

# Build and compile model
input_shape = (96, 1)
input_layer = Input(input_shape)
conv_layer_1_1 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(input_layer)
relu_layer_1_1 = Activation(hidden_activation)(conv_layer_1_1)
conv_layer_1_2 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(relu_layer_1_1)
relu_layer_1_2 = Activation(hidden_activation)(conv_layer_1_2)
pooling_layer_1 = MaxPooling1D(pool_size=pool_size, padding=padding)(relu_layer_1_2)
dropout_layer_1_1 = Dropout(rate=dropout_rate)(pooling_layer_1)
conv_layer_2_1 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(dropout_layer_1_1)
relu_layer_2_1 = Activation(hidden_activation)(conv_layer_2_1)
conv_layer_2_2 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(relu_layer_2_1)
relu_layer_2_2 = Activation(hidden_activation)(conv_layer_2_2)
pooling_layer_2 = MaxPooling1D(pool_size=pool_size, padding=padding)(relu_layer_2_2)
flattened_layer = Flatten()(pooling_layer_2)
dropout_flattened = Dropout(rate=dropout_rate)(flattened_layer)
output_layer = Dense(units=nb_classes,activation=final_activation)(dropout_flattened)
model_cnn = models.Model(inputs=input_layer, outputs=output_layer)
model_cnn.compile(loss='binary_crossentropy',optimizer='adam', metrics=['accuracy'])

model_checkpoint = callbacks.ModelCheckpoint('best_model_CNN.keras', monitor='val_loss', save_best_only=True)

# Start training
start_training = time.time()
history = model_cnn.fit(X_train, y_train, 
                    batch_size=mini_batch_size, 
                    epochs=nb_epochs,
                    validation_split=percentage_of_train_as_validation,
                    verbose=False,
                    callbacks=[model_checkpoint])
end_training = time.time()

# Evaluate best model
best_model_cnn = models.load_model('best_model_CNN.keras')
train_loss, train_accuracy = best_model_cnn.evaluate(X_train, y_train)
start_evaluate = time.time()
test_loss, test_accuracy = best_model_cnn.evaluate(X_test, y_test)
end_evaluate = time.time()

y_pred = model_cnn.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype(int).flatten()
conf_matrix = confusion_matrix(y_test, y_pred_classes)
class_names = ['Positif', 'Négatif']
fpr, tpr, thresholds = roc_curve(y_test, y_pred)
roc_auc = auc(fpr, tpr)

# Result
model_cnn.summary()
training_time_seconds = end_training - start_training
evaluate_time_seconds = end_evaluate - start_evaluate
print(f"\nNombre total de paramètres : {best_model_cnn.count_params()}")
print(f"\nTemps d'entraînement : {training_time_seconds:.3f} secondes.")
print(f"Temps d'évaluation : {evaluate_time_seconds:.3f} secondes.")
print(f'\nMoyenne de train_accuracy_vals: {np.mean(train_accuracy) * 100:.2f}%')
print(f'Moyenne de train_loss_vals: {np.mean(train_loss) * 100:.2f}%')
print(f'\nMoyenne de test_accuracy_vals: {np.mean(test_accuracy) * 100:.2f}%')
print(f'Moyenne de test_loss_vals: {np.mean(test_loss) * 100:.2f}%')
print(f'\nAUC-ROC : {roc_auc * 100:.2f}%')

# Plot 
plt.figure(figsize=(16, 12))
    # Training and Validation Accuracy
plt.subplot(2, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()
plt.ylim(0, 1)
    # Training and Validation Loss
plt.subplot(2, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.ylim(0, 1)
    # Confusion Matrix
plt.subplot(2, 2, 3)
sns.heatmap(conf_matrix, annot=True, cmap="Blues", fmt='g', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
    # ROC Curve
plt.subplot(2, 2, 4)
plt.plot(fpr, tpr, color='blue', lw=2, label='ROC curve (AUC = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.show()


## Cross-validation

In [None]:
train_data = np.loadtxt('data/ECG200_TRAIN.tsv', delimiter='\t')
test_data = np.loadtxt('data/ECG200_TEST.tsv', delimiter='\t')

test_data[test_data[:, 0] == -1, 0] = 0
X_test, y_test = test_data[:, 1:], test_data[:, 0]
X_test = normalize(X_test, axis=1)

# Hyperparameters
filters = 5
kernel_size = 3
stride = 1
padding = 'same'
use_bias = False
hidden_activation = 'relu'
final_activation = 'sigmoid' 
pool_size = 2
dropout_rate = 0.2
nb_classes = 1
optimizer_algo = 'adam'
cost_function = 'binary_crossentropy'
mini_batch_size = 16
nb_epochs = 800
percentage_of_train_as_validation = 0.2

num_splits = 5
test_accuracy_vals = []
test_loss_vals = []
train_accuracy_vals = []
train_loss_vals = []

for i in range(num_splits) :
    train_data_suffled = train_data
    np.random.shuffle(train_data_suffled)
    train_data_suffled[train_data_suffled[:, 0] == -1, 0] = 0
    X_train, y_train = train_data_suffled[:, 1:], train_data_suffled[:, 0]
    X_trai = normalize(X_train, axis=1)

    # build and compil model
    input_shape = (96, 1)
    input_layer = Input(input_shape)
    conv_layer_1_1 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(input_layer)
    relu_layer_1_1 = Activation(hidden_activation)(conv_layer_1_1)
    conv_layer_1_2 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(relu_layer_1_1)
    relu_layer_1_2 = Activation(hidden_activation)(conv_layer_1_2)
    pooling_layer_1 = MaxPooling1D(pool_size=pool_size, padding=padding)(relu_layer_1_2)
    dropout_layer_1_1 = Dropout(rate=dropout_rate)(pooling_layer_1)
    conv_layer_2_1 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(dropout_layer_1_1)
    relu_layer_2_1 = Activation(hidden_activation)(conv_layer_2_1)
    conv_layer_2_2 = Conv1D(filters=filters,kernel_size=kernel_size,strides=stride,padding=padding,use_bias=use_bias)(relu_layer_2_1)
    relu_layer_2_2 = Activation(hidden_activation)(conv_layer_2_2)
    pooling_layer_2 = MaxPooling1D(pool_size=pool_size, padding=padding)(relu_layer_2_2)
    flattened_layer = Flatten()(pooling_layer_2)
    dropout_flattened = Dropout(rate=dropout_rate)(flattened_layer)
    output_layer = Dense(units=nb_classes,activation=final_activation)(dropout_flattened)
    model_cnn = models.Model(inputs=input_layer, outputs=output_layer)
    model_cnn.compile(loss='binary_crossentropy',optimizer='adam', metrics=['accuracy'])
    
    model_checkpoint = callbacks.ModelCheckpoint('best_model_CNN.keras', monitor='val_loss', save_best_only=True)

    # start training
    history = model_cnn.fit(X_train, y_train, 
                        batch_size=mini_batch_size, 
                        epochs=nb_epochs,
                        validation_split=percentage_of_train_as_validation,
                        verbose=False,
                        callbacks=[model_checkpoint])

    # evaluate best model
    best_model_cnn = models.load_model('best_model_CNN.keras')
    train_loss, train_accuracy = best_model_cnn.evaluate(X_train, y_train)
    test_loss, test_accuracy = best_model_cnn.evaluate(X_test, y_test)

    test_accuracy_vals.append(test_accuracy)
    test_loss_vals.append(test_loss)
    train_accuracy_vals.append(train_accuracy)
    train_loss_vals.append(train_loss)

# Display result
print("\nNombre d'essais :", num_splits)
print(f'\nMoyenne de train_accuracy_vals: {np.mean(train_accuracy_vals) * 100:.2f}%')
print(f'Moyenne de train_loss_vals: {np.mean(train_loss_vals) * 100:.2f}%')
print(f'\nMoyenne de test_accuracy_vals: {np.mean(test_accuracy_vals) * 100:.2f}%')
print(f'Moyenne de test_loss_vals: {np.mean(test_loss_vals) * 100:.2f}%')
print(f'\tLa variance associée de l\'accuracy: {np.var(test_accuracy_vals):.6f}')
print(f'\tLa variance associée de la loss: {np.var(test_loss_vals):.6f}')