In [None]:
# ## 1. Imports and Setup
# This cell imports necessary libraries and defines configuration parameters
# for the project, including file paths, audio processing settings,
# and random seeds for reproducibility.

import os
import re
import sys # To print Python version
import numpy as np
import pandas as pd
import librosa
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import class_weight
import tensorflow as tf
import keras
from keras.models import Sequential
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, GaussianNoise, Layer, Reshape, Bidirectional, LSTM
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import matplotlib.pyplot as plt # For evaluation plots
import seaborn as sns # For confusion matrix
from sklearn.metrics import classification_report, confusion_matrix
import sounddevice as sd # For live demo
import soundfile as sf # For live demo
import time # For live demo

# --- Configuration ---
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
# keras.utils.set_random_seed seeds Python's `random`, NumPy and the backend
# framework in one call. Without it Python's `random` stays unseeded, which
# leaves layer/initializer default seeds free to vary between runs.
keras.utils.set_random_seed(SEED)

# Dataset locations. The defaults are the original local paths; each one can be
# overridden without editing the notebook by exporting the matching environment
# variable (RAVDESS_PATH, CREMA_D_PATH, IEMOCAP_PATH) before starting the kernel.
ravdess_path = os.environ.get('RAVDESS_PATH', r'C:\Code\Python\data\RAVDESS')
crema_d_path = os.environ.get('CREMA_D_PATH', r'C:\Code\Python\data\CREMA-D\AudioWAV')
iemocap_path = os.environ.get('IEMOCAP_PATH', r'C:\Code\Python\data\IEMOCAP\IEMOCAP_full_release')

# Define audio processing parameters (Must match Flutter app)
SR = 22050           # Sample Rate
N_FFT = 2048         # FFT window size
HOP_LENGTH = 512     # Hop length for STFT
N_FREQ_BINS = 128    # Number of frequency bins to keep
MAX_PAD_LEN = 130    # Fixed length for spectrogram time axis

print(f"Python version: {sys.version}")
print(f"TensorFlow version: {tf.__version__}")

In [None]:
# ## 2. Data Loading and Preparation
# Loads audio file paths and emotion labels from the RAVDESS, CREMA-D,
# and IEMOCAP datasets into a single Pandas DataFrame.
# Ensures consistent labeling across datasets (e.g., maps 'exc' to 'happy').

print("\nStarting Data Loading...")
# Rows are collected as plain dicts and converted to DataFrames once at the end.
#
# Every directory listing below is sorted: os.walk / os.listdir return entries
# in arbitrary, filesystem-dependent order, and the resulting row order feeds
# the seeded train/test split later on. Sorting makes that split reproducible.
data = []
iemocap_data = []

# --- Process RAVDESS ---
print("Processing RAVDESS...")
# The third dash-separated field of a RAVDESS filename is looked up here;
# codes that are not in the map are ignored.
ravdess_map = {'01': 'neutral', '03': 'happy', '04': 'sad', '05': 'angry'}
ravdess_count = 0
for subdir, dirs, files in os.walk(ravdess_path):
    dirs.sort()  # in-place, so os.walk descends into sub-folders in a stable order
    for file in sorted(files):
        if not file.endswith('.wav'):
            continue
        try:
            emotion_code = file.split('-')[2]
        except IndexError:
            print(f"Skipping malformed RAVDESS filename: {file}")
            continue
        if emotion_code in ravdess_map:
            data.append({
                'file_path': os.path.join(subdir, file),
                'emotion': ravdess_map[emotion_code]
            })
            ravdess_count += 1
print(f"Found {ravdess_count} RAVDESS files.")

# --- Process CREMA-D ---
print("\nProcessing CREMA-D...")
# The third underscore-separated field of a CREMA-D filename is looked up here.
crema_map = {'NEU': 'neutral', 'HAP': 'happy', 'SAD': 'sad', 'ANG': 'angry'}
crema_count = 0
for file in sorted(os.listdir(crema_d_path)):
    if not file.endswith('.wav'):
        continue
    try:
        emotion_code = file.split('_')[2]
    except IndexError:
        print(f"Skipping malformed CREMA-D filename: {file}")
        continue
    if emotion_code in crema_map:
        data.append({
            'file_path': os.path.join(crema_d_path, file),
            'emotion': crema_map[emotion_code]
        })
        crema_count += 1
