In [None]:
# !pip install ipywidgets
# !pip install wheel
# !pip install setuptools
# !pip3 install torch
# !pip install tensorboard
# !pip install transformers
# !pip install accelerate
# !pip install datasets[audio]
# !pip install speechbrain
# !pip install resemble-enhance
# !pip install Cython
# !pip install pesq
# !pip install peft

# installation of nvidia apex (also requires cuda toolkit to be installed system wide)
# !git clone https://github.com/NVIDIA/apex
# %cd apex  # use the %cd magic; "!cd" runs in a subshell and does not change the notebook's working directory
# !pip install -v --disable-pip-version-check --no-cache-dir --no-build-isolation --config-settings "--build-option=--cpp_ext" --config-settings "--build-option=--cuda_ext" ./

In [None]:
# import os
# hf_home_dir = "./hf_cache"
# os.environ["HF_HOME"] = hf_home_dir  # TODO outcomment this?

In [None]:
from huggingface_hub import notebook_login

# Log in to the Hugging Face Hub; the trained model is pushed there at the end
# of the training section (trainer.push_to_hub).
notebook_login()

In [None]:
from datasets import load_dataset, Audio

# Load the NST Danish corpus and cast its "audio" column to 16 kHz in one chain.
dataset = load_dataset("alexandrainst/nst-da").cast_column(
    "audio", Audio(sampling_rate=16000)
)

In [None]:
# num_train_samples = int(10000 * 0.9)
# num_test_samples = int(10000 * 0.1)

In [None]:
# import random

# random.seed(10)
# train_rands = random.sample(range(len(dataset['train'])), num_train_samples)
# dataset['train'] = dataset['train'].select(train_rands)

# test_rands = random.sample(range(len(dataset['test'])), num_test_samples)
# dataset['test'] = dataset['test'].select(test_rands)

In [None]:
from transformers import SpeechT5Processor

# Base checkpoint that is fine-tuned below; the processor is loaded from it.
checkpoint = "microsoft/speecht5_tts"
processor = SpeechT5Processor.from_pretrained(checkpoint)
tokenizer = processor.tokenizer
processor_sr = processor.feature_extractor.sampling_rate
# The dataset's audio column was cast to 16 kHz when it was loaded, so the
# feature extractor must report the same sampling rate.
assert processor_sr == 16000

In [None]:
# Drop every example whose transcript contains at least one digit character.
dataset = dataset.filter(
    lambda text: set("0123456789").isdisjoint(text),
    input_columns=["text"],
)

In [None]:
# (source, replacement) pairs applied in order by cleanup_text. Each source is
# rewritten as an ASCII letter sequence, a space, or removed entirely.
# NOTE(review): presumably these are characters the tokenizer cannot represent;
# confirm against the tokenizer vocabulary.
replacements = [
    ("&", "og"),
    ("\r", " "),
    ("´", ""),
    ("\\", ""),
    ("¨", " "),
    ("Å", "AA"),
    ("Æ", "AE"),
    ("É", "E"),
    ("Ö", "OE"),
    ("Ø", "OE"),
    ("á", "a"),
    ("ä", "ae"),
    ("å", "aa"),
    ("è", "e"),
    ("î", "i"),
    ("ô", "oe"),
    ("ö", "oe"),
    ("ø", "oe"),
    ("ü", "y"),
]

In [None]:
def cleanup_text(inputs):
    """Rewrite inputs["text"] using every pair in the module-level `replacements`.

    Args:
        inputs: a dataset example (mapping) with a "text" entry.

    Returns:
        The same mapping, with its "text" entry updated in place.
    """
    text = inputs["text"]
    for old, new in replacements:
        text = text.replace(old, new)
    inputs["text"] = text
    return inputs


# Apply the character clean-up to every example of every split.
dataset = dataset.map(cleanup_text)

In [None]:
from collections import defaultdict

# Tally how many training examples each speaker contributes.
speaker_counts = defaultdict(int)

