In [None]:
%%capture

%pip install --upgrade pip 
%pip install datasets[audio]
%pip install evaluate
%pip install transformers
%pip install jiwer
%pip install accelerate
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [None]:
from huggingface_hub import login
login()

In [None]:
from datasets import load_dataset

shn_dataset_train = load_dataset("norhsangpha/shn-asr-datasets", split="train", token=True)
shn_dataset_test = load_dataset("norhsangpha/shn-asr-datasets", split="test", token=True)


In [None]:
shn_dataset_train

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Render a random selection of distinct rows from ``dataset`` as an HTML table.

    Args:
        dataset: Sized object that can be indexed with a list of integer
            positions; the indexed result is handed to ``pd.DataFrame``.
        num_examples: Number of distinct rows to display.

    Raises:
        AssertionError: If ``num_examples`` exceeds ``len(dataset)``.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws unique positions in one call, so no retry loop
    # (and no repeated list membership scan) is needed.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

show_random_elements(shn_dataset_train.remove_columns(["audio"]), num_examples=10)

In [None]:
import re
chars_to_remove_regex = '[\,\?\.\!\-\;\:\"\“\%\‘\”\�\'\။\၊\…]'

def remove_special_characters(batch):
    batch["transcription"] = re.sub(chars_to_remove_regex, '', batch["transcription"]).lower()
    return batch

shn_dataset_train = shn_dataset_train.map(remove_special_characters)
shn_dataset_test = shn_dataset_test.map(remove_special_characters)

In [None]:
show_random_elements(shn_dataset_train.remove_columns(["audio"]))

In [None]:
def extract_all_chars(batch):
    """Collect the distinct characters used across a batch of transcriptions.

    The transcriptions in ``batch["transcription"]`` are joined with single
    spaces. Both the joined text and the list of its unique characters are
    returned, each wrapped in a one-element list.
    """
    joined_text = " ".join(batch["transcription"])
    unique_chars = [*set(joined_text)]
    return dict(vocab=[unique_chars], all_text=[joined_text])

vocab_train = shn_dataset_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=shn_dataset_train.column_names)
vocab_test = shn_dataset_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=shn_dataset_test.column_names)

In [None]:
vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))

In [None]:
vocab_dict = {v: k for k, v in enumerate(sorted(vocab_list))}

vocab_dict

In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

In [None]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)


In [None]:
target_lang = "shn"
# new_vocab_dict = {target_lang: vocab_dict}

In [None]:
from transformers import Wav2Vec2CTCTokenizer

# Hub repository the pretrained tokenizer is loaded from.
mms_adapter_repo = "facebook/mms-1b-all"

tokenizer = Wav2Vec2CTCTokenizer.from_pretrained(mms_adapter_repo)
new_vocab = tokenizer.vocab

# Store the character vocabulary built above under the target language code.
# NOTE(review): in the visible cells `new_vocab` is not read again; vocab.json
# is later written from `vocab_dict` directly. Confirm whether this nested
# vocabulary was meant to be the one saved.
new_vocab[target_lang] = vocab_dict

In [None]:
import json
with open('vocab.json', 'w', encoding='utf-8') as vocab_file:
    json.dump(vocab_dict, vocab_file, ensure_ascii=False)


In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer.from_pretrained("./", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
repo_name = "wav2vec2-large-mms-1b-shan"
tokenizer.push_to_hub(repo_name)


In [None]:
tokenizer.save_pretrained("./models/wav2vec2-large-mms-1b-shan")

In [None]:
from transformers import Wav2Vec2FeatureExtractor

feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)


In [None]:
from transformers import Wav2Vec2Processor

processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)


In [None]:
shn_dataset_train[0]["audio"]

In [None]:
from datasets import Audio

shn_dataset_train = shn_dataset_train.cast_column("audio", Audio(sampling_rate=16_000))
shn_dataset_test = shn_dataset_test.cast_column("audio", Audio(sampling_rate=16_000))


