# 1. Install dependencies and log in to Hugging Face (login is only needed if you want to push your model)

In [None]:

# !pip install torchaudio==0.10.0+cu111 -f https://download.pytorch.org/whl/cu111/torch_stable.html
# !pip install datasets
# !pip install transformers==4.11.3
# !pip install librosa
# !pip install jiwer
# !pip install ipywidgets

In [None]:
# from huggingface_hub import notebook_login

# notebook_login()

In [None]:
from datasets import load_dataset, load_metric, Audio, Dataset
import pandas as pd
import numpy
import torchaudio
import datasets


# 2.1. Load Dataset

In [None]:
# Load the tab-separated training manifest into a DataFrame.
data = pd.read_csv("/path/to/file/train.tsv", sep="\t")

In [None]:
from datasets import load_dataset, load_metric, Audio, Dataset

# 2.2. Split dataset into train and test

In [None]:
from sklearn.model_selection import train_test_split

# Fixed seed so the train/test partition (and everything derived from it: the
# vocabulary, the evaluation set) is identical on every Restart & Run All.
SPLIT_SEED = 42

# Hold out 30% of the rows for evaluation.
train, test = train_test_split(data, test_size=0.3, random_state=SPLIT_SEED)

In [None]:
# Keep only the two columns used by the rest of the notebook: the audio file
# path and its transcript.
train = pd.DataFrame({column: train[column] for column in ("path", "sentence")})
test = pd.DataFrame({column: test[column] for column in ("path", "sentence")})

In [None]:
# Wrap each pandas split in a Hugging Face Dataset, then bundle both splits.
splits = {"train": Dataset.from_dict(train), "test": Dataset.from_dict(test)}
data_train, data_test = splits["train"], splits["test"]
# NOTE(review): this variable has the same name as the `datasets.DatasetDict`
# class it is an instance of; a later cell displays it under this name.
DatasetDict = datasets.DatasetDict(splits)

In [None]:
# Display the repr of the combined train/test container.
DatasetDict

In [None]:
# Display the repr of the training split.
data_train

# 2.3. Just to see your data instead of using .head() or .tail()

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Render a random sample of distinct rows from ``dataset`` as an HTML table.

    Args:
        dataset: Object supporting ``len()`` and indexing with a list of
            integer positions (e.g. a ``datasets.Dataset``).
        num_examples: How many distinct rows to show.

    Raises:
        AssertionError: If ``num_examples`` exceeds ``len(dataset)``.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws without replacement, replacing the hand-rolled retry
    # loop whose `pick in picks` list scan was quadratic in num_examples.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
# Preview ten random rows from each split.
for split in (data_train, data_test):
    show_random_elements(split, num_examples=10)


# Special characters must be removed from the transcripts before building the character-level vocabulary
# 2.4. Extract Chars from Labels

In [None]:
# import re
# chars_to_ignore_regex = '[\,\?\.\!\-\;\:\"\“\%\‘\”\�]'

# def remove_special_characters(batch):
#     batch["sentence"] = re.sub(chars_to_ignore_regex, '', batch["sentence"]).lower() + " "
#     return batch

### Change this accordingly ###

import re

# Character class of everything stripped from the transcripts.
# Written as a raw string: the previous non-raw literal relied on invalid
# escape sequences (backslash-comma, backslash-question-mark, ...), which newer
# Python versions warn about. The set of characters matched is unchanged.
chars_to_remove_regex = r'[,?.!\-;:"“%‘”�\'，()\[\]’–—\\]'


def remove_special_characters(batch):
    """Strip punctuation from a row's transcript and lower-case it.

    Args:
        batch: A single dataset row (dict) with a string under "sentence".

    Returns:
        The same dict, with "sentence" replaced by its cleaned version.
    """
    batch["sentence"] = re.sub(chars_to_remove_regex, '', batch["sentence"]).lower()
    return batch

In [None]:
# Clean the transcripts of both splits (punctuation stripped, lower-cased).
data_train = data_train.map(remove_special_characters)
data_test = data_test.map(remove_special_characters)

In [None]:
### Collect the distinct characters used in a batch of transcripts