print(f"Found {crema_count} CREMA-D files.")

# --- Process IEMOCAP ---
print("\nProcessing IEMOCAP...")
# 'exc' (excited) is folded into 'happy' so the label set matches the other datasets.
iemocap_map = {'neu': 'neutral', 'hap': 'happy', 'sad': 'sad', 'ang': 'angry', 'exc': 'happy'}
iemocap_count = 0
for session in sorted(os.listdir(iemocap_path)):
    if not session.startswith('Session'):
        continue
    eval_path = os.path.join(iemocap_path, session, 'dialog/EmoEvaluation/')
    if not os.path.isdir(eval_path):
        continue
    for eval_file in sorted(os.listdir(eval_path)):
        if not eval_file.endswith('.txt'):
            continue
        # Best effort per file: one unreadable evaluation file is reported and
        # skipped, it does not abort the whole scan.
        try:
            with open(os.path.join(eval_path, eval_file), 'r') as f:
                for line in f:
                    # Only lines starting with '[' are parsed; they are split on
                    # tabs and fields 1 and 2 are used as utterance id and label.
                    if not line.strip().startswith('['):
                        continue
                    parts = line.split('\t')
                    if len(parts) < 3:
                        continue
                    utterance_id = parts[1]
                    emotion_code = parts[2]
                    if emotion_code not in iemocap_map:
                        continue
                    # The wav lives in a folder named after the utterance id
                    # with its last '_'-separated component removed.
                    parent_folder = utterance_id.rsplit('_', 1)[0]
                    wav_path = os.path.join(iemocap_path, session, 'sentences/wav/', parent_folder, utterance_id + '.wav')
                    if os.path.exists(wav_path):
                        iemocap_data.append({
                            'file_path': wav_path,
                            'emotion': iemocap_map[emotion_code]
                        })
                        iemocap_count += 1
        except Exception as e:
            print(f"Error processing IEMOCAP file {eval_file}: {e}")
print(f"Found {iemocap_count} IEMOCAP files.")

# --- Combine DataFrames ---
df_ravdess_crema = pd.DataFrame(data)
df_iemocap = pd.DataFrame(iemocap_data)
df = pd.concat([df_ravdess_crema, df_iemocap], ignore_index=True)

print(f"\nTotal audio files from all datasets: {len(df)}")
print("\nFinal emotion distribution:")
print(df['emotion'].value_counts())
print("\nFirst 5 rows of final data:")
print(df.head())
print("\nData Loading Complete.")

Starting Data Loading...
Processing RAVDESS...
Found 1344 RAVDESS files.

Processing CREMA-D...
Found 4900 CREMA-D files.

Processing IEMOCAP...
Found 5531 IEMOCAP files.

Total audio files from all datasets: 11775

Final emotion distribution:
emotion
happy      3291
neutral    2987
angry      2758
sad        2739
Name: count, dtype: int64

First 5 rows of final data:
                                           file_path  emotion
0  C:\Code\Python\data\RAVDESS\Actor_01\03-01-01-...  neutral
1  C:\Code\Python\data\RAVDESS\Actor_01\03-01-01-...  neutral
2  C:\Code\Python\data\RAVDESS\Actor_01\03-01-01-...  neutral
3  C:\Code\Python\data\RAVDESS\Actor_01\03-01-01-...  neutral
4  C:\Code\Python\data\RAVDESS\Actor_01\03-01-03-...    happy

Data Loading Complete.


In [None]:
# ## 3. Feature Extraction (Linear Spectrogram)
# Defines the function to convert raw audio waves into Linear Spectrograms
# (dB scale) with fixed dimensions, matching the processing logic intended
# for the Flutter app. Applies this function to all audio files via pandas apply.

