#  <center> Speech Emotion Recognition </center>

## Importing Libraries

In [None]:
from transformers import AutoFeatureExtractor, ASTForAudioClassification, TrainingArguments, Trainer
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from datasets import Dataset
import pandas as pd
import torch
import librosa
import os
import numpy as np
import random
from tqdm import tqdm
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Audio
from tqdm import tqdm

### Load and Preprocess the Dataset

In [None]:
# Paths for datasets: root directory of each of the four speech-emotion
# corpora under /kaggle/input. Each one is handed to its process_* function.
Ravdess_path = "/kaggle/input/ravdess-emotional-speech-audio/audio_speech_actors_01-24/"
Crema_path = "/kaggle/input/cremad/AudioWAV/"
Tess_path = "/kaggle/input/toronto-emotional-speech-set-tess/tess toronto emotional speech set data/TESS Toronto emotional speech set data/"
Savee_path = "/kaggle/input/surrey-audiovisual-expressed-emotion-savee/ALL/"

In [None]:
# Function to process RAVDESS dataset
def process_ravdess(Ravdess_path):
    """Build a DataFrame of emotion labels and file paths for RAVDESS.

    Every sub-directory of ``Ravdess_path`` is scanned. The third
    dash-separated field of each file name (e.g. ``03-01-05-...`` -> 5) is
    the numeric emotion code, which is translated to an emotion name.

    Args:
        Ravdess_path: Directory that contains one sub-directory per actor.

    Returns:
        DataFrame with columns ``Emotions`` and ``Path``, one row per clip.
        Codes outside 1-8 are kept as their integer value.
    """
    emotion_names = {
        1: 'neutral', 2: 'calm', 3: 'happy', 4: 'sad',
        5: 'angry', 6: 'fear', 7: 'disgust', 8: 'surprise',
    }
    file_emotion = []
    file_path = []

    for actor_name in os.listdir(Ravdess_path):
        actor_dir = os.path.join(Ravdess_path, actor_name)
        if not os.path.isdir(actor_dir):
            continue
        for file in os.listdir(actor_dir):
            fields = file.split('.')[0].split('-')
            try:
                code = int(fields[2])
            except (IndexError, ValueError):
                # Not named like a RAVDESS clip (e.g. a stray text file):
                # skip it instead of aborting the whole scan.
                continue
            # Map while collecting. The previous chained
            # `df.Emotions.replace(..., inplace=True)` acts on a temporary
            # Series and leaves the frame unchanged under pandas
            # copy-on-write.
            file_emotion.append(emotion_names.get(code, code))
            file_path.append(os.path.join(actor_dir, file))

    return pd.DataFrame({'Emotions': file_emotion, 'Path': file_path})

In [None]:
# Function to process CREMA dataset
def process_crema(Crema_path):
    """Build a DataFrame of emotion labels and file paths for CREMA-D.

    The third underscore-separated field of each file name is the emotion
    code (``SAD``, ``ANG``, ``DIS``, ``FEA``, ``HAP``, ``NEU``). Any other
    code is labelled ``'Unknown'``.

    Args:
        Crema_path: Directory holding the audio files directly.

    Returns:
        DataFrame with columns ``Emotions`` and ``Path``.
    """
    code_to_emotion = {
        'SAD': 'sad',
        'ANG': 'angry',
        'DIS': 'disgust',
        'FEA': 'fear',
        'HAP': 'happy',
        'NEU': 'neutral',
    }
    labels = []
    paths = []

    for name in os.listdir(Crema_path):
        paths.append(os.path.join(Crema_path, name))
        code = name.split('_')[2]
        labels.append(code_to_emotion.get(code, 'Unknown'))

    return pd.DataFrame({'Emotions': labels, 'Path': paths})

