In [1]:
import numpy as np

data = np.load('A01T.npz')  
signal = data['s']  # Assuming shape is (total_samples, 22)
event_types = data['etyp'].T[0]
event_positions = data['epos'].T[0]

# Initialize empty arrays
signals = []
trial_types = []
valid_labels = {769, 770, 771, 772}

for i in range(0, len(event_positions) - 1):
    event_type = event_types[i]
    next_event_type = event_types[i + 1]
    
    if event_type == 768 and next_event_type in valid_labels:  # Valid trial start
        pos = event_positions[i+1]
        
        # Extract 750 samples x 22 channels
        trial_signal = signal[pos+750 : pos+1500, 0:22]  # All 22 channels
        
        # Verify the shape is correct
        if trial_signal.shape != (750, 22):
            print(f"Unexpected shape at trial {len(signals)}: {trial_signal.shape}")
            continue
            
        signals.append(trial_signal)
        trial_types.append(next_event_type)

# Convert to numpy arrays
signals_array = np.array(signals)  # Shape (288, 750, 22)
labels_array = np.array(trial_types)  # Shape (288,)

# Verify final shapes
print("Signals shape:", signals_array.shape)
print("Labels shape:", labels_array.shape)
print("Unique labels:", np.unique(labels_array))

Signals shape: (273, 750, 22)
Labels shape: (273,)
Unique labels: [769 770 771 772]


In [2]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch.nn as nn 
import torch.optim as optim 

class EEGDataset(Dataset):
    def __init__(self, trials, labels):
        # Trials: (num_trials, 750, 22)
        # Labels: (num_trials,)
        
        # Normalize data per channel
        self.data = torch.tensor(trials, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)
        
        # Map original labels to 0-3
        self.label_mapping = {769: 0, 770: 1, 771: 2, 772: 3}
        self.labels = torch.tensor([self.label_mapping[int(x)] for x in labels])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Assuming you have loaded your data into trials and labels arrays
trials = np.load('eeg_signals.npy')  # Shape (288, 750, 22)
labels = np.load('eeg_labels.npy')     # Shape (288,)

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    trials, labels, test_size=0.2, stratify=labels, random_state=42
)

# Create datasets and dataloaders
train_dataset = EEGDataset(X_train, y_train)
test_dataset = EEGDataset(X_test, y_test)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

class LTC_Cell(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(LTC_Cell, self).__init__()
        self.hidden_dim = hidden_dim
        self.W_xh = nn.Linear(input_dim, hidden_dim)
        self.W_hh = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.W_tau = nn.Linear(hidden_dim, hidden_dim)
        
    def forward(self, x, h):
        tau = torch.sigmoid(self.W_tau(h)) + 0.1
        dh = -h / tau + torch.tanh(self.W_xh(x) + self.W_hh(h))
        return h + 0.1 * dh

class LTC_Transformer(nn.Module):
    def __init__(self, input_dim=22, ltc_hidden_dim=64, num_classes=4, 
                 num_ltc_layers=1, num_transformer_layers=1, nhead=2, dropout=0.5):
        super().__init__()
        self.input_dim = input_dim
        self.ltc_hidden_dim = ltc_hidden_dim
        self.num_ltc_layers = num_ltc_layers
        
        # LTC layers with weight normalization
        self.ltc_layers = nn.ModuleList([
            LTC_Cell(input_dim if i==0 else ltc_hidden_dim, ltc_hidden_dim)
            for i in range(num_ltc_layers)
        ])
        
        # Transformer encoder with increased dropout
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=ltc_hidden_dim, 
            nhead=nhead, 
            dim_feedforward=ltc_hidden_dim*2,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(
            encoder_layer, 
            num_layers=num_transformer_layers
        )
        
        # Classifier head with stronger regularization
        self.classifier = nn.Sequential(
            nn.Linear(ltc_hidden_dim, 32),
            nn.BatchNorm1d(32),  # Add batch normalization
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, num_classes)
        )
        
        # Apply weight initialization
        self._initialize_weights()
    
    def _initialize_weights(self):
        # Xavier/Glorot initialization for better convergence
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        # x shape: [batch_size, seq_len, input_dim]
        batch_size, seq_len, _ = x.size()
        
        # Initialize hidden states
        hiddens = [torch.zeros(batch_size, self.ltc_hidden_dim, device=x.device) 
                  for _ in range(self.num_ltc_layers)]
        
        # Process sequence through LTC layers
        all_hidden = []
        for t in range(seq_len):
            x_t = x[:, t, :]
            for layer_idx in range(self.num_ltc_layers):
                hiddens[layer_idx] = self.ltc_layers[layer_idx](
                    x_t if layer_idx == 0 else hiddens[layer_idx-1],
                    hiddens[layer_idx]
                )
            all_hidden.append(hiddens[-1])
        
        # Stack hidden states for transformer input
        hidden_stack = torch.stack(all_hidden, dim=1)  # [batch_size, seq_len, ltc_hidden_dim]
        
        # Pass through transformer
        transformer_out = self.transformer(hidden_stack)
        
        # Global average pooling
        pooled = transformer_out.mean(dim=1)
        
        # Classification
        return self.classifier(pooled)

