<a href="https://colab.research.google.com/github/Aadil404/Music-Emotion-Recognition/blob/main/notebooks/03_Model_Definition_And_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1. Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# 2. Navigate to your project directory (adjust the path if needed)
%cd /content/drive/MyDrive/MER(final-year-project)/

# 3. Install necessary libraries
!pip install librosa tensorflow pandas scikit-learn matplotlib seaborn

Mounted at /content/drive
/content/drive/MyDrive/MER(final-year-project)


In [2]:
# (Mount Drive, install libraries: tensorflow, etc.)
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import os

In [3]:
# --- 1. Load Preprocessed Segment Data ---
data = np.load('processed_data/emotify_spectrograms_5s_segments.npz')
X_train, y_train = data['X_train'], data['y_train']
X_test, y_test = data['X_test'], data['y_test']

# --- 2. Define the Multi-Label CNN Model ---
input_shape = X_train.shape[1:]  # This will be (128, 216, 1) for 5-second segments
num_classes = y_train.shape[1] # Number of emotion columns

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")
print(f"Training segments: {X_train.shape[0]}")
print(f"Testing segments: {X_test.shape[0]}")
print(f"Label range: [{y_train.min():.3f}, {y_train.max():.3f}]")  # Should be 0-1 for weighted labels

Input shape: (128, 216, 1)
Number of classes: 9
Training segments: 7346
Testing segments: 1806
Label range: [0.000, 1.000]


In [4]:
X_train[1]

array([[[1.        ],
        [1.        ],
        [1.        ],
        ...,
        [1.        ],
        [1.        ],
        [1.        ]],

       [[1.        ],
        [1.        ],
        [1.        ],
        ...,
        [1.        ],
        [1.        ],
        [1.        ]],

       [[1.        ],
        [1.        ],
        [1.        ],
        ...,
        [1.        ],
        [1.        ],
        [1.        ]],

       ...,

       [[0.39273763],
        [0.3174634 ],
        [0.28360796],
        ...,
        [0.28360796],
        [0.36913404],
        [0.50409716]],

       [[0.3930145 ],
        [0.31772175],
        [0.28360796],
        ...,
        [0.28360796],
        [0.36364183],
        [0.50283974]],

       [[0.39263278],
        [0.3172226 ],
        [0.28360796],
        ...,
        [0.28360796],
        [0.35711402],
        [0.4995784 ]]], dtype=float32)

In [5]:
y_train[1]

array([0.18181819, 0.        , 0.09090909, 0.        , 0.09090909,
       0.45454547, 0.09090909, 0.72727275, 0.27272728], dtype=float32)

In [7]:
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

def create_segment_model(input_shape, num_classes):
    model = models.Sequential([
        # --- Block 1: Low-level features (Rhythm/Beats) ---
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape, padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.2), # Light dropout

        # --- Block 2: Mid-level features (Chords/Simple Patterns) ---
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.3),

        # --- Block 3: High-level features (Instrumentation/Texture) ---
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.4),

        # --- Block 4: Abstract features ---
        layers.Conv2D(256, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        # Global Pooling reduces (Height x Width x Filters) to just (Filters)
        # This prevents the model from caring *where* a sound happens, just *that* it happened.
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.5), # Strong dropout before dense layers

        # --- Dense Layers ---
        layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),

        # --- Output Layer ---
        # Sigmoid is crucial for Multi-Label classification (independent probabilities)
        layers.Dense(num_classes, activation='sigmoid')
    ])

    return model

# Create model
model = create_segment_model(input_shape, num_classes)

# Custom metric for multi-label F1 score (Keep this from your previous code)
class MultiLabelF1(tf.keras.metrics.Metric):
    def __init__(self, name='f1_score', threshold=0.5, **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision_m = tf.keras.metrics.Precision(thresholds=threshold)
        self.recall_m = tf.keras.metrics.Recall(thresholds=threshold)

    def update_state(self, y_true, y_pred, sample_weight=None):
        self.precision_m.update_state(y_true, y_pred, sample_weight)
        self.recall_m.update_state(y_true, y_pred, sample_weight)

    def result(self):
        p = self.precision_m.result()
        r = self.recall_m.result()
        return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))

    def reset_states(self):
        self.precision_m.reset_states()
        self.recall_m.reset_states()

# Compile
model.compile(
    optimizer=Adam(learning_rate=0.001), # Standard starting rate
    loss='binary_crossentropy',
    metrics=[
        'accuracy',
        MultiLabelF1(threshold=0.3),
        tf.keras.metrics.AUC(name='auc', multi_label=True)
    ]
)

model.summary()

In [17]:
import glob
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

checkpoint_dir = 'checkpoints/segment_model/'
os.makedirs(checkpoint_dir, exist_ok=True)

# Improved checkpoint callback
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join(checkpoint_dir, 'best_model.weights.h5'),
    monitor='val_f1_score',  # Monitor F1 score instead of loss
    save_best_only=True,
    save_weights_only=True,
    mode='max',  # We want to maximize F1 score
    verbose=1
)

# Additional callbacks for better training
early_stopping = EarlyStopping(
    monitor='val_f1_score',
    patience=20,  # More patience for segment-based training
    restore_best_weights=True,
    mode='max',
    verbose=1
)

