In [1]:
%%capture
# Установка используемых библиотек и настройка окружения
!pip install git+https://github.com/huggingface/datasets.git
!pip install git+https://github.com/huggingface/transformers.git
!pip install jiwer
!pip install torchaudio

%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8
%env TRANSFORMERS_CACHE=/content/cache
%env HF_DATASETS_CACHE=/content/cache
%env CUDA_LAUNCH_BLOCKING=1

In [2]:
AUGMENTED = True # Использовать ли аугментацию
ALPHA = 0.3
DATASET = 'ravdess' # Либо "emodb", "cremad" 
FOLD = 0 # 0, 1, 2 ...N, где N - количество фолдов в соответствии с работой 
EPOCHS = 10 # Можно изменять количество эпох, в работе указаны использованные значения 
RAVDESS = # Ваш путь до директории здесь
EMODB = # Ваш путь до директории здесь
CREMAD = # Ваш путь до директории здесь

In [3]:
import os
import sys
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import torchaudio
from sklearn.model_selection import train_test_split
import torchaudio
import IPython.display as ipd
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from torch import nn
from transformers.file_utils import ModelOutput
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import transformers
from transformers import Wav2Vec2Processor, EvalPrediction
from typing import Any, Dict, Union
from packaging import version
from torch.utils.data import DataLoader

import scipy.io.wavfile as wf
from torch.utils.data import Dataset
from transformers import AutoConfig, Wav2Vec2Processor, TrainingArguments, Trainer, is_apex_available


from datasets import load_dataset, load_metric
import glob
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)

In [4]:
# Перевод RAVDESS в датафрейм для удобства дальнейшей работы
encode_emotions = { 
    1 : 'neu',
    2 : 'calm', 
    3 : 'hap',
    4 : 'sad',
    5 : 'ang',
    6: 'fea',
    7: 'dis',
    8: 'sup'
}
dataset_data = []
for path in glob.glob(RAVDESS+'/Actor_*/*.wav'):
    filename = path.split('/')[-1][:-4]
    modality, vocal_channel, emotion, emo_intensity, statement, repetition, actor = [int(x) for x in filename.split('-')]
    dataset_data.append([actor, statement, encode_emotions[emotion], 'ravdess', path])
df_ravdess = pd.DataFrame(dataset_data, columns = ['actor', 'statement', 'emotion', 'dataset', 'path'])

In [5]:
# Перевод EMODB в датафрейм для удобства дальнейшей работы
encode_emotions = {
    'W' : 'ang',
    'L' : 'bor', # == 'calm' ?
    'E' : 'dis',
    'A' : 'fea',
    'F' : 'hap',
    'T' : 'sad',
    'N' : 'neu'
}
dataset_data = []
for file in glob.glob(EMODB+'/*.wav'):
    filename = file[-11:-4]
    speaker = filename[:2]
    text_id = filename[3:5]
    emotion_code = filename[5]
    version_ = filename[6]
    dataset_data.append([int(speaker), int(text_id), encode_emotions[emotion_code], 'emodb', file])
df_emodb = pd.DataFrame(dataset_data, columns = ['actor', 'statement', 'emotion', 'dataset', 'path'])

In [6]:
# Перевод CREMA-D в датафрейм для удобства дальнейшей работы
dataset_data = []
for path in glob.glob(CREMAD+'/*.wav'):
    filename = path.split('/')[-1][:-4]
    emotion = filename.split('_')[2]
    actor = filename[:4]
    dataset_data.append([int(actor)-1000,0, emotion.lower(), 'cremad', path])
    
df_cremad = pd.DataFrame(dataset_data, columns = ['actor', 'statement', 'emotion', 'dataset', 'path'])

In [7]:
if DATASET == 'emodb':
    df = df_emodb
elif DATASET == 'ravdess':
    df = df_ravdess 
elif DATASET == 'cremad':
    df = df_cremad

else:
    raise NotImplementedError("Undefined Dataset name")

In [8]:
# Краткое описание данных
print("Labels: ", df["emotion"].unique())
df.groupby("emotion").count()[["path"]]

In [9]:
idx = np.random.randint(0, len(df))
sample = df.iloc[idx]
path = sample["path"]
label = sample["emotion"]


print(f"Расположение файла: {idx}")
print(f"      Эмоция: {label}")
print()

speech, sr = torchaudio.load(path)
speech = speech[0].numpy().squeeze()
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=sr)

In [10]:
# За основу кода этой ячейки взят код https://github.com/m3hrdadfi/soxan с некоторыми изменениями и упрощениями
idx = 0
# Класс для хранения результатов
@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

# Классификатор для всей модели
class Wav2Vec2ClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        global num_labels
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, num_labels)#config.num_labels
        self.num_labels = num_labels
    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x

