In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import pearsonr
from scipy import stats

# Seed the torch and NumPy global random number generators with a fixed value
# so that code drawing from them (weight initialisation, shuffling, latent
# noise) starts from the same state on every run.
torch.manual_seed(42)
np.random.seed(42)

In [1]:
import pickle
import numpy as np
import os

def read_data(filename):
    """Load one pickled recording file and return the unpickled object.

    The file is opened in binary mode and decoded with ``encoding='latin1'``,
    which lets Python 3 read pickles written by Python 2: 8-bit ``str``
    payloads are mapped byte-for-byte instead of raising a decode error.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        The object stored in the pickle.

    Note:
        Unpickling can execute code embedded in the file. Only call this on
        files obtained from a trusted source.
    """
    with open(filename, 'rb') as f:
        # Public equivalent of configuring the private pickle._Unpickler by
        # hand; it also uses the C-accelerated unpickler when available.
        return pickle.load(f, encoding='latin1')

# Zero-padded participant identifiers "01" through "32"; each one is turned
# into a file name of the form s01.dat, s02.dat, ... in the loop below.
files = [f"{i:02}" for i in range(1, 33)]

# Parallel lists: entry k of each list comes from the k-th participant file.
labels = []
data = []

# Directory holding the per-participant .dat files (Kaggle input mount).
base_path = "/kaggle/input/deap-dataset/deap-dataset/data_preprocessed_python/"

# Read every participant file and keep its 'labels' and 'data' entries.
# NOTE(review): this cell only fills the two lists; it does not build the
# X / y arrays that the final training call expects — confirm where those
# are derived.
for i in files:
    file_path = os.path.join(base_path, f"s{i}.dat")
    d = read_data(file_path)
    labels.append(d['labels'])
    data.append(d['data'])

## VAE model

In [29]:


