import pandas as pd
import librosa
from sklearn.model_selection import train_test_split
from IPython.display import Audio
import random
import json
import os

In [4]:
emotion_data_path = "../data/Emotion_Speech_Data/"  # RASDEV
emotion_data_path_ESD = "../data/Emotion_Speech_Dataset/English"  # ESD Dataset

### Preprocessing the RAVDESS Emotion Data Path


In [5]:
def emotion_label_tagger(emotion):
    #   Emotion (01 = neutral, 02 = calm, 03 = happy, 04 = sad, 05 = angry, 06 = fearful, 07 = disgust, 08 = surprised)
    labels = {
        "01": "neutral",
        "02": "calm",
        "03": "happy",
        "04": "sad",
        "05": "angry",
        "06": "fearful",
        "07": "disgust",
        "08": "surprise",
    }
    emotion_label = labels.get(emotion, "Unknown")
    return emotion_label

In [6]:
emotion_voice_data = []

for root, dirs, files in os.walk(emotion_data_path):
    for file in files:
        if file.endswith(".wav"):

            emotion = file.split("-")[2]
            emotion = emotion_label_tagger(emotion)
            audio_file = os.path.join(root, file)
            audio_name = audio_file.split("/")[3]
            audio_name_split = audio_name.split("\\")
            actor_name = audio_name_split[0]
            file_name = audio_name_split[1]

            entry = {
                # "Actor": actor_name,
                "Name": file,
                "path": audio_file,
                "emotion": emotion,
            }
            emotion_voice_data.append(entry)

    with open("../data/emotion_voice_data.json", "w") as f:
        json.dump(emotion_voice_data, f)

In [7]:
data = []
with open("../data/emotion_voice_data.json", "r") as f:
    data = pd.read_json(f)

df_1 = pd.DataFrame(data)
print(f"Shape of the Dataset : {df_1.shape}")
df_1.head(-1)

Shape of the Dataset : (1440, 3)


Unnamed: 0,Name,path,emotion
0,03-01-01-01-01-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
1,03-01-01-01-01-02-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
2,03-01-01-01-02-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
3,03-01-01-01-02-02-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
4,03-01-02-01-01-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-02-...,calm
...,...,...,...
1434,03-01-08-01-02-01-24.wav,../data/Emotion_Speech_Data/Actor_24\03-01-08-...,surprise
1435,03-01-08-01-02-02-24.wav,../data/Emotion_Speech_Data/Actor_24\03-01-08-...,surprise
1436,03-01-08-02-01-01-24.wav,../data/Emotion_Speech_Data/Actor_24\03-01-08-...,surprise
1437,03-01-08-02-01-02-24.wav,../data/Emotion_Speech_Data/Actor_24\03-01-08-...,surprise


### Filtering Three Emotions from RASDEV


In [8]:
df_1 = df_1[~df_1["emotion"].isin(["fearful", "disgust", "calm"])]

In [9]:
emotion_voice_data_ESD = []
# Path to the main directory
emotion_data_path = "../data/Emotion_Speech_Dataset/English"

# Dictionary to store the emotion labels and associated .wav files
emotion_files = {}

# Traverse the directory to capture emotion labels and .wav files
for session in os.listdir(emotion_data_path):
    session_path = os.path.join(emotion_data_path, session)

    # Skip non-directory entries like .DS_Store or text files
    if not os.path.isdir(session_path):
        continue

    for emotion in os.listdir(session_path):
        emotion_path = os.path.join(session_path, emotion)

        # Ensure the path is an emotion folder
        if os.path.isdir(emotion_path):
            wav_files = [f for f in os.listdir(emotion_path) if f.endswith(".wav")]

            # Add files to the dictionary under the current emotion
            if emotion not in emotion_files:
                emotion_files[emotion] = []

            # Add the full path to each wav file to the list
            for wav_file in wav_files:
                emotion_files[emotion].append(os.path.join(emotion_path, wav_file))

# Print the collected