print("\nDefining Feature Extraction Function...")
# Feature extractor producing LINEAR-frequency (not mel) spectrograms in dB
def get_linear_spectrogram(file_path, sr=SR, n_fft=N_FFT, hop_length=HOP_LENGTH, n_freq_bins=N_FREQ_BINS, max_pad_len=MAX_PAD_LEN):
    """Turn one audio file into a fixed-width dB spectrogram.

    The file is loaded at `sr`, transformed with an STFT (`n_fft`, `hop_length`),
    converted to decibels as 20*log10(max(1e-6, |STFT|)), cut down to the first
    `n_freq_bins` rows and then truncated or right-padded to exactly
    `max_pad_len` columns. Padding uses the smallest value present in the
    spectrogram (or -80.0 if it is empty).

    Returns:
        A float32 array with `max_pad_len` columns and the first `n_freq_bins`
        STFT rows, or None if anything goes wrong; the failure is printed
        rather than raised.
    """
    try:
        # Resample to the target rate while loading
        waveform, _ = librosa.load(file_path, sr=sr)
        magnitude = np.abs(librosa.stft(waveform, n_fft=n_fft, hop_length=hop_length))

        # Hand-rolled amplitude -> dB conversion (matches Flutter's logic),
        # followed by the crop to the lowest frequency bins
        spec_db = (20 * np.log10(np.maximum(1e-6, magnitude)))[:n_freq_bins, :]

        # Force the time axis to exactly max_pad_len frames
        n_frames = spec_db.shape[1]
        if n_frames > max_pad_len:
            spec_db = spec_db[:, :max_pad_len]
        else:
            # Fill with the quietest observed value
            fill_value = np.min(spec_db) if spec_db.size > 0 else -80.0
            spec_db = np.pad(
                spec_db,
                pad_width=((0, 0), (0, max_pad_len - n_frames)),
                mode='constant',
                constant_values=fill_value,
            )

        # TensorFlow expects float32
        return spec_db.astype(np.float32)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

# --- Extract Features ---
print("\nExtracting Linear Spectrograms for all files...")
# Registers `progress_apply` on pandas objects and labels the progress bar
tqdm.pandas(desc="Extracting Spectrograms")
# One spectrogram per file path; failed files come back as None
df['spectrogram'] = df['file_path'].progress_apply(get_linear_spectrogram)
# Keep only the rows whose extraction succeeded
df_processed = df.loc[df['spectrogram'].notna()]
failed_count = df.shape[0] - df_processed.shape[0]
print(f"\nSuccessfully processed {len(df_processed)} files.")
if failed_count:
    print(f"❌ Failed to process {failed_count} files.")
print("Feature Extraction Complete.")


Defining Feature Extraction Function...

Extracting Linear Spectrograms for all files...


Extracting Spectrograms:   0%|          | 0/11775 [00:00<?, ?it/s]

Successfully processed 11775 files.
Feature Extraction Complete.


In [None]:
# ## 4. Data Splitting, Encoding, and Class Weights
# Prepares the spectrogram data and labels for model training.
# Splits the data into stratified training and testing sets (80/20).
# Encodes labels into numerical format (one-hot).
# Calculates class weights based on the training set distribution to address imbalances.

print("\nPreparing Data for Training...")
# Stack the per-file spectrograms into a single array and append a channel axis
# so the result is (N, freq, time, 1), the layout the Conv2D layers consume.
X = np.array(df_processed['spectrogram'].tolist())
X = X[..., np.newaxis]
y = np.array(df_processed['emotion'].tolist())

# Encode labels: strings -> integer ids -> one-hot rows
le = LabelEncoder()
y_labels = le.fit_transform(y) # Integer labels
y_encoded = to_categorical(y_labels) # One-hot encoded labels
NUM_CLASSES = len(le.classes_)

# Stratified Train/Test Split (80/20)
X_train, X_test, y_train, y_test = train_test_split(
    X, y_encoded,
    test_size=0.2,
    random_state=SEED,
    stratify=y_labels # Use integer labels for stratification
)

