In [None]:
!pip install git+https://github.com/huggingface/transformers.git
!pip install jiwer
!pip install torchaudio
!pip install librosa

# Загрузка датасета с kaggle и создание датафрейма

In [None]:
!pip install -q kaggle

In [None]:
from google.colab import files

# Bind the result instead of leaving the call as the cell's last expression:
# the returned mapping holds the uploaded file contents, so a bare call would
# echo kaggle.json (the API key) into the saved notebook output.
uploaded = files.upload()

In [None]:
! mkdir ~/.kaggle

In [None]:
# Copy kaggle.json from the working directory into ~/.kaggle/.
! cp kaggle.json ~/.kaggle/

In [None]:
# Restrict the token file to owner read/write only (mode 600).
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# List datasets through the kaggle CLI (presumably a check that the token works).
! kaggle datasets list

In [None]:
# Download the RAVDESS emotional speech audio archive into the working directory.
! kaggle datasets download -d uwrfkaggler/ravdess-emotional-speech-audio

Downloading ravdess-emotional-speech-audio.zip to /content
 99% 425M/429M [00:07<00:00, 76.4MB/s]
100% 429M/429M [00:07<00:00, 60.4MB/s]


In [None]:
! unzip ravdess-emotional-speech-audio.zip

In [None]:
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split

import os
import sys

In [None]:
# Walk the 24 actor folders and collect one record per readable .wav file.
# File names look like "03-01-05-01-01-02-01.wav"; the third dash-separated
# field is used as the emotion code.
data = []
skipped = []  # (path, error message) for files torchaudio could not open
for actor_id in range(1, 25):
    actor_dir = Path(f"audio_speech_actors_01-24/Actor_{actor_id:02d}")
    for path in tqdm(actor_dir.glob("**/*.wav")):
        # Use Path attributes rather than str(path).split('/'): the split
        # depends on the OS separator and on the directory depth.
        name = path.name
        label = name.split('-')[2]
        actor = actor_dir.name

        try:
            # There are some broken files; loading is only a readability check.
            torchaudio.load(path)
        except Exception as e:
            # Keep going, but remember what was dropped instead of hiding it.
            skipped.append((str(path), str(e)))
            continue

        data.append({
            "name": name,
            "actor": actor,
            "path": path,
            "emotion": label
        })

if skipped:
    print(f"Skipped {len(skipped)} unreadable file(s)")

In [None]:
# Build the metadata table: one row per record collected in `data`
# (columns: name, actor, path, emotion).
df = pd.DataFrame(data)
df

Unnamed: 0,name,actor,path,emotion
0,03-01-05-01-01-02-01.wav,Actor_01,audio_speech_actors_01-24/Actor_01/03-01-05-01...,05
1,03-01-06-01-02-01-01.wav,Actor_01,audio_speech_actors_01-24/Actor_01/03-01-06-01...,06
2,03-01-04-01-01-02-01.wav,Actor_01,audio_speech_actors_01-24/Actor_01/03-01-04-01...,04
3,03-01-02-01-01-01-01.wav,Actor_01,audio_speech_actors_01-24/Actor_01/03-01-02-01...,02
4,03-01-07-01-01-01-01.wav,Actor_01,audio_speech_actors_01-24/Actor_01/03-01-07-01...,07
...,...,...,...,...
1435,03-01-02-02-01-01-24.wav,Actor_24,audio_speech_actors_01-24/Actor_24/03-01-02-02...,02
1436,03-01-08-02-02-01-24.wav,Actor_24,audio_speech_actors_01-24/Actor_24/03-01-08-02...,08
1437,03-01-04-01-01-01-24.wav,Actor_24,audio_speech_actors_01-24/Actor_24/03-01-04-01...,04
1438,03-01-06-02-02-01-24.wav,Actor_24,audio_speech_actors_01-24/Actor_24/03-01-06-02...,06


