In [None]:
%%capture
# Install the notebook's dependencies; `%%capture` hides the pip output.
# NOTE(review): datasets and transformers are installed from their git
# repositories and no package is version-pinned, so a re-run may pull different
# versions than the ones that produced the saved outputs — consider pinning.

!pip install git+https://github.com/huggingface/datasets.git
!pip install git+https://github.com/huggingface/transformers.git
!pip install jiwer
!pip install torchaudio
!pip install librosa

!pip install accelerate -U

In [None]:
# Mount Google Drive at /content/drive (Colab only) so the audio archive can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%capture
# Extract the audio archive stored on the mounted Drive; output is captured.
# NOTE(review): the archive is extracted into the current working directory —
# presumably /content, where the emotion folders are read later; verify.

!unzip /content/drive/MyDrive/auto_reco/CaFE_48k.zip
# !unzip all_audios.zip

# Directory that later receives the train/test CSV files.
!mkdir -p /content/data

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import librosa
import os
import sys

import torch
import torchaudio
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
import accelerate

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

from datasets import load_dataset, load_metric
from dataclasses import dataclass
from typing import Optional, Tuple

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)

from dataclasses import dataclass
from typing import Dict, List, Optional, Union

import transformers
from transformers import EvalPrediction
from transformers import AutoConfig, Wav2Vec2Processor
from transformers.file_utils import ModelOutput
from transformers import Trainer, TrainingArguments

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

Device: cuda


In [None]:
# One folder per emotion; the folder name is used as the class label.
emotions_folders = [
    "/content/Colere",
    "/content/Degout",
    "/content/Joie",
    "/content/Neutre",
    "/content/Peur",
    "/content/Surprise",
    "/content/Tristesse",
]

data = []
# Files torchaudio could not decode; they are reported after the scan instead
# of being dropped silently.
unreadable_files = []

for emotion_folder in emotions_folders:
    for path in tqdm(Path(emotion_folder).glob("**/*.wav")):
        name = str(path).split('/')[-1].split('.')[0]
        label = emotion_folder.split('/')[-1]  # Use the last path component as the emotion label

        try:
            # A few files are corrupted: loading is only used as a decodability check.
            torchaudio.load(path)
        except Exception as e:
            unreadable_files.append((str(path), e))
            continue

        data.append({
            "name": name,
            "path": path,
            "emotion": label
        })

if unreadable_files:
    print(f"Skipped {len(unreadable_files)} unreadable audio file(s):")
    for bad_path, error in unreadable_files:
        print(f"  {bad_path}: {error}")

df = pd.DataFrame(data)
df

144it [00:04, 30.17it/s]
144it [00:05, 25.50it/s]
144it [00:02, 71.07it/s]
72it [00:01, 71.14it/s]
144it [00:02, 71.27it/s]
144it [00:01, 73.01it/s]
144it [00:02, 68.88it/s]


Unnamed: 0,name,path,emotion
0,07-C-2-5,/content/Colere/Fort/07-C-2-5.wav,Colere
1,01-C-2-4,/content/Colere/Fort/01-C-2-4.wav,Colere
2,03-C-2-1,/content/Colere/Fort/03-C-2-1.wav,Colere
3,12-C-2-1,/content/Colere/Fort/12-C-2-1.wav,Colere
4,12-C-2-2,/content/Colere/Fort/12-C-2-2.wav,Colere
...,...,...,...
931,01-T-1-2,/content/Tristesse/Faible/01-T-1-2.wav,Tristesse
932,12-T-1-6,/content/Tristesse/Faible/12-T-1-6.wav,Tristesse
933,11-T-1-4,/content/Tristesse/Faible/11-T-1-4.wav,Tristesse
934,04-T-1-4,/content/Tristesse/Faible/04-T-1-4.wav,Tristesse


In [None]:
save_path = "/content/data"

# Stratified 80/20 split on the emotion label, with a fixed seed; both parts
# get a fresh 0..n-1 index.
train_df, test_df = (
    part.reset_index(drop=True)
    for part in train_test_split(df, test_size=0.2, random_state=101, stratify=df["emotion"])
)

