# 🚀 Speech Emotion Recognition - Optimized Architectures

This notebook implements **state-of-the-art** model architectures for improved accuracy:

1. **Attention-based CNN** - Self-attention mechanisms for better feature focus
2. **Transformer Encoder** - Multi-head attention for temporal patterns
3. **EfficientNet Transfer Learning** - Pre-trained features
4. **Ensemble Model** - Combines multiple models for best accuracy

**Optimizations:**
- Data augmentation (time stretch, pitch shift, noise)
- Mixup training
- Label smoothing
- Learning rate scheduling with warmup

## 1. Setup

In [None]:
!pip install -q librosa soundfile kaggle tensorflow scikit-learn matplotlib seaborn audiomentations

In [None]:
import json
import os
import pickle
import warnings

import librosa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
from tensorflow.keras import layers, Model
warnings.filterwarnings('ignore')

# Seed the NumPy and TensorFlow generators up front so that runs are
# comparable. GPU kernels may still introduce small run-to-run differences.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Report the runtime so a reader can tell which TF build / accelerator
# produced the results below.
print(f"TensorFlow: {tf.__version__}")
print(f"GPU: {tf.config.list_physical_devices('GPU')}")

## 2. Download Dataset

In [None]:
# Kaggle credentials
KAGGLE_USERNAME = "your_username"  # @param {type:"string"}
KAGGLE_KEY = "your_api_key"  # @param {type:"string"}

os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'w') as f:
    f.write(f'{{"username":"{KAGGLE_USERNAME}","key":"{KAGGLE_KEY}"}}')
os.chmod('/root/.kaggle/kaggle.json', 0o600)

# Download RAVDESS and CREMA-D
!mkdir -p datasets
!kaggle datasets download -d uwrfkaggler/ravdess-emotional-speech-audio -p datasets/ravdess --unzip -q
!kaggle datasets download -d ejlok1/cremad -p datasets/cremad --unzip -q
print("âœ“ Datasets downloaded")

## 3. Data Augmentation

In [None]:
import audiomentations as A

# Audio augmentation pipeline. Every transform is configured with p=0.5, so
# any subset of them may be applied to a given clip.
augment = A.Compose([
    A.AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
    A.TimeStretch(min_rate=0.8, max_rate=1.2, p=0.5),
    A.PitchShift(min_semitones=-4, max_semitones=4, p=0.5),
    # NOTE(review): the shift bounds are presumably fractions of the clip
    # length (the library default unit) - confirm for the installed version.
    A.Shift(min_shift=-0.5, max_shift=0.5, p=0.5),
])

def augment_audio(y, sr):
    """Run a waveform through the module-level `augment` pipeline.

    Args:
        y: Audio samples, forwarded as `samples`.
        sr: Sampling rate of `y`, forwarded as `sample_rate`.

    Returns:
        Whatever the pipeline returns for these samples (the augmented audio).
    """
    return augment(samples=y, sample_rate=sr)

print("✓ Augmentation pipeline ready")

In [None]:
# Configuration
SAMPLE_RATE = 22050  # Hz; clips are resampled to this rate when loaded
DURATION = 3         # seconds of audio kept per clip
N_MELS = 128         # number of mel bands in the spectrogram
HOP_LENGTH = 512     # hop length (in samples) passed to the mel spectrogram
EMOTIONS = ['neutral', 'calm', 'happy', 'sad', 'angry', 'fearful', 'disgust', 'surprised']

def extract_mel_spectrogram(file_path, augment_data=False):
    """Load one audio file and return its log-mel spectrogram.

    The clip is loaded at SAMPLE_RATE, zero-padded or truncated to exactly
    DURATION seconds, optionally passed through `augment_audio`, and turned
    into a mel power spectrogram expressed in dB relative to its own maximum.

    Args:
        file_path: Path of the audio file to read.
        augment_data: When True, apply the random augmentation pipeline to the
            waveform before the spectrogram is computed.

    Returns:
        The dB-scaled mel spectrogram array, or None when the file could not
        be processed (the failure is reported on stdout).
    """
    try:
        y, sr = librosa.load(file_path, sr=SAMPLE_RATE, duration=DURATION)

        # Pad/trim so every clip yields a spectrogram of the same width
        target_len = SAMPLE_RATE * DURATION
        if len(y) < target_len:
            y = np.pad(y, (0, target_len - len(y)))
        else:
            y = y[:target_len]

        # Augment if requested
        if augment_data:
            y = augment_audio(y, sr)

        # Extract mel spectrogram
        mel = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=N_MELS, hop_length=HOP_LENGTH)
        mel_db = librosa.power_to_db(mel, ref=np.max)

        return mel_db
    except Exception as exc:
        # Catch Exception rather than everything, so Ctrl-C / kernel interrupt
        # still stops a long extraction loop. print() is used because the
        # notebook silences warnings globally.
        print(f"Skipping {file_path}: {exc!r}")
        return None