for emotion, files in emotion_files.items():
    for file in files:
        name = file.split("\\")[3]
        entry = {
            # "Actor": actor_name,
            "Name": name,
            "path": file,
            "emotion": emotion.lower(),
        }
        emotion_voice_data_ESD.append(entry)


with open("../data/emotion_voice_data_ESD.json", "w") as f:
    json.dump(emotion_voice_data_ESD, f)

In [10]:
data = []
with open("../data/emotion_voice_data_ESD.json", "r") as f:
    data = pd.read_json(f)

df_2 = pd.DataFrame(data)
print(f"Shape of the Dataset : {df_2.shape}")
df_2.head(-1)

Shape of the Dataset : (17500, 3)


Unnamed: 0,Name,path,emotion
0,0011_000351.wav,../data/Emotion_Speech_Dataset/English\0011\An...,angry
1,0011_000352.wav,../data/Emotion_Speech_Dataset/English\0011\An...,angry
2,0011_000353.wav,../data/Emotion_Speech_Dataset/English\0011\An...,angry
3,0011_000354.wav,../data/Emotion_Speech_Dataset/English\0011\An...,angry
4,0011_000355.wav,../data/Emotion_Speech_Dataset/English\0011\An...,angry
...,...,...,...
17494,0020_001745.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
17495,0020_001746.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
17496,0020_001747.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
17497,0020_001748.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise


### Combining the Datafram from two seperate Datasets


In [11]:
DF = pd.concat([df_1, df_2], ignore_index=True)
DF.head(-1)

Unnamed: 0,Name,path,emotion
0,03-01-01-01-01-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
1,03-01-01-01-01-02-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
2,03-01-01-01-02-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
3,03-01-01-01-02-02-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-01-...,neutral
4,03-01-03-01-01-01-01.wav,../data/Emotion_Speech_Data/Actor_01\03-01-03-...,happy
...,...,...,...
18358,0020_001745.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
18359,0020_001746.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
18360,0020_001747.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
18361,0020_001748.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise


In [12]:
# Saving the csv for the combined Dataset
DF.to_csv("../data/complete_emotion_data.csv", sep="\t", encoding="utf-8", index=False)

#### Counting Individual Labels


In [13]:
print("Emotion Types: ", DF["emotion"].unique())
print()
DF.groupby("emotion").count()[["path"]]

Emotion Types:  ['neutral' 'happy' 'sad' 'angry' 'surprise']



Unnamed: 0_level_0,path
emotion,Unnamed: 1_level_1
angry,3692
happy,3692
neutral,3596
sad,3692
surprise,3692


### Preview of Audios


In [14]:
id = random.randint(0, DF.shape[0])
print(f"ID Location: {id}\nAudio Name: {DF['Name'][id]}\nEmotion = {DF['emotion'][id]}")
Audio(DF["path"][id])

ID Location: 5413
Audio Name: 0013_001050.wav
Emotion = happy


### Sample only a certain amount from the complete Dataset


In [15]:
Audio_data = DF.sample(n=5000, random_state=101)
Audio_data = Audio_data.reset_index()

In [16]:
Data = Audio_data.drop(columns="index")

In [17]:
Data

Unnamed: 0,Name,path,emotion
0,0012_001510.wav,../data/Emotion_Speech_Dataset/English\0012\Su...,surprise
1,0016_000878.wav,../data/Emotion_Speech_Dataset/English\0016\Ha...,happy
2,0014_000425.wav,../data/Emotion_Speech_Dataset/English\0014\An...,angry
3,0014_001418.wav,../data/Emotion_Speech_Dataset/English\0014\Su...,surprise
4,0013_000913.wav,../data/Emotion_Speech_Dataset/English\0013\Ha...,happy
...,...,...,...
4995,0013_000721.wav,../data/Emotion_Speech_Dataset/English\0013\Ha...,happy
4996,0015_000494.wav,../data/Emotion_Speech_Dataset/English\0015\An...,angry
4997,0016_001227.wav,../data/Emotion_Speech_Dataset/English\0016\Sa...,sad
4998,0013_000331.wav,../data/Emotion_Speech_Dataset/English\0013\Ne...,neutral


In [18]:
data_save_path = "../data"