In [None]:
shn_dataset_train[0]["audio"]

In [None]:
rand_int = random.randint(0, len(shn_dataset_train)-1)

print("Target text:", shn_dataset_train[rand_int]["transcription"])
print("Input array shape:", shn_dataset_train[rand_int]["audio"]["array"].shape)
print("Sampling rate:", shn_dataset_train[rand_int]["audio"]["sampling_rate"])


## Dataset Preparation

In [None]:
def prepare_dataset(batch):
    """Turn one raw example into model inputs and label ids.

    Reads ``batch["audio"]`` (its ``"array"`` and ``"sampling_rate"`` entries)
    and ``batch["transcription"]``, then stores ``"input_values"``,
    ``"input_length"`` and ``"labels"`` on the same example and returns it.
    """
    sample = batch["audio"]

    # the processor output is batched; take its first (only) element
    features = processor(sample["array"], sampling_rate=sample["sampling_rate"])
    batch["input_values"] = features.input_values[0]
    batch["input_length"] = len(batch["input_values"])

    encoded_text = processor(text=batch["transcription"])
    batch["labels"] = encoded_text.input_ids
    return batch


In [None]:
shn_dataset_train = shn_dataset_train.map(prepare_dataset, remove_columns=shn_dataset_train.column_names)
shn_dataset_test = shn_dataset_test.map(prepare_dataset, remove_columns=shn_dataset_test.column_names)

## Training

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that pads audio inputs and label ids into one batch.

    Inputs and labels are padded separately through ``processor.pad`` because
    they have different lengths and need different padding methods.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data; its ``pad`` method is
            called once for the inputs and once for the labels.
        padding (:obj:`bool` or :obj:`str`, `optional`, defaults to :obj:`True`):
            Padding strategy forwarded unchanged to both ``processor.pad`` calls:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a
              single sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Separate the audio inputs from the labels: each goes through its own pad call.
        audio_items = []
        label_items = []
        for example in features:
            audio_items.append({"input_values": example["input_values"]})
            label_items.append({"input_ids": example["labels"]})

        pad_options = {"padding": self.padding, "return_tensors": "pt"}
        batch = self.processor.pad(audio_items, **pad_options)
        padded_labels = self.processor.pad(labels=label_items, **pad_options)

        # Positions whose attention mask is not 1 are padding; mark them with
        # -100 so the loss ignores them.
        is_padding = padded_labels.attention_mask.ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)

        return batch

data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

#### Evaluate

In [None]:
from evaluate import load

wer_metric = load("wer")

In [None]:
import numpy as np

