In [None]:
import os
import h5py
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from scipy.signal import resample
import torch
from torch.utils.data import DataLoader, TensorDataset
from ray import cloudpickle as cloudpickle
import os
import pickle

### Settings

In [None]:
# Integer class id assigned to each task label found in the recording filenames.
label_mapping = dict(rest=0, motor=1, story=2, memory=3)
# Time steps kept per recording after resampling: 75% of 35595, truncated to an int.
downsample_to = int(0.75 * 35595)
# Mini-batch size used by the DataLoaders built in this notebook.
batch_size = 4
# NOTE(review): the training cells visible here set their own `num_epochs`,
# so this value looks unused - confirm before relying on it.
epochs = 5

### Importing Data

In [None]:
def get_dataset_name(file_name_with_dir):
    """Derive the HDF5 dataset key from a recording's file path.

    Everything up to the last '/' is dropped, then everything from the last
    '_' onwards, e.g. 'dir/rest_105923_1.h5' -> 'rest_105923'. A file name
    that contains no '_' yields an empty string.
    """
    base_name = file_name_with_dir.rsplit('/', 1)[-1]
    stem, _, _ = base_name.rpartition('_')
    return stem

def get_dataset_from_dir(directory_name, steps_after_downsampling=1000, scaling='standard'): # NB: steps_after_downsampling can be seen as a hyperparameter
    """Load every HDF5 recording in a directory as a scaled, resampled array.

    Files are visited in sorted name order so the returned sample order is
    reproducible (os.listdir itself gives no ordering guarantee).

    Args:
        directory_name: Directory whose entries are all opened as HDF5 files.
        steps_after_downsampling: Number of samples along axis 1 after
            resampling with scipy.signal.resample; None skips resampling.
        scaling: 'standard' applies StandardScaler, 'minmax' applies
            MinMaxScaler, any other value leaves the data unscaled. Each file
            is scaled on its own.

    Returns:
        (data_list, labels_list): one transposed 2-D array per file (rows:
        timesteps, columns: sensors) and the matching label string, which is
        the first of 'rest', 'motor', 'story', 'memory' contained in the
        file name.

    Raises:
        ValueError: If a file name contains none of the known labels.
        KeyError: If a file lacks the dataset named by get_dataset_name.
    """
    known_labels = ('rest', 'motor', 'story', 'memory')  # checked in this order
    data_list = []
    labels_list = []
    for filename in sorted(os.listdir(directory_name)):
        # Fail fast on an unexpected file before touching its contents.
        label = next((name for name in known_labels if name in filename), None)
        if label is None:
            raise ValueError(f'Inappropriate filename: {directory_name}/{filename}')

        filename_path = os.path.join(directory_name, filename)
        with h5py.File(filename_path, 'r') as f:
            dataset_name = get_dataset_name(filename)
            dataset = f.get(dataset_name)
            if dataset is None:
                # np.array(None) would silently produce an object array and
                # only blow up later inside the scaler.
                raise KeyError(f'Dataset {dataset_name!r} not found in {filename_path}')
            data = np.array(dataset)

        # scaling
        # NOTE(review): fit_transform treats every column as a feature, while
        # the resampling below uses axis=1 as time. If rows are sensors this
        # standardises each time step across sensors rather than each sensor
        # over time - confirm that is intended.
        if scaling == 'standard':
            data_scaled = StandardScaler().fit_transform(data) # NB each h5 file is scaled separately
        elif scaling == 'minmax':
            data_scaled = MinMaxScaler().fit_transform(data)
        else:
            data_scaled = data

        # downsampling
        if steps_after_downsampling is not None:
            data_downsampled = resample(data_scaled, num=steps_after_downsampling, axis=1)
        else:
            data_downsampled = data_scaled

        # Append data and label together so both lists always stay aligned.
        data_list.append(np.transpose(data_downsampled)) # rows: timesteps, columns: sensors
        labels_list.append(label)
    return data_list, labels_list

# Load the "Cross" train split and its three test splits, all resampled to `downsample_to` steps and standard-scaled.
# To run on another dataset, substitute the directories passed below.
# The paths are relative, so this assumes a Data directory in the same directory as this notebook.
cross_train_data_list, cross_train_labels_list = get_dataset_from_dir('Data/Cross/train/', steps_after_downsampling=downsample_to, scaling='standard')
cross_test1_data_list, cross_test1_labels_list = get_dataset_from_dir('Data/Cross/test1/', steps_after_downsampling=downsample_to, scaling='standard')
cross_test2_data_list, cross_test2_labels_list = get_dataset_from_dir('Data/Cross/test2/', steps_after_downsampling=downsample_to, scaling='standard')
cross_test3_data_list, cross_test3_labels_list = get_dataset_from_dir('Data/Cross/test3/', steps_after_downsampling=downsample_to, scaling='standard')