train_df, test_df = train_test_split(
    Data, test_size=0.2, random_state=100, stratify=Audio_data["emotion"]
)

train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_df.to_csv(
    f"{data_save_path}/emotion_train.csv", sep="\t", encoding="utf-8", index=False
)
test_df.to_csv(
    f"{data_save_path}/emotion_test.csv", sep="\t", encoding="utf-8", index=False
)

print(f"Train Data: {train_df.shape} | Test Data: {test_df.shape}")

Train Data: (4000, 3) | Test Data: (1000, 3)


In [19]:
from datasets import load_dataset

data_files_csv = {
    "train": "../data/emotion_train.csv",
    "validation": "../data/emotion_test.csv",
}

dataset = load_dataset("csv", data_files=data_files_csv, delimiter="\t")
train_dataset_csv = dataset["train"]
validation_dataset_csv = dataset["validation"]

print(train_dataset_csv)
print(validation_dataset_csv)

ModuleNotFoundError: No module named 'datasets'

In [18]:
input_column = "path"
output_column = "emotion"

In [None]:
label_list = train_dataset_csv.unique(output_column)
label_list.sort()
num_labels = len(label_list)

print(f"{num_labels} Labels : {label_list}")

5 Labels : ['angry', 'happy', 'neutral', 'sad', 'surprise']


### Converting audio feators to vectors, typically through Short-Time Fourier Transform using Wav2Vec


In [20]:
from transformers import Wav2Vec2Processor, AutoConfig

In [21]:
MODEL_NAME = "facebook/wav2vec2-large-960h"
POOLING_MODE = "mean"

In [22]:
config = AutoConfig.from_pretrained(
    MODEL_NAME,
    num_labels=num_labels,
    label2id={label: i for i, label in enumerate(label_list)},
    id2label={i: label for i, label in enumerate(label_list)},
    finetuning_task="wav2vec2_clf",
)
setattr(config, "pooling_mode", POOLING_MODE)

In [23]:
processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
target_sr = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sr}")

The target sampling rate: 16000


In [24]:
print(f"train: {train_dataset_csv}")
print(f"validation: {validation_dataset_csv}")

train: Dataset({
    features: ['Name', 'path', 'emotion'],
    num_rows: 4000
})
validation: Dataset({
    features: ['Name', 'path', 'emotion'],
    num_rows: 1000
})


In [25]:
import librosa


emotion_map = {
    "neutral": 0,
    "happy": 1,
    "sad": 2,
    "angry": 3,
    "surprise": 4,
}


def preprocess_audio(file_path, label):

    speech_array, sampling_rate = librosa.load(file_path, sr=16000)
    speech_array = speech_array.squeeze()

    inputs = processor(
        speech_array,
        sampling_rate=sampling_rate,
        padding=True,
        return_attention_mask=True,
    )

    label_id = emotion_map[label]

    return {
        "input_values": inputs["input_values"][0],
        "attention_mask": inputs["attention_mask"][0],
        "labels": label_id,
    }

In [26]:
def processing_data(_t_dataset_):
    dataset = []
    temp_df = pd.DataFrame(_t_dataset_)
    for _, row in temp_df.iterrows():
        processed_data = preprocess_audio(row["path"], row["emotion"])
        dataset.append(processed_data)

    return dataset

In [27]:
train_dataset = processing_data(train_dataset_csv)
validation_dataset = processing_data(validation_dataset_csv)
print(train_dataset)
print(validation_dataset)

