In [None]:
from huggingface_hub import notebook_login
import json
from transformers import Wav2Vec2CTCTokenizer

notebook_login()

In [None]:
from datasets import load_dataset, load_metric, Audio

voice_train = load_dataset("mozilla-foundation/common_voice_9_0", "pt", use_auth_token=True,split="train+validation+test")

voice_train=voice_train.train_test_split(train_size=0.9)
voice_test=voice_train["test"]
voice_train=voice_train["train"]

### Listen to audio

In [None]:
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements_Dataset(dataset, num_examples=10):
    '''
        Show 10 random sentences from the dataset.
    '''
    dataset = dataset.remove_columns(["path","audio"])
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset)-1)
        while pick in picks:
            pick = random.randint(0, len(dataset)-1)
        picks.append(pick)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
import IPython.display as ipd
import numpy as np
import random


samp=random.randint(0,voice_train.shape[0])
print(samp)
ipd.Audio(data=voice_train[samp]["audio"]["array"], autoplay=True, rate=voice_train[samp]["audio"]["sampling_rate"])

### Normalize Data

In [None]:
from IPython.display import display, HTML
show_random_elements_Dataset(voice_train,5)

In [None]:
##generate preprocessing
import re

def normalizing_text(batch):
    import unicodedata
    method = 'NFKC' 
    batch["sentence"] = "".join(str(unicodedata.normalize(method,i)) for i in batch["sentence"])
    return batch
    
chars_to_remove = '[\"\“\”\�\!\#\$\%\&\(\)\*\+\,\-\.\/\:\;\<\=\>\u003F\@\^\_\{\|\}]'
def remove_special_characters(batch):
    batch["sentence"] = re.sub(chars_to_remove, '', batch["sentence"]).lower()
    batch["sentence"] = re.sub("”", '', batch["sentence"])
    batch["sentence"] = re.sub("“", '', batch["sentence"])
    batch["sentence"] = re.sub("’","ʼ",batch["sentence"])
    return batch

def extract_all_chars(batch):
  all_text = " ".join(batch["sentence"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

In [None]:
train_data_pr=voice_train.map(remove_special_characters)
test_data_pr=voice_test.map(remove_special_characters)
train_data_pr=train_data_pr.map(normalizing_text)
test_data_pr=test_data_pr.map(normalizing_text)
print("AFTER TEXT PROCESSING")
show_random_elements_Dataset(train_data_pr,10)


In [None]:
vocab_train = train_data_pr.map(extract_all_chars, batched=True, batch_size=-1, remove_columns=train_data_pr.column_names)
vocab_test = test_data_pr.map(extract_all_chars, batched=True, batch_size=-1, remove_columns=test_data_pr.column_names)

vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))
vocab_dict = {v: k for k, v in enumerate(sorted(vocab_list))}
print(vocab_dict)

In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]
##add unknown and padding tokens
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)

In [None]:
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)


tokenizer = Wav2Vec2CTCTokenizer.from_pretrained("./", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
repo_name = "vkt/first-model"
tokenizer.push_to_hub(repo_name)

### Model Definition & Audio Processing

In [None]:
from transformers import Wav2Vec2FeatureExtractor,Wav2Vec2Processor
 
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)

processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
train_data_pr = train_data_pr.cast_column("audio", Audio(sampling_rate=16000))
test_data_pr = test_data_pr.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
def prepare_dataset(batch):
    audio = batch["audio"]

    # batched output is "un-batched"
    batch["input_values"] = processor(audio["array"], sampling_rate=16000).input_values[0]
    batch["input_length"] = len(batch["input_values"])
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["sentence"]).input_ids
    return batch

In [None]:
train_data_pr = train_data_pr.map(prepare_dataset, remove_columns=train_data_pr.column_names)
test_data_pr = test_data_pr.map(prepare_dataset, remove_columns=test_data_pr.column_names)

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

wer_metric = load_metric("wer")
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

from transformers import Wav2Vec2ForCTC

model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-xls-r-300m", 
    attention_dropout=0.0,
     hidden_dropout=0.0,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.0,
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
)

model.freeze_feature_extractor()

### Set up trainer

In [None]:
from transformers import TrainingArguments
from transformers import Trainer

training_args = TrainingArguments(
  output_dir=repo_name,
  group_by_length=True,
  per_device_train_batch_size=8,
  gradient_accumulation_steps=2,
  evaluation_strategy="steps",
  num_train_epochs=10,
  gradient_checkpointing=True,
  fp16=True,
  save_steps=500,
  eval_steps=200,
  logging_steps=200,
  learning_rate=2e-4,
  warmup_steps=100,
  save_total_limit=20,
  push_to_hub=True,
)

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_data_pr,
    eval_dataset=test_data_pr,
    tokenizer=processor.feature_extractor,
)

In [None]:
trainer.train()
trainer.push_to_hub()