# **import libraries, data and label**

In [None]:
# Import necessary libraries
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, roc_auc_score
from keras import regularizers
from keras.optimizers import Adagrad, SGD, Adamax, RMSprop, Adam
from keras.callbacks import ModelCheckpoint, Callback
from keras.models import Sequential, load_model
from keras.layers import (BatchNormalization, GlobalAveragePooling1D, Dense, Activation, Dropout, Flatten,
                          Conv1D, MaxPooling1D, AveragePooling1D)
from os import listdir
from os.path import isfile, join
import pandas as pd
import numpy as np
import scipy.io as sio
import tensorflow as tf
import math
import matplotlib.pyplot as plt

# Google Drive mount for Colab
from google.colab import drive

drive.mount('/content/drive/')

# Set parameters
size = 840  # Size parameter
np.random.seed(10)  # Set seed for reproducibility
number_of_classes = 2  # Number of output classes

In [None]:
# Load labels
mypath = '/content/drive/My Drive/valdata/'
onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
ll = [f for f in onlyfiles if f[0] == 'w']
lbl = sio.loadmat(mypath + ll[0])['wlbl']
lbl = np.reshape(lbl, (840,))

# Load data and gather information
onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
mats = [f for f in onlyfiles if f[4] == 'm']
mats = sorted(mats)

# Windowing
window = 5 * 500
overlap = 4 * 500
diff = window - overlap

X = np.zeros((1, window))
label = np.zeros((1, 1))

for i in range(size):
    temp = sio.loadmat(mypath + mats[i])['val'][0, :]
    number_of_windows = math.floor((len(temp) - overlap) / diff)

    for j in range(number_of_windows):
        Xw = np.zeros((1, window))
        Xw[0, :] = temp[diff * j: window + diff * j]
        X = np.vstack((X, Xw))

        lblw = np.zeros((1, 1))
        lblw[0, 0] = lbl[i]
        label = np.vstack((label, lblw))

X = np.delete(X, 0, 0)
label = np.delete(label, 0, 0)

# Save processed data to CSV files
np.savetxt("/content/drive/My Drive/valdata/X_no_reshape.csv", X, delimiter = ",")
np.savetxt("/content/drive/My Drive/valdata/label_no_reshape.csv", label, delimiter = ",")

In [None]:
# load ready data and label
X = np.loadtxt("/content/drive/My Drive/valdata/X_no_reshape.csv", delimiter = ",")
label = np.loadtxt("/content/drive/My Drive/valdata/label_no_reshape.csv", delimiter = ",")

# **data preparation**

In [None]:
# Function for picking the right class among outputs
def change(x):
    """
    Finds the class with the highest probability from the output layer.
    
    Args:
    x (numpy.ndarray): Output layer array from the model.

    Returns:
    numpy.ndarray: Array of the index with the highest probability for each entry.
    """
    answer = np.zeros((np.shape(x)[0]))
    for i in range(np.shape(x)[0]):
        max_value = max(x[i, :])
        max_index = list(x[i, :]).index(max_value)
        answer[i] = max_index
    return answer.astype(np.int)


# Convert integer labels to binary representation
Label_set = np.zeros((np.shape(label)[0], number_of_classes))
for i in range(81480):
    row = np.zeros((number_of_classes))
    row[int(label[i]) - 1] = 1
    Label_set[i, :] = row

# Normalization
Xnew = (X - X.mean()) / (X.std())
Xnew = np.reshape(Xnew, (np.shape(Xnew)[0], np.shape(Xnew)[1], 1))

In [None]:
# Train and test split - 5th slot as test
values = [i for i in range(80640)]
permutations = np.random.permutation(values)  # Generate a random permutation of indexes
Xnew = Xnew[permutations, :]  # Apply permutation to the feature data
Label_set = Label_set[permutations, :]  # Apply permutation to the label data

# Split the data into training and test sets
X_train = Xnew[:int(0.8 * 80640), :]  # 80% for training
Y_train = Label_set[:int(0.8 * 80640), :]  # Corresponding labels for training
X_test = Xnew[int(0.8 * 80640):, :]  # Remaining 20% for testing
Y_test = Label_set[int(0.8 * 80640):, :]  # Corresponding labels for testing
sizee = len(X_test)