[{'input_values': array([0.02848713, 0.02638132, 0.02638132, ..., 0.0232226 , 0.0221697 ,
       0.02638132], dtype=float32), 'attention_mask': array([1, 1, 1, ..., 1, 1, 1], dtype=int32), 'labels': 1}, {'input_values': array([ 0.06792314,  0.06439266,  0.05997958, ..., -0.02916474,
       -0.03004735, -0.03004735], dtype=float32), 'attention_mask': array([1, 1, 1, ..., 1, 1, 1], dtype=int32), 'labels': 2}, {'input_values': array([ 0.00463632,  0.00463632,  0.00378897, ..., -0.07586167,
       -0.07840371, -0.08264045], dtype=float32), 'attention_mask': array([1, 1, 1, ..., 1, 1, 1], dtype=int32), 'labels': 1}, {'input_values': array([0.02681051, 0.02821871, 0.03103511, ..., 0.01061622, 0.01061622,
       0.01061622], dtype=float32), 'attention_mask': array([1, 1, 1, ..., 1, 1, 1], dtype=int32), 'labels': 1}, {'input_values': array([ 0.03729221,  0.03655714,  0.03802728, ..., -0.03400961,
       -0.03621482, -0.0369499 ], dtype=float32), 'attention_mask': array([1, 1, 1, ..., 1, 1, 1],

In [28]:
idx = 0
print(f"Training input_values: {train_dataset[idx]['input_values']}")
print(f"Training Attention masks: {train_dataset[idx]['attention_mask']}")
print(f"Training label: {train_dataset[idx]['labels']}")

Training input_values: [0.02848713 0.02638132 0.02638132 ... 0.0232226  0.0221697  0.02638132]
Training Attention masks: [1 1 1 ... 1 1 1]
Training label: 1


### MODEL


In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassificationOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [30]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model,
)


class Wav2Vec2ClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(self, hidden_states, mode="mean"):
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']"
            )

        return outputs

    def forward(
        self,
        input_values,
        attention_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
    ):
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None

        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.float())
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels.float())
        else:
            loss = None

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassificationOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

### Data Augmentation


In [None]:
import random
import torchaudio
import torch
import librosa


def apply_spec_augment(waveform, sample_rate):
    spectrogram = torchaudio.transforms.MelSpectrogram(sample_rate)(waveform)
    spectrogram = torchaudio.transforms.TimeMasking(time_mask_param=20)(spectrogram)
    spectrogram = torchaudio.transforms.FrequencyMasking(freq_mask_param=20)(
        spectrogram
    )
    return spectrogram


def add_noise(waveform, noise_factor=0.005):
    noise = torch.randn(waveform.size()) * noise_factor
    return waveform + noise


def pitch_shift(waveform, sample_rate, n_steps=2):
    waveform_np = waveform.numpy().squeeze()
    shifted = librosa.effects.pitch_shift(
        y=waveform_np, sr=sample_rate, n_steps=n_steps
    )
    return torch.from_numpy(shifted).unsqueeze(0)


def volume_perturb(waveform, gain_db=5):
    gain_factor = 10 ** (gain_db / 20)
    return waveform * gain_factor

In [32]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch
from transformers import Wav2Vec2Processor


@dataclass
class DataCollatorCTCWithPadding:

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:

        # Augmentation of Data

        # for feature in features:
        #     waveform = torch.tensor(feature["input_values"])

        #     augmentation = random.choice(["S", "N", "P", "V"])
        #     if augmentation == "S":
        #         waveform = apply_spec_augment(
        #             waveform, self.processor.feature_extractor.sampling_rate
        #         )
        #     elif augmentation == "N":
        #         waveform = add_noise(waveform)
        #     elif augmentation == "P":
        #         waveform = pitch_shift(
        #             waveform, self.processor.feature_extractor.sampling_rate
        #         )
        #     else:
        #         waveform = volume_perturb(waveform)

        #     feature["input_values"] = waveform.squeeze().tolist()

        input_features = [
            {"input_values": feature["input_values"]} for feature in features
        ]
        label_features = [feature["labels"] for feature in features]

        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

In [33]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [34]:
is_regression = False

In [35]:
import numpy as np
from transformers import EvalPrediction


def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)

    if is_regression:
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}
    else:
        return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

In [36]:
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    MODEL_NAME,
    config=config,
)

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at facebook/wav2vec2-large-960h and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [37]:
model.freeze_feature_extractor()

In [46]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="../models",
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=1,
    eval_strategy="epoch",
    num_train_epochs=1,
    fp16=True,
    save_steps=100,
    eval_steps=100,
    logging_steps=100,
    learning_rate=1e-4,
    save_total_limit=2,
)

