In [None]:
# Defining Libraries:

import mne
import numpy as np
from scipy.signal import welch, stft
from scipy.stats import skew, kurtosis, entropy
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_classif, SelectKBest
from collections import Counter
from sklearn.preprocessing import LabelEncoder
import torch
import seaborn as sns
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix, roc_curve, roc_auc_score, average_precision_score, precision_recall_curve
from sklearn.metrics import matthews_corrcoef, cohen_kappa_score
from itertools import cycle
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

In [None]:
# Defining File Paths:

# One EDF recording per participant, listed in participant order.
edf_data_files = [
    'Data/Participant_1.edf',
    'Data/Participant_2.edf',
    'Data/Participant_3.edf',
    'Data/Participant_4.edf',
    'Data/Participant_5.edf',
    'Data/Participant_6.edf',
    'Data/Participant_7.edf',
    'Data/Participant_8.edf',
    'Data/Participant_9.edf',
    'Data/Participant_10.edf',
    'Data/Participant_11.edf',
    'Data/Participant_12.edf',
    'Data/Participant_13.edf',
]

# Per-participant names, kept for any cell that refers to a single recording.
(
    file_participant_1,
    file_participant_2,
    file_participant_3,
    file_participant_4,
    file_participant_5,
    file_participant_6,
    file_participant_7,
    file_participant_8,
    file_participant_9,
    file_participant_10,
    file_participant_11,
    file_participant_12,
    file_participant_13,
) = edf_data_files

In [None]:
# Remove Start, Finish and Breaks:

# Time windows (start, end) in seconds that are kept from every recording;
# everything outside these windows is discarded.
segments = [
    (30, 90),  # "I"
    (120, 180),  # "Yes"
    (210, 270),  # "No"
    (300, 360),  # "Want"
    (390, 450),  # "Help"
    (480, 540),  # "More"
    (570, 630),  # "That"
    (660, 720),  # "Stop"
    (750, 810),  # "Open"
    (840, 900)   # "Close"
]

# The first recording supplies the channel list every file is restricted to:
reference_raw = mne.io.read_raw_edf(edf_data_files[0], preload = True)
reference_channels = reference_raw.info['ch_names']

raw_objects = []

for file_path in edf_data_files:
    print(f"Editing file: {file_path}...")
    raw = mne.io.read_raw_edf(file_path, preload = True)
    raw.pick_channels(reference_channels)

    # Cut each window out of a fresh copy so the loaded recording stays whole.
    # NOTE(review): crop() is used with its default tmax handling; if tmax is
    # inclusive, every window carries one sample beyond 60 s - confirm this is
    # acceptable for the fixed 60-second epoch grid built later.
    data_segments = [raw.copy().crop(tmin=start, tmax=end) for start, end in segments]

    # Join the windows back into one continuous recording for this participant:
    raw_concatenated = mne.concatenate_raws(data_segments)
    raw_objects.append(raw_concatenated)

In [None]:
# Displaying the Resulting Files after Segmentation:

# Bare expression: the notebook renders the list of per-participant recordings.
raw_objects

In [None]:
# Concatenating all Loaded and Processed Files:

# mne.concatenate_raws() appends onto the first list element in place, so copies
# are passed in: raw_objects stays untouched and re-running this cell does not
# keep growing the first participant's recording.
raw = mne.concatenate_raws([participant_raw.copy() for participant_raw in raw_objects])

In [None]:
# Function to Preprocess Raw Data:

def preprocess_raw_data(raw):
    """Clean a continuous recording in place and return it.

    Steps, in order: NaN replacement with the per-channel mean, 0.5-30 Hz
    band-pass FIR filter, ICA fit/apply, common average reference,
    interpolation of channels marked bad, and removal of each EEG channel's
    mean.

    Parameters
    ----------
    raw : mne.io.Raw
        Recording to clean. It is modified in place and is expected to be
        preloaded, since the samples are overwritten and filtered in memory.

    Returns
    -------
    mne.io.Raw
        The same recording after preprocessing.
    """

    print("STARTING PREPROCESSING: ")

    # Handling NaNs: Replace NaNs with the mean of the respective channel:
    raw_data = raw.get_data()

    for i in range(raw_data.shape[0]):
        nan_indices = np.isnan(raw_data[i])

        if np.any(nan_indices):
            mean_value = np.nanmean(raw_data[i])
            raw_data[i, nan_indices] = mean_value

    # Write the cleaned samples back onto the recording.
    raw._data = raw_data

    print("\n")

    # Filtering: Bandpass filter between 0.5-30 Hz:
    raw.filter(0.5, 30., fir_design = 'firwin')

    print("\n")

    # Artifact Removal: Independent Component Analysis (ICA):
    ica = mne.preprocessing.ICA(n_components = 14, random_state = 97, max_iter = 800)
    ica.fit(raw)
    # NOTE(review): no components are marked for exclusion before apply(), so
    # this step presumably reconstructs the signal from every component and
    # removes nothing - confirm whether artifact components should be selected.
    raw = ica.apply(raw)

    print("\n")

    # Spatial Filtering: Common Average Reference (CAR):
    # projection = False re-references the samples immediately. With
    # projection = True the reference is only attached as a projector that is
    # never applied to the data read back from `raw` later, so the features
    # would be computed on un-referenced signals.
    raw.set_eeg_reference('average', projection = False)

    print("\n")

    # Channel Interpolation: Interpolate bad channels
    raw.interpolate_bads()

    print("\n")

    # Baseline Correction: Subtract each EEG channel's mean over the whole recording
    raw.apply_function(lambda x: x - np.mean(x), picks = 'eeg')

    print("\n")
    print("PREPROCESSING DONE!")

    return raw

