In [None]:
# Standard library
import os

# Core libraries
import numpy as np
import pandas as pd

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning and Data Processing
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.manifold import TSNE

# Deep Learning
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Data Loading
import wfdb

# IPython utilities for better display
from IPython.display import display


In [None]:
# Functions to load and process the data

def process_patient_data(patient_id, data_dir, signals_to_keep=('ECG', 'BP', 'EEG'),
                         window_seconds=30,
                         apnea_markers=('H', 'HA', 'OA', 'X', 'CA', 'CAA')):
    """Load one patient record and cut it into fixed-length, labelled windows.

    The record's signals are read with ``wfdb.rdsamp`` and its ``'st'``
    annotations with ``wfdb.rdann``. Signals whose name contains one of
    ``signals_to_keep`` are trimmed so they start at the first annotation,
    split into non-overlapping windows of ``window_seconds`` seconds, and each
    window is paired with a binary label derived from the annotation at the
    same position.

    Parameters
    ----------
    patient_id : str
        Record name; the record is read from ``f'{data_dir}/{patient_id}'``.
    data_dir : str
        Directory containing the record files.
    signals_to_keep : iterable of str, optional
        Substrings used to select signals by name.
    window_seconds : int or float, optional
        Window duration in seconds.
    apnea_markers : iterable of str, optional
        Substrings that mark an annotation note as an apnea event (label 1).

    Returns
    -------
    dict
        Maps ``f"{patient_id}_Window_{index}"`` to a dict holding one 1-D array
        per selected signal name, followed by the key ``'Label'`` (0 or 1).

    Raises
    ------
    ValueError
        If the window length is not positive, the record has no ``'st'``
        annotations, or no signal name matches ``signals_to_keep``.
    """
    record_path = f'{data_dir}/{patient_id}'

    # Load patient data
    signals, fields = wfdb.rdsamp(record_path)
    sleep_stage_annotations = wfdb.rdann(record_path, 'st')

    signal_names = fields['sig_name']
    # Take the sampling rate from the record itself instead of assuming 250 Hz
    sampling_rate = fields['fs']
    window_length_samples = int(round(window_seconds * sampling_rate))
    if window_length_samples <= 0:
        raise ValueError(
            f"Window length must be positive, got {window_length_samples} samples "
            f"({window_seconds} s at {sampling_rate} Hz)."
        )

    if len(sleep_stage_annotations.sample) == 0:
        raise ValueError(f"Record '{patient_id}' has no 'st' annotations.")

    # Start at the first annotation. A first annotation at sample 1 is treated
    # as "starts at the beginning", so the whole signal is kept in that case.
    # The index is used directly: converting it to seconds and back through
    # floats and int() could truncate it to one sample early.
    first_annotation_sample = int(sleep_stage_annotations.sample[0])
    start_index = 0 if first_annotation_sample == 1 else first_annotation_sample

    # Filter and build the dictionary of selected signals
    selected_signals = {
        name: signals[start_index:, column]
        for column, name in enumerate(signal_names)
        if any(wanted in name for wanted in signals_to_keep)
    }
    if not selected_signals:
        raise ValueError(
            f"Record '{patient_id}' has no signal matching {list(signals_to_keep)}; "
            f"available signals: {list(signal_names)}."
        )

    # Split every signal into whole windows; trailing samples that do not fill
    # a complete window are dropped.
    windowed_signals = {}
    for name, signal in selected_signals.items():
        usable_samples = (len(signal) // window_length_samples) * window_length_samples
        windowed_signals[name] = signal[:usable_samples].reshape(-1, window_length_samples)

    # An annotation is labelled 1 when its note contains any apnea marker
    apnea_labels = [
        1 if any(marker in note for marker in apnea_markers) else 0
        for note in sleep_stage_annotations.aux_note
    ]

    # NOTE(review): window i is paired with annotation i purely by position.
    # This assumes one annotation per window, back-to-back from the first
    # annotation — confirm for records whose annotations have gaps.
    window_count = min(len(next(iter(windowed_signals.values()))), len(apnea_labels))

    # 'Label' is inserted last in every window dictionary
    return {
        f"{patient_id}_Window_{index}": {
            **{name: windows[index] for name, windows in windowed_signals.items()},
            'Label': apnea_labels[index],
        }
        for index in range(window_count)
    }

 ##################################################################################################################

def prepare_data(windows_with_labels):
    """Turn a dictionary of labelled windows into model-ready arrays.

    Parameters
    ----------
    windows_with_labels : dict
        Maps a window identifier to a dict that holds one equally long 1-D
        sequence per signal plus the key ``'Label'``.

    Returns
    -------
    X : numpy.ndarray
        Shape ``(number_of_windows, samples_per_window, number_of_signals)``.
        Signals appear in the order of the non-label keys of each window.
    y : numpy.ndarray
        The ``'Label'`` value of every window, in the same order as ``X``.

    Raises
    ------
    ValueError
        If the input is empty, a window has no signals, or the signals do not
        all share the same shape.
    """
    if not windows_with_labels:
        raise ValueError("windows_with_labels is empty; there is nothing to prepare.")

    label_key = 'Label'

    per_window_arrays = []
    for window in windows_with_labels.values():
        # Pick the signals by key, not by position, so the result does not
        # depend on 'Label' being the last entry of the window dictionary.
        channels = [values for name, values in window.items() if name != label_key]
        # Shape per window: (number_of_signals, samples_per_window)
        per_window_arrays.append(np.stack(channels, axis=0))

    # Shape: (number_of_windows, number_of_signals, samples_per_window)
    X = np.stack(per_window_arrays, axis=0)

    # Reorder to (number_of_windows, samples_per_window, number_of_signals),
    # the layout the LSTM input expects
    X = np.transpose(X, (0, 2, 1))

    y = np.array([window[label_key] for window in windows_with_labels.values()])

    return X, y




In [None]:
# Data directory and list of patients.
# Set the SLPDB_DATA_DIR environment variable to point at the database on
# another machine; when it is not set, the original local path is used.
data_dir = os.environ.get(
    'SLPDB_DATA_DIR',
    r'C:\Users\feder\Desktop\Advanced Signal Modelling\Project\Data\mit-bih-polysomnographic-database-1.0.0\mit-bih-polysomnographic-database-1.0.0',
)
patient_ids = ['slp04', 'slp66', 'slp16']


# Global dictionary to hold data from all patients.
# It is rebuilt from scratch on every run of this cell, so re-running is safe.
all_windows = {}

# Process data for each patient and accumulate the results.
# Window keys are prefixed with the patient id, so different patients do not
# overwrite each other's entries.
for patient_id in patient_ids:
    patient_data = process_patient_data(patient_id, data_dir)
    all_windows.update(patient_data)

# Print a summary of the results
print(f'Total windows processed from all patients: {len(all_windows)}')


In [None]:
# Build the model inputs from every collected window:
# X holds the stacked signal windows, y the matching per-window labels
X, y = prepare_data(all_windows)

# Hold out 20% of the windows for testing. Stratifying on y keeps the class
# ratio the same in both partitions; the fixed seed makes the split repeatable.
split_options = {'test_size': 0.2, 'random_state': 42, 'stratify': y}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

In [None]:
# Training model

# Seed Python, NumPy and TensorFlow so that repeated runs of this cell are as
# reproducible as the backend allows (the split and t-SNE cells already use 42)
tf.keras.utils.set_random_seed(42)

# Define the model with a compact structure including Dropout.
# Input shape is (samples_per_window, number_of_signals), taken from X_train.
model = Sequential([
    LSTM(64, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
    Dropout(0.2),  # Adds Dropout after the first LSTM
    LSTM(32, return_sequences=False),  # Emits one 32-dimensional vector per window
    Dropout(0.2),  # Adds Dropout after the second LSTM
    Dense(32, activation='relu'),
    Dense(1, activation='sigmoid')  # Single probability output for binary classification
])

# Uncomment the following line if learning rate reduction is needed
# reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=20, verbose=1)

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()

# Configuration for EarlyStopping callback
early_stopping = EarlyStopping(
    monitor='val_loss',  # Monitor validation loss
    patience=20,  # Number of epochs with no improvement after which training will be stopped
    verbose=1,  # Show messages
    restore_best_weights=True  # Restore model weights from the best epoch
)

# Training the model with EarlyStopping.
# NOTE: with epochs=2 the patience of 20 can never be exhausted, so training is
# never stopped early; raise `epochs` for a real training run.
history = model.fit(
    X_train, y_train,
    epochs=2,
    batch_size=32,
    validation_split=0.2,  # Validation data is carved out of the training set
    callbacks=[early_stopping]  # Add the callback to the list of callbacks
)

# Model evaluation on the held-out test set. These are test metrics, so they
# get their own names instead of the val_* names used for the training history.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)
print(f"Model evaluation - Loss: {test_loss}, Accuracy: {test_acc}")


In [None]:
# Pull the per-epoch training and validation curves out of the History object
acc, val_acc, loss, val_loss = (
    history.history[key]
    for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')
)

# Epochs are numbered from 1 on the x-axis
epochs = range(1, len(acc) + 1)

# One figure, two side-by-side panels: accuracy on the left, loss on the right
fig, (ax_accuracy, ax_loss) = plt.subplots(1, 2, figsize=(12, 5))

# (axes, training curve, validation curve, training label, validation label, title, y-label)
panels = [
    (ax_accuracy, acc, val_acc, 'Training Acc', 'Validation Acc',
     'Training and Validation Accuracy', 'Accuracy'),
    (ax_loss, loss, val_loss, 'Training Loss', 'Validation Loss',
     'Training and Validation Loss', 'Loss'),
]

for ax, train_curve, validation_curve, train_label, validation_label, title, y_label in panels:
    ax.plot(epochs, train_curve, 'bo-', label=train_label)
    ax.plot(epochs, validation_curve, 'r^-', label=validation_label)
    ax.set_title(title)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(y_label)
    ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Predicted apnea probabilities for the test windows; the model ends in a
# single sigmoid unit, so this is one column with one value per window
y_pred_proba = model.predict(X_test)
# Turn probabilities into class labels with a 0.5 threshold and flatten the
# column to 1-D so it has the same shape as y_test
y_pred = (y_pred_proba > 0.5).astype(int).ravel()
# Printing the classification report comparing actual and predicted values
print(classification_report(y_test, y_pred, target_names=['No Apnea', 'Apnea']))


In [None]:
# Class names in label order (0, 1), matching the classification report
class_names = ['No Apnea', 'Apnea']

# Confusion matrix of true vs. predicted test labels. Passing labels=[0, 1]
# keeps it 2x2 even if one class is missing from the predictions.
cm = confusion_matrix(y_test, y_pred, labels=[0, 1])

# Visualizing the confusion matrix as an annotated heatmap with named classes
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='g',
    cmap='Blues',
    cbar=False,
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
# Columns are predictions, rows are the true classes
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion matrix')
# Displaying the plot
plt.show()


In [None]:
# Define a new model that shares the trained model's input and returns the
# output of the second LSTM layer. In the Sequential definition the layers are
# [LSTM, Dropout, LSTM, Dropout, Dense, Dense], so the second LSTM is index 2
# (index 3 is the Dropout that follows it, which is inactive during predict).
latent_model = Model(inputs=model.input, outputs=model.layers[2].output)

In [None]:
# Encode the test windows with the latent model
latent_representations_test = latent_model.predict(X_test)

# Project the latent vectors onto two dimensions with t-SNE (seeded)
tsne = TSNE(n_components=2, random_state=42)
X_embedded = tsne.fit_transform(latent_representations_test)

# Scatter the 2-D embedding, colouring every point by its true test label
fig, ax = plt.subplots(figsize=(10, 6))
test_points = ax.scatter(X_embedded[:, 0], X_embedded[:, 1], c=y_test, cmap='viridis')
fig.colorbar(test_points, ax=ax)
ax.set_title('t-SNE Visualization of Test Data Latent Representations')
ax.set_xlabel('t-SNE 1')
ax.set_ylabel('t-SNE 2')
plt.show()


In [None]:
# Encode the training windows with the latent model
latent_representations_train = latent_model.predict(X_train)

# Project the latent vectors onto two dimensions with t-SNE (seeded)
tsne_train = TSNE(n_components=2, random_state=42)
X_embedded_train = tsne_train.fit_transform(latent_representations_train)

# Scatter the 2-D embedding, colouring every point by its true training label
fig, ax = plt.subplots(figsize=(10, 6))
train_points = ax.scatter(
    X_embedded_train[:, 0],
    X_embedded_train[:, 1],
    c=y_train,
    cmap='viridis',
)
fig.colorbar(train_points, ax=ax)
ax.set_title('t-SNE Visualization of Training Set Latent Representations')
ax.set_xlabel('t-SNE 1')
ax.set_ylabel('t-SNE 2')
plt.show()
