# Fine-tuning SpeechT5 for multilingual TTS
## Settings

In [None]:
# Output directory given to the training arguments; the checkpoint to resume
# from and the saved processor are both looked up underneath it.
dir_to_save_model = r".\results"
# Per-device training batch size and number of gradient-accumulation steps.
batch_size_train = 16
gradient_accumulation_steps = 2
# Per-device evaluation batch size.
batch_size_eval = 32
# Evaluate every `eval_steps` steps; save a checkpoint every `save_steps` steps.
eval_steps = 300
save_steps = 900
# Total number of training steps passed to the training arguments.
max_steps =  18000
# Step number used to build the `checkpoint-<N>` directory name to resume from.
previous_max_steps = 11700
learning_rate = 6e-5
# Number of threads passed to torch.set_num_threads().
number_of_torch_threads = 10

## Load the model

We'll start from speecht5_finetuned_multilingual_librispeech_pl that's already been fine-tuned for Polish TTS, and fine-tune it again but for one specific speaker.

In [None]:
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech
import torch
import os
import numpy as np
# Set the number of threads PyTorch may use (value comes from the settings cell).
torch.set_num_threads(number_of_torch_threads)

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Register these NumPy objects as safe globals for torch's serialization.
# NOTE(review): presumably required so that state stored in the checkpoint can
# be unpickled when resuming training — confirm.
torch.serialization.add_safe_globals([
    np.core.multiarray._reconstruct,
    np.ndarray,
    np.dtype,
    np.dtypes.UInt32DType,
])

# Directory of a previously saved processor and of the checkpoint to resume from.
processor_path = os.path.join(dir_to_save_model, "outputSave")
checkpoint_path = os.path.join(dir_to_save_model, f"checkpoint-{previous_max_steps}")
# Model identifier used when the local processor/checkpoint files are not found.
default_model = "Sagicc/speecht5_finetuned_multilingual_librispeech_pl"


def processor_files_exist(path):
    """Tell whether `path` is a directory that contains the processor files.

    Args:
        path: Directory expected to hold a saved processor.

    Returns:
        True when `path` is an existing directory whose listing includes both
        "preprocessor_config.json" and "tokenizer_config.json"; False otherwise.
    """
    required = ("preprocessor_config.json", "tokenizer_config.json")
    if os.path.isdir(path):
        present = os.listdir(path)
        return all(name in present for name in required)
    return False

def model_files_exist(path):
    """Tell whether `path` is a directory that contains a model "config.json".

    Args:
        path: Directory expected to hold a saved model/checkpoint.

    Returns:
        True when `path` is an existing directory whose listing includes
        "config.json"; False otherwise.
    """
    return os.path.isdir(path) and "config.json" in os.listdir(path)

# Select the processor/model source: use the locally saved processor and the
# checkpoint when both are present, otherwise start from the default model.
if processor_files_exist(processor_path) and model_files_exist(checkpoint_path):
    print("✅ Załadowano processor z checkpointa.")
    processor = SpeechT5Processor.from_pretrained(processor_path)
    model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint_path).to(device)
    # Hand the explicit checkpoint path to `trainer.train(...)` instead of True:
    # True makes the Trainer pick the most recent checkpoint in its output
    # directory, which is not necessarily the `checkpoint-{previous_max_steps}`
    # directory the weights above were loaded from. The value stays truthy.
    resume_from_checkpoint = checkpoint_path
else:
    print("⚠️ Brak pełnych danych procesora w checkpoint — używam defaultowego.")
    processor = SpeechT5Processor.from_pretrained(default_model)
    model = SpeechT5ForTextToSpeech.from_pretrained(default_model).to(device)
    # Nothing to resume from: training starts from the default model.
    resume_from_checkpoint = False



## Load the dataset
Loading dataset from CSV file.

In [None]:
from datasets import load_dataset, Audio

from datasets import load_dataset
from datasets import Dataset

# Path to the "|"-delimited CSV file describing the dataset.
data_files= r".\dane_trimmed_noise_reduced_cutted\output.csv"

# Load the dataset from the CSV file.
dataset = load_dataset("csv", data_files=data_files, split="train", delimiter="|")
# Rename the CSV columns to the names the rest of the notebook uses.
dataset = dataset.rename_columns({"id": "audio_id", "text": "raw_text", "audio_path": "audio"})
# Add constant metadata columns to every example; `normalized_text` is a copy
# of `raw_text`.
dataset = dataset.map(lambda example: {
"language" : "pl",
"gender" :  "male",
"speaker_id": 1,
"is_gold_transcript": True,
"accent": None,
"normalized_text" : example["raw_text"]
})


It's important to set the sampling rate of the audio data to 16 kHz, which is what SpeechT5 expects.

In [None]:
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

Let's quickly check how many examples are in this dataset.

In [None]:
len(dataset)

## Clean up the text

The dataset contains Polish characters, but the tokenizer's vocabulary may not include them. We need to add any missing ones to the tokenizer and resize the model's token embeddings to match.


First we'll need to find out what the unsupported tokens are. The `SpeechT5Tokenizer` works with characters as tokens, so let's extract all distinct characters of the dataset. We write a mapping function `extract_all_chars` that concatenates the transcriptions from all examples into one long transcription and then transforms the string into a set of characters. It is important to pass the arguments `batched=True` and `batch_size=-1` to `dataset.map(...)` so that the mapping function has access to all transcriptions at once.

In [None]:
tokenizer = processor.tokenizer
# Add the Polish characters (lower- and upper-case) to the tokenizer if they
# are not there yet.
polish_chars = list("ąćęłńóśźżĄĆĘŁŃÓŚŹŻ")
# `added` is printed below as the number of tokens that were added.
added = tokenizer.add_tokens(polish_chars)
print(f"Dodano {added} tokenów.")
# Resize the model's token embeddings to the tokenizer's current size.
model.resize_token_embeddings(len(tokenizer))

In [None]:
print(dataset)

In [None]:
def extract_all_chars(batch):
    """Collect the distinct characters used in a batch of transcriptions.

    Args:
        batch: Mapping whose "normalized_text" entry is a list of strings.

    Returns:
        A dict with two single-element lists: "vocab" holds the list of
        distinct characters (in arbitrary order) and "all_text" holds the
        transcriptions joined with single spaces.
    """
    joined = " ".join(batch["normalized_text"])
    distinct_chars = [*set(joined)]
    return {"vocab": [distinct_chars], "all_text": [joined]}

# Run `extract_all_chars` over the dataset as one single batch
# (batch_size=-1) and keep only the columns it returns.
vocabs = dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)

# Characters that occur in the dataset.
dataset_vocab = set(vocabs["vocab"][0])
# Tokens known to the tokenizer (iterating the vocab mapping yields its keys).
tokenizer_vocab = set(tokenizer.get_vocab())

Now we have two sets of characters, one with the vocabulary from the dataset and one with the vocabulary from the tokenizer. By taking the difference between these sets, we find the characters that are in the dataset but not in the tokenizer.

In [None]:
dataset_vocab - tokenizer_vocab

Alternatively, you could replace the Polish diacritics with their plain ASCII counterparts. I kept the original characters (the replacement call below is commented out) so that the model can tell them apart.

In [None]:
# Lower-case Polish diacritics paired with their plain ASCII stand-ins.
# `cleanup_polish_text` derives the upper-case pairs from these entries.
polish_replacements = [
    ('ą', 'a'),
    ('ć', 'c'),
    ('ę', 'e'),
    ('ł', 'l'),
    ('ń', 'n'),
    ('ó', 'o'),
    ('ś', 's'),
    ('ź', 'z'),
    ('ż', 'z'),
]

def cleanup_polish_text(inputs):
    """Replace Polish diacritics in ``inputs["normalized_text"]`` with ASCII letters.

    Both lower-case and upper-case diacritics are handled (e.g. "Łódź" becomes
    "Lodz"); previously upper-case letters such as "Ł" were left untouched.

    Args:
        inputs: Mapping with a string under the "normalized_text" key. It is
            modified in place.

    Returns:
        The same `inputs` object, with "normalized_text" rewritten.
    """
    text = inputs["normalized_text"]
    for src, dst in polish_replacements:
        text = text.replace(src, dst)
        # The table only lists lower-case letters; cover the upper-case form too.
        text = text.replace(src.upper(), dst.upper())
    inputs["normalized_text"] = text
    return inputs

#dataset = dataset.map(cleanup_polish_text)

## Speakers

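VoxPopuli is a multi-speaker dataset. We know there are 20968 examples in the dataset, but how many speakers are there and how many examples do they have? (Note: this paragraph describes VoxPopuli; the CSV dataset loaded above assigns `speaker_id` 1 to every example, so only one speaker will show up here.)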

In [None]:
from collections import defaultdict
# Number of examples per speaker id; missing ids start at 0.
speaker_counts = defaultdict(int)

# Tally one example for the speaker of each row in the dataset.
for speaker_id in dataset["speaker_id"]:
    speaker_counts[speaker_id] += 1

By plotting a histogram we can get a sense of how much data there is for each speaker.

In [None]:
import matplotlib.pyplot as plt

# Histogram of the per-speaker example counts, in 20 bins:
# x-axis = examples per speaker, y-axis = number of speakers.
plt.figure()
plt.hist(speaker_counts.values(), bins=20)
plt.ylabel("Speakers")
plt.xlabel("Examples")
# Explicit show() call is left disabled.
#plt.show()

Around one-third of speakers have fewer than 100 examples. Approximately ten or so speakers have more than 500 examples. To speed up training, let's limit ourselves to speakers that have between 100 and 400 examples. This also makes the dataset more balanced.

We have only one speaker, so the filter below is defined but left commented out.

In [None]:
def select_speaker(speaker_id):
    """Return True when the speaker has between 100 and 400 examples, inclusive.

    The count is read from the module-level `speaker_counts` mapping.
    """
    n_examples = speaker_counts[speaker_id]
    return n_examples >= 100 and n_examples <= 400

#we have only one speaker
#dataset = dataset.filter(select_speaker, input_columns=["speaker_id"])

How many speakers remain?

In [None]:
len(set(dataset["speaker_id"]))

## Speaker embeddings

To allow the TTS model to distinguish between multiple speakers, we'll need to create a speaker embedding for each example. The speaker embedding is simply an additional input into the model that captures a particular speaker's voice characteristics.

To create the speaker embeddings, we use the [spkrec-xvect-voxceleb](https://huggingface.co/speechbrain/spkrec-xvect-voxceleb) model from SpeechBrain. The following defines a function `create_speaker_embedding()` that takes an input audio waveform and outputs a 512-element vector containing the corresponding speaker embedding.

In [None]:
import os
from speechbrain.pretrained import EncoderClassifier

# Identifier of the speaker-embedding model loaded below.
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Same device selection as for the TTS model: GPU when available, else CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the speaker encoder onto `device`; `savedir` points at a directory
# under /tmp named after the model.
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name)
)

def create_speaker_embedding(waveform):
    """Compute a normalized speaker embedding for one audio waveform.

    Args:
        waveform: Audio samples accepted by `torch.tensor`; they are moved to
            the module-level `device` before encoding.

    Returns:
        A NumPy array holding the embedding produced by `speaker_model`,
        normalized along dimension 2 and with singleton dimensions squeezed out
        (the notebook text describes it as a 512-element vector).
    """
    # Inference only: no gradients are needed for the encoder.
    with torch.no_grad():
        wav_tensor = torch.tensor(waveform, device=device)
        embedding = speaker_model.encode_batch(wav_tensor)
        embedding = torch.nn.functional.normalize(embedding, dim=2)
    return embedding.squeeze().cpu().numpy()

Note: the `speechbrain/spkrec-xvect-voxceleb` model was trained on English speech from the VoxCeleb dataset, while our training examples are in Polish. We're assuming here that this model will still make reasonable speaker embeddings, but this may not be true. First training an X-vector model on the target speech, such as Polish, might work better.

## Preparing the dataset

The `prepare_dataset` function takes in a single example and uses the `SpeechT5Processor` object to tokenize the input text and load the target audio into a log-mel spectrogram. It also adds the speaker embeddings as an additional input.

In [None]:
def prepare_dataset(example):
    """Turn one raw example into model inputs.

    Args:
        example: Mapping with an "audio" entry (providing "array" and
            "sampling_rate") and a "normalized_text" string.

    Returns:
        The processor output for the text and target audio, with "labels"
        reduced to its first element and "speaker_embeddings" added.
    """
    audio_info = example["audio"]
    samples = audio_info["array"]
    rate = audio_info["sampling_rate"]

    # Resample to 16 kHz if needed.
    if rate != 16000:
        import torchaudio
        samples = torchaudio.functional.resample(torch.tensor(samples), rate, 16000).numpy()
        rate = 16000

    # Stereo to mono (average over the channels on the first axis).
    is_multichannel = len(samples.shape) > 1 and samples.shape[0] > 1
    if is_multichannel:
        samples = samples.mean(axis=0)

    text = example["normalized_text"]
    example = processor(
        text=text,
        audio_target=samples,
        sampling_rate=rate,
        return_attention_mask=False,
    )

    # The processor returns the labels wrapped in an outer list; unwrap it.
    example["labels"] = example["labels"][0]
    example["speaker_embeddings"] = create_speaker_embedding(samples)

    return example

In [None]:
print(dataset)

Let's verify the processing is correct by looking at a single example:

In [None]:
processed_example = prepare_dataset(dataset[0])

This should give us:

- the tokens for the input text in `input_ids`
- the speaker embedding in `speaker_embeddings`
- the target spectrogram in `labels`

In [None]:
list(processed_example.keys())

The tokens should decode into the original text, with `</s>` to mark the end of the sentence.

In [None]:
tokenizer.decode(processed_example["input_ids"])

Speaker embeddings should be a 512-element vector:

In [None]:
processed_example["speaker_embeddings"].shape

If we run a vocoder on the log-mel spectrogram, it should produce the original audio again. We'll load the HiFi-GAN vocoder from the original [SpeechT5 checkpoint](https://hf.co/microsoft/speecht5_hifigan).

In [None]:
from transformers import SpeechT5HifiGan
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan").to(device)

In [None]:
spectrogram = torch.tensor(processed_example["labels"], device=device)
with torch.no_grad():
    speech = vocoder(spectrogram)

In [None]:
# Import the notebook audio widget under an alias: a plain
# `from IPython.display import Audio` would shadow `datasets.Audio` (imported
# in the dataset-loading cell), so re-running the `cast_column(...)` cell
# afterwards would call the wrong class.
from IPython.display import Audio as NotebookAudio
NotebookAudio(speech.cpu().numpy(), rate=16000)

That all looks and sounds good! We can now process the entire dataset. This will take between 5 and 10 minutes.

In [None]:
# Apply `prepare_dataset` to every example. No columns are removed here; the
# duration filter further down still reads the "audio" column.
dataset = dataset.map(prepare_dataset)

Some of the examples in the dataset are apparently longer than the maximum input length the model can handle (600 tokens), so we should remove those from the dataset. In fact, to allow for larger batch sizes we'll remove anything over 200 tokens.

Audio clips that are 12 s or longer, or 1 s or shorter, are removed as well.

In [None]:

def is_short_audio(example):
    """Tell whether an example's audio lasts more than 1 s and less than 12 s.

    Args:
        example: Mapping with an "audio" entry providing "array",
            "sampling_rate" and, optionally, "duration" (in seconds).

    Returns:
        True when the duration lies strictly between 1.0 and 12.0 seconds.
    """
    audio = example["audio"]
    samples, rate = audio["array"], audio["sampling_rate"]
    seconds = audio.get("duration")

    # If no duration is stored, compute it from the sample count.
    if seconds is None:
        seconds = len(samples) / rate

    return seconds > 1.0 and seconds < 12.0
    
def is_not_too_long(input_ids):
    """Return True when the token sequence has fewer than 200 entries."""
    return len(input_ids) < 200

# Keep only examples whose audio duration is strictly between 1 s and 12 s ...
dataset = dataset.filter(is_short_audio)
# ... and whose tokenized text has fewer than 200 tokens.
dataset = dataset.filter(is_not_too_long, input_columns=["input_ids"])


How many examples are left?

In [None]:
len(dataset)

## Train/test split

Create a basic train/test split. For our purposes, it's OK if the same speaker is part of both sets.

Augmentation may help to obtain better results, but in my case, training results seem to be just noisier.

In [None]:
import numpy as np

def augment_audio(element):
    """Add random Gaussian noise to an example's audio and peak-normalize it.

    Args:
        element: Mapping with an "audio" entry whose "array" is a NumPy array.
            Both the mapping and its "audio" entry are modified in place.

    Returns:
        The same `element`, with "audio"["array"] replaced by the noisy signal
        scaled by its peak absolute value and clipped to [-1.0, 1.0].
    """
    clip = element["audio"]
    samples = clip["array"]
    # Noise amplitude: a random fraction (up to 0.5 %) of the signal maximum.
    scale = 0.005 * np.random.uniform() * np.amax(samples)
    noisy = samples + scale * np.random.normal(size=samples.shape)
    # The small epsilon avoids dividing by zero for an all-zero signal.
    peak = np.max(np.abs(noisy))
    clip["array"] = np.clip(noisy / (peak + 1e-9), -1.0, 1.0)
    element["audio"] = clip
    return element

In [None]:
# Hold out 10 % of the examples for evaluation. The seed is fixed so the split
# is identical on every run: training is resumed from checkpoints across
# sessions, and an unseeded split would move earlier evaluation examples into
# the training set (and vice versa) each time the notebook is re-run.
dataset = dataset.train_test_split(test_size=0.1, seed=42)
#dataset["train"] = dataset["train"].map(augment_audio)

What does the dataset look like now?

In [None]:
dataset

## Collator to make batches

We need to define a custom collator to combine multiple examples into a batch. This will pad shorter sequences with padding tokens. For the spectrogram labels, the padded portions are replaced with the special value -100. This special value tells the model to ignore that part of the spectrogram when calculating the spectrogram loss.


In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class TTSDataCollatorWithPadding:
    """Collate prepared TTS examples into one padded batch.

    Each feature must provide "input_ids", "labels" (the target spectrogram
    frames) and "speaker_embeddings". The reduction factor is read from the
    module-level `model`.
    """

    # Processor whose `pad` method pads the token ids and the spectrogram labels.
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad `features` and return the batch as tensors.

        Args:
            features: The examples to combine into one batch.

        Returns:
            The padded batch with "labels" masked by -100 on padded frames and
            "speaker_embeddings" added; "decoder_attention_mask" is removed.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # Collate the inputs and targets into a batch. Use the processor held by
        # this instance rather than whatever global `processor` happens to exist.
        batch = self.processor.pad(
            input_ids=input_ids,
            labels=label_features,
            padding=True,
            return_tensors="pt",
        )

        # Replace padding with -100 so it is ignored by the spectrogram loss.
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        # Not used during fine-tuning.
        del batch["decoder_attention_mask"]

        # Round target lengths down to a multiple of the reduction factor and
        # trim the labels to the longest rounded length. Plain Python ints are
        # used: no tensor (let alone one on the GPU) is needed just for a max.
        reduction_factor = model.config.reduction_factor
        if reduction_factor > 1:
            max_length = max(
                length - length % reduction_factor
                for length in (len(item["input_values"]) for item in label_features)
            )
            batch["labels"] = batch["labels"][:, :max_length]

        # Also add in the speaker embeddings.
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In SpeechT5, the input to the decoder part of the model is reduced by a factor 2. In other words, it throws away every other timestep from the target sequence. The decoder then predicts a sequence that is twice as long. Since the original target sequence length may be odd, the data collator makes sure to round the maximum length of the batch down to be a multiple of 2.

In [None]:
data_collator = TTSDataCollatorWithPadding(processor=processor)

Let's test the data collator.

In [None]:
features = [
    dataset["train"][0],
    dataset["train"][1],
    dataset["train"][20],
]

batch = data_collator(features)

In [None]:
{k:v.shape for k,v in batch.items()}

Looks good!

## Training

The `use_cache=True` option is incompatible with gradient checkpointing. Disable it for training, otherwise it keeps complaining about it.

In [None]:
model.config.use_cache = False

Define the training arguments.

We won't be computing any evaluation metrics during the training process. Instead, we'll only look at the loss. The lower the loss, the better the model.

Note: If you do not want to upload the model checkpoints to the Hub, set `push_to_hub=False`.

In [None]:
from transformers import Seq2SeqTrainingArguments

# Training configuration. Evaluation runs every `eval_steps` steps, checkpoints
# are written every `save_steps` steps, and the checkpoint with the lowest
# `eval_loss` is loaded back when training ends.
training_args = Seq2SeqTrainingArguments(
    output_dir=dir_to_save_model,
    per_device_train_batch_size=batch_size_train,
    gradient_accumulation_steps=gradient_accumulation_steps,
    learning_rate=learning_rate,
    warmup_steps=2000,
    weight_decay = 0.01,
    max_steps=max_steps,
    gradient_checkpointing=False,#True,
    # Enable mixed precision only when running on CUDA: `device` falls back to
    # "cpu" when no GPU is available, and a hard-coded fp16=True would make the
    # arguments fail validation in that case.
    fp16=(device == "cuda"),
    evaluation_strategy="steps",
    lr_scheduler_type="cosine_with_restarts",
    per_device_eval_batch_size=batch_size_eval,
    save_steps=save_steps,
    eval_steps=eval_steps,
    logging_steps=100,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=False
)

Create the trainer object using the model, dataset, and data collator.

In [None]:
from transformers import Seq2SeqTrainer

# Build the trainer from the training arguments, the model, the train/test
# splits and the padding collator; the processor's tokenizer is passed along too.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor.tokenizer,
)

And with that, we're ready to start training!

Training will take several hours depending on your GPU or the one allocated to the Google Colab. Depending on your GPU, it is possible that you will encounter a CUDA "out-of-memory" error when you start training. In this case, you can reduce the `per_device_train_batch_size` incrementally by factors of 2 and increase `gradient_accumulation_steps` by 2x to compensate.

In [None]:
trainer.train(resume_from_checkpoint=resume_from_checkpoint)

## Evaluate

After training finishes, let's use the model to synthesize some speech!

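The cell below reloads the model from the local checkpoint at `checkpoint_path`; if loading fails, the error is printed and the model already in memory is used. (If you train on Colab, where a session may be terminated before training finishes, it's a good idea to use `push_to_hub=True` so that checkpoints are not lost.)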

In [None]:
# Reload the model weights from `checkpoint_path`; on failure the error is
# printed and whatever `model` already refers to is kept.
# NOTE(review): `checkpoint_path` is built from `previous_max_steps`, i.e. the
# checkpoint training was resumed from — confirm this is the checkpoint you
# want to evaluate rather than the model that was just trained.
try:
    model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint_path).to(device)
except Exception as e:
    print(f"Error: {e}")

First we need to obtain a speaker embedding. We can simply grab one from the test set.

In [None]:
# Take the speaker embedding of one test example. Index 304 is used when the
# test split is large enough; otherwise fall back to the last example instead
# of raising IndexError (the split is only 10 % of the data).
test_split = dataset["test"]
example_index = min(304, len(test_split) - 1)
example = test_split[example_index]
# Add a leading batch dimension, as `generate_speech` is given a batch of one.
speaker_embeddings = torch.tensor(example["speaker_embeddings"], device=device).unsqueeze(0)
speaker_embeddings.shape

Define some input text and tokenize it.

In [None]:
text = "witajcie"

In [None]:
tokenizer.decode(tokenizer(text)["input_ids"])

In [None]:
inputs = processor(text=text, return_tensors="pt").to(device)

Tell the model to generate a spectrogram from the input text.

In [None]:
spectrogram = model.generate_speech(inputs["input_ids"], speaker_embeddings)

Finally, use the vocoder to turn the spectrogram into sound.

In [None]:
with torch.no_grad():
    speech = vocoder(spectrogram)

In [None]:
import transformers
print(transformers.__version__)

In [None]:
# Import the notebook audio widget under an alias: a plain
# `from IPython.display import Audio` would shadow `datasets.Audio` (imported
# in the dataset-loading cell), so re-running the `cast_column(...)` cell
# afterwards would call the wrong class.
from IPython.display import Audio as NotebookAudio
NotebookAudio(speech.cpu().numpy(), rate=16000)

In [None]:
import soundfile as sf
sf.write("output.wav", speech.cpu().numpy(), samplerate=16000)

## Save

In [None]:
# Save the trained model and the processor side by side in "<output>/outputSave"
# (the location the loading cell checks for processor files).
# A separate variable is used instead of overwriting `dir_to_save_model`:
# rebinding it made a second run of this cell save into a nested
# "outputSave/outputSave" directory and changed the base directory seen by any
# cell executed afterwards.
final_model_dir = os.path.join(dir_to_save_model, "outputSave")
trainer.save_model(final_model_dir)
processor.save_pretrained(final_model_dir)