reduce_lr = ReduceLROnPlateau(
    monitor='val_f1_score',
    factor=0.5,
    patience=8,
    min_lr=1e-7,
    mode='max',
    verbose=1
)

# Find latest checkpoint
checkpoints = glob.glob(os.path.join(checkpoint_dir, '*.weights.h5'))
if checkpoints:
    latest_checkpoint = max(checkpoints, key=os.path.getctime)
    print(f"✅ Resuming from: {latest_checkpoint}")
    model.load_weights(latest_checkpoint)
    # Extract epoch from filename if possible, else start from 0
    try:
        initial_epoch = int(latest_checkpoint.split('epoch_')[1].split('.')[0])
    except:
        initial_epoch = 0
else:
    print("❌ No checkpoint found. Training from scratch.")
    initial_epoch = 0

print(f"Starting from epoch: {initial_epoch}")

❌ No checkpoint found. Training from scratch.
Starting from epoch: 0


In [18]:
import numpy as np
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import os

# --- 1. Calculate Class Weights (Crucial for Imbalanced Data) ---
def calculate_class_weights(y_train):
    """
    Calculate weights for each emotion class.
    Rare emotions get higher weights so the model doesn't ignore them.
    """
    # Sum the weighted probabilities for each class
    class_counts = np.sum(y_train, axis=0)
    total_samples = len(y_train)

    # Formula: total / (num_classes * class_count)
    # This balances the influence of each emotion.
    class_weights_array = total_samples / (len(class_counts) * class_counts)

    # Convert to dictionary {0: weight, 1: weight...} for Keras
    class_weights_dict = {i: weight for i, weight in enumerate(class_weights_array)}
    return class_weights_dict

# Calculate the weights
class_weights = calculate_class_weights(y_train)
print("Class weights:", class_weights)

# --- 2. Define Callbacks (The Safety Net) ---
checkpoint_dir = 'checkpoints/segment_model/'
os.makedirs(checkpoint_dir, exist_ok=True)

callbacks = [
    # Save the model ONLY when Validation Loss decreases (The "Best" version)
    ModelCheckpoint(
        filepath=os.path.join(checkpoint_dir, 'best_model.weights.h5'),
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=True,
        mode='min', # We want minimum loss
        verbose=1
    ),
    # Stop training if the model stops improving for 10 epochs
    EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
        mode='min',
        verbose=1
    ),
    # Slow down learning rate if we get stuck
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2, # Divide LR by 5
        patience=5,
        min_lr=1e-6,
        mode='min',
        verbose=1
    )
]

# --- 3. Start Training ---
print("Starting training...")
history = model.fit(
    X_train, y_train,
    epochs=50, # 50 is usually enough with EarlyStopping
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=callbacks,
    class_weight=class_weights, # Use the weights we calculated
    verbose=1
)

Class weights: {0: np.float32(0.82283634), 1: np.float32(0.5710493), 2: np.float32(0.61373276), 3: np.float32(0.44373468), 4: np.float32(0.370971), 5: np.float32(0.5961752), 6: np.float32(0.42075852), 7: np.float32(0.4820439), 8: np.float32(0.5933279)}
Starting training...
Epoch 1/50
[1m230/230[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step - accuracy: 0.2830 - auc: 0.6381 - f1_score: 0.3830 - loss: 0.2867
Epoch 1: val_loss improved from inf to 0.56460, saving model to checkpoints/segment_model/best_model.weights.h5
[1m230/230[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 43ms/step - accuracy: 0.2830 - auc: 0.6381 - f1_score: 0.3830 - loss: 0.2867 - val_accuracy: 0.3306 - val_auc: 0.6308 - val_f1_score: 0.4326 - val_loss: 0.5646 - learning_rate: 2.5000e-04
Epoch 2/50
[1m230/230[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step - accuracy: 0.2893 - auc: 0.6445 - f1_score: 0.3826 - loss: 0.2701
Epoch 2: val_loss improved from 0.56460 to 0.52414,

In [19]:
# Predict on a single sample
# Add a batch dimension to the input data for prediction
input_sample = np.expand_dims(X_test[110], axis=0)
predictions = model.predict(input_sample)
print(predictions)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[[0.15463279 0.29196802 0.17626534 0.23155828 0.34344316 0.1404848
  0.19088584 0.21510112 0.19474053]]


In [20]:
y_test[110]

array([0.05882353, 0.29411766, 0.23529412, 0.4117647 , 0.47058824,
       0.        , 0.        , 0.1764706 , 0.29411766], dtype=float32)

In [21]:
# Evaluate the model on the test set
print("Evaluating the model on the test set...")
results = model.evaluate(X_test, y_test, batch_size=32, verbose=1)

# Print the evaluation results
print("Test Loss:", results[0])
# Assuming the order of metrics in model.compile matches the results list
metric_names = model.metrics_names
for name, value in zip(metric_names, results):
    print(f"Test {name}: {value}")

Evaluating the model on the test set...
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 12ms/step - accuracy: 0.3115 - auc: 0.7103 - f1_score: 0.3518 - loss: 0.5105
Test Loss: 0.5188334584236145
Test loss: 0.5188334584236145
Test compile_metrics: 0.2607973515987396


In [22]:
results

[0.5188334584236145,
 0.2607973515987396,
 0.33676308393478394,
 0.6915789842605591]