In [None]:
# Colab-only setup: mount Google Drive at /content/drive so that the DEAP
# folder referenced later in this notebook (under "/content/drive/My Drive/")
# is reachable from the runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pickle
import os
from pathlib import Path

class DEAPDataLoader:
    """
    Loader for DEAP dataset preprocessed data

    The DEAP dataset should be in the preprocessed format (.dat files)
    Download from: http://www.eecs.qmul.ac.uk/mmv/datasets/deap/
    """

    # Layout of one preprocessed trial: 63 s at 128 Hz = 8064 samples, i.e. a
    # 3 s pre-trial baseline followed by the 60 s stimulus recording.
    SAMPLING_RATE = 128
    BASELINE_SAMPLES = 3 * SAMPLING_RATE   # 384
    TRIAL_SAMPLES = 60 * SAMPLING_RATE     # 7680

    def __init__(self, data_path):
        """
        Args:
            data_path: Path to the folder containing s01.dat, s02.dat, etc.
        """
        self.data_path = Path(data_path)

    def load_subject_data(self, subject_id):
        """
        Load data for a single subject

        Args:
            subject_id: Subject number (1-32)

        Returns:
            data: EEG data (40 trials, 40 channels, 8064 timepoints)
            labels: Labels (40 trials, 4 dimensions: valence, arousal, dominance, liking)

        Raises:
            FileNotFoundError: if the subject's .dat file is missing.
        """
        filename = self.data_path / f's{subject_id:02d}.dat'

        if not filename.exists():
            raise FileNotFoundError(f"File not found: {filename}")

        # SECURITY NOTE: unpickling executes arbitrary code embedded in the
        # file. Only load .dat files obtained from the official DEAP download.
        with open(filename, 'rb') as f:
            # The files were written by Python 2, hence encoding='latin1'.
            subject = pickle.load(f, encoding='latin1')

        # subject is a dict with keys: 'data' and 'labels'
        # data shape: (40 trials, 40 channels, 8064 timepoints)
        # labels shape: (40 trials, 4) - [valence, arousal, dominance, liking]
        return subject['data'], subject['labels']

    def load_all_subjects(self, n_subjects=32):
        """
        Load data from all subjects

        Subjects whose file is missing are skipped with a warning.

        Args:
            n_subjects: Number of subjects to load (default: 32)

        Returns:
            all_data: EEG data (n_subjects*40 trials, 40 channels, 8064 timepoints)
            all_labels: Labels (n_subjects*40 trials, 4)
            subject_ids: Subject ID for each trial

        Raises:
            ValueError: if not a single subject file could be loaded.
        """
        all_data = []
        all_labels = []
        subject_ids = []

        print(f"Loading DEAP data for {n_subjects} subjects...")

        for subject_id in range(1, n_subjects + 1):
            # Keep the try body minimal: only the load itself can be "missing".
            try:
                data, labels = self.load_subject_data(subject_id)
            except FileNotFoundError:
                print(f"Warning: Subject {subject_id} data not found, skipping...")
                continue

            all_data.append(data)
            all_labels.append(labels)
            subject_ids.extend([subject_id] * len(data))

            if subject_id % 10 == 0:
                print(f"Loaded {subject_id}/{n_subjects} subjects")

        if not all_data:
            # np.vstack([]) would fail with an unrelated-looking message.
            raise ValueError(
                f"No DEAP subject files (s01.dat ...) could be loaded from {self.data_path}"
            )

        all_data = np.vstack(all_data)
        all_labels = np.vstack(all_labels)
        subject_ids = np.array(subject_ids)

        print(f"Total loaded: {len(all_data)} trials")
        print(f"Data shape: {all_data.shape}")
        print(f"Labels shape: {all_labels.shape}")

        return all_data, all_labels, subject_ids

    def preprocess_for_model(self, data, labels, use_32_channels=True):
        """
        Preprocess DEAP data for the emotion recognition model

        Args:
            data: Raw DEAP data (trials, 40 channels, 8064 timepoints)
            labels: DEAP labels (trials, 4) - continuous values 1-9
            use_32_channels: Use only EEG channels (first 32), exclude EOG

        Returns:
            X: Preprocessed EEG (trials, channels, timepoints)
            y_valence: Binary valence labels (0=low, 1=high)
            y_arousal: Binary arousal labels (0=low, 1=high)
        """
        # Use only EEG channels (first 32), remove EOG and peripheral channels
        X = data[:, :32, :] if use_32_channels else data

        # Each trial is 63 s: a 3 s pre-trial baseline FOLLOWED by the 60 s
        # stimulus. Drop the leading baseline and keep the 60 s that follow.
        # Input that is already 60 s long (or shorter) has no baseline left to
        # remove, so the offset shrinks to 0 in that case.
        n_timepoints = X.shape[-1]
        start = min(self.BASELINE_SAMPLES, max(0, n_timepoints - self.TRIAL_SAMPLES))
        X = X[:, :, start:start + self.TRIAL_SAMPLES]

        # Extract valence and arousal labels
        valence = labels[:, 0]  # First column
        arousal = labels[:, 1]  # Second column

        # Convert to binary classification. Ratings are continuous in [1, 9]:
        # rating > 5 -> high (1), rating <= 5 -> low (0).
        y_valence = (valence > 5).astype(np.int32)
        y_arousal = (arousal > 5).astype(np.int32)

        print(f"\nPreprocessed data shape: {X.shape}")
        print(f"Valence distribution: Low={np.sum(y_valence==0)}, High={np.sum(y_valence==1)}")
        print(f"Arousal distribution: Low={np.sum(y_arousal==0)}, High={np.sum(y_arousal==1)}")

        return X, y_valence, y_arousal

    def split_train_test(self, X, y_valence, y_arousal, subject_ids=None,
                        test_size=0.2, random_state=42):
        """
        Split data into train and test sets

        Args:
            X: EEG data
            y_valence, y_arousal: Labels
            subject_ids: Optional subject IDs for subject-independent split
            test_size: Fraction for test set
            random_state: Random seed

        Returns:
            X_train, X_test, y_valence_train, y_valence_test,
            y_arousal_train, y_arousal_test

        Raises:
            ValueError: if subject_ids is given but its length differs from X.
        """
        if subject_ids is None:
            # Random trial-level split, stratified by valence. sklearn is only
            # needed on this path, so it is imported here.
            from sklearn.model_selection import train_test_split

            indices = np.arange(len(X))
            train_idx, test_idx = train_test_split(
                indices, test_size=test_size, random_state=random_state,
                stratify=y_valence  # Stratify by valence
            )
        else:
            # Subject-independent split (some subjects in train, others in test)
            subject_ids = np.asarray(subject_ids)
            if len(subject_ids) != len(X):
                raise ValueError(
                    f"subject_ids has {len(subject_ids)} entries but X has {len(X)} trials"
                )

            unique_subjects = np.unique(subject_ids)
            n_test_subjects = int(len(unique_subjects) * test_size)

            # A local RandomState draws the same subjects as
            # np.random.seed(random_state) + np.random.choice(...) would, but
            # leaves the caller's global NumPy RNG state untouched.
            rng = np.random.RandomState(random_state)
            test_subjects = rng.choice(
                unique_subjects, n_test_subjects, replace=False
            )

            in_test = np.isin(subject_ids, test_subjects)
            test_idx = np.where(in_test)[0]
            train_idx = np.where(~in_test)[0]

        X_train = X[train_idx]
        X_test = X[test_idx]
        y_valence_train = y_valence[train_idx]
        y_valence_test = y_valence[test_idx]
        y_arousal_train = y_arousal[train_idx]
        y_arousal_test = y_arousal[test_idx]

        print(f"\nTrain set: {len(X_train)} trials")
        print(f"Test set: {len(X_test)} trials")

        return (X_train, X_test, y_valence_train, y_valence_test,
                y_arousal_train, y_arousal_test)


