In [2]:
import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from datahandling import BcomMEG
import pickle
from torch.utils.data import Dataset


In [2]:
reading_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_reading_epochs.npy')
producing_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_producing_epochs.npy')
reading_labels = np.zeros(reading_data.shape[0])
producing_labels = np.ones(producing_data.shape[0])
data = np.concatenate((reading_data, producing_data), axis=0)
del reading_data, producing_data
labels = np.concatenate((reading_labels, producing_labels), axis=0)
del reading_labels, producing_labels

In [3]:
data.shape

(24644, 247, 241)

In [None]:
class MEGSequenceDataset(Dataset):
    def __init__(self, sequences, labels, transform=None):
        """
        Args:
            sequences: list or np.ndarray of shape (N, seq_len, input_size)
            labels: list or np.ndarray of shape (N,)
            transform: optional transform to apply to each sample
        """
        # normalization
        mean = sequences.mean(axis=(0,2), keepdims=True)
        std = sequences.std(axis=(0,2), keepdims=True)
        sequences = (sequences - mean) / std
        
        self.sequences = torch.tensor(sequences.transpose(0,2,1), dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.transform = transform

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        x = self.sequences[idx]
        y = self.labels[idx]

        if self.transform:
            x = self.transform(x)

        return x, y

In [44]:
class MEGru(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout=0.5):
        super(MEGru, self).__init__()
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout = dropout if num_layers > 1 else 0,
            bidirectional=False,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, h_n = self.gru(x)
        out = out[:, -1, :]#h_n[-1] 
        self.dropout(out)
        out = self.fc(out)
        return out

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42, stratify=labels)

train_dataset = MEGSequenceDataset(X_train, y_train)
test_dataset = MEGSequenceDataset(X_test, y_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)


In [7]:
def train_model(model, train_loader, optimizer, criterion, device='mps', log_interval=10):
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * inputs.size(0)

        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        if log_interval is not None and (batch_idx + 1) % log_interval == 0:
            print(f"  [Train] Batch {batch_idx+1}/{len(train_loader)} | "
                  f"Loss: {loss.item():.4f} | Acc: {(predicted == labels).float().mean():.4f}")

    avg_loss = train_loss / total
    avg_acc = correct / total
    print(f"[Train] Epoch Done | Avg Loss: {avg_loss:.4f} | Avg Acc: {avg_acc:.4f}")
    return avg_loss, avg_acc

def eval_model(model, test_loader, criterion, device='mps', log_interval=10):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_idx, (inputs, labels) in enumerate(test_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            test_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            if log_interval is not None and (batch_idx + 1) % log_interval == 0:
                print(f"  [Eval ] Batch {batch_idx+1}/{len(test_loader)} | "
                      f"Loss: {loss.item():.4f} | Acc: {(predicted == labels).float().mean():.4f}")

        avg_loss = test_loss / total
        avg_acc = correct / total
        print(f"[Eval ] Epoch Done | Avg Loss: {avg_loss:.4f} | Avg Acc: {avg_acc:.4f}")
        return avg_loss, avg_acc

In [45]:
layer_results = {}
channels = 247
num_classes = 2
device = "mps"

for num_layers in [5]:
    print(f"\n🔁 Training GRU with {num_layers} layer(s)")
    
    model = MEGru(input_size=channels, hidden_size=128, output_size=num_classes, num_layers=num_layers).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-3, weight_decay=1e-5)

    best_test_acc = 0.0

    for epoch in range(10):  # or whatever number of epochs
        train_loss, train_acc = train_model(model, train_loader, optimizer, criterion, device=device)
        test_loss, test_acc = eval_model(model, test_loader, criterion, device=device)

        if test_acc > best_test_acc:
            best_test_acc = test_acc

        print(f"Epoch {epoch+1:2d} | Train Acc: {train_acc:.4f} | Test Acc: {test_acc:.4f}")

    layer_results[num_layers] = best_test_acc

print("\n Best Accuracy by Layer Count:")
for n_layers, acc in layer_results.items():
    print(f"  {n_layers} layer(s): {acc:.4f}")


