In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import os
import scipy.io
from scipy.signal import butter, lfilter
import scipy.signal

In [5]:
# Load DEAP dataset (Change path accordingly)
data_path = "E:/FYP/Finalise Fyp/EEg-based-Emotion-Recognition/data_preprocessed_matlab/"
subject_data = []
subject_labels = []

# Load all 32 subjects
for i in range(1, 33):
    mat = scipy.io.loadmat(f"{data_path}s{i:02d}.mat")
    
    # Extract EEG data and labels
    subject_data.append(mat["data"])     # Shape: (40 trials, 40 channels, 8064 samples)
    subject_labels.append(mat["labels"]) # Shape: (40 trials, 4 labels)

# Convert lists to NumPy arrays
subject_data = np.array(subject_data)     # Expected Shape: (32, 40, 40, 8064)
subject_labels = np.array(subject_labels) # Expected Shape: (32, 40, 4)

# Print shapes to confirm
print("EEG Data Shape:", subject_data.shape)   # (32, 40, 40, 8064)
print("Labels Shape:", subject_labels.shape)   # (32, 40, 4)


EEG Data Shape: (32, 40, 40, 8064)
Labels Shape: (32, 40, 4)


In [6]:
import numpy as np
import scipy.signal
# Frequency band definitions
freq_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 14),
    "beta": (14, 30),
    "gamma": (31, 50),
}

# Define function to compute Differential Entropy (DE)
def compute_de(signal):
    """Compute Differential Entropy (DE) for a given EEG segment"""
    variance = np.var(signal, axis=-1, keepdims=True)  # Compute variance
    de = 0.5 * np.log(2 * np.pi * np.e * variance)  # Apply DE formula
    return de.squeeze()  # Remove extra dimensions

# Define function to extract DE features
def extract_de_features(subject_data, fs=128, window_size=128):
    """
    Extract DE features from EEG data.
    - subject_data: EEG data of shape (32, 40, 40, 8064)
    - fs: Sampling frequency (128 Hz)
    - window_size: 1 second (128 samples)
    Returns: DE feature array of shape (32, 40, 40, 5, 63)
    """
    num_subjects, num_trials, num_channels, num_samples = subject_data.shape
    num_bands = len(freq_bands)
    num_windows = num_samples // window_size  # 8064 / 128 = 63 windows

    # Initialize DE feature array
    de_features = np.zeros((num_subjects, num_trials, num_channels, num_bands, num_windows))

    # Loop through subjects, trials, and channels
    for subj in range(num_subjects):
        for trial in range(num_trials):
            for ch in range(num_channels):
                # Extract single-channel EEG data for this trial
                signal = subject_data[subj, trial, ch, :]

                # Apply bandpass filters and compute DE for each frequency band
                for b_idx, (band, (low, high)) in enumerate(freq_bands.items()):
                    # Bandpass filter
                    sos = scipy.signal.butter(4, [low, high], btype="bandpass", fs=fs, output="sos")
                    filtered_signal = scipy.signal.sosfilt(sos, signal)

                    # Segment into 1-second windows (128 samples each)
                    segmented = np.array(np.split(filtered_signal, num_windows, axis=-1))

                    # Compute DE for each window
                    de_features[subj, trial, ch, b_idx, :] = compute_de(segmented)

    return de_features

# Extract DE features
de_features = extract_de_features(subject_data)

# Print shape to confirm
print("DE Feature Shape:", de_features.shape)  # Expected: (32, 40, 40, 5, 63)

DE Feature Shape: (32, 40, 40, 5, 63)


In [7]:
print(type(de_features))  # Should be <class 'numpy.ndarray'>, NOT a tensor


<class 'numpy.ndarray'>


In [8]:
import numpy as np

def normalize_de_features(de_features):
    """
    Normalize DE features using Z-score normalization.
    Ensures compatibility with NumPy arrays.
    """
    de_features = np.array(de_features)  # Ensure NumPy format
    print("Normalization Step - Shape:", de_features.shape)  # Debugging

    # Compute mean and std deviation across subjects & trials (axes 0,1), not windows
    mean = np.mean(de_features, axis=(0, 1), keepdims=True)  
    std = np.std(de_features, axis=(0, 1), keepdims=True)  

    return (de_features - mean) / (std + 1e-6)  # Normalize & avoid division by zero

# Apply normalization
de_features = normalize_de_features(de_features)

# Print shape to confirm
print("Normalized DE Feature Shape:", de_features.shape)  # Should remain (32, 40, 40, 5, 63)


Normalization Step - Shape: (32, 40, 40, 5, 63)
Normalized DE Feature Shape: (32, 40, 40, 5, 63)


In [9]:
### Reshaping defeatures

from sklearn.decomposition import PCA

# Reshape DE features to (1280, 12600)
reshaped_features = de_features.reshape(-1, 12600)  

# Apply PCA with the desired feature size (e.g., 500 instead of 200)
pca = PCA(n_components=1000)
pca_features = pca.fit_transform(reshaped_features)  # Shape: (1280, 500)

# Reshape back to (32, 40, 500)
pca_features = pca_features.reshape(32, 40, 1000)

# Print shape to confirm
print("PCA Reduced Feature Shape:", pca_features.shape)  # Expected: (32, 40, 500)


PCA Reduced Feature Shape: (32, 40, 1000)


In [10]:
print("Explained variance by first 1000 components:", sum(pca.explained_variance_ratio_))


Explained variance by first 1000 components: 0.9912161190295471


**COMMON FEATURE EXTRACTION**

In [11]:
#1. Common Feature Extractor (CFE)
class CommonFeatureExtractor(nn.Module):
    def __init__(self, input_dim=1000, output_dim=64):
        super(CommonFeatureExtractor, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, output_dim)
        self.activation = nn.LeakyReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.activation(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.activation(self.bn2(self.fc2(x)))
        x = self.fc3(x)
        return x

**CHECKING CFE IMPLEMENTATION**

In [12]:
# Generate random input matching PCA-reduced feature size
test_input = torch.randn(32, 40, 500)  # Batch of 32 subjects, 40 trials each

# Initialize CFE model
cfe = CommonFeatureExtractor(input_dim=500, output_dim=64)

# Forward pass
output = cfe(test_input.view(-1, 500))  # Flatten batch & trials

# Check output shape
print("Input Shape:", test_input.shape)    # Expected: (32, 40, 500)
print("Output Shape:", output.shape)       # Expected: (32*40, 64) → (1280, 64)


Input Shape: torch.Size([32, 40, 500])
Output Shape: torch.Size([1280, 64])