In [None]:
# Translate the two-digit emotion codes into readable labels.
num = ["01", "02", "03", "04", "05", "06", "07", "08"]
emotion = ["neutral", "calm", "happy", "sad", "angry", "fearful", "disgust", "surprised"]
# Replace only inside the "emotion" column: replacing on the whole frame would
# also rewrite any other cell whose value happens to equal one of the codes.
# Values that are not codes are left untouched, so re-running the cell is safe.
df = df.assign(emotion=df["emotion"].replace(dict(zip(num, emotion))))
df["emotion"].value_counts(dropna=False)

angry        192
fearful      192
sad          192
calm         192
disgust      192
surprised    192
happy        192
neutral       96
Name: emotion, dtype: int64

In [None]:
! pip install samplerate
! pip install resampy

Разделение на обучающую и валидационную выборки

In [None]:
# 80/20 split stratified by emotion; both parts get a fresh 0..n-1 index.
train_df, test_df = (
    part.reset_index(drop=True)
    for part in train_test_split(df, test_size=0.2, random_state=101, stratify=df["emotion"])
)

# Persist both parts as tab-separated files for the `datasets` CSV loader.
for frame, csv_name in ((train_df, "train.csv"), (test_df, "test.csv")):
    frame.to_csv(csv_name, sep="\t", encoding="utf-8", index=False)

train_df.head()

Unnamed: 0,name,actor,path,emotion
0,03-01-02-02-02-01-04.wav,Actor_04,audio_speech_actors_01-24/Actor_04/03-01-02-02...,calm
1,03-01-07-02-02-01-21.wav,Actor_21,audio_speech_actors_01-24/Actor_21/03-01-07-02...,disgust
2,03-01-05-01-02-02-20.wav,Actor_20,audio_speech_actors_01-24/Actor_20/03-01-05-01...,angry
3,03-01-04-01-01-01-08.wav,Actor_08,audio_speech_actors_01-24/Actor_08/03-01-04-01...,sad
4,03-01-05-01-01-02-16.wav,Actor_16,audio_speech_actors_01-24/Actor_16/03-01-05-01...,angry


# Подготовка к обучению

In [None]:
!pip install git+https://github.com/huggingface/datasets.git

In [None]:
from datasets import load_dataset, load_metric


# The held-out CSV is exposed under the "validation" split name.
data_files = dict(train="train.csv", validation="test.csv")

# Read both tab-separated files written by the split cell.
dataset = load_dataset("csv", data_files=data_files, delimiter="\t")
train_dataset, eval_dataset = dataset["train"], dataset["validation"]

print(train_dataset, eval_dataset, sep="\n")

Downloading data files:   0%|          | 0/2 [00:00<?, ?it/s]

Extracting data files:   0%|          | 0/2 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['name', 'actor', 'path', 'emotion'],
    num_rows: 1152
})
Dataset({
    features: ['name', 'actor', 'path', 'emotion'],
    num_rows: 288
})


In [None]:
from transformers import AutoConfig, Wav2Vec2Processor

In [None]:
# Sorted label vocabulary: the position in this list is the class id.
label_list = sorted(train_dataset.unique("emotion"))
num_labels = len(label_list)

model_name_or_path = "lighteternal/wav2vec2-large-xlsr-53-greek"
pooling_mode = "mean"

# Checkpoint config extended with the label maps for the classification task.
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id=dict(zip(label_list, range(num_labels))),
    id2label=dict(enumerate(label_list)),
    finetuning_task="wav2vec2_clf",
)
# Not a constructor argument above; attached as an extra attribute that the
# custom model class reads in its __init__.
config.pooling_mode = pooling_mode

Downloading (…)lve/main/config.json:   0%|          | 0.00/1.56k [00:00<?, ?B/s]



In [None]:
# Processor published with the same checkpoint as the model config.
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)