# Persist both splits as tab-separated files so they can be reloaded with `datasets`.
for split_name, split_df in (("train", train_df), ("test", test_df)):
    split_df.to_csv(f"{save_path}/{split_name}.csv", sep="\t", encoding="utf-8", index=False)

print(train_df.shape)
print(test_df.shape)

(748, 3)
(188, 3)


## Prepare Data for Training

In [None]:
# Map split names to the tab-separated files written by the split cell.
data_files = dict(
    train="/content/data/train.csv",
    validation="/content/data/test.csv",
)

dataset = load_dataset("csv", data_files=data_files, delimiter="\t")
train_dataset, eval_dataset = dataset["train"], dataset["validation"]

print(train_dataset)
print(eval_dataset)

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['name', 'path', 'emotion'],
    num_rows: 748
})
Dataset({
    features: ['name', 'path', 'emotion'],
    num_rows: 188
})


In [None]:
# Columns holding the model input (audio file path) and the target (emotion name).
input_column = "path"
output_column = "emotion"

# Collect the distinct emotion labels of the training split; sorting keeps the
# label -> id mapping deterministic across runs.
label_list = sorted(train_dataset.unique(output_column))
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

A classification problem with 7 classes: ['Colere', 'Degout', 'Joie', 'Neutre', 'Peur', 'Surprise', 'Tristesse']


In [None]:
model_name_or_path = "jonatasgrosman/wav2vec2-large-xlsr-53-french"
pooling_mode = "mean"

# Start from the checkpoint's configuration and attach the label vocabulary.
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id=dict(zip(label_list, range(num_labels))),
    id2label=dict(enumerate(label_list)),
    finetuning_task="wav2vec2_clf",
)
# Custom attribute read by the classification model to choose its pooling.
config.pooling_mode = pooling_mode

config.json:   0%|          | 0.00/1.53k [00:00<?, ?B/s]

In [None]:
# The processor from the same checkpoint provides the feature extractor; its
# sampling rate is the rate the audio is fed to the model at.
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

preprocessor_config.json:   0%|          | 0.00/262 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/560 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

The target sampling rate: 16000


# Preprocess Data

In [None]:
def speech_file_to_array_fn(path, target_sr=None):
    """Load an audio file and return its first channel, resampled, as a NumPy array.

    Args:
        path: Location of the audio file; passed straight to ``torchaudio.load``.
        target_sr: Sampling rate to resample to. When ``None`` (the default),
            the notebook-level ``target_sampling_rate`` read from the processor
            is used, so the audio always matches the rate declared to the
            processor instead of a hard-coded 16 kHz.

    Returns:
        The resampled waveform of the first channel.
    """
    if target_sr is None:
        target_sr = target_sampling_rate

    speech_array, sampling_rate = torchaudio.load(path)

    # Keep only the first channel so multi-channel files yield a single signal.
    speech_array = speech_array[0].numpy().squeeze()
    speech = librosa.resample(np.asarray(speech_array), orig_sr=sampling_rate, target_sr=target_sr)
    return speech

def label_to_id(label, label_list):
    """Translate a label into its position in ``label_list``.

    Args:
        label: The label value to look up.
        label_list: Ordered collection of known labels.

    Returns:
        The index of ``label`` in ``label_list``, ``-1`` when the label is not
        listed, or ``label`` itself when ``label_list`` is empty.
    """
    # Nothing to look up against: hand the label back untouched.
    if len(label_list) == 0:
        return label

    if label not in label_list:
        return -1

    return label_list.index(label)

def preprocess_function(examples):
    """Turn a batch of dataset rows into model inputs plus integer labels.

    Reads the audio paths from ``examples[input_column]`` and the label names
    from ``examples[output_column]``, loads each file, runs the waveforms
    through the processor, and attaches the label ids under ``"labels"``.

    Args:
        examples: A batch of rows, indexed by column name.

    Returns:
        The processor output with an added ``"labels"`` list.
    """
    waveforms = [speech_file_to_array_fn(audio_path) for audio_path in examples[input_column]]
    label_ids = [label_to_id(raw_label, label_list) for raw_label in examples[output_column]]

    encoded = processor(waveforms, sampling_rate=target_sampling_rate)
    encoded["labels"] = list(label_ids)
    return encoded