In [None]:
# Preprocessing the Raw Data:

# Rebinds `raw` to the recording returned by preprocess_raw_data.
raw = preprocess_raw_data(raw)

In [None]:
# Creating Fixed-Length Epochs:

epoch_duration = 60  # seconds

# Count the whole epochs that fit into the recording from its sample count.
# Deriving the stop value from raw.times[-1] drops the final epoch whenever the
# recording length is an exact multiple of epoch_duration, because the last time
# stamp is one sample short of the full duration.
samples_per_epoch = int(round(epoch_duration * raw.info['sfreq']))
num_epochs = raw.n_times // samples_per_epoch

start_times = np.arange(num_epochs, dtype = float) * epoch_duration
end_times = start_times + epoch_duration

In [None]:
# Displaying Epoch Start and End Times:

# Prints both boundary arrays and their lengths so the epoch count can be checked by eye.
print("Start Times: ", start_times)
print("\nTotal Number of Start Times: ", len(start_times))
print("\n")
print("End Times: ", end_times)
print("\nTotal Number of End Times: ", len(end_times))

In [None]:
# Defining the Words:

# Order matters: epochs are labelled later by cycling through this list by position.
words = ['I', 'Yes', 'No', 'Want', 'Help', 'More', 'That', 'Stop', 'Open', 'Close']
print("Words: ", words)

In [None]:
# Defining the Frequency:

# Sampling frequency as stored in the recording's info structure.
sfreq = raw.info['sfreq']
print("Frequency Across Channels: ", sfreq, "Hz")

In [None]:
# Feature Extraction Function:

def extract_features(epoch_data, sfreq):
    """Build a per-channel feature matrix for one block of EEG samples.

    Parameters
    ----------
    epoch_data : ndarray, indexed as (channel, sample)
        Signal block to summarise.
    sfreq : float
        Sampling frequency in Hz.

    Returns
    -------
    ndarray with one row per channel and nine columns
        mean, standard deviation, skewness, kurtosis, mean Welch PSD in the
        4-8 Hz, 8-12 Hz and 12-30 Hz bands, mean STFT magnitude, and the
        entropy of the absolute amplitudes.
    """

    # Welch PSD with one-second segments; band limits are (low, high] in Hz.
    freqs, psd = welch(epoch_data, sfreq, nperseg = int(sfreq))

    def band_mean(low, high):
        in_band = (freqs > low) & (freqs <= high)
        return psd[:, in_band].mean(axis = 1)

    # STFT with half-second segments, magnitude averaged over time, then over frequency.
    _, _, spectrogram = stft(epoch_data, fs = sfreq, nperseg = int(sfreq/2))
    stft_magnitude = np.abs(spectrogram).mean(axis = 2).mean(axis = 1)

    # Entropy of each channel's absolute amplitudes (normalised internally by scipy).
    entropy_vals = np.array([entropy(channel_signal) for channel_signal in np.abs(epoch_data)])

    # One column per feature, one row per channel.
    columns = [
        np.mean(epoch_data, axis = 1),
        np.std(epoch_data, axis = 1),
        skew(epoch_data, axis = 1),
        kurtosis(epoch_data, axis = 1),
        band_mean(4, 8),    # theta
        band_mean(8, 12),   # alpha
        band_mean(12, 30),  # beta
        stft_magnitude,
        entropy_vals,
    ]

    return np.stack(columns, axis = 1)

In [None]:
# Segmenting the Data into Epochs and Sub-Epochs:
# First -> 60 second epochs.
# Second -> 2 second sub-epochs.
# Third -> Extracting Features.

labeled_features_data = []
sub_epoch_duration = 2  # seconds

# Both values are constant across the loop, so they are computed once.
sub_epochs_per_epoch = int(epoch_duration / sub_epoch_duration)
sub_epoch_samples = sub_epoch_duration * int(sfreq)

