# Fine Tune Wav2Vec2
- Compare a Wav2Vec2 model fine-tuned on TIMIT phoneme transcriptions to XLSR
- Following guide from Patrick von Platen: https://huggingface.co/blog/fine-tune-wav2vec2-english

In [None]:
from huggingface_hub import notebook_login

# Log in to the Hugging Face Hub from the notebook so the later Hub calls
# (repo creation, uploads) are authenticated.
notebook_login()

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
# %%capture
# Install the notebook's dependencies: numpy, datasets and transformers are
# pinned to specific versions; jiwer and evaluate are left unpinned.
# NOTE(review): later cells `import editdistance` and pass report_to="wandb",
# but neither package is installed here — confirm both are already available
# in the runtime.
!pip install numpy==1.23.4
!pip install datasets==2.15
!pip install transformers==4.17.0
!pip install jiwer
!pip install evaluate

In [None]:
%%capture
# Install Git LFS through apt; the %%capture cell magic above hides the
# command output (it has to remain the first line of the cell).
!apt install git-lfs

In [None]:
from datasets import load_dataset, load_metric

# Load the TIMIT dataset variant that carries IPA transcriptions
# (read later through the "ipa_transcription" column).
timit = load_dataset("kylelovesllms/timit_asr_ipa")

In [None]:
# Display the loaded dataset object; this should print out the high-level
# structure of TIMIT.
timit

In [None]:
# Print the "phonetic_detail" column of the train split.
# NOTE(review): this reads and prints the whole column, not a sample.
print(timit["train"]["phonetic_detail"])

In [None]:
from transformers import Wav2Vec2CTCTokenizer