for spk in dataset["train"]["speaker_id"]:
    speaker_counts[spk] = speaker_counts[spk] + 1

In [None]:
import matplotlib.pyplot as plt

# Histogram of examples-per-speaker (x) against number of speakers (y).
fig, ax = plt.subplots()
ax.hist(speaker_counts.values(), bins=20)
ax.set_ylabel("Speakers")
ax.set_xlabel("Examples")
plt.show()

In [None]:
def select_speaker(speaker_id):
    """Return True if the speaker has between 280 and 327 training examples, inclusive.

    The count is looked up in the module-level `speaker_counts` mapping.
    """
    n_examples = speaker_counts[speaker_id]
    return n_examples >= 280 and n_examples <= 327

In [None]:
# Keep only training examples whose speaker passes select_speaker; the test
# split is left untouched.
dataset["train"] = dataset["train"].filter(
    select_speaker,
    input_columns=["speaker_id"],
)

In [None]:
# Number of distinct speakers remaining in the training split.
len(set(dataset["train"]["speaker_id"]))

In [None]:
# Number of training examples remaining after the speaker filter.
len(dataset["train"])

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

# Run on the GPU when one is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Load the pretrained speaker encoder; its files are stored under
# /tmp/<spk_model_name>.
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    """Compute a normalized speaker embedding for one waveform.

    The waveform is encoded with the module-level `speaker_model`, normalized
    along dimension 2, squeezed, and returned as a NumPy array on the CPU.
    No gradients are tracked.
    """
    wav = torch.tensor(waveform)
    with torch.no_grad():
        raw = speaker_model.encode_batch(wav)
        normalized = torch.nn.functional.normalize(raw, dim=2)
    return normalized.squeeze().cpu().numpy()

In [None]:
from speechbrain.pretrained import SpectralMaskEnhancement

metricgan_model_name = "speechbrain/metricgan-plus-voicebank"


# Load the pretrained enhancement model on the same device as the speaker
# encoder; its files are stored under /tmp/<metricgan_model_name>.
enhance_model = SpectralMaskEnhancement.from_hparams(
    source=metricgan_model_name,
    savedir=os.path.join("/tmp", metricgan_model_name),
    run_opts={"device": device},
)


def enhance_audio(waveform):
    """Run one waveform through the module-level `enhance_model`.

    The input is converted to a float tensor of shape (1, num_samples), passed
    to `enhance_batch`, and the squeezed result is returned as a NumPy array.
    """
    batch = torch.tensor(waveform).float().reshape(1, -1)
    # NOTE(review): presumably a relative length, 1.0 meaning the whole signal;
    # confirm against the SpeechBrain documentation.
    full_length = torch.tensor([1.0])
    cleaned = enhance_model.enhance_batch(batch, lengths=full_length)
    return cleaned.squeeze().cpu().numpy()

In [None]:
def prepare_dataset(example, enhance=False):
    """Turn one raw dataset example into model-ready features.

    Args:
        example: mapping with "text" and "audio" ("array", "sampling_rate").
        enhance: when True, the waveform is first passed through enhance_audio.

    Returns:
        The processor output, with "labels" stripped of its batch dimension and
        a "speaker_embeddings" entry computed from the (possibly enhanced)
        waveform.
    """
    waveform = example["audio"]["array"]
    if enhance:
        waveform = enhance_audio(waveform)

    features = processor(
        text=example["text"],
        audio_target=waveform,
        sampling_rate=example["audio"]["sampling_rate"],
        return_attention_mask=False,
    )

    # the processor returns a batch of one; keep only that single item
    features["labels"] = features["labels"][0]

    # speaker embedding from the SpeechBrain encoder
    features["speaker_embeddings"] = create_speaker_embedding(waveform)

    return features

In [None]:
# Sanity check: process a single training example and list the resulting keys.
processed_example = prepare_dataset(dataset["train"][0])
list(processed_example.keys())

In [None]:
# Shape of the speaker embedding computed for the sample example.
processed_example["speaker_embeddings"].shape

