# Step 1: Import Libraries

In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import librosa.display
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from dataclasses import dataclass
from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
# from transformers import WavLMForSequenceClassification, WavLMProcessor

from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')


# Step 2: Data Loading and Preparation
## 2.1: Load the Data

In [None]:
def load_data_ravdess(data_dir):
    emotion_labels = {
        '01': 'neutral',
        '02': 'calm',
        '03': 'happy',
        '04': 'sad',
        '05': 'angry',
        '06': 'fearful',
        '07': 'disgust',
        '08': 'surprised'
    }
    file_list = []
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if file.endswith('.wav'):
                parts = file.split('-')
                emotion = emotion_labels.get(parts[2])
                file_list.append({'file_path': os.path.join(root, file), 'emotion': emotion})
    return pd.DataFrame(file_list)

def load_data_cremad(data_dir):
    emotion_labels = {
        'ANG': 'angry',
        'DIS': 'disgust',
        'FEA': 'fearful',
        'HAP': 'happy',
        'NEU': 'neutral',
        'SAD': 'sad'
    }
    file_list = []
    for file in os.listdir(data_dir):
        if file.endswith('.wav'):
            parts = file.split('_')
            emotion = emotion_labels.get(parts[2])
            if emotion:
                file_list.append({'file_path': os.path.join(data_dir, file), 'emotion': emotion})
    return pd.DataFrame(file_list)

def load_data_tess(data_dir):
    emotion_map = {
        'angry': 'angry',
        'disgust': 'disgust',
        'fear': 'fearful',
        'happy': 'happy',
        'ps': 'surprised',  # Pleasant surprise
        'sad': 'sad',
        'neutral': 'neutral'
    }
    file_list = []
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if file.endswith('.wav'):
                emotion = file.split('_')[2].split('.')[0]
                emotion_label = emotion_map.get(emotion)
                if emotion_label:
                    file_list.append({'file_path': os.path.join(root, file), 'emotion': emotion_label})
    return pd.DataFrame(file_list)

def load_data_savee(data_dir):
    emotion_map = {
        'a': 'angry',
        'd': 'disgust',
        'f': 'fearful',
        'h': 'happy',
        'n': 'neutral',
        'sa': 'sad',
        'su': 'surprised'
    }
    file_list = []
    for file in os.listdir(data_dir):
        if file.endswith('.wav'):
            emotion_code = file.split('_')[1][:2]
            emotion_label = emotion_map.get(emotion_code)
            if emotion_label:
                file_list.append({'file_path': os.path.join(data_dir, file), 'emotion': emotion_label})
    return pd.DataFrame(file_list)



In [None]:
ravdess_df = load_data_ravdess('data/RAVDESS')
cremad_df = load_data_cremad('data/CREMA-D')
tess_df = load_data_tess('data/TESS')
savee_df = load_data_savee('data/SAVEE')

# Combine datasets
data_df = pd.concat([ravdess_df, cremad_df, tess_df, savee_df], ignore_index=True)



## 2.2: Encode the labels

In [None]:
# Standardize emotion labels
emotion_list = ['angry', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised', 'calm']

# Handle any missing emotions in datasets
data_df = data_df[data_df['emotion'].isin(emotion_list)]

# Encode labels
label_encoder = LabelEncoder()
label_encoder.fit(emotion_list)
data_df['label'] = label_encoder.transform(data_df['emotion'])
num_classes = len(label_encoder.classes_)


In [None]:
# print the label encoder mapping dict
print(dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_))))

In [None]:
# save the data_df
data_df.to_csv('data_df.csv', index=False)

In [None]:
data_df.head()

In [None]:
# group by emotion
emotion_counts = data_df['emotion'].value_counts().sort_index()
emotion_counts

## 2.3: Data Splitting