In [None]:
# Both splits go through the same batched, multi-process preprocessing.
_map_options = dict(batched=True, batch_size=100, num_proc=4)

train_dataset_prepoc = train_dataset.map(preprocess_function, **_map_options)
eval_dataset_prepoc = eval_dataset.map(preprocess_function, **_map_options)

Map (num_proc=4):   0%|          | 0/748 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/188 [00:00<?, ? examples/s]

## Model


In [None]:
@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by the speech classification model.

    Every field defaults to ``None``.
    """

    # Loss value; the model leaves it as None when called without labels.
    loss: Optional[torch.FloatTensor] = None
    # Scores produced by the classification head.
    logits: torch.FloatTensor = None
    # Forwarded unchanged from the wav2vec2 backbone output (`hidden_states`).
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Forwarded unchanged from the wav2vec2 backbone output (`attentions`).
    attentions: Optional[Tuple[torch.FloatTensor]] = None


In [None]:
class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head: dropout, dense layer with tanh, dropout, output projection."""

    def __init__(self, config):
        """Build the layers from ``config.hidden_size``, ``config.final_dropout`` and ``config.num_labels``."""
        super().__init__()
        width, n_classes = config.hidden_size, config.num_labels
        self.dense = nn.Linear(width, width)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(width, n_classes)

    def forward(self, features, **kwargs):
        """Map pooled features to one score per label; extra keyword arguments are ignored."""
        # The same dropout module is applied before and after the dense + tanh stage.
        activated = torch.tanh(self.dense(self.dropout(features)))
        return self.out_proj(self.dropout(activated))


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """wav2vec2 backbone followed by pooling over dim 1 and a classification head.

    The pooling strategy is read from ``config.pooling_mode`` and must be one
    of ``"mean"``, ``"sum"`` or ``"max"``.
    """

    def __init__(self, config):
        """Create the backbone and the head from ``config`` and initialise the weights."""
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the backbone's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        """Pool ``hidden_states`` along dim 1.

        Args:
            hidden_states: Tensor to reduce along dim 1.
            mode: ``"mean"``, ``"sum"`` or ``"max"``.

        Returns:
            The pooled tensor (dim 1 removed).

        Raises:
            ValueError: If ``mode`` is not one of the supported pooling modes.
        """
        # NOTE(review): pooling covers every position of dim 1 and does not
        # consult the attention mask, so padded positions (if any) contribute
        # to the result — confirm this is intended.
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            # torch.max over a dim returns (values, indices); keep the values.
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            # ValueError is still caught by callers that catch Exception.
            raise ValueError(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Run the backbone, pool its first output and classify it.

        Args:
            input_values: Audio input forwarded to the wav2vec2 backbone.
            attention_mask: Optional mask forwarded to the backbone.
            output_attentions: Forwarded to the backbone.
            output_hidden_states: Forwarded to the backbone.
            return_dict: Whether to return a ``SpeechClassifierOutput``; falls
                back to ``config.use_return_dict`` when ``None``.
            labels: Optional targets; when given, a loss is computed.

        Returns:
            A ``SpeechClassifierOutput`` when ``return_dict`` is true, otherwise
            a tuple ``(loss, logits, *backbone_extras)`` with ``loss`` omitted
            when no labels were given.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            # Infer the problem type once from the label count and label dtype,
            # and cache it on the config for subsequent calls.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # Skip the first two backbone outputs and append whatever follows.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Accepted for interface compatibility; not used by ``__call__``.
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            Accepted for interface compatibility; not used by ``__call__``.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad the ``input_values`` of ``features`` and stack their ``labels``.

        Args:
            features: One dict per example, each with ``"input_values"`` and ``"labels"``.

        Returns:
            The padded batch produced by ``processor.pad`` with a ``"labels"``
            tensor added; labels are ``torch.long`` when the first label is an
            integer (Python or NumPy) and ``torch.float`` otherwise.
        """
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [feature["labels"] for feature in features]

        # NumPy integer scalars are not instances of `int`; without this check
        # they would be cast to float and the model would treat the batch as a
        # multi-label problem.
        is_integer_label = isinstance(label_features[0], (int, np.integer))
        d_type = torch.long if is_integer_label else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

