In [1]:
# Install necessary libraries
!pip install transformers datasets torchaudio librosa scikit-learn pandas



In [2]:
import os
import pandas as pd
import numpy as np
import torch
import librosa
from transformers import Wav2Vec2FeatureExtractor, AutoModelForAudioClassification, Trainer, TrainingArguments
from datasets import Dataset, DatasetDict, Features, ClassLabel, Audio
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, confusion_matrix
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from zipfile import ZipFile
import shutil



In [3]:
class RAVDESSDatasetLoader:
    """
    Handles downloading, extracting, loading, and preprocessing the RAVDESS dataset (emotion labels only).
    """
    def __init__(self, dataset_zip_name='Audio_Speech_Actors_01-24.zip', dataset_folder='RAVDESS'):
        self.dataset_zip_name = dataset_zip_name
        self.dataset_folder = dataset_folder
        self.extract_path = f'./{self.dataset_folder}'
        self.emotion_mapping = {
            '01': 'neutral',
            '02': 'calm',
            '03': 'happy',
            '04': 'sad',
            '05': 'angry',
            '06': 'fearful',
            '07': 'disgust',
            '08': 'surprised'
        }

    def authenticate_and_create_drive(self):
        """
        Authenticates the user and creates a PyDrive GoogleDrive instance.
        """
        auth.authenticate_user()
        gauth = GoogleAuth()
        gauth.credentials = GoogleCredentials.get_application_default()
        drive = GoogleDrive(gauth)
        return drive

    def download_dataset(self, drive, file_id):
        """
        Downloads the RAVDESS dataset zip file from Google Drive using its file ID.
        """
        print(f"Downloading dataset from Google Drive with file ID: {file_id}")
        downloaded = drive.CreateFile({'id': file_id})
        downloaded.GetContentFile(self.dataset_zip_name)
        print(f"Downloaded dataset as {self.dataset_zip_name}")

    def extract_dataset(self):
        """
        Extracts the downloaded zip file to the specified directory.
        """
        if not os.path.exists(self.extract_path):
            print("Extracting RAVDESS dataset...")
            with ZipFile(self.dataset_zip_name, 'r') as zip_ref:
                zip_ref.extractall(self.extract_path)
            print("Extraction completed.")
        else:
            print("RAVDESS dataset already extracted.")

    def load_data(self):
        """
        Loads the RAVDESS dataset, parses filenames to extract emotion labels.
        """
        data = []
        for root, dirs, files in os.walk(self.extract_path):
            for file in files:
                if file.endswith('.wav'):
                    filepath = os.path.join(root, file)
                    filename = os.path.basename(file)
                    parts = filename.split('.')[0].split('-')
                    if len(parts) != 7:
                        continue  # Skip files that don't match the naming convention
                    emotion_code = parts[2]

                    emotion = self.emotion_mapping.get(emotion_code, 'unknown')

                    if emotion == 'unknown':
                        continue

                    label = f"{emotion}"  # Only emotion as the label
                    data.append({'file_path': filepath, 'label': label})

        df = pd.DataFrame(data)
        print(f"Total samples after filtering: {len(df)}")
        return df

    def prepare_datasets(self, df):
        """
        Splits the DataFrame into training and testing sets and converts them into Hugging Face Datasets.
        """
        train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
        print(f"Training samples: {len(train_df)}")
        print(f"Testing samples: {len(test_df)}")

        labels = sorted(df['label'].unique())
        num_labels = len(labels)
        label2id = {label: idx for idx, label in enumerate(labels)}
        id2label = {idx: label for label, idx in label2id.items()}

        features = Features({
            'file_path': Audio(sampling_rate=16000),
            'label': ClassLabel(names=labels)
        })

        train_dataset = Dataset.from_pandas(train_df).remove_columns('__index_level_0__').cast(features)
        test_dataset = Dataset.from_pandas(test_df).remove_columns('__index_level_0__').cast(features)

        dataset = DatasetDict({
            'train': train_dataset,
            'test': test_dataset
        })

        return dataset, label2id, id2label

    def run(self, dataset_file_id):
        """
        Executes the dataset loading and preparation steps.
        """
        drive = self.authenticate_and_create_drive()
        self.download_dataset(drive, dataset_file_id)
        self.extract_dataset()
        df = self.load_data()
        dataset, label2id, id2label = self.prepare_datasets(df)
        return dataset, label2id, id2label