# Build the CTC tokenizer from the local vocabulary file, declaring which
# tokens act as unknown, padding and word delimiter.
tokenizer = Wav2Vec2CTCTokenizer("./vocab_manual.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
# Local folder the tokenizer is saved to before it is uploaded.
TOKENIZER_NAME = "Wav2Vec2IpaTokenizer"
# Hub repository name; also used as the training output directory.
REPO_NAME = "Wav2Vec2IpaFullTIMIT_L2L"
# Hub namespace (user or organisation) that prefixes the repository id.
HF_ID = "kylelovesllms"

In [None]:
# Number of training epochs passed to the training arguments.
EPOCHS = 100
# Local folder for the final model. The name is derived from EPOCHS so the
# folder label always matches the number of epochs actually trained.
SAVE_LOCAL_NAME = f"final_wav2vec2ipa_epochs_{EPOCHS}"

In [None]:
# Save the tokenizer files locally so the folder can be uploaded to the Hub.
tokenizer.save_pretrained(TOKENIZER_NAME)
from huggingface_hub import HfApi

api = HfApi()
repo_name = REPO_NAME
# Create and upload to the same fully qualified repo id, so the repository
# that gets created is the one that receives the files, whichever account
# is logged in.
api.create_repo(repo_id=f"{HF_ID}/{REPO_NAME}", exist_ok=True)
api.upload_folder(
    folder_path=TOKENIZER_NAME,  # Folder written by save_pretrained above
    repo_id=f"{HF_ID}/{REPO_NAME}",  # Built from HF_ID and REPO_NAME
    commit_message="Upload tokenizer"
)

In [None]:
from transformers import AutoTokenizer

# Reload the tokenizer from the Hub repository it was uploaded to, replacing
# the locally constructed instance.
tokenizer = AutoTokenizer.from_pretrained(f"{HF_ID}/{REPO_NAME}")

In [None]:
from transformers import Wav2Vec2FeatureExtractor

# Feature extractor for single-channel features sampled at 16 kHz; inputs are
# normalized, padded with 0.0, and no attention mask is returned.
feature_extractor = Wav2Vec2FeatureExtractor(
    feature_size=1,
    sampling_rate=16000,
    padding_value=0.0,
    do_normalize=True,
    return_attention_mask=False,
)

In [None]:
from transformers import Wav2Vec2Processor

# Bundle the feature extractor and the tokenizer into a single processor, which
# the collator, metric and dataset-preparation code all go through.
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Collate CTC examples into a single, dynamically padded batch.

    Audio inputs and label ids are padded separately, because they have
    different lengths and are padded by different parts of the processor.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor whose ``pad`` method pads both the inputs and, inside
            ``as_target_processor()``, the labels.
        padding (:obj:`bool` or :obj:`str`, `optional`, defaults to :obj:`True`):
            Padding strategy forwarded unchanged to ``processor.pad``:
            * :obj:`True` or :obj:`'longest'`: pad to the longest sequence in the batch (no padding if only a
              single sequence is provided).
            * :obj:`'max_length'`: pad to the length given by :obj:`max_length`, or to the maximum acceptable
              input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: no padding (the batch may contain sequences of different
              lengths).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad the ``input_values`` and ``labels`` of ``features`` and return them as one batch of tensors."""
        # Inputs and labels need different padding, so collect them separately.
        audio_items = []
        label_items = []
        for example in features:
            audio_items.append({"input_values": example["input_values"]})
            label_items.append({"input_ids": example["labels"]})

        batch = self.processor.pad(audio_items, padding=self.padding, return_tensors="pt")

        # Inside this context the processor pads label ids instead of audio.
        with self.processor.as_target_processor():
            padded_labels = self.processor.pad(label_items, padding=self.padding, return_tensors="pt")

        # Padded label positions become -100 so the loss ignores them.
        is_padding = padded_labels.attention_mask.ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)
        return batch

In [None]:
# Collator instance handed to the Trainer; pads every batch to its longest item.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
import editdistance
# Custom normalized CER
def get_edit_distance(trans1: str, trans2: str) -> int:
    """Return the edit distance between two transcriptions, as computed by ``editdistance.eval``."""
    distance = editdistance.eval(trans1, trans2)
    return distance


def get_phoneme_error_rate(*, ground_truth: str, prediction: str) -> float:
    """
    Return the normalized character error rate (CER) between two transcriptions.

        Normalized CER = edit distance / max(len(ground_truth), len(prediction))

    Motivating example: https://towardsdatascience.com/evaluating-ocr-output-quality-with-character-error-rate-cer-and-word-error-rate-wer-853175297510#5aec
        - Standard CER: Truth = ABC, Prediction = ABC12345, Edit = 5 / 3
        -  Normalized CER: Truth = ABC, Prediction = ABC12345, Edit = 5 / 8
        -  Normalized CER: Truth = "", Prediction = ABC12345, Edit = 8 / 8 = 100%
        -  Normalized CER: Truth = ABC12345, Prediction = "", Edit = 8 / 8 = 100%
            Nice property is that it's symmetric

    Args:
        ground_truth: Reference transcription.
        prediction: Predicted transcription.

    Returns:
        The edit distance divided by the length of the longer string, or
        ``0.0`` when both strings are empty.
    """
    longest = max(len(ground_truth), len(prediction))
    if longest == 0:
        # Both strings are empty: they are identical, and dividing by the
        # zero length would raise ZeroDivisionError.
        return 0.0
    return get_edit_distance(ground_truth, prediction) / longest

In [None]:
import numpy as np
from evaluate import load
# Character error rate metric from the `evaluate` library.
cer_metric = load("cer")


def compute_metrics(pred):
    """Decode predictions and labels to text and return their character error rate.

    Args:
        pred: Evaluation output exposing ``predictions`` (logits) and
            ``label_ids``. ``label_ids`` is modified in place: every -100
            entry is overwritten with the tokenizer's pad token id.

    Returns:
        A dict with the single key ``"cer"``.
    """
    # Pick the highest-scoring token id along the last axis of the logits.
    predicted_ids = np.argmax(pred.predictions, axis=-1)

    # -100 marks label positions to ignore; swap in the pad id so they decode.
    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    predicted_text = processor.batch_decode(predicted_ids)
    # we do not want to group tokens when computing the metrics
    reference_text = processor.batch_decode(pred.label_ids, group_tokens=False)
    # TODO ensure we have the correct pred.label_ids as a preprocessing step

    return {"cer": cer_metric.compute(predictions=predicted_text, references=reference_text)}

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Config

# Start from the pretrained base configuration and adapt it to the IPA tokenizer.
config = Wav2Vec2Config.from_pretrained("facebook/wav2vec2-base")
# Size the output layer with len(tokenizer), which counts tokens added on top of
# the vocabulary file. tokenizer.vocab_size leaves those out, so if any special
# token is not in the vocab file its id would fall outside the model's output.
config.vocab_size = len(processor.tokenizer)
config.ctc_loss_reduction = "mean"
config.pad_token_id = processor.tokenizer.pad_token_id

# Persist the adapted configuration; the model below is built from this folder.
config_save_path = "ipa_config"
config.save_pretrained(config_save_path)
print("Config Vocab Size", config.vocab_size)

# Load the pretrained weights under the adapted configuration
# (CTC loss reduction, pad token id and vocabulary size come from the config).
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base",
    config=config_save_path,
)

In [None]:
# The CNN feature encoder is considered already trained well enough, so freeze
# it and leave it out of fine-tuning.
model.freeze_feature_encoder()

## Prep Dataset
- We have a version of TIMIT which uses IPA rather than word-level transcriptions

In [None]:
def prepare_dataset(batch):
    """Add model inputs and IPA label ids to one dataset entry.

    Despite the name, ``batch`` is a single entry (from either split). The
    function adds ``input_values``, ``input_length`` and ``labels`` to it and
    returns the same object.
    """
    waveform = batch["audio"]

    # The processor returns a batched result; take element 0 to un-batch it.
    extracted = processor(waveform["array"], sampling_rate=waveform["sampling_rate"])
    batch["input_values"] = extracted.input_values[0]
    batch["input_length"] = len(batch["input_values"])

    # Inside this context the processor tokenizes text instead of audio.
    with processor.as_target_processor():
        # The IPA transcription is joined into one string before tokenizing.
        ipa_text = "".join(batch["ipa_transcription"])
        batch["labels"] = processor(ipa_text).input_ids

    return batch


In [None]:
# Run prepare_dataset over the dataset using 4 worker processes, and rebind
# `timit` to the processed result.
timit = timit.map(prepare_dataset, num_proc=4)

In [None]:
# Alias for the processed train split.
# NOTE(review): not referenced by any other visible cell — the Trainer reads
# timit["train"] directly.
phonetic_timit_train = timit["train"]

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Output location and logging.
    output_dir=REPO_NAME,
    report_to="wandb",
    logging_steps=500,

    # Training schedule and optimisation.
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=8,
    learning_rate=1e-4,
    weight_decay=0.005,
    warmup_steps=1000,
    group_by_length=True,

    # Mixed precision and gradient checkpointing.
    fp16=True,
    gradient_checkpointing=True,

    # Evaluate and checkpoint once per epoch, keep at most 3 checkpoints, and
    # reload the checkpoint with the lowest eval_loss when training ends.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)

In [None]:
from transformers import Trainer

# Wire the model, training arguments, data splits, collator and metric together.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=timit["train"],
    eval_dataset=timit["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # The feature extractor (not the text tokenizer) goes in the `tokenizer` slot.
    tokenizer=processor.feature_extractor,
)

In [None]:
# Sanity check before training: print the model's vocabulary size next to the
# tokenizer's, then the summaries of the whole dataset and of the train split.
print(model.config.vocab_size)
print(processor.tokenizer.vocab_size)
print(timit)
print(timit["train"])

In [None]:
# Run the fine-tuning loop configured above.
trainer.train()

In [None]:
# Save the finished model to a local folder, then upload that folder to the
# same Hub repository that holds the tokenizer.
model.save_pretrained(SAVE_LOCAL_NAME)

api.upload_folder(
    folder_path=SAVE_LOCAL_NAME,
    repo_id=f"{HF_ID}/{REPO_NAME}",  # Built from the HF_ID and REPO_NAME constants
    commit_message=f"Final Training with {EPOCHS} epochs"
)