class ConditionedVAE(nn.Module):
    """Conditional variational autoencoder built from fully connected layers.

    The encoder receives a flattened input concatenated with its emotion
    vector and produces the mean and log-variance of a diagonal Gaussian over
    the latent code. The decoder receives a latent code concatenated with an
    emotion vector and maps it back to ``input_dim`` values.

    Args:
        input_dim: Number of features in one flattened input sample.
        emotion_dim: Length of the conditioning (emotion) vector.
        latent_dim: Size of the latent code.
        hidden_dim: Width of the widest hidden layer; the other hidden layers
            use ``hidden_dim // 2`` and ``hidden_dim // 4``.
    """

    def __init__(self, input_dim=2880, emotion_dim=4, latent_dim=128, hidden_dim=512):
        super().__init__()

        self.input_dim = input_dim
        self.emotion_dim = emotion_dim
        self.latent_dim = latent_dim

        wide = hidden_dim
        medium = hidden_dim // 2
        narrow = hidden_dim // 4
        drop_p = 0.2

        # Encoder trunk: (input ++ emotions) -> wide -> medium -> narrow
        encoder_layers = [
            nn.Linear(input_dim + emotion_dim, wide),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(wide, medium),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(medium, narrow),
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Two linear heads on top of the trunk: posterior mean and log-variance
        self.fc_mu = nn.Linear(narrow, latent_dim)
        self.fc_logvar = nn.Linear(narrow, latent_dim)

        # Decoder mirrors the encoder: (latent ++ emotions) -> narrow -> medium -> wide -> input
        decoder_layers = [
            nn.Linear(latent_dim + emotion_dim, narrow),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(narrow, medium),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(medium, wide),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(wide, input_dim),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x, emotions):
        """Return ``(mu, logvar)`` of the latent posterior for ``x`` given ``emotions``."""
        features = self.encoder(torch.cat((x, emotions), dim=1))
        return self.fc_mu(features), self.fc_logvar(features)

    def reparameterize(self, mu, logvar):
        """Draw a latent sample as ``mu + eps * std`` with ``eps`` standard normal."""
        sigma = (0.5 * logvar).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def decode(self, z, emotions):
        """Map latent codes ``z`` plus ``emotions`` back to input space."""
        return self.decoder(torch.cat((z, emotions), dim=1))

    def forward(self, x, emotions):
        """Encode, sample and decode; returns ``(recon_x, mu, logvar)``."""
        mu, logvar = self.encode(x, emotions)
        latent = self.reparameterize(mu, logvar)
        return self.decode(latent, emotions), mu, logvar

## Preprocessing

In [None]:
def preprocess_data(X, y, method='standardize'):
    """
    Flatten and scale the EEG samples and standardize the emotion labels.

    Args:
        X: Array whose first axis indexes samples; every sample is flattened
            into a single row before scaling.
        y: Label array, passed unchanged to ``StandardScaler.fit_transform``.
        method: ``'standardize'`` to scale X with ``StandardScaler`` or
            ``'minmax'`` to scale X with ``MinMaxScaler`` into ``(-1, 1)``.

    Returns:
        Tuple ``(X_processed, y_processed, scaler_X, scaler_y)`` where the two
        scalers are the fitted objects, usable later for ``transform`` and
        ``inverse_transform``.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    # Reject unknown methods up front; previously they fell through both
    # branches and failed later on an unbound ``scaler_X``.
    if method not in ('standardize', 'minmax'):
        raise ValueError(
            f"Unknown preprocessing method {method!r}; expected 'standardize' or 'minmax'"
        )

    # Flatten EEG data to (num_samples, num_features)
    X_flat = X.reshape(X.shape[0], -1)

    if method == 'standardize':
        scaler_X = StandardScaler()
    else:
        scaler_X = MinMaxScaler(feature_range=(-1, 1))
    X_processed = scaler_X.fit_transform(X_flat)

    # Normalize emotion labels
    scaler_y = StandardScaler()
    y_processed = scaler_y.fit_transform(y)

    return X_processed, y_processed, scaler_X, scaler_y

## Training

In [None]:
# Helper that generates new EEG samples from given emotion conditions
def generate_new_samples(model, emotion_conditions, scaler_X, scaler_y, num_samples=10,
                         original_shape=(32, 90)):
    """
    Generate new EEG samples given emotion conditions.

    Latent codes are drawn from a standard normal prior, decoded together
    with the scaled emotion conditions, mapped back to the original data
    scale with ``scaler_X`` and reshaped per sample.

    Args:
        model: Trained model exposing ``parameters()``, ``eval()``,
            ``latent_dim`` and ``decode(z, emotions)``.
        emotion_conditions: Either one condition (1-D sequence) or a 2-D
            sequence with one condition per row, in the original label scale.
        scaler_X: Fitted scaler used to undo the feature scaling.
        scaler_y: Fitted scaler used to scale the emotion conditions.
        num_samples: Number of samples to generate. Must equal the number of
            condition rows, unless exactly one condition is given, in which
            case that condition is reused for every sample.
        original_shape: Shape each generated sample is reshaped to.

    Returns:
        NumPy array of shape ``(num_samples, *original_shape)``.

    Raises:
        ValueError: If the number of conditions is neither 1 nor ``num_samples``.

    Note:
        The model is switched to evaluation mode and left in that mode.
    """
    device = next(model.parameters()).device
    model.eval()

    # Accept a single flat condition as well as a list of conditions.
    conditions = np.atleast_2d(np.asarray(emotion_conditions, dtype=float))
    if conditions.shape[0] == 1 and num_samples > 1:
        # One condition shared by all generated samples.
        conditions = np.repeat(conditions, num_samples, axis=0)
    elif conditions.shape[0] != num_samples:
        # Without this check the mismatch only surfaces as an opaque
        # concatenation error inside model.decode.
        raise ValueError(
            f"Got {conditions.shape[0]} emotion conditions for num_samples={num_samples}; "
            "pass one condition per sample or a single condition to reuse"
        )

    # Normalize emotion conditions
    emotion_normalized = np.asarray(scaler_y.transform(conditions))
    emotion_tensor = torch.as_tensor(emotion_normalized, dtype=torch.float32).to(device)

    with torch.no_grad():
        # Sample from prior
        z = torch.randn(num_samples, model.latent_dim).to(device)

        # Decode with emotion conditions
        generated_np = model.decode(z, emotion_tensor).cpu().numpy()

    # Convert back to original scale and per-sample shape
    generated_original = scaler_X.inverse_transform(generated_np)
    return generated_original.reshape(num_samples, *original_shape)

In [None]:
def vae_loss_function(recon_x, x, mu, logvar, beta=1.0):
    """
    Compute the beta-weighted VAE objective.

    The reconstruction term is the squared error summed over every element;
    the regulariser is the KL divergence between the diagonal Gaussian
    described by (mu, logvar) and a standard normal, also summed.

    Returns:
        Tuple ``(total, reconstruction, kl)`` with
        ``total = reconstruction + beta * kl``.
    """
    # Summed (not averaged) squared error between reconstruction and target
    reconstruction_term = nn.functional.mse_loss(recon_x, x, reduction='sum')

    # Closed-form KL(N(mu, sigma^2) || N(0, 1)) accumulated over all entries
    kl_term = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp()).sum()

    total = reconstruction_term + beta * kl_term
    return total, reconstruction_term, kl_term

In [None]:

def train_vae_with_evaluation(X, y, num_epochs=200, batch_size=32, lr=0.001,
                             latent_dim=128, beta=1.0, test_size=0.2,
                             patience=50, checkpoint_path='best_vae_model.pth'):
    """
    Train a ConditionedVAE on (X, y) while tracking loss on a held-out split.

    The data is split first and the scalers are fitted on the training part
    only, so no statistics of the held-out samples leak into training. The
    weights with the lowest held-out loss are written to ``checkpoint_path``
    and restored before returning.

    Args:
        X: Array of samples; the first axis indexes samples.
        y: Array of labels, one row per sample.
        num_epochs: Maximum number of training epochs.
        batch_size: Mini-batch size for both loaders.
        lr: Adam learning rate.
        latent_dim: Size of the VAE latent code.
        beta: Weight of the KL term in the loss.
        test_size: Fraction of the data held out.
        patience: Number of consecutive epochs without improvement of the
            held-out loss after which training stops early.
        checkpoint_path: File the best weights are saved to.

    Returns:
        Tuple ``(model, train_losses, test_losses, train_recon_losses,
        train_kl_losses, scaler_X, scaler_y, X_test, y_test)``. The loss
        lists hold per-sample averages, one entry per completed epoch;
        ``X_test`` and ``y_test`` are the scaled held-out arrays.

    Note:
        The held-out split also drives the learning-rate schedule and early
        stopping, so its loss is an optimistic estimate of generalisation.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Split first, then fit the scalers on the training portion only.
    print("Preprocessing data...")
    X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=None
    )
    X_train, y_train, scaler_X, scaler_y = preprocess_data(X_train_raw, y_train_raw)
    # Apply the train-fitted scalers to the held-out data (flattened like the
    # training samples inside preprocess_data).
    X_test = scaler_X.transform(X_test_raw.reshape(X_test_raw.shape[0], -1))
    y_test = scaler_y.transform(y_test_raw)

    print(f"Train set: {X_train.shape[0]} samples")
    print(f"Test set: {X_test.shape[0]} samples")

    # Convert to tensors
    X_train_tensor = torch.FloatTensor(X_train).to(device)
    y_train_tensor = torch.FloatTensor(y_train).to(device)
    X_test_tensor = torch.FloatTensor(X_test).to(device)
    y_test_tensor = torch.FloatTensor(y_test).to(device)

    # Create datasets and dataloaders
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model
    model = ConditionedVAE(
        input_dim=X_train.shape[1],
        emotion_dim=y_train.shape[1],
        latent_dim=latent_dim
    ).to(device)

    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

    # Optimizer and plateau-based learning-rate schedule
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=20, factor=0.5)

    # Training history
    train_losses = []
    test_losses = []
    train_recon_losses = []
    train_kl_losses = []

    print("Starting VAE training...")

    best_test_loss = float('inf')
    patience_counter = 0
    # Only reload the checkpoint if THIS run wrote it; otherwise a stale file
    # from an earlier run could silently replace the trained weights.
    checkpoint_saved = False

    num_train = len(train_loader.dataset)
    num_test = len(test_loader.dataset)

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0
        train_recon_loss = 0
        train_kl_loss = 0

        for batch_x, batch_emotions in train_loader:
            optimizer.zero_grad()

            recon_batch, mu, logvar = model(batch_x, batch_emotions)
            loss, recon_loss, kl_loss = vae_loss_function(recon_batch, batch_x, mu, logvar, beta)

            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_recon_loss += recon_loss.item()
            train_kl_loss += kl_loss.item()

        # Evaluation phase
        model.eval()
        test_loss = 0

        with torch.no_grad():
            for batch_x, batch_emotions in test_loader:
                recon_batch, mu, logvar = model(batch_x, batch_emotions)
                loss, _, _ = vae_loss_function(recon_batch, batch_x, mu, logvar, beta)
                test_loss += loss.item()

        # The loss sums over elements, so dividing by the number of samples
        # yields per-sample averages.
        avg_train_loss = train_loss / num_train
        avg_test_loss = test_loss / num_test
        avg_recon_loss = train_recon_loss / num_train
        avg_kl_loss = train_kl_loss / num_train

        # Store losses
        train_losses.append(avg_train_loss)
        test_losses.append(avg_test_loss)
        train_recon_losses.append(avg_recon_loss)
        train_kl_losses.append(avg_kl_loss)

        # Learning rate scheduling
        scheduler.step(avg_test_loss)

        # Early stopping bookkeeping
        if avg_test_loss < best_test_loss:
            best_test_loss = avg_test_loss
            patience_counter = 0
            # Save best model
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

        # Print progress
        if epoch % 20 == 0 or epoch == num_epochs - 1:
            print(f'Epoch [{epoch}/{num_epochs}] | '
                  f'Train Loss: {avg_train_loss:.4f} | '
                  f'Test Loss: {avg_test_loss:.4f} | '
                  f'Recon: {avg_recon_loss:.4f} | '
                  f'KL: {avg_kl_loss:.4f}')

    # Load best model (map_location keeps this working when the checkpoint
    # device differs from the current one)
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return model, train_losses, test_losses, train_recon_losses, train_kl_losses, scaler_X, scaler_y, X_test, y_test

        

