<a href="https://colab.research.google.com/github/CharlesR-W/HFCourses/blob/main/SpeechT5_TTS_finetune_HFAHO4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers datasets soundfile speechbrain accelerate
from huggingface_hub import notebook_login

# Authenticate with the Hugging Face Hub; the credentials are needed later
# because the trainer is configured to push the fine-tuned model to the Hub.
notebook_login()

In [None]:
from datasets import load_dataset, Audio

# Load only the "train" split so `dataset` is a `Dataset` (one row per clip)
# instead of a `DatasetDict` keyed by split name.
dataset = load_dataset("thiagolira/LatinYoutube", split="train")

# Resample the audio column to 16 kHz, the rate used everywhere else in this
# notebook (feature extraction and final playback).
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

# Last expression of the cell, so the notebook displays the number of examples.
len(dataset)

In [None]:
from transformers import SpeechT5Processor

# Pretrained SpeechT5 text-to-speech checkpoint; reused below when loading the model.
checkpoint = "microsoft/speecht5_tts"
# The processor is called later with both text and target audio to build model inputs.
processor = SpeechT5Processor.from_pretrained(checkpoint)
# Direct handle on the text tokenizer, used to inspect its vocabulary.
tokenizer = processor.tokenizer

In [None]:
def extract_all_chars(batch):
    """Collect the distinct characters used by a batch of transcriptions.

    The strings in ``batch["text"]`` are joined with single spaces. Both
    results are wrapped in one-element lists so the function can be used
    with a batched ``map`` call: ``"vocab"`` holds the list of unique
    characters (in arbitrary order) and ``"all_text"`` the joined string.
    """
    joined = " ".join(batch["text"])
    unique_chars = list({ch for ch in joined})
    return {"vocab": [unique_chars], "all_text": [joined]}


# Run the extraction once over the whole dataset as a single batch
# (batch_size=-1) and drop the original columns so only the summary remains.
vocabs = dataset.map(
    extract_all_chars,
    remove_columns=dataset.column_names,
    keep_in_memory=True,
    batch_size=-1,
    batched=True,
)

# Characters that occur in the transcriptions.
dataset_vocab = {*vocabs["vocab"][0]}
# Tokens known to the tokenizer; iterating the vocab mapping yields its keys.
tokenizer_vocab = set(tokenizer.get_vocab())

In [None]:
# Characters present in the transcriptions but unknown to the tokenizer.
# The plain space is excluded on purpose: the SpeechT5 tokenizer represents it
# as "▁", so it always shows up in this difference even though it is handled
# natively, and a generated `(" ", "")` entry would glue all words together.
invalid_chars = dataset_vocab - tokenizer_vocab - {" "}

# Emit ready-to-paste entries for the `replacements` list. `repr` escapes
# backslashes and quotes and makes invisible characters (soft hyphens,
# non-breaking spaces, ...) visible; sorting keeps the output stable across runs.
for char in sorted(invalid_chars):
    print(f'({char!r}, ""),')

In [None]:
# (source, replacement) pairs applied by `cleanup_text` to map characters the
# tokenizer does not know onto supported ones. Every non-empty source is a
# single character and every replacement is plain ASCII, so the order of the
# entries does not affect the result.
replacements = [
    # Vowels with a macron are written as doubled vowels.
    ("ā", "aa"),
    ("Ā", "AA"),
    ("ē", "ee"),
    ("Ē", "EE"),
    ("ī", "ii"),
    ("Ī", "II"),
    ("ō", "oo"),
    ("Ō", "OO"),
    ("ū", "uu"),
    ("Ū", "UU"),
    ("ȳ", "yy"),
    # Other accented / special Latin-script letters.
    ("á", "a"),
    ("í", "i"),
    ("ó", "o"),
    ("ú", "u"),
    ("ä", "a"),
    ("ß", "ss"),
    # Digits spelled out in Latin.
    ("0", "nullus"),
    ("1", "unus"),
    ("2", "duo"),  # was misspelled "duus"
    ("3", "tres"),
    ("4", "quattuor"),  # was misspelled "quattor"
    ("5", "quinque"),
    ("6", "sex"),
    ("7", "septem"),
    ("8", "octo"),
    ("9", "novem"),
    # Greek letters, transliterated.
    ("ἑ", "he"),
    ("ὁ", "ho"),  # rough breathing, transliterated like "ἑ" -> "he" (was "oo")
    ("ὸ", "o"),  # accented omicron, like "ό" -> "o" (was dropped entirely)
    ("ό", "o"),
    ("ο", "o"),
    ("ά", "a"),
    ("α", "a"),
    ("ι", "i"),
    ("κ", "k"),
    ("ν", "n"),
    ("ρ", "r"),
    ("σ", "s"),
    ("ς", "s"),
    ("τ", "t"),
    # Punctuation and symbols that are simply removed.
    ("‘", ""),
    ("’", ""),
    ("“", ""),
    ("”", ""),
    ("„", ""),
    ("–", ""),
    ("…", ""),
    ("°", ""),
    ("*", ""),
    ("|", ""),
    ("\\", ""),
    ("😄", ""),
    ("\u00ad", ""),  # soft hyphen (invisible)
    # NOTE(review): this entry has an empty source string as written; possibly
    # an invisible character was lost in copy/paste. `cleanup_text` skips it.
    ("", ""),
    # A plain space must NOT be listed here: the tokenizer handles it natively
    # (as "▁"), and mapping it to "" would glue all words together.
    # Non-breaking spaces are normalized to a plain space instead.
    ("\u00a0", " "),
]