In [None]:
# Function to process TESS dataset
def process_tess(Tess_path):
    """Build a DataFrame of emotion labels and file paths for TESS.

    Each sub-directory of ``Tess_path`` is scanned. The label is the third
    underscore-separated field of the file stem, with ``ps`` spelled out as
    ``'surprise'``.

    Args:
        Tess_path: Directory that contains the per-category sub-directories.

    Returns:
        DataFrame with columns ``Emotions`` and ``Path``.
    """
    labels, paths = [], []

    for folder in os.listdir(Tess_path):
        folder_path = os.path.join(Tess_path, folder)
        if not os.path.isdir(folder_path):
            continue
        for name in os.listdir(folder_path):
            paths.append(os.path.join(folder_path, name))
            token = name.split('.')[0].split('_')[2]
            labels.append('surprise' if token == 'ps' else token)

    return pd.DataFrame({'Emotions': labels, 'Path': paths})


In [None]:
# Function to process SAVEE dataset
def process_savee(Savee_path):
    """Build a DataFrame of emotion labels and file paths for SAVEE.

    The part of the file name after the first underscore, minus its last
    six characters, is the emotion prefix (``a``, ``d``, ``f``, ``h``,
    ``n``, ``sa``). Every other prefix is labelled ``'surprise'``.

    Args:
        Savee_path: Directory holding the audio files directly.

    Returns:
        DataFrame with columns ``Emotions`` and ``Path``.
    """
    prefix_to_emotion = {
        'a': 'angry',
        'd': 'disgust',
        'f': 'fear',
        'h': 'happy',
        'n': 'neutral',
        'sa': 'sad',
    }
    labels = []
    paths = []

    for name in os.listdir(Savee_path):
        paths.append(os.path.join(Savee_path, name))
        prefix = name.split('_')[1][:-6]
        labels.append(prefix_to_emotion.get(prefix, 'surprise'))

    return pd.DataFrame({'Emotions': labels, 'Path': paths})

In [None]:
# Process each dataset into a DataFrame with 'Emotions' and 'Path' columns
Ravdess_df = process_ravdess(Ravdess_path)
Crema_df = process_crema(Crema_path)
Tess_df = process_tess(Tess_path)
Savee_df = process_savee(Savee_path)

# Combine all dataframes. ignore_index=True gives the combined frame a single
# 0..N-1 index; without it each corpus keeps its own 0-based index, so the
# same index label would refer to rows from several corpora.
df = pd.concat([Ravdess_df, Crema_df, Tess_df, Savee_df], axis=0, ignore_index=True)

# Encode emotion labels as integer class ids (order = label_encoder.classes_)
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(df['Emotions'])

# Split dataset into train and test sets with stratification so both splits
# keep the same emotion proportions; random_state fixes the shuffle.
train_paths, test_paths, train_labels, test_labels = train_test_split(
    df['Path'].tolist(), encoded_labels, test_size=0.2, random_state=42, stratify=encoded_labels
)

print(f"Number of training samples: {len(train_paths)}")
print(f"Number of testing samples: {len(test_paths)}")


In [None]:
# Show the combined DataFrame as this cell's output
df

In [None]:
# pandas is already imported at the top of the notebook; this re-import is
# harmless and only makes the cell runnable on its own.
import pandas as pd

# Find unique values in Emotions, i.e. the distinct labels present across the
# combined datasets
unique_values_column1 = df['Emotions'].unique()
print(unique_values_column1) 

In [None]:
# Function to load, resample, and preprocess audio
def load_and_preprocess_audio(audio_path, target_sampling_rate):
    """Load an audio file and return its samples at the requested rate.

    The file is read at its native sampling rate and resampled only when
    that rate differs from ``target_sampling_rate``.

    Args:
        audio_path: Path of the audio file to read.
        target_sampling_rate: Sampling rate the returned signal should have.

    Returns:
        The audio samples, or ``None`` when loading or resampling raised.
        In that case the error is printed rather than propagated.
    """
    try:
        waveform, native_rate = librosa.load(audio_path, sr=None)
        needs_resampling = native_rate != target_sampling_rate
        if needs_resampling:
            waveform = librosa.resample(
                waveform, orig_sr=native_rate, target_sr=target_sampling_rate
            )
    except Exception as e:
        print(f"Error loading audio file {audio_path}: {e}")
        return None
    else:
        return waveform

## Data Visualisation and Exploration

First, let's plot the count of each emotion in our dataset.