val = 0.5  # Split the test data for validation
X_val = X_test[:int(val * sizee), :]  # 50% for validation
Y_val = Y_test[:int(val * sizee), :]  # Corresponding labels for validation
X_test = X_test[int(val * sizee):, :]  # Remaining 50% for final testing
Y_test = Y_test[int(val * sizee):, :]  # Corresponding labels for final testing

In [None]:
# Train and test split - 4th slot as test
values = [i for i in range(80640)]
permutations = np.random.permutation(values)  # Generate a random permutation of indexes
Xnew = Xnew[permutations, :]  # Apply permutation to the feature data
Label_set = Label_set[permutations, :]  # Apply permutation to the label data

# Split the data into training and test sets
X_train1 = Xnew[:int(0.6 * 80640), :]  # 60% for the first part of training
X_train2 = Xnew[int(0.8 * 80640):, :]  # 20% for the second part of training
X_train = np.vstack((X_train1, X_train2))  # Combine both parts for the complete training set
Y_train1 = Label_set[:int(0.6 * 80640), :]  # Corresponding labels for the first part of training
Y_train2 = Label_set[int(0.8 * 80640):, :]  # Corresponding labels for the second part of training
Y_train = np.vstack((Y_train1, Y_train2))  # Combine both parts for the complete training labels

X_test = Xnew[int(0.6 * 80640):int(0.8 * 80640), :]  # 20% for testing
Y_test = Label_set[int(0.6 * 80640):int(0.8 * 80640), :]  # Corresponding labels for testing
sizee = len(X_test)

val = 0.5  # Split the test data for validation
X_val = X_test[:int(val * sizee), :]  # 50% for validation
Y_val = Y_test[:int(val * sizee), :]  # Corresponding labels for validation
X_test = X_test[int(val * sizee):, :]  # Remaining 50% for final testing
Y_test = Y_test[int(val * sizee):, :]  # Corresponding labels for final testing

In [None]:
# Train and test split - 3rd slot as test
values = [i for i in range(80640)]
permutations = np.random.permutation(values)  # Generate a random permutation of indexes
Xnew = Xnew[permutations, :]  # Apply permutation to the feature data
Label_set = Label_set[permutations, :]  # Apply permutation to the label data

# Split the data into training and test sets
X_train1 = Xnew[:int(0.4 * 80640), :]  # 40% for the first part of training
X_train2 = Xnew[int(0.6 * 80640):, :]  # 20% for the second part of training
X_train = np.vstack((X_train1, X_train2))  # Combine both parts for the complete training set
Y_train1 = Label_set[:int(0.4 * 80640), :]  # Corresponding labels for the first part of training
Y_train2 = Label_set[int(0.6 * 80640):, :]  # Corresponding labels for the second part of training
Y_train = np.vstack((Y_train1, Y_train2))  # Combine both parts for the complete training labels

X_test = Xnew[int(0.4 * 80640):int(0.6 * 80640), :]  # 20% for testing
Y_test = Label_set[int(0.4 * 80640):int(0.6 * 80640), :]  # Corresponding labels for testing
sizee = len(X_test)

val = 0.5  # Split the test data for validation
X_val = X_test[:int(val * sizee), :]  # 50% for validation
Y_val = Y_test[:int(val * sizee), :]  # Corresponding labels for validation
X_test = X_test[int(val * sizee):, :]  # Remaining 50% for final testing
Y_test = Y_test[int(val * sizee):, :]  # Corresponding labels for final testing

In [None]:
# Train and test split - 2nd slot as test
values = [i for i in range(80640)]
permutations = np.random.permutation(values)  # Create a random permutation of indexes
Xnew = Xnew[permutations, :]  # Apply the permutation to the feature data
Label_set = Label_set[permutations, :]  # Apply the permutation to the label data