class EmotionIntensityModel:
    """
    Manages the loading, fine-tuning, and evaluation of the DistilHuBERT model for emotion classification.
    """
    def __init__(self, model_name, num_labels, label2id, id2label):
        self.model_name = model_name
        self.num_labels = num_labels
        self.label2id = label2id
        self.id2label = id2label
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
        self.model = self.initialize_model()

    def initialize_model(self):
        """
        Loads the pre-trained DistilHuBERT model and configures it for classification.
        """
        model = AutoModelForAudioClassification.from_pretrained(
            self.model_name,
            num_labels=self.num_labels,
            label2id=self.label2id,
            id2label=self.id2label
        )
        return model

    def preprocess_function(self, examples):
        """
        Preprocesses the dataset by using the audio features provided in the dataset.
        """
        audio = examples['file_path']
        inputs = self.feature_extractor(
            audio['array'],
            sampling_rate=audio['sampling_rate'],
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=61395
        )
        examples["input_values"] = inputs["input_values"][0].numpy()
        return examples

    def prepare_data(self, dataset):
        """
        Applies preprocessing to the dataset.
        """
        dataset = dataset.map(self.preprocess_function, remove_columns=['file_path'])
        return dataset

    def compute_metrics(self, pred):
        """
        Computes evaluation metrics.
        """
        labels = pred.label_ids
        preds = pred.predictions.argmax(-1)
        accuracy = accuracy_score(labels, preds)
        precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted', zero_division=0)
        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
        }

    def fine_tune(self, dataset, training_args):
        """
        Fine-tunes the model using the Trainer API.
        """
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=dataset["train"],
            eval_dataset=dataset["test"],
            tokenizer=None,
            compute_metrics=self.compute_metrics,
        )

        trainer.train()
        return trainer

    def evaluate(self, trainer, dataset):
        """
        Evaluates the fine-tuned model on the test set.
        """
        eval_results = trainer.evaluate()
        print(f"Evaluation Results: {eval_results}")
        return eval_results

    def generate_reports(self, trainer, dataset, id2label):
        """
        Generates classification reports and confusion matrices.
        """
        predictions = trainer.predict(dataset["test"])
        pred_ids = np.argmax(predictions.predictions, axis=1)
        true_ids = predictions.label_ids

        print("\nClassification Report:")
        print(classification_report(true_ids, pred_ids, target_names=list(id2label.values()), zero_division=0))

        cm = confusion_matrix(true_ids, pred_ids)
        print("Confusion Matrix:")
        print(cm)

    def save_model(self, path="./fine_tuned_model"):
        """
        Saves the fine-tuned model and feature extractor.
        """
        self.model.save_pretrained(path)
        self.feature_extractor.save_pretrained(path)
        print(f"Model saved to {path}.")

    def load_model(self, path="./fine_tuned_model"):
        """
        Loads a previously fine-tuned model and feature extractor.
        """
        self.model = AutoModelForAudioClassification.from_pretrained(path)
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(path)
        print(f"Model loaded from {path}.")

    def predict_all_labels(self, audio_file_path):
        """
        Predict all possible emotion labels with their respective confidence scores.
        """
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        speech_array, sampling_rate = librosa.load(audio_file_path, sr=16000)
        inputs = self.feature_extractor(
            speech_array, return_tensors="pt", sampling_rate=sampling_rate, padding=True
        )

        inputs = {key: value.to(device) for key, value in inputs.items()}
        self.model.to(device)

        with torch.no_grad():
            logits = self.model(**inputs).logits
            probabilities = torch.softmax(logits, dim=-1).squeeze().cpu().numpy()

        id2label = self.model.config.id2label

        results = []
        for label_id, confidence in enumerate(probabilities):
            emotion = id2label[label_id]
            results.append({
                'audio_file': os.path.basename(audio_file_path),
                'emotion': emotion.capitalize(),
                'confidence': confidence
            })

        df = pd.DataFrame(results)
        df = df.sort_values(by='confidence', ascending=False).reset_index(drop=True)

        return df

class EmotionPipeline:
    """
    Orchestrates the workflow of downloading data, fine-tuning the model, and evaluating its performance.
    """
    def __init__(self, dataset_zip_file_id, model_path="./fine_tuned_model", audio_train_switch=False):
        self.loader = RAVDESSDatasetLoader()
        self.model = None
        self.dataset_zip_file_id = dataset_zip_file_id
        self.audio_model_path = model_path  # Path where the model will be saved/loaded
        self.audio_train_switch = audio_train_switch  # Add a training switch to control model retraining

    def run(self):
        """
        Executes the entire pipeline: checks if the model exists, loads or fine-tunes it, and evaluates performance.
        """
        dataset, label2id, id2label = self.loader.run(self.dataset_zip_file_id)

        model_name = "ntu-spml/distilhubert"

        # Check if the model is already saved locally or if training is forced by the switch
        if os.path.exists(self.audio_model_path) and not self.audio_train_switch:
            print(f"Model found at {self.audio_model_path}. Loading the model...")
            self.model = EmotionIntensityModel(model_name=model_name, num_labels=len(label2id), label2id=label2id, id2label=id2label)
            self.model.load_model(self.audio_model_path)
        else:
            if self.audio_train_switch:
                print("Audio training switch is ON. Re-training the model...")
            else:
                print("No saved model found. Fine-tuning the model...")

            self.model = EmotionIntensityModel(model_name=model_name, num_labels=len(label2id), label2id=label2id, id2label=id2label)

            dataset = self.model.prepare_data(dataset)

            training_args = TrainingArguments(
                output_dir="./results",
                evaluation_strategy="epoch",
                save_strategy="epoch",
                learning_rate=5e-5,
                per_device_train_batch_size=8,
                per_device_eval_batch_size=8,
                num_train_epochs=10,
                weight_decay=0.01,
                optim="adamw_torch",
                logging_dir='./logs',
                logging_steps=10,
                lr_scheduler_type="linear",
                warmup_ratio=0.1,
                seed=42,
                load_best_model_at_end=True,
                metric_for_best_model="accuracy",
            )

            trainer = self.model.fine_tune(dataset, training_args)
            self.model.evaluate(trainer, dataset)
            self.model.generate_reports(trainer, dataset, id2label)

            # Save the fine-tuned model to the local directory
            self.model.save_model(self.audio_model_path)

    def authenticate_and_create_drive(self):
        """
        Authenticates the user and creates a PyDrive GoogleDrive instance.
        """
        auth.authenticate_user()
        gauth = GoogleAuth()
        gauth.credentials = GoogleCredentials.get_application_default()
        drive = GoogleDrive(gauth)
        return drive

    def download_audio_from_drive(self, drive, audio_file_id, destination_path):
        """
        Downloads an audio file from Google Drive using its file ID.
        """
        print(f"Downloading audio file from Google Drive with file ID: {audio_file_id}")
        downloaded = drive.CreateFile({'id': audio_file_id})
        downloaded.GetContentFile(destination_path)
        print(f"Downloaded audio file and saved as {destination_path}")

    def load_and_predict(self, audio_file_ids):
        """
        Downloads the audio files using their file IDs and predicts all possible labels.
        """
        drive = self.authenticate_and_create_drive()

        for audio_file_name, audio_file_id in audio_file_ids.items():
            destination_path = f"./{audio_file_name}"
            self.download_audio_from_drive(drive, audio_file_id, destination_path)

            result_df = self.model.predict_all_labels(destination_path)
            print(result_df)