for epoch_index, (start, end) in enumerate(zip(start_times, end_times)):  # One pass per 60-second epoch.
    # Samples belonging to the current epoch (the time vector is not needed):
    epoch_data, _ = raw[:, int(start * sfreq):int(end * sfreq)]

    # Epochs are labelled by cycling through the word list in order:
    word_label = words[epoch_index % len(words)]

    for sub_index in range(sub_epochs_per_epoch):
        sub_start = sub_index * sub_epoch_samples
        sub_epoch_data = epoch_data[:, sub_start:sub_start + sub_epoch_samples]

        # Store the feature matrix of the sub-epoch together with its word label:
        labeled_features_data.append((extract_features(sub_epoch_data, sfreq), word_label))

In [None]:
# Extracting Features and Labels:

# Split the (feature matrix, word) pairs into two parallel arrays.
features = np.array([feature_matrix for feature_matrix, _ in labeled_features_data])
labels = np.array([word for _, word in labeled_features_data])

In [None]:
# Number of feature matrices, one per stored sub-epoch.
len(features)

In [None]:
# Bare expression: the notebook renders the stacked feature array.
features

In [None]:
# Number of labels; should match the number of feature matrices.
len(labels)

In [None]:
# Bare expression: the notebook renders the word label of every sub-epoch.
labels

In [None]:
# Preparing Data - Flattening & Scaling:

# Collapse everything after the first axis so each sub-epoch becomes one row
# and every per-channel feature becomes one column.
features_2d = features.reshape(len(features), -1)

# Standardise every column to zero mean and unit standard deviation.
# NOTE(review): the scaler is fitted on the full data set before the
# train/validation/test split, so held-out statistics influence the scaling -
# confirm this is intended.
scaler = StandardScaler()
features_scaled = scaler.fit(features_2d).transform(features_2d)

In [None]:
# Feature Set Preparation:

# Principal Component Analysis (keeps enough components for 95% of the variance):
pca = PCA(n_components = 0.95)
features_pca = pca.fit_transform(features_scaled)

# Mutual Information:
num_sub_epochs_per_epoch = int(epoch_duration / sub_epoch_duration)
total_sub_epochs = num_sub_epochs_per_epoch * len(start_times)

num_features = features_scaled.shape[1]
k_best = min(num_features, 20)  # Ensure k does not exceed the number of available features.


def _seeded_mutual_info(X, y):
    """Mutual-information scores with a fixed seed so the selected columns are repeatable."""
    return mutual_info_classif(X, y, random_state = 42)


# NOTE(review): the selector is fitted on every sample and label, including
# those that later land in the validation and test splits - confirm whether it
# should be fitted on the training split only.
mi_selector = SelectKBest(_seeded_mutual_info, k = k_best)
features_mi = mi_selector.fit_transform(features_scaled, labels)

selected_features = features_mi # Choosing feature set to use for further model training.

num_features_mi = features_mi.shape[1]  # Number of features after MI

In [None]:
# Number of columns kept by the mutual-information selector.
num_features_mi

In [None]:
# Number of columns in the scaled feature matrix before selection.
num_features

In [None]:
# Value of k handed to SelectKBest.
k_best

In [None]:
# Creating Train, Test and Validation Sets:

# First cut: 70% training, 30% held out (stratified by word).
train_features, holdout_features, train_labels, holdout_labels = train_test_split(
    selected_features, labels, test_size = 0.3, random_state = 42, stratify = labels)

# Second cut: the held-out part is halved into validation and test sets.
val_features, test_features, val_labels, test_labels = train_test_split(
    holdout_features, holdout_labels, test_size = 0.5, random_state = 42, stratify = holdout_labels)

print("Total Dataset Size: ", (len(train_features) + len(val_features) + len(test_features)))
print("\n")
print(f"Training Data Size: {len(train_features)}")
print(f"Validation Data Size: {len(val_features)}")
print(f"Testing Data Size: {len(test_features)}")
print("\n")

# Occurrences of each label per split:
train_label_counts = Counter(train_labels)
val_label_counts = Counter(val_labels)
test_label_counts = Counter(test_labels)

# Number of samples per split:
total_train = len(train_labels)
total_val = len(val_labels)
total_test = len(test_labels)

# Label distribution of each split, as count and percentage:
distribution_report = (
    ("Training set label distribution:", train_label_counts, total_train),
    ("\nValidation set label distribution:", val_label_counts, total_val),
    ("\nTesting set label distribution:", test_label_counts, total_test),
)
for heading, counts, split_size in distribution_report:
    print(heading)
    for label, count in counts.items():
        print(f"{label}: {count} ({count / split_size * 100:.2f}%)")

In [None]:
# Designing Training Dataset:

class EEGDataset(Dataset):
    """Tensor-backed dataset pairing feature rows with integer class labels."""

    def __init__(self, features, labels):
        # Convert once up front: float32 features, int64 labels.
        self.features = torch.tensor(features, dtype = torch.float32)
        self.labels = torch.tensor(labels, dtype = torch.long)

    def __len__(self):
        # One sample per row of the feature tensor.
        return self.features.size(0)

    def __getitem__(self, idx):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target
    