def cleanup_text(inputs):
    """Normalize ``inputs["text"]`` to characters the tokenizer supports.

    Every ``(src, dst)`` pair from the module-level ``replacements`` list is
    applied in list order with ``str.replace``. Pairs whose ``src`` is empty
    are skipped, because replacing the empty string would insert ``dst``
    between every character.

    Args:
        inputs: One example (a mapping) holding a string under ``"text"``.

    Returns:
        The same ``inputs`` object, with ``"text"`` updated in place.
    """
    text = inputs["text"]
    for src, dst in replacements:
        if not src:
            continue
        text = text.replace(src, dst)
    inputs["text"] = text
    return inputs


# Apply the character normalization to the "text" field of every example.
dataset = dataset.map(cleanup_text)

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

# SpeechBrain speaker-recognition checkpoint used to compute x-vector speaker embeddings.
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Prefer the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the encoder on the selected device; its files are stored under
# /tmp/<spk_model_name>.
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    """Compute a speaker embedding for one waveform with ``speaker_model``.

    The waveform is converted to a tensor and encoded without gradient
    tracking. The encoder output is normalized along dimension 2, squeezed,
    moved to the CPU and returned as a NumPy array.
    """
    with torch.no_grad():
        encoded = speaker_model.encode_batch(torch.tensor(waveform))
        normalized = torch.nn.functional.normalize(encoded, dim=2)
        embedding = normalized.squeeze().cpu().numpy()
    return embedding

In [None]:
def prepare_dataset(example):
    """Turn one raw example into the features used for fine-tuning.

    The text and the target audio are passed through the global ``processor``;
    the speaker embedding is computed from the same waveform with
    ``create_speaker_embedding``.

    Args:
        example: A row with a ``"text"`` string and an ``"audio"`` mapping
            that provides ``"array"`` and ``"sampling_rate"``.

    Returns:
        The processor output, with ``"labels"`` reduced to its first entry
        and an added ``"speaker_embeddings"`` field.
    """
    audio = example["audio"]
    waveform, rate = audio["array"], audio["sampling_rate"]

    features = processor(
        text=example["text"],
        audio_target=waveform,
        sampling_rate=rate,
        return_attention_mask=False,
    )

    # the processor returns labels with a leading batch dimension; drop it
    features["labels"] = features["labels"][0]

    # x-vector computed by the SpeechBrain speaker model
    features["speaker_embeddings"] = create_speaker_embedding(waveform)

    return features



In [None]:
# Sanity-check the preprocessing on a single example before mapping it over
# the whole dataset.
processed_example = prepare_dataset(dataset[0])

# Jupyter only echoes the last expression of a cell, so print these explicitly.
print(list(processed_example.keys()))
print(processed_example["speaker_embeddings"].shape)

import matplotlib.pyplot as plt

# Visualize the target features; presumably (frames, mel bins), transposed so
# that time runs along the x-axis — TODO confirm.
plt.figure()
plt.imshow(processed_example["labels"].T)
plt.show()

In [None]:
# Convert every example to model inputs; the raw columns are removed so only
# the fields returned by prepare_dataset remain.
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names)
def is_not_too_long(input_ids, max_length=200):
    """Return whether a token sequence is short enough to keep.

    Args:
        input_ids: Token ids of one example.
        max_length: Exclusive upper bound on the number of tokens. Defaults
            to 200, the limit previously hard-coded in this function.

    Returns:
        ``True`` if ``len(input_ids) < max_length``, otherwise ``False``.
    """
    return len(input_ids) < max_length


