# Modelo Wav2vec 2.0

Fine-Tuning utilizado para a iniciação científica com o objetivo de "Estudo dos Impactos das Características dos Locutores no Desempenho de Modelos de Reconhecimento Automático de Fala" com foco na língua portuguesa do Brasil.

Autora: Lara Ramos Linhares

Orientador: Luiz Henrique Merschmann

As bibliotecas utilizadas neste código estão em um arquivo requirements.txt.

Para fazer o download das bibliotecas utilize o comando pip install -r requirements.txt

## Preparando os dados para o fine-tuning

In [None]:
from datasets import load_dataset, load_metric

import pandas as pd
import numpy as np

from num2fawords import words, ordinal_words
from tqdm import tqdm

from sklearn.model_selection import train_test_split

import os

A biblioteca os é utilizada para facilitar o acesso aos arquivos de transcrição em formato csv necessários.

In [None]:
cwd = os.getcwd()
csv_path = os.path.join(cwd, "BraccentPorSotaque", "Baiano")
csv_path_treino = os.path.join(csv_path, "BaianoTreino.csv")
csv_path_teste = os.path.join(csv_path, "BaianoTeste.csv")

print("Caminho do arquivo de treino:", csv_path_treino)
print("Caminho do arquivo de teste:", csv_path_teste)

É realizado o carregamento do conjunto de dados de treino e teste a partir do arquivo CSV usando load_dataset

In [None]:
common_voice_train = load_dataset("csv", data_files={"train": csv_path_treino}, delimiter=",", encoding='utf8')["train"]
common_voice_test = load_dataset("csv", data_files={"test": csv_path_teste}, delimiter=",", encoding='utf8')["test"]
print(common_voice_train)
print(common_voice_test)

A função extract_all_chars é responsável por extrair todos os caracteres únicos do conjunto de dados para a criação de um vocabulário.

Algumas considerações:

- 'all_text' é uma string que contém todas as sentenças concatenadas do lote. Isso é feito usando join para unir as sentenças com um espaço entre elas.
- 'vocab' é uma lista contendo todos os caracteres únicos presentes na string 'all_text'. Isso é alcançado convertendo a string para um conjunto (set) para garantir a unicidade dos caracteres e, em seguida, convertendo de volta para uma lista.

In [None]:
def extract_all_chars(batch):
    all_text = " ".join(batch["sentence"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}

In [None]:
vocab_train = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_train.column_names)
vocab_test = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_test.column_names)

In [None]:
vocab_list = list(sorted(set(vocab_train["vocab"][0]) ))
vocab_list = [vocab for vocab in vocab_list if vocab not in [" ", "\u0307","<",">"]]
print(len(vocab_list))
print(vocab_list)

In [None]:
special_vocab = ["<pad>", "<s>", "</s>", "<unk>", "|"]

In [None]:
vocab_dict = {v: k for k, v in enumerate(special_vocab + vocab_list)}
print(len(vocab_dict))
print(vocab_dict)

Escreve o vocabulário resultante e armazenado em 'vocab_dict' em um arquivo JSON. 

In [None]:
import json
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

Nesta parte, é determinado o modelo pré-treinado utilizado e o local em que será salvo os resultados finais. 

Nesse experimento, será utilizado o modelo https://huggingface.co/facebook/wav2vec2-large-xlsr-53 que é um dos muitos modelos multilíngues disponíveis no HuggingFace. 

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Tokenizer, AutoProcessor, AutoModelForPreTraining

save_dir = "./novo-treinamento/"
model_name_or_path = "facebook/wav2vec2-large-xlsr-53"


Este trecho de código verifica se existem checkpoints salvos no diretório 'save_dir' e, se existirem, recupera o caminho do último checkpoint usando a função 'get_last_checkpoint' do módulo transformers.trainer_utils. 

Isso pode ser útil, por exemplo, ao retomar o treinamento de um modelo a partir do ponto em que parou.

In [None]:
from transformers.trainer_utils import get_last_checkpoint

last_checkpoint = None
if os.path.exists(save_dir):
    last_checkpoint = get_last_checkpoint(save_dir)

print(last_checkpoint if last_checkpoint else str(None))

Criação de um novo tokenizer Wav2Vec2CTCTokenizer usando um arquivo JSON específico como vocabulário.

In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer(
        "./vocab.json", 
        bos_token="<s>",
        eos_token="</s>",
        unk_token="<unk>",
        pad_token="<pad>",
        word_delimiter_token="|",
        do_lower_case=False
)