def train_and_evaluate(train_loader, test_loader, model_save_path='best_ltc_transformer_model.pth'):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Model with enhanced regularization
    model = LTC_Transformer(
        input_dim=22,
        ltc_hidden_dim=64,
        num_classes=4,
        num_ltc_layers=1,
        num_transformer_layers=1,
        nhead=2,
        dropout=0.6  # Increased dropout
    ).to(device)

    # Loss function with label smoothing
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    
    # Optimizer with weight decay (L2 regularization)
    optimizer = optim.AdamW(
        model.parameters(), 
        lr=3e-4,  # Lower learning rate
        weight_decay=2e-4,  # Increased weight decay
        betas=(0.9, 0.999)
    )
    
    # Learning rate scheduler with more patience
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, 
        mode='min', 
        patience=7,  # Increased patience
        factor=0.5,
        min_lr=1e-6
    )

    best_val_acc = 0
    epochs_same_acc = 0
    last_val_acc = None
    num_epochs = 150
    
    # Initialize history tracking
    history = {
        'train_loss': [],
        'val_loss': [],
        'val_acc': []
    }

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        
        for batch_idx, (data, labels) in enumerate(train_loader):
            data, labels = data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)
        
        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, labels in test_loader:
                data, labels = data.to(device), labels.to(device)
                outputs = model(data)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        val_loss /= len(test_loader)
        val_acc = 100.0 * correct / total
        
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        scheduler.step(val_loss)
        current_lr = optimizer.param_groups[0]['lr']

        # Early stopping: stop if accuracy stays the same for 10 epochs
        if last_val_acc is not None and abs(val_acc - last_val_acc) < 1e-6:  # Using a small tolerance
            epochs_same_acc += 1
        else:
            epochs_same_acc = 0
        last_val_acc = val_acc

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved with improved accuracy: {val_acc:.2f}%")

        print(f'Epoch {epoch+1}/{num_epochs} | '
              f'Train Loss: {train_loss:.4f} | '
              f'Val Loss: {val_loss:.4f} | '
              f'Val Acc: {val_acc:.2f}% | '
              f'LR: {current_lr:.2e} | '
              f'Same acc: {epochs_same_acc}/10')

        if epochs_same_acc >= 10:
            print(f'\nEarly stopping triggered after {epoch+1} epochs due to 10 consecutive epochs with same accuracy!')
            break
    
    # Load best model and final evaluation
    model.load_state_dict(torch.load(model_save_path))
    print(f'\nTraining complete. Best validation accuracy: {best_val_acc:.2f}%')
    
    # Final test evaluation
    model.eval()
    test_correct = 0
    test_total = 0
    
    with torch.no_grad():
        for data, labels in test_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()
    
    final_test_acc = 100.0 * test_correct / test_total
    print(f'Final test accuracy: {final_test_acc:.2f}%')
    
    return model, history

# Example usage:
# train_and_evaluate(train_loader, test_loader)


In [8]:
model, history = train_and_evaluate(train_loader, test_loader)

Model saved with improved accuracy: 27.27%
Epoch 1/150 | Train Loss: 1.8104 | Val Loss: 1.4241 | Val Acc: 27.27% | LR: 3.00e-04 | Same acc: 0/10
Epoch 2/150 | Train Loss: 1.7423 | Val Loss: 1.4335 | Val Acc: 25.45% | LR: 3.00e-04 | Same acc: 0/10
Epoch 3/150 | Train Loss: 1.8541 | Val Loss: 1.4639 | Val Acc: 27.27% | LR: 3.00e-04 | Same acc: 0/10
Epoch 4/150 | Train Loss: 1.7840 | Val Loss: 1.5306 | Val Acc: 27.27% | LR: 3.00e-04 | Same acc: 1/10
Epoch 5/150 | Train Loss: 1.7259 | Val Loss: 1.6419 | Val Acc: 25.45% | LR: 3.00e-04 | Same acc: 0/10
Epoch 6/150 | Train Loss: 1.7579 | Val Loss: 1.7621 | Val Acc: 25.45% | LR: 3.00e-04 | Same acc: 1/10
Epoch 7/150 | Train Loss: 1.7061 | Val Loss: 1.8621 | Val Acc: 21.82% | LR: 3.00e-04 | Same acc: 0/10
Epoch 8/150 | Train Loss: 1.5969 | Val Loss: 1.9911 | Val Acc: 23.64% | LR: 3.00e-04 | Same acc: 0/10
Epoch 9/150 | Train Loss: 1.6244 | Val Loss: 2.0374 | Val Acc: 23.64% | LR: 1.50e-04 | Same acc: 1/10
Epoch 10/150 | Train Loss: 1.5449 | Val

  model.load_state_dict(torch.load(model_save_path))


Final test accuracy: 27.27%