# sanity checks: compare the shape of the first train recording with the first test1 recording
print("Are train and test the same shape?", cross_train_data_list[0].shape == cross_test1_data_list[0].shape)
print("Train shape:", cross_train_data_list[0].shape)

#print(cross_train_data_list[0]) 

### Data preparation

In [None]:
# Stack the per-recording arrays of each split into a single array with a
# leading sample axis: (recordings, timesteps, sensors).
training_data_array = np.stack(cross_train_data_list)
testing1_data_array = np.stack(cross_test1_data_list)
testing2_data_array = np.stack(cross_test2_data_list)
testing3_data_array = np.stack(cross_test3_data_list)

# Map the label strings to their integer class ids via label_mapping.
training_labels_array = np.array([label_mapping[label] for label in cross_train_labels_list])
testing1_labels_array = np.array([label_mapping[label] for label in cross_test1_labels_list])
testing2_labels_array = np.array([label_mapping[label] for label in cross_test2_labels_list])
testing3_labels_array = np.array([label_mapping[label] for label in cross_test3_labels_list])

# Convert to tensors: float features, long (int64) class targets.
X_train_tensor = torch.FloatTensor(training_data_array)
y_train_tensor = torch.LongTensor(training_labels_array)
X_test1_tensor = torch.FloatTensor(testing1_data_array)
y_test1_tensor = torch.LongTensor(testing1_labels_array)
X_test2_tensor = torch.FloatTensor(testing2_data_array)
y_test2_tensor = torch.LongTensor(testing2_labels_array)
X_test3_tensor = torch.FloatTensor(testing3_data_array)
y_test3_tensor = torch.LongTensor(testing3_labels_array)

# Pair features with targets.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test1_dataset = TensorDataset(X_test1_tensor, y_test1_tensor)
test2_dataset = TensorDataset(X_test2_tensor, y_test2_tensor)
test3_dataset = TensorDataset(X_test3_tensor, y_test3_tensor)

# Hold out 20% of the training recordings for validation.
# Despite its name, test_abs is the size of the *training* subset (80%).
# The split is seeded so that Restart & Run All yields the same partition.
test_abs = int(len(train_dataset) * 0.8)
train_subset, val_subset = torch.utils.data.random_split(
    train_dataset,
    [test_abs, len(train_dataset) - test_abs],
    generator=torch.Generator().manual_seed(0),
)

# Only the training loader is shuffled; evaluation order does not need to vary.
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)
test1_loader = DataLoader(test1_dataset, batch_size=batch_size, shuffle=False)
test2_loader = DataLoader(test2_dataset, batch_size=batch_size, shuffle=False)
test3_loader = DataLoader(test3_dataset, batch_size=batch_size, shuffle=False)

# Inspection: shapes of every batch in the first test loader.
for X, y in test1_loader:
    print(X.shape)
    print(y.shape)

# Inspection: shapes of the first batch from train_loader only.
for batch_idx, (data, targets) in enumerate(train_loader):
    print(f"Batch Index: {batch_idx}")
    print(f"Data shape: {data.shape}")  # input tensor of the batch
    print(f"Targets shape: {targets.shape}")  # class targets of the batch
    break


### Model definition

In [None]:
class MEG_LSTM(torch.nn.Module):
    """Stacked unidirectional LSTM classifier.

    Expects input of shape (batch, time, input_size) because the LSTM is
    built with batch_first=True, and returns logits of shape
    (batch, output_size) computed from the last time step.

    Args:
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability applied to the last-step features.
        input_size: Features per time step (defaults to the 248 used here).
        output_size: Number of classes (defaults to 4).
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1, input_size=248, output_size=4):
        super().__init__()
        # Mirror the attributes the sibling model classes expose.
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.dropout = torch.nn.Dropout(p=dropout_prob)
        self.fc = torch.nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        # Omitting (h0, c0) lets the LSTM start from zeros that already match
        # the dtype and device of x; the former hand-built float32 zeros broke
        # non-float32 models.
        out, _ = self.lstm(x)
        # Only the last time step feeds the classifier, so drop out just that
        # slice instead of the whole sequence.
        last_step = self.dropout(out[:, -1, :])
        return self.fc(last_step)
    
class MEG_LSTM_Bidirectional(torch.nn.Module):
    """Stacked bidirectional LSTM classifier.

    Expects input of shape (batch, time, 248) (batch_first=True) and returns
    logits of shape (batch, 4).

    Args:
        hidden_size: Width of the hidden state per direction.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability used between LSTM layers and on the
            features fed to the output layer.
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1):
        super().__init__()
        self.input_size = 248 # input size is data from 248 sensors, not configurable
        self.output_size = 4 # output size is 4 classes, not configurable
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, dropout=dropout_prob, bidirectional=True, batch_first=True)
        self.dropout = torch.nn.Dropout(p=dropout_prob)
        self.fc = torch.nn.Linear(self.hidden_size * 2, self.output_size)

    def forward(self, x):
        # Initial states default to zeros matching x's dtype and device.
        out, _ = self.lstm(x)
        # The reverse direction reads the sequence back-to-front, so at the
        # last time step it has seen a single sample; its final state sits at
        # time step 0. Pair it with the forward direction's final state.
        forward_final = out[:, -1, :self.hidden_size]
        backward_final = out[:, 0, self.hidden_size:]
        summary = torch.cat((forward_final, backward_final), dim=1)
        summary = self.dropout(summary)
        return self.fc(summary)