In [47]:
from typing import Any, Dict, Union

import torch

# from packaging import version
from torch import nn

from transformers import Trainer


class CTCTrainer(Trainer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.args.fp16:
            self.scaler = torch.amp.GradScaler("cuda")

    def training_step(
        self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], hiddens
    ) -> torch.Tensor:

        model.train()
        inputs = self._prepare_inputs(inputs)

        # Compute loss with mixed-precision (AMP) if enabled
        if self.args.fp16:
            with torch.amp.autocast("cuda"):
                loss = self.compute_loss(model, inputs)
        else:
            loss = self.compute_loss(model, inputs)

        # Scale loss for gradient accumulation
        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        # Backpropagation with AMP if enabled
        if self.args.fp16:
            self.scaler.scale(loss).backward()
        else:
            loss.backward()

        return loss.detach()

In [48]:
trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    processing_class=processor.feature_extractor,
)

In [49]:
trainer.train()

  0%|          | 0/2000 [00:00<?, ?it/s]

{'loss': 1.593, 'grad_norm': 8.38288688659668, 'learning_rate': 9.5e-05, 'epoch': 0.05}




{'loss': 1.6608, 'grad_norm': 4.133175849914551, 'learning_rate': 9e-05, 'epoch': 0.1}




{'loss': 1.6588, 'grad_norm': 7.959367275238037, 'learning_rate': 8.5e-05, 'epoch': 0.15}




{'loss': 1.6274, 'grad_norm': 5.258180618286133, 'learning_rate': 8e-05, 'epoch': 0.2}




{'loss': 1.6909, 'grad_norm': 4.131208896636963, 'learning_rate': 7.500000000000001e-05, 'epoch': 0.25}




{'loss': 1.6624, 'grad_norm': 4.767113208770752, 'learning_rate': 7e-05, 'epoch': 0.3}




{'loss': 1.6745, 'grad_norm': 4.166616439819336, 'learning_rate': 6.500000000000001e-05, 'epoch': 0.35}




{'loss': 1.6493, 'grad_norm': 4.271084785461426, 'learning_rate': 6e-05, 'epoch': 0.4}




{'loss': 1.6457, 'grad_norm': 3.775951385498047, 'learning_rate': 5.500000000000001e-05, 'epoch': 0.45}




{'loss': 1.6243, 'grad_norm': 4.286005973815918, 'learning_rate': 5e-05, 'epoch': 0.5}




{'loss': 1.6343, 'grad_norm': 3.3446810245513916, 'learning_rate': 4.5e-05, 'epoch': 0.55}




{'loss': 1.6323, 'grad_norm': 7.0823163986206055, 'learning_rate': 4e-05, 'epoch': 0.6}




{'loss': 1.6541, 'grad_norm': 3.5845134258270264, 'learning_rate': 3.5e-05, 'epoch': 0.65}




{'loss': 1.6211, 'grad_norm': 3.2604148387908936, 'learning_rate': 3e-05, 'epoch': 0.7}




{'loss': 1.6386, 'grad_norm': 3.27030086517334, 'learning_rate': 2.5e-05, 'epoch': 0.75}




{'loss': 1.6404, 'grad_norm': 3.524282932281494, 'learning_rate': 2e-05, 'epoch': 0.8}




{'loss': 1.6263, 'grad_norm': 3.7606749534606934, 'learning_rate': 1.5e-05, 'epoch': 0.85}




{'loss': 1.6099, 'grad_norm': 3.3493874073028564, 'learning_rate': 1e-05, 'epoch': 0.9}




{'loss': 1.6038, 'grad_norm': 3.6557860374450684, 'learning_rate': 5e-06, 'epoch': 0.95}




{'loss': 1.6018, 'grad_norm': 5.040419101715088, 'learning_rate': 0.0, 'epoch': 1.0}


  0%|          | 0/500 [00:00<?, ?it/s]