## Evaluation

In [None]:
def calculate_reconstruction_metrics(original, reconstructed):
    """
    Score a reconstruction against its original with several summary metrics.

    Both arrays are flattened to 1-D and compared element by element.

    Returns:
        Dict with the keys 'MSE', 'RMSE', 'MAE', 'R2', 'Correlation',
        'P_value', 'SNR_dB' and 'Variance_Explained'.
    """
    truth = original.flatten()
    estimate = reconstructed.flatten()

    # Error magnitudes
    mse = mean_squared_error(truth, estimate)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(truth, estimate)

    # Goodness of fit and linear association
    r2 = r2_score(truth, estimate)
    correlation, p_value = pearsonr(truth, estimate)

    # Signal-to-noise ratio in dB; a perfect reconstruction has zero noise
    # power and is reported as infinity instead of dividing by zero.
    residual = truth - estimate
    signal_power = np.mean(truth ** 2)
    noise_power = np.mean(residual ** 2)
    if noise_power > 0:
        snr = 10 * np.log10(signal_power / noise_power)
    else:
        snr = float('inf')

    # Share of the original variance not left in the residual
    variance_explained = 1 - (np.var(residual) / np.var(truth))

    return {
        'MSE': mse,
        'RMSE': rmse,
        'MAE': mae,
        'R2': r2,
        'Correlation': correlation,
        'P_value': p_value,
        'SNR_dB': snr,
        'Variance_Explained': variance_explained
    }

