# ANN, SVM, RF, and KNN for Emotion Classification (DEAP)

This notebook compares **shallow and traditional machine learning models** for classifying emotions using the **DEAP dataset**.

**Models:** ANN (one network per channel, fused by averaging the predicted probabilities); SVM, RF, and KNN (trained on the concatenated features of all channels)
**Targets:** Valence, Arousal
**Output:** Final accuracy results only

In [None]:
# Suppress all processing messages, progress bars, and warnings
import warnings
# Ignore every warning category raised in this interpreter from here on.
warnings.filterwarnings('ignore')
import os
# NOTE(review): this environment variable is set after the interpreter has
# started, so it presumably targets child processes that inherit the
# environment (e.g. parallel workers) - confirm that is the intent.
os.environ['PYTHONWARNINGS'] = 'ignore'

# Disable tqdm progress bars
# tqdm is optional: when it is not installed the ImportError is swallowed and
# nothing is patched.
try:
    from tqdm import tqdm
    import functools
    # Keep a handle on the real tqdm before the name is rebound below.
    _original_tqdm = tqdm
    # Rebind the name `tqdm` in this namespace to a wrapper that forces
    # disable=True. Only calls made through this name are affected; code that
    # imports tqdm on its own still gets the original object.
    @functools.wraps(_original_tqdm)
    def tqdm(*args, **kwargs):
        # Override any caller-supplied value so the bar is never drawn.
        kwargs['disable'] = True
        return _original_tqdm(*args, **kwargs)
except ImportError:
    pass

# Suppress stdout during processing
import sys
import contextlib

@contextlib.contextmanager
def suppress_stdout():
    """Discard everything written to ``sys.stdout`` inside the ``with`` body.

    ``sys.stdout`` is rebound to a handle on the null device for the duration
    of the block. The previous stream is put back on exit, also when the body
    raises, and the null-device handle is closed afterwards.
    """
    with open(os.devnull, 'w') as null_stream, contextlib.redirect_stdout(null_stream):
        yield

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import os

In [None]:
# Configuration
# All dataset paths are relative to the notebook's working directory.
# NOTE(review): metadata_folder_path is not referenced by the code visible in
# this notebook - confirm whether it is still needed.
metadata_folder_path = '../datasets/DEAP/deap-dataset/Metadata/'
# Root directory scanned by train_ann_per_channel / load_all_channels: every
# sub-directory is treated as one channel and its .csv files are concatenated.
channel_data_folder_path = '../datasets/DEAP/deap-dataset/extracted_features'
# Excel sheet read with pd.read_excel; its 'Valence' and 'Arousal' columns are
# the classification targets.
file_path = '../datasets/DEAP/deap-dataset/Metadata/participant_ratings.xls'

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Seed the torch and NumPy global generators (note: two different seed values).
torch.manual_seed(999)
np.random.seed(42)

## Reusable Functions

In [None]:
# ANN Model
class EmotionClassifier(nn.Module):
    """Small fully connected network: ``input_dim -> 64 -> 32 -> num_classes``.

    The first hidden layer is followed by LeakyReLU, batch normalisation and
    dropout (p=0.5); the second hidden layer by LeakyReLU only. The output
    layer is passed through a sigmoid, so every output unit lies in (0, 1).
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        first_width, second_width = 64, 32
        # Attribute names double as state_dict keys; keep them stable.
        self.layer1 = nn.Linear(input_dim, first_width)
        self.layer2 = nn.Linear(first_width, second_width)
        self.relu = nn.LeakyReLU()  # shared by both hidden layers
        self.bn1 = nn.BatchNorm1d(first_width)
        self.lastlayer = nn.Linear(second_width, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return sigmoid activations of shape ``(batch, num_classes)``."""
        # Block 1: linear -> LeakyReLU -> batch norm -> dropout
        hidden = self.dropout(self.bn1(self.relu(self.layer1(x))))
        # Block 2: linear -> LeakyReLU
        hidden = self.relu(self.layer2(hidden))
        return torch.sigmoid(self.lastlayer(hidden))


def _read_channel_matrix(channel_dir):
    """Stack every CSV file in *channel_dir* into one feature matrix.

    Files are read in sorted name order so the row order does not depend on
    the platform-specific ordering of ``os.listdir``. A ``trial`` column, if
    present, is dropped. Returns ``None`` when the directory has no CSV file.
    """
    csv_names = sorted(name for name in os.listdir(channel_dir) if name.endswith('.csv'))
    if not csv_names:
        return None
    frames = [pd.read_csv(os.path.join(channel_dir, name)) for name in csv_names]
    features = pd.concat(frames, axis=0, ignore_index=True)
    if 'trial' in features.columns:
        features = features.drop(columns=['trial'])
    return features.values