In [None]:
# Bar chart of how many clips carry each emotion label in the combined
# DataFrame `df`; bars follow the order returned by value_counts().
plt.figure(figsize=(16, 6))
plt.title('Count of Emotions', size=20)
sns.countplot(data=df, x='Emotions', order=df['Emotions'].value_counts().index)
plt.ylabel('Count', size=14)
plt.xlabel('Emotions', size=14)
# Drop the top and right frame lines, keep the left and bottom axes
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

We can also plot waveplots and spectrograms for audio signals.

* Waveplots - Waveplots let us know the loudness of the audio at a given time.
* Spectrograms - A spectrogram is a visual representation of the spectrum of frequencies of a sound or other signal as it varies with time. It shows how the frequency content of a given audio/music signal changes over time.

In [None]:
# Now, for visualization, you can use the following code:

def create_waveplot(data, sr, e):
    """Plot the amplitude of an audio signal against time.

    Args:
        data: Sequence of audio samples.
        sr: Sampling rate of ``data``, used to convert sample count to seconds.
        e: Emotion name shown in the plot title.
    """
    plt.figure(figsize=(10, 3))
    plt.title(f'Waveplot for audio with {e} emotion', size=15)
    n_samples = len(data)
    # Evenly spaced time stamps from 0 to the clip duration, one per sample
    time_axis = np.linspace(0, n_samples / sr, num=n_samples)
    plt.plot(time_axis, data)
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.show()


def create_spectrogram(data, sr, e):
    """Plot a dB-scaled STFT spectrogram of an audio signal.

    Args:
        data: Audio samples.
        sr: Sampling rate of ``data``, used to scale the plot axes.
        e: Emotion name shown in the plot title.
    """
    # `librosa.display` is a submodule; import it explicitly so the plot does
    # not depend on the installed librosa exposing it on a bare `import librosa`.
    import librosa.display

    # stft function converts the data into short term fourier transform
    X = librosa.stft(data)
    # The STFT is complex-valued; take magnitudes before converting to dB
    Xdb = librosa.amplitude_to_db(np.abs(X))
    plt.figure(figsize=(12, 3))
    plt.title('Spectrogram for audio with {} emotion'.format(e), size=15)
    # Linear frequency axis; pass y_axis='log' for a log-frequency view
    librosa.display.specshow(Xdb, sr=sr, x_axis='time', y_axis='hz')
    plt.colorbar()
    # Render here, as create_waveplot does, so each call shows its own figure
    plt.show()

In [None]:
# Visualise one 'fear' clip: [1] takes the second path with this label, then
# its waveplot, spectrogram and an audio player are shown.
emotion='fear'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'angry' clip: [1] takes the second path with this label, then
# its waveplot, spectrogram and an audio player are shown.
emotion='angry'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'sad' clip: [1] takes the second path with this label, then
# its waveplot, spectrogram and an audio player are shown.
emotion='sad'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'happy' clip: [1] takes the second path with this label, then
# its waveplot, spectrogram and an audio player are shown.
emotion='happy'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'disgust' clip: [1] takes the second path with this label,
# then its waveplot, spectrogram and an audio player are shown.
emotion='disgust'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'neutral' clip: [1] takes the second path with this label,
# then its waveplot, spectrogram and an audio player are shown.
emotion='neutral'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'surprise' clip: [1] takes the second path with this label,
# then its waveplot, spectrogram and an audio player are shown.
emotion='surprise'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

In [None]:
# Visualise one 'calm' clip: [1] takes the second path with this label, then
# its waveplot, spectrogram and an audio player are shown.
emotion='calm'
path = np.array(df.Path[df.Emotions == emotion])[1]
data, sampling_rate = librosa.load(path)
create_waveplot(data, sampling_rate, emotion)
create_spectrogram(data, sampling_rate, emotion)
Audio(path)

## Load Model

In [None]:
# Load the feature extractor and the pretrained AST checkpoint.
#
# The checkpoint ships with a classification head for its own AudioSet label
# set, not for the emotion classes used in this notebook. Passing num_labels
# with ignore_mismatched_sizes=True makes from_pretrained keep the pretrained
# encoder but create a fresh head with one logit per emotion, so predicted
# class ids always fall inside label_encoder.classes_. The id2label/label2id
# mappings are stored in the config at the same time.
checkpoint = "MIT/ast-finetuned-audioset-10-10-0.4593"
emotion_classes = [str(label) for label in label_encoder.classes_]