In [None]:
def comprehensive_evaluation(model, X_test, y_test, scaler_X, original_shape=(32, 90)):
    """
    Reconstruct the test set with the trained VAE, print and return metrics.

    Metrics are computed between ``X_test`` as passed in and the model output,
    i.e. before ``scaler_X.inverse_transform`` is applied. The returned sample
    arrays are mapped back through ``scaler_X`` and reshaped so each sample
    has shape ``original_shape``.

    Returns:
        Tuple ``(metrics, original_reshaped, recon_reshaped, mu, logvar)``
        where ``mu`` and ``logvar`` are NumPy arrays.
    """
    device = next(model.parameters()).device
    model.eval()

    # Move the whole test set to the model's device in one go
    inputs = torch.FloatTensor(X_test).to(device)
    conditions = torch.FloatTensor(y_test).to(device)

    with torch.no_grad():
        recon_data, mu, logvar = model(inputs, conditions)
        recon_data_np = recon_data.cpu().numpy()

    metrics = calculate_reconstruction_metrics(X_test, recon_data_np)

    # Report
    rule = "=" * 50
    report_lines = [
        "\n" + rule,
        "VAE RECONSTRUCTION ACCURACY METRICS",
        rule,
        f"Mean Squared Error (MSE):     {metrics['MSE']:.6f}",
        f"Root Mean Squared Error:      {metrics['RMSE']:.6f}",
        f"Mean Absolute Error:          {metrics['MAE']:.6f}",
        f"R-squared Score:              {metrics['R2']:.4f}",
        f"Pearson Correlation:          {metrics['Correlation']:.4f} (p={metrics['P_value']:.4e})",
        f"Signal-to-Noise Ratio:        {metrics['SNR_dB']:.2f} dB",
        f"Variance Explained:           {metrics['Variance_Explained']:.4f}",
        rule,
    ]
    for line in report_lines:
        print(line)

    # Undo the feature scaling and restore the per-sample shape for plotting
    def to_original_layout(scaled):
        return scaler_X.inverse_transform(scaled).reshape(-1, *original_shape)

    original_reshaped = to_original_layout(X_test)
    recon_reshaped = to_original_layout(recon_data_np)

    return metrics, original_reshaped, recon_reshaped, mu.cpu().numpy(), logvar.cpu().numpy()

