### Assignment 2

# Imports and Variables


In [1]:
import os
import h5py
from scipy.signal import butter, filtfilt, decimate
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
TRAIN_PATH = "Final Project data/Intra/train/"
TEST_PATH = "Final Project data/Intra/test/"


# data specific
NUM_CHANNELS = 248
NUM_CLASSES = 4
LABEL_MAP = {'rest':0, 'taskmotor':1, 'taskstorymath':2, 'taskworkingmemory':3}


# Model specific
NUM_EPOCHS = 10

# Load and Preprocess Data
Apply a lowpass filter for downsampling the frequency

In [3]:
# lowpass filter
def bandpass_filter(data, lowcut=1.0, highcut=150.0, fs=2034, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data, axis=-1)

# down sampling data to a lower frequency that still contains usable information
def downsample_sample(data, orig_fs=2034, target_fs=250):
    factor = int(orig_fs / target_fs)
    return decimate(data, factor, axis=-1, ftype='fir', zero_phase=True)


def get_dataset_name(filename_with_dir):
    filename_without_dir = filename_with_dir.split( '/')[-1]
    temp = filename_without_dir.split ('_')[:-1]
    dataset_name = "_". join(temp)
    return dataset_name

# get the label from the file name
def get_y_from_filename(filename):
    split_name = filename.split('_')
    return "".join(split_name[:-2]).lower()

# load data X from the given file
def load_data_from_file(file_path):
    with h5py.File(file_path , 'r') as f :
        dataset_name = get_dataset_name(file_path)
        return f.get(dataset_name)[()]

# individual min max scaling
def min_max_scale_sample(data):
    scaler = MinMaxScaler()
    flat = data.reshape(-1, 1)
    return scaler.fit_transform(flat).reshape(data.shape)


In [4]:
# apply all preprocessing steps to a given file
def preprocess_file(filepath):
    data = load_data_from_file(filepath)
    filtered = bandpass_filter(data)
    downsampled = downsample_sample(filtered)

    return min_max_scale_sample(downsampled)

# Define Models

## 1. A simple MegNet model

In [5]:
class SimpleMEGNet(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(SimpleMEGNet, self).__init__()
        self.conv1 = nn.Conv1d(n_channels, 64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        self.adaptive_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(256, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, n_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.adaptive_pool(x).squeeze(-1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Model Training

In [6]:
# use cuda if possible
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = SimpleMEGNet(NUM_CHANNELS, NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# get all file names form the given folder
file_list = [os.path.join(TRAIN_PATH, f) for f in os.listdir(TRAIN_PATH) if f.endswith('.h5')]

In [7]:
# Train model
for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0

    # run training per file
    for filepath in file_list:
        filename = os.path.basename(filepath)
        label_str = get_y_from_filename(filename)

        y = torch.tensor([LABEL_MAP[label_str]], dtype=torch.long).to(device)
        X = preprocess_file(filepath)
        input_tensor = torch.tensor(X, dtype=torch.float32).unsqueeze(0).to(device)

        optimizer.zero_grad()
        output = model(input_tensor)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {total_loss:.4f}")

Epoch 1/10, Loss: 45.3717
Epoch 2/10, Loss: 44.5509
Epoch 3/10, Loss: 42.5294
Epoch 4/10, Loss: 40.5417
Epoch 5/10, Loss: 36.8453
Epoch 6/10, Loss: 32.9748
Epoch 7/10, Loss: 28.2415
Epoch 8/10, Loss: 22.1935
Epoch 9/10, Loss: 18.2981
Epoch 10/10, Loss: 11.6083


# Test Model on testing data

In [8]:
test_files = [os.path.join(TEST_PATH, f) for f in os.listdir(TEST_PATH) if f.endswith('.h5')]

In [9]:
model.eval()
correct = 0

with torch.no_grad():
    for f in test_files:

        # preprocess files
        x = preprocess_file(f)
        x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(0)

        # get prediction for input
        outputs = model(x_tensor)
        _, predicted = torch.max(outputs, dim=1)  # TODO try: pred = output.argmax(dim=1).item()

        # get true label for the input
        filename = os.path.basename(f)
        true_label = LABEL_MAP[get_y_from_filename(filename)]

        correct += (predicted.item() == true_label)


test_accuracy = correct / len(test_files)
print(f"Test accuracy: {test_accuracy:.3f}")

Test accuracy: 0.125