def _fit_binary_ann(x_fit, y_fit, device, num_epochs):
    """Train a fresh single-output EmotionClassifier on the given tensors."""
    loader = DataLoader(TensorDataset(x_fit, y_fit), batch_size=64, shuffle=True)
    model = EmotionClassifier(x_fit.shape[1], 1).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for _ in range(num_epochs):
        model.train()
        for inputs, labels in loader:
            if inputs.size(0) < 2:
                # BatchNorm1d cannot compute batch statistics from a single
                # sample in training mode, so a one-sample tail batch is skipped.
                continue
            optimizer.zero_grad()
            # squeeze(1) keeps the batch axis even for very small batches.
            outputs = model(inputs).squeeze(1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    return model


def train_ann_per_channel(channel_data_folder_path, train_indices, test_indices, y_train, y_test, device, num_epochs=50, n_folds=5):
    """Train one ANN per channel and fuse the per-channel test probabilities.

    Every sub-directory of *channel_data_folder_path* is treated as one
    channel. For each channel the features are standardised with statistics
    computed on the training rows only, one model is trained per
    cross-validation fold, and the model with the highest validation-fold
    accuracy predicts the test rows. The test probabilities are averaged over
    channels and thresholded at 0.5.

    Args:
        channel_data_folder_path: Directory holding one sub-directory per channel.
        train_indices: Positional row indices of the training samples.
        test_indices: Positional row indices of the test samples.
        y_train: Binary training labels (pandas Series, aligned with train_indices).
        y_test: Binary test labels (pandas Series, aligned with test_indices).
        device: torch device used for training and inference.
        num_epochs: Number of training epochs per fold.
        n_folds: Number of stratified cross-validation folds.

    Returns:
        Accuracy of the fused predictions on the test set, or 0.0 when no
        channel directory contains CSV data.
    """
    # NOTE(review): assumes the rows of each channel's concatenated CSV files
    # are in the same order as the label rows - confirm against the feature
    # extraction step.
    y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
    # An integer random_state makes every split() call yield the same folds,
    # so one splitter can be shared by all channels.
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    probabilities_list = []

    with suppress_stdout():
        # Sorted for a reproducible channel order (and RNG consumption order).
        for ch_name in sorted(os.listdir(channel_data_folder_path)):
            channel_full_path = os.path.join(channel_data_folder_path, ch_name)
            if not os.path.isdir(channel_full_path):
                continue

            X = _read_channel_matrix(channel_full_path)
            if X is None:
                continue

            # Fit the scaler on training rows only so that no test-set
            # statistics leak into the features.
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X[train_indices])
            X_test = scaler.transform(X[test_indices])

            X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
            X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)

            # Cross-validation training: keep the fold model that scores best
            # on its held-out validation fold (first one wins on ties).
            best_model = None
            best_val_accuracy = -1.0
            for fit_idx, val_idx in skf.split(X_train, y_train):
                model = _fit_binary_ann(
                    X_train_tensor[fit_idx].to(device),
                    y_train_tensor[fit_idx].to(device),
                    device,
                    num_epochs,
                )

                model.eval()
                with torch.no_grad():
                    val_probabilities = model(X_train_tensor[val_idx].to(device)).squeeze(1)
                val_predictions = (val_probabilities > 0.5).float().cpu()
                val_accuracy = (val_predictions == y_train_tensor[val_idx]).float().mean().item()

                if val_accuracy > best_val_accuracy:
                    best_model = model
                    best_val_accuracy = val_accuracy

            # Get probabilities for test set
            best_model.eval()
            with torch.no_grad():
                probabilities = best_model(X_test_tensor).squeeze(1).cpu().numpy()
            probabilities_list.append(probabilities)

    if not probabilities_list:
        return 0.0

    # Fusion: average the per-channel probabilities, then threshold.
    fused_probabilities = np.mean(probabilities_list, axis=0)
    fused_predictions = (fused_probabilities > 0.5).astype(int)
    return accuracy_score(y_test.values, fused_predictions)