print(f"\nInput shape: {X.shape}")
print(f"Output shape (one-hot): {y_encoded.shape}")
# .tolist() yields plain Python strings, so the log is not cluttered with np.str_(...)
print(f"Classes: {le.classes_.tolist()}")
print(f"\nTraining samples: {len(X_train)}")
print(f"Testing samples: {len(X_test)}")

# Class weights are computed from the training split only
y_train_labels = np.argmax(y_train, axis=1) # Convert one-hot back to integer labels
train_classes = np.unique(y_train_labels)
class_weights = class_weight.compute_class_weight(
    class_weight='balanced', # Inversely proportional to class frequency
    classes=train_classes,
    y=y_train_labels
)
# Key each weight by the class id it was computed for, not by its position in
# the array: the two only coincide when every class id 0..K-1 is present in the
# training split. Cast to built-in int/float, which is what Keras expects and
# what prints cleanly.
class_weights_dict = {
    int(class_id): float(weight)
    for class_id, weight in zip(train_classes, class_weights)
}
print("\nClass Weights:", class_weights_dict)
print("Data Preparation Complete.")


Preparing Data for Training...

Input shape: (11775, 128, 130, 1)
Output shape (one-hot): (11775, 4)
Classes: [np.str_('angry'), np.str_('happy'), np.str_('neutral'), np.str_('sad')]

Training samples: 9420
Testing samples: 2355

Class Weights: {0: np.float64(1.0675430643699002), 1: np.float64(0.8944170148120015), 2: np.float64(0.9853556485355649), 3: np.float64(1.074851665905979)}
Data Preparation Complete.


In [None]:
# ## 5. Custom SpecAugment Layer Definition
# Defines a custom Keras layer for SpecAugment (frequency and time masking)
# used for data augmentation during training. Includes necessary configuration
# methods (`get_config`) and registration (`@keras.saving.register_keras_serializable()`)
# for saving and loading the model correctly.

print("\nDefining Custom SpecAugment Layer...")
@keras.saving.register_keras_serializable()
class SpecAugment(Layer):
    """Training-only augmentation that blanks one frequency band and one time band.

    On each training call a band of `f` consecutive rows (axis 1) and a band of
    `t` consecutive columns (axis 2) are multiplied by zero; everything else is
    multiplied by one. Outside training the input is returned untouched.

    The masks are built without a batch axis, so one and the same pair of bands
    is broadcast over every sample of the batch.

    NOTE(review): masked cells become exactly 0. The spectrograms built in this
    notebook are dB values, so 0 is not the quietest possible value -- confirm
    that zero-fill is the intended mask value.

    Args:
        freq_mask_param: exclusive upper bound for the width of the frequency band.
        time_mask_param: exclusive upper bound for the width of the time band.
        **kwargs: forwarded to the base `Layer` constructor (e.g. `name`).
    """

    def __init__(self, freq_mask_param, time_mask_param, **kwargs):
        super(SpecAugment, self).__init__(**kwargs)
        # Stored as plain attributes so get_config() can serialize them
        self.freq_mask_param = freq_mask_param
        self.time_mask_param = time_mask_param

    def call(self, inputs, training=None):
        """Return `inputs` with one frequency band and one time band zeroed (training only)."""
        # `training` of None or False both take this branch
        if not training:
            return inputs # Augmentation is only applied during training

        freq_max = tf.shape(inputs)[1] # Assuming shape is (batch, freq, time, ch)
        # Band width f is drawn from [0, freq_mask_param); start row f0 from
        # [0, freq_max - f), so the band always fits inside the frequency axis.
        # NOTE(review): presumably freq_mask_param must be positive and no larger
        # than the frequency dimension for these integer ranges to be valid -- confirm.
        f = tf.random.uniform(shape=(), minval=0, maxval=self.freq_mask_param, dtype=tf.int32)
        f0 = tf.random.uniform(shape=(), minval=0, maxval=freq_max - f, dtype=tf.int32)
        # Create frequency mask: ones above the band, zeros inside, ones below,
        # stacked along the frequency axis into a (freq, time) matrix
        mask_f_before = tf.ones((f0, tf.shape(inputs)[2]))
        mask_f_zero = tf.zeros((f, tf.shape(inputs)[2]))
        mask_f_after = tf.ones((freq_max - f0 - f, tf.shape(inputs)[2]))
        mask_f = tf.concat([mask_f_before, mask_f_zero, mask_f_after], axis=0)

        time_max = tf.shape(inputs)[2]
        # Same scheme along the time axis: width t in [0, time_mask_param),
        # start column t0 in [0, time_max - t)
        t = tf.random.uniform(shape=(), minval=0, maxval=self.time_mask_param, dtype=tf.int32)
        t0 = tf.random.uniform(shape=(), minval=0, maxval=time_max - t, dtype=tf.int32)
        # Create time mask: ones / zeros / ones concatenated along the time axis
        mask_t_before = tf.ones((tf.shape(inputs)[1], t0))
        mask_t_zero = tf.zeros((tf.shape(inputs)[1], t))
        mask_t_after = tf.ones((tf.shape(inputs)[1], time_max - t0 - t))
        mask_t = tf.concat([mask_t_before, mask_t_zero, mask_t_after], axis=1)

        # Expand masks to (freq, time, 1) so they broadcast against
        # (batch, freq, time, ch) inputs
        mask_f = mask_f[..., tf.newaxis] # Add channel dim
        mask_t = mask_t[..., tf.newaxis] # Add channel dim

        # Apply masks element-wise
        augmented = inputs * mask_f # Frequency masking
        augmented = augmented * mask_t # Time masking
        return augmented

    # get_config is crucial for saving and loading the model
    def get_config(self):
        """Return the base layer config extended with both mask parameters."""
        config = super(SpecAugment, self).get_config()
        config.update({
            "freq_mask_param": self.freq_mask_param,
            "time_mask_param": self.time_mask_param,
        })
        return config

