<a href="https://colab.research.google.com/github/holly-hewitt/Abnormal-Infant-Movement-Detection/blob/main/Code/DataSetTestingClean.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#import tensorflow.compat.v1 as tf
#tf.enable_eager_execution(tf.ConfigProto(log_device_placement=False))
#tf.test.gpu_device_name()

import numpy as np
from sklearn.model_selection import KFold
from itertools import product
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Masking, LSTM, GRU
from sklearn.model_selection import KFold
# import early stopping
from tensorflow.keras.callbacks import EarlyStopping
import pickle
from sklearn.metrics import accuracy_score, recall_score, precision_score
from sklearn.utils import class_weight

from tensorflow.keras.layers import Input, Concatenate, Permute, Reshape, Multiply, Lambda, Add
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def specificity_score(y_true, y_pred):

    # Convert probabilities to binary predictions
    y_pred_bin = np.argmax(y_pred, axis=1)
    y_true_bin = np.argmax(y_true, axis=1)

    tn = np.sum((y_true_bin == 0) & (y_pred_bin == 0))
    fp = np.sum((y_true_bin == 0) & (y_pred_bin != 0))
    specificity = tn / (tn + fp) if (tn + fp) != 0 else 0
    return specificity

In [None]:
def train_and_evaluate(dataset_names, create_model_fn, class_weights):

    outer_cv = KFold(n_splits=3, shuffle=True, random_state=42)

    # Find best dataset to train and test model on
    #dataset_names = ['X_smoothed_mean_norm']

    for dataset_name in dataset_names:

         # Load in dataset from pickle
        with open(f'drive/MyDrive/Pickles/{dataset_name}.pickle', 'rb') as handle:
            dataset = pickle.load(handle)

        dataset = np.array(dataset)

        print(f'Working on dataset: {dataset_name}')

        accuracies = []
        sensitivities = []
        false_positive_rates = []
        specificities = []
        precisions = []

        fold = 1

        for train_index, test_index in outer_cv.split(dataset):

            # Print current progress
            print(f'Working on fold: {fold}')
            fold += 1

            X_train, X_test = dataset[train_index], dataset[test_index]
            Y_train, Y_test = abnormal_encoded[train_index], abnormal_encoded[test_index]

            X_train = X_train.astype('float32')
            Y_train = Y_train.astype('float32')
            X_test = X_test.astype('float32')
            Y_test = Y_test.astype('float32')

            model = create_model_fn(X_train.shape[1:])
            early_stopping = EarlyStopping(monitor='val_loss', patience=3)

            if class_weights:

                Y_train_classes = np.argmax(Y_train, axis=1)

                # Compute class weights
                cw = class_weight.compute_class_weight('balanced',
                                                    classes=np.unique(Y_train_classes),
                                                    y=Y_train_classes)

                class_weights_dict = dict(enumerate(cw))

                #Fit the model
                print('Fitting model')
                model.fit(X_train, Y_train, epochs=5, batch_size=4, validation_split=0.2, callbacks=[early_stopping], verbose=1, class_weight=class_weights_dict)

            else:
                #Fit the model
                print('Fitting model')
                model.fit(X_train, Y_train, epochs=5, batch_size=4, validation_split=0.2, callbacks=[early_stopping], verbose=1)

            # Predict the test set
            print('Predicting test set')
            Y_pred = model.predict(X_test)

            Y_pred_classes = np.argmax(Y_pred, axis=1)
            Y_test_classes = np.argmax(Y_test, axis=1)

            # Calulate accuracy, sensitivity, false positive rate, specificity and precision
            accuracies.append(accuracy_score(Y_test_classes, Y_pred_classes))
            sensitivities.append(recall_score(Y_test_classes, Y_pred_classes, average='macro'))
            false_positive_rates.append(1 - specificity_score(Y_test, Y_pred))
            specificities.append(specificity_score(Y_test, Y_pred))
            precisions.append(precision_score(Y_test_classes, Y_pred_classes, average='macro'))



        avg_accuracy = np.mean(accuracies)
        avg_sensitivity = np.mean(sensitivities)
        avg_false_positive_rate = np.mean(false_positive_rates)
        avg_specificity = np.mean(specificities)
        avg_precision = np.mean(precisions)

        std_accuracy = np.std(accuracies)
        std_sensitivity = np.std(sensitivities)
        std_false_positive_rate = np.std(false_positive_rates)
        std_specificity = np.std(specificities)
        std_precision = np.std(precisions)

        dataset_results[dataset_name]['Accuracy'] = (avg_accuracy, std_accuracy)
        dataset_results[dataset_name]['Sensitivity'] = (avg_sensitivity, std_sensitivity)
        dataset_results[dataset_name]['False Positive Rate'] = (avg_false_positive_rate, std_false_positive_rate)
        dataset_results[dataset_name]['Specificity'] = (avg_specificity, std_specificity)
        dataset_results[dataset_name]['Precision'] = (avg_precision, std_precision)

        for dataset_name, results in dataset_results.items():
            print(f'Dataset: {dataset_name}')
            for metric, (avg, std) in results.items():
                print(f'{metric}: {avg} +/- {std}')
            print('\n')


        # Delete dataset to free up memory
        del dataset
        del Y_pred
    return dataset_results