In [None]:
text = "Eu rasguei todo meu dinheiro em vão, minha família depende da aposentadoria do meu avô. Mas o dia amanheceu ensolarado e eu fiquei feliz porque minha tia viajou para Porto Alegre às sete horas."
print(" ".join(tokenizer.tokenize(text)))
print(tokenizer.decode(tokenizer.encode(text)))

Configuração de um Wav2Vec2FeatureExtractor para extrair características de áudio.

Este código é útil para configurar o Feature Extractor para extrair características de áudio, seja criando um novo a partir do zero ou carregando um existente de um modelo pré-treinado.

In [None]:
from transformers import Wav2Vec2FeatureExtractor

if not os.path.exists(save_dir) and not model_name_or_path:
    print("Load from scratch")
    feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)
else:
    print(f"Load from {model_name_or_path}")
    feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name_or_path)

In [None]:
from transformers import Wav2Vec2Processor
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

Este trecho de código verifica se o tamanho do vocabulário retornado pelo método get_vocab() do tokenizer é igual ao tamanho total do tokenizer. Se ambos forem iguais, o código imprime o tamanho do tokenizer.

In [None]:
if len(processor.tokenizer.get_vocab()) == len(processor.tokenizer):
    print(len(processor.tokenizer))

## Redimencionamento dos dados (Resampling data)

A taxa de amostragem na qual os arquivos de áudio devem ser digitalizados é de 16.000Hz. Como o modelo Wav2Vec2 pré-treinado é treinado com arquivos de áudio com taxa de amostragem de 16.000 Hz, devemos reamostrar nossos dados de acordo.

Se você deseja redimencionar o áudio para a taxa alvo (target_sampling_rate), você pode descomentar a linha #speech_array = librosa.resample(np.asarray(speech_array), sampling_rate, target_sampling_rate) e fornecer a implementação correta para resamplear usando librosa. 

Isso é útil se a taxa de amostragem do áudio original for diferente da target_sampling_rate.

In [None]:
import torchaudio
import librosa
import os

target_sampling_rate = 16000

cwd = os.getcwd()
audio_path = os.path.join(cwd, "BraccentPorSotaque", "Baiano")
        
def speech_file_to_array_fn(batch):
    audio_file_path = os.path.join(audio_path, batch["path"])
    speech_array, sampling_rate = torchaudio.load(audio_file_path, normalize=True)  
    speech_array = speech_array.squeeze().numpy()
    #speech_array = librosa.resample(np.asarray(speech_array), sampling_rate, target_sampling_rate)
    
    batch["speech"] = speech_array
    batch["sampling_rate"] = target_sampling_rate
    batch["duration_in_seconds"] = len(batch["speech"]) / target_sampling_rate
    batch["target_text"] = batch["sentence"]
    
    return batch

Este trecho está mapeando a função 'speech_file_to_array_fn' nos conjuntos de dados de treinamento e teste, 'remove_columns' é usado para remover as colunas existentes, mantendo apenas as novas colunas adicionadas pela função.

- 'num_proc' especifica o número de processos paralelos a serem utilizados para mapeamento (pode acelerar o processamento)

In [None]:
common_voice_train = common_voice_train.map(speech_file_to_array_fn, remove_columns=common_voice_train.column_names, num_proc=19)
common_voice_test = common_voice_test.map(speech_file_to_array_fn, remove_columns=common_voice_test.column_names, num_proc=19)

In [None]:
print(common_voice_train[0]["sampling_rate"])
print(common_voice_test[0]["sampling_rate"])

A função 'filter_by_max_duration' filtra os exemplos de dados com base na duração do áudio.

- 'min_duration_in_seconds' e 'max_duration_in_seconds': definem os limites inferior e superior para a duração desejada do áudio.

In [None]:
min_duration_in_seconds = 5.0
max_duration_in_seconds = 10.0

def filter_by_max_duration(batch):
   return min_duration_in_seconds <= batch["duration_in_seconds"] <= max_duration_in_seconds

In [None]:
print(f"Split sizes [BEFORE]: {len(common_voice_train)} train and {len(common_voice_test)} validation.")

_common_voice_train = common_voice_train.filter(filter_by_max_duration, num_proc=24)

_common_voice_test = common_voice_test.filter(filter_by_max_duration, num_proc=4)

print(f"Split sizes [AFTER]: {len(_common_voice_train)} train and {len(_common_voice_test)} validation.")

Este trecho de código parece ser usado para visualizar aleatoriamente um exemplo do conjunto de treinamento 'common_voice_train'.

In [None]:
import IPython.display as ipd
import numpy as np
import random

sample = common_voice_train
rand_int = random.randint(0, len(sample))

