In [None]:
from huggingface_hub import notebook_login
notebook_login()

In [None]:
from datasets import load_dataset

ds = load_dataset("UmarRamzan/common-voice-urdu-processed")
train_data = ds['train']
test_data = ds['test']

In [None]:
train_data = train_data.remove_columns(["path", "variant"])
train_data

In [None]:
from datasets import Audio
train_data
train_data = train_data.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
train_data

In [None]:
from transformers import SpeechT5Processor

checkpoint = "microsoft/speecht5_tts"
processor = SpeechT5Processor.from_pretrained(checkpoint)


In [None]:
tokenizer = processor.tokenizer

In [None]:
train_data[2: 5]

In [None]:
train_data[0: 1]

In [None]:
def get_urdu_tokens():
    tokens = [char for char in "ابپتٹثجچحخدڈذرڑزژسشصضطظعغفقکلمنوہھیےأؤئگںۂۃۓ"]
    return tokens


tokenizer.add_tokens(get_urdu_tokens())

In [None]:
def extract_all_chars(batch):
    all_text = " ".join(batch["sentence"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


vocabs = train_data.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=train_data.column_names,
)

dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = {k for k, _ in tokenizer.get_vocab().items()}

In [None]:
print(tokenizer_vocab)

In [None]:
import re 

def normalize(text):

    text = re.sub(r'[^\w\s\']', '', text)

    # Remove extra whitespace
    text = ' '.join(text.split())

    urdu = {
        'ا':['إ','أ','ئ','ؤ'],
        'ق':['غ'], 'ز':['ظ','ض','ذ'],
        'س':['ص','ث'], 'ت':['ط','ة'],
        'ک':['ك'], '':['ّ', 'ً'], ',':'،'}

    for key in urdu:
        for char in urdu[key]:
            text = text.replace(char, key)
    return text

In [None]:
def add_normalized_text(example):
    example['normalized_text'] = normalize(example['sentence'])
    return example


train_data = train_data.map(add_normalized_text)

In [None]:
for i in train_data[0: 1]:
    print(len(train_data[0: 1][i][0]))

In [None]:
def extract_all_chars(batch):
    all_text = " ".join(batch["normalized_text"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


vocabs = train_data.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=train_data.column_names,
)

dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = {k for k, _ in tokenizer.get_vocab().items()}

In [None]:
dataset_vocab - tokenizer_vocab

In [None]:
for feature in train_data.features:
    print(feature)
    print(train_data[0][feature])

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

device = "cuda" if torch.cuda.is_available() else "cpu"
# device='cpu'
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    with torch.no_grad():
        speaker_embeddings = speaker_model.encode_batch(torch.tensor(waveform))
        speaker_embeddings = torch.nn.functional.normalize(speaker_embeddings, dim=2)
        speaker_embeddings = speaker_embeddings.squeeze().cpu().numpy()
    return speaker_embeddings

In [None]:
def prepare_dataset(example):
    audio = example["audio"]

    example = processor(
        text=example["normalized_text"],
        audio_target=audio["array"],
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # strip off the batch dimension
    example["labels"] = example["labels"][0]

    # use SpeechBrain to obtain x-vector
    example["speaker_embeddings"] = create_speaker_embedding(audio["array"])

    return example

In [None]:
processed_example = prepare_dataset(train_data[8])
list(processed_example.keys())

In [None]:
processed_example["speaker_embeddings"].shape
len(processed_example["input_ids"])

In [None]:
train_data = train_data.map(prepare_dataset, remove_columns=train_data.column_names)

In [None]:
def is_not_too_long(input_ids):
    input_length = len(input_ids)
    return input_length < 200

train_data = train_data.filter(is_not_too_long, input_columns=["input_ids"])
len(train_data)

In [None]:
train_data = train_data.train_test_split(test_size=0.1)

In [None]:
train_data

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # collate the inputs and targets into a batch
        batch = processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        # not used during fine-tuning
        del batch["decoder_attention_mask"]

        # round down target lengths to multiple of reduction factor
        if model.config.reduction_factor > 1:
            target_lengths = torch.tensor(
                [len(feature["input_values"]) for feature in label_features]
            )
            target_lengths = target_lengths.new(
                [
                    length - length % model.config.reduction_factor
                    for length in target_lengths
                ]
            )
            max_length = max(target_lengths)
            batch["labels"] = batch["labels"][:, :max_length]

        # also add in the speaker embeddings
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [None]:
data_collator = TTSDataCollatorWithPadding(processor=processor)

In [None]:
from transformers import SpeechT5ForTextToSpeech

model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)
model = model.to("cuda")

In [None]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False
model.resize_token_embeddings(len(tokenizer))

# set language and task for generation and re-enable cache
model.generate = partial(model.generate, use_cache=True)

In [None]:
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments


training_args = Seq2SeqTrainingArguments(
    output_dir="final_urdu_t5_finetuned", 
    per_device_train_batch_size=64,
    gradient_accumulation_steps=4,
    learning_rate=2e-4,
    warmup_steps=100,
    max_steps=10000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=2,
    save_steps=10000,
    eval_steps=500,
    logging_steps=25,
    optim="adamw_bnb_8bit",
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=True,
)

In [None]:
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=train_data['train'],
    eval_dataset=train_data['test'],
    data_collator=data_collator,
    tokenizer=processor,
)

In [None]:
trainer.train()

In [None]:
trainer.push_to_hub()