# Encoding string labels to integers:
# The encoder is fitted on the training labels and reused for the other splits.
label_encoder = LabelEncoder()
train_labels_encoded = label_encoder.fit_transform(train_labels)
val_labels_encoded, test_labels_encoded = (
    label_encoder.transform(split_labels) for split_labels in (val_labels, test_labels)
)

# Print each label and its equivalent encoded label:
for encoded_label, label in enumerate(label_encoder.classes_):
    print(f'Label: {label}, Encoded: {encoded_label}')

train_dataset = EEGDataset(train_features, train_labels_encoded)
val_dataset = EEGDataset(val_features, val_labels_encoded)
test_dataset = EEGDataset(test_features, test_labels_encoded)

# Defining DataLoader:
# Only the training loader shuffles; validation and test keep their order.
batch_size = 256
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = batch_size)
test_loader = DataLoader(test_dataset, batch_size = batch_size)


In [None]:
# Define the Transformer Model:

class TransformerModel(nn.Module):
    """Transformer-encoder classifier over flat feature vectors.

    Each sample is projected to `model_dim`, passed through the encoder stack
    as a sequence of length one, batch-normalised and projected to class
    log-probabilities.

    Parameters
    ----------
    feature_dim : int
        Number of input features per sample.
    num_labels : int
        Number of output classes.
    model_dim, num_heads, encoder_layers, ff_dim, dropout_prob
        Encoder width, attention heads, number of stacked layers, feed-forward
        width and dropout probability.
    noise_level : float
        Standard deviation of the Gaussian noise added to the inputs while the
        module is in training mode (0 disables it).
    """

    def __init__(self, feature_dim, num_labels, model_dim = 256, num_heads = 8, encoder_layers = 16, ff_dim = 512, dropout_prob = 0.1, noise_level = 0.01):

        super(TransformerModel, self).__init__()
        self.noise_level = noise_level # Noise level for input data augmentation.
        self.input_projection = nn.Linear(feature_dim, model_dim) # Projects input features to the model dimension.
        self.input_dropout = nn.Dropout(dropout_prob) # Dropout after the input projection.

        # Pre-LayerNorm encoder layer with GELU activation. batch_first = True
        # makes the layer read its input as (batch, sequence, model_dim); it
        # does not change any parameter name or shape, so previously saved
        # state dicts still load.
        encoder_layer = TransformerEncoderLayer(d_model = model_dim, nhead = num_heads, dim_feedforward = ff_dim, dropout = dropout_prob, activation = 'gelu', norm_first = True, batch_first = True)

        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers = encoder_layers) # Stack of encoder layers.
        self.batch_norm = nn.BatchNorm1d(model_dim) # Batch normalisation of the encoder output.
        self.output_projection = nn.Linear(model_dim, num_labels) # Projects to the number of classes.
        self.output_dropout = nn.Dropout(dropout_prob) # Dropout before the output projection.

    def forward(self, inputs):
        """Return class log-probabilities, shape (batch, num_labels), for a (batch, feature_dim) input."""

        # Apply noise to inputs during training for regularisation:
        if self.training and self.noise_level > 0.0:
            inputs = inputs + torch.randn_like(inputs) * self.noise_level

        hidden = self.input_projection(inputs) # Project inputs to the model dimension.
        hidden = self.input_dropout(hidden) # Apply dropout to the projected inputs.

        # Give every sample its own length-1 sequence. Feeding the 2-D
        # (batch, model_dim) tensor straight to the encoder makes it treat the
        # whole batch as one unbatched sequence, so attention mixes different
        # samples and a prediction depends on what else is in the batch.
        hidden = self.transformer_encoder(hidden.unsqueeze(1)).squeeze(1)

        hidden = self.batch_norm(hidden) # Apply batch normalisation to the encoder outputs.
        hidden = self.output_dropout(hidden) # Apply dropout to the normalised outputs.
        logits = self.output_projection(hidden) # Project to the number of classes.

        return F.log_softmax(logits, dim = 1) # Log-probabilities over the classes.

In [None]:
# Transformer Architecture:

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed PyTorch's global generator before the model is built so the initial
# weights are the same on every fresh run of the notebook.
torch.manual_seed(42)

model = TransformerModel(feature_dim = train_features.shape[1], num_labels = len(np.unique(train_labels_encoded))).to(device)

print(model)

# Training with No Early Stopping and No Loss over Epoch:

# Hyperparameters for Training:
learning_rate = 1e-5 # Learning rate for the optimiser.
epochs = 600 # Number of epochs to train the model.
l1_lambda = 0.0001 # Lambda for L1 regularisation.
train_accuracies = [] # List to store training accuracies for each epoch.
val_accuracies = [] # List to store validation accuracies for each epoch.

