In [46]:
!pip install transformers datasets soundfile accelerate speechbrain==0.5.16



In [47]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [48]:
# from huggingface_hub import notebook_login

# notebook_login()

In [49]:
from datasets import load_dataset, Audio
dataset = load_dataset("csv", data_files = "/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/metadata_COLAB.csv", split="train")
dataset

Dataset({
    features: ['text', 'audio'],
    num_rows: 11247
})

In [50]:
len(dataset)

11247

In [51]:
print(dataset)

Dataset({
    features: ['text', 'audio'],
    num_rows: 11247
})


In [52]:
# # Calculate the number of rows for half the dataset
# half_size = len(dataset) // 16

# # Select the first half of the dataset
# dataset = dataset.select(range(half_size))

# print(dataset)

We are using just the 1/16th of the data

In [53]:
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

In [54]:
from transformers import SpeechT5Processor

checkpoint = "microsoft/speecht5_tts"
processor = SpeechT5Processor.from_pretrained(checkpoint)




In [55]:
tokenizer = processor.tokenizer

In [56]:
dataset[2:5]

{'text': ['bfs stands for breadth-first search',
  'vpn stands for virtual private network',
  'rest stands for representational state transfer'],
 'audio': [{'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-03.wav',
   'array': array([0.00057983, 0.00091553, 0.0007019 , ..., 0.00588989, 0.00592041,
          0.00592041]),
   'sampling_rate': 16000},
  {'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-04.wav',
   'array': array([-0.00210571, -0.00378418, -0.00300598, ...,  0.00080872,
           0.00057983,  0.00032043]),
   'sampling_rate': 16000},
  {'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-05.wav',
   'array': array([0.00163269, 0.00137329, 0.00137329, ..., 0.00036621, 0.        ,
          0.        ]),
   'sampling_rate': 16000}]}

Let's normalize the dataset, create a column called "normalized_text"