{'eval_loss': 1.6124515533447266, 'eval_accuracy': 0.2029999941587448, 'eval_runtime': 588.1733, 'eval_samples_per_second': 1.7, 'eval_steps_per_second': 0.85, 'epoch': 1.0}
{'train_runtime': 9296.8643, 'train_samples_per_second': 0.43, 'train_steps_per_second': 0.215, 'train_loss': 1.6374838943481445, 'epoch': 1.0}


TrainOutput(global_step=2000, training_loss=1.6374838943481445, metrics={'train_runtime': 9296.8643, 'train_samples_per_second': 0.43, 'train_steps_per_second': 0.215, 'total_flos': 3.846948651312312e+17, 'train_loss': 1.6374838943481445, 'epoch': 1.0})

In [50]:
import librosa
from sklearn.metrics import classification_report

In [60]:
test_dataset = load_dataset(
    "csv", data_files={"test": "../data/emotion_test.csv"}, delimiter="\t"
)["test"]
test_dataset

Dataset({
    features: ['Name', 'path', 'emotion'],
    num_rows: 1000
})

In [61]:
test_dataset = test_dataset.select(range(500))

# Display the subset
print(test_dataset)

Dataset({
    features: ['Name', 'path', 'emotion'],
    num_rows: 500
})


In [62]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

Device: cpu


In [68]:
def speech_file_to_array_fn(batch):
    speech_array, sampling_rate = torchaudio.load(batch["path"])
    speech_array = speech_array.squeeze().numpy()
    speech_array = librosa.resample(
        y=np.asarray(speech_array),
        orig_sr=sampling_rate,
        target_sr=processor.feature_extractor.sampling_rate,
    )
    print(speech_array)

    batch["speech"] = speech_array
    return batch


def predict(batch):
    features = processor(
        batch["speech"],
        sampling_rate=processor.feature_extractor.sampling_rate,
        return_tensors="pt",
        padding=True,
        return_attention_mask=True,
    )

    input_values = features.input_values.to(device)
    attention_mask = features.attention_mask.to(device)

    with torch.no_grad():
        logits = model(input_values, attention_mask=attention_mask).logits

    pred_ids = torch.argmax(logits, dim=-1).detach().cpu().numpy()
    batch["predicted"] = pred_ids
    return batch

In [69]:
test_dataset = test_dataset.map(speech_file_to_array_fn)

Map:   0%|          | 0/500 [00:00<?, ? examples/s]

[-0.00036621 -0.00042725 -0.00036621 ... -0.00021362 -0.00018311
 -0.00018311]
[1.0375977e-03 9.7656250e-04 9.4604492e-04 ... 1.5258789e-04 9.1552734e-05
 9.1552734e-05]
[ 0.00082397  0.00115967  0.00128174 ... -0.00054932 -0.0005188
 -0.00036621]
[-3.3569336e-04 -3.6621094e-04 -3.0517578e-04 ... -6.1035156e-05
 -6.1035156e-05 -6.1035156e-05]
[ 0.0027771   0.00283813  0.0027771  ... -0.00018311 -0.00021362
 -0.00021362]
[ 0.0000000e+00  6.1035156e-05 -3.0517578e-05 ...  1.0375977e-03
  1.0375977e-03  1.0070801e-03]
[9.1552734e-05 6.1035156e-05 3.0517578e-05 ... 8.8500977e-04 9.1552734e-04
 8.8500977e-04]
[ 0.00024414  0.00027466  0.00024414 ... -0.00018311 -0.00024414
 -0.00024414]
[-1.8310547e-04 -1.8310547e-04 -1.5258789e-04 ...  1.5258789e-04
  1.5258789e-04  9.1552734e-05]
[2.1362305e-04 1.8310547e-04 2.1362305e-04 ... 6.1035156e-05 9.1552734e-05
 6.1035156e-05]
[ 0.0000000e+00  0.0000000e+00 -3.0517578e-05 ...  5.6152344e-03
  5.5236816e-03  5.4626465e-03]