print("✅ Custom Layer Defined.")


Defining Custom SpecAugment Layer...
✅ Custom Layer Defined.


In [None]:
# ## 6. Build the CRNN Model Architecture
# Defines the Convolutional Recurrent Neural Network (CRNN) model.
# Includes data augmentation (GaussianNoise, SpecAugment), multiple CNN blocks
# for feature extraction, reshaping, Bidirectional LSTMs for sequence modeling,
# and Dense layers for final classification. Strong regularization (Dropout,
# BatchNormalization) is used throughout.

print("\nBuilding CRNN Model...")
INPUT_SHAPE = (X_train.shape[1], X_train.shape[2], 1) # Should be (128, 130, 1)

# Four 2x2 max-pools (default 'valid' padding) each floor-halve both spatial
# axes, which for positive sizes equals one floor division by 16.
CNN_DOWNSAMPLE = 2 ** 4
LAST_CONV_FILTERS = 256
RNN_FREQ_BINS = INPUT_SHAPE[0] // CNN_DOWNSAMPLE   # 128 // 16 = 8
RNN_TIME_STEPS = INPUT_SHAPE[1] // CNN_DOWNSAMPLE  # 130 // 16 = 8

model = Sequential([
    Input(shape=INPUT_SHAPE, name='input_spectrogram'),
    GaussianNoise(0.1, name='noise'),
    SpecAugment(freq_mask_param=15, time_mask_param=30, name='spec_augment'),

    # CNN Block 1
    Conv2D(64, (3, 3), activation='relu', padding='same', name='conv1'),
    BatchNormalization(name='bn1'),
    MaxPooling2D((2, 2), name='pool1'),
    Dropout(0.2, name='drop1'),

    # CNN Block 2
    Conv2D(128, (3, 3), activation='relu', padding='same', name='conv2'),
    BatchNormalization(name='bn2'),
    MaxPooling2D((2, 2), name='pool2'),
    Dropout(0.2, name='drop2'),

    # CNN Block 3
    Conv2D(256, (3, 3), activation='relu', padding='same', name='conv3'),
    BatchNormalization(name='bn3'),
    MaxPooling2D((2, 2), name='pool3'),
    Dropout(0.3, name='drop3'),

    # CNN Block 4
    Conv2D(LAST_CONV_FILTERS, (3, 3), activation='relu', padding='same', name='conv4'),
    BatchNormalization(name='bn4'),
    MaxPooling2D((2, 2), name='pool4'),
    Dropout(0.3, name='drop4'),

    # Reshape for RNN: (batch, time, features)
    # The CNN output is laid out as (batch, freq, time, channels). Reshape keeps
    # row-major element order, so reshaping that tensor directly would leave
    # FREQUENCY as the leading (sequence) axis and the LSTMs would step over
    # frequency bands. Swap freq and time first so the sequence axis is time;
    # Permute dims are 1-indexed and exclude the batch axis.
    keras.layers.Permute((2, 1, 3), name='time_major'),
    # (batch, time, freq, channels) -> (batch, time_steps, freq * channels)
    Reshape((RNN_TIME_STEPS, RNN_FREQ_BINS * LAST_CONV_FILTERS), name='reshape_for_rnn'),

    # RNN Blocks (Bidirectional LSTMs)
    Bidirectional(LSTM(128, return_sequences=True), name='bilstm1'),
    Dropout(0.4, name='drop_rnn1'),

    Bidirectional(LSTM(64), name='bilstm2'),
    Dropout(0.4, name='drop_rnn2'),

    # Dense Layers
    Dense(256, activation='relu', name='dense1'),
    BatchNormalization(name='bn_dense1'),
    Dropout(0.5, name='drop_dense1'),

    Dense(128, activation='relu', name='dense2'),
    BatchNormalization(name='bn_dense2'),
    Dropout(0.5, name='drop_dense2'),

    Dense(NUM_CLASSES, activation='softmax', name='output')
], name="CRNN_Emotion_Model")