# Loss Function and Optimiser:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate, weight_decay = 1e-8)

# Training Loop:
for epoch in range(epochs):

    # Initialisation:
    model.train()
    train_loss = 0.0
    train_correct = 0
    total_train = 0
    all_train_labels = []
    all_train_preds = []

    # Iterate over the training data loader. The batch_* names keep the
    # notebook-level `features` / `labels` arrays from being overwritten.
    for batch_features, batch_labels in train_loader:

        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        l1_norm = sum(p.abs().sum() for p in model.parameters())
        loss += l1_lambda * l1_norm  # L1 regularization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The criterion returns a batch mean; weight it by the batch size so
        # that dividing by the sample count below yields a per-sample average.
        train_loss += loss.item() * batch_labels.size(0)
        predictions = torch.max(output, 1)[1]
        train_correct += (predictions == batch_labels).sum().item()
        total_train += batch_labels.size(0)
        all_train_labels.extend(batch_labels.cpu().numpy())
        all_train_preds.extend(predictions.cpu().numpy())

    train_accuracy = train_correct / total_train
    train_accuracies.append(train_accuracy)

    # Validation:
    model.eval()
    val_loss = 0.0
    val_correct = 0
    total_val = 0
    all_val_labels = []
    all_val_preds = []

    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            output = model(batch_features)
            loss = criterion(output, batch_labels)
            val_loss += loss.item() * batch_labels.size(0)
            predictions = torch.max(output, 1)[1]
            val_correct += (predictions == batch_labels).sum().item()
            total_val += batch_labels.size(0)

            all_val_labels.extend(batch_labels.cpu().numpy())
            all_val_preds.extend(predictions.cpu().numpy())

    val_accuracy = val_correct / total_val
    val_accuracies.append(val_accuracy)

    # Print the training and validation metrics for the current epoch:
    print(f'Epoch {epoch+1}, Loss: {train_loss / total_train}, Training Accuracy: {train_accuracy}, '
          f'Validation Loss: {val_loss / total_val}, Validation Accuracy: {val_accuracy}')

# Training with No Early Stopping and with Accuracy and Loss over Epochs:

import torch
import torch.nn as nn
import matplotlib.pyplot as plt

# Hyperparameters for Training:
learning_rate = 1e-5 # Learning rate for the optimiser.
epochs = 600 # Number of epochs to train the model.
l1_lambda = 0.0001 # Lambda for L1 regularisation.
train_accuracies = [] # List to store training accuracies for each epoch.
val_accuracies = [] # List to store validation accuracies for each epoch.
train_losses = [] # List to store training losses for each epoch.
val_losses = [] # List to store validation losses for each epoch.

# Loss Function and Optimiser:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate, weight_decay = 1e-8)

# Training Loop:
for epoch in range(epochs):

    # Initialisation:
    model.train()
    train_loss = 0.0
    train_correct = 0
    total_train = 0
    all_train_labels = []
    all_train_preds = []

    # Iterate over the training data loader. The batch_* names keep the
    # notebook-level `features` / `labels` arrays from being overwritten.
    for batch_features, batch_labels in train_loader:

        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        l1_norm = sum(p.abs().sum() for p in model.parameters())
        loss += l1_lambda * l1_norm  # L1 regularization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The criterion returns a batch mean; weight it by the batch size so
        # that dividing by the sample count below yields a per-sample average.
        train_loss += loss.item() * batch_labels.size(0)
        predictions = torch.max(output, 1)[1]
        train_correct += (predictions == batch_labels).sum().item()
        total_train += batch_labels.size(0)
        all_train_labels.extend(batch_labels.cpu().numpy())
        all_train_preds.extend(predictions.cpu().numpy())

    train_accuracy = train_correct / total_train
    train_accuracies.append(train_accuracy)
    train_losses.append(train_loss / total_train) # Per-sample training loss (includes the L1 term).

    # Validation:
    model.eval()
    val_loss = 0.0
    val_correct = 0
    total_val = 0
    all_val_labels = []
    all_val_preds = []

    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            output = model(batch_features)
            loss = criterion(output, batch_labels)
            val_loss += loss.item() * batch_labels.size(0)
            predictions = torch.max(output, 1)[1]
            val_correct += (predictions == batch_labels).sum().item()
            total_val += batch_labels.size(0)

            all_val_labels.extend(batch_labels.cpu().numpy())
            all_val_preds.extend(predictions.cpu().numpy())

    val_accuracy = val_correct / total_val
    val_accuracies.append(val_accuracy)
    val_losses.append(val_loss / total_val) # Per-sample validation loss.

    # Print the training and validation metrics for the current epoch:
    print(f'Epoch {epoch+1}, Loss: {train_loss / total_train}, Training Accuracy: {train_accuracy}, '
          f'Validation Loss: {val_loss / total_val}, Validation Accuracy: {val_accuracy}')