class MEG_LSTM_Dense(torch.nn.Module):
    """Bidirectional LSTM followed by a dense + ReLU head.

    Expects input of shape (batch, time, 248) (batch_first=True) and returns
    logits of shape (batch, 4).

    Args:
        hidden_size: Width of the hidden state per direction.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability used between LSTM layers.
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1):
        super().__init__()
        self.input_size = 248
        self.output_size = 4
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layer
        self.lstm = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, dropout=dropout_prob, bidirectional=True, batch_first=True)
        # Additional dense layer
        self.dense = torch.nn.Linear(hidden_size * 2, hidden_size * 2)
        # ReLU activation
        self.relu = torch.nn.ReLU()
        # Output layer
        self.fc = torch.nn.Linear(hidden_size * 2, self.output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        # The reverse direction's final state is at time step 0, not at the
        # last step (where it has seen only one sample).
        forward_final = out[:, -1, :self.hidden_size]
        backward_final = out[:, 0, self.hidden_size:]
        summary = torch.cat((forward_final, backward_final), dim=1)
        # The head is applied to this one vector per sample rather than to
        # every time step, since only one vector reaches the output layer.
        summary = self.relu(self.dense(summary))
        return self.fc(summary)

class MEG_LSTM_Expanded(torch.nn.Module):
    """Bidirectional LSTM with batch normalisation and a dense + ReLU head.

    Expects input of shape (batch, time, 248) (batch_first=True) and returns
    logits of shape (batch, 4).

    Args:
        hidden_size: Width of the hidden state per direction.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability used between LSTM layers.
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1):
        super().__init__()
        self.input_size = 248
        self.output_size = 4
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layer
        self.lstm = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, dropout=dropout_prob, bidirectional=True, batch_first=True)
        # Batch Normalization
        self.batch_norm = torch.nn.BatchNorm1d(self.hidden_size * 2)
        # Dense layer
        self.dense = torch.nn.Linear(hidden_size * 2, hidden_size * 2)
        # ReLU activation
        self.relu = torch.nn.ReLU()
        # Output layer
        self.fc = torch.nn.Linear(hidden_size * 2, self.output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        # BatchNorm1d wants (batch, channels, length); permute there and back.
        # It still runs on the whole sequence, so its statistics are unchanged.
        out = self.batch_norm(out.permute(0, 2, 1)).permute(0, 2, 1)
        # The reverse direction's final state is at time step 0, not at the
        # last step (where it has seen only one sample).
        forward_final = out[:, -1, :self.hidden_size]
        backward_final = out[:, 0, self.hidden_size:]
        summary = torch.cat((forward_final, backward_final), dim=1)
        # The head runs on the single summary vector per sample.
        summary = self.relu(self.dense(summary))
        return self.fc(summary)
    
class MEG_LSTM_Complex(torch.nn.Module):
    """Two bidirectional LSTM stages, each followed by tanh and batch norm,
    with a dense + ELU head.

    Expects input of shape (batch, time, 248) (batch_first=True) and returns
    logits of shape (batch, 4).

    Args:
        hidden_size: Width of the hidden state per direction.
        num_layers: Number of stacked layers inside each LSTM stage.
        dropout_prob: Dropout probability used between layers of each stage.
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1):
        super().__init__()
        self.input_size = 248
        self.output_size = 4
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layer
        self.lstm1 = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, dropout=dropout_prob, bidirectional=True, batch_first=True)
        # tanh activation
        self.tanh1 = torch.nn.Tanh()
        # Batch normalization
        self.batch_norm1 = torch.nn.BatchNorm1d(self.hidden_size * 2)
        # lstm layer 2
        self.lstm2 = torch.nn.LSTM(self.hidden_size * 2, self.hidden_size, self.num_layers, dropout=dropout_prob, bidirectional=True, batch_first=True)
        # tanh activation 2
        self.tanh2 = torch.nn.Tanh()
        # batch normalization 2
        self.batch_norm2 = torch.nn.BatchNorm1d(self.hidden_size * 2)
        # dense layer
        self.dense = torch.nn.Linear(hidden_size * 2, hidden_size * 2)
        # ELU activation
        self.elu = torch.nn.ELU()
        # Output layer
        self.fc = torch.nn.Linear(hidden_size * 2, self.output_size)

    def forward(self, x):
        # stage 1: LSTM -> tanh -> batch norm (BatchNorm1d wants channels on axis 1)
        out, _ = self.lstm1(x)
        out = self.tanh1(out)
        out = self.batch_norm1(out.permute(0, 2, 1)).permute(0, 2, 1)
        # stage 2: consumes the full stage-1 sequence
        out, _ = self.lstm2(out)
        out = self.tanh2(out)
        out = self.batch_norm2(out.permute(0, 2, 1)).permute(0, 2, 1)
        # The reverse direction's final state is at time step 0, not at the
        # last step (where it has seen only one sample).
        forward_final = out[:, -1, :self.hidden_size]
        backward_final = out[:, 0, self.hidden_size:]
        summary = torch.cat((forward_final, backward_final), dim=1)
        # dense -> ELU -> output layer
        summary = self.elu(self.dense(summary))
        return self.fc(summary)

class MEG_LSTM_Layered(torch.nn.Module):
    """Three unidirectional LSTM stages, each followed by dropout, then a
    linear output layer on the last time step.

    Input is (batch, time, 248) because every LSTM uses batch_first=True;
    output is (batch, 4).
    """

    def __init__(self, hidden_size=64, num_layers=2, dropout_prob=0.1):
        super().__init__()
        self.input_size, self.output_size = 248, 4
        self.hidden_size, self.num_layers = hidden_size, num_layers

        nn = torch.nn
        # stage 1 reads the raw 248-wide input
        self.lstm1 = nn.LSTM(248, hidden_size, num_layers, batch_first=True)
        self.dropout1 = nn.Dropout(p=dropout_prob)
        # stages 2 and 3 read the previous stage's hidden sequence
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.dropout2 = nn.Dropout(p=dropout_prob)
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.dropout3 = nn.Dropout(p=dropout_prob)
        # classifier on the final hidden features
        self.fc = nn.Linear(hidden_size, 4)

    def forward(self, x):
        stages = (
            (self.lstm1, self.dropout1),
            (self.lstm2, self.dropout2),
            (self.lstm3, self.dropout3),
        )
        sequence = x
        for recurrent, drop in stages:
            sequence, _ = recurrent(sequence)
            sequence = drop(sequence)
        # classify from the last time step only
        return self.fc(sequence[:, -1, :])


### Training and testing loops

In [None]:
# Early stopping mechanism to speed up training
class EarlyStopping:
    """Tracks the lowest validation loss and signals when to stop training.

    A loss strictly below the best one so far becomes the new best and resets
    the counter. A loss strictly above best + delta increments the counter.
    Anything in between changes nothing. Stopping is signalled once the
    counter exceeds `patience`.
    """

    def __init__(self, patience=3, delta=1):
        self.patience, self.delta = patience, delta
        self.counter = 0
        self.min_val_loss = float('inf')

    def stop_training(self, val_loss) -> bool:
        """Assess whether early stopping is necessary through validation loss."""
        improved = val_loss < self.min_val_loss
        if improved:
            self.min_val_loss = val_loss
            self.counter = 0
            return False

        worsened = val_loss > self.min_val_loss + self.delta
        if not worsened:
            return False

        self.counter += 1
        return self.counter > self.patience

In [None]:
def train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs=10):
    """Train `model`, validating after every epoch with early stopping.

    Args:
        model: Module called as model(batch) to produce class logits.
        train_loader: Iterable of (data, targets) training batches.
        val_loader: Iterable of (inputs, labels) validation batches.
        optimizer: Optimizer over the model's parameters.
        loss_function: Callable loss(outputs, targets) returning a scalar tensor.
        num_epochs: Upper bound on the number of epochs.

    Training stops early when EarlyStopping(patience=3, delta=0) signals it
    on the mean validation loss. The model is left in eval mode on return.
    """
    log_every = 5  # print the running training loss every this many batches
    # initialize early stopping
    stopper = EarlyStopping(patience=3, delta=0)
    # begin training loop
    for epoch in range(num_epochs):
        # Validation below switches to eval mode, so re-enable dropout and
        # batch-norm updates at the start of every epoch.
        model.train()
        running_loss = 0.0
        batches_since_log = 0
        for batch_idx, (data, targets) in enumerate(train_loader):
            optimizer.zero_grad()
            # forward
            outputs = model(data)
            loss = loss_function(outputs, targets)
            # backward
            loss.backward()
            # optimize
            optimizer.step()
            running_loss += loss.item()
            batches_since_log += 1
            if batch_idx % log_every == 0:
                # Average over the batches actually accumulated since the last print.
                print(f"Epoch [{epoch + 1}/{num_epochs}] Batch [{batch_idx + 1}/{len(train_loader)}] Loss: {running_loss / batches_since_log:.4f}")
                running_loss = 0.0
                batches_since_log = 0

        # Validation: no dropout, batch norm uses running statistics.
        model.eval()
        val_loss_sum = 0.0
        val_steps = 0
        total = 0
        correct = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                val_loss_sum += loss_function(outputs, labels).item()
                val_steps += 1
        # max(..., 1) keeps an empty validation loader from dividing by zero.
        val_loss = val_loss_sum / max(val_steps, 1)
        val_accuracy = correct / max(total, 1)
        print(f'val_loss: {val_loss:.4f} val_accuracy: {val_accuracy:.4f}')

        # check whether early stopping is necessary
        if stopper.stop_training(val_loss):
            print(f'stopped early at epoch {epoch + 1}')
            break
        

# Define the testing function
def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, targets in test_loader:
            outputs = model(data)
            _, predictions = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predictions == targets).sum().item()

        accuracy = correct / total
        print(f"Accuracy: {accuracy}")
        return accuracy

### Training and testing models

In [None]:
# Simple LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM
num_layers = 2  # Number of stacked LSTM layers
dropout_prob = 0.3

model = MEG_LSTM(hidden_size, num_layers, dropout_prob)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>, creating the directory
# if it is missing. The model itself is pickled (a placeholder string used to
# be written here).
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)


In [None]:
# Bidirectional LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM (per direction)
num_layers = 2  # Number of stacked LSTM layers
dropout_prob = 0.3

model = MEG_LSTM_Bidirectional(hidden_size, num_layers, dropout_prob)
print(model)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>; create the directory
# first so a fresh checkout does not fail with FileNotFoundError.
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)

In [None]:
# dense + LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM (per direction)
num_layers = 2  # Number of stacked LSTM layers
dropout_prob = 0.3

model = MEG_LSTM_Dense(hidden_size, num_layers, dropout_prob)
print(model)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>; create the directory
# first so a fresh checkout does not fail with FileNotFoundError.
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)

In [None]:
# Batch normalization + dense LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM (per direction)
num_layers = 2  # Number of stacked LSTM layers
dropout_prob = 0.3

model = MEG_LSTM_Expanded(hidden_size, num_layers, dropout_prob)
print(model)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>; create the directory
# first so a fresh checkout does not fail with FileNotFoundError.
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)

In [None]:
# Complex LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM (per direction)
num_layers = 2  # Number of stacked layers in each LSTM stage
dropout_prob = 0.3

model = MEG_LSTM_Complex(hidden_size, num_layers, dropout_prob)
print(model)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>; create the directory
# first so a fresh checkout does not fail with FileNotFoundError.
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)

In [None]:
# Layered LSTM
# Model instantiation
hidden_size = 64  # Number of features in the hidden state of the LSTM
num_layers = 2  # Number of stacked layers in each LSTM stage
dropout_prob = 0.3

model = MEG_LSTM_Layered(hidden_size, num_layers, dropout_prob)
print(model)

# Loss function and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Training the model
num_epochs = 10 # Number of epochs
train_model(model, train_loader, val_loader, optimizer, loss_function, num_epochs)

# Testing the model
ac1 = test_model(model, test1_loader)
ac2 = test_model(model, test2_loader)
ac3 = test_model(model, test3_loader)

# Unweighted mean of the three per-set accuracies.
print("total accuracy of all test sets: ", float((ac1 + ac2 + ac3) / 3))

# Persist the trained model under Models/<class name>; create the directory
# first so a fresh checkout does not fail with FileNotFoundError.
directory = os.path.join(os.getcwd(), 'Models')
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, model._get_name()), 'wb') as savefile:
    pickle.dump(model, savefile)