def extract_all_chars(batch):
    """Return the distinct characters and the joined text of a batch.

    Args:
        batch: Batched rows; ``batch["sentence"]`` is a list of strings.

    Returns:
        dict with two single-element lists: "vocab" holds the list of distinct
        characters (in no particular order) and "all_text" holds all
        sentences joined by single spaces.
    """
    joined_text = " ".join(batch["sentence"])
    distinct_chars = list(set(joined_text))
    return {"vocab": [distinct_chars], "all_text": [joined_text]}

In [None]:
def _collect_vocab(split):
    """Run extract_all_chars over ``split`` with batch_size=-1, dropping the split's own columns."""
    return split.map(
        extract_all_chars,
        batched=True,
        batch_size=-1,
        keep_in_memory=True,
        remove_columns=split.column_names,
    )


vocab_train = _collect_vocab(data_train)
vocab_test = _collect_vocab(data_test)

In [None]:
# Union of the characters seen in either split. Sorted so the character -> id
# mapping built from this list is identical on every run; iterating a set of
# strings directly gives an order that can differ between Python processes.
vocab_list = sorted(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))

In [None]:
# Give each character its position in vocab_list as its token id.
vocab_dict = dict(zip(vocab_list, range(len(vocab_list))))
vocab_dict

In [None]:
### Use a visible "|" as the word separator instead of the blank space

# Guarded so the cell can be re-run: once " " has been replaced, an
# unconditional lookup/delete of " " raises KeyError on a second execution.
if " " in vocab_dict:
    vocab_dict["|"] = vocab_dict.pop(" ")

In [None]:
### Unknown and padding token definitions

# setdefault keeps the ids assigned on the first run; plain assignment would,
# on a re-run of this cell, move both tokens to the same new id.
vocab_dict.setdefault("[UNK]", len(vocab_dict))
vocab_dict.setdefault("[PAD]", len(vocab_dict))
len(vocab_dict)

In [None]:
# Display the final token -> id mapping.
vocab_dict

In [None]:
### Persist the vocabulary as JSON so the tokenizer can load it from disk

import json

with open('vocab.json', 'w') as vocab_file:
    vocab_file.write(json.dumps(vocab_dict))

# 2.5. Generating Processor

In [None]:
from transformers import Wav2Vec2CTCTokenizer

# CTC tokenizer backed by the vocab.json file; the special tokens match the
# entries added to vocab_dict.
tokenizer = Wav2Vec2CTCTokenizer(
    "./vocab.json",
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token="|",
)

In [None]:
from transformers import Wav2Vec2FeatureExtractor

# Feature extractor configured for 16 kHz input with normalization enabled
# and attention masks returned.
feature_extractor = Wav2Vec2FeatureExtractor(
    feature_size=1,
    sampling_rate=16000,
    padding_value=0.0,
    do_normalize=True,
    return_attention_mask=True,
)

In [None]:
from transformers import Wav2Vec2Processor

# Bundle the feature extractor and the tokenizer into a single processor.
processor = Wav2Vec2Processor(
    feature_extractor=feature_extractor,
    tokenizer=tokenizer,
)

# 2.6. Change Audio into Array

In [None]:
import soundfile as sf
import librosa
import numpy
import torchaudio
# import array as arr
def speech_file_to_array_fn(batch, target_sampling_rate=16000):
    """Load one audio file as a mono waveform at the target rate and attach it to the row.

    Args:
        batch: A single dataset row (dict) with a "path" entry (audio file)
            and a "sentence" entry (transcript).
        target_sampling_rate: Rate in Hz the waveform is resampled to.

    Returns:
        The same dict with three entries added: "speech" (1-D array),
        "sampling_rate" (the rate of "speech") and "target_text" (a copy of
        "sentence").
    """
    waveform, source_rate = torchaudio.load(batch["path"])
    # The loaded tensor is (channels, frames). Averaging over the channel axis
    # yields one channel for any channel count; the previous squeeze() left
    # multi-channel audio 2-D.
    waveform = waveform.mean(dim=0, keepdim=True)
    if source_rate != target_sampling_rate:
        resampler = torchaudio.transforms.Resample(source_rate, target_sampling_rate)
        waveform = resampler(waveform)
    batch["speech"] = waveform.squeeze(0).numpy()
    # Record the rate of the stored array, not of the file on disk: the audio
    # has been resampled, and prepare_dataset forwards this value to the
    # processor, which is configured for 16 kHz.
    batch["sampling_rate"] = int(target_sampling_rate)
    batch["target_text"] = batch["sentence"]

    return batch

