# Setting up the environment

In [None]:
from google.colab import drive
import os
from google.colab import userdata

# Step 1: Mount Google Drive
# Mount Google Drive to access your GitHub repository
drive.mount('/content/drive', force_remount=True)

# Step 2: Navigate to Your GitHub Repository
# Change to your repository's location in Google Drive
repo_path = "/content/drive/MyDrive/colab_repos/Wav2Vec2-vs-HUbert"  # Adjust to your repository path
os.chdir(repo_path)

# Step 3: Set Git User Identity
# Configure Git with your username and email for committing
!git config --global user.name "FilipLarsson12"
!git config --global user.email "hockeyfilip12@gmail.com"

# Step 4: Configure Git Remote
# Use the GitHub Personal Access Token from secrets for authentication
github_token = userdata.get("github_access_token")  # Retrieve the secret
repo_url = f"https://{github_token}@github.com/FilipLarsson12/Wav2Vec2-vs-HUbert.git"

# Set or update the Git remote
!git remote set-url origin {repo_url}

# Step 5: Stage, Commit, and Push Changes
# Add the file(s) to the Git staging area
!git add "Wav2Vec2forER KEX.ipynb"  # Adjust to your notebook's name

# Commit with a meaningful message
!git commit -m "Updated Google Colab notebook"

# Push to GitHub
!git push origin main  # Push to 'main'


In [None]:
%%capture

# Install the development versions of `datasets` and `transformers` directly from GitHub,
# followed by jiwer, torchaudio and librosa. `%%capture` hides the pip output.
# NOTE(review): none of these installs is version-pinned, so the environment can differ between runs.
!pip install git+https://github.com/huggingface/datasets.git
!pip install git+https://github.com/huggingface/transformers.git
!pip install jiwer
!pip install torchaudio
!pip install librosa


In [None]:
# Set a UTF-8 locale in the kernel's environment.
%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8
# Point the Hugging Face model and dataset caches at /content/cache.
%env TRANSFORMERS_CACHE=/content/cache
%env HF_DATASETS_CACHE=/content/cache
# Make CUDA kernel launches synchronous, which gives more precise error locations
# at the cost of speed. NOTE(review): consider removing once debugging is done.
%env CUDA_LAUNCH_BLOCKING=1

In [None]:
# Install the Weights & Biases client (`wandb`), used to monitor the training process.
!pip install wandb

In [None]:
# Weights & Biases setup. This cell is active; skip or comment it out if you do not
# want to log the run to a wandb project.
from google.colab import userdata
import os
# Read the wandb API token from the Colab secrets store.
wandb_token = userdata.get("WANDB_TOKEN")

# wandb settings exported to the kernel's environment.
# NOTE(review): presumably picked up by the transformers wandb integration — confirm.
%env WANDB_WATCH=all
%env WANDB_LOG_MODEL=1
%env WANDB_PROJECT=Wav2Vec2forER
# NOTE(review): the token is interpolated into the shell command line below.
!wandb login {wandb_token} --relogin  # Use the secret for authentication


# Loading in and preparing the RAVDESS dataset

In [None]:
from datasets import load_dataset

# NOTE(review): `os` is imported here but not used in this cell.
import os

# Load the RAVDESS dataset identified by "narad/ravdess".
dataset = load_dataset("narad/ravdess")

In [None]:
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split

import os
import sys

In [None]:
import torchaudio
import librosa
import IPython.display as ipd
import numpy as np

In [None]:
# Print a summary of the train split.
print(dataset['train'])

Creating `label2id` and `id2label` dictionaries to get an easier overview of the classes and their labels.

In [None]:
# Inspect the schema of the train split and read the class names from the `labels` feature
train_features = dataset['train'].features
print(train_features)
label_names = train_features['labels'].names
print(label_names)

# id -> name lookup, numbered in the order the names appear in `label_names`
id2label = dict(enumerate(label_names))

# name -> id lookup, the inverse of the table above
label2id = {name: idx for idx, name in id2label.items()}

# Show both lookup tables
print("Label to ID:", label2id)
print("ID to Label:", id2label)

In [None]:
# Convert the train split into a pandas DataFrame for exploration.
df = dataset['train'].to_pandas()

Adding an `emotion` column to the DataFrame so that each row shows its class name as well as its numeric label.

In [None]:
# Translate each class id in `labels` into its class name using `id2label`.
df["emotion"] = df["labels"].map(id2label)

In [None]:
# Preview the first rows, now including the `emotion` column.
df.head()

Listening to a random sample:

In [None]:
# Pick one random row of the DataFrame
idx = np.random.randint(0, len(df))
sample = df.iloc[idx]

path = sample['audio']["path"]
label = sample["emotion"]
labelid = sample['labels']

# Show where the sample sits in the DataFrame and both forms of its label.
# (The numeric id used to be printed under the same "Label:" heading as the name.)
print(f"ID Location: {idx}")
print(f"      Label: {label}")
print(f"   Label ID: {labelid}")

print()

# Load the clip, keep its first channel and resample it to 16 kHz for playback
speech, sr = torchaudio.load(path)
print(path)
speech = speech[0].numpy().squeeze()
speech = librosa.resample(y=speech, orig_sr=sr, target_sr=16000)

# Last expression of the cell: renders an audio player widget
ipd.Audio(data=np.asarray(speech), autoplay=False, rate=16000)

In [None]:
from matplotlib import pyplot as plt
import seaborn as sns

# Horizontal bar chart with the number of clips per emotion
emotion_counts = df.groupby('emotion').size()
emotion_counts.plot(kind='barh', color=sns.palettes.mpl_palette('Dark2'))

# Hide the top and right frame lines of the current axes
current_axes = plt.gca()
current_axes.spines[['top', 'right']].set_visible(False)

In [None]:
# List the distinct emotion names, then show how many rows each emotion has
# (counted through the non-null entries of the `audio` column).
print("Labels: ", df["emotion"].unique())
print()
df.groupby("emotion").count()['audio']

Restructuring the dataframe a bit for clarity:

In [None]:
# Copy the file path out of each `audio` dict into its own `path` column
# (None when the dict has no "path" key).
df["path"] = df["audio"].apply(lambda audio: audio.get("path", None))



In [None]:
# Preview the first rows, now including the `path` column.
df.head()

Next, we split the dataset into a train and a test set and save both as tab-separated CSV files in `/content/data`:

In [None]:
import os

save_path = "/content/data"
os.makedirs(save_path, exist_ok=True)

# Stratified 80/20 split on the class id; the fixed seed makes the split repeatable.
# Both parts get a fresh 0..n-1 index.
train_df, test_df = (
    part.reset_index(drop=True)
    for part in train_test_split(df, test_size=0.2, random_state=101, stratify=df["labels"])
)

# Write each split as a tab-separated UTF-8 file without the index column
for split_name, split_df in (("train", train_df), ("test", test_df)):
    split_df.to_csv(f"{save_path}/{split_name}.csv", sep="\t", encoding="utf-8", index=False)

# Report the size of each split
print(train_df.shape)
print(test_df.shape)

In [None]:
# Loading the created dataset using datasets
from datasets import load_dataset

try:
    # `load_metric` is no longer shipped with recent `datasets` releases (metrics moved
    # to the separate `evaluate` package). Importing it unconditionally makes this cell
    # fail with the development version installed at the top of the notebook.
    from datasets import load_metric
except ImportError:
    load_metric = None

# The CSV written as "test.csv" serves as the validation split here
data_files = {
    "train": "/content/data/train.csv",
    "validation": "/content/data/test.csv",
}

# The files are tab-separated, so the delimiter has to be passed explicitly
dataset = load_dataset("csv", data_files=data_files, delimiter="\t")
print(dataset)
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

print(train_dataset)
print(eval_dataset)

In [None]:
# Column holding the audio file path (model input) and column holding the
# emotion name (prediction target).
input_column = "path"
output_column = "emotion"

In [None]:
# Collect the distinct emotion names of the training split; sorting them
# keeps the order the same from run to run
label_list = sorted(train_dataset.unique(output_column))
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

# Loading in the Wav2Vec2 model

In [None]:
from transformers import AutoConfig, Wav2Vec2Processor

In [None]:
# Pretrained checkpoint to fine-tune, and how the per-frame hidden states are
# pooled into one vector per clip ("mean", "sum" or "max" are supported by the model class).
model_name_or_path = "facebook/wav2vec2-large-960h"
pooling_mode = "mean"

In [None]:
print(label2id)
print(id2label)

# Start from the checkpoint's configuration and override the classification-specific fields
classification_overrides = dict(
    num_labels=len(label_list),
    label2id=label2id,
    id2label=id2label,
    finetuning_task="wav2vec2_clf",
)
config = AutoConfig.from_pretrained(model_name_or_path, **classification_overrides)

# Extra attribute that the custom model class reads to pick its pooling strategy
config.pooling_mode = pooling_mode
print(config)

In [None]:
# Load the processor that belongs to the checkpoint and read the sampling rate
# its feature extractor expects; audio is resampled to this rate during preprocessing.
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)
print(processor)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

In [None]:
def speech_file_to_array_fn(path):
    """Load an audio file as a mono, 1-D NumPy array at the target sampling rate.

    Args:
        path: Location of the audio file, handed to ``torchaudio.load``.

    Returns:
        A 1-D NumPy array holding the waveform resampled to
        ``target_sampling_rate``. Recordings with several channels are averaged
        into a single channel. If loading or processing fails, the error is
        printed and an empty array is returned instead of raising.
    """
    try:
        # torchaudio returns the waveform as (channels, frames) plus the file's own rate
        speech_array, sampling_rate = torchaudio.load(path)
        resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
        speech = resampler(speech_array).numpy()

        # Mix every channel down to mono. Flattening a (channels, frames) array would
        # instead place the channels one after another and lengthen the clip.
        if speech.ndim > 1:
            speech = speech.mean(axis=0)

    except Exception as e:
        print(f"Error processing file {path}: {e}")
        # Return a consistent placeholder (empty array)
        speech = np.array([])

    return speech



def label_to_id(label, label_list):
    """Map a label to its position in ``label_list``.

    Args:
        label: The label to look up.
        label_list: Sequence of known labels.

    Returns:
        The index of ``label`` in ``label_list``, ``-1`` when the label is not
        in the list, or ``label`` itself when ``label_list`` is empty.
    """
    # Nothing to map against: hand the label back untouched
    if len(label_list) == 0:
        return label

    # Unknown labels are flagged with -1
    if label not in label_list:
        return -1

    return label_list.index(label)

def preprocess_function(examples):
    """Turn a batch of dataset rows into model inputs with integer labels.

    Args:
        examples: A batch mapping column names to lists; the ``input_column``
            entry holds audio file paths and the ``output_column`` entry holds
            emotion names.

    Returns:
        The processor output for the loaded waveforms, with a ``"labels"``
        entry holding the class id of every row.
    """
    # Load every audio file of the batch first, then look up the class ids
    waveforms = list(map(speech_file_to_array_fn, examples[input_column]))
    class_ids = list(map(label2id.__getitem__, examples[output_column]))

    encoded = processor(waveforms, sampling_rate=target_sampling_rate)
    encoded["labels"] = class_ids

    return encoded

In [None]:
# Sanity check: load the waveform of the first training row and show it together
# with the feature extractor settings.
# NOTE: despite its name, `train_dataset_copy` is a second reference to the same
# dataset object, not an independent copy.
train_dataset_copy = train_dataset
sample1 = speech_file_to_array_fn(train_dataset_copy[0]['path'])
print(sample1)
print(processor.feature_extractor)

In [None]:
# Run the single waveform through the processor to see what the model input looks like.
processed_sample_1 = processor(sample1, sampling_rate=target_sampling_rate)
print(processed_sample_1)

In [None]:
print(len(train_dataset))
print(len(eval_dataset))

# Both splits are preprocessed with the same settings: batches of 100 rows
# spread over 4 worker processes
map_options = dict(batched=True, batch_size=100, num_proc=4)
train_dataset = train_dataset.map(preprocess_function, **map_options)
eval_dataset = eval_dataset.map(preprocess_function, **map_options)

print(len(train_dataset))
print(len(eval_dataset))
print(train_dataset)
print(train_dataset)

In [None]:
# Show the processed input values and the label (id and name) of one training row.
idx = 0
print(f"Training input_values: {train_dataset[idx]['input_values']}")
print(f"Training labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['emotion']}")

# Defining the model

Now we're going to create the custom classes that define our model. It consists of the base wav2vec2 model followed by a classification head that operates on the wav2vec2 output.

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by the speech classification model.

    Every field defaults to ``None``.
    """

    # Loss value; the model fills it in only when labels are passed to `forward`.
    loss: Optional[torch.FloatTensor] = None
    # Scores produced by the classification head.
    logits: torch.FloatTensor = None
    # Hidden states forwarded from the base model's output.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Attentions forwarded from the base model's output.
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [None]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model,
    Wav2Vec2ForCTC
)


class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head placed on top of the pooled wav2vec2 features.

    The head applies dropout, a linear layer of width ``config.hidden_size``,
    a tanh, dropout again, and a final projection to ``config.num_labels``
    scores. The dropout probability is ``config.final_dropout``.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(width, config.num_labels)

    def forward(self, features, **kwargs):
        """Return the class scores for ``features``; extra keyword arguments are ignored."""
        projected = self.dense(self.dropout(features))
        activated = self.dropout(torch.tanh(projected))
        return self.out_proj(activated)


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """wav2vec2 base model followed by pooling over dim 1 and a classification head.

    The pooling strategy is read from ``config.pooling_mode`` and the number of
    output classes from ``config.num_labels``.
    """

    def __init__(self, config):
        """Build the base model and the classification head from ``config``."""
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the base model's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        """Reduce ``hidden_states`` along dim 1 with the given pooling mode.

        Args:
            hidden_states: Tensor to pool.
                # assumes (batch, frames, hidden) so that dim 1 is the time axis — TODO confirm
            mode: One of ``"mean"``, ``"sum"`` or ``"max"``.

        Returns:
            The pooled tensor.

        Raises:
            Exception: If ``mode`` is not one of the supported values.
        """
        # NOTE(review): pooling covers every position along dim 1; no attention mask
        # is consulted here, so padded positions (if any) take part in the result.
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            # torch.max with `dim` returns (values, indices); keep the values only
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Run the base model, pool its output, classify, and optionally compute a loss.

        Args:
            input_values: Model input passed straight to the base model.
            attention_mask: Optional mask passed straight to the base model.
            output_attentions: Forwarded to the base model.
            output_hidden_states: Forwarded to the base model.
            return_dict: Whether to return a ``SpeechClassifierOutput``; falls back
                to ``config.use_return_dict`` when ``None``.
            labels: Optional targets; when given, a loss is computed.

        Returns:
            A ``SpeechClassifierOutput``, or a tuple starting with the loss (when
            labels are given) followed by the logits when ``return_dict`` is false.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # First element of the base model output is pooled into one vector per example
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            # Infer the problem type once from the number of labels and the label dtype,
            # and remember it on the config
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # NOTE(review): assumes the entries from index 2 onward are the optional
            # hidden states / attentions of the base model's tuple output — confirm.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [None]:
# Inspect the columns of the preprocessed training set before cleaning it up.
print("Columns in train_dataset:", train_dataset.column_names)
# Display the first example (as a one-row slice)
print("Sample data from train_dataset:")
print(train_dataset[0:1])  # Adjust the slice to see more or fewer examples


Cleaning up the datasets:

In [None]:
# Drop the metadata columns from both splits, one column at a time
for column in ("audio", "text", "speaker_id", "speaker_gender"):
    train_dataset = train_dataset.remove_columns(column)
    eval_dataset = eval_dataset.remove_columns(column)

In [None]:
# Confirm which columns remain after the cleanup.
print("Columns in train_dataset:", train_dataset.column_names)
# Display the first example
print("Sample data from train_dataset:")
print(train_dataset[0])  # Index or slice differently to see other examples


# Training!!

Now we will perform the final steps necessary and then start the training process.

In [None]:
# Small hand-made batch with inputs of different lengths (10, 20 and 15 values),
# used further down to try out the data collator's padding.
test_data = [
    {"input_values": list(range(10)), "labels": 0},
    {"input_values": list(range(20)), "labels": 1},
    {"input_values": list(range(15)), "labels": 2},
]

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch
from transformers import Wav2Vec2Processor


@dataclass
class DataCollatorWithPadding:
    """
    Collate dataset rows into a batch: the inputs are padded, the labels are stacked as they are.

    Attributes:
        processor: Processor whose ``pad`` method pads the input values.
        padding: Padding strategy handed to ``processor.pad``.
        max_length: Optional maximum length handed to ``processor.pad``.
        pad_to_multiple_of: Optional multiple handed to ``processor.pad``.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Build a padded batch of PyTorch tensors from a list of rows."""
        # Only the raw inputs go through padding; labels are handled separately below
        inputs_only = [{"input_values": row["input_values"]} for row in features]
        batch = self.processor.pad(
            inputs_only,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Python int labels become a long tensor, anything else a float tensor
        labels = [row["labels"] for row in features]
        if isinstance(labels[0], int):
            label_dtype = torch.long
        else:
            label_dtype = torch.float
        batch["labels"] = torch.tensor(labels, dtype=label_dtype)

        return batch


In [None]:
# Collator that pads each batch using the checkpoint's processor.
data_collator = DataCollatorWithPadding(processor=processor, padding=True)

In [None]:
# Smoke test: collate the hand-made rows of unequal length into one batch.
batch = data_collator(test_data)

In [None]:
# Read by `compute_metrics`: False selects accuracy, True selects mean squared error.
is_regression = False

In [None]:
import numpy as np
from transformers import EvalPrediction


def compute_metrics(p: EvalPrediction):
    """Compute the evaluation metric for a set of predictions.

    Args:
        p: Object with ``predictions`` (an array, or a tuple whose first item is
            the array) and ``label_ids``.

    Returns:
        ``{"mse": ...}`` when the module-level ``is_regression`` flag is set,
        otherwise ``{"accuracy": ...}``. The value is a Python float.
    """
    raw_predictions = p.predictions
    if isinstance(raw_predictions, tuple):
        raw_predictions = raw_predictions[0]

    if is_regression:
        preds = np.squeeze(raw_predictions)
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}

    # Classification: the predicted class is the index of the highest score
    preds = np.argmax(raw_predictions, axis=1)
    return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

Instantiating the model:

In [None]:
# Load the pretrained checkpoint into the custom classification model, using the
# configuration prepared earlier (label maps, number of labels, pooling mode).
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)
print(config)

In [None]:
import numpy as np

def generate_sine_wave(freq, sample_rate, duration):
    """Generate a sine wave as a NumPy array.

    Args:
        freq: Frequency of the wave in Hz.
        sample_rate: Number of samples per second.
        duration: Length of the signal in seconds.

    Returns:
        Array of ``int(sample_rate * duration)`` samples of
        ``sin(2 * pi * freq * t)``, with ``t`` starting at 0 and stopping one
        step before ``duration``.
    """
    num_samples = int(sample_rate * duration)
    # Evenly spaced time stamps; the end point itself is left out
    timestamps = np.linspace(0, duration, num_samples, endpoint=False)
    return np.sin(2 * np.pi * freq * timestamps)

# Parameters
sample_rate = 16000  # 16 kHz sample rate, typical for audio processing
durations = [1.0, 0.5, 2.0]  # Durations in seconds
frequencies = [440, 1000, 250]  # Frequencies in Hz

# Generate synthetic audio data: one sine wave per (frequency, duration) pair
audio_samples = [generate_sine_wave(freq, sample_rate, duration)
                 for freq, duration in zip(frequencies, durations)]
# Show the first generated waveform
print(audio_samples[0])


In [None]:
# Freeze the feature extractor of the base model so its parameters are not updated during training.
model.freeze_feature_extractor()

In [None]:
%%capture

# Upgrade accelerate and install transformers with its torch extras (pip output is captured).
# NOTE(review): this runs after transformers has already been imported and the model
# has been created earlier in the notebook; it may also replace the GitHub build
# installed at the top — confirm, and consider moving all installs to the first cell.
!pip install accelerate -U
!pip install transformers[torch]

In [None]:
# Print the version and metadata of the installed accelerate package.
!pip show accelerate

In [None]:
import dataclasses

from transformers import TrainingArguments

# Inactive alternative configuration, kept for reference as a plain string
"""
training_args = TrainingArguments(
    output_dir="/content/wav2vec2-base-960h-RAVDESS",
    per_device_train_batch_size=16,  # Increased from 4
    per_device_eval_batch_size=16,  # Increased from 4
    gradient_accumulation_steps=3,  # Adjusted to accumulate gradients more frequently
    evaluation_strategy="steps",
    num_train_epochs=10.0,  # Increased to explore longer training
    fp16=True,
    save_steps=50,  # Increased to reduce I/O overhead
    eval_steps=50,  # Increased for consistent evaluation
    logging_steps=50,  # Adjusted for consistent logging
    learning_rate=3.5e-05,  # Intermediate learning rate
    save_total_limit=3,  # Increased limit for saved checkpoints
)
"""

# Newer transformers releases call the evaluation schedule argument `eval_strategy`;
# older ones only know `evaluation_strategy`. Use whichever name the installed
# version accepts so the cell works with both.
_training_arg_names = {field.name for field in dataclasses.fields(TrainingArguments)}
_eval_strategy_key = "eval_strategy" if "eval_strategy" in _training_arg_names else "evaluation_strategy"

# Evaluate, save and log every 10 optimizer steps; 4 clips per device with
# 2 accumulation steps per update.
training_args = TrainingArguments(
    output_dir="/content/wav2vec2-xlsr-greek-speech-emotion-recognition",
    # output_dir="/content/MyDrive/wav2vec2-xlsr-greek-speech-emotion-recognition"
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    num_train_epochs=3.0,
    fp16=True,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
    **{_eval_strategy_key: "steps"},
)


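TODO: Unsure whether the custom trainer below should be kept or removed — the training cell further down uses the stock `Trainer`, not this subclass.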

In [None]:
from typing import Any, Dict, Union
import torch
from torch import nn
from transformers import Trainer

class EmotionRecognitionTrainer(Trainer):
    """Trainer subclass with a hand-written training step.

    NOTE(review): newer transformers releases appear to call ``training_step``
    with an additional argument; this override accepts only ``model`` and
    ``inputs`` — confirm against the installed version before using the class.
    """

    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        """
        Perform a training step on a batch of inputs.

        Subclass and override to inject custom behavior.

        Args:
            model (:obj:`nn.Module`):
                The model to train.
            inputs (:obj:`Dict[str, Union[torch.Tensor, Any]]`):
                The inputs and targets of the model.

        Return:
            :obj:`torch.Tensor`: The tensor with training loss on this batch.
        """

        model.train()
        inputs = self._prepare_inputs(inputs)

        loss = self.compute_loss(model, inputs)  # Compute loss using the standard method

        # Scale the loss so that gradients accumulated over several steps average out
        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        # NOTE(review): backward is called on the loss directly, with no loss scaling
        # applied in this method — check how this interacts with fp16 training.
        loss.backward()  # Perform backpropagation

        return loss.detach()  # Return the loss for tracking

In [None]:
import inspect

from transformers import Trainer

# The Trainer keyword that receives the feature extractor is called `processing_class`
# in newer transformers releases and `tokenizer` in older ones. Use whichever name
# the installed version accepts so the cell works with both.
_trainer_params = inspect.signature(Trainer.__init__).parameters
_processor_key = "processing_class" if "processing_class" in _trainer_params else "tokenizer"

# Stock Trainer wired up with the model, the padding collator, the accuracy metric
# and the preprocessed train/validation splits.
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    **{_processor_key: processor.feature_extractor},
)

In [None]:
# Start fine-tuning with the settings defined in `training_args`.
trainer.train()

In [None]:
import librosa
from sklearn.metrics import classification_report

In [None]:
# Load the held-out CSV as the test set.
# NOTE(review): this is the same file that was loaded as the "validation" split
# for training-time evaluation, so it is not an independent test set.
test_dataset = load_dataset("csv", data_files={"test": "/content/data/test.csv"}, delimiter="\t")["test"]
test_dataset

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")