I wasn't able to implement the whole thing due to gpu restriction but checked with toy data for the clarrification 

In [22]:
import os
import glob
import numpy as np
import scipy.io as sio
import pywt
from scipy.interpolate import griddata
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

FS = 128     
WIN_SEC = 2       
OVERLAP = 0.5     
WAVELET = "db4"
DWT_LEVEL = 5

EEG_COORDS = np.array([
    [-0.5,  1.0],   
    [-1.0,  0.5],   
    [-0.5,  0.5],  
    [-0.8,  0.0],  
    [-1.0,  0.0],   
    [-0.8, -0.5],   
    [-0.5, -1.0],  
    [ 0.5, -1.0],   
    [ 0.8, -0.5],  
    [ 1.0,  0.0],   
    [ 0.5,  0.5],   
    [ 1.0,  0.5],   
    [ 0.5,  1.0],   
], dtype=np.float32)


def load_amigos_mat(mat_path):
    mat = sio.loadmat(mat_path, squeeze_me=False)
    joined = mat.get("joined_data")
    labels = mat.get("labels_selfassessment")
    if joined is None or labels is None:
        raise KeyError(f"joined_data or labels_selfassessment not found in {mat_path}")
    eeg_trials = []
    label_trials = []
    for i in range(joined.shape[1]):
        signal = joined[0, i]       
        label = labels[0, i]        
        eeg = signal[:, :14].T      
        eeg_trials.append(eeg.astype(np.float32))
        label_trials.append(np.asarray(label, dtype=np.float32))
    return eeg_trials, label_trials


In [23]:

def window_eeg(eeg_list, label_list, fs=FS, win_sec=WIN_SEC, overlap=OVERLAP):
    win_len = int(fs * win_sec)
    step = max(1, int(win_len * (1 - overlap)))
    segments = []
    labels = []
    for signal, label in zip(eeg_list, label_list):
        for start in range(0, signal.shape[1] - win_len + 1, step):
            window = signal[:, start:start + win_len]
            segments.append(window)
            labels.append(label[:2]) 
    return segments, labels

def dwt_band_energies(window, wavelet=WAVELET, level=DWT_LEVEL):
    feats = []
    for ch in window:
        coeffs = pywt.wavedec(ch, wavelet, level=level)
        band_energy = [np.mean(c ** 2) for c in coeffs[:5]]
        feats.append(band_energy)
    return np.array(feats, dtype=np.float32)

def create_topomaps(band_matrix, coords=EEG_COORDS, grid_size=32):
    n_channels, n_bands = band_matrix.shape
    m = min(coords.shape[0], n_channels)
    xs, ys = coords[:m, 0], coords[:m, 1]
    band_matrix = band_matrix[:m] 
    grid_x, grid_y = np.mgrid[
        np.min(xs):np.max(xs):complex(grid_size),
        np.min(ys):np.max(ys):complex(grid_size)
    ]
    topo = np.zeros((n_bands, grid_size, grid_size), dtype=np.float32)
    for b in range(n_bands):
        values = band_matrix[:, b]
        interp = griddata((xs, ys), values, (grid_x, grid_y),
                          method='linear', fill_value=np.mean(values))
        interp_min, interp_max = interp.min(), interp.max()
        topo[b] = (interp - interp_min) / (interp_max - interp_min + 1e-8)
    return topo


class AMIGOSDataset(Dataset):
    def __init__(self, mat_files, grid_size=32, transform=None):
        self.samples = []
        self.grid_size = grid_size
        self.transform = transform
        for mfile in mat_files:
            eeg_trials, label_trials = load_amigos_mat(mfile)
            segments, labels = window_eeg(eeg_trials, label_trials, fs=FS, win_sec=WIN_SEC, overlap=OVERLAP)
            for seg, lab in zip(segments, labels):
                seg_norm = seg / (np.max(np.abs(seg), axis=1, keepdims=True) + 1e-8)
                band_mat = dwt_band_energies(seg_norm, wavelet=WAVELET, level=DWT_LEVEL)
                topo = create_topomaps(band_mat, coords=EEG_COORDS, grid_size=grid_size)
                self.samples.append((topo.astype(np.float32), lab.astype(np.float32)))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        topo, label = self.samples[idx]
        topo_tensor = torch.tensor(topo, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.float32)
        if self.transform:
            topo_tensor = self.transform(topo_tensor)
        return { 'topo': topo_tensor, 'label': label_tensor }