# Implement the function

In [None]:
# Decode the audio of every training row; the original columns are dropped in
# favour of the ones written by speech_file_to_array_fn. num_proc=4 is passed to map.
data_train = data_train.map(speech_file_to_array_fn, remove_columns=data_train.column_names, num_proc=4)

In [None]:
# Same decoding step for the evaluation rows.
data_test = data_test.map(speech_file_to_array_fn, remove_columns=data_test.column_names, num_proc=4)

In [None]:
# Preview ten random decoded rows.
# NOTE(review): each row now carries its full "speech" array, so this table can be very large.
show_random_elements(data_train, num_examples=10)

In [None]:
### Final sanity check on one random training example

# randrange excludes len(data_train); randint(0, len(data_train)) includes it
# and could therefore produce an index one past the end.
rand_int = random.randrange(len(data_train))
# Fetch the row once instead of re-reading it for every print.
example = data_train[rand_int]

print("Target text:", example["target_text"])
# `numpy` is the name imported at the top of the notebook; the `np` alias is
# only introduced in the later metrics cell, so using it here fails on a
# fresh kernel.
print("Input array shape:", numpy.asarray(example["speech"]).shape)
print("Sampling rate:", example["sampling_rate"])

# 3.1. Preparing Dataset for Training

In [None]:
from transformers import Wav2Vec2Processor

# NOTE(review): this rebuilds the processor from the same feature_extractor and
# tokenizer as the cell in section 2.5; it looks redundant.
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

def prepare_dataset(batch):
    """Turn a batch of waveforms and transcripts into model inputs and label ids.

    Args:
        batch: Batched rows with "speech", "sampling_rate" and "target_text"
            lists.

    Returns:
        The same dict with "input_values" (processor output for the audio)
        and "labels" (processor output for the text, produced in target
        mode) added.

    Raises:
        AssertionError: If the batch mixes different sampling rates.
    """
    distinct_rates = set(batch["sampling_rate"])
    # Every clip in the batch must share one sampling rate.
    assert len(distinct_rates) == 1, (
        f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."
    )

    rate = batch["sampling_rate"][0]
    encoded_audio = processor(batch["speech"], sampling_rate=rate)
    batch["input_values"] = encoded_audio.input_values

    # Inside this context the processor is applied to the transcripts.
    with processor.as_target_processor():
        encoded_text = processor(batch["target_text"])
    batch["labels"] = encoded_text.input_ids
    return batch

In [None]:
# def prepare_dataset(batch):
#     audio = batch["speech"]

#     # batched output is "un-batched"
#     batch["input_values"] = processor(audio["speech"], sampling_rate=audio["sampling_rate"]).input_values[0]
#     batch["input_length"] = len(batch["input_values"])
    
#     with processor.as_target_processor():
#         batch["labels"] = processor(batch["sentence"]).input_ids
#     return batch

In [None]:
### Apply prepare_dataset to both splits (batched), keeping only the columns it adds

data_train_prep = data_train.map(prepare_dataset, remove_columns=data_train.column_names,num_proc=4, batched=True)
data_test_prep = data_test.map(prepare_dataset, remove_columns=data_test.column_names, num_proc=4, batched=True)