## 4. Load and Prepare Data

In [None]:
from tqdm import tqdm

def parse_ravdess(path):
    """Collect RAVDESS clips under `path` with their emotion labels.

    The emotion is taken from the third dash-separated field of the file name
    (e.g. ``03-01-05-...wav`` -> ``'05'`` -> ``'angry'``). Files whose name
    does not follow that pattern, or whose code is unknown, are skipped.

    Each file name is returned at most once: if the same name shows up in
    several sub-directories, only the first occurrence (in sorted traversal
    order) is kept. NOTE(review): the Kaggle archive appears to ship a second
    copy of the corpus in a nested folder; without this, every clip would be
    listed twice and could end up on both sides of a train/test split.

    Args:
        path: Root directory to search recursively.

    Returns:
        A list of ``{'path': ..., 'emotion': ...}`` dicts in deterministic
        (sorted) order.
    """
    emotion_map = {'01': 'neutral', '02': 'calm', '03': 'happy', '04': 'sad',
                   '05': 'angry', '06': 'fearful', '07': 'disgust', '08': 'surprised'}
    files = []
    seen_names = set()
    for root, dirnames, filenames in os.walk(path):
        dirnames.sort()  # in-place sort makes os.walk descend in a stable order
        for f in sorted(filenames):
            if not f.endswith('.wav') or f in seen_names:
                continue
            parts = f.split('-')
            if len(parts) >= 3 and parts[2] in emotion_map:
                seen_names.add(f)
                files.append({'path': os.path.join(root, f), 'emotion': emotion_map[parts[2]]})
    return files

def parse_cremad(path):
    """Collect CREMA-D clips under `path` with their emotion labels.

    The third underscore-separated field of each ``.wav`` file name is looked
    up in a code table (``ANG``, ``DIS``, ``FEA``, ``HAP``, ``NEU``, ``SAD``);
    files with fewer than three fields or an unknown code are ignored.

    Args:
        path: Root directory to search recursively.

    Returns:
        A list of ``{'path': ..., 'emotion': ...}`` dicts, one per accepted
        file, in directory-walk order.
    """
    code_to_emotion = {
        'ANG': 'angry',
        'DIS': 'disgust',
        'FEA': 'fearful',
        'HAP': 'happy',
        'NEU': 'neutral',
        'SAD': 'sad',
    }

    # Lazily enumerate every .wav file together with the folder it lives in.
    wav_entries = (
        (folder, name)
        for folder, _, names in os.walk(path)
        for name in names
        if name.endswith('.wav')
    )

    records = []
    for folder, name in wav_entries:
        tokens = name.split('_')
        if len(tokens) < 3:
            continue
        emotion = code_to_emotion.get(tokens[2])
        if emotion is None:
            continue
        records.append({'path': os.path.join(folder, name), 'emotion': emotion})
    return records

# Load files
all_files = parse_ravdess('datasets/ravdess') + parse_cremad('datasets/cremad')

# An empty list means the download/unzip step did not produce any audio.
# Stop with a clear message instead of the KeyError that df['emotion'] would
# raise on an empty frame.
if not all_files:
    raise FileNotFoundError(
        "No labelled .wav files found under datasets/ravdess or datasets/cremad - "
        "check that the Kaggle download succeeded."
    )

df = pd.DataFrame(all_files)
print(f"Total files: {len(df)}")
print(df['emotion'].value_counts())