Building CRNN Model...



In [None]:
# ## 7. Compile Model and Define Callbacks
# Compiles the CRNN model using the Adam optimizer with a low learning rate,
# categorical cross-entropy loss (suitable for multi-class classification),
# and accuracy as the evaluation metric. Defines callbacks for training:
# - EarlyStopping: Stops training if validation loss doesn't improve.
# - ReduceLROnPlateau: Reduces learning rate if validation accuracy plateaus.
# - ModelCheckpoint: Saves the best performing model based on validation accuracy.

print("\nCompiling Model and Defining Callbacks...")
# Adam with a deliberately small starting step size
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)

# Training callbacks
# - stop once val_loss has not improved for 20 epochs and roll back to the best weights
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=20,
    restore_best_weights=True,
    verbose=1,
)
# - halve the learning rate after 5 epochs without val_accuracy improvement, floor at 1e-6
reduce_lr = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)
# - write the model to disk whenever val_accuracy reaches a new best
checkpoint_filepath = 'best_crnn_model.keras'
model_checkpoint = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_best_only=True,
    monitor='val_accuracy',
    verbose=1,
)

model.summary()
print("✅ Model Compiled and Callbacks Defined.")


Compiling Model and Defining Callbacks...


✅ Model Compiled and Callbacks Defined.


In [None]:
# ## 8. Train the Model
# Trains the compiled CRNN model using the prepared training data.
# Uses the defined callbacks to manage the training process and applies class
# weights to handle imbalances. After each epoch the model is validated on a
# slice held out from the TRAINING data, so the test set plays no part in early
# stopping, learning-rate scheduling or checkpoint selection and is still
# unseen when the final evaluation runs.
# Note: This step is computationally intensive and will take a long time.

print("\n🚀 Starting CRNN Training...")
# Fraction of the training arrays reserved for validation. Keras takes it from
# the END of the arrays before shuffling; X_train/y_train come out of
# train_test_split already shuffled, so that tail is a random sample.
VALIDATION_SPLIT = 0.1
history = model.fit(
    X_train, y_train,
    epochs=100, # Set a high number; EarlyStopping will find the optimal point
    batch_size=32,
    validation_split=VALIDATION_SPLIT,
    callbacks=[early_stop, reduce_lr, model_checkpoint],
    class_weight=class_weights_dict, # Use class weights to address imbalance
    verbose=1
)
print("\n✅ Training Complete.")