# 3.2. Creating a data collator for Wav2Vec2 so that every sequence in a batch is padded to the same length

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Collate a list of examples into one padded batch of tensors.

    Audio inputs and label ids are padded by two separate ``processor.pad``
    calls because they have different lengths and need different padding.
    Padded label positions are then replaced by -100.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            Processor whose ``pad`` method is used for both inputs and labels.
        padding (:obj:`bool` or :obj:`str`, `optional`, defaults to :obj:`True`):
            Padding strategy forwarded unchanged to both ``processor.pad`` calls.
        max_length (:obj:`int`, `optional`):
            Forwarded as ``max_length`` when padding ``input_values``.
        max_length_labels (:obj:`int`, `optional`):
            Forwarded as ``max_length`` when padding the labels.
        pad_to_multiple_of (:obj:`int`, `optional`):
            Forwarded as ``pad_to_multiple_of`` when padding ``input_values``.
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            Forwarded as ``pad_to_multiple_of`` when padding the labels.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad ``features`` and return the batch with a ``labels`` entry added."""
        # Separate the audio from the label ids; each gets its own pad call.
        audio_examples = []
        label_examples = []
        for example in features:
            audio_examples.append({"input_values": example["input_values"]})
            label_examples.append({"input_ids": example["labels"]})

        batch = self.processor.pad(
            audio_examples,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # In target mode the processor pads the label ids.
        with self.processor.as_target_processor():
            padded_labels = self.processor.pad(
                label_examples,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Positions where the attention mask is not 1 are padding; set them to
        # -100 so they are ignored by the loss.
        is_padding = padded_labels.attention_mask.ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)

        return batch


In [None]:
# Collator instance handed to the Trainer; padding=True is forwarded to processor.pad.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)


# 3.3. Error metrics using Character Error Rate (CER) from Jiwer

In [None]:
from jiwer import cer
import numpy as np
def compute_metrics(pred):
    """Compute the character error rate (CER) for a set of predictions.

    Args:
        pred: Object exposing ``predictions`` (scores whose last axis is
            arg-maxed into token ids) and ``label_ids`` (reference ids in
            which -100 marks padded positions).

    Returns:
        dict: ``{"cer": value}`` where ``value`` is what ``cer`` returns for
        the decoded references and predictions.
    """
    # Pick the highest-scoring token id at every position.
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Map the -100 padding marker back to the pad token id so the labels can
    # be decoded. np.where builds a new array, so the caller's label_ids are
    # no longer modified in place.
    label_ids = np.where(pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids)

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    # Reference strings are passed first, predictions second.
    cer_error = cer(label_str, pred_str)

    return {"cer": cer_error}

In [None]:
# Quick check of cer() on a toy pair: the reference string is passed first,
# the hypothesis second.
tes = 'aaab'
ref = 'aac'
error = cer(ref, tes)
print(error)

# 3.4. Load the model from Hugging Face (make sure you have an unmetered internet connection, since this downloads the pretrained model)

In [None]:
### The model hyperparameters can be customized here

from transformers import Wav2Vec2ForCTC

# Load the pretrained "facebook/wav2vec2-large-xlsr-53" checkpoint with the
# overrides below. pad_token_id and vocab_size are taken from the notebook's
# tokenizer so that the model matches the vocabulary built earlier.
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-large-xlsr-53", 
    attention_dropout=0.0,
    hidden_dropout=0.0,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.1, 
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer)
)

In [None]:
### Freeze the model's pretrained feature extractor layers so that fine-tuning
### does not update them (this does not touch the `feature_extractor` object
### created in section 2.5)

model.freeze_feature_extractor()

# 4. Training the Model

In [None]:
### Name of the model repository; it is used as the Trainer's output directory. Change it to your own path.

repo_name = "mbrola-noise2"

In [None]:
from transformers import TrainingArguments

# Fine-tuning hyperparameters; output (checkpoints etc.) goes under `repo_name`.
# Evaluation and checkpointing both happen every 500 steps.
training_args = TrainingArguments(
  output_dir=repo_name,
  group_by_length=True,
  per_device_train_batch_size=32, # Batch size per device; reduce this if you run out of memory
  evaluation_strategy="steps",
  num_train_epochs=20, # Number of passes over the training set; more epochs take longer to train
  gradient_checkpointing=True,
  fp16=False, # Mixed-precision training; can be set to True on a supported NVIDIA GPU for faster training
  save_steps=500,
  eval_steps=500,
  logging_steps=40,
  learning_rate=1e-4,
  weight_decay=0.001,
  warmup_steps=1000,
  save_total_limit=2
)

In [None]:
from transformers import Trainer

# Wire the model, collator, metric function and prepared splits together.
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=data_train_prep,
    eval_dataset=data_test_prep,
    # NOTE(review): the feature extractor is passed in the `tokenizer` slot;
    # presumably so it is saved alongside checkpoints — confirm.
    tokenizer=processor.feature_extractor,
)

In [None]:
import os
# Ask the Weights & Biases integration to stay off.
# NOTE(review): this cell runs after TrainingArguments and Trainer were
# created; the reporting integrations may already have been selected by then.
# Confirm, and move this above those cells if so.
os.environ["WANDB_DISABLED"]="true"

In [None]:
# Start fine-tuning with the configuration above.
trainer.train()

In [None]:
# tokenizer.save_pretrained