<a href="https://www.kaggle.com/code/aisuko/text-to-speech?scriptVersionId=164955787" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Overview

Text-to-speech (TTS) is the task of creating natural-sounding speech from text, where the speech can be generated in multiple languages and for multiple speakers. Several text-to-speech models are currently available in Transformers, such as Bark, MMS, VITS and SpeechT5. We can easily generate audio using the `text-to-audio` pipeline (or its alias, `text-to-speech`). Some models, like Bark, can also be conditioned to generate non-verbal sounds such as laughing and crying, or even to add music.

In [None]:
%%capture
!pip install transformers==4.35.2
!pip install datasets==2.15.0
!pip install soundfile==0.12.1
!pip install speechbrain==0.5.16

In [None]:
from transformers import pipeline

# example of the text-to-speech pipeline
pipe=pipeline("text-to-speech", model="suno/bark-small", device='cuda')
text="[clears throat] This is a test ...  and I just took a long pause."
output=pipe(text)

In [None]:
from IPython.display import Audio

Audio(output["audio"], rate=output["sampling_rate"])

We are going to fine-tune a TTS model. The model we can currently fine-tune is SpeechT5. It is pre-trained on a combination of `speech-to-text` and `text-to-speech` data, allowing it to learn a unified space of hidden representations shared by both text and speech. This means that the same pre-trained model can be fine-tuned for different tasks. Furthermore, SpeechT5 supports multiple speakers through x-vector speaker embeddings.

In [None]:
import os
from huggingface_hub import login
from kaggle_secrets import UserSecretsClient

user_secrets = UserSecretsClient()

login(token=user_secrets.get_secret("HUGGINGFACE_TOKEN"))

os.environ["WANDB_API_KEY"]=user_secrets.get_secret("WANDB_API_KEY")
os.environ["WANDB_PROJECT"] = "Fine-tuning speech-t5"
os.environ["WANDB_NOTES"] = "Fine-tune SpeechT5 on the Dutch subset of VoxPopuli"
os.environ["WANDB_NAME"] = "ft-speech-t5-on-voxpopuli"

# Loading the dataset

Here we are going to use the VoxPopuli dataset. It is a large-scale multilingual speech corpus consisting of data sourced from 2009-2020 European Parliament event recordings. It contains labelled audio-transcription data for 15 European languages. We load the Dutch (`nl`) subset.

**Note that VoxPopuli, or any other automatic speech recognition (ASR) dataset, may not be the most suitable option for training TTS models. The features that make it beneficial for ASR, such as excessive background noise, are typically undesirable in TTS. However, finding top-quality, multilingual, and multi-speaker TTS datasets can be quite challenging.**

In [None]:
from datasets import load_dataset, Audio

dataset=load_dataset("facebook/voxpopuli","nl", split="train")
dataset

SpeechT5 expects audio data to have a sampling rate of 16 kHz, so make sure the examples in the dataset meet this requirement.

In [None]:
dataset=dataset.cast_column("audio", Audio(sampling_rate=16000))

# Preprocess the data
Let's begin by defining the model checkpoint to use and loading the appropriate processor:

In [None]:
from transformers import SpeechT5Processor

checkpoint="microsoft/speecht5_tts"
processor=SpeechT5Processor.from_pretrained(checkpoint)

# Text cleanup for SpeechT5 tokenization

The dataset examples contain `raw_text` and `normalized_text` features. When deciding which feature to use as the text input, consider that the SpeechT5 tokenizer doesn't have any tokens for numbers. In `normalized_text` the numbers are written out as text. Thus, it is a better fit, and we recommend using `normalized_text` as input text.

Because SpeechT5 was trained on the English language, it may not recognize certain characters in the Dutch dataset. If left as is, these characters will be converted to `<unk>` tokens. However, in Dutch, certain characters like `à` are used to stress syllables. In order to preserve the meaning of the text, we can replace this character with a regular `a`.

To identify unsupported tokens, extract all unique characters in the dataset and compare them with the vocabulary of the SpeechT5 tokenizer, which works with characters as tokens. To do this, write the `extract_all_chars` mapping function that concatenates the transcriptions from all examples into one string and converts it to a set of characters. Make sure to set `batched=True` and `batch_size=-1` in `dataset.map()` so that all transcriptions are available at once for the mapping function.

In [None]:
def extract_all_chars(batch):
    all_text=" ".join(batch["normalized_text"])
    vocab=list(set(all_text))
    return {"vocab": [vocab], "all_text":[all_text]}

vocabs=dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)


tokenizer=processor.tokenizer

dataset_vocab=set(vocabs["vocab"][0])
tokenizer_vocab={k for k, _ in tokenizer.get_vocab().items()}

# Deal with Special Characters

Now, we have two sets of characters: one with the vocabulary from the dataset and one with the vocabulary from the tokenizer. To identify any unsupported characters in the dataset, we can take the difference between these two sets. The resulting set will contain the characters that are in the dataset but not in the tokenizer.

To handle the unsupported characters identified in the previous step, define a function that maps these characters to valid tokens. Note that spaces are already replaced by `▁` in the tokenizer and do not need to be handled separately.

In [None]:
print(dataset_vocab-tokenizer_vocab)


replacements=[
    ("à", "a"),
    ("ç", "c"),
    ("è", "e"),
    ("ë", "e"),
    ("í", "i"),
    ("ï", "i"),
    ("ö", "o"),
    ("ü", "u"),
]


def cleanup_text(inputs):
    for src, dst in replacements:
        inputs["normalized_text"]=inputs["normalized_text"].replace(src, dst)
    return inputs

dataset=dataset.map(cleanup_text)
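
The clean-up step above can be checked on toy data without downloading anything. The following is a minimal sketch, not the notebook's actual pipeline: `texts`, `toy_tokenizer_vocab`, `find_unsupported` and `cleanup` are hypothetical stand-ins for the dataset, the tokenizer vocabulary and the two helper steps. It recomputes the set difference after applying the replacements, which is a cheap way to spot characters the replacement list does not cover.

```python
# Toy stand-ins (assumptions): two short Dutch strings and a lowercase-only vocabulary.
texts = ["één café à la carte", "geïnteresseerd in de reünie"]
toy_tokenizer_vocab = set("abcdefghijklmnopqrstuvwxyz")

# Same replacement pairs as in the cell above.
replacements = [
    ("à", "a"), ("ç", "c"), ("è", "e"), ("ë", "e"),
    ("í", "i"), ("ï", "i"), ("ö", "o"), ("ü", "u"),
]


def find_unsupported(texts, vocab):
    """Return the characters that occur in the texts but not in the vocabulary."""
    chars = set(" ".join(texts))
    chars.discard(" ")  # spaces are handled by the tokenizer itself
    return chars - vocab


def cleanup(text):
    """Apply every (source, destination) replacement to one string."""
    for src, dst in replacements:
        text = text.replace(src, dst)
    return text


before = find_unsupported(texts, toy_tokenizer_vocab)
after = find_unsupported([cleanup(t) for t in texts], toy_tokenizer_vocab)
print(sorted(before))
print(sorted(after))
```

In this toy input the character `é` has no entry in the replacement list, so it is still reported after clean-up. Running the same difference on the real dataset after `cleanup_text` would show whether anything similar is left over there.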

Now that we have dealt with special characters in the text, it's time to shift focus to the audio data.

# Checking Speakers

The VoxPopuli dataset includes speech from multiple speakers, but how many speakers are represented in the dataset? To determine this, we can count the number of unique speakers and the number of examples each speaker contributes. With a total of 20968 examples in the dataset, this information will give us a better understanding of the distribution of speakers and examples in the data.

In [None]:
from collections import defaultdict
import matplotlib.pyplot as plt

speaker_counts=defaultdict(int)

for speaker_id in dataset["speaker_id"]:
    speaker_counts[speaker_id]+=1

plt.figure()
plt.hist(speaker_counts.values(), bins=20)
plt.ylabel("Speakers")
plt.xlabel("Examples")
plt.show()

# Cleaning Data

The histogram reveals that approximately one-third of the speakers in the dataset have fewer than 100 examples, while around ten speakers have more than 500 examples. To improve training efficiency and balance the dataset, we can limit the data to speakers with between 100 and 400 examples.

Note that some speakers with few examples may actually have more audio available if the examples are long. However, determining the total amount of audio for each speaker requires scanning through the entire dataset, which is a time-consuming process that involves loading and decoding each audio file. As such, we have chosen to skip this step here.
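
For readers who do want the per-speaker audio totals, the scan described above could look roughly like the sketch below. It is an illustration only: `seconds_per_speaker` is a hypothetical helper, and it assumes each example is a dict with a `speaker_id` and an `audio` entry holding `array` and `sampling_rate`, as in the decoded dataset rows. On the real dataset this loop would decode every audio file, which is exactly the cost the notebook avoids.

```python
from collections import defaultdict


def seconds_per_speaker(examples):
    """Sum the audio duration (in seconds) contributed by each speaker."""
    totals = defaultdict(float)
    for example in examples:
        audio = example["audio"]
        # duration = number of samples / samples per second
        totals[example["speaker_id"]] += len(audio["array"]) / audio["sampling_rate"]
    return dict(totals)


# Toy rows (assumption): silent clips of 1.0 s, 0.5 s and 2.0 s at 16 kHz.
toy_examples = [
    {"speaker_id": "a", "audio": {"array": [0.0] * 16000, "sampling_rate": 16000}},
    {"speaker_id": "a", "audio": {"array": [0.0] * 8000, "sampling_rate": 16000}},
    {"speaker_id": "b", "audio": {"array": [0.0] * 32000, "sampling_rate": 16000}},
]

durations = seconds_per_speaker(toy_examples)
print(durations)
```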

In [None]:
def select_speaker(speaker_id):
    return 100<=speaker_counts[speaker_id]<=400

dataset=dataset.filter(select_speaker, input_columns=["speaker_id"])

# Let's check how many speakers remain
print(len(set(dataset["speaker_id"])))

# Let's see how many examples are left
print(len(dataset))
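
The effect of the 100 to 400 window is easy to verify on made-up counts. This sketch uses `collections.Counter` instead of the `defaultdict` loop above (an equivalent way to count) and a hypothetical list of speaker ids; the bounds are inclusive, matching `select_speaker`.

```python
from collections import Counter

# Hypothetical speaker ids: four speakers with 150, 50, 450 and 400 examples.
speaker_ids = ["s1"] * 150 + ["s2"] * 50 + ["s3"] * 450 + ["s4"] * 400
counts = Counter(speaker_ids)


def select_speaker(speaker_id, low=100, high=400):
    """Keep a speaker only if their example count lies in [low, high]."""
    return low <= counts[speaker_id] <= high


kept = [s for s in speaker_ids if select_speaker(s)]
kept_speakers = sorted(set(kept))
print(kept_speakers)  # speakers that survive the filter
print(len(kept))      # examples that survive the filter
```

Here `s2` is dropped for having too few examples and `s3` for having too many, while `s4` sits exactly on the upper bound and is kept.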

# Speaker embeddings

To enable the TTS model to differentiate between multiple speakers, we will need to create a speaker embedding for each example. The speaker embedding is an additional input to the model that captures a particular speaker's voice characteristics. To generate these speaker embeddings, use the pre-trained spkrec-xvect-voxceleb model from SpeechBrain. Create a function **create_speaker_embedding()** that takes an input audio waveform and outputs a 512-element vector containing the corresponding speaker embedding. It's important to note that the speechbrain/spkrec-xvect-voxceleb model was trained on English speech from the VoxCeleb dataset, whereas the training examples here are in Dutch. While we believe that this model will still generate reasonable speaker embeddings for our Dutch dataset, this assumption may not hold true in all cases.

For optimal results, we recommend training an X-vector model on the target speech first. This will ensure that the model is better able to capture the unique voice characteristics present in the Dutch language.

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

spk_model_name="speechbrain/spkrec-xvect-voxceleb"

device="cuda"
speaker_model=EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    with torch.no_grad():
        speaker_embeddings=speaker_model.encode_batch(torch.tensor(waveform))
        speaker_embeddings=torch.nn.functional.normalize(speaker_embeddings, dim=2)
        speaker_embeddings=speaker_embeddings.squeeze().cpu().numpy()
    return speaker_embeddings


def prepare_dataset(example):
    audio=example["audio"]
    
    example=processor(
        text=example["normalized_text"],
        audio_target=audio["array"],
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )
    
    # strip off the batch dimension
    example["labels"]=example["labels"][0]
    
    # use SpeechBrain to obtain x-vector
    example["speaker_embeddings"]=create_speaker_embedding(audio["array"])
    
    return example
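
The `torch.nn.functional.normalize` call in `create_speaker_embedding` rescales each x-vector to unit L2 norm. As a dependency-free illustration of that one operation (not of the SpeechBrain model itself), the sketch below normalizes a short list; `l2_normalize` is a hypothetical helper, and the `eps` guard mirrors the small constant PyTorch uses to avoid division by zero.

```python
import math


def l2_normalize(vector, eps=1e-12):
    """Scale a vector so that its Euclidean (L2) norm becomes 1."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / max(norm, eps) for x in vector]


unit = l2_normalize([3.0, 4.0])  # the norm of [3, 4] is 5
unit_norm = math.sqrt(sum(x * x for x in unit))
print(unit, unit_norm)
```

Normalizing keeps only the direction of the embedding, so recordings of the same speaker at different loudness levels map to comparable vectors.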

Verify that the processing is correct by looking at a single example. The speaker embedding should be a 512-element vector, and the labels should be a log-mel spectrogram with 80 mel bins.

In [None]:
import matplotlib.pyplot as plt


processed_example=prepare_dataset(dataset[0])
print(list(processed_example.keys()))
print(processed_example["speaker_embeddings"].shape)

plt.figure()
plt.imshow(processed_example["labels"].T)
plt.show()

In [None]:
# applying the processing function to the entire dataset
dataset=dataset.map(prepare_dataset, remove_columns=dataset.column_names)

We will see a warning saying that some examples in the dataset are longer than the maximum input length the model can handle (600 tokens). Remove those examples from the dataset. Here we go even further and, to allow for larger batch sizes, keep only examples shorter than 200 tokens.

In [None]:
def is_not_too_long(input_ids):
    input_length=len(input_ids)
    return input_length<200

dataset=dataset.filter(is_not_too_long, input_columns=["input_ids"])

In [None]:
dataset=dataset.train_test_split(test_size=0.1)

## Data collator

To combine multiple examples into a batch, we need to define a custom data collator. This collator will pad shorter sequences with padding tokens, ensuring that all examples have the same length. For the spectrogram labels, the padded portions are replaced with the special value `-100`. This special value instructs the model to ignore that part of the spectrogram when calculating the spectrogram loss.
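
The padding idea can be sketched in plain Python before looking at the real collator. This is an illustration under simplifying assumptions: spectrograms are nested lists of `[frame][mel_bin]` values with only 2 bins instead of 80, and `pad_labels` is a hypothetical helper that writes `-100` directly rather than padding first and masking afterwards as the collator does.

```python
def pad_labels(label_batch, pad_value=-100.0):
    """Pad every spectrogram (a list of frames) to the longest one in the batch."""
    max_frames = max(len(frames) for frames in label_batch)
    n_bins = len(label_batch[0][0])
    padded = []
    for frames in label_batch:
        missing = max_frames - len(frames)
        # copy the real frames, then append frames filled with the ignore value
        padded.append([list(f) for f in frames] + [[pad_value] * n_bins for _ in range(missing)])
    return padded


batch = pad_labels([
    [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]],  # 3 frames
    [[0.7, 0.8]],                          # 1 frame, needs 2 padded frames
])
print(batch[1])
```

After padding, both items have three frames, and the two extra frames of the second item contain only `-100`, the value the loss is told to skip.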

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class TTSDataCollatorWithPadding:
    
    processor: Any
    
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_ids=[{"input_ids":feature["input_ids"]} for feature in features]
        label_features=[{"input_values":feature["labels"]} for feature in features]
        speaker_features=[feature["speaker_embeddings"] for feature in features]
        
        # collate the inputs and targets into a batch
        batch=self.processor.pad(input_ids=input_ids, labels=label_features, return_tensors="pt")
        
        # replace padding with -100 to ignore loss correctly
        batch["labels"]=batch["labels"].masked_fill(batch.decoder_attention_mask.unsqueeze(-1).ne(1),-100)
        
        #not used during fine-tuning
        del batch["decoder_attention_mask"]
        
        # round down target lengths to multiple of reduction factor
        if model.config.reduction_factor>1:
            target_lengths=torch.tensor([len(feature["input_values"]) for feature in label_features])
            target_lengths=target_lengths.new(
                [length-length%model.config.reduction_factor for length in target_lengths]
            )
            max_length=max(target_lengths)
            batch["labels"]=batch["labels"][:, :max_length]
        
        # also add in the speaker embeddings
        batch["speaker_embeddings"]=torch.tensor(speaker_features)
        
        return batch

In SpeechT5, the input to the decoder part of the model is reduced by a factor of 2. In other words, it throws away every other timestep from the target sequence. The decoder then predicts a sequence that is twice as long. Since the original target sequence length may be odd, the data collator makes sure to round the maximum length of the batch down to a multiple of 2. Note that the collator reads the reduction factor from the global `model` object, so the model must be loaded before the first batch is collated.
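
The rounding rule used in the collator is the expression `length - length % reduction_factor`. A minimal sketch with made-up target lengths (the helper name `round_down_to_multiple` is hypothetical):

```python
def round_down_to_multiple(length, reduction_factor=2):
    """Round a length down to the nearest multiple of the reduction factor."""
    return length - length % reduction_factor


target_lengths = [7, 8, 13]  # made-up spectrogram lengths in frames
rounded = [round_down_to_multiple(n) for n in target_lengths]
max_length = max(rounded)    # the batch is truncated to this many frames
print(rounded, max_length)
```

Odd lengths lose one frame (7 becomes 6, 13 becomes 12) and even lengths are unchanged, so the truncated batch length is always divisible by the reduction factor.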

In [None]:
data_collator=TTSDataCollatorWithPadding(processor=processor)

# Train the model

The `use_cache=True` option is incompatible with gradient checkpointing. Disable it for training.

In [None]:
from transformers import SpeechT5ForTextToSpeech

model=SpeechT5ForTextToSpeech.from_pretrained(checkpoint)
model.config.use_cache=False
print(model.config)

In [None]:
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

training_args=Seq2SeqTrainingArguments(
    output_dir=os.getenv("WANDB_NAME"),
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    learning_rate=1e-5,
    warmup_steps=50,
    max_steps=100,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=2,
    save_steps=50,
    eval_steps=50,
    logging_steps=25,
    report_to="wandb", # or report_to="tensorboard"
    run_name=os.getenv("WANDB_NAME"),
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=False,
)

trainer=Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor,
)

trainer.train()
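
As a quick sanity check on the training arguments above, the arithmetic below shows how many examples one optimizer step and the whole run consume. The values are copied from the `Seq2SeqTrainingArguments` call; `num_devices = 1` is an assumption about the hardware, not something the notebook states.

```python
per_device_train_batch_size = 4
gradient_accumulation_steps = 8
max_steps = 100
warmup_steps = 50
num_devices = 1  # assumption: a single GPU

# Gradients from several small batches are accumulated before each optimizer step.
effective_batch_size = per_device_train_batch_size * gradient_accumulation_steps * num_devices
examples_seen = effective_batch_size * max_steps
warmup_fraction = warmup_steps / max_steps

print(effective_batch_size, examples_seen, warmup_fraction)
```

With these settings half of the run is spent in learning-rate warmup, which suggests the configuration is meant as a short demonstration rather than a full fine-tuning run.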

In [None]:
trainer.evaluate()

In [None]:
processor.push_to_hub(os.getenv("WANDB_NAME"))
trainer.push_to_hub(os.getenv("WANDB_NAME"))

# Inference

In [None]:
from transformers import pipeline

pipe=pipeline("text-to-speech", model=os.getenv("WANDB_NAME"), device='cuda')

In [None]:
text="hallo allemaal, ik praat nederlands. groetjes aan iedereen!"

example=dataset["test"][304]
speaker_embeddings=torch.tensor(example["speaker_embeddings"]).unsqueeze(0)

forward_params={"speaker_embeddings": speaker_embeddings}
output=pipe(text, forward_params=forward_params)

In [None]:
from IPython.display import Audio
Audio(output['audio'], rate=output['sampling_rate'])
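
To keep the generated speech outside the notebook, the waveform can be written to a WAV file. The sketch below uses only the standard-library `wave` module rather than the `soundfile` package installed at the top; `save_wav` is a hypothetical helper, and it assumes a mono, one-dimensional sequence of floats in the range [-1, 1]. A synthetic sine tone stands in for `output["audio"]` so that the example runs on its own.

```python
import math
import os
import struct
import tempfile
import wave


def save_wav(path, samples, sampling_rate):
    """Write mono float samples in [-1, 1] to a 16-bit PCM WAV file."""
    pcm = bytearray()
    for sample in samples:
        clipped = max(-1.0, min(1.0, float(sample)))    # guard against overshoot
        pcm += struct.pack("<h", int(clipped * 32767))  # little-endian int16
    with wave.open(path, "wb") as wav_file:
        wav_file.setnchannels(1)   # mono
        wav_file.setsampwidth(2)   # 2 bytes = 16 bits per sample
        wav_file.setframerate(sampling_rate)
        wav_file.writeframes(bytes(pcm))


# Stand-in for the pipeline output (assumption): one second of a 440 Hz tone at 16 kHz.
tone = [0.5 * math.sin(2 * math.pi * 440 * n / 16000) for n in range(16000)]
wav_path = os.path.join(tempfile.gettempdir(), "tts_sketch.wav")
save_wav(wav_path, tone, 16000)

# Read the header back to confirm what was written.
with wave.open(wav_path, "rb") as wav_file:
    n_frames, rate = wav_file.getnframes(), wav_file.getframerate()
print(n_frames, rate)
```

With the real pipeline output, the call would presumably be `save_wav("speech.wav", output["audio"], output["sampling_rate"])`, provided the audio array is one-dimensional.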