In [None]:
# Show the sample's target features ("labels") as an image, transposed.
plt.figure()
plt.imshow(processed_example["labels"].T)
plt.show()

In [None]:
# Process every split with prepare_dataset (enhance keeps its default, False)
# and drop the original columns, using the train split's column names.
dataset = dataset.map(prepare_dataset, remove_columns=dataset["train"].column_names)

In [None]:
# Exclusive upper bound on the number of input token ids per example.
input_length_max = 600


def is_not_too_long(input_ids):
    """Return True when `input_ids` has strictly fewer than `input_length_max` items."""
    return len(input_ids) < input_length_max


# Drop examples from every split whose token sequence fails is_not_too_long.
dataset = dataset.filter(
    is_not_too_long,
    input_columns=["input_ids"],
)

In [None]:
# Training examples remaining after the length filter.
len(dataset["train"])

In [None]:
# Test examples remaining after the length filter.
len(dataset["test"])

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    """Collate processed TTS examples into one padded training batch.

    Each feature must provide "input_ids", "labels" and "speaker_embeddings".
    Inputs and targets are padded through ``processor.pad``; padded target
    positions are replaced with -100, and the target length is trimmed to a
    multiple of the reduction factor.
    """

    # Object exposing ``pad(input_ids=..., labels=..., return_tensors=...)``.
    processor: Any
    # Reduction factor used to trim target lengths. When left as None, it is
    # read from the module-level ``model.config.reduction_factor`` at call time.
    reduction_factor: Union[int, None] = None

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Build a batch from a list of processed examples.

        Args:
            features: examples with "input_ids", "labels" and
                "speaker_embeddings" entries.

        Returns:
            The padded batch with "labels" masked/trimmed, without
            "decoder_attention_mask", and with "speaker_embeddings" added.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # collate the inputs and targets into a batch, using the processor
        # this collator was constructed with rather than a global
        batch = self.processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = batch["labels"].masked_fill(
            batch["decoder_attention_mask"].unsqueeze(-1).ne(1), -100
        )

        # not used during fine-tuning
        del batch["decoder_attention_mask"]

        reduction_factor = self.reduction_factor
        if reduction_factor is None:
            # fall back to the model defined elsewhere in the notebook
            reduction_factor = model.config.reduction_factor

        # round down target lengths to multiple of reduction factor
        if reduction_factor > 1:
            max_length = max(
                len(feature["input_values"])
                - len(feature["input_values"]) % reduction_factor
                for feature in label_features
            )
            batch["labels"] = batch["labels"][:, :max_length]

        # also add in the speaker embeddings
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [None]:
# Collator instance handed to the trainer below.
data_collator = TTSDataCollatorWithPadding(processor=processor)

In [None]:
from transformers import SpeechT5ForTextToSpeech

# Load the pretrained model and remove the "max_length" attribute from its config.
model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)
delattr(model.config, "max_length")  # TODO might need to comment this out later?

In [None]:
from functools import partial

# from transformers import SpeechT5HifiGan
# from pesq import pesq_batch
# import numpy as np

# vocoder = SpeechT5HifiGan.from_pretrained(
#     "microsoft/speecht5_hifigan", device_map="cuda"
# )

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# re-enable the cache for generation only, by pre-binding use_cache=True on generate
model.generate = partial(
    model.generate,
    use_cache=True,
    # vocoder=vocoder,  # TODO outcomment?
)  # TODO might not be necessary if we are not using generate during testing


# def compute_pesq(pred):
#     pred_ids = pred.predictions
#     label_ids = pred.label_ids
#     print(pred_ids)
#     print(label_ids.shape)

#     pesq_list = pesq_batch(processor_sr, label_ids, pred_ids, mode="wb")

#     pesq_metric = np.array(pesq_list).mean()

#     return {"pesq": pesq_metric}

In [None]:
# Name the output after the checkpoint's final path component plus a dataset suffix.
model_name = checkpoint.rsplit("/", 1)[-1]
finetuned_model_name = model_name + "-finetuned-nst-da"