## Plotting

In [None]:
def plot_training_results(train_losses, test_losses, train_recon_losses, train_kl_losses):
    """
    Draw a 2x2 overview of the training history and show it.

    Panels: total train/test loss, reconstruction vs. KL loss on twin y-axes,
    a histogram of the last 50 train/test losses, and moving-average curves
    (drawn only when more than 10 epochs were recorded).
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    ax_total, ax_parts = axes[0, 0], axes[0, 1]
    ax_hist, ax_smooth = axes[1, 0], axes[1, 1]

    # Top-left: total loss per epoch
    for series, label in ((train_losses, 'Train Loss'), (test_losses, 'Test Loss')):
        ax_total.plot(series, label=label, alpha=0.8)
    ax_total.set_xlabel('Epoch')
    ax_total.set_ylabel('Total Loss')
    ax_total.set_title('Training and Test Loss')
    ax_total.legend()
    ax_total.grid(True, alpha=0.3)

    # Top-right: reconstruction loss (left axis) against KL divergence (right axis)
    ax_parts.plot(train_recon_losses, label='Reconstruction Loss', color='blue', alpha=0.8)
    ax_parts.set_xlabel('Epoch')
    ax_parts.set_ylabel('Reconstruction Loss', color='blue')
    ax_parts.tick_params(axis='y', labelcolor='blue')
    ax_parts.grid(True, alpha=0.3)

    ax_kl = ax_parts.twinx()
    ax_kl.plot(train_kl_losses, label='KL Divergence', color='red', alpha=0.8)
    ax_kl.set_ylabel('KL Divergence', color='red')
    ax_kl.tick_params(axis='y', labelcolor='red')
    ax_parts.set_title('Reconstruction vs KL Loss')

    # Bottom-left: distribution of the most recent losses
    recent = slice(-50, None)
    ax_hist.hist(train_losses[recent], bins=20, alpha=0.7, label='Recent Train Loss')
    ax_hist.hist(test_losses[recent], bins=20, alpha=0.7, label='Recent Test Loss')
    ax_hist.set_xlabel('Loss Value')
    ax_hist.set_ylabel('Frequency')
    ax_hist.set_title('Recent Loss Distribution')
    ax_hist.legend()

    # Bottom-right: moving average, only meaningful with enough epochs
    window = 10
    if len(train_losses) > window:
        kernel = np.ones(window) / window
        smoothed_train = np.convolve(train_losses, kernel, mode='valid')
        smoothed_test = np.convolve(test_losses, kernel, mode='valid')
        ax_smooth.plot(smoothed_train, label='Train Loss (smoothed)', alpha=0.8)
        ax_smooth.plot(smoothed_test, label='Test Loss (smoothed)', alpha=0.8)
        ax_smooth.set_xlabel('Epoch')
        ax_smooth.set_ylabel('Smoothed Loss')
        ax_smooth.set_title('Smoothed Learning Curves')
        ax_smooth.legend()
        ax_smooth.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

In [None]:
def plot_reconstruction_comparison(original, reconstructed, num_samples=3):
    """
    Visualize original vs reconstructed EEG data side by side.

    Each row shows one sample as a heat map: the original on the left and its
    reconstruction on the right, each with its own colour bar.

    Args:
        original: Sequence of 2-D samples.
        reconstructed: Sequence of 2-D samples aligned with ``original``.
        num_samples: Maximum number of rows to draw; fewer are drawn when the
            inputs hold fewer samples.

    Raises:
        ValueError: If there is no sample to plot.
    """
    # Never index past the available samples.
    n_rows = min(num_samples, len(original), len(reconstructed))
    if n_rows < 1:
        raise ValueError("No samples available to plot")

    # squeeze=False keeps `axes` two-dimensional even for a single row, so the
    # [row, col] indexing below also works when only one sample is drawn.
    fig, axes = plt.subplots(n_rows, 2, figsize=(15, 4 * n_rows), squeeze=False)

    panels = (('Original', original), ('Reconstructed', reconstructed))
    for row in range(n_rows):
        for col, (kind, samples) in enumerate(panels):
            ax = axes[row, col]
            image = ax.imshow(samples[row], aspect='auto', cmap='viridis')
            ax.set_title(f'{kind} EEG Sample {row + 1}')
            ax.set_xlabel('Time')
            ax.set_ylabel('Channels')
            fig.colorbar(image, ax=ax)

    plt.tight_layout()
    plt.show()


In [None]:
def main_vae_training(X, y):
    """
    Run the full pipeline: train, plot curves, evaluate, visualise, generate.

    Returns:
        Tuple ``(model, metrics, generated_samples, scaler_X, scaler_y)``.
    """
    print(f"Starting VAE training with data shape: X{X.shape}, y{y.shape}")

    # 1. Training
    training_output = train_vae_with_evaluation(
        X, y, num_epochs=250, batch_size=32, lr=0.001, latent_dim=128, beta=1.0
    )
    (model,
     train_losses, test_losses, train_recon_losses, train_kl_losses,
     scaler_X, scaler_y,
     X_test, y_test) = training_output

    # 2. Learning curves
    plot_training_results(train_losses, test_losses, train_recon_losses, train_kl_losses)

    # 3. Reconstruction quality on the held-out split (latent statistics are
    #    returned by the evaluation but not used further here)
    evaluation = comprehensive_evaluation(model, X_test, y_test, scaler_X)
    metrics, original_samples, recon_samples, mu, logvar = evaluation

    # 4. Side-by-side view of a few reconstructions
    plot_reconstruction_comparison(original_samples, recon_samples, num_samples=3)

    # 5. Generate one new sample per hand-picked emotion condition
    test_emotions = [[5, 5, 5, 5], [7, 3, 6, 4], [2, 8, 3, 7]]
    generated_samples = generate_new_samples(
        model, test_emotions, scaler_X, scaler_y, len(test_emotions)
    )

    print(f"\nGenerated {len(generated_samples)} new EEG samples with custom emotion conditions")

    return model, metrics, generated_samples, scaler_X, scaler_y

# Run the full pipeline and keep the trained model, metrics, generated samples
# and fitted scalers as notebook-level variables.
# NOTE(review): X and y are not assigned in any cell visible in this notebook
# (the loading cell only fills the `data` and `labels` lists) — confirm where
# they are built, otherwise this raises NameError on a fresh kernel.
model, metrics, generated_samples, scaler_X, scaler_y = main_vae_training(X, y)