# ============================================================================
# USAGE EXAMPLE
# ============================================================================

def main():
    """
    Demonstrate the full path from the DEAP folder to train/test arrays.

    Returns:
        The tuple (X_train, X_test, y_val_train, y_val_test, y_ar_train,
        y_ar_test) produced by a random 80/20 split, or None when the DEAP
        folder cannot be found.
    """
    # STEP 1: location of the "Preprocessed data (Python)" release of DEAP
    # (download: http://www.eecs.qmul.ac.uk/mmv/datasets/deap/)
    deap_dir = "/content/drive/My Drive/deap-dataset/data_preprocessed_python"

    # Bail out early with setup instructions if the folder is missing.
    if not os.path.exists(deap_dir):
        help_lines = (
            f"ERROR: DEAP data path not found: {deap_dir}",
            "\nPlease:",
            "1. Download DEAP dataset from: http://www.eecs.qmul.ac.uk/mmv/datasets/deap/",
            "2. Extract the 'data_preprocessed_python' folder",
            "3. Update DEAP_PATH variable above with the correct path",
        )
        for line in help_lines:
            print(line)
        return None

    # STEP 2: read every subject (32 subjects x 40 trials = 1280 trials)
    loader = DEAPDataLoader(deap_dir)
    raw_data, raw_labels, subject_ids = loader.load_all_subjects(n_subjects=32)

    # STEP 3: channel selection, trimming and label binarisation
    X, y_valence, y_arousal = loader.preprocess_for_model(raw_data, raw_labels)

    # STEP 4: random 80/20 split. For a subject-independent split pass
    # `subject_ids` as the fourth argument of split_train_test instead.
    splits = loader.split_train_test(X, y_valence, y_arousal, test_size=0.2)
    X_train, X_test, y_val_train, y_val_test, y_ar_train, y_ar_test = splits

    rule = "=" * 60
    print("\n" + rule)
    print("DATA READY FOR TRAINING!")
    print(rule)
    print(f"X_train shape: {X_train.shape}")
    print(f"X_test shape: {X_test.shape}")

    # STEP 5: show how the arrays are meant to be consumed
    print("\nTo train the model, use:")
    for snippet_line in (
        "from emotion_recognition import EEGEmotionRecognition",
        "model = EEGEmotionRecognition(n_channels=32)",
        "model.train(X_train, y_val_train, y_ar_train, augment_factor=1.0)",
        "valence_pred, arousal_pred = model.predict(X_test)",
    ):
        print(snippet_line)

    return X_train, X_test, y_val_train, y_val_test, y_ar_train, y_ar_test


if __name__ == "__main__":
    # main() hands back the six split arrays, or None if the dataset folder
    # was not found.
    data = main()

    if data is not None:
        # Expose the splits as module-level names for the following cells.
        (X_train, X_test,
         y_val_train, y_val_test,
         y_ar_train, y_ar_test) = data

        print(f"\n{'=' * 60}")
        print("Next step: Train the model (this takes a while!)")
        print(f"{'=' * 60}")

        # To actually train, run:
        #   from emotion_recognition import EEGEmotionRecognition
        #   model = EEGEmotionRecognition(n_channels=32)
        #   model.train(X_train, y_val_train, y_ar_train, augment_factor=1.0)
        #   valence_pred, arousal_pred = model.predict(X_test)

Loading DEAP data for 32 subjects...
Loaded 10/32 subjects
Loaded 20/32 subjects
Loaded 30/32 subjects
Total loaded: 1280 trials
Data shape: (1280, 40, 8064)
Labels shape: (1280, 4)

Preprocessed data shape: (1280, 32, 7680)
Valence distribution: Low=1011, High=269
Arousal distribution: Low=982, High=298

Train set: 1024 trials
Test set: 256 trials

DATA READY FOR TRAINING!
X_train shape: (1024, 32, 7680)
X_test shape: (256, 32, 7680)

To train the model, use:
from emotion_recognition import EEGEmotionRecognition
model = EEGEmotionRecognition(n_channels=32)
model.train(X_train, y_val_train, y_ar_train, augment_factor=1.0)
valence_pred, arousal_pred = model.predict(X_test)

Next step: Train the model (this takes a while!)


In [None]:
from collections import Counter

# Class balance of the binary valence / arousal labels in the training split.
print(Counter(y_val_train), Counter(y_ar_train), sep="\n")

Counter({np.int32(0): 809, np.int32(1): 215})
Counter({np.int32(0): 785, np.int32(1): 239})