feature_extractor = AutoFeatureExtractor.from_pretrained(checkpoint)
model = ASTForAudioClassification.from_pretrained(
    checkpoint,
    num_labels=len(emotion_classes),
    id2label=dict(enumerate(emotion_classes)),
    label2id={label: i for i, label in enumerate(emotion_classes)},
    ignore_mismatched_sizes=True,
)

In [None]:
# Check if a GPU is available and move the model to GPU if available;
# otherwise the model stays on the CPU. `device` is reused later to move the
# model inputs to the same place as the model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


### Process

In [None]:
from tqdm import tqdm
import torch
from datasets import Dataset

def preprocess_data(file_paths, labels, feature_extractor, sampling_rate):
    """Turn audio files into feature-extractor outputs that carry their label.

    Args:
        file_paths: Audio file paths to process.
        labels: Labels indexed in step with ``file_paths``.
        feature_extractor: Callable applied to each loaded waveform.
        sampling_rate: Rate the audio is resampled to and reported to the
            feature extractor.

    Returns:
        A list with one extractor output per file that loaded successfully.
        Each has a ``"labels"`` entry holding a one-element tensor. Files
        that failed to load are skipped.
    """
    processed = []
    progress = tqdm(file_paths, desc="Processing audio files")
    for index, audio_file in enumerate(progress):
        waveform = load_and_preprocess_audio(audio_file, sampling_rate)
        if waveform is None:
            continue
        encoded = feature_extractor(waveform, sampling_rate=sampling_rate, return_tensors="pt")
        encoded["labels"] = torch.tensor([labels[index]])
        processed.append(encoded)
    return processed

# Get the sampling rate from the feature extractor
sampling_rate = feature_extractor.sampling_rate
# Preprocess the train and test datasets. Each result is a list with one
# feature-extractor output per clip that loaded successfully, with the clip's
# label stored under "labels". The label arrays are turned into plain lists
# before preprocess_data indexes them per clip.
train_dataset = preprocess_data(train_paths, train_labels.tolist(), feature_extractor, sampling_rate)
test_dataset = preprocess_data(test_paths, test_labels.tolist(), feature_extractor, sampling_rate)

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch

class CustomDataset(Dataset):
    """Map-style dataset over a list of preprocessed feature dictionaries.

    Each stored item must provide ``"input_values"`` and ``"labels"``
    tensors. Indexing returns both with their leading dimension squeezed.
    """

    def __init__(self, features):
        self.features = features

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        item = self.features[idx]
        # squeeze(0) only drops the first dimension when its size is 1
        return {key: item[key].squeeze(0) for key in ("input_values", "labels")}

# Convert the lists of dictionaries to the CustomDataset format, keeping only
# the "input_values" and "labels" entries of every preprocessed item
train_features = [{"input_values": x["input_values"], "labels": x["labels"]} for x in train_dataset]
test_features = [{"input_values": x["input_values"], "labels": x["labels"]} for x in test_dataset]

# Wrap features into Dataset and use DataLoader.
# NOTE: train_dataset / test_dataset are rebound here from plain lists to
# CustomDataset instances.
train_dataset = CustomDataset(train_features)
test_dataset = CustomDataset(test_features)

# Both loaders yield one sample per batch; only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)


In [None]:
# Function to predict emotion from audio file
def predict_emotion(audio_path):
    """Predict the emotion label of a single audio file.

    Uses the notebook-level ``sampling_rate``, ``feature_extractor``,
    ``model`` and ``device``.

    Args:
        audio_path: Path of the audio file to classify.

    Returns:
        The ``model.config.id2label`` entry of the highest-scoring class, or
        ``None`` when the audio could not be loaded.
    """
    waveform = load_and_preprocess_audio(audio_path, sampling_rate)
    if waveform is None:
        return None

    encoded = feature_extractor(waveform, sampling_rate=sampling_rate, return_tensors="pt")
    # The inputs must live on the same device as the model
    model_inputs = {}
    for name, tensor in encoded.items():
        model_inputs[name] = tensor.to(device)

    with torch.no_grad():
        output = model(**model_inputs)

    best_class = torch.argmax(output.logits, dim=-1).item()
    return model.config.id2label[best_class]