# Split the data into training and test sets
X_train1 = Xnew[:int(0.2 * 80640), :]  # 20% of the data for the first part of training
X_train2 = Xnew[int(0.4 * 80640):, :]  # 40% for the second part of training
X_train = np.vstack((X_train1, X_train2))  # Combine both parts for the complete training set
Y_train1 = Label_set[:int(0.2 * 80640), :]  # Corresponding labels for the first part of training
Y_train2 = Label_set[int(0.4 * 80640):, :]  # Corresponding labels for the second part of training
Y_train = np.vstack((Y_train1, Y_train2))  # Combine both parts for the complete training labels

X_test = Xnew[int(0.2 * 80640):int(0.4 * 80640), :]  # 20% for testing
Y_test = Label_set[int(0.2 * 80640):int(0.4 * 80640), :]  # Corresponding labels for testing
sizee = len(X_test)

val = 0.5  # Split the test data for validation
X_val = X_test[:int(val * sizee), :]  # 50% for validation
Y_val = Y_test[:int(val * sizee), :]  # Corresponding labels for validation
X_test = X_test[int(val * sizee):, :]  # Remaining 50% for final testing
Y_test = Y_test[int(val * sizee):, :]  # Corresponding labels for final testing

In [None]:
# Train and test split - 1st slot as test
values = [i for i in range(80640)]
permutations = np.random.permutation(values)  # Create a random permutation of indexes
Xnew = Xnew[permutations, :]  # Apply the permutation to the feature data
Label_set = Label_set[permutations, :]  # Apply the permutation to the label data

# Split the data into training and test sets
X_train = Xnew[int(0.2 * 80640):, :]  # 80% of the data for training
Y_train = Label_set[int(0.2 * 80640):, :]  # Corresponding labels for training
X_test = Xnew[:int(0.2 * 80640), :]  # 20% for testing
Y_test = Label_set[:int(0.2 * 80640), :]  # Corresponding labels for testing
sizee = len(X_test)

val = 0.5  # Split the test data for validation
X_val = X_test[:int(val * sizee), :]  # 50% for validation
Y_val = Y_test[:int(val * sizee), :]  # Corresponding labels for validation
X_test = X_test[int(val * sizee):, :]  # Remaining 50% for final testing
Y_test = Y_test[int(val * sizee):, :]  # Corresponding labels for final testing

# **Functions**

In [None]:
def pretty_plot(history, field, fn):
    """
    Creates visualizations to display the training and validation metrics over epochs.

    Args:
    - history (object): The training history object generated during model training.
    - field (string): The specific metric to plot over epochs.
    - fn (function): A function to determine the best index for the specified field in the validation data.

    Returns:
    - Visualization plots showing the specified field's performance over all epochs and the last 15% of epochs.

    The function generates two plots:
    1. Performance of the specified field (e.g., loss, accuracy) over all epochs with a marker for the best value.
    2. Performance of the specified field for the last 15% of epochs with a marker for the best value from the entire data.

    The best index and value are determined using the provided function on the validation data.
    """

    def plot(data, val_data, best_index, best_value, title):
        # Plot train and validation data with optional markers for the best index and value
        plt.plot(range(1, len(data) + 1), data, label = 'train')  # Plot training data
        plt.plot(range(1, len(data) + 1), val_data, label = 'validation')  # Plot validation data
        if best_index is not None:
            plt.axvline(x = best_index + 1, linestyle = ':', c = "#777777")  # Vertical line for best index
        if best_value is not None:
            plt.axhline(y = best_value, linestyle = ':', c = "#777777")  # Horizontal line for best value
        plt.xlabel('Epoch')
        plt.ylabel(field)
        plt.xticks(range(0, len(data), 20))  # Set the ticks on the x-axis for every 20 epochs
        plt.title(title)
        plt.legend()
        plt.show()

    data = history.history[field]  # Extract the specified field from the history
    val_data = history.history['val_' + field]  # Extract the corresponding validation data
    tail = int(0.15 * len(data))  # Extract the last 15% of the data

    best_index = fn(val_data)  # Get the best index using the provided function
    best_value = val_data[best_index]  # Get the best value from the validation data

    # Plot for entire epochs and for the last 15% of the data
    plot(data, val_data, best_index, best_value, "{} over epochs (best {:06.4f})".format(field, best_value))
    plot(data[-tail:], val_data[-tail:], None, best_value, "{} over last {} epochs".format(field, tail))