🔁 Training GRU with 5 layer(s)
  [Train] Batch 10/155 | Loss: 0.6884 | Acc: 0.5312
  [Train] Batch 20/155 | Loss: 0.6764 | Acc: 0.5391
  [Train] Batch 30/155 | Loss: 0.7026 | Acc: 0.4062
  [Train] Batch 40/155 | Loss: 0.6877 | Acc: 0.5469
  [Train] Batch 50/155 | Loss: 0.6948 | Acc: 0.5078
  [Train] Batch 60/155 | Loss: 0.6980 | Acc: 0.5078
  [Train] Batch 70/155 | Loss: 0.6854 | Acc: 0.6094
  [Train] Batch 80/155 | Loss: 0.7203 | Acc: 0.3906
  [Train] Batch 90/155 | Loss: 0.6883 | Acc: 0.5391
  [Train] Batch 100/155 | Loss: 0.7044 | Acc: 0.4922
  [Train] Batch 110/155 | Loss: 0.7097 | Acc: 0.4844
  [Train] Batch 120/155 | Loss: 0.6940 | Acc: 0.5312
  [Train] Batch 130/155 | Loss: 0.6884 | Acc: 0.5391
  [Train] Batch 140/155 | Loss: 0.7109 | Acc: 0.5078
  [Train] Batch 150/155 | Loss: 0.7058 | Acc: 0.4375
[Train] Epoch Done | Avg Loss: 0.6997 | Avg Acc: 0.4943
  [Eval ] Batch 10/155 | Loss: 0.6989 | Acc: 0.5625
  [Eval ] Batch 20/155 | Loss: 0.7429 | Acc: 0.3125
  [Eval ] Batch 30/155

KeyboardInterrupt: 

In [32]:
tiny_dataset = torch.utils.data.Subset(train_dataset, range(1000))
tiny_loader = torch.utils.data.DataLoader(tiny_dataset, batch_size=20, shuffle=True)
tiny_test = torch.utils.data.Subset(test_dataset, range(10))
tiny_test_loader = torch.utils.data.DataLoader(tiny_test, batch_size=4, shuffle=False)

In [33]:
layer_results = {}
channels = 247
num_classes = 2
device = "mps"

for num_layers in [2, 3, 4]:
    print(f"\n🔁 Training GRU with {num_layers} layer(s)")
    
    model = MEGru(input_size=channels, hidden_size=128, output_size=num_classes, num_layers=num_layers).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=3e-3)

    best_test_acc = 0.0

    for epoch in range(10):  # or whatever number of epochs
        train_loss, train_acc = train_model(model, tiny_loader, optimizer, criterion, device=device)
        test_loss, test_acc = eval_model(model, tiny_test_loader, criterion, device=device)

        if test_acc > best_test_acc:
            best_test_acc = test_acc

        print(f"Epoch {epoch+1:2d} | Train Acc: {train_acc:.4f} | Test Acc: {test_acc:.4f}")

    layer_results[num_layers] = best_test_acc

print("\n Best Accuracy by Layer Count:")
for n_layers, acc in layer_results.items():
    print(f"  {n_layers} layer(s): {acc:.4f}")


🔁 Training GRU with 2 layer(s)
  [Train] Batch 10/50 | Loss: 0.6849 | Acc: 0.6000
  [Train] Batch 20/50 | Loss: 0.6503 | Acc: 0.7500
  [Train] Batch 30/50 | Loss: 0.6862 | Acc: 0.6500
  [Train] Batch 40/50 | Loss: 0.6443 | Acc: 0.7000
  [Train] Batch 50/50 | Loss: 0.7922 | Acc: 0.3000