In [None]:
# Training with Early Stopping and with Accuracy and Loss over Epochs:

# Hyperparameters for Training:
learning_rate = 1e-5 # Learning rate for the optimiser.
epochs = 600 # Number of epochs to train the model.
l1_lambda = 0.0001 # Lambda for L1 regularisation.
train_accuracies = [] # List to store training accuracies for each epoch.
val_accuracies = [] # List to store validation accuracies for each epoch.
train_losses = [] # List to store training losses for each epoch.
val_losses = [] # List to store validation losses for each epoch.

# Early stopping parameters
patience = 10
best_val_loss = np.inf
patience_counter = 0
# Checkpoint of the best epoch. It is named after this model so that it does
# not overwrite a checkpoint belonging to a different architecture.
best_model_path = 'Phase_1/Final_Models/Transformer_Model_Best.pth'

# Loss Function and Optimiser:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate, weight_decay = 1e-8)

# Training Loop:
for epoch in range(epochs):

    # Initialisation:
    model.train()
    train_loss = 0.0
    train_correct = 0
    total_train = 0
    all_train_labels = []
    all_train_preds = []

    # Iterate over the training data loader. The batch_* names keep the
    # notebook-level `features` / `labels` arrays from being overwritten.
    for batch_features, batch_labels in train_loader:

        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        l1_norm = sum(p.abs().sum() for p in model.parameters())
        loss += l1_lambda * l1_norm  # L1 regularization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The criterion returns a batch mean; weight it by the batch size so
        # that dividing by the sample count below yields a per-sample average.
        train_loss += loss.item() * batch_labels.size(0)
        predictions = torch.max(output, 1)[1]
        train_correct += (predictions == batch_labels).sum().item()
        total_train += batch_labels.size(0)
        all_train_labels.extend(batch_labels.cpu().numpy())
        all_train_preds.extend(predictions.cpu().numpy())

    train_accuracy = train_correct / total_train
    train_accuracies.append(train_accuracy)
    train_losses.append(train_loss / total_train) # Per-sample training loss (includes the L1 term).

    # Validation:
    model.eval()
    val_loss = 0.0
    val_correct = 0
    total_val = 0
    all_val_labels = []
    all_val_preds = []

    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            output = model(batch_features)
            loss = criterion(output, batch_labels)
            val_loss += loss.item() * batch_labels.size(0)
            predictions = torch.max(output, 1)[1]
            val_correct += (predictions == batch_labels).sum().item()
            total_val += batch_labels.size(0)

            all_val_labels.extend(batch_labels.cpu().numpy())
            all_val_preds.extend(predictions.cpu().numpy())

    val_accuracy = val_correct / total_val
    val_accuracies.append(val_accuracy)
    val_losses.append(val_loss / total_val) # Per-sample validation loss.

    # Print the training and validation metrics for the current epoch:
    print(f'Epoch {epoch+1}, Loss: {train_loss / total_train}, Training Accuracy: {train_accuracy}, '
          f'Validation Loss: {val_loss / total_val}, Validation Accuracy: {val_accuracy}')

    # Early Stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        # Save the best model
        torch.save(model.state_dict(), best_model_path)
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("Early stopping triggered")
        break

# Continue with the weights of the best validation epoch rather than those of
# the last epoch (only when a checkpoint was actually written).
if best_val_loss < np.inf:
    model.load_state_dict(torch.load(best_model_path, map_location = device))

In [None]:
# Save the Trained Model:

# Writes the model's current parameters (state dict) to disk; the target
# directory must already exist.
model_path = 'Phase_1/Final_Models/Transformer_Model.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')

# Training with Early Stopping:

# Hyperparameters for Training:
learning_rate = 1e-5  # Learning rate for the optimiser.
epochs = 1000  # Number of epochs to train the model.
l1_lambda = 0.0001  # Lambda for L1 regularisation.
patience = 10  # Patience for early stopping.
best_val_loss = float('inf')  # Best validation loss initialized to infinity.
early_stop_counter = 0  # Counter for early stopping.

train_accuracies = []  # List to store training accuracies for each epoch.
val_accuracies = []  # List to store validation accuracies for each epoch.
train_losses = []  # List to store training losses for each epoch.
val_losses = []  # List to store validation losses for each epoch.

# Loss Function and Optimiser:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-8)