[-3.6379788e-12 -5.45696

In [70]:
result = test_dataset.map(predict, batched=True, batch_size=8)

Map:   0%|          | 0/500 [00:00<?, ? examples/s]

In [71]:
label_names = [config.id2label[i] for i in range(config.num_labels)]
label_names

['angry', 'happy', 'neutral', 'sad', 'surprise']

In [72]:
y_true = [config.label2id[name] for name in result["emotion"]]
y_pred = result["predicted"]

print(y_true[:5])
print(y_pred[:5])

[3, 1, 4, 0, 3]
[1, 1, 1, 1, 1]


In [73]:
print(classification_report(y_true, y_pred, target_names=label_names, zero_division=1))

              precision    recall  f1-score   support

       angry       1.00      0.00      0.00       101
       happy       0.20      1.00      0.34       101
     neutral       1.00      0.00      0.00       102
         sad       1.00      0.00      0.00        90
    surprise       1.00      0.00      0.00       106

    accuracy                           0.20       500
   macro avg       0.84      0.20      0.07       500
weighted avg       0.84      0.20      0.07       500



### Prediction


In [74]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from transformers import AutoConfig, Wav2Vec2Processor

import librosa
import IPython.display as ipd
import numpy as np
import pandas as pd

In [75]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
sampling_rate = processor.feature_extractor.sampling_rate

In [76]:
def speech_file_to_array_fn(path, sampling_rate):
    speech_array, _sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(_sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    return speech


def predict(path, sampling_rate):
    speech = speech_file_to_array_fn(path, sampling_rate)
    features = processor(
        speech,
        sampling_rate=sampling_rate,
        return_tensors="pt",
        padding=True,
        return_attention_mask=True,
    )

    input_values = features.input_values.to(device)
    attention_mask = features.attention_mask.to(device)

    with torch.no_grad():
        logits = model(input_values, attention_mask=attention_mask).logits

    print(logits.shape)

    scores = F.softmax(logits, dim=1).detach().cpu().numpy()[0]
    outputs = [
        {"Emotion": config.id2label[i], "Score": f"{round(score * 100, 3):.1f}%"}
        for i, score in enumerate(scores)
    ]
    return outputs


STYLES = """
<style>
div.display_data {
    margin: 0 auto;
    max-width: 500px;
}
table.xxx {
    margin: 50px !important;
    float: right !important;
    clear: both !important;
}
table.xxx td {
    min-width: 300px !important;
    text-align: center !important;
}
</style>
""".strip()


def prediction(df_row):
    path, emotion = df_row["path"], df_row["emotion"]
    df = pd.DataFrame([{"Emotion": emotion}])
    setup = {
        "border": 2,
        "show_dimensions": True,
        "justify": "center",
        "classes": "xxx",
        "escape": False,
    }
    ipd.display(ipd.HTML(STYLES + df.to_html(**setup) + "<br />"))
    speech, sr = torchaudio.load(path)
    speech = speech[0].numpy().squeeze()
    speech = librosa.resample(y=np.asarray(speech), orig_sr=sr, target_sr=sampling_rate)
    ipd.display(ipd.Audio(data=np.asarray(speech), autoplay=True, rate=sampling_rate))

    outputs = predict(path, sampling_rate)
    r = pd.DataFrame(outputs)
    ipd.display(ipd.HTML(STYLES + r.to_html(**setup) + "<br />"))

In [77]:
test = pd.read_csv("../data/emotion_test.csv", sep="\t")
test.head()

Unnamed: 0,Name,path,emotion
0,0018_001341.wav,../data/Emotion_Speech_Dataset/English\0018\Sa...,sad
1,0012_000931.wav,../data/Emotion_Speech_Dataset/English\0012\Ha...,happy
2,0020_001526.wav,../data/Emotion_Speech_Dataset/English\0020\Su...,surprise
3,0017_000538.wav,../data/Emotion_Speech_Dataset/English\0017\An...,angry
4,0018_001330.wav,../data/Emotion_Speech_Dataset/English\0018\Sa...,sad


In [82]:
prediction(test.iloc[5])

Unnamed: 0,Emotion
0,angry


torch.Size([1, 5])


Unnamed: 0,Emotion,Score
0,angry,19.0%
1,happy,23.1%
2,neutral,17.9%
3,sad,21.0%
4,surprise,19.1%