def compute_metrics(pred):
    """Compute the word error rate between decoded predictions and decoded labels.

    Args:
        pred: Object exposing ``predictions`` (logits, reduced with argmax over
            the last axis) and ``label_ids`` (an array in which -100 marks
            positions to be ignored).

    Returns:
        dict: ``{"wer": <value returned by wer_metric.compute>}``.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Swap -100 for the pad token id so the labels can be decoded. np.where
    # builds a new array, leaving the caller's `pred.label_ids` untouched.
    label_ids = np.where(pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids)

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


In [None]:
from transformers import Wav2Vec2ForCTC

model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/mms-1b-all",
    attention_dropout=0.0,
    hidden_dropout=0.0,
    feat_proj_dropout=0.0,
    layerdrop=0.0,
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
    ignore_mismatched_sizes=True,
)


In [None]:
# Initialise the adapter layers and freeze the base model before training.
model.init_adapter_layers()
model.freeze_base_model()

# Re-enable gradients on every parameter returned by `_get_adapters()` so the
# adapter weights are updated during training.
adapter_weights = model._get_adapters()
for param in adapter_weights.values():
    param.requires_grad = True


In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir=repo_name,
  group_by_length=True,
  per_device_train_batch_size=16,
  evaluation_strategy="steps",
  num_train_epochs=10,
  gradient_checkpointing=True,
  fp16=True,
  save_steps=200,
  eval_steps=100,
  logging_steps=5,
  learning_rate=1e-3,
  warmup_steps=100,
  save_total_limit=2,
  push_to_hub=False,
)

In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=shn_dataset_train,
    eval_dataset=shn_dataset_test,
    tokenizer=processor.feature_extractor,
)

In [None]:
trainer.train()

In [None]:
from safetensors.torch import save_file as safe_save_file
from transformers.models.wav2vec2.modeling_wav2vec2 import WAV2VEC2_ADAPTER_SAFE_FILE
import os

# Build the path of the adapter weights file for `target_lang` inside the
# training output directory.
adapter_file = WAV2VEC2_ADAPTER_SAFE_FILE.format(target_lang)
adapter_file = os.path.join(training_args.output_dir, adapter_file)

# NOTE(review): the save call below is disabled, so `adapter_file` is computed
# but never written. The inference cell later loads the model with
# target_lang="shn", which presumably expects this adapter file to exist.
# Confirm whether the save should be re-enabled.
# safe_save_file(model._get_adapters(), adapter_file, metadata={"format": "pt"})


In [None]:
trainer.push_to_hub()

# Inference

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor

model_id = "./wav2vec2-large-mms-1b-shan"

model = Wav2Vec2ForCTC.from_pretrained(model_id, target_lang="shn", ignore_mismatched_sizes=True).to("cuda")
processor = Wav2Vec2Processor.from_pretrained(model_id)

# processor.tokenizer.set_target_lang("shn")


In [None]:
model_id_2 = "facebook/mms-1b-all"

model_2 = Wav2Vec2ForCTC.from_pretrained(model_id_2, target_lang="shn", ignore_mismatched_sizes=True).to("cuda")
processor_2 = Wav2Vec2Processor.from_pretrained(model_id_2)

processor_2.tokenizer.set_target_lang("shn")

In [None]:
from datasets import Audio, load_dataset

test_data = load_dataset("NorHsangPha/shn-asr-datasets", split="test", token=True)
test_data = test_data.cast_column("audio", Audio(sampling_rate=16_000))


In [None]:
test_data[1]["audio"]

In [None]:
import torch

# Transcribe one test clip with the fine-tuned model and print it next to the reference.
input_dict = processor(test_data[1]["audio"]["array"], sampling_rate=16_000, return_tensors="pt", padding=True)

# Inference only: disable gradient tracking so no autograd graph is built
# for the forward pass.
with torch.no_grad():
    logits = model(input_dict.input_values.to("cuda")).logits

# Pick the highest-scoring token at every frame of the first (only) item.
pred_ids = torch.argmax(logits, dim=-1)[0]

print("Prediction:")
print(processor.decode(pred_ids))

print("\nReference:")
print(test_data[1]["transcription"].lower())

In [None]:
model.push_to_hub("haohaa/wav2vec2-large-mms-1b-shan")

In [None]:
model.load_adapter("shn")

In [None]:
# Transcribe the same test clip with the baseline model (`model_2`) for comparison.
input_dict = processor_2(test_data[1]["audio"]["array"], sampling_rate=16_000, return_tensors="pt", padding=True)

# Inference only: disable gradient tracking so no autograd graph is built
# for the forward pass.
with torch.no_grad():
    logits = model_2(input_dict.input_values.to("cuda")).logits

# Pick the highest-scoring token at every frame of the first (only) item.
pred_ids = torch.argmax(logits, dim=-1)[0]

print("Prediction:")
# Decode with processor_2: these ids come from model_2, so they must be mapped
# through the tokenizer loaded alongside it, not the fine-tuned `processor`.
print(processor_2.decode(pred_ids))

print("\nReference:")
print(test_data[1]["transcription"].lower())

In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer.from_pretrained(model_id)

token = tokenizer("ၾႃႉၾူၼ်ၵမ်ႇလမ်သႃး")

tokenizer.decode(token["input_ids"])