## 1. Setup and Configuration

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
warnings.filterwarnings('ignore')

# Scikit-learn imports
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import class_weight

# TensorFlow/Keras imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (Conv2D, MaxPooling2D, Flatten, 
                                      Dense, Dropout, BatchNormalization)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical

# Seed NumPy and TensorFlow so repeated runs draw the same random numbers
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Report the versions of the core libraries used by this run
for lib_label, lib_module in (("TensorFlow", tf), ("NumPy", np), ("Pandas", pd)):
    print(f"{lib_label} version: {lib_module.__version__}")
print("✓ Libraries imported successfully!")

## 2. Load and Prepare Labels

In [None]:
# Load labels from MELD dataset
print("Loading MELD labels...")
print("=" * 70)

# One root directory for every MELD artefact, so the pickle and CSV
# locations are derived from the same place and cannot drift apart.
base_path = 'MELD.Raw'

# Load text embeddings for alignment (needed to extract utterance IDs)
pkl_path = os.path.join(base_path, 'text_emotion.pkl')
if not os.path.exists(pkl_path):
    raise FileNotFoundError(f"Text embeddings not found at: {pkl_path}")

# Fail early, in the same style as the pickle check, if a label CSV is missing
label_csv_paths = {
    split: os.path.join(base_path, f'{split}_sent_emo.csv')
    for split in ('train', 'dev', 'test')
}
for csv_path in label_csv_paths.values():
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Label CSV not found at: {csv_path}")

# SECURITY NOTE: pickle.load can execute arbitrary code embedded in the file.
# Only load text_emotion.pkl when it comes from a source you trust.
with open(pkl_path, 'rb') as f:
    text_data = pickle.load(f)

# The pickle must unpack into exactly three objects: train, val and test
train_text_dict, val_text_dict, test_text_dict = text_data
print(f"✓ Text embeddings loaded: {len(train_text_dict)} train, {len(val_text_dict)} val, {len(test_text_dict)} test dialogues")

# Load labels from CSV files (the 'dev' CSV is the validation split)
train_labels_df = pd.read_csv(label_csv_paths['train'])
dev_labels_df = pd.read_csv(label_csv_paths['dev'])
test_labels_df = pd.read_csv(label_csv_paths['test'])

print(f"✓ Labels loaded: {len(train_labels_df)} train, {len(dev_labels_df)} val, {len(test_labels_df)} test utterances")

# Create label lookup dictionaries
def create_label_dict(df):
    """Create dictionary mapping (dialogue_id, utterance_id) -> emotion.

    Args:
        df: DataFrame with 'Dialogue_ID', 'Utterance_ID' and 'Emotion' columns.

    Returns:
        dict keyed by (Dialogue_ID, Utterance_ID) tuples whose values are the
        matching 'Emotion' entries. If the same key appears on several rows,
        the last row wins.
    """
    # Zip the three columns directly instead of using df.iterrows():
    # iterrows builds a full Series per row, which is slow and upcasts dtypes.
    keys = zip(df['Dialogue_ID'], df['Utterance_ID'])
    return dict(zip(keys, df['Emotion']))

# Build one (Dialogue_ID, Utterance_ID) -> emotion lookup per split
train_label_dict, val_label_dict, test_label_dict = (
    create_label_dict(split_df)
    for split_df in (train_labels_df, dev_labels_df, test_labels_df)
)

# Extract utterances with labels (for text alignment)
def extract_utterances_with_labels(text_dict, label_dict):
    """Collect the feature rows of non-empty utterances that have a label.

    Args:
        text_dict: Mapping of dialogue id -> array indexed by utterance
            position along its first axis.
        label_dict: Mapping of (int dialogue id, utterance position) -> label.

    Returns:
        Tuple (features, labels, dialogue_ids, utterance_ids). `features` is
        an np.array built from the kept rows; the other three are parallel
        lists. Dialogue ids are returned as they appear in `text_dict`.
    """
    kept_rows, kept_labels, kept_dialogues, kept_positions = [], [], [], []

    for dialogue_key, dialogue_features in text_dict.items():
        utterance_count = dialogue_features.shape[0]
        for position in range(utterance_count):
            row = dialogue_features[position]
            # Rows that are entirely zero are skipped
            if not np.any(row):
                continue
            # Labels are keyed by the integer form of the dialogue id
            lookup_key = (int(dialogue_key), position)
            if lookup_key not in label_dict:
                continue
            kept_rows.append(row)
            kept_labels.append(label_dict[lookup_key])
            kept_dialogues.append(dialogue_key)
            kept_positions.append(position)

    return np.array(kept_rows), kept_labels, kept_dialogues, kept_positions

