In [None]:
from os.path import dirname, join as pjoin
import scipy.io as sio

: 

In [None]:
mat_fname = 'ECoG_Handpose.mat'

: 

In [None]:
mat_contents = sio.loadmat(mat_fname)['y']

: 

In [None]:
mat_contents

: 

In [None]:
mat_data = mat_contents

: 

In [None]:
mat_data.shape
# 67 channels
# channel 1 - sample time
# channel 2-61 - ECoG
# channel 62 - paradigm info
#              0 - relax
#              1 - fist movement
#              2 - peace movement
#              3 - open hand
# channel 63-67 - glove info
#              63 - thumb
#              64 - index
#              65 - middle
#              66 - ring
#              67 - little

: 

In [None]:
import mne
import numpy as np
from scipy import signal

import matplotlib.pyplot as plt
from mne.time_frequency import fit_iir_model_raw
from mne.filter import create_filter

: 

In [None]:
# extract ecog channels into nparray
ecog_data = mat_data[1:61]

: 

In [None]:
paradigm_info = mat_data[61]

: 

In [None]:
# count unique movements (labels?)
np.unique(paradigm_info, return_counts=True)

: 

In [None]:
mat_data[0]

: 

# CREATE MNE OBJECT

In [None]:
sampling_freq = 1200
ch_names = ["sample_time"] + [f'CH_{i}' for i in range(1, 61)] + ["paradigm_info"]
ch_types = ['misc'] + ['ecog'] * 60 + ['stim']

: 

In [None]:
len(ch_names), len(ch_types)

: 

In [None]:
info = mne.create_info(ch_names=ch_names, sfreq=1200, ch_types=ch_types)

: 

In [None]:
raw = mne.io.RawArray(mat_data[0:62, :], info)

: 

In [None]:
# plot ECoG channels only
raw.plot_psd(average=True)
plt.show();

: 

In [None]:
# # plot all channels individually over time using matplotlib

# fig, axes = plt.subplots(60, 1, figsize=(10, 60))
# for i, ax in enumerate(axes):
#     ax.plot(ecog_data[i])
#     ax.set_title(raw.ch_names[i])

# fig.tight_layout()
# plt.show()
# plt.close()

: 

In [None]:
# show info on the raw object
raw.info

: 

In [None]:
sample_time = mat_data[0, :]

: 

In [None]:
time_diff = np.diff(sample_time)

: 

In [None]:
paradigm_info = mat_data[61, :]
finger_movement_onsets = mat_data[62:, :]

: 

In [None]:
# re-reference to average
raw.set_eeg_reference('average')

: 

In [None]:
raw = raw.filter(100, 300)

: 

In [None]:
# recursive 6th order notch filter
raw.notch_filter(60, filter_length='auto', phase='zero')

: 

In [None]:
event_labels = dict(fist_movement = 1, peace_movement = 2, open_hand = 3)
# find events in the MNE raw object
events = mne.find_events(raw, stim_channel="paradigm_info", shortest_event=1, verbose=True)

: 

In [None]:
epochs = mne.Epochs(raw, events, tmin=-0.75, tmax=0.75, event_id=event_labels, preload=True, baseline=None)

: 

In [None]:
order = 10  # define model order

# Estimate AR models on raw data
b, a = fit_iir_model_raw(raw, order=order, picks=['ecog'], tmin=sample_time[0], tmax=sample_time[-1])
d, times = raw[25, 10000:10200]  # look at 25th channel for 200 samples
d = d.ravel()  # make flat vector
innovation = signal.convolve(d, a, 'valid')
d_ = signal.lfilter(b, a, innovation)  # regenerate the signal
d_ = np.r_[d_[0] * np.ones(order), d_]  # dummy samples to keep signal length

: 

In [None]:
plt.figure()
plt.psd(d, Fs=raw.info['sfreq'], NFFT=2048)
plt.psd(innovation, Fs=raw.info['sfreq'], NFFT=2048)
plt.psd(d_, Fs=raw.info['sfreq'], NFFT=2048, linestyle='--')
plt.legend(('Signal', 'Innovation', 'Regenerated signal'))
plt.show()

: 

In [None]:
m = raw._data
m.mean(), m.std(), m.shape

: 

In [None]:
raw.plot_psd(average=True)
plt.show()

: 

In [None]:
import scipy.io as sio
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder

: 

In [None]:
# load filtered out data from raw object
data = raw.get_data()

: 

In [None]:
X = data[1:61, :]  # ECoG data (channels 2-61)
y = data[61, :].astype(int)  # Paradigm info (channel 62)

: 

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X.T, y, test_size=0.2, random_state=42)
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

: 

In [None]:
# Scaling the y values
scaler = MinMaxScaler()
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

: 

In [None]:
scaler.fit(y_train)
y_train = scaler.transform(y_train).flatten()
y_test = scaler.transform(y_test).flatten()

: 

In [None]:
encoder = LabelEncoder()
encoder.fit([0, 1, 2, 3])
y_train = encoder.transform(y_train)
y_test = encoder.transform(y_test)

: 

In [None]:
unique_y_train_values = np.unique(y_train)
unique_y_test_values = np.unique(y_test)

: 

In [None]:
# Reshape the data into the required input shape for the LSTM model
timesteps = 1  # the number of timesteps
n_features = X_train.shape[1]

: 

In [None]:
X_train = X_train.reshape(-1, timesteps, n_features)
X_test = X_test.reshape(-1, timesteps, n_features)

: 

In [None]:
import torch
import torch.nn as nn

: 

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_prob=0.5):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)

        out, _ = self.lstm(x, (h0, c0))
        out = self.dropout(out)  # Apply dropout after the LSTM layer
        out = self.fc(out[:, -1, :])
        return out
    
    def get_hidden_states(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)

        _, (hidden_states, _) = self.lstm(x, (h0, c0))
        return hidden_states

: 

In [None]:
input_size = n_features
hidden_size = 128
num_layers = 1
num_classes = len(np.unique(y))

: 

In [None]:
model = LSTMModel(input_size, hidden_size, num_layers, num_classes)

: 

In [None]:
# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

: 

In [None]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
losses = []  # Initialize an empty list to store the losses

: 

In [None]:
# Train the model
num_epochs = 100
for epoch in range(num_epochs):
    # Forward pass
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Record the loss
    losses.append(loss.item())

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

: 

In [None]:
# Visualize the training loss

import matplotlib.pyplot as plt

plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')

y_ticks = np.arange(0, np.max(losses), 0.2)
plt.yticks(y_ticks)
plt.savefig('training_loss.png', dpi=300, bbox_inches='tight')

plt.show()

: 

In [None]:
# LDA

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Extract the hidden states from the LSTM model
hidden_states_train = model.get_hidden_states(X_train_tensor)[-1].detach().numpy()
hidden_states_test = model.get_hidden_states(X_test_tensor)[-1].detach().numpy()

# Train the LinearDiscriminantAnalysis model using the extracted hidden states
lda = LinearDiscriminantAnalysis()
lda.fit(hidden_states_train, y_train)

# Evaluate the LDA model using the test dataset
lda_accuracy = lda.score(hidden_states_test, y_test)
print("LDA accuracy: {:.2f}%".format(lda_accuracy * 100))

: 

: 