def load_all_channels(channel_data_folder_path, train_indices, test_indices):
    """Load every channel's features and concatenate them column-wise.

    Every sub-directory of *channel_data_folder_path* is treated as one
    channel; its CSV files are stacked row-wise and a ``trial`` column, if
    present, is dropped. Each channel is standardised with statistics computed
    on the training rows only, then the channels are placed side by side.

    Args:
        channel_data_folder_path: Directory holding one sub-directory per channel.
        train_indices: Positional row indices of the training samples.
        test_indices: Positional row indices of the test samples.

    Returns:
        ``(X_train, X_test)`` as NumPy arrays, or ``(None, None)`` when no
        channel directory contains CSV data.
    """
    X_train_list = []
    X_test_list = []

    with suppress_stdout():
        # os.listdir order is unspecified; sorting keeps the column order (per
        # channel) and the row order (per file) reproducible across platforms.
        for ch_name in sorted(os.listdir(channel_data_folder_path)):
            channel_full_path = os.path.join(channel_data_folder_path, ch_name)
            if not os.path.isdir(channel_full_path):
                continue

            csv_names = sorted(
                name for name in os.listdir(channel_full_path) if name.endswith('.csv')
            )
            if not csv_names:
                continue

            ch_data = pd.concat(
                [pd.read_csv(os.path.join(channel_full_path, name)) for name in csv_names],
                axis=0,
                ignore_index=True,
            )
            if 'trial' in ch_data.columns:
                ch_data = ch_data.drop(columns=['trial'])

            X = ch_data.values
            # Fit on training rows only so that no test-set statistics leak
            # into the features; a fresh scaler is used for every channel.
            scaler = StandardScaler()
            X_train_list.append(scaler.fit_transform(X[train_indices]))
            X_test_list.append(scaler.transform(X[test_indices]))

    if not X_train_list:
        return None, None

    # Concatenate all channels
    return np.hstack(X_train_list), np.hstack(X_test_list)