In [None]:
train_df, temp_df = train_test_split(data_df, test_size=0.3, stratify=data_df['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)

print(f'Training samples: {len(train_df)}') 
print(f'Validation samples: {len(val_df)}')
print(f'Test samples: {len(test_df)}')


In [None]:
train_df.head()

# Step 3: Baseline Models
## 3.1: Feature Extraction for Baseline Models
### 3.1.1: Data augmentation

In [None]:
def load_audio(file_path, target_sr=16000):
    y, sr = librosa.load(file_path, sr=None)  # Load with original sampling rate
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        sr = target_sr
    return y, sr

def normalize_audio(y):
    rms = np.sqrt(np.mean(y**2))
    if rms > 0:
        y_normalized = y / rms
    else:
        y_normalized = y
    return y_normalized

def augment_audio(y, sr):
    augmented_data = []
    
    # Original
    augmented_data.append(y)
    
    # Add noise
    noise = np.random.randn(len(y))
    y_noise = y + 0.005 * noise
    augmented_data.append(y_noise)
    
    # Time stretching
    y_stretch = librosa.effects.time_stretch(y, rate=0.9)
    augmented_data.append(y_stretch)
    y_stretch = librosa.effects.time_stretch(y, rate=1.1)
    augmented_data.append(y_stretch)
    
    # Pitch shifting
    y_shift = librosa.effects.pitch_shift(y, sr=sr, n_steps=2)
    augmented_data.append(y_shift)
    y_shift = librosa.effects.pitch_shift(y, sr=sr, n_steps=-2)
    augmented_data.append(y_shift)
    
    # Reverberation (simple simulation)
    y_reverb = librosa.effects.preemphasis(y)
    augmented_data.append(y_reverb)
    
    return augmented_data


### 3.1.2: Feature extraction

In [None]:

def extract_features(file_path, n_mfcc=40, target_sr=16000, augment=False):
    y, sr = load_audio(file_path, target_sr=target_sr)
    y = normalize_audio(y)
    
    if augment:
        augmented_audios = augment_audio(y, sr)
    else:
        augmented_audios = [y]
    
    features = []
    for augmented_y in augmented_audios:
        # Ensure consistent length by trimming or padding
        max_length = target_sr * 3  # 3 seconds
        # Trim or pad audio
        if len(augmented_y) > max_length:
            augmented_y = augmented_y[:max_length]
        else:
            augmented_y = np.pad(augmented_y, (0, max_length - len(augmented_y)), mode='constant')
        
        # Extract MFCCs
        mfccs = librosa.feature.mfcc(y=augmented_y, sr=sr, n_mfcc=n_mfcc)
        mfccs = np.mean(mfccs.T, axis=0)
        
        # Extract additional features if needed
        chroma = librosa.feature.chroma_stft(y=augmented_y, sr=sr)
        chroma = np.mean(chroma.T, axis=0)
        spectral_contrast = librosa.feature.spectral_contrast(y=augmented_y, sr=sr)
        spectral_contrast = np.mean(spectral_contrast.T, axis=0)
        
        # Concatenate all features
        feature_vector = np.concatenate([mfccs, chroma, spectral_contrast])
        features.append(feature_vector)
    
    return features


## 3.1.3: Extract Features for All Samples

In [None]:
def extract_features_for_df(df):
    features = []
    for idx, row in tqdm(df.iterrows(), total=df.shape[0], desc='Extracting features'):
        feature = extract_features(row['file_path'], augment=False)
        features.append(feature)
    return np.array(features)

In [None]:

# Extract features
X_train = extract_features_for_df(train_df)
X_val = extract_features_for_df(val_df)
X_test = extract_features_for_df(test_df)

# Get labels
y_train = train_df['label'].values
y_val = val_df['label'].values
y_test = test_df['label'].values


In [None]:
# reshape data
X_train = np.array(X_train).reshape(len(X_train), -1)
X_val = np.array(X_val).reshape(len(X_val), -1)
X_test = np.array(X_test).reshape(len(X_test), -1)

In [None]:
# Print the shape of the feature arrays
print(f'X_train shape: {X_train.shape}')
print(f'X_val shape: {X_val.shape}')
print(f'X_test shape: {X_test.shape}')

# Print a sample of the feature arrays
print('Sample features from X_train:')
print(X_train[0])

print('Sample features from X_val:')
print(X_val[0])

print('Sample features from X_test:')
print(X_test[0])

### 3.1.4: Feature Scaling

In [None]:
# Initialize the StandardScaler
scaler = StandardScaler()

# Fit the scaler on the training data and transform it
X_train = scaler.fit_transform(X_train)

# Transform the validation data using the fitted scaler
X_val = scaler.transform(X_val)

# Transform the test data using the fitted scaler
X_test = scaler.transform(X_test)


## 3.2: Train and Evaluate Baseline Models

### 3.2.1: Logistic Regression

In [None]:
# Initialize model
lr_model = LogisticRegression(max_iter=1000, random_state=42)

# Train model
lr_model.fit(X_train, y_train)

# Predict on validation set
y_val_pred_lr = lr_model.predict(X_val)

# Evaluate
val_accuracy_lr = accuracy_score(y_val, y_val_pred_lr)
val_f1_lr = f1_score(y_val, y_val_pred_lr, average='weighted')

print(f'Logistic Regression - Validation Accuracy: {val_accuracy_lr:.4f}, F1 Score: {val_f1_lr:.4f}')


### 3.4.2: Random Forest

In [None]:
# Initialize model
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)

# Train model
rf_model.fit(X_train, y_train)

# Predict on validation set
y_val_pred_rf = rf_model.predict(X_val)

# Evaluate
val_accuracy_rf = accuracy_score(y_val, y_val_pred_rf)
val_f1_rf = f1_score(y_val, y_val_pred_rf, average='weighted')

print(f'Random Forest - Validation Accuracy: {val_accuracy_rf:.4f}, F1 Score: {val_f1_rf:.4f}')


In [None]:
import random

import IPython.display as ipd

# Select random samples
num_samples = 5
random_indices = random.sample(range(len(test_df)), num_samples)
sample_df = test_df.iloc[random_indices]

# Extract features for the selected samples
sample_features = np.array([extract_features(row['file_path'])[0] for _, row in sample_df.iterrows()])
sample_features = scaler.transform(sample_features)

# Get true labels and predictions
sample_labels = sample_df['label'].values
sample_predictions_lr = lr_model.predict(sample_features)
sample_predictions_rf = rf_model.predict(sample_features)

# Display the samples
for i, (index, row) in enumerate(sample_df.iterrows()):
    print(f"Sample {i+1}:")
    print(f"Path: {row['file_path']}")
    print(f"True Label: {label_encoder.inverse_transform([sample_labels[i]])[0]}")
    print(f"Logistic Regression Prediction: {label_encoder.inverse_transform([sample_predictions_lr[i]])[0]}")
    print(f"Random Forest Prediction: {label_encoder.inverse_transform([sample_predictions_rf[i]])[0]}")
    ipd.display(ipd.Audio(row['file_path']))
    print("\n")

In [None]:
# Logistic Regression
y_test_pred_lr = lr_model.predict(X_test)
test_accuracy_lr = accuracy_score(y_test, y_test_pred_lr)
test_f1_lr = f1_score(y_test, y_test_pred_lr, average='weighted')
print(f'Logistic Regression - Test Accuracy: {test_accuracy_lr:.4f}, F1 Score: {test_f1_lr:.4f}')

# Random Forest
y_test_pred_rf = rf_model.predict(X_test)
test_accuracy_rf = accuracy_score(y_test, y_test_pred_rf)
test_f1_rf = f1_score(y_test, y_test_pred_rf, average='weighted')
print(f'Random Forest - Test Accuracy: {test_accuracy_rf:.4f}, F1 Score: {test_f1_rf:.4f}')


# Step 4: Advanced Models

## 4.1: Prepare Data

In [None]:
from ser_wav2vec import SpeechEmotionRecognition

In [None]:
df_train_val = pd.concat([train_df, val_df], ignore_index=True)
# save the df_train_val
df_train_val.to_csv('train_val_df.csv', index=False)

In [None]:
args = {
    'batch_size': 32,
    'lr': 1e-4,
    'epochs': 30,
    'gradient_accumulation_steps': 4,
    'checkpoint_dir': 'checkpoints',
    'checkpoint': None
}

In [None]:
ser = SpeechEmotionRecognition(
    df=df_train_val,
    model_name='facebook/wav2vec2-base',
    batch_size=args['batch_size'],
    lr=args['lr'],
    num_epochs=args['epochs'],
    checkpoint_dir=args['checkpoint_dir'],
    gradient_accumulation_steps=args['gradient_accumulation_steps'],
    checkpoint_path=args['checkpoint']
)


In [None]:
ser.train()

In [None]:
ser_best = SpeechEmotionRecognition(
    df=df_train_val,
    model_name='facebook/wav2vec2-base',
    batch_size=args['batch_size'],
    lr=args['lr'],
    num_epochs=args['epochs'],
    checkpoint_dir=args['checkpoint_dir'],
    gradient_accumulation_steps=args['gradient_accumulation_steps'],
    checkpoint_path='best_model.pt'
)



In [None]:
ser_best.evaluate_test_set(test_df)