NOTE: Used for running the RBDGuard framework on preprocessed data (see preprocessing.ipynb). Runs both the unsupervised and supervised training.

** I ran this on Google Colab, so the file paths point to `/content/`, where I uploaded the pickle files on the Colab virtual machine. Adjust them if you run the notebook elsewhere. **

Retrieve imports

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from pyedflib import highlevel
import pyedflib as plib
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from tensorflow.keras.models import load_model
import neurokit2 as nk
import os

Read back the unlabeled data stored in pickle files on desktop and store as DataFrames

In [None]:
# Read the three unlabeled datasets back from their gzip-compressed pickles.
# NOTE: unpickling can execute arbitrary code -- only load files you produced
# yourself (here: the output of the preprocessing notebook).
unlabeled_cfs, unlabeled_shhs1, unlabeled_shhs2 = (
    pd.read_pickle(pickle_path)
    for pickle_path in (
        "/content/CFS_cleaned_ECG_data_30s_all.pkl.gz",
        "/content/shhs1_cleaned_ECG_data_30s_all.pkl.gz",
        "/content/shhs2_cleaned_ECG_data_30s_all.pkl.gz",
    )
)

Concatenates the CFS, SHHS1, and SHHS2 DataFrames into one large unlabeled DataFrame, then extracts its values as the training array.

In [None]:
# Stack the three unlabeled datasets row-wise into a single DataFrame.
unlabeled_df = pd.concat(
    [unlabeled_cfs, unlabeled_shhs1, unlabeled_shhs2],
    axis=0,
)

# Keras consumes plain arrays, so hand it the underlying NumPy data.
unlabeled_x_train = unlabeled_df.to_numpy()

In [None]:
# The LSTM layers expect 3-D input, so append a trailing feature axis of
# size 1: (samples, timesteps) -> (samples, timesteps, 1).
unlabeled_x_train = np.expand_dims(unlabeled_x_train, axis=-1)

Declares the model architecture for the unsupervised portion (bidirectional LSTM model).

In [None]:
class LSTMEncoderDecoder(keras.models.Model):
    """Bidirectional-LSTM sequence autoencoder for the unsupervised stage.

    The encoder maps a ``(seq_len, 1)`` input window to a single vector (the
    output of the second Bi-LSTM, which does not return sequences). The
    decoder repeats that vector ``seq_len`` times and reconstructs one value
    per time step.

    Args:
        seq_len: Number of time steps in each input window. Defaults to 240,
            the value that used to be hard-coded in both the encoder input
            and the decoder ``RepeatVector``.
        **kwargs: Forwarded unchanged to ``keras.Model`` (e.g. ``name``).
    """

    def __init__(self, seq_len=240, **kwargs):
        super().__init__(**kwargs)
        # Kept on the instance so the window length the model was built for
        # can be inspected later.
        self.seq_len = seq_len

        self.encoder = tf.keras.Sequential([
            layers.Input((seq_len, 1)),  # one value per time step
            layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.2), name='Bi-LSTM1'),
            # No return_sequences here: this layer emits one vector per window.
            layers.Bidirectional(layers.LSTM(32, dropout=0.2), name='Bi-LSTM2')])

        self.decoder = tf.keras.Sequential([
            # Tile the encoded vector once per time step so the decoder LSTMs
            # can unroll it back to the original sequence length.
            layers.RepeatVector(seq_len),
            layers.Bidirectional(layers.LSTM(32, return_sequences=True, dropout=0.2), name='Bi-LSTM3'),
            layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.2), name='Bi-LSTM4'),
            # Project every time step back to a single reconstructed value.
            layers.TimeDistributed(layers.Dense(1))])

    def call(self, x):
        """Encode ``x`` and return the decoder's reconstruction of it."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

Declares function to visualize the results.

In [None]:
def plot_history(history, loss=True, validation=False):
    """Plot one per-epoch training curve from a Keras ``History`` object.

    Args:
        history: Object whose ``.history`` dict holds the per-epoch values
            (as returned by ``model.fit``).
        loss: If truthy, plot the ``"loss"`` series; otherwise plot the
            ``"accuracy"`` series.
        validation: If truthy, also plot the matching ``"val_..."`` series.

    Raises:
        KeyError: If a requested series is missing from ``history.history``.
    """
    # Pick the dictionary keys and the labels for the requested metric.
    if loss:
        train_key, val_key = "loss", "val_loss"
        train_label, val_label = "Training Loss", "Validation Loss"
        y_label = 'Loss'
    else:
        train_key, val_key = "accuracy", "val_accuracy"
        train_label, val_label = "Training Accuracy", "Validation Accuracy"
        y_label = 'Accuracy'

    plt.plot(history.history[train_key], label=train_label)
    if validation:
        plt.plot(history.history[val_key], label=val_label)

    plt.legend()
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.show()

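Runs the unsupervised model portion. The autoencoder is compiled with the Adam optimizer and a mean-squared-error reconstruction loss, and TensorFlow's JIT compilation is enabled to speed up training.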

In [None]:
# Instantiate the sequence autoencoder and configure it to minimise the
# mean-squared reconstruction error with the Adam optimizer.
autoencoder = LSTMEncoderDecoder()
autoencoder.compile(
    loss='mse',
    optimizer='adam',
)

In [None]:
# Switch on TensorFlow's JIT-compilation setting for the graph optimizer.
# Run this cell before the training cell below so the setting is in place
# when fitting starts.
tf.config.optimizer.set_jit(True)

In [None]:
# Train the autoencoder to reconstruct its own input: the same array is
# passed as both the input and the target. No validation data is used here,
# so only the training-loss curve is plotted.
history = autoencoder.fit(
    x=unlabeled_x_train,
    y=unlabeled_x_train,
    batch_size=256,
    epochs=50,
    verbose=1,
)
plot_history(history, validation=False, loss=True)

(OPTIONAL) Save the autoencoder model for later use

In [None]:
# Persist the trained autoencoder in the native ``.keras`` format so the
# unsupervised stage does not have to be re-run.
autoencoder.save(
    filepath='unsupervised_lstm_50epochs_cfs_shhs1_shhs2_202501041412.keras',
)

(OPTIONAL) Prints model summary of encoder and decoder -- table with layer type, output shape, and parameter count

In [None]:
# Print the layer table (layer type, output shape, parameter count) for the
# encoder first, then for the decoder.
for _sub_model in (autoencoder.encoder, autoencoder.decoder):
    _sub_model.summary()

Run to declare unsupervised + supervised model (RBDGuard).

In [None]:
## RUN EVERY TIME
# Attach a dense classification head to the trained encoder. The encoder
# layers are reused as-is; nothing in this cell freezes them, so they are
# updated along with the new head during supervised training.
encoder_layers = autoencoder.encoder.layers

# Start from the output of the encoder's last layer.
x = encoder_layers[-1].output

# Two hidden blocks: Dense(ReLU) followed by 25% dropout.
for hidden_units in (128, 64):
    x = layers.Dense(hidden_units, activation='relu')(x)
    x = layers.Dropout(0.25)(x)

# Single sigmoid unit for binary classification.
x = layers.Dense(1, activation='sigmoid')(x)

predictive_model = keras.Model(inputs=encoder_layers[0].input, outputs=x)

Run to declare only supervised model. Note: Only run this cell if testing only-supervised.

In [None]:
# Define the model input layer
# input_layer = keras.Input(shape=(240, 1))  # Replace with your input shape

# Add layers as per your model's architecture
# x = layers.Flatten()(input_layer)
# x = layers.Dense(128, activation='relu')(x)  # Example hidden layer
# x = layers.Dropout(0.5)(x)  # Dropout with 50% rate
# x = layers.Dense(64, activation='relu')(x)  # Another hidden layer
# x = layers.Dropout(0.3)(x)  # Dropout with 30% rate
# x = layers.Dense(1, activation='sigmoid')(x)  # Output layer for binary classification (sigmoid)

# # Create the model
# predictive_model = keras.Model(inputs=input_layer, outputs=x)

(OPTIONAL) Prints model summary of supervised model portion -- table with layer type, output shape, and parameter count

In [None]:
# Print the layer table (layer type, output shape, parameter count) of the
# classifier declared above.
predictive_model.summary()

Divides the labeled data for training, validation, and testing (70/15/15 split). Randomly shuffles the data.

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd

# Fixed seed so the train/validation/test membership (and therefore every
# reported metric) is reproducible between runs. The data is still shuffled.
SPLIT_SEED = 42

# Column 0 of the labeled DataFrame holds the class label; every other
# column is treated as a signal sample.
LABEL_COL = 0

# Read labeled data
# NOTE(review): the suffix here is ".pk1.gz" (digit one), unlike the ".pkl.gz"
# files loaded earlier -- confirm it matches the file name on disk.
labeled_df = pd.read_pickle("/content/capslpdb_cleaned_ECG_data_30s_all.pk1.gz")

# NOTE(review): rows are split individually. If several rows come from the
# same recording/subject, they can end up on both sides of a split -- confirm
# whether a subject-wise split is needed.

# First split: 15% for the test set and 85% for the train/validation split.
# Stratifying keeps the class ratio the same in every subset (this requires
# at least two rows per class).
labeled_train_val_df, labeled_test_df = train_test_split(
    labeled_df,
    test_size=0.15,
    random_state=SPLIT_SEED,
    stratify=labeled_df[LABEL_COL],
)

# Second split: 15% of the original data (from the 85%) for validation,
# which leaves 70% of the original data for training.
validation_ratio = 0.15 / 0.85  # Adjust to get 15% of the original dataset for validation
labeled_train_df, labeled_val_df = train_test_split(
    labeled_train_val_df,
    test_size=validation_ratio,
    random_state=SPLIT_SEED,
    stratify=labeled_train_val_df[LABEL_COL],
)

# Print dataset sizes
print("Training set size:", len(labeled_train_df))
print("Validation set size:", len(labeled_val_df))
print("Test set size:", len(labeled_test_df))
print(labeled_train_df.head(10))
print(labeled_val_df.head(10))
print(labeled_test_df.head(10))

# Extract features (X) and labels (y)
labeled_x_train = labeled_train_df.drop(columns=LABEL_COL).values
labeled_y_train = labeled_train_df[LABEL_COL].values

labeled_x_val = labeled_val_df.drop(columns=LABEL_COL).values
labeled_y_val = labeled_val_df[LABEL_COL].values

labeled_x_test = labeled_test_df.drop(columns=LABEL_COL).values
labeled_y_test = labeled_test_df[LABEL_COL].values

# Encode labels. The encoder is fitted on the training labels only and then
# applied to all three subsets; its integer codes follow the sorted order of
# the class values, shown by the print below.
encode_label = LabelEncoder()
encode_label.fit(labeled_y_train)

y_train = encode_label.transform(labeled_y_train)
y_val = encode_label.transform(labeled_y_val)
y_test = encode_label.transform(labeled_y_test)

# Print to confirm encoding
print("Classes:", encode_label.classes_)

In [None]:
# reshape X so that it can pass through LSTM layer:
# (samples, timesteps) -> (samples, timesteps, 1).
# Each array is reshaped from its OWN dimensions (the time-step count used to
# be taken from the unlabeled array), and the validation and test arrays get
# the same treatment so that all three inputs share one layout.
labeled_x_train = labeled_x_train.reshape(labeled_x_train.shape[0], labeled_x_train.shape[1], 1)
labeled_x_val = labeled_x_val.reshape(labeled_x_val.shape[0], labeled_x_val.shape[1], 1)
labeled_x_test = labeled_x_test.reshape(labeled_x_test.shape[0], labeled_x_test.shape[1], 1)

Run supervised model portion. Accounts for class imbalance and implements early stopping.

In [None]:
# Inverse-frequency class weights: the rarer a class is in the training set,
# the more each of its samples contributes to the loss.
class_0_weight = len(y_train) / np.sum(y_train == 0)
class_1_weight = len(y_train) / np.sum(y_train == 1)

# Plain Python floats keep the printed dict readable.
class_weights = {0: float(class_0_weight), 1: float(class_1_weight)}
print(class_weights)

# Stop once the validation loss has not improved for 5 epochs and roll the
# model back to the weights of the best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

predictive_model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=["accuracy"])
model_history = predictive_model.fit(
    labeled_x_train,
    y_train,
    epochs=100,
    batch_size=32,
    verbose=1,
    validation_data=(labeled_x_val, y_val),  # Include validation data here
    # The weights were previously computed and printed but never handed to
    # fit(), so the class imbalance was not actually being compensated.
    class_weight=class_weights,
    callbacks=[early_stopping]
)
plot_history(model_history, loss=True, validation=True)
plot_history(model_history, loss=False, validation=True)

(OPTIONAL) Save the RBDGuard model for later use

In [None]:
# Persist the trained classifier in the native ``.keras`` format for later
# reuse.
predictive_model.save(
    filepath='supervised_lstm_100epochs_70_15_15split_cfs_shhs1_shhs2_all_balanced.keras',
)

Tests RBDGuard using the testing data (prints accuracy, F1 score, precision, recall, and confusion matrices)

In [None]:
# Sigmoid outputs for the held-out test set, turned into hard 0/1 predictions
# at a 0.5 threshold and flattened to one label per sample.
pred_prob = predictive_model.predict(labeled_x_test)
pred_test = (pred_prob > 0.5).astype(int).ravel()
print(pred_test)
print(f'Test Accuracy: {accuracy_score(y_test, pred_test)*100}%')
# Micro-averaged F1 on a single-label problem is numerically identical to
# accuracy, so it added no information. Macro-averaging takes the unweighted
# mean of the per-class F1 scores, which does not depend on which class is
# encoded as 1 and is not dominated by the majority class.
print(f'Test F1 Score (macro): {f1_score(y_test, pred_test, average="macro")*100:.1f}%')

In [None]:
# Take the tick labels from the fitted encoder instead of hard-coding them:
# encoded label i corresponds to encode_label.classes_[i], so the names can
# never be attached to the wrong row/column.
class_names = [str(class_value) for class_value in encode_label.classes_]

# Passing the full list of encoded labels keeps the matrix square even if one
# class happens to be absent from both y_test and pred_test.
cm = confusion_matrix(y_test, pred_test, labels=list(range(len(class_names))))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap=plt.cm.Blues)
plt.show()

In [None]:
# Per-class precision / recall / F1. The row names come from the fitted
# encoder (encoded label i <-> encode_label.classes_[i]) rather than from a
# hard-coded list, so each row is guaranteed to carry the right class name.
report_class_names = [str(class_value) for class_value in encode_label.classes_]
print(classification_report(
    y_test,
    pred_test,
    labels=list(range(len(report_class_names))),
    target_names=report_class_names,
))