In [None]:
# Model functions

def create_cnn_model(shape, filters=32, kernel_size=3, dropout_rate=0.5):
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=shape))  # Adjust the input_shape to match your dataset
    model.add(Conv1D(filters, kernel_size, activation='relu', input_shape=(19301, 16)))
    model.add(MaxPooling1D(2))
    model.add(Flatten())
    model.add(Dense(32, activation='relu'))  # Reduced the number of neurons in the dense layer
    model.add(Dense(3, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_lstm_model(shape, lstm_units=32, dropout_rate=0.5, output_classes=3):
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=shape))
    model.add(LSTM(lstm_units, return_sequences=False))  # 'return_sequences=False' because we only need the last output
    model.add(Dropout(dropout_rate))
    model.add(Dense(lstm_units, activation='relu'))
    model.add(Dense(output_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_gru_model(shape, gru_units=32, dropout_rate=0.5, output_classes=3):
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=shape))
    model.add(GRU(gru_units, return_sequences=False))  # return_sequences=False because we only need the last output
    model.add(Dropout(dropout_rate))
    model.add(Dense(gru_units, activation='relu'))
    model.add(Dense(output_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_cnn_lstm_model(shape, filters=32, kernel_size=3, lstm_units=64, dropout_rate=0.5, output_classes=3):
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=shape))
    model.add(Conv1D(filters, kernel_size, activation='relu'))
    model.add(MaxPooling1D(2))
    model.add(LSTM(lstm_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(lstm_units, activation='relu'))
    model.add(Dense(output_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_cnn_gru_model(shape, filters=32, kernel_size=3, gru_units=64, dropout_rate=0.5, output_classes=3):
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=shape))
    model.add(Conv1D(filters, kernel_size, activation='relu'))
    model.add(MaxPooling1D(2))
    model.add(GRU(gru_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(gru_units, activation='relu'))
    model.add(Dense(output_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_lstm_attention_model(shape, lstm_units=64, dropout_rate=0.5, output_classes=3):
    inputs = Input(shape=shape)
    lstm_out = LSTM(lstm_units, return_sequences=True)(inputs)
    attention = Dense(1, activation='tanh')(lstm_out)
    attention = Flatten()(attention)
    attention = Activation('softmax')(attention)
    attention = RepeatVector(lstm_units)(attention)
    attention = Permute([2, 1])(attention)
    sent_representation = Multiply()([lstm_out, attention])
    sent_representation = Lambda(lambda xin: K.sum(xin, axis=-2), output_shape=(lstm_units,))(sent_representation)
    dropout = Dropout(dropout_rate)(sent_representation)
    dense = Dense(lstm_units, activation='relu')(dropout)
    outputs = Dense(output_classes, activation='softmax')(dense)
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model



In [None]:
 # Load in dataset from pickle
with open('drive/MyDrive/Pickles/abnormal_encoded.pickle', 'rb') as handle:
    abnormal_encoded = pickle.load(handle)
dataset_results = {'X_smoothed_mean_norm_month': {}, 'X_smoothed_median_norm_month': {}, 'X_smoothed_mean_norm': {}, 'X_smoothed_median_norm': {}}

In [None]:


cnn_dataset_result_cw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_cnn_model, True)



In [None]:
cnn_dataset_result_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_cnn_model, False)


In [None]:
lstm_dataset_results_cw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_lstm_model, True)


In [None]:
lstm_dataset_results_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_lstm_model, False)


In [None]:
gru_dataset_results_cw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_gru_model, True)


In [None]:
gru_dataset_results_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'], create_gru_model, False)


In [None]:
cnn_lstm_results_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'],create_cnn_lstm_model,False)

In [None]:
cnn_gru_results_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'],create_cnn_gru_model,False)

In [None]:
lstm_attention_results_ncw = train_and_evaluate(['X_smoothed_mean_norm', 'X_smoothed_mean_norm_month','X_smoothed_median_norm', 'X_smoothed_median_norm_month'],create_lstm_attention_model,False)