# Интерфейс - модель, использующая Wav2Vec2
class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        global num_labels
        super().__init__(config)
        self.num_labels = num_labels
        self.pooling_mode = "mean" # mean работает лучше
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config) # Сама модель
        self.classifier = Wav2Vec2ClassificationHead(config) # Классификатор

        self.init_weights()

    def freeze_feature_extractor(self):
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(self, hidden_states, mode="mean"): # mean работает лучше
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        return outputs

    def forward(self, input_values, attention_mask=None, output_attentions=None, output_hidden_states=None, 
                return_dict=None, labels=None ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0] # берем последнее
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode) # Усредняем по времени
        logits = self.classifier(hidden_states)
        # Далее выдаем результат в нужном формате
        loss = None
        if labels is not None:
            loss_fct = BCEWithLogitsLoss()
            loss = loss_fct(logits, labels)
        
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
    

# Словарь с точностью (можно добавлять другие метрики)
def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.argmax(preds, axis=1)
    return {"accuracy": (preds == np.argmax(p.label_ids, 1)).astype(np.float32).mean().item()}





if is_apex_available(): # апекс можно использовать, если есть поддержка.
    # Вычисления будут эффективнее, меньше расход памяьти
    from apex import amp

if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast

    
    
class CTCTrainer(Trainer):
    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        """
        Класс-обертка для обучения модели, совместим с huggingface trainer, что делает удобным его применение
        """

        model.train()
        inputs = self._prepare_inputs(inputs)

        if self.use_amp:
            with autocast():
                loss = self.compute_loss(model, inputs)
        else:
            loss = self.compute_loss(model, inputs)

        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        if self.use_amp:
            self.scaler.scale(loss).backward()
        elif self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        elif self.deepspeed:
            self.deepspeed.backward(loss)
        else:
            loss.backward()

        return loss.detach()

# Загрузка и ресэмплинг при необходимости - используется сэмплинг рейт 16 000
# (во всех аудиозаписях датасета используется тоже 16 000)
def speech_file_to_array_fn(path):
    speech_array, sampling_rate = torchaudio.load(path)
    if len(speech_array.shape) > 1:
        speech_array = speech_array[0] # for two-channels audio
    resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    return speech

# По метке получаем  число
def label_to_id(label, label_list):

    if len(label_list) > 0:
        return label_list.index(label) if label in label_list else -1

    return label

# В этой функции используются предыдущие две - загружается запись и метки преобразуются в число
def preprocess_function(examples):
    speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
    target_list = [label_to_id(label, label_list) for label in examples[output_column]]
    try:
        result = {}
        result = processor(speech_list, sampling_rate=target_sampling_rate)
        result["labels"] = list(target_list)
    except:
        print("error")
    return result

In [11]:
@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": torch.tensor(feature["input_values"].astype('float'))} for feature in features]
        label_features = [torch.nn.functional.one_hot(torch.tensor(feature["labels"]), 8) if type(feature["labels"]) == int else feature["labels"] for feature in features]
        
        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        batch["labels"] = torch.stack(label_features)
        return batch

In [12]:
pooling_mode = "mean"
model_name_or_path = "jonatasgrosman/wav2vec2-large-xlsr-53-english"
input_column = "path"
output_column = "emotion"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
OUT_DIR = f"wav2vec2_fold{FOLD}"

In [13]:
if DATASET == 'ravdess':
    # RAVDESS
    actors_per_fold = {
            0: [1, 2, 3, 4, 5],
            1: [6, 7, 8, 9, 10],
            2: [11, 12, 13, 14, 15],
            3: [16, 17, 18, 19, 20],
            4: [21, 22, 23, 24]
    }
elif DATASET == 'cremad':
    # CREMAD
    actors_per_fold = {
            0: [10, 13, 14, 17, 18, 21, 26, 30, 32, 40, 41, 51, 58, 86],
            1: [23, 24, 25, 27, 28, 29, 31, 33, 34, 35, 36, 37, 38, 39]
    }
elif DATASET == 'EMODB':
    # EMODB
    actors_per_fold = {
        0: [13, 14, 15, 16],
        1: [3, 8, 9] # Суммарное количество записей такое же, как в случае предыдущего фолда
    }
else:
    raise NotImplementedError("Undefined dataset")

In [14]:

training_args = TrainingArguments(
output_dir=OUT_DIR,

per_device_train_batch_size=3,
per_device_eval_batch_size=3,
gradient_accumulation_steps=2,
evaluation_strategy="steps",
num_train_epochs=EPOCHS,
fp16=(True if device =='cuda' else False),
save_steps=500,
eval_steps=20,
logging_steps=10,
learning_rate=1e-4,
save_total_limit=2,
)
test_part = df.query(f'actor in {actors_per_fold[FOLD]}')
train_part = df.query(f'not (actor in {actors_per_fold[FOLD]})')
save_path = "./"
train_part.to_csv(f"{save_path}/train.csv", sep="\t", encoding="utf-8", index=False)
test_part.to_csv(f"{save_path}/test.csv", sep="\t", encoding="utf-8", index=False)
data_files = {
    "train": "./train.csv", 
    "validation": "./test.csv",
}