print("Target text:", sample[rand_int]["target_text"])
print("Input array shape:", np.asarray(sample[rand_int]["speech"]).shape)
print("Sampling rate:", sample[rand_int]["sampling_rate"])

ipd.Audio(data=np.asarray(sample[rand_int]["speech"]), autoplay=True, rate=16000)

A função 'prepare_dataset' é destinada a preparar o conjunto de dados para treinamento, processando os sinais de áudio e os textos-alvo.

- 'assert len(set(batch["sampling_rate"])) == 1': Verifica se todos os sinais de áudio no lote têm a mesma taxa de amostragem. Isso é importante para garantir consistência.

- 'batch["input_values"] = processor(batch["speech"], sampling_rate=batch["sampling_rate"][0]).input_values': Usa o 'processor' (que inclui o 'feature_extractor' e o 'tokenizer') para processar os sinais de áudio ('batch["speech"]') e obter os 'input_values' que serão usados como entrada para o modelo.

- 'with processor.as_target_processor():': Configura o processador Wav2Vec2 para atuar como um processador de destino.

- 'batch["labels"] = processor(batch["target_text"]).input_ids': Usa o processador para processar os textos-alvo '(batch["target_text"])' e obter os 'input_ids' que representam as sequências de tokens.

- 'return batch': Retorna o batch modificado após o processamento.

Essa função é uma etapa importante na preparação dos dados para treinamento do modelo. Certifica-se de que os sinais de áudio e os textos-alvo estejam em um formato adequado para serem usados como entrada e rótulos durante o treinamento do modelo Wav2Vec2.

In [None]:
def prepare_dataset(batch):
    # check that all files have the correct sampling rate
    assert (
        len(set(batch["sampling_rate"])) == 1
    ), f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."

    batch["input_values"] = processor(batch["speech"], sampling_rate=batch["sampling_rate"][0]).input_values
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["target_text"]).input_ids
    return batch

In [None]:
_common_voice_train = _common_voice_train.map(prepare_dataset, remove_columns=_common_voice_train.column_names, batch_size=8, num_proc=19, batched=True)
_common_voice_test = _common_voice_test.map(prepare_dataset, remove_columns=_common_voice_test.column_names, batch_size=8, num_proc=19, batched=True)

Essa classe 'DataCollatorCTCWithPadding' é projetada para ser usada como um 'data_collator' ao treinar um modelo Wav2Vec2 para tarefas de CTC.

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

A função load_metric("wer") está sendo usada para carregar a métrica de taxa de erro de palavra (Word Error Rate, WER). Usado para a avaliação do modelo.

In [None]:
wer_metric = load_metric("wer")

Este trecho de código define uma função 'compute_metrics' que é usada para calcular a métrica Word Error Rate (WER) com base nas predições do modelo e nos rótulos de referência.

In [None]:
import random


def compute_metrics(pred):
    
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)
    
    if isinstance(label_str, list):
        if isinstance(pred_str, list) and len(pred_str) == len(label_str):
            for index in random.sample(range(len(label_str)), 3):
                print(f'reference: "{label_str[index]}"')
                print(f'predicted: "{pred_str[index]}"')

        else:
            for index in random.sample(range(len(label_str)), 3):
                print(f'reference: "{label_str[index]}"')
                print(f'predicted: "{pred_str}"')

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


## Apontando as configurações do modelo

In [None]:
model = Wav2Vec2ForCTC.from_pretrained(
    model_name_or_path,
    attention_dropout=0.1,
    activation_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.1,
    final_dropout=0.1,
    layerdrop=0.1,
    gradient_checkpointing=False,
    ctc_loss_reduction="mean",
    ctc_zero_infinity=True,
    bos_token_id=processor.tokenizer.bos_token_id,
    eos_token_id=processor.tokenizer.eos_token_id,
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer.get_vocab())
)

In [None]:
model.freeze_feature_encoder()

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir=save_dir,
    group_by_length=True,
    per_device_train_batch_size=4,  
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=10,
    fp16=True,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=10,
    learning_rate=4e-4,
    warmup_steps=500,
    save_total_limit=2,
    dataloader_num_workers=16
)


## Treinamento

In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=_common_voice_train,
    eval_dataset=_common_voice_test,
    tokenizer=processor.feature_extractor,
)


In [None]:
print("step1")
train_result = trainer.train()

In [None]:
print("step2")
metrics = train_result.metrics

In [None]:
print("step3")
max_train_samples = len(_common_voice_train)
metrics["train_samples"] = min(max_train_samples, len(_common_voice_train))

print("step4")
trainer.save_model()

print("model created!")

trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()

In [None]:
final_path = "./novo-treinamento/"