In [57]:
def extract_all_chars(batch):
    all_text = " ".join(batch["text"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


vocabs = dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)

dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = {k for k, _ in tokenizer.get_vocab().items()}

Map:   0%|          | 0/11247 [00:00<?, ? examples/s]

In [58]:
dataset_vocab - tokenizer_vocab

{' ',
 '$',
 '%',
 '&',
 '*',
 '+',
 '0',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 '@',
 '_',
 '\xa0',
 '’',
 '“',
 '”'}

In [59]:
import re

def normalize_text(text):
    # Convert to lowercase
    text = text.lower()

    # Remove punctuation (except apostrophes)
    text = re.sub(r'[^\w\s\']', '', text)

    # Remove extra whitespace
    text = ' '.join(text.split())

    return text

# Define a function to add the normalized_text column
def add_normalized_text(example):
    example['normalized_text'] = normalize_text(example['text'])
    return example

# Apply the function to the dataset
dataset = dataset.map(add_normalized_text)

# Print the first few examples to verify
print(dataset[2:5])

{'text': ['bfs stands for breadth-first search', 'vpn stands for virtual private network', 'rest stands for representational state transfer'], 'audio': [{'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-03.wav', 'array': array([0.00057983, 0.00091553, 0.0007019 , ..., 0.00588989, 0.00592041,
       0.00592041]), 'sampling_rate': 16000}, {'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-04.wav', 'array': array([-0.00210571, -0.00378418, -0.00300598, ...,  0.00080872,
        0.00057983,  0.00032043]), 'sampling_rate': 16000}, {'path': '/content/drive/MyDrive/Colab Notebooks/TTS/ALL_DATASET/data/Processed_DATASET/data/processed_full_forms/full_forms-05.wav', 'array': array([0.00163269, 0.00137329, 0.00137329, ..., 0.00036621, 0.        ,
       0.        ]), 'sampling_rate': 16000}], 'normalized_text': ['bfs stands for breadthfirst search',

In [60]:
def extract_all_chars(batch):
    all_text = " ".join(batch["normalized_text"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


vocabs = dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)

dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = {k for k, _ in tokenizer.get_vocab().items()}

Map:   0%|          | 0/11247 [00:00<?, ? examples/s]

In [61]:
dataset_vocab - tokenizer_vocab

{' ', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '_'}

In [62]:
replacements = [
    ("â", "a"),  # Long a
    ("ç", "ch"),  # Ch as in "chair"
    ("ğ", "gh"),  # Silent g or slight elongation of the preceding vowel
    ("ı", "i"),   # Dotless i
    ("î", "i"),   # Long i
    ("ö", "oe"),  # Similar to German ö
    ("ş", "sh"),  # Sh as in "shoe"
    ("ü", "ue"),  # Similar to German ü
    ("û", "u"),   # Long u
]

def cleanup_text(inputs):
    for src, dst in replacements:
        inputs["normalized_text"] = inputs["normalized_text"].replace(src, dst)
    return inputs

dataset = dataset.map(cleanup_text)

In [63]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

device = "cuda" if torch.cuda.is_available() else "cpu"
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    with torch.no_grad():
        speaker_embeddings = speaker_model.encode_batch(torch.tensor(waveform))
        speaker_embeddings = torch.nn.functional.normalize(speaker_embeddings, dim=2)
        speaker_embeddings = speaker_embeddings.squeeze().cpu().numpy()
    return speaker_embeddings

In [64]:
def prepare_dataset(example):
    audio = example["audio"]

    example = processor(
        text=example["normalized_text"],
        audio_target=audio["array"],
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # strip off the batch dimension
    example["labels"] = example["labels"][0]

    # use SpeechBrain to obtain x-vector
    example["speaker_embeddings"] = create_speaker_embedding(audio["array"])

    return example

In [65]:
processed_example = prepare_dataset(dataset[0])
list(processed_example.keys())

['input_ids', 'labels', 'speaker_embeddings']

In [66]:
processed_example["speaker_embeddings"].shape

(512,)

In [67]:
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names)

Map:   0%|          | 0/11247 [00:00<?, ? examples/s]

In [68]:
def is_not_too_long(input_ids):
    input_length = len(input_ids)
    return input_length < 200

dataset = dataset.filter(is_not_too_long, input_columns=["input_ids"])
len(dataset)

Filter:   0%|          | 0/11247 [00:00<?, ? examples/s]

11222

In [69]:
dataset = dataset.train_test_split(test_size=0.1)

In [70]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # collate the inputs and targets into a batch
        batch = processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        # not used during fine-tuning
        del batch["decoder_attention_mask"]

        # round down target lengths to multiple of reduction factor
        if model.config.reduction_factor > 1:
            target_lengths = torch.tensor(
                [len(feature["input_values"]) for feature in label_features]
            )
            target_lengths = target_lengths.new(
                [
                    length - length % model.config.reduction_factor
                    for length in target_lengths
                ]
            )
            max_length = max(target_lengths)
            batch["labels"] = batch["labels"][:, :max_length]

        # also add in the speaker embeddings
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [71]:
data_collator = TTSDataCollatorWithPadding(processor=processor)

In [74]:
from transformers import SpeechT5ForTextToSpeech

model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)

config.json:   0%|          | 0.00/2.06k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/585M [00:00<?, ?B/s]

In [75]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# set language and task for generation and re-enable cache
model.generate = partial(model.generate, use_cache=True)

In [76]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="speecht5_finetuned",  # change to a repo name of your choice
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    learning_rate=1e-4,
    warmup_steps=100,
    max_steps=1500,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=2,
    save_steps=100,
    eval_steps=100,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=False,
)



In [77]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor,
)

  self.scaler = torch.cuda.amp.GradScaler(**kwargs)
max_steps is given, it will override any value given in num_train_epochs


In [None]:
trainer.train()

  return fn(*args, **kwargs)


Step,Training Loss,Validation Loss
100,0.5891,0.512154
200,0.5482,0.50959
300,0.523,0.475684
400,0.5121,0.474158
500,0.5005,0.464333
600,0.5037,0.467483
700,0.4972,0.458246
800,0.4843,0.454266
900,0.4863,0.452102
1000,0.4773,0.451052


Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return fn(*args, **kwargs)
Non-default generation parameters: {'max_length': 1876}
  return 

# Inference

In [None]:
model = SpeechT5ForTextToSpeech.from_pretrained(
    "speecht5_ttsv5"
)

In [None]:
example = dataset["test"][304]
speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)

In [None]:
text = "API stands for Application programming interface"

In [None]:
inputs = processor(text, return_tensors="pt")

In [None]:
from transformers import SpeechT5HifiGan

vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)

In [None]:
from IPython.display import Audio
import soundfile as sf

Audio(speech.numpy(), rate=16000)
# Save the audio to a file (e.g., 'output.wav')
sf.write('output.wav', speech.numpy(), 16000)