print("\nExtracting utterances with labels...")
# Each call returns (features, emotions, dialogue ids, utterance ids) for one split
train_split = extract_utterances_with_labels(train_text_dict, train_label_dict)
val_split = extract_utterances_with_labels(val_text_dict, val_label_dict)
test_split = extract_utterances_with_labels(test_text_dict, test_label_dict)

X_text_train, train_emotions, train_dia_ids, train_utt_ids = train_split
X_text_val, val_emotions, val_dia_ids, val_utt_ids = val_split
X_text_test, test_emotions, test_dia_ids, test_utt_ids = test_split

# Report how many utterances were kept per split
for split_label, split_features in (("Train", X_text_train),
                                    ("Val", X_text_val),
                                    ("Test", X_text_test)):
    print(f"  {split_label}: {len(split_features)} utterances")

# Encode labels
# The encoder is fitted on the emotion strings of all three splits combined
all_emotions = train_emotions + val_emotions + test_emotions
le = LabelEncoder()
le.fit(all_emotions)

# Integer-encode each split with the shared encoder
y_train, y_val, y_test = (
    le.transform(split_emotions)
    for split_emotions in (train_emotions, val_emotions, test_emotions)
)

num_classes = len(le.classes_)
print(f"\n✓ Labels encoded: {num_classes} emotion classes")
print(f"  Classes: {list(le.classes_)}")

# Convert to one-hot encoding
y_train_cat, y_val_cat, y_test_cat = (
    to_categorical(encoded, num_classes=num_classes)
    for encoded in (y_train, y_val, y_test)
)

print("=" * 70)

## 3. Load Spectrogram Data

In [None]:
# Load pre-computed spectrograms
# Note: Spectrograms were generated using audio_to_spectrogram.ipynb
# They are already aligned with the labels (same indices)

spectrogram_path = 'MELD_spectrograms'

print(f"Loading spectrogram data from: {spectrogram_path}")
print("=" * 70)

# One X_audio_<split>.npy file per split lives in spectrogram_path
X_audio_train, X_audio_val, X_audio_test = (
    np.load(os.path.join(spectrogram_path, f'X_audio_{split}.npy'))
    for split in ('train', 'val', 'test')
)

print("✓ Spectrogram data loaded successfully!")
for split_label, split_array in (("Train", X_audio_train),
                                 ("Val", X_audio_val),
                                 ("Test", X_audio_test)):
    print(f"  {split_label}: {split_array.shape}")

# Verify alignment with labels
# NOTE: only the number of samples per split is compared here; the row order
# of the spectrograms is not checked against the labels.
for split_label, split_audio, split_targets in (("Train", X_audio_train, y_train),
                                                ("Val", X_audio_val, y_val),
                                                ("Test", X_audio_test, y_test)):
    assert split_audio.shape[0] == split_targets.shape[0], f"{split_label} size mismatch!"

print("\n✓ Spectrograms are properly aligned with labels")
print("=" * 70)

## 4. Build 2D CNN Model

In [None]:
def build_2d_cnn_model(input_shape, num_classes):
    """
    Build 2D CNN for spectrogram-based emotion classification

    The network stacks four Conv2D -> BatchNormalization -> MaxPooling2D ->
    Dropout blocks (32, 64, 128 and 256 filters), followed by two dense
    layers with dropout and a softmax output layer.

    Args:
        input_shape: Shape of spectrogram input (height, width, channels)
        num_classes: Number of emotion classes

    Returns:
        Uncompiled Keras Sequential model; the caller must call compile()
        before training.
    """
    # (filters, dropout rate) for each convolutional block, in order
    conv_blocks = [(32, 0.25), (64, 0.25), (128, 0.25), (256, 0.3)]
    # (units, dropout rate) for each hidden dense layer, in order
    dense_blocks = [(256, 0.5), (128, 0.4)]

    model = Sequential(name='CNN_Spectrogram_Model')

    # Declare the input explicitly: passing input_shape= to the first layer
    # of a Sequential model is deprecated in Keras 3.
    model.add(keras.Input(shape=input_shape))

    # Convolutional feature extractor
    for filters, dropout_rate in conv_blocks:
        model.add(Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(dropout_rate))

    # Dense layers
    model.add(Flatten())
    for units, dropout_rate in dense_blocks:
        model.add(Dense(units, activation='relu'))
        model.add(Dropout(dropout_rate))

    # Output layer
    model.add(Dense(num_classes, activation='softmax'))

    return model

# Build the CNN model
# The per-sample input shape is every axis of the training array after the first
cnn_input_shape = X_audio_train.shape[1:]
cnn_model = build_2d_cnn_model(input_shape=cnn_input_shape, num_classes=num_classes)