class DCAB(nn.Module):
    def __init__(self, channels, reduction=16):
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Conv2d(channels, channels // reduction, kernel_size=1, bias=False)
        self.relu = nn.ReLU()
        self.fc2 = nn.Conv2d(channels // reduction, channels, kernel_size=1, bias=False)
        self.spatial_conv = nn.Conv2d(2, 1, kernel_size=7, padding=3, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = self.fc2(self.relu(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu(self.fc1(self.max_pool(x))))
        ca = self.sigmoid(avg_out + max_out)
        x = x * ca
        avg_pool = torch.mean(x, dim=1, keepdim=True)
        max_pool = torch.max(x, dim=1, keepdim=True)[0]
        sa = self.sigmoid(self.spatial_conv(torch.cat([avg_pool, max_pool], dim=1)))
        x = x * sa
        return x


class DPUMB(nn.Module):
    def __init__(self, channels, kernels=(3, 5, 7)):
        super().__init__()
        self.branches = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(channels, channels, kernel_size=k, padding=k // 2, groups=channels, bias=False),
                nn.BatchNorm2d(channels),
                nn.GELU()
            ) for k in kernels
        ])
        self.pointwise = nn.Conv2d(channels * len(kernels), channels, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(channels)
        self.act = nn.GELU()

    def forward(self, x):
        outs = [b(x) for b in self.branches]
        concat = torch.cat(outs, dim=1)
        out = self.pointwise(concat)
        out = self.bn(out)
        out = self.act(out)
        return x + out


class CNNEncoderLayer(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.norm = nn.GroupNorm(num_groups=1, num_channels=channels)
        self.conv = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
        self.act = nn.GELU()
        self.dcab = DCAB(channels)

    def forward(self, x):
        y = self.norm(x)
        y = self.conv(y)
        y = self.act(y)
        y = self.dcab(y)
        return x + y


class PatchEmbedding(nn.Module):
    def __init__(self, in_channels=5, embed_dim=768, patch_size=16):
        super().__init__()
        self.pre_conv = nn.Sequential(
            nn.Conv2d(in_channels, embed_dim, kernel_size=3, padding=1, bias=False),
            nn.GELU(),
            nn.Conv2d(embed_dim, embed_dim, kernel_size=3, padding=1, groups=embed_dim, bias=False)
        )
        self.proj = nn.Conv2d(embed_dim, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        x = self.pre_conv(x)
        x = self.proj(x)
        B, C, H, W = x.shape
        x = x.flatten(2).transpose(1, 2)
        return x


class AttentionPool(nn.Module):
    def __init__(self, embed_dim, num_heads=8):
        super().__init__()
        self.class_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, embed_dim * 4),
            nn.GELU(),
            nn.Linear(embed_dim * 4, embed_dim)
        )
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        B = x.size(0)
        cls_tokens = self.class_token.expand(B, -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)
        attn_out, _ = self.attn(x, x, x)
        x = x + attn_out
        x = self.norm1(x)
        ff_out = self.ffn(x)
        x = x + ff_out
        x = self.norm2(x)
        return x[:, 0]


class EmotionRecognitionModel(nn.Module):
    def __init__(self, in_channels=5, embed_dim=768, num_classes=2, num_encoder_blocks=4, num_dpumb_blocks=3, patch_size=16):
        super().__init__()
        self.patch_embed = PatchEmbedding(in_channels, embed_dim, patch_size)
        blocks = []
        total_blocks = num_encoder_blocks + num_dpumb_blocks
        for i in range(total_blocks):
            if i % 2 == 0:
                blocks.append(CNNEncoderLayer(embed_dim))
            else:
                blocks.append(DPUMB(embed_dim))
        self.encoder = nn.Sequential(*blocks)
        self.pool = AttentionPool(embed_dim, num_heads=8)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        tokens = self.patch_embed(x)
        tokens = tokens.transpose(1, 2)
        N = tokens.size(2)
        H_W = int(np.sqrt(N))
        x2d = tokens.view(tokens.size(0), tokens.size(1), H_W, H_W)
        features = self.encoder(x2d)
        B, C, H, W = features.shape
        tokens2 = features.flatten(2).transpose(1, 2)
        pooled = self.pool(tokens2)
        return self.head(pooled)

for single file dont run this if you want for whole dataset

In [18]:
#for toy data

if __name__ == "__main__":
    data_root = "/kaggle/input/amigos/AMIGOS DATASET"
    all_mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
    mat_files = [all_mat_files[0]] if all_mat_files else []

    if mat_files:
        dataset = AMIGOSDataset(mat_files, grid_size=32)
        print(f"Loaded {len(dataset)} samples from 1 mat file.")
        loader = DataLoader(dataset, batch_size=8, shuffle=True)
        model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
        for batch in loader:
            inputs = batch['topo']
            labels = batch['label']
            logits = model(inputs)
            print("logits shape:", logits.shape)
            break
    else:
        print("No .mat files found in", data_root)


Loaded 1442 samples from 1 mat file.
logits shape: torch.Size([8, 2])


In [19]:
if __name__ == "__main__":
    data_root = "/kaggle/input/amigos/AMIGOS DATASET"
    all_mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
    mat_files = [all_mat_files[0]] if all_mat_files else []
    if mat_files:
        dataset = AMIGOSDataset(mat_files, grid_size=32) 
        print(f"Loaded {len(dataset)} samples from 1 mat file.")
        loader = DataLoader(dataset, batch_size=8, shuffle=True)
        model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
        # Check a single forward pass
        for batch in loader:
            inputs = batch['topo']
            labels = batch['label']
            logits = model(inputs)
            print("logits shape:", logits.shape)
            break
    else:
        print("No .mat files found in", data_root)


Loaded 1442 samples from 1 mat file.
logits shape: torch.Size([8, 2])


In [77]:


if __name__ == "__main__":
   
    data_root = "/kaggle/input/amigos/AMIGOS DATASET"
    mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
    if mat_files:
        dataset = AMIGOSDataset(mat_files, grid_size=32)
        print(f"Loaded {len(dataset)} samples from {len(mat_files)} mat files.")
        loader = DataLoader(dataset, batch_size=8, shuffle=True)
        model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
        for batch in loader:
            inputs = batch['topo'] 
            labels = batch['label'] 
            logits = model(inputs)
            print("logits shape:", logits.shape)
            break
    else:
        print("No .mat files found in", data_root)

Loaded 129366 samples from 23 mat files.
logits shape: torch.Size([8, 2])


In [27]:
import os
import glob
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

def extract_binary_labels(label_tensor, threshold=5.0):
    # If labels are shaped (B, 1, 12), flatten to (B, 12)
    if label_tensor.dim() > 2:
        label_tensor = label_tensor.view(label_tensor.size(0), -1)
    labels = label_tensor[:, :2]
    return (labels >= threshold).float()

if __name__ == "__main__":
    data_root = "/kaggle/input/amigos/AMIGOS DATASET"

    all_mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
    mat_files = [all_mat_files[0]] if all_mat_files else []
    if not mat_files:
        raise FileNotFoundError(f"No .mat files found in {data_root}")

    # Build the dataset and DataLoader
    dataset = AMIGOSDataset(mat_files, grid_size=32)
    loader = DataLoader(dataset, batch_size=8, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
    model.to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Training loop
    num_epochs = 3
    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        for batch in loader:
            inputs = batch["topo"].to(device)            
            raw_labels = batch["label"].to(device)       
            labels = extract_binary_labels(raw_labels)    
            optimizer.zero_grad()
            logits = model(inputs)                        
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        avg_loss = running_loss / len(loader.dataset)
        print(f"Epoch {epoch}: Train Loss = {avg_loss:.4f}")

    model.eval()
    correct_v = correct_a = 0
    total = 0
    with torch.no_grad():
        for batch in loader:
            inputs = batch["topo"].to(device)
            raw_labels = batch["label"].to(device)
            labels = extract_binary_labels(raw_labels)
            logits = model(inputs)
            preds = (logits > 0).float()
            correct_v += (preds[:, 0] == labels[:, 0]).sum().item()
            correct_a += (preds[:, 1] == labels[:, 1]).sum().item()
            total += labels.size(0)
    print(f"Training‑set accuracies – Valence: {correct_v/total:.3f}, Arousal: {correct_a/total:.3f}")

    save_path = "trained_weights.pth"
    torch.save(model.state_dict(), save_path)
    print(f"Trained model weights saved to: {save_path}")


Epoch 1: Train Loss = 0.6200
Epoch 2: Train Loss = 0.5645
Epoch 3: Train Loss = 0.5439
Training‑set accuracies – Valence: 0.797, Arousal: 0.707
Trained model weights saved to: trained_weights.pth


for the whole dataset

In [78]:

#for the whole file run this


import os
import glob
import random
import numpy as np
from torch.utils.data import Subset, DataLoader

def subject_wise_split(mat_files, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=42):

    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    n_subjects = len(mat_files)
    subject_ids = list(range(n_subjects))
    random.seed(seed)
    random.shuffle(subject_ids)

    n_train = int(n_subjects * train_ratio)
    n_val   = int(n_subjects * val_ratio)

    train_subjects = subject_ids[:n_train]
    val_subjects   = subject_ids[n_train:n_train + n_val]
    test_subjects  = subject_ids[n_train + n_val:]

    train_indices, val_indices, test_indices = [], [], []
    sample_offset = 0
    for subj_idx, mfile in enumerate(mat_files):
        eeg_trials, label_trials = load_amigos_mat(mfile)
        segments, labels = window_eeg(eeg_trials, label_trials, fs=FS, win_sec=WIN_SEC, overlap=OVERLAP)
        num_samples = len(segments)
        idx_range = list(range(sample_offset, sample_offset + num_samples))

        if subj_idx in train_subjects:
            train_indices.extend(idx_range)
        elif subj_idx in val_subjects:
            val_indices.extend(idx_range)
        else:
            test_indices.extend(idx_range)

        sample_offset += num_samples

    return train_indices, val_indices, test_indices

data_root = "/kaggle/input/amigos/AMIGOS DATASET"
mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))

dataset = AMIGOSDataset(mat_files, grid_size=32)

train_idx, val_idx, test_idx = subject_wise_split(mat_files)

train_ds = Subset(dataset, train_idx)
val_ds   = Subset(dataset, val_idx)
test_ds  = Subset(dataset, test_idx)

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=64, shuffle=False)
test_loader  = DataLoader(test_ds, batch_size=64, shuffle=False)

print(f"Train subjects: {len(train_idx)} samples")
print(f"Val subjects:   {len(val_idx)} samples")
print(f"Test subjects:  {len(test_idx)} samples")


Train subjects: 90412 samples
Val subjects:   18756 samples
Test subjects:  20198 samples


In [None]:


import os
import glob
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

def extract_binary_labels(label_tensor, threshold=5.0):
    """
    Flatten the input label tensor and convert the first two values
    (valence and arousal) to binary (0/1) based on the threshold.
    """
    if label_tensor.dim() > 2:
        label_tensor = label_tensor.view(label_tensor.size(0), -1)
    labels = label_tensor[:, :2]
    return (labels >= threshold).float()

if __name__ == "__main__":
    data_root = "/kaggle/input/amigos/AMIGOS DATASET"

    # Load all .mat files
    mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
    if not mat_files:
        raise FileNotFoundError(f"No .mat files found in {data_root}")

    # Build the dataset and DataLoader using all files
    dataset = AMIGOSDataset(mat_files, grid_size=32)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Set up model, loss, optimizer
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
    model.to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Training 
    num_epochs = 5
    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        for batch in loader:
            inputs = batch["topo"].to(device)          
            raw_labels = batch["label"].to(device)       
            labels = extract_binary_labels(raw_labels)    
            optimizer.zero_grad()
            logits = model(inputs)                        
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        avg_loss = running_loss / len(loader.dataset)
        print(f"Epoch {epoch}: Train Loss = {avg_loss:.4f}")

    model.eval()
    correct_v = correct_a = 0
    total = 0
    with torch.no_grad():
        for batch in loader:
            inputs = batch["topo"].to(device)
            raw_labels = batch["label"].to(device)
            labels = extract_binary_labels(raw_labels)
            logits = model(inputs)
            preds = (logits > 0).float()
            correct_v += (preds[:, 0] == labels[:, 0]).sum().item()
            correct_a += (preds[:, 1] == labels[:, 1]).sum().item()
            total += labels.size(0)
    print(f"Training‑set accuracies – Valence: {correct_v/total:.3f}, Arousal: {correct_a/total:.3f}")

    save_path = "trained_weights.pth"
    torch.save(model.state_dict(), save_path)
    print(f"Trained model weights saved to: {save_path}")


**test**

In [31]:
#testing for a single file
import os
import glob
import torch
from torch.utils.data import DataLoader

def extract_binary_labels(label_tensor, threshold=5.0):
    if label_tensor.dim() > 2:
        label_tensor = label_tensor.view(label_tensor.size(0), -1)
    labels = label_tensor[:, :2]
    return (labels >= threshold).float()

def evaluate_model(model, loader, device):
    model.eval()
    correct_v = correct_a = total = 0
    with torch.no_grad():
        for batch in loader:
            inputs = batch['topo'].to(device)
            raw_labels = batch['label'].to(device)
            labels = extract_binary_labels(raw_labels)
            logits = model(inputs)
            preds = (logits > 0).float()
            correct_v += (preds[:, 0] == labels[:, 0]).sum().item()
            correct_a += (preds[:, 1] == labels[:, 1]).sum().item()
            total += labels.size(0)
    return (
        correct_v / total if total > 0 else 0.0,
        correct_a / total if total > 0 else 0.0
    )

data_root = "/kaggle/input/amigos/AMIGOS DATASET"
weights_path = "/kaggle/working/trained_weights.pth"

all_mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
single_file = [all_mat_files[0]] if all_mat_files else []
if not single_file:
    raise FileNotFoundError(f"No .mat files found in {data_root}")

single_dataset = AMIGOSDataset(single_file, grid_size=32)
single_loader = DataLoader(single_dataset, batch_size=32, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
model.load_state_dict(torch.load(weights_path, map_location=device))
model.to(device)

val_acc, arousal_acc = evaluate_model(model, single_loader, device)
print(f"Single-file evaluation – Valence: {val_acc:.3f}, Arousal: {arousal_acc:.3f}")


Single-file evaluation – Valence: 0.797, Arousal: 0.707


testing for entire dataset

In [None]:
import os
import glob
import torch
from torch.utils.data import DataLoader


data_root = "/kaggle/input/amigos/AMIGOS DATASET"
weights_path = "/kaggle/working/trained_weights.pth"

all_mat_files = sorted(glob.glob(os.path.join(data_root, "*.mat")))
if not all_mat_files:
    raise FileNotFoundError(f"No .mat files found in {data_root}")

full_dataset = AMIGOSDataset(all_mat_files, grid_size=32)
full_loader = DataLoader(full_dataset, batch_size=32, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EmotionRecognitionModel(in_channels=5, embed_dim=768, num_classes=2)
model.load_state_dict(torch.load(weights_path, map_location=device))
model.to(device)

val_acc, arousal_acc = evaluate_model(model, full_loader, device)
print(f"Full-dataset evaluation – Valence: {val_acc:.3f}, Arousal: {arousal_acc:.3f}")