[Train] Epoch Done | Avg Loss: 0.7234 | Avg Acc: 0.4940
[Eval ] Epoch Done | Avg Loss: 0.7326 | Avg Acc: 0.6000
Epoch  1 | Train Acc: 0.4940 | Test Acc: 0.6000
  [Train] Batch 10/50 | Loss: 0.7166 | Acc: 0.4500
  [Train] Batch 20/50 | Loss: 0.6801 | Acc: 0.6000
  [Train] Batch 30/50 | Loss: 0.6668 | Acc: 0.5500
  [Train] Batch 40/50 | Loss: 0.7000 | Acc: 0.5500
  [Train] Batch 50/50 | Loss: 0.7325 | Acc: 0.5000
[Train] Epoch Done | Avg Loss: 0.6953 | Avg Acc: 0.5230
[Eval ] Epoch Done | Avg Loss: 0.7690 | Avg Acc: 0.4000
Epoch  2 | Train Acc: 0.5230 | Test Acc: 0.4000
  [Train] Batch 10/50 | Loss: 0.6137 | Acc: 0.7000
  [Train] Batch 20/50 | Loss: 0.7269 | Acc: 0.5500
  [Train] Batch 30/50 | Loss: 0.6376

KeyboardInterrupt: 

In [4]:
reading_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_reading_epochs.npy')
producing_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_producing_epochs.npy')
reading_labels = np.zeros(reading_data.shape[0])
producing_labels = np.ones(producing_data.shape[0])
data = np.concatenate((reading_data, producing_data), axis=0)
data = data.reshape(data.shape[0], -1)
del reading_data, producing_data
labels = np.concatenate((reading_labels, producing_labels), axis=0)
del reading_labels, producing_labels

In [5]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
data = scaler.fit_transform(data)

print("done scaling")
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42, stratify=labels)

In [8]:
from sklearn.decomposition import PCA

pca = PCA(n_components=300)
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)

In [None]:
from sklearn.svm import SVC

svm = SVC(kernel='linear', C=1.0, gamma='scale', random_state=42)
svm.fit(X_train_pca, y_train)


In [None]:
from sklearn.metrics import accuracy_score, classification_report

y_pred = svm.predict(X_test_pca)
acc = accuracy_score(y_test, y_pred)

print(f"Accuracy: {acc:.4f}")
print(classification_report(y_test, y_pred))

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller

def adf_test_all_channels(data, channel_names=None, alpha=0.05):
    """
    Runs ADF test on each channel (flattened across epochs) and plots the p-values.

    Parameters:
        data : ndarray
            Shape (N, C, T) — epochs x channels x time
        channel_names : list of str, optional
            Names of channels. If None, will use integers.
        alpha : float
            Significance level (default = 0.05)

    Returns:
        p_values : ndarray of shape (C,)
            ADF p-values for each channel
    """
    N, C, T = data.shape
    p_values = []

    if channel_names is None:
        channel_names = [f"Ch {i}" for i in range(C)]

    for c in range(C):
        signal = data[:, c, :].reshape(-1)[::10]  # flatten across epochs
        try:
            _, pval, *_ = adfuller(signal)
        except Exception as e:
            print(f"ADF failed on channel {c}: {e}")
            pval = np.nan
        p_values.append(pval)

    p_values = np.array(p_values)

    # Plot
    plt.figure(figsize=(12, 5))
    plt.bar(range(C), p_values, tick_label=channel_names)
    plt.axhline(y=alpha, color='r', linestyle='--', label=f'p = {alpha}')
    plt.ylabel("ADF p-value")
    plt.xlabel("Channel")
    plt.title("ADF Test per Channel (Lower = More Stationary)")
    plt.xticks(rotation=90)
    plt.legend()
    plt.tight_layout()
    plt.show()

    return p_values

In [4]:
reading_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_reading_epochs.npy')
producing_data = np.load('/Users/ciprianbangu/Cogmaster/M2 Internship/BCI code/covert_producing_epochs.npy')
print("data loaded")
# reading_labels = np.zeros(reading_data.shape[0])
# producing_labels = np.ones(producing_data.shape[0])
data = np.concatenate((reading_data, producing_data), axis=0)

data loaded


In [5]:
del reading_data, producing_data

In [6]:
p_vals = adf_test_all_channels(data)

KeyboardInterrupt: 