print("2D CNN Model Architecture:")
cnn_model.summary()

## 5. Train 2D CNN Model

In [None]:
# Calculate class weights to handle imbalanced data
MAX_CLASS_WEIGHT = 3.0  # Cap weights to prevent instability

train_classes = np.unique(y_train)
class_weights_array = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=train_classes,
    y=y_train
)

# Start every encoded class at a neutral weight of 1.0 so that classes absent
# from the training split still have an entry, then key each computed weight
# by its actual class id. (Keying by position via enumerate() would shift the
# weights onto the wrong classes whenever a class id is missing from y_train.)
class_weights_dict = {class_id: 1.0 for class_id in range(num_classes)}
for class_id, weight in zip(train_classes, class_weights_array):
    class_weights_dict[int(class_id)] = min(float(weight), MAX_CLASS_WEIGHT)

print(f"Class weights: {class_weights_dict}")

# Compile CNN model
cnn_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Define callbacks
# Stop when val_loss has not improved for 15 epochs and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss', patience=15, restore_best_weights=True, verbose=1
)
# Halve the learning rate after 5 epochs without val_loss improvement
lr_reducer = ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=5, min_lr=1e-7, verbose=1
)
# Keep on disk only the model with the best val_accuracy seen so far
best_checkpoint = ModelCheckpoint(
    'best_cnn_spectrogram_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1
)
cnn_callbacks = [early_stopping, lr_reducer, best_checkpoint]

print("\nTraining 2D CNN model...")
print("=" * 70)

# Train the model
fit_options = dict(
    validation_data=(X_audio_val, y_val_cat),
    epochs=100,
    batch_size=32,
    callbacks=cnn_callbacks,
    class_weight=class_weights_dict,
    verbose=1,
)
cnn_history = cnn_model.fit(X_audio_train, y_train_cat, **fit_options)

print("\n✓ 2D CNN model training complete!")
best_val_accuracy = max(cnn_history.history['val_accuracy'])
print(f"Best Validation Accuracy: {best_val_accuracy:.4f}")

## 6. Visualize Training Progress

In [None]:
# Plot CNN training history: accuracy on the left panel, loss on the right
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

history_log = cnn_history.history
for panel, (history_key, metric_name) in zip(axes, (('accuracy', 'Accuracy'), ('loss', 'Loss'))):
    # Training curve and its validation counterpart share one panel
    panel.plot(history_log[history_key], label=f'Train {metric_name}')
    panel.plot(history_log[f'val_{history_key}'], label=f'Val {metric_name}')
    panel.set_title(f'2D CNN Model {metric_name}')
    panel.set_xlabel('Epoch')
    panel.set_ylabel(metric_name)
    panel.legend()
    panel.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Values from the last epoch that was run
print(f"Final Training Accuracy: {history_log['accuracy'][-1]:.4f}")
print(f"Final Validation Accuracy: {history_log['val_accuracy'][-1]:.4f}")

## 7. Evaluate 2D CNN Model

In [None]:
# Evaluate CNN model on test set
print("2D CNN Model Evaluation")
print("=" * 70)

# Get predictions
y_pred_probs = cnn_model.predict(X_audio_test, verbose=0)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_cat, axis=1)

# Calculate test accuracy
test_loss, cnn_test_accuracy = cnn_model.evaluate(X_audio_test, y_test_cat, verbose=0)
print(f"Test Accuracy: {cnn_test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

# Pin the label set to every encoded class. Without `labels`, scikit-learn
# infers the classes from y_true/y_pred, so a class that appears in neither
# makes classification_report fail against target_names and shrinks the
# confusion matrix so its rows/columns no longer line up with le.classes_.
class_ids = np.arange(num_classes)

# Classification report
print(f"\nClassification Report:")
print(classification_report(
    y_true, y_pred,
    labels=class_ids,
    target_names=le.classes_,
    zero_division=0  # classes that are never predicted score 0
))

# Confusion matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Reds',
            xticklabels=le.classes_, yticklabels=le.classes_)