# Training Loop:
for epoch in range(epochs):

    # Initialisation:
    model.train()
    train_loss = 0.0
    train_correct = 0
    total_train = 0
    all_train_labels = []
    all_train_preds = []

    # Iterate over the training data loader. The batch_* names keep the
    # notebook-level `features` / `labels` arrays from being overwritten.
    for batch_features, batch_labels in train_loader:
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        l1_norm = sum(p.abs().sum() for p in model.parameters())
        loss += l1_lambda * l1_norm  # L1 regularization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The criterion returns a batch mean; weight it by the batch size so
        # that dividing by the sample count below yields a per-sample average.
        train_loss += loss.item() * batch_labels.size(0)
        predictions = torch.max(output, 1)[1]
        train_correct += (predictions == batch_labels).sum().item()
        total_train += batch_labels.size(0)
        all_train_labels.extend(batch_labels.cpu().numpy())
        all_train_preds.extend(predictions.cpu().numpy())

    train_accuracy = train_correct / total_train
    train_accuracies.append(train_accuracy)
    train_losses.append(train_loss / total_train)  # Per-sample training loss (includes the L1 term).

    # Validation:
    model.eval()
    val_loss = 0.0
    val_correct = 0
    total_val = 0
    all_val_labels = []
    all_val_preds = []

    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            output = model(batch_features)
            loss = criterion(output, batch_labels)
            val_loss += loss.item() * batch_labels.size(0)
            predictions = torch.max(output, 1)[1]
            val_correct += (predictions == batch_labels).sum().item()
            total_val += batch_labels.size(0)
            all_val_labels.extend(batch_labels.cpu().numpy())
            all_val_preds.extend(predictions.cpu().numpy())

    val_accuracy = val_correct / total_val
    val_accuracies.append(val_accuracy)
    val_losses.append(val_loss / total_val)  # Per-sample validation loss.

    # Print the training and validation metrics for the current epoch:
    print(f'Epoch {epoch+1}, Loss: {train_loss / total_train}, Training Accuracy: {train_accuracy}, '
          f'Validation Loss: {val_loss / total_val}, Validation Accuracy: {val_accuracy}')

    # Early stopping:
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        # Save the model if the validation loss decreases
        torch.save(model.state_dict(), 'best_model.pt')
    else:
        early_stop_counter += 1
        if early_stop_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

# Load the best checkpoint whether training stopped early or ran through every
# epoch (only when a checkpoint was actually written).
if best_val_loss < float('inf'):
    model.load_state_dict(torch.load('best_model.pt', map_location = device))

In [None]:
# Training and Validation Loss:

# Each curve gets its x-axis from the number of epochs actually recorded. With
# early stopping the history lists are shorter than `epochs`, and plotting them
# against range(1, epochs + 1) fails with a length mismatch.
curves = [
    (train_losses, 'Train Loss', 'Loss', 'Train Loss over Epochs', 'blue'),
    (val_losses, 'Validation Loss', 'Loss', 'Validation Loss over Epochs', 'orange'),
    (val_accuracies, 'Validation Accuracy', 'Accuracy', 'Validation Accuracy over Epochs', 'green'),
]

for history, curve_label, y_label, title, color in curves:
    epochs_range = range(1, len(history) + 1)

    plt.figure(figsize = (8, 6))
    plt.plot(epochs_range, history, label = curve_label, color = color)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
# Evaluation Function:

def calculate_metrics(labels, preds, dataset_name, class_names = None):
    """Print classification metrics and plot the confusion matrix, ROC and PR curves.

    Parameters
    ----------
    labels : sequence of int
        True encoded class indices.
    preds : sequence of int
        Predicted encoded class indices.
    dataset_name : str
        Name used in the printed header and the plot titles.
    class_names : sequence of str, optional
        Display name of every class, ordered by encoded index. Defaults to
        `label_encoder.classes_`, so that index i is shown with the word the
        encoder assigned to it (the `words` list is in presentation order,
        not in encoded order).

    Notes
    -----
    The ROC and precision-recall curves are computed from hard predictions,
    not from scores, so every curve has a single operating point.
    """

    if class_names is None:
        class_names = list(label_encoder.classes_)
    n_classes = len(class_names)
    class_ids = list(range(n_classes))

    accuracy = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average = 'weighted')
    recall = recall_score(labels, preds, average = 'weighted')
    f1 = f1_score(labels, preds, average = 'weighted')
    class_report = classification_report(labels, preds, labels = class_ids, target_names = class_names)
    # Pin the class list so the matrix is n_classes x n_classes even when a
    # class is missing from this split, keeping it aligned with the tick labels.
    conf_matrix = confusion_matrix(labels, preds, labels = class_ids)
    std_dev = np.std(preds)
    kappa = cohen_kappa_score(labels, preds)
    mcc = matthews_corrcoef(labels, preds)

    # One-vs-rest indicator matrices, one column per class:
    labels_binarized = (np.asarray(labels)[:, None] == np.arange(n_classes)).astype(int)
    preds_binarized = (np.asarray(preds)[:, None] == np.arange(n_classes)).astype(int)

    # Compute ROC curve and ROC area for each class:
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in class_ids:
        fpr[i], tpr[i], _ = roc_curve(labels_binarized[:, i], preds_binarized[:, i])
        try:
            roc_auc[i] = roc_auc_score(labels_binarized[:, i], preds_binarized[:, i])
        except ValueError:
            # AUC is undefined when the class is absent from (or the only one in) `labels`.
            roc_auc[i] = float('nan')

    # Compute Precision-Recall curve and PR area for each class:
    precision_curve = dict()
    recall_curve = dict()
    average_precision = dict()
    for i in class_ids:
        precision_curve[i], recall_curve[i], _ = precision_recall_curve(labels_binarized[:, i], preds_binarized[:, i])
        average_precision[i] = average_precision_score(labels_binarized[:, i], preds_binarized[:, i])

    print(f'---------- {dataset_name} Evaluation Metrics: ----------\n')

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-score: {f1:.4f}')
    print("\n")

    print(f'Cohen\'s Kappa: {kappa:.4f}')
    print(f'Matthews Correlation Coefficient: {mcc:.4f}')
    print("\n")

    print('Classification Report:')
    print(class_report)
    print("\n")

    print('Confusion Matrix:')
    print("\n")
    plt.figure(figsize = (10, 7))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap = 'Blues', xticklabels = class_names, yticklabels = class_names)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title(f'{dataset_name} Confusion Matrix')
    plt.show()

    print(f'Standard Deviation: {std_dev:.4f}\n')

    # One fixed colour per class, shared by both curve plots.
    palette = plt.get_cmap('tab10')
    class_colors = [palette(i % 10) for i in class_ids]

    print("ROC Curve and AUC Scores: ")
    # Plot ROC curve
    plt.figure(figsize = (10, 7))
    for i in class_ids:
        plt.plot(fpr[i], tpr[i], color = class_colors[i], lw = 2,
                 label = f'ROC curve of class {class_names[i]} (area = {roc_auc[i]:0.2f})')

    plt.plot([0, 1], [0, 1], 'k--', lw = 2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{dataset_name} ROC Curve')
    plt.legend(loc = "lower right")
    plt.show()

    print("Precision-Recall Curve: ")
    # Plot Precision-Recall curve
    plt.figure(figsize=(10, 7))
    for i in class_ids:
        plt.plot(recall_curve[i], precision_curve[i], color = class_colors[i], lw = 2,
                 label=f'PR curve of class {class_names[i]} (area = {average_precision[i]:0.2f})')

    plt.plot([0, 1], [1, 0], 'k--', lw = 2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title(f'{dataset_name} Precision-Recall Curve')
    plt.legend(loc = "lower left")
    plt.show()

In [None]:
# Calculate metrics for the Training and Validation Sets:

# The label/prediction lists are the ones collected during the last epoch that
# ran in the training loop.
calculate_metrics(all_train_labels, all_train_preds, dataset_name = f'Training')
calculate_metrics(all_val_labels, all_val_preds, dataset_name = f'Validation')

In [None]:
# Calculate metrics for the Testing Set:

model.eval()
all_test_labels = []
all_test_preds = []

with torch.no_grad():
    # The batch_* names keep the notebook-level `features` / `labels` arrays
    # from being overwritten by the last batch.
    for batch_features, batch_labels in test_loader:
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        output = model(batch_features)
        predictions = torch.max(output, 1)[1]
        all_test_labels.extend(batch_labels.cpu().numpy())
        all_test_preds.extend(predictions.cpu().numpy())

# Calculate and display metrics for the test set
calculate_metrics(all_test_labels, all_test_preds, dataset_name = 'Test Set')

In [None]:
# Real-time Predictions:

def predict(model, input_data, device):
    """Return the predicted class index for every row of `input_data`.

    The model is switched to evaluation mode, the input is converted to a
    float32 tensor on `device`, and the index of the largest output along
    dimension 1 is returned as a NumPy array.
    """
    model.eval()
    batch = torch.tensor(input_data, dtype = torch.float32).to(device)

    # Inference only: no gradients are tracked.
    with torch.no_grad():
        scores = model(batch)
        best_class = torch.max(scores.data, 1).indices

    return best_class.cpu().numpy()

# Load the trained model weights
# map_location lets a checkpoint written on another device load onto the current one.
model.load_state_dict(torch.load('Phase_1/Final_Models/Transformer_Model.pth', map_location = device))

# Example usage
sample_inputs = test_features[:10]  # Take the first ten samples from the test set
actual_labels = test_labels[:10]  # Already the original words, not encoded indices

predicted_labels = predict(model, sample_inputs, device)

# Only the predictions are encoded indices, so only they are decoded back to words.
predicted_words = label_encoder.inverse_transform(predicted_labels)

# Print actual vs predicted labels
for actual, predicted in zip(actual_labels, predicted_words):
    print(f'Actual: {actual}, Predicted: {predicted}')