In [None]:
# Update model config with custom labels: class id -> emotion name and the
# reverse mapping, both in the order of label_encoder.classes_.
# NOTE(review): this cell only rewrites the two config mappings; nothing here
# changes the size of the model's output layer - confirm that the number of
# logits matches the number of emotion classes.
model.config.id2label = {i: label for i, label in enumerate(label_encoder.classes_)}
model.config.label2id = {label: i for i, label in enumerate(label_encoder.classes_)}

# Get the sampling rate from the feature extractor or define a default value
# (16000) when the extractor has no sampling_rate attribute
try:
    sampling_rate = feature_extractor.sampling_rate
except AttributeError:
    sampling_rate = 16000

## Fine Tuning and Evaluation

In [None]:
# Training arguments: evaluate and save a checkpoint once per epoch, train
# for 3 epochs with batches of 8 per device; outputs go to ./results and logs
# to ./logs.
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    save_strategy="epoch",
    logging_dir="./logs",
)

# Define the trainer. The test split doubles as the evaluation set, and no
# compute_metrics callback is passed.
# NOTE(review): evaluation presumably reports loss only - confirm.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

In [None]:
# Wandb login: read the API key stored under the Kaggle secret name
# "wandb_api_key" and authenticate with it, so the key itself never appears
# in the notebook.
from kaggle_secrets import UserSecretsClient
import wandb
user_secrets = UserSecretsClient()
secret_value = user_secrets.get_secret("wandb_api_key")
wandb.login(key=secret_value)

In [None]:
# Release cached CUDA memory before measuring
torch.cuda.empty_cache()


# Monitor GPU memory usage
def print_gpu_utilization():
    """Print the memory allocated and reserved on CUDA device 0.

    Both figures are converted from bytes to GB (1024**3 bytes) and rounded
    to one decimal place.
    """
    bytes_per_gb = 1024**3
    allocated_gb = round(torch.cuda.memory_allocated(0) / bytes_per_gb, 1)
    print(f"Allocated: {allocated_gb} GB")
    reserved_gb = round(torch.cuda.memory_reserved(0) / bytes_per_gb, 1)
    print(f"Cached: {reserved_gb} GB")


# Baseline reading taken before training starts
print_gpu_utilization()

In [None]:
# Train the model with the Trainer (model, arguments and datasets were set
# when `trainer` was created)
trainer.train()

In [None]:
# Evaluate on the Trainer's eval_dataset; the returned metrics are shown as
# the cell output
trainer.evaluate()

In [None]:
# Save the model; no path is given, so the Trainer's own output directory
# setting applies
trainer.save_model()

## Inference

In [None]:
# Set the model to evaluation mode before running inference with it
model.eval()

# Function to perform inference
def infer(model, dataloader, device):
    """Run the model over a dataloader and collect predictions and labels.

    Args:
        model: Model whose output exposes ``.logits``. It is moved to
            ``device`` and switched to evaluation mode.
        dataloader: Iterable of batches, each a dict with ``"input_values"``
            and ``"labels"`` tensors.
        device: Device on which the forward passes run.

    Returns:
        Tuple ``(all_predictions, all_labels)``: two flat lists with one
        predicted class id and one true label per sample, in dataloader order.
    """
    model.to(device)  # Move model to the appropriate device
    # Do not rely on the caller for this: training-only behaviour such as
    # dropout must be off for predictions to be deterministic.
    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():  # No need to compute gradients
        for batch in tqdm(dataloader, desc="Inference"):
            input_values = batch["input_values"].to(device)

            # Forward pass; the class with the largest logit is the prediction
            logits = model(input_values).logits
            predictions = torch.argmax(logits, dim=-1)

            all_predictions.extend(predictions.cpu().numpy())
            # Labels are only collected, never fed to the model, so there is
            # no need to copy them to the device first.
            all_labels.extend(batch["labels"].cpu().numpy())

    return all_predictions, all_labels