In [4]:
def main():
    """
    Main function to execute the emotion classification pipeline.
    """
    dataset_zip_file_id = '1dSXSY-f6ZigkcJWV07v6kc3r_i3pMPvl'
    audio_model_path = './audio_emotion_model'

    # Set audio_train_switch to True if you want to force re-training the model, otherwise False
    audio_train_switch = True

    audio_pipeline = EmotionPipeline(dataset_zip_file_id=dataset_zip_file_id, model_path=audio_model_path, audio_train_switch=audio_train_switch)
    audio_pipeline.run()

    audio_file_ids = {
        'audio1.mp3': '108kPpEQeA_6RkQXmmLWDJXQzdiISlm0r'
    }

    audio_pipeline.load_and_predict(audio_file_ids)

if __name__ == "__main__":
    main()

Downloading dataset from Google Drive with file ID: 1dSXSY-f6ZigkcJWV07v6kc3r_i3pMPvl
Downloaded dataset as Audio_Speech_Actors_01-24.zip
RAVDESS dataset already extracted.
Total samples after filtering: 1440
Training samples: 1152
Testing samples: 288


Casting the dataset:   0%|          | 0/1152 [00:00<?, ? examples/s]

Casting the dataset:   0%|          | 0/288 [00:00<?, ? examples/s]

Audio training switch is ON. Re-training the model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of HubertForSequenceClassification were not initialized from the model checkpoint at ntu-spml/distilhubert and are newly initialized: ['classifier.bias', 'classifier.weight', 'encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'projector.bias', 'projector.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Map:   0%|          | 0/1152 [00:00<?, ? examples/s]

Map:   0%|          | 0/288 [00:00<?, ? examples/s]



Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,1.8816,1.772234,0.315972,0.188558,0.315972,0.218185
2,1.447,1.295342,0.524306,0.602552,0.524306,0.464297
3,1.0171,0.951202,0.6875,0.717516,0.6875,0.662009
4,0.7154,0.722277,0.75,0.76375,0.75,0.745656
5,0.3539,0.610278,0.770833,0.77484,0.770833,0.76042
6,0.2147,0.531886,0.833333,0.843677,0.833333,0.834087
7,0.0851,0.517787,0.840278,0.845117,0.840278,0.838867
8,0.0486,0.585231,0.822917,0.830888,0.822917,0.823221
9,0.0219,0.558815,0.836806,0.846105,0.836806,0.836994
10,0.0175,0.562754,0.836806,0.846465,0.836806,0.837378


Evaluation Results: {'eval_loss': 0.5177865624427795, 'eval_accuracy': 0.8402777777777778, 'eval_precision': 0.8451172578849229, 'eval_recall': 0.8402777777777778, 'eval_f1': 0.8388666594750462, 'eval_runtime': 13.4751, 'eval_samples_per_second': 21.373, 'eval_steps_per_second': 2.672, 'epoch': 10.0}

Classification Report:
              precision    recall  f1-score   support

       angry       0.94      0.82      0.87        38
        calm       0.72      0.95      0.82        38
     disgust       0.88      0.92      0.90        38
     fearful       0.94      0.87      0.91        39
       happy       0.85      0.85      0.85        39
     neutral       0.67      0.53      0.59        19
         sad       0.82      0.74      0.78        38
   surprised       0.85      0.90      0.88        39

    accuracy                           0.84       288
   macro avg       0.83      0.82      0.82       288
weighted avg       0.85      0.84      0.84       288

Confusion Matrix:
[[31 