#learning rate finder
from tf.keras.callbacks import Callback
import tf.keras.backend as K
import numpy as np
import matplotlib.pyplot as plt


class LRFinder(Callback):
    """
    Up-to date version: https://github.com/WittmannF/LRFinder
    Example of usage:
        from keras.models import Sequential
        from keras.layers import Flatten, Dense
        from keras.datasets import fashion_mnist
        !git clone https://github.com/WittmannF/LRFinder.git
        from LRFinder.keras_callback import LRFinder
        # 1. Input Data
        (X_train, y_train), (X_test, y_test) = fashion_mnist.load_data()
        mean, std = X_train.mean(), X_train.std()
        X_train, X_test = (X_train-mean)/std, (X_test-mean)/std
        # 2. Define and Compile Model
        model = Sequential([Flatten(),
                            Dense(512, activation='relu'),
                            Dense(10, activation='softmax')])
        model.compile(loss='sparse_categorical_crossentropy', \
                      metrics=['accuracy'], optimizer='sgd')
        # 3. Fit using Callback
        lr_finder = LRFinder(min_lr=1e-4, max_lr=1)
        model.fit(X_train, y_train, batch_size=128, callbacks=[lr_finder], epochs=2)
    """

    def __init__(self, min_lr, max_lr, mom=0.9, stop_multiplier=None,
                 reload_weights=True, batches_lr_update=5):
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.mom = mom
        self.reload_weights = reload_weights
        self.batches_lr_update = batches_lr_update
        if stop_multiplier is None:
            self.stop_multiplier = -20 * self.mom / 3 + 10  # 4 if mom=0.9
            # 10 if mom=0
        else:
            self.stop_multiplier = stop_multiplier

    def on_train_begin(self, logs={}):
        p = self.params
        try:
            n_iterations = p['epochs'] * p['samples'] // p['batch_size']
        except:
            n_iterations = p['steps'] * p['epochs']

        self.learning_rates = np.geomspace(self.min_lr, self.max_lr, \
                                           num = n_iterations // self.batches_lr_update + 1)
        self.losses = []
        self.iteration = 0
        self.best_loss = 0
        if self.reload_weights:
            self.model.save_weights('tmp.hdf5')

    def on_batch_end(self, batch, logs={}):
        loss = logs.get('loss')

        if self.iteration != 0:  # Make loss smoother using momentum
            loss = self.losses[-1] * self.mom + loss * (1 - self.mom)

        if self.iteration == 0 or loss < self.best_loss:
            self.best_loss = loss

        if self.iteration % self.batches_lr_update == 0:  # Evaluate each lr over 5 epochs

            if self.reload_weights:
                self.model.load_weights('tmp.hdf5')

            lr = self.learning_rates[self.iteration // self.batches_lr_update]
            K.set_value(self.model.optimizer.lr, lr)

            self.losses.append(loss)

        if loss > self.best_loss * self.stop_multiplier:  # Stop criteria
            self.model.stop_training = True

        self.iteration += 1

    def on_train_end(self, logs=None):
        if self.reload_weights:
            self.model.load_weights('tmp.hdf5')

        plt.figure(figsize = (12, 6))
        plt.plot(self.learning_rates[:len(self.losses)], self.losses)
        plt.xlabel("Learning Rate")
        plt.ylabel("Loss")
        plt.xscale('log')
        plt.show()

# **model structure**

In [None]:
# Define the structure of the CNN model
model = Sequential()

# Block 1
model.add(Conv1D(32, 4, strides = 1, padding = 'same', activation = 'relu', input_shape = (2500, 1)))
model.add(Conv1D(32, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(AveragePooling1D(4, 2))  # Down-sampling layer
model.add(BatchNormalization())  # Normalize the activations

# Block 2
model.add(Conv1D(64, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(Conv1D(64, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(AveragePooling1D(4, 2))
model.add(BatchNormalization())

# Block 3
model.add(Conv1D(128, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(Conv1D(128, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(AveragePooling1D(4, 2))
model.add(BatchNormalization())

# Block 4
model.add(Conv1D(256, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(Conv1D(256, 4, strides = 1, padding = 'same', activation = 'relu'))
model.add(AveragePooling1D(4, 2))
model.add(BatchNormalization())

# Global Average Pooling
model.add(GlobalAveragePooling1D())  # Pooling layer for spatial data reduction

# Dense Layers
model.add(Dense(256, kernel_initializer = 'normal', activation = 'relu'))  # Fully connected layer
model.add(Dropout(0.2))  # Dropout layer for regularization
model.add(Dense(2, kernel_initializer = 'normal', activation = 'softmax'))  # Output layer with Softmax activation for classification

# **train model**

In [None]:
# Set the optimizer and compile the model
optimizer = tf.keras.optimizers.Adamax(lr = 0.001)
model.compile(optimizer = optimizer, loss = 'mse', metrics = ['accuracy'])

# Learning Rate Finder
lr_finder = LRFinder(min_lr = 1e-12, max_lr = 1e+1)

# Fit the model to the training data using LR Finder callback to observe loss at different learning rates
model.fit(X_train, Y_train, batch_size = 128, callbacks = [lr_finder], epochs = 5)

In [None]:
# Configure optimizer and compile the model
opt = Adamax(learning_rate = 0.001, decay = 0.001, clipvalue = 0.5, epsilon = 1e-07)
model.compile(loss = 'mse', optimizer = opt, metrics = ['accuracy'])

# Define the path for storing model and history
pathh = '/content/drive/My Drive/Conv_models'

# Create a callback to save the best model based on validation accuracy
checkpointer = ModelCheckpoint(filepath = pathh, monitor = 'val_accuracy', mode = 'max', verbose = 1, save_best_only = True)

# Train the model on the training data and validate on the validation set
# Save the best model and the history to specified file paths
hist = model.fit(X_train, Y_train, validation_data = (
X_val, Y_val), batch_size = 128, epochs = 280, verbose = 2, shuffle = True, callbacks = [checkpointer])
pd.DataFrame(hist.history).to_csv(path_or_buf = '/content/drive/My Drive/Conv_models/History.csv')

# **train information**

In [None]:
#plot run info
pretty_plot(hist, 'loss', lambda x: np.argmin(x))
pretty_plot(hist, 'accuracy', lambda x: np.argmax(x))

In [None]:
#test
testmodel = load_model('/content/drive/My Drive/Conv_models')
tst_loss, tst_acc = testmodel.evaluate(X_test, Y_test)

In [None]:
#print model summary
model.summary()

In [None]:
# Initialize arrays for model evaluation metrics
accuracy_model = np.zeros((1, 1))
Sensitivity_model = np.zeros((1, 1))
Specificity_model = np.zeros((1, 1))

# Get predictions from the model and compute the confusion matrix
Y_pred = model.predict(X_test, verbose = 0, max_queue_size = 10, workers = 1, use_multiprocessing = False)
cm = confusion_matrix(change(Y_test), change(Y_pred))

# Calculate accuracy, sensitivity, and specificity from the confusion matrix
accuracy_model = accuracy_score(change(Y_test), change(Y_pred))
Sensitivity_model = cm[0, 0] / (cm[0, 0] + cm[0, 1])
Specificity_model = cm[1, 1] / (cm[1, 0] + cm[1, 1])

# Calculate ROC AUC score using predictions and actual labels
auc = roc_auc_score(Y_test, Y_pred, average = 'macro', sample_weight = None, max_fpr = None, multi_class = 'raise', labels = None)

# Display the evaluation metrics and classification report
print('\n\n\n\n roc_auc_score : \n', auc)
print('\n\n confusion_matrix : \n', cm)
print('\n\n test accuracy is :', accuracy_model)
print(' Sensitivity :', Sensitivity_model)
print(' Specificity :', Specificity_model)
print('\n\n classification report : \n', classification_report(change(Y_test), change(Y_pred)))