Downloading (…)rocessor_config.json:   0%|          | 0.00/158 [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/138 [00:00<?, ?B/s]

Downloading (…)olve/main/vocab.json:   0%|          | 0.00/535 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

# Preprocess Data

In [None]:
def speech_file_to_array_fn(path, target_sr=16000):
    """Load an audio file and return its samples at ``target_sr`` Hz.

    Args:
        path: Location of the audio file, passed straight to ``librosa.load``.
        target_sr: Sampling rate of the returned signal. Defaults to 16000,
            the rate ``preprocess_function`` passes to the processor.

    Returns:
        The audio samples as returned by ``librosa.load``.
    """
    # Imported here because no cell of the notebook imports librosa, so the
    # function would otherwise fail with NameError on a fresh kernel.
    import librosa

    # Let librosa resample from the file's native rate in a single step,
    # rather than forcing 48 kHz first and resampling a second time.
    speech, _ = librosa.load(path, sr=target_sr)
    return speech


def label_to_id(label, label_list):
    """Translate a label into its position in ``label_list``.

    Returns the label unchanged when ``label_list`` is empty, and -1 when the
    label is not present in a non-empty list.
    """
    if len(label_list) == 0:
        return label
    return label_list.index(label) if label in label_list else -1

def preprocess_function(examples):
    """Turn a batch of rows into processor features plus integer labels.

    Reads ``examples["path"]`` and ``examples["emotion"]``; relies on the
    notebook-level ``processor`` and ``label_list``.
    """
    waveforms = list(map(speech_file_to_array_fn, examples["path"]))
    label_ids = [label_to_id(emotion_name, label_list) for emotion_name in examples["emotion"]]

    batch = processor(waveforms, sampling_rate=16000)
    batch["labels"] = label_ids
    return batch

In [None]:
# Smoke-test the preprocessing on two rows only. Running it on all of `df`
# decodes every audio file and dumps the full arrays into the cell output,
# and the real preprocessing is done by the `.map` calls anyway.
preview = preprocess_function(df.head(2))
{key: len(value) for key, value in preview.items()}

In [None]:
# Identical mapping settings for both splits: batches of 100 rows, 4 workers.
map_kwargs = dict(batched=True, batch_size=100, num_proc=4)

train_dataset = train_dataset.map(preprocess_function, **map_kwargs)
eval_dataset = eval_dataset.map(preprocess_function, **map_kwargs)

Map (num_proc=4):   0%|          | 0/1152 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/288 [00:00<?, ? examples/s]

In [None]:
# Peek at the first ten processed values of one training example.
idx = 20
first_values = train_dataset[idx]["input_values"][:10]
print(f"Training input_values: {first_values}")
# The example's 'attention_mask', 'labels' and 'emotion' fields can be
# inspected the same way.

Training input_values: [0.0005402070819400251, 0.0005402069073170424, 0.0005402070237323642, 0.0005402071983553469, 0.0005402070237323642, 0.0005402069073170424, 0.0005402070819400251, 0.000540207140147686, 0.0005402070237323642, 0.0005402070819400251]


# Модель

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput

@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by the speech classification model.

    Every field defaults to None.
    """
    # Training loss; stays None when no loss was computed.
    loss: Optional[torch.FloatTensor] = None
    # Classifier scores; annotated Optional because the default is None.
    logits: Optional[torch.FloatTensor] = None
    # Filled from the wav2vec2 outputs' `hidden_states` by the model's forward.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Filled from the wav2vec2 outputs' `attentions` by the model's forward.
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [None]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)

In [None]:
class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head: dropout -> dense -> tanh -> dropout -> projection.

    Layer sizes and the dropout rate come from ``config.hidden_size``,
    ``config.num_labels`` and ``config.final_dropout``.
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        """Return one score per label for ``features``; extra kwargs are ignored."""
        hidden = self.dense(self.dropout(features))
        hidden = self.dropout(torch.tanh(hidden))
        return self.out_proj(hidden)


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 encoder followed by pooling and a classification head.

    The encoder output is pooled over dim 1 (assumed to be the time axis —
    TODO confirm) using ``config.pooling_mode`` and passed to
    ``Wav2Vec2ClassificationHead``.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        # Fall back to mean pooling when the config carries no pooling_mode.
        self.pooling_mode = getattr(config, "pooling_mode", "mean")
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the wav2vec2 feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(self, hidden_states, mode="mean"):
        """Reduce ``hidden_states`` over dim 1.

        Args:
            hidden_states: Tensor to pool.
            mode: One of "mean", "sum" or "max".

        Raises:
            ValueError: If ``mode`` is not a supported pooling mode.
        """
        if mode == "mean":
            return torch.mean(hidden_states, dim=1)
        if mode == "sum":
            return torch.sum(hidden_states, dim=1)
        if mode == "max":
            # torch.max over a dim returns (values, indices); keep the values.
            return torch.max(hidden_states, dim=1)[0]
        # Previously an unknown mode fell through to an unbound local variable.
        raise ValueError(
            f"Unsupported pooling mode {mode!r}; expected 'mean', 'sum' or 'max'"
        )

    def forward(self, input_values, attention_mask=None, output_attentions=None,
                output_hidden_states=None, return_dict=None, labels=None,):
        """Run the encoder, pool its output, classify, and optionally compute a loss.

        Args:
            input_values: Inputs forwarded to the wav2vec2 encoder.
            attention_mask: Optional mask forwarded to the encoder.
            output_attentions: Forwarded to the encoder.
            output_hidden_states: Forwarded to the encoder.
            return_dict: Return a ``SpeechClassifierOutput`` when truthy, a
                tuple otherwise; defaults to ``config.use_return_dict``.
            labels: Optional targets. When given, a loss is computed according
                to ``config.problem_type`` (inferred on first use if unset).

        Returns:
            ``SpeechClassifierOutput`` or a tuple ``(loss?, logits, ...)``;
            the loss is None / omitted when ``labels`` is None.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(input_values, attention_mask=attention_mask, output_attentions=output_attentions,
                                output_hidden_states=output_hidden_states, return_dict=return_dict,)
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        # Must be initialised: without labels (plain inference) the code below
        # reads `loss`, which used to be unbound and crashed.
        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                # Infer the task once from the label count and label dtype.
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # Skip the first two encoder outputs and keep the rest after the logits.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )



# Обучение

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch

import transformers
from transformers import Wav2Vec2Processor

In [None]:
@dataclass
class DataCollatorCTCWithPadding:
    """Collate preprocessed examples into one padded batch with a label tensor.

    ``processor.pad`` pads the ``input_values``; labels are stacked into a
    tensor whose dtype follows the Python type of the first label.
    ``max_length_labels`` and ``pad_to_multiple_of_labels`` are accepted but
    not used by ``__call__``.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad the inputs of ``features`` and attach their labels as a tensor."""
        inputs, targets = [], []
        for feature in features:
            inputs.append({"input_values": feature["input_values"]})
            targets.append(feature["labels"])

        # int labels -> long (class ids); anything else -> float.
        if isinstance(targets[0], int):
            label_dtype = torch.long
        else:
            label_dtype = torch.float

        batch = self.processor.pad(
            inputs,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        batch["labels"] = torch.tensor(targets, dtype=label_dtype)
        return batch

In [None]:
# Collator that pads each batch with the notebook's processor; the trailing
# bare name displays the configured instance.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)
data_collator

In [None]:
import numpy as np
from transformers import EvalPrediction

In [None]:
def compute_metrics(p: "EvalPrediction"):
    """Compute accuracy from an evaluation prediction.

    Args:
        p: Object with ``predictions`` (an array, or a tuple whose first item
            is the array) and ``label_ids``.

    Returns:
        ``{"accuracy": float}`` — the fraction of predictions equal to
        ``label_ids``.
    """
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.asarray(preds)
    # The notebook never defines `is_regression`, so decide here instead:
    # a single output column means regression, the same rule the model uses
    # (num_labels == 1) when it infers its problem type.
    regression = preds.ndim < 2 or preds.shape[-1] == 1
    preds = np.squeeze(preds) if regression else np.argmax(preds, axis=1)
    return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

In [None]:
# Load the pretrained checkpoint into the custom classification model using
# the config built earlier in the notebook.
model = Wav2Vec2ForSpeechClassification.from_pretrained(model_name_or_path, config=config,)

Downloading pytorch_model.bin:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at lighteternal/wav2vec2-large-xlsr-53-greek and are newly initialized: ['classifier.out_proj.weight', 'classifier.out_proj.bias', 'classifier.dense.bias', 'classifier.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Freeze the wav2vec2 feature extractor parameters (see the model's
# freeze_feature_extractor method).
model.freeze_feature_extractor()

In [None]:
# NOTE(review): transformers is already imported by earlier cells and was
# installed from git in the first cell; an upgrade here presumably only takes
# effect after a kernel restart — confirm and consider moving it to the
# install cell at the top.
! pip install -U accelerate
! pip install -U transformers

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Checkpoints are written here; a Google Drive folder such as
    # "/content/gdrive/MyDrive/wav2vec2-xlsr-greek-speech-emotion-recognition"
    # can be used instead.
    output_dir="/content/wav2vec2-xlsr-greek-speech-emotion-recognition",
    # Optimisation: one epoch, batches of 4 with 2 accumulation steps.
    num_train_epochs=1.0,
    learning_rate=1e-4,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    # Evaluate, log and checkpoint every 10 steps; keep at most 2 checkpoints.
    evaluation_strategy="steps",
    eval_steps=10,
    logging_steps=10,
    save_steps=10,
    save_total_limit=2,
)

In [None]:
from typing import Any, Dict, Union

import torch
from packaging import version
from torch import nn

from transformers import (
    Trainer,
    is_apex_available,
)

# Optional mixed-precision imports.
# NOTE(review): `amp`, `autocast` and `_is_native_amp_available` are not
# referenced by the CTCTrainer class visible in this notebook — confirm
# whether they are still needed.
if is_apex_available():
    from apex import amp

# `_is_native_amp_available` is only assigned when torch >= 1.6; on older
# versions the name stays undefined.
if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast


class CTCTrainer(Trainer):
    """Trainer with a plain forward/backward training step (no AMP scaling)."""

    def training_step(
        self,
        model: nn.Module,
        inputs: Dict[str, Union[torch.Tensor, Any]],
        num_items_in_batch=None,
    ) -> torch.Tensor:
        """Run one forward/backward pass and return the detached loss.

        Args:
            model: The model being trained.
            inputs: One batch as produced by the data collator.
            num_items_in_batch: Accepted and ignored. Newer transformers
                releases pass this extra argument to ``training_step``; without
                it the override fails with a TypeError on those versions.

        Returns:
            The loss divided by ``gradient_accumulation_steps``, detached.
        """
        model.train()
        inputs = self._prepare_inputs(inputs)
        loss = self.compute_loss(model, inputs)
        # Scale so that gradients accumulated over several steps average out.
        loss = loss / self.args.gradient_accumulation_steps
        # getattr: do not assume every Trainer version defines `deepspeed`.
        if getattr(self, "deepspeed", None):
            self.deepspeed.backward(loss)
        else:
            loss.backward()

        return loss.detach()

In [None]:
# Wire model, collator, training arguments, metric function and both dataset
# splits into the custom trainer; the processor's feature extractor is passed
# in the `tokenizer` slot.
trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.feature_extractor,
)

In [None]:
# Start fine-tuning; evaluation and logging run every 10 steps per training_args.
trainer.train()

Step,Training Loss,Validation Loss,Accuracy
10,2.0325,2.077425,0.267361
20,2.0884,1.919384,0.354167
30,1.9275,1.790171,0.385417
40,1.8637,1.739768,0.354167
50,1.6711,1.577426,0.409722


Step,Training Loss,Validation Loss,Accuracy
10,2.0325,2.077425,0.267361
20,2.0884,1.919384,0.354167
30,1.9275,1.790171,0.385417
40,1.8637,1.739768,0.354167
50,1.6711,1.577426,0.409722
60,1.6089,1.52409,0.413194
70,1.5205,1.388671,0.447917