In [None]:
# Number of training epochs passed to the training arguments.
train_epochs = 20

In [None]:
from transformers import Seq2SeqTrainingArguments

# Training configuration. Evaluation and checkpointing both run once per
# epoch, only one checkpoint is kept, and the best model is reloaded at the end.
# "adamw_apex_fused" relies on the NVIDIA apex install described in the first cell.
# NOTE(review): metric_for_best_model is left unset, so "best" presumably falls
# back to the evaluation loss (consistent with greater_is_better=False); confirm.
training_args = Seq2SeqTrainingArguments(
    output_dir=finetuned_model_name,
    per_device_train_batch_size=16,
    dataloader_num_workers=4,
    learning_rate=5e-5,
    warmup_ratio=0.1,
    num_train_epochs=train_epochs,
    gradient_checkpointing=True,
    # gradient_checkpointing_kwargs={"use_reentrant": True},
    fp16=True,
    optim="adamw_apex_fused",
    logging_steps=25,
    # torch_compile=True,
    evaluation_strategy="epoch",
    per_device_eval_batch_size=16,
    save_strategy="epoch",
    save_total_limit=1,
    report_to=["tensorboard"],
    # predict_with_generate=True,  # TODO might be wrong
    load_best_model_at_end=True,
    # metric_for_best_model="pesq",  # TODO change to "loss"?
    greater_is_better=False,
    label_names=["labels"],  # TODO dont think this is necessary to add
    push_to_hub=False,
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, both dataset splits and the collator into the trainer.
# compute_metrics stays disabled (commented out), so no custom metric is computed.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    # compute_metrics=compute_pesq,
    tokenizer=processor,
)

In [None]:
# Start training from scratch rather than resuming from a saved checkpoint.
train_output = trainer.train(resume_from_checkpoint=False)

In [None]:
# Training is done: turn the cache back on (it was disabled before training).
model.config.use_cache = True

In [None]:
# NOTE: evaluate explicitly on the test split so that the best model's
# performance is what ends up on the model card.
eval_output = trainer.evaluate(dataset["test"])

In [None]:
# Metadata forwarded as keyword arguments to trainer.push_to_hub below.
kwargs = {
    "dataset_tags": "alexandrainst/nst-da",
    "dataset": "NST Danish ASR Database",
    "model_name": finetuned_model_name,
    "finetuned_from": checkpoint,
    "tasks": "text-to-speech",
    "language": "da",
}

In [None]:
# Upload the trained model to the Hub together with the metadata above.
trainer.push_to_hub(commit_message="train without enhancement", **kwargs)

In [10]:
import torch
from transformers import pipeline

# Both pipelines load the same fine-tuned repository and differ only in the
# pinned revision, which allows a side-by-side comparison of two commits.
_shared_pipeline_kwargs = {
    "model": "JackismyShephard/speecht5_tts-finetuned-nst-da",
    "use_fast": True,
    "device": 0 if torch.cuda.is_available() else "cpu",
}

pipe = pipeline(
    "text-to-speech",
    revision="9373f1b1e7b4b1fd939fe59d26c3fe9e8bd6d4f4",
    **_shared_pipeline_kwargs,
)