dataset = load_dataset("csv", data_files=data_files, delimiter="\t", )
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

label_list = train_dataset.unique(output_column)
label_list.sort() 
num_labels = len(label_list)



config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id={label: i for i, label in enumerate(label_list)},
    id2label={i: label for i, label in enumerate(label_list)},
    finetuning_task="wav2vec2_clf",
)
setattr(config, 'pooling_mode', pooling_mode)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path,)
target_sampling_rate = processor.feature_extractor.sampling_rate
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path,)
target_sampling_rate = processor.feature_extractor.sampling_rate


train_dataset = train_dataset.map(
preprocess_function,
batch_size=100,
batched=True,
num_proc=4
)
eval_dataset = eval_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=4
)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)

print(f"Целевой сэмплинг рейт: {target_sampling_rate}")
print(f"Количество лейблов {num_labels}, классы : {label_list}")
print(f"Используется: {device}")

In [15]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True) 

In [16]:
eval_pd = eval_dataset.to_pandas()
train_pd = train_dataset.to_pandas()

In [17]:
EMOTIONS = num_labels # Количество классов


distribution = torch.distributions.beta.Beta(torch.tensor([ALPHA]), torch.tensor([ALPHA])) #torch.rand(1).item()

# Это один из примеров возможного смешивания, который не используется, но также был исследован при экспериментах 
def permute_audio(audio, permutation):
    return np.hstack(np.array(np.array_split(audio, len(permutation)),dtype=object)[permutation])

# Функция смешивания двух образцов записи в соответствии с описанным в работе подходом
def mixup(audio1, audio2, label1, label2):
    proportion = distribution.sample().item() # Семплируем из бета-распределения
    take1 = round(len(audio1)*proportion) #  Берем соответствующие пропорции
    take2 = round(len(audio2)*(1-proportion))
    audio = np.hstack([audio1[:take1], audio2[-take2:]])
    label = label1*proportion + label2*(1-proportion) # Изменяем метки соответствующим образом
    return audio, label

# Реализация датасета для получения записи из набора данных

class AudioDataset(Dataset):
    def __init__(self, is_train, df, n_splits=4, p_augment = 0.5, use_augment=True): # possible use random n_splits every time
        self.df = df
        self.is_train = is_train
        self.n_splits = n_splits
        self.p_augment = p_augment
        self.use_augment = use_augment
        
        
    def __len__(self):
        return self.df.shape[0]
    
    
    def __getitem__(self, index):
        item = self.df.iloc[index].to_dict()
        if self.is_train and (np.random.uniform() < self.p_augment) and self.use_augment: 
            # С некоторой вероятностью аугментируем запись из тренировочного датасета
            actor = item['actor']
            same_actor = self.df.query(f'actor=={actor}') # Рассматриваем множетво записей того же актера без ограничения эмоций
            pick = np.random.randint(0, same_actor.shape[0]) # Выбираем случайную запись этого актера
            another_sample = self.df.iloc[pick].to_dict()
            # Далее смешиваем
            audio, sample = mixup(item['input_values'], another_sample['input_values'], 
                  torch.nn.functional.one_hot(torch.tensor(item['labels']), EMOTIONS),
                  torch.nn.functional.one_hot(torch.tensor(another_sample['labels']), EMOTIONS)
                 )
            item['input_values'] = audio
            item['labels'] = sample
        else:
            item['labels'] = torch.nn.functional.one_hot(torch.tensor(item['labels']), EMOTIONS).float()
        return item

In [18]:
# Создание экземпляров датасета 
trainds = AudioDataset(True, train_pd, use_augment=AUGMENTED)
evalds = AudioDataset(False, eval_pd)

In [19]:
#  Содание загрузчика с использванием коллатора
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True) # 
train_dl = DataLoader(trainds, collate_fn=data_collator, batch_size=32)

In [20]:
# Определение класса модели
model = Wav2Vec2ForSpeechClassification.from_pretrained(model_name_or_path)
model.to(device)
model.freeze_feature_extractor()

In [None]:
# Аргументы для huggingface trainer и обучение модели
training_args = TrainingArguments(
    output_dir=OUT_DIR,
    per_device_train_batch_size=3, # Большее количество записей не помещалось
    per_device_eval_batch_size=3,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=EPOCHS,
    fp16=(True if device =='cuda' else False), # Оптимизация по памяти
    save_steps=500, # Можно сохранять модель
    eval_steps=20, # Как часто проводить валидацию
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
    lr_scheduler_type='polynomial' # Использую полиномиальное сокращение шага, как указано в работе
)

trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=trainds,
    eval_dataset=evalds,
    tokenizer=processor.feature_extractor
)

trainer.train()