In [None]:
# Padding collator; padding=True pads each batch to its longest sequence.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

# Read by compute_metrics: False reports accuracy, True reports MSE.
is_regression = False

In [None]:
def compute_metrics(p: EvalPrediction):
    """Compute the evaluation metric for a set of predictions.

    Args:
        p: Evaluation result exposing ``predictions`` and ``label_ids``.

    Returns:
        ``{"mse": ...}`` when the notebook-level ``is_regression`` flag is set,
        otherwise ``{"accuracy": ...}``.
    """
    # When the predictions come as a tuple, only its first element is scored.
    raw_scores = p.predictions
    if isinstance(raw_scores, tuple):
        raw_scores = raw_scores[0]

    if is_regression:
        preds = np.squeeze(raw_scores)
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}

    # Classification: the predicted class is the highest-scoring column.
    preds = np.argmax(raw_scores, axis=1)
    return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

In [None]:
# Load the pretrained checkpoint into the custom classification model, using
# the config that carries the label mapping and pooling mode.
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)

# Freeze the backbone's feature extractor parameters before fine-tuning.
model.freeze_feature_extractor()

pytorch_model.bin:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-french and are newly initialized: ['classifier.dense.bias', 'classifier.out_proj.weight', 'classifier.dense.weight', 'classifier.out_proj.bias', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
EPOCHS = 3.0

# Define your training arguments
# Evaluation, checkpointing and logging all happen every 10 steps; only the
# two most recent checkpoints are kept on disk.
training_args = TrainingArguments(
    output_dir="/content/wav2vec2-large-xlsr-53-french",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=EPOCHS,
    fp16=True,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
)

# Create a Trainer instance
# The padding collator built earlier in the notebook is passed explicitly;
# without it the Trainer silently falls back to its own default collator.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset_prepoc,
    eval_dataset=eval_dataset_prepoc,
    tokenizer=processor.feature_extractor,
    compute_metrics=compute_metrics,
)

# Fine-tune the model
trainer.train()

Step,Training Loss,Validation Loss,Accuracy
10,1.5206,2.021797,0.292553
20,1.2661,1.633785,0.340426
30,1.2961,1.414153,0.468085
40,1.1222,1.227504,0.515957
50,0.9449,1.235334,0.553191
60,0.8884,1.255782,0.553191
70,1.1132,1.172572,0.601064
80,0.9073,1.2865,0.574468
90,0.8923,0.878828,0.680851
100,0.8702,0.920953,0.680851


TrainOutput(global_step=930, training_loss=0.27006122886493644, metrics={'train_runtime': 8178.52, 'train_samples_per_second': 0.915, 'train_steps_per_second': 0.114, 'total_flos': 1.1929088038243113e+18, 'train_loss': 0.27006122886493644, 'epoch': 9.95})

# Testing on other samples

In [None]:
# Audio files to run inference on, one path per row.
# NOTE(review): this rebinds `df`, which earlier held the dataset metadata table.
df = pd.DataFrame(data={"path": ["/content/11a03Ca.wav", "/content/24a01Ca.wav", "/content/07a09Pa.wav", "/content/11a03Da.wav"]})
df

Unnamed: 0,path
0,/content/11a03Ca.wav
1,/content/24a01Ca.wav
2,/content/07a09Pa.wav
3,/content/11a03Da.wav


In [None]:
import torch
import torchaudio
import librosa
import numpy as np
from torch.nn.functional import softmax

# ... [Assuming processor, model, device, and config.id2label are already defined] ...