# Keep only the examples whose token sequence is shorter than 200 tokens.
dataset = dataset.filter(is_not_too_long, input_columns=["input_ids"])


In [None]:
# Number of examples that remain after filtering out long inputs.
len(dataset)

In [None]:
# Hold out 10% of the examples as the "test" split used for evaluation.
# NOTE(review): no seed is passed, so the split is presumably different on
# every run — confirm whether reproducibility matters here.
dataset = dataset.train_test_split(test_size=0.1)

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    """Collate processed examples into one padded training batch.

    Each feature is expected to provide ``"input_ids"``, ``"labels"`` and
    ``"speaker_embeddings"``. Padding is delegated to ``processor.pad``;
    padded label positions are set to -100 and the label length is trimmed
    to a multiple of the reduction factor.
    """

    # Processor whose ``pad`` method batches the token ids and the targets.
    processor: Any
    # Reduction factor used to round target lengths. When left as ``None`` it
    # is read at call time from the module-level ``model``
    # (``model.config.reduction_factor``).
    reduction_factor: Union[int, None] = None

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Build the batch for ``features``.

        Args:
            features: The examples to combine.

        Returns:
            The padded batch returned by ``processor.pad``, without
            ``"decoder_attention_mask"`` and with ``"speaker_embeddings"``
            added as a tensor.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # collate the inputs and targets into a batch, using the processor this
        # collator was constructed with rather than a global of the same name
        batch = self.processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        # not used during fine-tuning
        del batch["decoder_attention_mask"]

        reduction_factor = self.reduction_factor
        if reduction_factor is None:
            # fall back to the model defined at notebook level
            reduction_factor = model.config.reduction_factor

        # round down target lengths to multiple of reduction factor
        if reduction_factor > 1:
            max_length = max(
                len(feature["input_values"]) // reduction_factor * reduction_factor
                for feature in label_features
            )
            batch["labels"] = batch["labels"][:, :max_length]

        # also add in the speaker embeddings
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch
# Collator instance handed to the trainer below.
data_collator = TTSDataCollatorWithPadding(processor=processor)

from transformers import SpeechT5ForTextToSpeech

# Load the pretrained SpeechT5 TTS model from the same checkpoint as the processor.
model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# re-enable the cache for generation only, by binding use_cache=True on model.generate
model.generate = partial(model.generate, use_cache=True)

from transformers import Seq2SeqTrainingArguments

# Training configuration for the fine-tuning run.
training_args = Seq2SeqTrainingArguments(
    output_dir="speecht5_finetuned_latinvoice",  # change to a repo name of your choice
    # 4 examples per device x 8 accumulation steps = 32 examples per optimizer step
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    learning_rate=1e-5,
    warmup_steps=500,
    max_steps=4000,
    # matches model.config.use_cache = False set on the model
    gradient_checkpointing=True,
    fp16=True,
    # evaluate and save on the same 1000-step schedule
    eval_strategy="steps",
    per_device_eval_batch_size=2,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    # NOTE(review): no metric_for_best_model is given; with greater_is_better=False
    # the best checkpoint is presumably chosen by lowest eval loss — confirm.
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=True,
)

from transformers import Seq2SeqTrainer

# Wire the model, the train/test splits and the padding collator into the trainer.
# NOTE(review): newer transformers releases rename the `tokenizer` argument to
# `processing_class` — confirm against the installed version.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor,
)

In [None]:
# Run the fine-tuning loop with the settings in training_args.
trainer.train()

In [None]:
# Upload the trained model to the Hugging Face Hub.
trainer.push_to_hub()

In [None]:
# Reload the fine-tuned weights from the Hub for inference; this replaces the
# in-memory `model` used during training.
model = SpeechT5ForTextToSpeech.from_pretrained(
    "CarolusRenniusVitellius/speecht5_finetuned_latinvoice"
)

In [None]:
# Reuse the speaker embedding of a held-out example as the target voice.
example = dataset["test"][0]
# unsqueeze(0) adds a leading batch dimension
speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)
text = "Omnis Galliae in partes tres divisa sunt."
inputs = processor(text=text, return_tensors="pt")
from transformers import SpeechT5HifiGan

# Vocoder passed to generate_speech so that it returns audio samples.
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
# NOTE: this import shadows the `Audio` feature imported from `datasets` earlier.
from IPython.display import Audio

# Play the result at 16 kHz, the rate the training audio was resampled to.
Audio(speech.numpy(), rate=16000)