🚀 Starting CRNN Training...
Epoch 1/100
[1m295/295[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 715ms/step - accuracy: 0.3023 - loss: 2.1323
Epoch 1: val_accuracy improved from -inf to 0.38429, saving model to best_crnn_model.keras
[1m295/295[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m229s[0m 760ms/step - accuracy: 0.3025 - loss: 2.1319 - val_accuracy: 0.3843 - val_loss: 1.4953 - learning_rate: 1.0000e-04
Epoch 2/100
[1m295/295[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 670ms/step - accuracy: 0.3678 - loss: 1.8214
Epoch 2: val_accuracy improved from 0.38429 to 0.41614, saving model to best_crnn_model.keras
[1m295/295[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m209s[0m 709ms/step - accuracy: 0.3679 - loss: 1.8213 - val_accuracy: 0.4161 - val_loss: 1.4780 - learning_rate: 1.0000e-04
Epoch 3/100
[1m295/295[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 646ms/step - accuracy: 0.3799 - loss: 1.6894
Epoch 3: val_accuracy improved from 0.41614 to 0.42

In [None]:
# ## 9. Load Best Model and Evaluate
# Loads the best model weights saved by the ModelCheckpoint callback during training.
# Evaluates this best model on the unseen test set to get the final performance metrics.
# Displays the final test accuracy, a detailed classification report (precision, recall, F1-score per class),
# and a confusion matrix visualizing the model's predictions vs. true labels.

print("\nLoading best model weights from checkpoint...")
# Everything from loading to plotting runs inside one guarded block: any
# failure is reported and the rest of the evaluation is skipped.
try:
    # Restore the checkpoint written by ModelCheckpoint; the custom SpecAugment
    # class is handed over explicitly for deserialization.
    model = keras.models.load_model(
        checkpoint_filepath,
        custom_objects={'SpecAugment': SpecAugment},
    )
    print(f"✅ Best model loaded successfully from '{checkpoint_filepath}'.")

    # Headline metric on the test split
    print("\nEvaluating the best model on the test set...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"\n--- Final Best CRNN Model Accuracy: {accuracy*100:.2f}% ---")

    # Per-class diagnostics: collapse probabilities / one-hot rows to class ids
    print("\nGenerating evaluation plots...")
    y_pred = model.predict(X_test)
    y_pred_classes = y_pred.argmax(axis=1)
    y_true_classes = y_test.argmax(axis=1)

    print("\n📋 Classification Report:")
    # Three decimals in the report
    print(classification_report(
        y_true_classes,
        y_pred_classes,
        target_names=le.classes_,
        digits=3,
    ))

    # Confusion matrix heatmap (rows: true label, columns: predicted label)
    cm = confusion_matrix(y_true_classes, y_pred_classes)
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        cmap='Blues',
        annot=True,
        fmt='d',
        xticklabels=le.classes_,
        yticklabels=le.classes_,
    )
    plt.title('Confusion Matrix for Best Model')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"❌ Error loading or evaluating the best model from {checkpoint_filepath}: {e}")
    print("Evaluation skipped. Ensure training ran successfully and saved the checkpoint.")


Loading best model weights from checkpoint...
✅ Best model loaded successfully.

Evaluating the best model on the test set...

--- Final Best CRNN Model Accuracy: 60.42% ---
❌ Error loading or evaluating the best model: No module named 'seaborn'
Evaluation skipped.


In [None]:
# ## 10. Save Final Model for Server
# Saves the final, best-performing model (loaded from the checkpoint)
# in the Keras native format (`.keras`). This file contains the model architecture,
# weights, and optimizer state, ready to be loaded by the Python Flask server (`server.py`).

print("\nSaving final best model for the server...")
# Destination of the exported model in Keras native format
final_model_path = 'unbiased_app_model.keras'
try:
    # Persist whatever model object is currently bound to `model`
    model.save(final_model_path)
except Exception as e:
    print(f"❌ Error saving the final Keras model: {e}")
else:
    print(f"✅ Final unbiased model saved as '{final_model_path}' for the server.")
    print("\n🎉 ALL DONE! Your model is ready to use in server.py.")