def predict_single_sample(sample):
    """Predict the emotion of one audio file and attach the result to ``sample``.

    Args:
        sample: Dict with a ``"path"`` entry pointing at the audio file.

    Returns:
        The same dict with a ``"predictions"`` entry: a list of
        ``(label_id, confidence)`` tuples (one per file), or ``None`` when the
        file could not be processed.
    """
    try:
        target_sr = processor.feature_extractor.sampling_rate

        # Load and preprocess the audio file
        speech_array, sampling_rate = torchaudio.load(sample["path"])
        # Keep only the first channel, as the training-time loader does.
        # Squeezing a stereo file leaves a (2, N) array that the processor
        # treats as a batch of two, producing two predictions for one file.
        speech_array = speech_array[0].numpy()
        speech_array = librosa.resample(np.asarray(speech_array), orig_sr=sampling_rate, target_sr=target_sr)

        # Process the audio features through the processor
        features = processor(speech_array, sampling_rate=target_sr, return_tensors="pt", padding=True)

        input_values = features.input_values.to(device)
        attention_mask = features.attention_mask.to(device)

        # Model inference; eval mode turns dropout off so scores are repeatable.
        model.eval()
        with torch.no_grad():
            logits = model(input_values, attention_mask=attention_mask).logits

        # Convert logits to probabilities (confidence scores)
        probabilities = softmax(logits, dim=-1)
        pred_confidences, pred_ids = torch.max(probabilities, dim=-1)

        # Prepare the predictions with their confidence scores
        predictions = [
            (int(pred_id), float(confidence))
            for pred_id, confidence in zip(pred_ids.detach().cpu().numpy(), pred_confidences.detach().cpu().numpy())
        ]
        sample["predictions"] = predictions
    except Exception as e:
        # Best effort: report the failure and mark the sample instead of aborting the run.
        print(f"Error processing sample {sample['path']}: {e}")
        sample["predictions"] = None

    return sample

# Run inference on every listed file and print the predicted label(s).
for _, row in df.iterrows():
    result = predict_single_sample(row.to_dict())
    print("Sample Path:", result["path"])

    scored = result["predictions"]
    if scored is None:
        print("Prediction failed for this sample.")
    else:
        for pred_id, confidence in scored:
            # Fall back to "Unknown" for ids missing from the config's mapping.
            label = config.id2label.get(pred_id, "Unknown")
            print(f'Predicted: id: {pred_id} ---> {label} (Confidence: {confidence:.2f})')
    print("="*50)


Sample Path: /content/11a03Ca.wav
Predicted: id: 2 ---> Joie (Confidence: 0.73)
Predicted: id: 2 ---> Joie (Confidence: 0.80)
Sample Path: /content/24a01Ca.wav
Predicted: id: 5 ---> Surprise (Confidence: 0.98)
Predicted: id: 5 ---> Surprise (Confidence: 0.99)
Sample Path: /content/07a09Pa.wav
Predicted: id: 1 ---> Degout (Confidence: 1.00)
Predicted: id: 1 ---> Degout (Confidence: 1.00)
Sample Path: /content/11a03Da.wav
Predicted: id: 1 ---> Degout (Confidence: 1.00)
Predicted: id: 1 ---> Degout (Confidence: 0.99)


If you want to save the model:

In [None]:
# !zip -r /content/drive/MyDrive/auto_reco/voice_reco_models.zip /content/wav2vec2-large-xlsr-53-french

  adding: content/wav2vec2-large-xlsr-53-french/ (stored 0%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/ (stored 0%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/training_args.bin (deflated 51%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/config.json (deflated 65%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/trainer_state.json (deflated 85%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/model.safetensors (deflated 8%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/preprocessor_config.json (deflated 38%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/optimizer.pt (deflated 8%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/scheduler.pt (deflated 56%)
  adding: content/wav2vec2-large-xlsr-53-french/checkpoint-930/rng_state.pth (deflated 25%)
  adding: content/wav2vec2-large-xlsr-53-french/runs/ (stored 0%)
  adding: content/wav2vec2-large-xlsr-53-fr