pipe2 = pipeline(
    "text-to-speech",
    revision="080e9d6dec5ac0e7f24744515955e2ab95b3a683",
    **_shared_pipeline_kwargs,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [11]:
# (source, replacement) pairs, applied in order by replace_danish_letters.
replacements = [
    ("&", "og"),
    ("\r", " "),
    ("´", ""),
    ("\\", ""),
    ("¨", " "),
    ("Å", "AA"),
    ("Æ", "AE"),
    ("É", "E"),
    ("Ö", "OE"),
    ("Ø", "OE"),
    ("á", "a"),
    ("ä", "ae"),
    ("å", "aa"),
    ("è", "e"),
    ("î", "i"),
    ("ô", "oe"),
    ("ö", "oe"),
    ("ø", "oe"),
    ("ü", "y"),
]


def replace_danish_letters(text):
    """Return `text` with every source in `replacements` swapped for its replacement."""
    cleaned = text
    for pair in replacements:
        cleaned = cleaned.replace(pair[0], pair[1])
    return cleaned

In [12]:
# Two Danish test passages; each goes through the same character replacement
# that was applied to the training transcripts before being synthesized.
text = "I sin oprindelige før-kristne form blev alferne sandsynligvis opfattet som en personificering af det land og den natur, der omgav menneskene, dvs. den opdyrkede jord, gården og de naturressourcer, som hørte dertil. De var guddommelige eller delvis guddommelige væsener, der besad magiske kræfter, som de brugte både til fordel og ulempe for menneskene."
text = replace_danish_letters(text)
text2 = "Selvom mosser er almindelige, er en lang række mosser dog knyttet snævert til habitater der er i stærk tilbagegang og disse mosser er truede. Kendtest af disse er måske tørvemosserne hvor flere arter kun forekommer i såkaldte højmoser der nu er meget sjældne i Danmark."
text2 = replace_danish_letters(text2)

In [15]:
# Directory holding the saved speaker embeddings, and the one used below.
root = "./embeddings/nst-da-resemble-enhance/"
speaker_embedding_path = f"{root}female_22_oestjylland.npy"

In [16]:
import numpy as np

# Load the saved embedding and add a leading dimension of size 1.
speaker_embedding = torch.tensor(np.load(speaker_embedding_path)).unsqueeze(0)

In [17]:
# Synthesize the first passage with both model revisions, using the same
# speaker embedding for each.
forward_params = {"speaker_embeddings": speaker_embedding}
speech_pipe = pipe(text, forward_params=forward_params)
speech_pipe2 = pipe2(text, forward_params=forward_params)

In [None]:
from resemble_enhance.enhancer.inference import enhance
from IPython.utils import io


def mega_enhance_audio(
    waveform, sr, device=None, nfe=64, solver="midpoint", lambd=0.9, tau=0.95
):
    """Enhance a waveform with resemble-enhance and return it as a NumPy array.

    Args:
        waveform: audio samples, converted to a float tensor before enhancement.
        sr: sampling rate of `waveform`.
        device: device passed to `enhance`. When None (the default), "cuda" is
            used if available and "cpu" otherwise, so the function also runs on
            machines without a GPU.
        nfe, solver, lambd, tau: forwarded unchanged to `enhance`.

    Returns:
        A tuple (enhanced waveform as a NumPy array, new sampling rate).
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    tensor = torch.tensor(waveform).float()
    # capture and discard whatever `enhance` writes to the cell output
    with io.capture_output() as _:
        enhanced, new_sr = enhance(
            tensor, sr, device, nfe=nfe, solver=solver, lambd=lambd, tau=tau
        )
    enhanced_cpu = enhanced.cpu().numpy()
    return enhanced_cpu, new_sr


# Enhance the output of each pipeline, keeping the sampling rate returned for each.
speech_enhanced, new_sr = mega_enhance_audio(
    speech_pipe["audio"], speech_pipe["sampling_rate"]
)
speech_enhanced2, new_sr2 = mega_enhance_audio(
    speech_pipe2["audio"], speech_pipe2["sampling_rate"]
)

In [18]:
# NOTE: this rebinds the name `Audio`, which earlier in the notebook refers to
# `datasets.Audio`. Re-running the dataset-loading cell after this one would
# pick up the IPython class instead.
from IPython.display import Audio

# Play the raw output of the first pipeline (`pipe`).
Audio(speech_pipe["audio"], rate=speech_pipe["sampling_rate"])

In [19]:
# Play the raw output of the second pipeline (`pipe2`).
Audio(speech_pipe2["audio"], rate=speech_pipe2["sampling_rate"])

In [None]:
# Play the enhanced version of the first pipeline's output.
Audio(speech_enhanced, rate=new_sr)

In [None]:
# Play the enhanced version of the second pipeline's output.
Audio(speech_enhanced2, rate=new_sr2)