def train_svm(X_train, y_train, X_test, y_test):
    """Grid-search an SVM on the training data and return its test accuracy.

    The grid covers ``C`` in {0.1, 1, 10} and linear/RBF kernels, selected by
    5-fold cross-validated accuracy. The best configuration is refit on all
    training data (GridSearchCV default) before predicting the test set.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        X_test: Test feature matrix.
        y_test: Test labels.

    Returns:
        Accuracy of the selected model on the test set.
    """
    param_grid = {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
    with suppress_stdout():
        # Probability estimates are left disabled: only predict() is used, and
        # enabling them runs an extra internal 5-fold calibration on every fit
        # of the grid search.
        grid_search = GridSearchCV(SVC(random_state=42), param_grid, cv=5, scoring='accuracy', verbose=0, n_jobs=-1)
        grid_search.fit(X_train, y_train)
        y_pred = grid_search.predict(X_test)
    return accuracy_score(y_test, y_pred)


def train_rf(X_train, y_train, X_test, y_test):
    """Grid-search a random forest on the training data and return its test accuracy.

    Candidates vary the number of trees (50, 100, 200) and the maximum depth
    (10, 20, unlimited); selection uses 5-fold cross-validated accuracy.
    """
    forest = RandomForestClassifier(random_state=42)
    search_space = {
        'n_estimators': [50, 100, 200],
        'max_depth': [10, 20, None],
    }
    search = GridSearchCV(
        forest,
        search_space,
        cv=5,
        scoring='accuracy',
        verbose=0,
        n_jobs=-1,
    )
    with suppress_stdout():
        search.fit(X_train, y_train)
        test_predictions = search.predict(X_test)
    return accuracy_score(y_test, test_predictions)


def train_knn(X_train, y_train, X_test, y_test):
    """Grid-search a k-nearest-neighbours classifier and return its test accuracy.

    Candidates vary the neighbour count (3, 5, 7, 9) and the vote weighting
    (uniform or distance); selection uses 5-fold cross-validated accuracy.
    """
    neighbour_counts = [3, 5, 7, 9]
    weightings = ['uniform', 'distance']
    search = GridSearchCV(
        KNeighborsClassifier(),
        {'n_neighbors': neighbour_counts, 'weights': weightings},
        cv=5,
        scoring='accuracy',
        verbose=0,
        n_jobs=-1,
    )
    with suppress_stdout():
        search.fit(X_train, y_train)
        test_predictions = search.predict(X_test)
    return accuracy_score(y_test, test_predictions)

## Run All Models

In [None]:
# Load the participant ratings; the first sheet column becomes the index
targets = pd.read_excel(file_path, index_col=0)
valence_targets, arousal_targets = targets['Valence'], targets['Arousal']

# Binarise: ratings of 4.5 or more become class 1, everything else class 0
y_valence = valence_targets.ge(4.5).astype(int)
y_arousal = arousal_targets.ge(4.5).astype(int)

# Positional row indices that the splits below partition
num_samples = len(y_valence)
indices = np.arange(num_samples)

# One stratified 80/20 split per target: each is stratified on its own labels,
# so the valence and arousal splits are not the same partition.
train_indices_val, test_indices_val = train_test_split(
    indices, stratify=y_valence, test_size=0.2, random_state=42
)
train_indices_aro, test_indices_aro = train_test_split(
    indices, stratify=y_arousal, test_size=0.2, random_state=42
)

# Label subsets, selected by position
y_test_valence, y_test_arousal = y_valence.iloc[test_indices_val], y_arousal.iloc[test_indices_aro]
y_train_valence, y_train_arousal = y_valence.iloc[train_indices_val], y_arousal.iloc[train_indices_aro]

In [None]:
# Store results
# One dict per (model, target) pair with the keys 'Model', 'Target' and
# 'Accuracy'; the display cell pivots on exactly these keys.
results = []

# Train all models (suppress all output during training)
# The training helpers wrap their own work in suppress_stdout() as well, so
# this outer block is an additional guard. Only stdout is redirected;
# exceptions still propagate.
with suppress_stdout():
    # ANN: Per-channel with fusion
    # Each target uses its own train/test indices and labels.
    acc_val_ann = train_ann_per_channel(
        channel_data_folder_path, train_indices_val, test_indices_val,
        y_train_valence, y_test_valence, device
    )
    results.append({'Model': 'ANN', 'Target': 'Valence', 'Accuracy': acc_val_ann})

    acc_aro_ann = train_ann_per_channel(
        channel_data_folder_path, train_indices_aro, test_indices_aro,
        y_train_arousal, y_test_arousal, device
    )
    results.append({'Model': 'ANN', 'Target': 'Arousal', 'Accuracy': acc_aro_ann})

    # Load fused features for SVM, RF, KNN
    # The features are loaded once per target because the two targets use
    # different train/test partitions.
    X_train_val, X_test_val = load_all_channels(channel_data_folder_path, train_indices_val, test_indices_val)
    X_train_aro, X_test_aro = load_all_channels(channel_data_folder_path, train_indices_aro, test_indices_aro)

    # load_all_channels returns (None, None) when no channel sub-directory
    # yields CSV data. A missing root directory fails earlier, inside
    # os.listdir(), so this branch covers the "exists but empty" case.
    if X_train_val is None or X_train_aro is None:
        raise ValueError('No channel data found. Please ensure extracted_features directory exists.')

    # SVM
    acc_val_svm = train_svm(X_train_val, y_train_valence, X_test_val, y_test_valence)
    results.append({'Model': 'SVM', 'Target': 'Valence', 'Accuracy': acc_val_svm})

    acc_aro_svm = train_svm(X_train_aro, y_train_arousal, X_test_aro, y_test_arousal)
    results.append({'Model': 'SVM', 'Target': 'Arousal', 'Accuracy': acc_aro_svm})

    # RF
    acc_val_rf = train_rf(X_train_val, y_train_valence, X_test_val, y_test_valence)
    results.append({'Model': 'RF', 'Target': 'Valence', 'Accuracy': acc_val_rf})

    acc_aro_rf = train_rf(X_train_aro, y_train_arousal, X_test_aro, y_test_arousal)
    results.append({'Model': 'RF', 'Target': 'Arousal', 'Accuracy': acc_aro_rf})

    # KNN
    acc_val_knn = train_knn(X_train_val, y_train_valence, X_test_val, y_test_valence)
    results.append({'Model': 'KNN', 'Target': 'Valence', 'Accuracy': acc_val_knn})

    acc_aro_knn = train_knn(X_train_aro, y_train_arousal, X_test_aro, y_test_arousal)
    results.append({'Model': 'KNN', 'Target': 'Arousal', 'Accuracy': acc_aro_knn})

In [None]:
# Build the Model x Target accuracy table from the collected results
results_df = pd.DataFrame(results)
results_pivot = results_df.pivot(index='Model', columns='Target', values='Accuracy')

# Banner, table, closing rule
separator = '='*60
print('\n' + separator)
print('FINAL ACCURACY RESULTS')
print(separator)
print(results_pivot.to_string())
print('\n' + separator)

# Best-scoring model per target
valence_scores = results_pivot["Valence"]
arousal_scores = results_pivot["Arousal"]
print(f'\nBest Valence: {valence_scores.idxmax()} ({valence_scores.max():.4f})')
print(f'Best Arousal: {arousal_scores.idxmax()} ({arousal_scores.max():.4f})')