# Set the device for inference
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Perform inference on the test dataset
predictions, true_labels = infer(model, test_loader, device)

# Print the raw predicted class ids and the true class ids
print("Predicted Labels (raw):", predictions)
print("True Labels:", true_labels)

# Check for out-of-range predictions
unique_predictions = np.unique(predictions)
print("Unique Predictions:", unique_predictions)

# Filter out invalid predictions.
# Predictions are integer class ids while label_encoder.classes_ holds the
# emotion names, so a membership test against classes_ never matches. An id
# is valid when it is a position inside classes_.
num_classes = len(label_encoder.classes_)
valid_predictions = [pred for pred in predictions if 0 <= pred < num_classes]
invalid_predictions = [pred for pred in predictions if not 0 <= pred < num_classes]

print("Valid Predictions:", valid_predictions)
print("Invalid Predictions:", invalid_predictions)

# Map predicted labels back to emotion names using label_encoder for valid predictions
if valid_predictions:
    predicted_emotions = label_encoder.inverse_transform(valid_predictions)
    print("Predicted Emotions (valid):", predicted_emotions)
else:
    print("No valid predictions to map.")

# Optionally handle invalid predictions
# This can be done by mapping them to a default class or logging them for analysis
if invalid_predictions:
    print("Handling invalid predictions...")

# Print example valid predictions
print("Predicted Labels (valid):", valid_predictions)


## Prediction

In [None]:
# Example usage: pick one random file from the held-out test paths and print
# the label predicted for it (None if the file could not be loaded)
random_file = random.choice(test_paths)
print(f"Selected file: {random_file}")

predicted_label = predict_emotion(random_file)
print(f"Predicted Label: {predicted_label}")

In [None]:
import IPython.display as ipd
# `librosa.display` is a submodule; import it explicitly so waveshow/specshow
# do not depend on the installed librosa exposing it on a bare `import librosa`.
import librosa.display

# Load the audio file for visualization and playback
audio, sr = librosa.load(random_file, sr=None)  # Use None to preserve original sampling rate

# Plot the waveform
plt.figure(figsize=(12, 4))
librosa.display.waveshow(audio, sr=sr)
plt.title('Waveform')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.show()

# Plot the mel spectrogram, converted from power to dB relative to its peak
plt.figure(figsize=(12, 4))
spec = librosa.feature.melspectrogram(y=audio, sr=sr)
spec_db = librosa.power_to_db(spec, ref=np.max)
librosa.display.specshow(spec_db, sr=sr, x_axis='time', y_axis='mel')
plt.colorbar(format='%+2.0f dB')
plt.title('Mel Spectrogram')
plt.xlabel('Time (s)')
plt.ylabel('Frequency (Hz)')
plt.show()

# Play the audio
print("Audio Playback:")
ipd.display(ipd.Audio(audio, rate=sr))

## Benchmarks

In [None]:
import numpy as np
from sklearn.metrics import classification_report

# Assuming 'true_labels' and 'predictions' are already defined, and that
# 'label_encoder' is the encoder that produced the class ids.

# Report per emotion using the existing encoder. It is deliberately NOT
# refitted here: refitting on the numeric ids would overwrite the global
# encoder, replace the emotion names with numbers, and break any later
# inverse_transform call.
num_classes = len(label_encoder.classes_)
observed_ids = np.unique(np.concatenate([true_labels, predictions])).astype(int)
# Ids the encoder does not know still get their own row/column so that no
# prediction is silently dropped from the report.
unmapped_ids = [int(i) for i in observed_ids if not 0 <= i < num_classes]
label_ids = list(range(num_classes)) + unmapped_ids
class_names = [str(name) for name in label_encoder.classes_] + [
    f"unmapped_{i}" for i in unmapped_ids
]

# Compute the classification report; zero_division=0 reports 0 instead of
# warning for classes that were never predicted.
report = classification_report(
    true_labels,
    predictions,
    labels=label_ids,
    target_names=class_names,
    zero_division=0,
)
print("Classification Report:")
print(report)


# Compute and plot confusion matrix with the same label order and names
conf_matrix = confusion_matrix(true_labels, predictions, labels=label_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()
