In [1]:
# Install the notebook's dependencies with the %pip magic so they land in the
# environment of the running kernel (a bare `!pip` runs in a subshell).
# json_tricks, pydub and noisereduce are pinned to the versions recorded in this
# cell's captured output; the remaining packages were already present and are left unpinned.
%pip install numpy json_tricks==3.17.3 pydub==0.25.1 librosa noisereduce==3.0.3 tensorflow scikit-learn matplotlib seaborn pandas

Collecting json_tricks
  Downloading json_tricks-3.17.3-py2.py3-none-any.whl.metadata (16 kB)
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting noisereduce
  Downloading noisereduce-3.0.3-py3-none-any.whl.metadata (14 kB)
Downloading json_tricks-3.17.3-py2.py3-none-any.whl (27 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Downloading noisereduce-3.0.3-py3-none-any.whl (22 kB)
Installing collected packages: pydub, json_tricks, noisereduce
Successfully installed json_tricks-3.17.3 noisereduce-3.0.3 pydub-0.25.1


In [2]:
import numpy as np
import os
from pydub import AudioSegment, effects
import librosa
import noisereduce as nr
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, callbacks
import sklearn
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import time
import sys

In [3]:
# --- Filesystem locations (edit the first two for your own Drive layout) ---
# NOTE(review): DATASET_PATH is spelled with 'MyDrive' while OUTPUT_DIR uses 'My Drive';
# confirm both spellings resolve on your mount.
DATASET_PATH = '/content/drive/MyDrive/special project/TESS' # Root folder that is walked recursively for audio files
OUTPUT_DIR = '/content/drive/My Drive/Colab Notebooks/' # Folder that receives every artifact below
# Cached features/labels/target length written with np.savez after preprocessing
PROCESSED_DATA_FILE = os.path.join(OUTPUT_DIR, 'processed_data.npz')
# Target of ModelCheckpoint; despite the name it stores the whole model (save_weights_only=False), hence .keras
MODEL_WEIGHTS_FILE = os.path.join(OUTPUT_DIR, 'best_weights_lstm_mod.keras')
# Architecture-only export produced by model.to_json()
MODEL_JSON_FILE = os.path.join(OUTPUT_DIR, 'model_lstm_mod.json')
# Weights-only export produced by model.save_weights()
MODEL_H5_FILE = os.path.join(OUTPUT_DIR, 'model_lstm_mod.weights.h5')
# Held-out test features and their integer (not one-hot) labels
X_TEST_FILE = os.path.join(OUTPUT_DIR, 'x_test_data_mod.npy')
Y_TEST_FILE = os.path.join(OUTPUT_DIR, 'y_test_data_mod.npy')

In [4]:
# --- Feature-extraction framing ---
# Analysis window in samples: used as frame_length for RMS/ZCR and as n_fft for the MFCCs.
FRAME_LENGTH = 2048
# Step between consecutive frames in samples (shared by RMS, ZCR and MFCC so their frame counts line up).
HOP_LENGTH = 512
# Number of MFCC coefficients per frame; each frame therefore has 2 + N_MFCC features (ZCR, RMS, MFCCs).
N_MFCC = 13

In [5]:
# Training Hyperparameters
BATCH_SIZE = 32 # Samples per gradient step passed to model.fit
# Upper bound on training epochs.
# NOTE(review): the training cell uses patience=10 (ReduceLROnPlateau) and patience=15 (EarlyStopping);
# with only 10 epochs those callbacks are unlikely to trigger - raise EPOCHS if they are meant to.
EPOCHS = 10
LEARNING_RATE = 0.001 # Initial learning rate handed to the Adam optimizer
LSTM_UNITS = 128 # Units per direction in each Bidirectional LSTM layer
DROPOUT_RATE = 0.3 # Dropout rate applied after each LSTM block and after the hidden Dense layer

In [6]:
# Emotion label encoding.
# TESS_EMOTION_MAP: lowercase emotion tag looked up in a file name -> integer class index.
# EMOTION_LABELS: human-readable name for each class index (position i names class i).
TESS_EMOTION_MAP = dict(
    neutral=0,
    happy=1,
    sad=2,
    angry=3,
    fear=4,
    disgust=5,
    ps=6,  # 'ps' represents surprised
)
EMOTION_LABELS = [
    'neutral',
    'happy',
    'sad',
    'angry',
    'fearful',
    'disgust',
    'surprised',
]
NUM_CLASSES = len(EMOTION_LABELS)

In [7]:
def get_tess_emotion(filename):
    """Extract the emotion class index from a TESS-style file name.

    The file name (without directory or extension) is split into tokens on
    '_', '-' and spaces, and the last token that is a key of TESS_EMOTION_MAP
    decides the label. Matching whole tokens avoids false hits when the spoken
    word merely contains an emotion key (e.g. 'unhappy' contains 'happy').
    If no token matches, the original substring search over the whole string
    is used as a fallback so that previously recognized names still resolve.

    Args:
        filename (str): File name or path of the recording.

    Returns:
        int: Class index from TESS_EMOTION_MAP, or -1 if no emotion is found.
    """
    lowered = filename.lower()

    # Whole-token match, scanning from the end of the name backwards.
    stem = os.path.splitext(os.path.basename(lowered))[0]
    tokens = stem.replace('-', '_').replace(' ', '_').split('_')
    for token in reversed(tokens):
        if token in TESS_EMOTION_MAP:
            return TESS_EMOTION_MAP[token]

    # Fallback: substring search in map order (the previous behavior).
    for emotion, label in TESS_EMOTION_MAP.items():
        if emotion in lowered:
            return label
    return -1

In [8]:
def process_audio_file(file_path, target_length, sr=None):
    """
    Load one audio file, clean it up, and turn it into a frame-level feature matrix.

    Pipeline: load with librosa -> peak-normalize with pydub -> trim leading and
    trailing silence -> pad/truncate to ``target_length`` samples -> noise
    reduction -> per-frame ZCR, RMS and MFCC extraction.

    Args:
        file_path (str): Path to the audio file.
        target_length (int): Number of samples the trimmed signal is zero-padded
            or truncated to before feature extraction.
        sr (int, optional): Sample rate passed to ``librosa.load``. If None, the
            file's native sample rate is used.

    Returns:
        tuple: ``(features, emotion_label)``.
            features (np.ndarray or None): Array of shape
                (timesteps, 2 + N_MFCC) whose columns are ordered ZCR, RMS, then
                the MFCCs; None if processing failed.
            emotion_label (int): Label returned by ``get_tess_emotion`` for the
                file's base name (-1 if no emotion is recognized). Also -1 when
                processing failed.
    """
    try:
        # Load audio using librosa first to get consistent sample rate handling
        y, sr_native = librosa.load(file_path, sr=sr) # Load with target SR if specified

        # pydub works on integer PCM, so convert the float signal to int16.
        # Clip first: samples outside [-1, 1] would otherwise overflow during the
        # int16 cast and come back as corrupted values.
        y_int16 = (np.clip(y, -1.0, 1.0) * 32767).astype(np.int16)
        rawsound = AudioSegment(
            y_int16.tobytes(),
            frame_rate=sr_native if sr is None else sr,
            sample_width=y_int16.dtype.itemsize, # 2 bytes for int16
            channels=1 # Assumes librosa returned a mono signal (its default)
        )

        # Peak-normalize. The samples are read back as float32 but are NOT rescaled
        # to [-1, 1]; downstream features (e.g. RMS) are on the int16 amplitude scale.
        normalizedsound = effects.normalize(rawsound, headroom=0)
        normal_x = np.array(normalizedsound.get_array_of_samples(), dtype='float32')
        current_sr = normalizedsound.frame_rate # Use SR from normalized sound

        # Trim leading/trailing silence
        xt, _ = librosa.effects.trim(normal_x, top_db=30)

        # Force every signal to exactly target_length samples so all files yield
        # the same number of frames.
        if len(xt) > target_length:
            xt = xt[:target_length]
        else:
            xt = np.pad(xt, (0, target_length - len(xt)), 'constant')

        # Noise reduction (non-stationary mode); may need tuning per dataset
        final_x = nr.reduce_noise(xt, sr=current_sr, stationary=False)

        # Frame-level features; all three share FRAME_LENGTH/HOP_LENGTH so their
        # frame counts line up. Note that the MFCC call names the window `n_fft`.
        rms = librosa.feature.rms(y=final_x, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
        zcr = librosa.feature.zero_crossing_rate(y=final_x, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
        mfccs = librosa.feature.mfcc(y=final_x, sr=current_sr, n_mfcc=N_MFCC,
                                      n_fft=FRAME_LENGTH, hop_length=HOP_LENGTH)

        # zcr and rms are 1-D (frames,) after the [0] indexing; mfccs is
        # (N_MFCC, frames). Stack as rows, then transpose to (frames, features).
        features = np.vstack((zcr, rms, mfccs)).T

        # Get emotion label from the file name
        filename = os.path.basename(file_path)
        emotion_label = get_tess_emotion(filename)

        return features, emotion_label

    except Exception as e:
        # Best-effort: report the failure and let the caller skip this file.
        print(f"Error processing {file_path}: {e}", file=sys.stderr)
        return None, -1

In [9]:
_AUDIO_EXTENSIONS = ('.wav', '.mp3', '.ogg') # Add other formats if needed


def _list_audio_files(dataset_path):
    """Return the paths of all audio files under dataset_path in sorted order."""
    audio_paths = []
    for subdir, _, files in os.walk(dataset_path):
        for file in files:
            if file.lower().endswith(_AUDIO_EXTENSIONS):
                audio_paths.append(os.path.join(subdir, file))
    # os.walk order depends on the filesystem; sorting keeps the sample order
    # (and therefore any seeded split made later) reproducible.
    audio_paths.sort()
    return audio_paths


def preprocess_data(dataset_path, target_length=None):
    """
    Process every audio file under a dataset directory into features and labels.

    If ``target_length`` is not given, a first pass loads and silence-trims each
    file to find the longest trimmed signal, which becomes the common length.
    A second pass runs ``process_audio_file`` on each file; files that fail or
    whose emotion label is -1 are skipped.

    Args:
        dataset_path (str): Path to the root directory of the dataset.
        target_length (int, optional): Fixed length (in samples) for audio
            signals. If None, the maximum trimmed length in the dataset is used.

    Returns:
        tuple: ``(X, Y, target_length)``.
            X (np.ndarray or None): float32 features of shape
                (num_samples, timesteps, num_features).
            Y (np.ndarray or None): int8 labels of shape (num_samples,).
            target_length (int): Length used for padding/truncating. It is 0
                when the directory is missing or no length could be determined.
        X and Y are None whenever processing could not produce a dataset.
    """
    if not os.path.isdir(dataset_path):
        print(f"Error: Dataset directory not found at {dataset_path}", file=sys.stderr)
        return None, None, 0

    audio_paths = _list_audio_files(dataset_path)

    # --- Determine Target Length (if not provided) ---
    if target_length is None:
        print("Calculating maximum sample length...")
        max_len_calc = 0
        num_files_calc = 0
        for file_path in audio_paths:
            try:
                y, sr = librosa.load(file_path, sr=None)
                xt, _ = librosa.effects.trim(y, top_db=30)
                max_len_calc = max(max_len_calc, len(xt))
                num_files_calc += 1
            except Exception as e:
                print(f"Warning: Could not load/trim {file_path} during length calculation: {e}", file=sys.stderr)
        if max_len_calc == 0:
            print("Error: Could not determine maximum length. No valid audio files found?", file=sys.stderr)
            return None, None, 0
        target_length = max_len_calc
        print(f"Determined target length: {target_length} samples from {num_files_calc} files.")

    print(f"Processing audio files using target length: {target_length}...")
    tic = time.perf_counter()
    all_features = []
    all_labels = []
    processed_count = 0

    for file_path in audio_paths:
        features, label = process_audio_file(file_path, target_length)
        if features is None or label == -1:
            continue  # Failed or unlabeled file: skip it

        all_features.append(features)
        all_labels.append(label)
        processed_count += 1
        # Report only when the count has just reached a multiple of 100, so
        # skipped files neither repeat the message nor print "Processed 0 files...".
        if processed_count % 100 == 0:
            print(f"Processed {processed_count} files...")

    toc = time.perf_counter()
    print(f"Finished processing {processed_count} files in {(toc - tic)/60:0.4f} minutes.")

    if not all_features:
        print("Error: No features were extracted.", file=sys.stderr)
        return None, None, target_length

    # Convert lists to NumPy arrays. All per-file arrays are expected to share one
    # shape because process_audio_file pads/truncates to target_length.
    try:
        X = np.asarray(all_features).astype('float32')
    except ValueError as e:
        print(f"Error: Could not stack features. Check for inconsistent shapes: {e}", file=sys.stderr)
        return None, None, target_length

    Y = np.asarray(all_labels).astype('int8')

    print("Feature shape (X):", X.shape) # Should be (num_samples, timesteps, num_features)
    print("Label shape (Y):", Y.shape)   # Should be (num_samples,)

    return X, Y, target_length

In [10]:
# --- Load cached features, or build (and cache) them ---
target_len = None # Let preprocess_data calculate it unless the cache provides it
X, Y = None, None
needs_processing = True

if os.path.exists(PROCESSED_DATA_FILE):
    print(f"Loading preprocessed data from {PROCESSED_DATA_FILE}...")
    try:
        # The cache written below contains only numeric arrays, so pickle support
        # is left disabled. The context manager closes the archive's file handle;
        # indexing the archive reads each array fully into memory first.
        with np.load(PROCESSED_DATA_FILE, allow_pickle=False) as data:
            X = data['X']
            Y = data['Y']
            # Older caches may lack the stored length
            target_len = data['target_length'].item() if 'target_length' in data.files else None
        needs_processing = False
        print("Data loaded successfully.")
        print("Feature shape (X):", X.shape)
        print("Label shape (Y):", Y.shape)
        if target_len: print("Target Length:", target_len)
    except Exception as e:
        print(f"Error loading processed data: {e}. Reprocessing...", file=sys.stderr)
        X, Y = None, None
        needs_processing = True
else:
    print("No preprocessed data found. Starting processing...")

if needs_processing:
    X, Y, target_len = preprocess_data(DATASET_PATH, target_length=target_len)
    # Cache the result in both the "no cache" and the "unreadable cache" case so
    # the next run does not have to reprocess the dataset again.
    if X is not None and Y is not None:
        print(f"Saving processed data to {PROCESSED_DATA_FILE}...")
        os.makedirs(os.path.dirname(PROCESSED_DATA_FILE), exist_ok=True)
        np.savez(PROCESSED_DATA_FILE, X=X, Y=Y, target_length=target_len) # Save target_len too
        print("Data saved.")

if X is None or Y is None:
    print("Exiting due to data processing errors.", file=sys.stderr)
    sys.exit(1) # Exit if data processing failed

if target_len is None:
    # X.shape[1] is a number of frames, not samples. The feature extractors use
    # librosa's default centered framing, which yields 1 + n_samples // HOP_LENGTH
    # frames, so (frames - 1) * HOP_LENGTH is a sample count that reproduces the
    # same number of frames (e.g. 257 frames <-> 131072 samples at hop 512).
    target_len = (X.shape[1] - 1) * HOP_LENGTH
    print(f"Warning: Target length was not stored; inferring {target_len} samples "
          f"from {X.shape[1]} frames. This may differ from the original padded length.",
          file=sys.stderr)

Loading preprocessed data from /content/drive/My Drive/Colab Notebooks/processed_data.npz...
Data loaded successfully.
Feature shape (X): (2800, 257, 15)
Label shape (Y): (2800,)
Target Length: 131072


In [11]:
# --- Data Splitting ---
# Stratified 70% / 15% / 15% split: first carve off 30% of the data, then halve
# that remainder into validation and test sets (0.5 * 0.3 = 0.15 each).
print("Splitting data into training, validation, and test sets...")
x_train, x_temp, y_train, y_temp = train_test_split(
    X, Y,
    test_size=0.3,
    random_state=42,
    stratify=Y,
)
x_val, x_test, y_val, y_test = train_test_split(
    x_temp, y_temp,
    test_size=0.5,
    random_state=42,
    stratify=y_temp,
)

for _caption, _x_part, _y_part in (
    ("Train set shapes:", x_train, y_train),
    ("Validation set shapes:", x_val, y_val),
    ("Test set shapes:", x_test, y_test),
):
    print(_caption, _x_part.shape, _y_part.shape)

# One-hot encode the integer labels for categorical cross-entropy
y_train_class, y_val_class, y_test_class = (
    tf.keras.utils.to_categorical(_labels, NUM_CLASSES)
    for _labels in (y_train, y_val, y_test)
)

# --- Save Test Data ---
# The held-out set is persisted with its original integer labels (not one-hot).
print("Saving test data...")
os.makedirs(os.path.dirname(X_TEST_FILE), exist_ok=True)
for _target_file, _array in ((X_TEST_FILE, x_test), (Y_TEST_FILE, y_test)):
    np.save(_target_file, _array)
print("Test data saved.")

Splitting data into training, validation, and test sets...
Train set shapes: (1960, 257, 15) (1960,)
Validation set shapes: (420, 257, 15) (420,)
Test set shapes: (420, 257, 15) (420,)
Saving test data...
Test data saved.


In [12]:
# --- Model definition ---
# Two stacked bidirectional LSTM layers followed by a small dense classifier head.
print("Building the LSTM model...")
model = keras.Sequential([
    # Input shape: (timesteps, features), taken from the training data
    layers.Input(shape=x_train.shape[1:]),
    # First recurrent block returns the full sequence so the second LSTM can consume it
    layers.Bidirectional(layers.LSTM(LSTM_UNITS, return_sequences=True)),
    layers.Dropout(DROPOUT_RATE),
    # Second recurrent block keeps only its final output
    layers.Bidirectional(layers.LSTM(LSTM_UNITS, return_sequences=False)),
    layers.Dropout(DROPOUT_RATE),
    # Intermediate dense layer before the class scores
    layers.Dense(64, activation='relu'),
    layers.Dropout(DROPOUT_RATE),
    layers.Dense(NUM_CLASSES, activation='softmax')
])

# Labels are one-hot encoded, hence categorical cross-entropy / categorical accuracy
model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
              metrics=['categorical_accuracy'])

# summary() prints the table itself and returns None, so it must not be wrapped
# in print() (that emitted a stray "None" line).
model.summary()

Building the LSTM model...


None


In [None]:
# --- Callbacks ---
# NOTE(review): the patience values below (10 and 15) are not smaller than EPOCHS
# as currently configured, so the LR schedule and early stopping are unlikely to
# fire unless EPOCHS is raised.

# Save the whole model whenever validation accuracy reaches a new best
mcp_save = callbacks.ModelCheckpoint(
    MODEL_WEIGHTS_FILE, save_best_only=True,
    monitor='val_categorical_accuracy', mode='max', save_weights_only=False # Save entire model
)
# Reduce learning rate if validation accuracy plateaus
rlrop = callbacks.ReduceLROnPlateau(
    monitor='val_categorical_accuracy', factor=0.2, patience=10, min_lr=0.00001, verbose=1
)
# Stop training early if validation loss doesn't improve
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss', patience=15, verbose=1, restore_best_weights=True # Restore best weights found
)

# --- Training ---
print("Starting model training...")
history = model.fit(x_train, y_train_class,
                    epochs=EPOCHS,
                    batch_size=BATCH_SIZE,
                    validation_data=(x_val, y_val_class),
                    callbacks=[mcp_save, rlrop, early_stopping])

# --- Continue with the best checkpoint ---
# Reload the checkpoint whenever it exists, not only when early stopping fired:
# after a full-length run the in-memory model otherwise still holds the
# last-epoch weights. The reported epoch is the one with the highest validation
# accuracy, which is the metric the checkpoint callback monitors (early stopping
# tracks val_loss and may point at a different epoch).
val_acc_history = history.history.get('val_categorical_accuracy', [])
if os.path.exists(MODEL_WEIGHTS_FILE) and len(val_acc_history) > 0:
    best_epoch = int(np.argmax(val_acc_history))
    print(f"Loading best model from epoch {best_epoch + 1} "
          f"(val_categorical_accuracy={val_acc_history[best_epoch]:.4f}) saved at {MODEL_WEIGHTS_FILE}")
    model = keras.models.load_model(MODEL_WEIGHTS_FILE)

Starting model training...
Epoch 1/10
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m125s[0m 2s/step - categorical_accuracy: 0.2490 - loss: 1.8748 - val_categorical_accuracy: 0.4881 - val_loss: 1.3560 - learning_rate: 0.0010
Epoch 2/10
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m140s[0m 2s/step - categorical_accuracy: 0.4271 - loss: 1.4748 - val_categorical_accuracy: 0.4881 - val_loss: 1.2867 - learning_rate: 0.0010
Epoch 3/10
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m112s[0m 2s/step - categorical_accuracy: 0.4966 - loss: 1.2763 - val_categorical_accuracy: 0.6190 - val_loss: 1.0487 - learning_rate: 0.0010
Epoch 4/10
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m149s[0m 2s/step - categorical_accuracy: 0.6220 - loss: 1.0392 - val_categorical_accuracy: 0.6905 - val_loss: 0.8972 - learning_rate: 0.0010
Epoch 5/10
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m135s[0m 2s/step - categorical_accuracy: 0.6802 - loss: 0.8512 - val_

In [None]:
def plot_history(history):
    """Plot training and validation loss and accuracy side by side.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain the keys 'loss', 'val_loss', 'categorical_accuracy' and
            'val_categorical_accuracy' (one value per epoch).
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

    # Plot Loss
    ax1.plot(history.history['loss'], label='Loss (training data)')
    ax1.plot(history.history['val_loss'], label='Loss (validation data)')
    ax1.set_title('Loss for Train and Validation Sets')
    ax1.set_ylabel('Loss Value')
    ax1.set_xlabel('Epoch')
    ax1.legend(loc="upper right")

    # Plot Accuracy
    ax2.plot(history.history['categorical_accuracy'], label='Accuracy (training data)')
    ax2.plot(history.history['val_categorical_accuracy'], label='Accuracy (validation data)')
    ax2.set_title('Accuracy for Train and Validation Sets')
    # The plotted values are fractions in [0, 1], not percentages, so the axis
    # label must not claim "%".
    ax2.set_ylabel('Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.legend(loc="lower right")

    plt.tight_layout()
    plt.show()

print("Plotting training history...")
plot_history(history)

In [None]:
def evaluate_model(model, x_data, y_data_class, set_name="Validation"):
    """Evaluate the model on one data split and report per-class results.

    Prints loss and accuracy, shows a confusion-matrix heatmap, and prints the
    per-class accuracy (correct / total for each true class).

    Args:
        model: Compiled Keras model whose ``evaluate`` returns (loss, accuracy).
        x_data (np.ndarray): Features of shape (samples, timesteps, features).
        y_data_class (np.ndarray): One-hot labels of shape (samples, NUM_CLASSES).
        set_name (str): Name of the split, used in printed text and plot titles.
    """
    # Imported explicitly: a bare `import sklearn` does not guarantee that the
    # `sklearn.metrics` submodule has been loaded.
    from sklearn.metrics import confusion_matrix

    print(f"\n--- {set_name} Set Evaluation ---")
    loss, acc = model.evaluate(x_data, y_data_class, verbose=0)
    print(f"{set_name} Loss: {loss:.4f}")
    print(f"{set_name} Accuracy: {acc:.4f}")

    # Get predictions
    predictions = model.predict(x_data)
    y_pred_class = np.argmax(predictions, axis=1)
    y_true_class = np.argmax(y_data_class, axis=1) # Convert one-hot back to class indices

    # Confusion Matrix
    # Pass the full label set so the matrix always has one row/column per entry
    # of EMOTION_LABELS, even if a class never occurs in this split; otherwise
    # the DataFrame construction below fails on a shape mismatch.
    class_indices = list(range(len(EMOTION_LABELS)))
    cm = confusion_matrix(y_true_class, y_pred_class, labels=class_indices)
    cm_df = pd.DataFrame(cm, index=EMOTION_LABELS, columns=EMOTION_LABELS)

    plt.figure(figsize=(10, 7))
    sns.heatmap(cm_df, annot=True, fmt="d", cmap="Blues")
    plt.title(f'{set_name} Set Confusion Matrix')
    plt.ylabel('True Emotion')
    plt.xlabel('Predicted Emotion')
    plt.show()

    # Per-class accuracy: diagonal (correct) over row sum (all samples of that true class)
    values = cm.diagonal()
    row_sum = np.sum(cm, axis=1)
    # Handle division by zero for classes with no samples in the set
    with np.errstate(divide='ignore', invalid='ignore'):
        class_acc = values / row_sum
        class_acc[np.isnan(class_acc)] = 0.0 # Set NaN to 0

    print(f'\n{set_name} Set Predicted Emotions Accuracy per Class:')
    for i, emotion in enumerate(EMOTION_LABELS):
        if row_sum[i] > 0: # Only print if class exists in this set
             print(f"{emotion}: {class_acc[i]:.4f} ({values[i]}/{row_sum[i]})")
        else:
             print(f"{emotion}: N/A (0 samples)")

# Evaluate on Validation Set
evaluate_model(model, x_val, y_val_class, set_name="Validation")

# Evaluate on Test Set
evaluate_model(model, x_test, y_test_class, set_name="Test")

In [None]:
# --- Save Final Model ---
# Export the current model in two pieces: its architecture as JSON and its
# weights as an H5 file. The full-model checkpoint in Keras format is written by
# the training callback; an explicit final save is available via
# model.save(MODEL_WEIGHTS_FILE.replace('.keras', '_final.keras')) if needed.
print("\nSaving the final model...")

# Make sure the output folder exists before writing anything into it
os.makedirs(os.path.dirname(MODEL_JSON_FILE), exist_ok=True)

# Architecture (JSON)
model_json = model.to_json()
with open(MODEL_JSON_FILE, "w") as json_file:
    json_file.write(model_json)

# Weights (H5)
model.save_weights(MODEL_H5_FILE)

print(
    f"Saved model architecture to {MODEL_JSON_FILE}",
    f"Saved model weights to {MODEL_H5_FILE}",
    f"Best model also saved/loaded from {MODEL_WEIGHTS_FILE} (Keras format)",
    sep="\n",
)


# --- Prediction Function ---
def predict_emotion_from_file(audio_file_path, model_to_use, target_len_predict, sr=None):
    """Predict the emotion of a single audio file with a trained model.

    The file goes through ``process_audio_file`` with the given target length,
    gets a batch dimension, and is passed to ``model_to_use.predict``.

    Args:
        audio_file_path (str): Path to the audio file.
        model_to_use: Model exposing ``predict`` that returns one score per
            entry of EMOTION_LABELS.
        target_len_predict (int): Sample length to pad/truncate to; must give
            the same number of frames the model was trained on.
        sr (int, optional): Sample rate forwarded to ``process_audio_file``.
            None keeps the file's native rate. Pass the training data's sample
            rate when the file was recorded at a different one, so that frames
            cover the same duration as during training.

    Returns:
        tuple: ``(predicted_emotion, confidence_scores)`` - the label string and
        the per-class score array - or ``(None, None)`` if the file is missing
        or feature extraction failed.
    """
    print(f"\nPredicting emotion for: {audio_file_path}")
    if not os.path.exists(audio_file_path):
        print("Error: Audio file not found.", file=sys.stderr)
        return None, None

    # Process the audio file using the same parameters as training.
    # The label derived from the file name is irrelevant here and discarded.
    features, _ = process_audio_file(audio_file_path, target_length=target_len_predict, sr=sr)

    if features is None:
        print("Error: Could not extract features from the audio file.", file=sys.stderr)
        return None, None

    # Add batch dimension: (timesteps, features) -> (1, timesteps, features)
    features = np.expand_dims(features, axis=0)

    # Predict
    predictions = model_to_use.predict(features)
    predicted_index = np.argmax(predictions, axis=1)[0]
    predicted_emotion = EMOTION_LABELS[predicted_index]
    confidence_scores = predictions[0]

    print(f"Predicted Emotion: {predicted_emotion}")
    print("Confidence Scores:")
    for emotion, score in zip(EMOTION_LABELS, confidence_scores):
        print(f"  {emotion}: {score:.4f}")

    return predicted_emotion, confidence_scores


Saving the final model...
Saved model architecture to /content/drive/My Drive/Colab Notebooks/model_lstm_mod.json
Saved model weights to /content/drive/My Drive/Colab Notebooks/model_lstm_mod.weights.h5
Best model also saved/loaded from /content/drive/My Drive/Colab Notebooks/best_weights_lstm_mod.keras (Keras format)


In [None]:
# --- Example Usage for Prediction ---
# Point this at any audio file you want to classify.
EXAMPLE_AUDIO_FILE = '/content/drive/MyDrive/special project/archive/Audio_Speech_Actors_01-24/Actor_01/03-01-01-01-01-01-01.wav' # ADJUST THIS PATH

# Prediction needs the padding length that was used for the training data;
# without it the features cannot be shaped like the model's input.
if target_len is not None:
    predict_emotion_from_file(EXAMPLE_AUDIO_FILE, model, target_len)
else:
    # Load the length from the saved data file when running prediction on its own
    print("Warning: Target length for prediction is unknown. Results may be inaccurate.", file=sys.stderr)

print("\nScript finished.")


Predicting emotion for: /content/drive/MyDrive/special project/archive/Audio_Speech_Actors_01-24/Actor_01/03-01-01-01-01-01-01.wav
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 171ms/step
Predicted Emotion: neutral
Confidence Scores:
  neutral: 0.8087
  happy: 0.0002
  sad: 0.1831
  angry: 0.0004
  fearful: 0.0021
  disgust: 0.0006
  surprised: 0.0048

Script finished.
