In [72]:
import numpy as np
import torchaudio
from datasets import load_dataset
from transformers import AutoConfig, Wav2Vec2Processor, EvalPrediction, TrainingArguments
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List, Union
import torch
from transformers.file_utils import ModelOutput
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)
from typing import Any, Dict, Union
from packaging import version
from transformers import (
    Trainer,
    is_apex_available,
) 
if is_apex_available():
    from apex import amp
if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.amp import autocast

## Load data

In [47]:
data_files = {
    "train": "../../dataset_csv/train.csv", 
    "validation": "../../dataset_csv/test.csv",
}

dataset = load_dataset("csv", data_files=data_files, delimiter="\t")

train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

print(train_dataset)
print(eval_dataset)

Dataset({
    features: ['name', 'path', 'emotion'],
    num_rows: 1152
})
Dataset({
    features: ['name', 'path', 'emotion'],
    num_rows: 288
})


In [48]:
input_column = "path"
output_column = "emotion"

### Control step to detect anomalies in the training assembly

In [49]:
label_list = train_dataset.unique(output_column)
label_list.sort()
num_labels = len(label_list)

print(f"A classification problem with {num_labels} classes: {label_list}")

A classification problem with 8 classes: ['angry', 'calm', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised']


# Selection of pre-trained model

In [50]:
model_name_or_path = "lighteternal/wav2vec2-large-xlsr-53-greek"
pooling_mode = "mean"

## Prepare the model for fine-tuning

In [51]:
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id={label: i for i, label in enumerate(label_list)},
    id2label={i: label for i, label in enumerate(label_list)},
    finetuning_task="wav2vec2_clf",
)
setattr(config, 'pooling_mode', pooling_mode)

In [52]:
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path,)
target_sampling_rate = processor.feature_extractor.sampling_rate
# Sampling rate expected by the model
print(f"The target sampling rate: {target_sampling_rate}")

The target sampling rate: 16000


# Preprocess Data

In [53]:
# Load an audio file and transform it into an array of numerical values.
def speech_file_to_array_fn(path):
    # speech_array -> tensor with audio data
    # sampling_rate -> audio file sample rate
    speech_array, sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    if speech.ndim == 0:
        speech = np.array([speech])
    return speech

def label_to_id(label, label_list):

    if len(label_list) > 0:
        return label_list.index(label) if label in label_list else -1

    return label

def preprocess_function(examples):
    speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
    target_list = [label_to_id(label, label_list) for label in examples[output_column]]

    speech_list = [np.asarray(speech).flatten() for speech in speech_list]
    
    result = processor(speech_list, sampling_rate=target_sampling_rate)
    result["labels"] = list(target_list)

    return result

In [54]:
train_dataset = train_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=1,
)
eval_dataset = eval_dataset.map(
    preprocess_function,
    batch_size=100,
    batched=True,
    num_proc=1,
)

In [55]:
idx = 2
print(f"Training input_values: {train_dataset[idx]['input_values']}")
print(f"Training attention_mask: {train_dataset[idx]['attention_mask']}")
print(f"Training labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['emotion']}")

Training input_values: [-0.00031803842284716666, 0.000801649468485266, -0.0028854531701654196, -0.00830519013106823, -0.0023451929446309805, 0.0023333292920142412, 0.00892800185829401, 0.002452349290251732, -0.002885744208469987, -0.00014755230222363025, 8.066212467383593e-05, -0.0016002018237486482, 0.0019468602258712053, 0.01612282730638981, 0.004661410115659237, 0.003931188024580479, -0.001383547903969884, -0.007989996112883091, 0.008182018995285034, 0.006187531631439924, 0.002376001328229904, 0.017410507425665855, 0.004444633144885302, -0.004921436309814453, -0.000351182883605361, -0.006834488827735186, -0.0006588680553250015, -0.0023243811447173357, -0.0060186623595654964, 0.01003329735249281, -0.0032831565476953983, -0.010845686309039593, 0.002633193973451853, -0.0005065755685791373, 0.0012483354657888412, 0.0026718750596046448, 0.006938535254448652, 0.007097181864082813, -0.008401991799473763, 0.002954374300315976, 0.0037717102095484734, -0.0012534687994048, 0.010807416401803493

# Model

## Structuring the model output

In [56]:
@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [57]:
# Classification header which takes features extracted by Wav2Vec2 and transforms them into logits for classification
class Wav2Vec2ClassificationHead(nn.Module):

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


In [58]:
# Definition of a classification model based on Wav2Vec2 for speech
class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    # Method for freezing the feature extractor parameters of the Wav2Vec2 model (for fine-tuning)
    def freeze_feature_extractor(self):
        self.wav2vec2.feature_extractor._freeze_parameters()

    # Method which aggregates hidden states -> allows transforming the outputs of the Wav2Vec2 model into a compact vector for classification
    def merged_strategy(self, hidden_states, mode="mean"):
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")
        return outputs
    
    def forward(self, input_values, attention_mask=None, output_attentions=None, output_hidden_states=None, return_dict=None, labels=None, ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )



# Training

In [59]:
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [feature["labels"] for feature in features]

        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

In [60]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [61]:
is_regression = False

In [62]:
def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)

    if is_regression:
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}
    else:
        return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

In [63]:
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at lighteternal/wav2vec2-large-xlsr-53-greek and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [64]:
model.freeze_feature_extractor()

In [65]:
training_args = TrainingArguments(
    output_dir="/content/wav2vec2-xlsr-greek-speech-emotion-recognition",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=1.0,
    fp16=True,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
)

In [75]:
class CTCTrainer(Trainer):
    def __init__(self, *args, **kwargs):
        # Remove use_amp from kwargs before passing to parent constructor
        self.use_amp = kwargs.pop('use_amp', False)
        self.use_apex = kwargs.pop('use_apex', False)
        self.deepspeed = kwargs.pop('deepspeed', False)
        
        # Initialize the parent constructor with remaining kwargs
        super().__init__(*args, **kwargs)
        
        # Initialize scaler if using AMP
        if self.use_amp:
            self.scaler = torch.amp.GradScaler()

    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], num_items_in_batch: int) -> torch.Tensor:
        model.train()
        inputs = self._prepare_inputs(inputs)

        if self.use_amp:
            with autocast(device_type='cuda', dtype=torch.float16):
                loss = self.compute_loss(model, inputs)
        else:
            loss = self.compute_loss(model, inputs)

        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        if self.use_amp:
            self.scaler.scale(loss).backward()
        elif self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        elif self.deepspeed:
            self.deepspeed.backward(loss)
        else:
            loss.backward()

        return loss.detach()

trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    processing_class=processor.feature_extractor,
    use_amp=True 
)


In [76]:
trainer.train()

  1%|▏         | 2/144 [03:04<3:37:48, 92.03s/it]
  7%|▋         | 10/144 [05:16<1:12:22, 32.41s/it]
  7%|▋         | 10/144 [05:16<1:12:22, 32.41s/it]

{'loss': 1.8885, 'grad_norm': 2.350334882736206, 'learning_rate': 9.305555555555556e-05, 'epoch': 0.07}



[A
[A
[A

KeyboardInterrupt: 