plt.title('2D CNN Model - Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.show()

## 8. Per-Class Performance Analysis

In [None]:
# Per-class performance analysis
from sklearn.metrics import precision_recall_fscore_support

print("PER-CLASS PERFORMANCE ANALYSIS")
print("=" * 70)

# One precision / recall / F1 / support value per class id 0..num_classes-1
per_class_scores = precision_recall_fscore_support(
    y_true, y_pred, average=None, labels=range(num_classes)
)
precision, recall, f1, support = per_class_scores

# Tabulate the metrics next to the emotion names
metric_columns = [
    ('Emotion', le.classes_),
    ('Precision', precision),
    ('Recall', recall),
    ('F1-Score', f1),
    ('Support', support),
]
per_class_df = pd.DataFrame(dict(metric_columns))

print("\nDetailed Per-Class Metrics:")
print(per_class_df.to_string(index=False))

# Visualize per-class F1 scores as one bar per emotion
fig, ax = plt.subplots(figsize=(12, 6))
x = np.arange(len(le.classes_))

bars = ax.bar(x, f1, color='#e74c3c', alpha=0.8)
ax.set_title('Per-Class F1-Score - 2D CNN Model')
ax.set_xlabel('Emotion Class')
ax.set_ylabel('F1-Score')
ax.set_xticks(x)
ax.set_xticklabels(le.classes_, rotation=45, ha='right')
ax.set_ylim([0, 1.0])
ax.grid(True, alpha=0.3, axis='y')

# Write each bar's value centred just above its top edge
for bar in bars:
    bar_top = bar.get_height()
    bar_center = bar.get_x() + bar.get_width()/2.
    ax.text(bar_center, bar_top, f'{bar_top:.3f}',
            ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.show()

# Identify best and worst performing classes (top / bottom three by F1)
best_emotions = per_class_df.nlargest(3, 'F1-Score')
worst_emotions = per_class_df.nsmallest(3, 'F1-Score')

for heading, ranked_rows in (("Best performing emotions:", best_emotions),
                             ("Worst performing emotions:", worst_emotions)):
    print(f"\n{heading}")
    for _, row in ranked_rows.iterrows():
        print(f"  {row['Emotion']}: F1={row['F1-Score']:.4f} (support={int(row['Support'])})")

## 9. Visualize Sample Spectrograms

In [None]:
# Visualize sample spectrograms from each emotion class
print("Sample Spectrograms by Emotion Class")
print("=" * 70)

# Size the grid from the number of classes instead of assuming it fits in 2x4
n_emotions = len(le.classes_)
n_cols = 4
n_rows = max(1, (n_emotions + n_cols - 1) // n_cols)  # ceiling division

fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False)
axes = axes.flatten()

for i, emotion in enumerate(le.classes_):
    # Find first sample of this emotion
    matching_indices = np.flatnonzero(y_true == i)
    if matching_indices.size == 0:
        # No test utterance carries this label: mark the panel instead of crashing
        axes[i].set_title(f'{emotion.capitalize()} (no test samples)')
        axes[i].axis('off')
        continue
    emotion_idx = matching_indices[0]
    sample_spec = X_audio_test[emotion_idx]

    # Plot spectrogram (first channel only)
    axes[i].imshow(sample_spec[:, :, 0], aspect='auto', origin='lower', cmap='viridis')
    axes[i].set_title(f'{emotion.capitalize()}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Frequency')
    axes[i].grid(False)

# Hide grid cells that have no emotion assigned to them
for unused_ax in axes[n_emotions:]:
    unused_ax.axis('off')

plt.tight_layout()
plt.show()

## 10. Save Model and Results

In [None]:
# Save the final model
# Use a different file from the ModelCheckpoint callback's
# 'best_cnn_spectrogram_model.keras': writing the in-memory model to that same
# path would overwrite the best-val_accuracy checkpoint saved during training.
model_save_path = 'final_cnn_spectrogram_model.keras'
cnn_model.save(model_save_path)
print(f"✓ Model saved to: {model_save_path}")

# Save predictions for further analysis
predictions_dict = dict(
    true_labels=y_true,
    predicted_labels=y_pred,
    predicted_probabilities=y_pred_probs,
    emotion_classes=le.classes_,
    test_accuracy=cnn_test_accuracy,
)

# np.save stores a dict as a pickled object array, so reading it back needs
# np.load(path, allow_pickle=True).item()
predictions_path = 'cnn_spectrogram_predictions.npy'
np.save(predictions_path, predictions_dict)
print(f"✓ Predictions saved to: {predictions_path}")

# Print final summary
rule = "=" * 70
best_val_accuracy = max(cnn_history.history['val_accuracy'])

summary_lines = [
    "\n" + rule,
    "TRAINING SUMMARY",
    rule,
    "Model: 2D CNN for Spectrogram-based Emotion Recognition",
    f"Input Shape: {X_audio_train.shape[1:]}",
    f"Number of Classes: {num_classes}",
    f"Training Samples: {X_audio_train.shape[0]}",
    f"Validation Samples: {X_audio_val.shape[0]}",
    f"Test Samples: {X_audio_test.shape[0]}",
    f"\nFinal Test Accuracy: {cnn_test_accuracy:.4f}",
    f"Best Validation Accuracy: {best_val_accuracy:.4f}",
    rule,
]
for summary_line in summary_lines:
    print(summary_line)