In [None]:
# Extract features with augmentation.
# Each source file contributes its original spectrogram followed by one
# randomly augmented variant (up to 2x data).
#
# NOTE(review): augmented variants are created before any train/test split.
# `groups` records the index of the source file for every row so that a split
# can keep a clip and its augmented copy on the same side (e.g. with a
# group-aware splitter); a plain random split over X would leak test clips
# into training.
print("Extracting features...")
X, y, groups = [], [], []

for file_idx, row in enumerate(tqdm(df.itertuples(index=False), total=len(df))):
    for use_augmentation in (False, True):  # original first, then augmented
        mel = extract_mel_spectrogram(row.path, augment_data=use_augmentation)
        if mel is not None:
            X.append(mel)
            y.append(row.emotion)
            groups.append(file_idx)

if not X:
    raise RuntimeError("Feature extraction produced no samples - every file failed to load.")

X = np.array(X)
y = np.array(y)
groups = np.array(groups)
print(f"\nDataset shape: {X.shape}")

In [None]:
# Training configuration
EPOCHS = 100
BATCH_SIZE = 32
INITIAL_LR = 0.001
VALIDATION_SPLIT = 0.2  # keep equal to the validation_split passed to fit()

# fit(validation_split=...) holds out the tail of X_train, so the optimizer
# only steps over the remaining share. The schedule advances once per batch,
# so its step counts are derived from that reduced sample count; using
# len(X_train) directly would stretch the warmup beyond 5 epochs and end
# training before the cosine curve has fully decayed.
num_fit_samples = int(len(X_train) * (1 - VALIDATION_SPLIT))
steps_per_epoch = max(1, -(-num_fit_samples // BATCH_SIZE))  # ceil: the last partial batch counts
total_steps = steps_per_epoch * EPOCHS
warmup_steps = steps_per_epoch * 5  # linear warmup over the first 5 epochs

lr_schedule = WarmupCosineDecay(INITIAL_LR, warmup_steps, total_steps)

# Callbacks - NOTE: Don't use ReduceLROnPlateau with custom LR schedule
callbacks = [
    keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
]

# For models using fixed LR (not schedule), use these callbacks:
callbacks_with_lr_reduce = [
    keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-7)
]

# Train Attention CNN
print("="*60)
print("Training Attention CNN")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

attn_cnn = build_attention_cnn(X_train.shape[1:])
attn_cnn.compile(
    optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# Use callbacks without ReduceLROnPlateau since we're using custom LR schedule
attn_history = attn_cnn.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=callbacks,  # Only EarlyStopping, no ReduceLROnPlateau
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
attn_score = attn_cnn.evaluate(X_test, y_test_onehot)
print(f"\nAttention CNN Accuracy: {attn_score[1]*100:.2f}%")

In [None]:
# Train Transformer
print("="*60)
print("Training Transformer")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

transformer = build_transformer_model(X_train.shape[1:])
transformer.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# Use callbacks with ReduceLROnPlateau since we're using fixed LR
trans_history = transformer.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=callbacks_with_lr_reduce,
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
trans_score = transformer.evaluate(X_test, y_test_onehot)
print(f"\nTransformer Accuracy: {trans_score[1]*100:.2f}%")

In [None]:
# Train EfficientNet
print("="*60)
print("Training EfficientNet")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

efficientnet = build_efficientnet_model(X_train.shape[1:])
efficientnet.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# Use callbacks with ReduceLROnPlateau since we're using fixed LR
eff_history = efficientnet.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=50,  # Fewer epochs for transfer learning
    batch_size=BATCH_SIZE,
    callbacks=callbacks_with_lr_reduce,
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
eff_score = efficientnet.evaluate(X_test, y_test_onehot)
print(f"\nEfficientNet Accuracy: {eff_score[1]*100:.2f}%")

In [None]:
def build_efficientnet_model(input_shape, n_classes=None):
    """Build a frozen-EfficientNetB0 classifier for spectrogram input.

    The input is mapped to three channels with a 1x1 convolution, resized to
    224x224, fed through an ImageNet-initialised EfficientNetB0 whose weights
    are frozen, and classified by a small dense head with dropout.

    Args:
        input_shape: Shape of one sample without the batch axis. Either
            (height, width, channels) or (height, width); in the latter case
            a single channel axis is appended inside the model.
        n_classes: Size of the softmax output. Defaults to the notebook-level
            `num_classes` when omitted, which is what existing callers rely on.

    Returns:
        An uncompiled keras Model mapping inputs of `input_shape` to class
        probabilities.
    """
    if n_classes is None:
        n_classes = num_classes  # notebook-level global

    inputs = layers.Input(shape=input_shape)
    x = inputs
    if len(input_shape) == 2:
        # Conv2D needs an explicit channel axis
        x = layers.Reshape((*input_shape, 1))(x)

    # Learnable projection to the 3 channels the ImageNet backbone expects
    x = layers.Conv2D(3, 1, padding='same')(x)

    # Resize to EfficientNet input size
    x = layers.Resizing(224, 224)(x)

    # Load EfficientNetB0
    base_model = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_tensor=x
    )

    # Freeze base model so only the projection and the head are trained
    base_model.trainable = False

    # Add classification head
    x = base_model.output
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(n_classes, activation='softmax')(x)

    model = Model(inputs, outputs)
    return model

print("✓ EfficientNet model defined")

## 6. Training with Advanced Techniques

In [None]:
# Mixup data augmentation
def mixup(x, y, alpha=0.2):
    """Blend each sample in a batch with a randomly chosen partner.

    A single mixing coefficient ``lam ~ Beta(alpha, alpha)`` is drawn per call
    and applied to both inputs and targets:
    ``lam * a + (1 - lam) * shuffled(a)``.

    The coefficient is sampled with TensorFlow ops (as the ratio of two Gamma
    draws) rather than NumPy, so a fresh value is drawn on every execution
    even when this function is traced into a tf.function or a tf.data map;
    a NumPy draw would be baked in as a constant at trace time.

    Args:
        x: Batch of inputs, batch axis first.
        y: Batch of targets aligned with `x`. Presumably one-hot / soft
            labels - blending integer class ids is not meaningful.
        alpha: Concentration of the Beta distribution; must be > 0.

    Returns:
        Tuple ``(x_mixed, y_mixed)`` of tensors with the shapes of `x` and
        `y`. Non-floating inputs are cast to float32 before mixing.
    """
    x = tf.convert_to_tensor(x)
    y = tf.convert_to_tensor(y)
    if not x.dtype.is_floating:
        x = tf.cast(x, tf.float32)
    if not y.dtype.is_floating:
        y = tf.cast(y, tf.float32)

    batch_size = tf.shape(x)[0]
    indices = tf.random.shuffle(tf.range(batch_size))

    # Beta(a, a) == G1 / (G1 + G2) for independent G1, G2 ~ Gamma(a, 1).
    gamma_1 = tf.random.gamma([], alpha)
    gamma_2 = tf.random.gamma([], alpha)
    # divide_no_nan guards the (vanishingly rare) case of both draws underflowing to 0
    lam = tf.math.divide_no_nan(gamma_1, gamma_1 + gamma_2)

    # Match each operand's dtype: TF does not promote float32 * float64
    lam_x = tf.cast(lam, x.dtype)
    lam_y = tf.cast(lam, y.dtype)
    x_mixed = lam_x * x + (1 - lam_x) * tf.gather(x, indices)
    y_mixed = lam_y * y + (1 - lam_y) * tf.gather(y, indices)

    return x_mixed, y_mixed

# Learning rate schedule with warmup
@keras.utils.register_keras_serializable(package="emotion_recognition")
class WarmupCosineDecay(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup to `initial_lr`, then cosine decay to zero.

    For ``step < warmup_steps`` the rate rises linearly from 0 to
    `initial_lr`; afterwards it follows half a cosine period, reaching 0 at
    `total_steps`. Steps beyond `total_steps` keep the final rate of 0
    instead of climbing back up the cosine.

    Args:
        initial_lr: Peak learning rate reached at the end of warmup.
        warmup_steps: Number of optimizer steps spent warming up.
        total_steps: Step at which the decay reaches zero.
    """

    def __init__(self, initial_lr, warmup_steps, total_steps):
        super().__init__()
        self.initial_lr = initial_lr
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps

    def __call__(self, step):
        """Return the learning rate for optimizer step `step` as a float32 tensor."""
        step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        total_steps = tf.cast(self.total_steps, tf.float32)

        # The max(..., 1) guards avoid 0/0 when warmup_steps == 0 or when
        # total_steps == warmup_steps.
        warmup_lr = self.initial_lr * (step / tf.maximum(warmup_steps, 1.0))
        decay_span = tf.maximum(total_steps - warmup_steps, 1.0)
        # Clamp progress to [0, 1] so the rate stays at 0 after total_steps
        progress = tf.clip_by_value((step - warmup_steps) / decay_span, 0.0, 1.0)
        decay_lr = self.initial_lr * 0.5 * (1 + tf.cos(np.pi * progress))
        return tf.where(step < warmup_steps, warmup_lr, decay_lr)

    def get_config(self):
        """Return constructor arguments so a compiled model can be saved."""
        return {
            "initial_lr": self.initial_lr,
            "warmup_steps": self.warmup_steps,
            "total_steps": self.total_steps,
        }

print("✓ Training utilities defined")

In [None]:
# Training configuration
EPOCHS = 100
BATCH_SIZE = 32
INITIAL_LR = 0.001
VALIDATION_SPLIT = 0.2  # keep equal to the validation_split passed to fit()

# fit(validation_split=...) holds out the tail of X_train, so the optimizer
# only steps over the remaining share. The schedule advances once per batch,
# so its step counts are derived from that reduced sample count; using
# len(X_train) directly would stretch the warmup beyond 5 epochs and end
# training before the cosine curve has fully decayed.
num_fit_samples = int(len(X_train) * (1 - VALIDATION_SPLIT))
steps_per_epoch = max(1, -(-num_fit_samples // BATCH_SIZE))  # ceil: the last partial batch counts
total_steps = steps_per_epoch * EPOCHS
warmup_steps = steps_per_epoch * 5  # linear warmup over the first 5 epochs

lr_schedule = WarmupCosineDecay(INITIAL_LR, warmup_steps, total_steps)

# NOTE: ReduceLROnPlateau works by overwriting the optimizer's learning rate,
# which is not possible when the optimizer was built with a
# LearningRateSchedule such as lr_schedule. Only pass this full list to models
# compiled with a fixed learning rate.
callbacks = [
    keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-7)
]

In [None]:
# Train Attention CNN
print("="*60)
print("Training Attention CNN")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

attn_cnn = build_attention_cnn(X_train.shape[1:])
attn_cnn.compile(
    optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# This optimizer is driven by lr_schedule, whose rate cannot be overwritten,
# so ReduceLROnPlateau must not be attached to this model.
schedule_safe_callbacks = [
    cb for cb in callbacks
    if not isinstance(cb, keras.callbacks.ReduceLROnPlateau)
]

attn_history = attn_cnn.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=schedule_safe_callbacks,
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
attn_score = attn_cnn.evaluate(X_test, y_test_onehot)
print(f"\nAttention CNN Accuracy: {attn_score[1]*100:.2f}%")

In [None]:
# Train Transformer
print("="*60)
print("Training Transformer")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

transformer = build_transformer_model(X_train.shape[1:])
transformer.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),  # fixed LR, so LR-reducing callbacks are fine
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

trans_history = transformer.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=callbacks,
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
trans_score = transformer.evaluate(X_test, y_test_onehot)
print(f"\nTransformer Accuracy: {trans_score[1]*100:.2f}%")

In [None]:
# Train EfficientNet
print("="*60)
print("Training EfficientNet")
print("="*60)

# SparseCategoricalCrossentropy does not take a label_smoothing argument, so
# the smoothing is applied through CategoricalCrossentropy on one-hot targets.
# Assumes y_train / y_test hold integer class ids in [0, num_classes) - TODO
# confirm against the label-encoding cell.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes)

efficientnet = build_efficientnet_model(X_train.shape[1:])
efficientnet.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),  # fixed LR, so LR-reducing callbacks are fine
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

eff_history = efficientnet.fit(
    X_train, y_train_onehot,
    validation_split=0.2,
    epochs=50,  # Fewer epochs for transfer learning
    batch_size=BATCH_SIZE,
    callbacks=callbacks,
    verbose=1
)

# evaluate() returns [loss, accuracy]; index 1 is the accuracy reported below
eff_score = efficientnet.evaluate(X_test, y_test_onehot)
print(f"\nEfficientNet Accuracy: {eff_score[1]*100:.2f}%")

## 7. Ensemble Model

In [None]:
# Ensemble predictions
def ensemble_predict(models, X, weights=None):
    """Predict class ids from a weighted sum of several models' outputs.

    Each model's ``predict(X)`` output is scaled by its weight and summed;
    the class with the largest combined score is returned per sample. The
    number of classes is taken from the predictions themselves, so the
    function does not depend on any notebook-level variable.

    Args:
        models: Non-empty sequence of objects exposing ``predict(X)`` that
            return arrays of identical shape (samples, classes).
        X: Input batch passed unchanged to every model.
        weights: One weight per model. Defaults to equal weighting.

    Returns:
        1-D integer array with the winning class index for each sample.

    Raises:
        ValueError: If `models` is empty or `weights` has a different length
            than `models` (zip would otherwise drop the extras silently).
    """
    models = list(models)
    if not models:
        raise ValueError("ensemble_predict needs at least one model")
    if weights is None:
        weights = [1/len(models)] * len(models)
    elif len(weights) != len(models):
        raise ValueError(
            f"got {len(weights)} weights for {len(models)} models"
        )

    predictions = None
    for model, weight in zip(models, weights):
        weighted = weight * np.asarray(model.predict(X))
        predictions = weighted if predictions is None else predictions + weighted

    return np.argmax(predictions, axis=1)

# Score the ensemble: each member's vote is proportional to its own test accuracy
models = [attn_cnn, transformer, efficientnet]
member_accuracies = [score[1] for score in (attn_score, trans_score, eff_score)]
accuracy_total = sum(member_accuracies)
weights = [accuracy / accuracy_total for accuracy in member_accuracies]  # sums to 1

y_pred_ensemble = ensemble_predict(models, X_test, weights)
# Share of test samples whose ensemble prediction equals the true label
ensemble_acc = np.mean(y_pred_ensemble == y_test)

# Side-by-side accuracy table
banner = '=' * 60
print(f"\n{banner}")
print("MODEL COMPARISON")
print(f"{banner}")
print(f"Attention CNN:  {attn_score[1]*100:.2f}%")
print(f"Transformer:    {trans_score[1]*100:.2f}%")
print(f"EfficientNet:   {eff_score[1]*100:.2f}%")
print(f"Ensemble:       {ensemble_acc*100:.2f}%")
print(f"{banner}")

## 8. Evaluation

In [None]:
# Confusion matrix for best model.
# max() returns the first entry on ties, so the preference order
# Attention CNN > Transformer > EfficientNet is preserved.
scored_models = [
    ('Attention CNN', attn_cnn, attn_score[1]),
    ('Transformer', transformer, trans_score[1]),
    ('EfficientNet', efficientnet, eff_score[1]),
]
best_name, best_model, _ = max(scored_models, key=lambda entry: entry[2])

y_pred = np.argmax(best_model.predict(X_test), axis=1)

# Pass the full label set explicitly so the matrix and the report always have
# one row per class, even if a class is absent from the test split or never
# predicted; otherwise the tick labels / target_names would not line up.
# Assumes y_test holds the integer ids produced by label_encoder - TODO confirm.
class_ids = np.arange(len(label_encoder.classes_))
cm = confusion_matrix(y_test, y_pred, labels=class_ids)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title(f'{best_name} Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.tight_layout()
plt.savefig('confusion_matrix_optimized.png', dpi=150)
plt.show()

print(f"\n{best_name} Classification Report:")
print(classification_report(y_test, y_pred, labels=class_ids,
                            target_names=label_encoder.classes_, zero_division=0))

## 9. Save Models

In [None]:
!mkdir -p models

# Save all models
attn_cnn.save('models/emotion_model_cnn.keras')  # Replace default CNN
transformer.save('models/emotion_model_transformer.keras')
efficientnet.save('models/emotion_model_efficientnet.keras')

# Save label encoder
with open('models/label_encoder.pkl', 'wb') as f:
    pickle.dump(label_encoder, f)

print("âœ“ Models saved!")
!ls -la models/

In [None]:
# Download
!zip -r optimized_models.zip models/

from google.colab import files
files.download('optimized_models.zip')

print("\nâœ“ Download complete!")
print("\nTo use: Extract and copy 'models' folder to your project.")