In [None]:
# ===== Optimized CL module and training (loss < 2) =====
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Encoder: (batch, n_channels, T) -> (batch, 32, ceil(T / 64))
class ChannelEncoder(nn.Module):
    """Strided 1-D convolutional encoder for multi-channel signals.

    Three stages of Conv1d(kernel_size=7, stride=4, padding=3) -> BatchNorm1d
    -> ReLU. Every stage shortens the time axis to ceil(T / 4), so a
    7680-sample input becomes 1920 -> 480 -> 120 steps. The channel widths
    are n_channels -> 64 -> 32 -> 32.
    """

    def __init__(self, n_channels=32):
        super().__init__()
        conv_cfg = dict(kernel_size=7, stride=4, padding=3)

        self.conv1 = nn.Conv1d(n_channels, 64, **conv_cfg)
        self.bn1 = nn.BatchNorm1d(64)

        self.conv2 = nn.Conv1d(64, 32, **conv_cfg)
        self.bn2 = nn.BatchNorm1d(32)

        self.conv3 = nn.Conv1d(32, 32, **conv_cfg)
        self.bn3 = nn.BatchNorm1d(32)

    def forward(self, x):
        """Return the non-negative feature map of shape (B, 32, T_out)."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return x

# Projection head: temporal mean-pool -> 2-layer MLP -> unit-length vector
class ContrastiveProjector(nn.Module):
    """Map an encoder feature map to an L2-normalised embedding.

    The (B, in_ch, T) feature map is averaged over the time axis, passed
    through Linear -> ReLU -> Linear, and normalised so that every row of
    the (B, proj_dim) output has unit Euclidean norm.
    """

    def __init__(self, in_ch=32, proj_dim=128):
        super().__init__()
        mlp = [
            nn.Linear(in_ch, proj_dim),
            nn.ReLU(),
            nn.Linear(proj_dim, proj_dim),
        ]
        self.fc = nn.Sequential(*mlp)

    def forward(self, feat_map):
        time_avg = feat_map.mean(dim=2)            # (B, in_ch)
        return F.normalize(self.fc(time_avg), dim=1)   # (B, proj_dim)

class ContrastiveLearning(nn.Module):
    """Encoder plus projection head used for contrastive pre-training.

    forward(x) returns both the encoder feature map (kept for downstream
    use) and the normalised projection (fed to the contrastive loss).
    """

    def __init__(self, n_channels=32, proj_dim=128):
        super().__init__()
        # The encoder always emits 32 feature channels, hence in_ch=32.
        self.encoder = ChannelEncoder(n_channels)
        self.projector = ContrastiveProjector(in_ch=32, proj_dim=proj_dim)

    def forward(self, x):
        features = self.encoder(x)
        return features, self.projector(features)

# NT-Xent (normalised temperature-scaled cross-entropy) contrastive loss
def nt_xent_loss(z1, z2, temperature=0.1):
    """Contrastive loss between two batches of paired embeddings.

    Row i of `z1` and row i of `z2` are treated as a positive pair; every
    other row of the concatenated batch is a negative. Similarities are plain
    dot products divided by `temperature`, so the inputs are expected to be
    L2-normalised already. Returns the mean cross-entropy over all 2B rows.
    """
    assert z1.shape == z2.shape
    n_pairs = z1.shape[0]

    stacked = torch.cat((z1, z2), dim=0)                 # (2B, D)
    logits = (stacked @ stacked.T) / temperature          # (2B, 2B)

    # Row i (< B) must pick column i + B, and row i + B must pick column i.
    first_half = torch.arange(n_pairs, device=stacked.device)
    targets = torch.cat((first_half + n_pairs, first_half))

    # A sample must never be matched with itself: push the diagonal to a
    # very negative value so it vanishes in the softmax.
    self_pairs = torch.eye(2 * n_pairs, dtype=torch.bool, device=stacked.device)
    logits = logits.masked_fill(self_pairs, -1e9)

    return F.cross_entropy(logits, targets)

# Stochastic augmentation pipeline used to create the contrastive views
def eeg_augment(x, noise_std=0.05, shift_max=500, scale_range=(0.8,1.2), dropout_prob=0.1):
    """Return a randomly perturbed copy of a (B, C, T) batch.

    Applied in this order: additive Gaussian noise with std `noise_std`;
    a per-sample circular shift along time by an integer drawn from
    [-shift_max, shift_max]; a per-sample amplitude gain drawn uniformly
    from `scale_range`; zeroing of whole channels, each with probability
    `dropout_prob`. The input tensor itself is not modified.
    """
    n_samples, n_chans, _ = x.shape

    # 1) additive noise (the addition also yields a fresh tensor)
    view = x + torch.randn_like(x) * noise_std

    # 2) circular time shift, one offset per sample (dims=1 is the time axis
    #    of the per-sample (C, T) slice)
    offsets = torch.randint(-shift_max, shift_max + 1, (n_samples,)).tolist()
    for row, offset in enumerate(offsets):
        view[row] = torch.roll(view[row], shifts=offset, dims=1)

    # 3) amplitude gain, one factor per sample
    low, high = scale_range[0], scale_range[1]
    gain = torch.rand(n_samples, 1, 1, device=x.device) * (high - low) + low
    view = view * gain

    # 4) channel dropout: a channel survives when its draw exceeds the rate
    keep = (torch.rand(n_samples, n_chans, 1, device=x.device) > dropout_prob).float()
    return view * keep

# ----------------- Prepare data -----------------
# Both splits are converted to float32 tensors and moved to `device` in full,
# so the whole dataset has to fit in device memory.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32).to(device)

# ----------------- Training CL -----------------
# Self-supervised pre-training: the labels are not used in this stage.
cl_model = ContrastiveLearning(n_channels=32, proj_dim=128).to(device)
optimizer_cl = torch.optim.Adam(cl_model.parameters(), lr=3e-3)  # learning rate for the contrastive stage
batch_size = 64   # number of positive pairs per step; increase if GPU allows
epochs_cl = 120   # number of passes over the training split

train_dataset = TensorDataset(X_train_tensor)
# drop_last=True keeps every batch at exactly `batch_size` samples.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

cl_model.train()
for epoch in range(epochs_cl):
    total_loss = 0.0
    for (x_batch,) in train_loader:
        x_batch = x_batch.to(device)
        # Two independent random augmentations of the same batch form the
        # positive pairs for the loss.
        x1 = eeg_augment(x_batch)
        x2 = eeg_augment(x_batch)

        # Only the normalised projections are needed during training.
        _, z1 = cl_model(x1)
        _, z2 = cl_model(x2)

        loss = nt_xent_loss(z1, z2, temperature=0.1)  # low temperature -> sharper softmax
        optimizer_cl.zero_grad()
        loss.backward()
        optimizer_cl.step()
        total_loss += loss.item()

    # Mean loss per batch for this epoch.
    avg_loss = total_loss / len(train_loader)
    print(f"[CL] Epoch {epoch+1}/{epochs_cl}, Loss: {avg_loss:.4f}")

# ----------------- Extract features for GAN -----------------
# eval() makes BatchNorm use its running statistics; the encoder is applied
# to each full split in a single forward pass.
cl_model.eval()
with torch.no_grad():
    train_feat_maps = cl_model.encoder(X_train_tensor)   # (N, 32, 120) for 7680-sample trials
    test_feat_maps  = cl_model.encoder(X_test_tensor)

# NumPy copies on the host, consumed by the GAN and GNN cells below.
train_features = train_feat_maps.cpu().numpy()
test_features  = test_feat_maps.cpu().numpy()

[CL] Epoch 1/120, Loss: 2.3683
[CL] Epoch 2/120, Loss: 1.3686
[CL] Epoch 3/120, Loss: 1.2453
[CL] Epoch 4/120, Loss: 1.1069
[CL] Epoch 5/120, Loss: 1.0564
[CL] Epoch 6/120, Loss: 0.9989
[CL] Epoch 7/120, Loss: 0.9627
[CL] Epoch 8/120, Loss: 0.8745
[CL] Epoch 9/120, Loss: 0.8410
[CL] Epoch 10/120, Loss: 0.8031
[CL] Epoch 11/120, Loss: 0.7794
[CL] Epoch 12/120, Loss: 0.7482
[CL] Epoch 13/120, Loss: 0.6413
[CL] Epoch 14/120, Loss: 0.6085
[CL] Epoch 15/120, Loss: 0.5920
[CL] Epoch 16/120, Loss: 0.5907
[CL] Epoch 17/120, Loss: 0.5752
[CL] Epoch 18/120, Loss: 0.5621
[CL] Epoch 19/120, Loss: 0.4992
[CL] Epoch 20/120, Loss: 0.4843
[CL] Epoch 21/120, Loss: 0.4826
[CL] Epoch 22/120, Loss: 0.5088
[CL] Epoch 23/120, Loss: 0.5104
[CL] Epoch 24/120, Loss: 0.4354
[CL] Epoch 25/120, Loss: 0.3915
[CL] Epoch 26/120, Loss: 0.4029
[CL] Epoch 27/120, Loss: 0.3519
[CL] Epoch 28/120, Loss: 0.3712
[CL] Epoch 29/120, Loss: 0.3720
[CL] Epoch 30/120, Loss: 0.4444
[CL] Epoch 31/120, Loss: 0.3290
[CL] Epoch 32/120

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from collections import Counter

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# ======================================================
#  Improved Generator & Discriminator (Conditioned)
# ======================================================
class Generator(nn.Module):
    """MLP generator producing (out_channels, out_time) feature maps.

    The class label is embedded into a `code_dim`-sized vector and added to
    the noise code; the sum goes through two hidden Linear+BatchNorm+ReLU
    layers and a final Linear+Tanh, so every output value lies in [-1, 1].

    NOTE(review): the real feature maps come out of a ReLU in ChannelEncoder
    and are therefore non-negative and not bounded by 1 - confirm that this
    output range is intended.
    """

    def __init__(self, code_dim=256, n_classes=2, out_channels=32, out_time=120, hidden_dim=1024):
        super().__init__()
        self.out_channels = out_channels
        self.out_time = out_time

        # One learned offset vector per class, same size as the noise code.
        self.label_emb = nn.Embedding(n_classes, code_dim)

        stack = [
            nn.Linear(code_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(True),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(True),
            nn.Linear(hidden_dim, out_channels * out_time),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, z, labels):
        """Generate one sample per row of `z`, conditioned on `labels`."""
        conditioned = z + self.label_emb(labels)
        flat = self.net(conditioned)
        return flat.view(-1, self.out_channels, self.out_time)


class Discriminator(nn.Module):
    """Convolutional real/fake critic with spectral normalisation.

    Two spectrally-normalised Conv1d layers with LeakyReLU, global average
    pooling over time, then a small MLP ending in a sigmoid. The output is a
    (B, 1) probability that the input is real.

    `in_time` and `n_classes` are accepted for signature symmetry with the
    generator but are not used: pooling makes the network independent of the
    sequence length, and no label is fed to the discriminator.
    """

    def __init__(self, in_channels=32, in_time=120, n_classes=2):
        super().__init__()
        sn = nn.utils.spectral_norm
        slope = 0.2

        feature_layers = [
            sn(nn.Conv1d(in_channels, 64, kernel_size=3, padding=1)),
            nn.LeakyReLU(slope),
            sn(nn.Conv1d(64, 128, kernel_size=3, padding=1)),
            nn.LeakyReLU(slope),
            nn.AdaptiveAvgPool1d(1),
        ]
        head_layers = [
            nn.Flatten(),
            nn.Linear(128, 64),
            nn.LeakyReLU(slope),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        ]
        self.conv = nn.Sequential(*feature_layers)
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        pooled = self.conv(x)       # (B, 128, 1)
        return self.fc(pooled)      # (B, 1)


# ======================================================
#  Training Function (Stabilized)
# ======================================================
def train_gan_for_class(real_samples, class_label,
                        code_dim=256, n_classes=2,
                        batch_size=64, epochs=150, d_steps=5, lr=2e-4):
    """Train a GAN on the real samples of a single class and return its generator.

    Args:
        real_samples: array-like of shape (N, channels, time) holding the real
            feature maps of one class.
        class_label: integer label fed to the generator's label embedding.
        code_dim: size of the noise vector.
        n_classes: number of classes known to the generator embedding.
        batch_size: mini-batch size; clamped to N so that small classes still
            yield at least one batch.
        epochs: number of passes over `real_samples`.
        d_steps: discriminator updates per generator update.
        lr: discriminator learning rate (the generator uses 1.5 * lr).

    Returns:
        The trained Generator (left in training mode, on `device`).

    Raises:
        ValueError: if `real_samples` is not 3-D or holds fewer than 2 samples
            (BatchNorm in the generator needs at least 2 per batch).
    """
    real_tensor = torch.tensor(real_samples, dtype=torch.float32)
    if real_tensor.dim() != 3:
        raise ValueError(
            f"real_samples must have shape (N, channels, time), got {tuple(real_tensor.shape)}"
        )
    n_real, n_feat_channels, n_steps = real_tensor.shape
    if n_real < 2:
        raise ValueError(f"need at least 2 real samples to train a GAN, got {n_real}")

    # With drop_last=True a batch size larger than the class would produce an
    # empty loader (no training at all and a division by zero when logging).
    batch_size = min(batch_size, n_real)
    loader = DataLoader(TensorDataset(real_tensor), batch_size=batch_size, shuffle=True, drop_last=True)

    # Size both networks from the data instead of relying on their defaults.
    G = Generator(code_dim, n_classes, out_channels=n_feat_channels, out_time=n_steps).to(device)
    D = Discriminator(in_channels=n_feat_channels, in_time=n_steps, n_classes=n_classes).to(device)

    opt_G = torch.optim.Adam(G.parameters(), lr=lr * 1.5, betas=(0.5, 0.9))  # generator slightly faster
    opt_D = torch.optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.9))
    criterion = nn.BCELoss()

    # drop_last=True makes every batch exactly `batch_size` long, so the
    # targets and the conditioning labels can be built once.
    y_real = torch.ones((batch_size, 1), device=device)
    y_fake = torch.zeros((batch_size, 1), device=device)
    labels = torch.full((batch_size,), int(class_label), dtype=torch.long, device=device)

    n_batches = len(loader)
    # The discriminator is updated d_steps times per batch; average over all
    # of those updates so D_loss is comparable to G_loss.
    n_d_updates = n_batches * max(d_steps, 1)

    for epoch in range(epochs):
        d_loss_total, g_loss_total = 0.0, 0.0
        for (x_real,) in loader:
            x_real = x_real.to(device)

            # --------------------
            # Train Discriminator
            # --------------------
            for _ in range(d_steps):
                z = torch.randn(batch_size, code_dim, device=device)
                # detach(): no generator gradients during the D update
                fake = G(z, labels).detach()

                opt_D.zero_grad()
                loss_real = criterion(D(x_real), y_real)
                loss_fake = criterion(D(fake), y_fake)
                loss_d = (loss_real + loss_fake) / 2
                loss_d.backward()
                opt_D.step()
                d_loss_total += loss_d.item()

            # --------------------
            # Train Generator
            # --------------------
            z = torch.randn(batch_size, code_dim, device=device)
            opt_G.zero_grad()
            fake = G(z, labels)
            # The generator is rewarded when D labels its samples as real.
            loss_g = criterion(D(fake), y_real)
            loss_g.backward()
            opt_G.step()
            g_loss_total += loss_g.item()

        if (epoch + 1) % 10 == 0:
            print(f"[Class {class_label}] Epoch {epoch+1}/{epochs} | D_loss={d_loss_total/n_d_updates:.4f} | G_loss={g_loss_total/n_batches:.4f}")

    return G


# ======================================================
#  Generation Function (Minority Boost)
# ======================================================
def generate_synthetic_for_class(G, n_samples, class_label, code_dim=256):
    """Sample `n_samples` synthetic items of one class from a trained generator.

    Args:
        G: generator module called as G(z, labels); it is switched to eval mode.
        n_samples: number of samples to return (must be positive).
        class_label: integer label passed to the generator for every sample.
        code_dim: size of the noise vector expected by `G`.

    Returns:
        NumPy array whose first dimension is `n_samples`.

    Raises:
        ValueError: if `n_samples` is not positive.
    """
    if n_samples <= 0:
        raise ValueError(f"n_samples must be positive, got {n_samples}")

    # Create noise and labels on the device the generator actually lives on;
    # fall back to the module-level default for parameter-free generators.
    try:
        gen_device = next(G.parameters()).device
    except StopIteration:
        gen_device = device

    G.eval()
    bs = min(256, n_samples)
    n_batches = -(-n_samples // bs)  # ceil division
    labels = torch.full((bs,), int(class_label), dtype=torch.long, device=gen_device)

    chunks = []
    with torch.no_grad():
        for _ in range(n_batches):
            z = torch.randn(bs, code_dim, device=gen_device)
            chunks.append(G(z, labels).cpu().numpy())

    # The last batch may overshoot; trim to the requested count.
    return np.vstack(chunks)[:n_samples]


# ======================================================
#  Augmentation Wrapper
# ======================================================
def classwise_gan_augment(X_train, y_train, balance_mode='equalize', code_dim=256, epochs=150,
                          boost_class=1, boost_factor=1.25):
    """Append GAN-generated samples to a training set, one GAN per class.

    Args:
        X_train: real samples, first axis indexes samples.
        y_train: integer class label per sample (NumPy array).
        balance_mode: 'equalize' generates, for each class, the number of
            samples it is short of the largest class; any other value
            generates as many synthetic samples as the class has real ones.
        code_dim: noise size for the GANs.
        epochs: training epochs per GAN.
        boost_class: in 'equalize' mode, class whose synthetic quota is
            multiplied by `boost_factor` (None disables the boost).
        boost_factor: multiplier for `boost_class`. The default 1.25 means
            25% extra synthetic samples, so that class can end up larger than
            the original majority class.

    Returns:
        (X_aug, y_aug): the real data followed by all synthetic samples. When
        nothing needs to be generated the inputs are returned unchanged.
    """
    counts = Counter(y_train)
    majority = max(counts.values())

    X_synth_all, y_synth_all = [], []
    for c in np.unique(y_train):
        real_c = X_train[y_train == c]
        if balance_mode == 'equalize':
            target = majority - len(real_c)
            if c == boost_class:
                target = int(target * boost_factor)
        else:
            target = len(real_c)

        # Nothing to generate for the majority class (or an empty quota).
        if target <= 0:
            continue

        print(f"\n Training GAN for class {c}: real={len(real_c)}, synth_target={target}")
        G = train_gan_for_class(real_c, class_label=c, code_dim=code_dim, epochs=epochs)
        synth = generate_synthetic_for_class(G, target, class_label=c, code_dim=code_dim)
        X_synth_all.append(synth)
        y_synth_all.append(np.full(target, c, dtype=np.int32))

    if X_synth_all:
        X_aug = np.vstack([X_train] + X_synth_all)
        y_aug = np.concatenate([y_train] + y_synth_all)
    else:
        X_aug, y_aug = X_train, y_train

    print("\n Class counts after augmentation:", Counter(y_aug))
    return X_aug, y_aug



# ==========================================
#  MAIN EXECUTION - use CL feature maps
# ==========================================
if __name__ == "__main__":
    print("\n--- GAN augmentation on CL embeddings ---")

    # Inputs: encoder feature maps and binary labels of the training split,
    # produced by the earlier cells of this notebook.
    X_train_feat = train_features      # (N, 32, 120) for the 7680-sample trials used here
    y_train_val = y_val_train          # valence labels
    y_train_ar = y_ar_train            # arousal labels

    # --- GAN for Valence ---
    # Returns the real features followed by the synthetic ones.
    print("\n Training GANs for Valence balancing...")
    X_val_aug, y_val_aug = classwise_gan_augment(X_train_feat, y_train_val,
                                                 balance_mode='equalize',
                                                 code_dim=256, epochs=200)

    # --- GAN for Arousal ---
    # Same procedure on the arousal labels (note: 150 epochs here vs. 200 above).
    print("\n Training GANs for Arousal balancing...")
    X_ar_aug, y_ar_aug = classwise_gan_augment(X_train_feat, y_train_ar,
                                               balance_mode='equalize',
                                               code_dim=256, epochs=150)

    # Save augmented feature maps for GNN (written to the current working directory)
    np.savez("deap_gan_augmented_features.npz",
             X_val=X_val_aug, y_val=y_val_aug,
             X_ar=X_ar_aug, y_ar=y_ar_aug)

    print("\n GAN augmentation finished successfully!")


--- GAN augmentation on CL embeddings ---

 Training GANs for Valence balancing...

 Training GAN for class 1: real=215, synth_target=742
[Class 1] Epoch 10/200 | D_loss=3.2482 | G_loss=0.7642
[Class 1] Epoch 20/200 | D_loss=3.1044 | G_loss=0.8062
[Class 1] Epoch 30/200 | D_loss=3.1297 | G_loss=0.8489
[Class 1] Epoch 40/200 | D_loss=2.6221 | G_loss=0.9977
[Class 1] Epoch 50/200 | D_loss=2.6401 | G_loss=0.9992
[Class 1] Epoch 60/200 | D_loss=2.3523 | G_loss=1.1057
[Class 1] Epoch 70/200 | D_loss=2.2555 | G_loss=1.1919
[Class 1] Epoch 80/200 | D_loss=2.1229 | G_loss=1.1811
[Class 1] Epoch 90/200 | D_loss=2.0378 | G_loss=1.2530
[Class 1] Epoch 100/200 | D_loss=1.8396 | G_loss=1.3144
[Class 1] Epoch 110/200 | D_loss=1.7173 | G_loss=1.4474
[Class 1] Epoch 120/200 | D_loss=1.6560 | G_loss=1.5958
[Class 1] Epoch 130/200 | D_loss=1.5043 | G_loss=1.6613
[Class 1] Epoch 140/200 | D_loss=1.3899 | G_loss=1.5354
[Class 1] Epoch 150/200 | D_loss=1.2476 | G_loss=2.0324
[Class 1] Epoch 160/200 | D_lo

In [None]:
from collections import Counter

# Class balance of the valence / arousal labels after GAN augmentation.
print(Counter(y_val_aug), Counter(y_ar_aug), sep="\n")

Counter({np.int32(1): 957, np.int32(0): 809})
Counter({np.int32(1): 921, np.int32(0): 785})


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from scipy.spatial.distance import euclidean
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
import random
# Reproducibility: one seed for Python, NumPy and PyTorch (CPU and every GPU),
# plus deterministic cuDNN kernels with autotuning switched off.
seed = 42

torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # covers multi-GPU setups

# Run on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


# ==========================================
#  Adjacency Matrix Initialization (Gaussian Kernel)
# ==========================================
def initialize_adjacency_matrix(n_channels=32, lambda_threshold=5.0, theta=2.0):
    """
    Initialize adjacency matrix using Gaussian kernel.

    For electrodes i != j at Euclidean distance d_ij the weight is
    exp(-d_ij^2 / (2 * theta^2)) when d_ij <= lambda_threshold and 0
    otherwise; the diagonal is 0.

    Args:
        n_channels: number of electrodes to use (the first `n_channels` rows
            of the coordinate table; at most 32).
        lambda_threshold: distance beyond which two electrodes are unconnected.
        theta: width of the Gaussian kernel.

    Returns:
        Symmetric float32 tensor of shape (n_channels, n_channels).

    Raises:
        ValueError: if `n_channels` is outside [0, 32].
    """
    # Placeholder 3D coordinates for EEG channels (not measured positions).
    # NOTE(review): the electrode names on each row are the original author's
    # labels; they do not appear to follow the DEAP channel order - verify
    # before interpreting the learned graph anatomically.
    coords_3d = np.array([
        [0, 6, 8], [2, 6, 8], [-2, 6, 8], [0, 4, 8],       # Fp1, Fp2, F3, F4
        [-3, 4, 6], [3, 4, 6], [-1, 2, 7], [1, 2, 7],       # F7, F8, FC5, FC6
        [-2, 1, 6], [2, 1, 6], [-3, 0, 4], [3, 0, 4],       # T7, T8, CP5, CP6
        [-1, -1, 5], [1, -1, 5], [0, -2, 4], [0, -3, 3],    # C3, C4, Cz, Pz
        [-2, -3, 3], [2, -3, 3], [-3, -4, 2], [3, -4, 2],   # PO3, PO4, Oz, Iz
        [-2, 3, 6], [2, 3, 6], [-3, 1, 5], [3, 1, 5],       # F5, F6, C5, C6
        [-2, -1, 4], [2, -1, 4], [-1, -3, 3], [1, -3, 3],   # P5, P6, PO7, PO8
        [0, -1, 5], [0, 1, 7], [-1, 0, 6], [1, 0, 6]        # FCz, Fz, C3', C4'
    ], dtype=float)

    if not 0 <= n_channels <= len(coords_3d):
        raise ValueError(
            f"n_channels must be between 0 and {len(coords_3d)}, got {n_channels}"
        )

    # All pairwise distances at once instead of a Python double loop.
    coords = coords_3d[:n_channels]
    diff = coords[:, None, :] - coords[None, :, :]
    dist = np.sqrt((diff ** 2).sum(axis=-1))

    W = np.where(dist <= lambda_threshold,
                 np.exp(-(dist ** 2) / (2 * theta ** 2)),
                 0.0)
    np.fill_diagonal(W, 0.0)  # no self-connections

    return torch.tensor(W, dtype=torch.float32)


# ==========================================
#  GNN for Emotion Classification
# ==========================================
from torch.utils.data import WeightedRandomSampler
import numpy as np

class GNNEmotionClassifier(nn.Module):
    """Graph network over EEG channels with a learnable adjacency matrix.

    Every layer first mixes the node features with the row-softmax of the
    adjacency matrix `A` and then applies a Linear layer shared by all nodes.
    Hidden layers are followed by ReLU; the last layer emits raw logits, as
    expected by nn.CrossEntropyLoss. The prediction is read from the node
    `target_node`.

    Args:
        n_channels: number of graph nodes (EEG channels).
        feat_length: number of input features per node (required).
        hidden_sizes: output width of each layer; the last entry is the
            number of classes.
        dropout_rates: dropout probability applied after each layer (0 means
            no dropout); must have the same length as `hidden_sizes`.
        target_node: index of the node whose output is returned.

    Raises:
        ValueError: on a missing `feat_length`, mismatched `hidden_sizes` /
            `dropout_rates`, or an out-of-range `target_node`.

    NOTE(review): softmax turns the zero entries of `A` (self-loops and pairs
    beyond the distance threshold) into non-zero weights, so the mixing
    matrix is dense even though the initial adjacency is sparse.
    """

    def __init__(self, n_channels=32, feat_length=None, hidden_sizes=(50, 28, 7, 2),
                 dropout_rates=(0.22, 0.0, 0.0, 0.05), target_node=15):
        super().__init__()
        if feat_length is None:
            raise ValueError("feat_length (features per node) must be provided")
        if len(hidden_sizes) != len(dropout_rates):
            # zip() would otherwise silently drop the surplus layers.
            raise ValueError(
                f"hidden_sizes has {len(hidden_sizes)} entries but "
                f"dropout_rates has {len(dropout_rates)}"
            )
        if not -n_channels <= target_node < n_channels:
            raise ValueError(
                f"target_node {target_node} is out of range for {n_channels} channels"
            )

        self.n_channels = n_channels
        self.feat_length = feat_length
        self.target_node = target_node

        # Learnable adjacency matrix. It is a regular parameter, so it follows
        # the module when the caller moves it with .to(device).
        self.A = nn.Parameter(initialize_adjacency_matrix(n_channels))

        self.layers = nn.ModuleList()
        self.dropouts = nn.ModuleList()

        in_features = feat_length
        for hidden_size, dropout_rate in zip(hidden_sizes, dropout_rates):
            self.layers.append(nn.Linear(in_features, hidden_size))
            self.dropouts.append(nn.Dropout(dropout_rate) if dropout_rate > 0 else nn.Identity())
            in_features = hidden_size

    def forward(self, S):
        """
        Forward pass with graph convolution

        Args:
            S: node features of shape (batch, channels, features).

        Returns:
            Logits of shape (batch, hidden_sizes[-1]) taken from `target_node`.
        """
        # The normalised adjacency is identical for every layer: compute once.
        A_hat = torch.softmax(self.A, dim=1)          # each row sums to 1
        last = len(self.layers) - 1

        X = S
        for idx, (layer, dropout) in enumerate(zip(self.layers, self.dropouts)):
            # Graph convolution: aggregate neighbor info
            X = torch.einsum('ij,bjk->bik', A_hat, X)

            # nn.Linear acts on the last axis, i.e. on every node independently.
            X = layer(X)

            # No ReLU on the output layer: clamping logits at zero removes the
            # gradient whenever both class scores are negative.
            if idx < last:
                X = torch.relu(X)

            # Dropout as configured by the caller (applies to the last layer
            # too when its rate is non-zero).
            X = dropout(X)

        # Output for target node
        return X[:, self.target_node, :]


# ==========================================
# Training Function
# ==========================================
def train_gnn(model, X_train, y_train,
              batch_size=100, epochs=400, lr=5e-5,
              weight_decay=1e-4):
    """Train `model` with Adam on a class-balanced sampling of the data.

    Args:
        model: module mapping a float batch of `X_train` rows to class logits.
        X_train: array-like of training inputs, first axis indexes samples.
        y_train: integer class labels, one per sample. Labels must be
            0..K-1 because they index the logits in CrossEntropyLoss.
        batch_size: mini-batch size.
        epochs: number of epochs.
        lr: Adam learning rate.
        weight_decay: Adam weight decay.

    Returns:
        dict with 'train_losses' and 'train_accs', one value per epoch.

    Raises:
        ValueError: if `y_train` is empty.

    NOTE(review): imbalance is corrected twice - by the weighted sampler and
    by the class-weighted loss. Both are kept to preserve behaviour; consider
    whether one of them is enough.
    """
    labels = np.asarray(y_train).reshape(-1)
    if labels.size == 0:
        raise ValueError("train_gnn needs at least one training sample")

    # Train on whatever device the model already lives on.
    run_device = next(model.parameters()).device

    train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                                  torch.tensor(labels, dtype=torch.long))

    # One pass over the labels gives the classes, each sample's class
    # position and the class counts. Indexing by position (not by raw label
    # value) stays correct even if the label values are not contiguous.
    classes, inverse, counts = np.unique(labels, return_inverse=True, return_counts=True)
    inverse = inverse.reshape(-1)

    # Sampler: draw each sample with probability proportional to 1 / class size.
    samples_weight = torch.from_numpy((1.0 / counts)[inverse]).double()
    sampler = WeightedRandomSampler(samples_weight, len(samples_weight))
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # "Balanced" class weights: n_samples / (n_classes * class_count).
    class_weights = len(labels) / (len(classes) * counts)
    class_weights = torch.tensor(class_weights, dtype=torch.float32).to(run_device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)

    train_losses, train_accs = [], []

    for epoch in range(epochs):
        model.train()
        running_loss, n_correct, n_seen = 0.0, 0, 0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(run_device), y_batch.to(run_device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            n_correct += int((outputs.argmax(dim=1) == y_batch).sum().item())
            n_seen += y_batch.size(0)

        train_loss = running_loss / len(train_loader)
        train_acc = n_correct / n_seen

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}")

    return {
        'train_losses': train_losses,
        'train_accs': train_accs
    }

# ==========================================
# Evaluation on test set
# ==========================================
def evaluate_gnn(model, X_test, y_test, batch_size=100):
    """Compute and print the accuracy of `model` on a held-out set.

    Args:
        model: module mapping a float batch of `X_test` rows to class logits;
            it is switched to eval mode.
        X_test: array-like of test inputs, first axis indexes samples.
        y_test: integer class labels, one per sample.
        batch_size: evaluation batch size.

    Returns:
        dict with 'test_acc' (float; NaN for an empty test set),
        'predictions' and 'labels' (lists in input order).
    """
    test_dataset = TensorDataset(
        torch.tensor(X_test, dtype=torch.float32),
        torch.tensor(y_test, dtype=torch.long)
    )
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Evaluate on the device the model actually lives on.
    run_device = next(model.parameters()).device

    model.eval()
    test_preds, test_labels = [], []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch.to(run_device))
            test_preds.extend(outputs.argmax(dim=1).cpu().numpy())
            test_labels.extend(y_batch.numpy())

    if test_labels:
        test_acc = float(np.mean(np.asarray(test_preds) == np.asarray(test_labels)))
    else:
        # Nothing to score; avoid a mean-of-empty warning.
        test_acc = float('nan')

    print("\n" + "="*50)
    print(f" Test Accuracy: {test_acc*100:.2f}%")
    print("="*50)

    return {'test_acc': test_acc, 'predictions': test_preds, 'labels': test_labels}



# ==========================================
# Pipeline Integration
# ==========================================
def run_gnn_pipeline(X_train, y_train, X_test, y_test, target_node=15):
    """Build a GNN classifier, train it, and score it on the test split.

    The model is sized from `X_train` (axis 1 = graph nodes, axis 2 =
    features per node) and uses the fixed layer widths 50-28-7-2.

    Returns:
        (model, history, results): the trained model, the dict returned by
        train_gnn and the dict returned by evaluate_gnn.
    """
    classifier = GNNEmotionClassifier(
        n_channels=X_train.shape[1],
        feat_length=X_train.shape[2],
        hidden_sizes=[50, 28, 7, 2],
        dropout_rates=[0.22, 0.0, 0.0, 0.05],
        target_node=target_node,
    )
    classifier = classifier.to(device)

    n_params = sum(param.numel() for param in classifier.parameters())
    print(f"\n Model initialized with {n_params} parameters")

    # Fit on the complete training set, then score the held-out set.
    history = train_gnn(classifier, X_train, y_train)
    results = evaluate_gnn(classifier, X_test, y_test)

    return classifier, history, results

In [None]:
if __name__ == "__main__":
    # X_val_aug / X_ar_aug already hold the real training features followed by
    # the synthetic ones, so they are used directly without stacking
    # train_features on top again.

    # ===============================
    #  Train GNN and Evaluate
    # ===============================
    # Valence: train on the augmented set, test on the untouched test features.
    print("\n Training GNN for Valence...")
    model_valence, val_history, val_results = run_gnn_pipeline(
        X_val_aug, y_val_aug, test_features, y_val_test
    )
    val_pred = val_results['predictions']

    # Arousal: same procedure with the arousal labels.
    print("\n Training GNN for Arousal...")
    model_arousal, ar_history, ar_results = run_gnn_pipeline(
        X_ar_aug, y_ar_aug, test_features, y_ar_test
    )
    ar_pred = ar_results['predictions']


 Training GNN for Valence...

 Model initialized with 8721 parameters
Epoch 10/400 | Train Loss: 0.6970, Acc: 0.4994
Epoch 20/400 | Train Loss: 0.6750, Acc: 0.6393
Epoch 30/400 | Train Loss: 0.6522, Acc: 0.6840
Epoch 40/400 | Train Loss: 0.6309, Acc: 0.7027
Epoch 50/400 | Train Loss: 0.5984, Acc: 0.8188
Epoch 60/400 | Train Loss: 0.5611, Acc: 0.8505
Epoch 70/400 | Train Loss: 0.5225, Acc: 0.8635
Epoch 80/400 | Train Loss: 0.5189, Acc: 0.8505
Epoch 90/400 | Train Loss: 0.4787, Acc: 0.8675
Epoch 100/400 | Train Loss: 0.4761, Acc: 0.8647
Epoch 110/400 | Train Loss: 0.4670, Acc: 0.8545
Epoch 120/400 | Train Loss: 0.4488, Acc: 0.8658
Epoch 130/400 | Train Loss: 0.4478, Acc: 0.8618
Epoch 140/400 | Train Loss: 0.4081, Acc: 0.8749
Epoch 150/400 | Train Loss: 0.3974, Acc: 0.8743
Epoch 160/400 | Train Loss: 0.4042, Acc: 0.8539
Epoch 170/400 | Train Loss: 0.3565, Acc: 0.8743
Epoch 180/400 | Train Loss: 0.3741, Acc: 0.8647
Epoch 190/400 | Train Loss: 0.3462, Acc: 0.8754
Epoch 200/400 | Train Loss