In [1]:
import os
os.environ["HF_HOME"] = (
    "/media/s44504/3b01c699-3670-469b-801f-13880b9cac56/huggingface/"
)

In [None]:
from datasets import load_dataset
import soundfile as sf
from pathlib import Path
import os
import numpy as np

# dataset_list = ["en-US-Madison", "en-AU-Zak", "id-ID-Althaf"] # range(0, 5)
# dataset_list = ["en-UK-Thalia"] (range(47, 57))
dataset_list = ["sw-TZ-Victoria"]


for dataset_uri in dataset_list:
    # dataset = load_dataset("bookbot/id-ID-Althaf", num_proc=os.cpu_count)
    dataset = load_dataset("bookbot/sw-TZ-Victoria", num_proc=os.cpu_count)
    # dataset = dataset.filter(lambda example: example["speaker"] == dataset_uri)
    dataset = dataset["train"].select(range(0, 5))
    dataset_name = Path(dataset_uri).stem

    # Specify the directory where you want to save the WAV files
    output_directory = f"{dataset_name}"

    # Ensure the output directory exists
    os.makedirs(output_directory, exist_ok=True)

    # Initialize an empty list to hold the audio data arrays
    concatenated_audio = []

    # Iterate over each item in the dataset
    for i, item in enumerate(dataset):
        # Extract the audio array and sampling rate
        audio_array = item["audio"]["array"]
        sampling_rate = item["audio"]["sampling_rate"]
        try:
            id = item["id"]
        except:
            id = i

        # Define the output file path
        output_file_path = os.path.join(output_directory, f"{id}.wav")

        # Write the audio data to a WAV file
        sf.write(output_file_path, audio_array, sampling_rate)

        # Append the audio data to the list
        concatenated_audio.append(audio_array)

        silence_length_samples = int((350 / 1000) * sampling_rate)
        silence_audio = np.zeros(silence_length_samples)
        concatenated_audio.append(silence_audio)

        print(f"Written file {output_file_path}")

    # Concatenate the audio data arrays
    concatenated_audio = np.concatenate(concatenated_audio)

    # Define the output file path for the concatenated audio
    concatenated_file_path = os.path.join(output_directory, "concatenated_audio.wav")

    # Write the concatenated audio data to a WAV file
    sf.write(concatenated_file_path, concatenated_audio, sampling_rate)

    print(f"Concatenated audio written to {concatenated_file_path}")

In [2]:
import torch
torch.manual_seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

import random
random.seed(0)

import numpy as np
np.random.seed(0)

%cd ..

# load packages
import time
import random
import yaml
from munch import Munch
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torchaudio
import librosa
from nltk.tokenize import word_tokenize

from models import *
from utils import *
from text_utils import TextCleaner
textclenaer = TextCleaner()

%matplotlib inline

/home/s44504/StyleTTS2
177


In [3]:
to_mel = torchaudio.transforms.MelSpectrogram(
    n_mels=80, n_fft=2048, win_length=1200, hop_length=300
)
mean, std = -4, 4


def length_to_mask(lengths):
    mask = (
        torch.arange(lengths.max())
        .unsqueeze(0)
        .expand(lengths.shape[0], -1)
        .type_as(lengths)
    )
    mask = torch.gt(mask + 1, lengths.unsqueeze(1))
    return mask


def preprocess(wave):
    wave_tensor = torch.from_numpy(wave).float()
    mel_tensor = to_mel(wave_tensor)
    mel_tensor = (torch.log(1e-5 + mel_tensor.unsqueeze(0)) - mean) / std
    return mel_tensor


def compute_style(path):
    wave, sr = librosa.load(path, sr=24000)
    audio, index = librosa.effects.trim(wave, top_db=30)
    if sr != 24000:
        audio = librosa.resample(audio, sr, 24000)
    mel_tensor = preprocess(audio).to(device)

    with torch.no_grad():
        ref_s = model.style_encoder(mel_tensor.unsqueeze(1))
        ref_p = model.predictor_encoder(mel_tensor.unsqueeze(1))

    return torch.cat([ref_s, ref_p], dim=1)

In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# load phonemizer
import phonemizer

global_phonemizer = phonemizer.backend.EspeakBackend(
    language="sw", preserve_punctuation=True, with_stress=True
)

# config = yaml.safe_load(open("Models/EN-Multi-ID-Althaf/config_ft_en_multi_id_althaf.yml"))
config = yaml.safe_load(open("Models/EN-Multi-ID-Althaf-SW-Victoria/config_ft_en_multi_id_althaf_sw_victoria.yml"))

# load pretrained ASR model
ASR_config = config.get("ASR_config", False)
ASR_path = config.get("ASR_path", False)
text_aligner = load_ASR_models(ASR_path, ASR_config)

# load pretrained F0 model
F0_path = config.get("F0_path", False)
pitch_extractor = load_F0_models(F0_path)

# load BERT model
from Utils.PLBERT.util import load_plbert

BERT_path = config.get("PLBERT_dir", False)
plbert = load_plbert(BERT_path)

model_params = recursive_munch(config["model_params"])
model = build_model(model_params, text_aligner, pitch_extractor, plbert)
_ = [model[key].eval() for key in model]
_ = [model[key].to(device) for key in model]

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# params_whole = torch.load("Models/EN-Multi-ID-Althaf/epoch_2nd_00024.pth", map_location="cpu")
params_whole = torch.load("Models/EN-Multi-ID-Althaf-SW-Victoria/epoch_2nd_00019.pth", map_location="cpu")
# params_whole = torch.load("Models/LibriTTS/epochs_2nd_00020.pth", map_location="cpu")
params = params_whole["net"]

In [6]:
for key in model:
    if key in params:
        print("%s loaded" % key)
        try:
            model[key].load_state_dict(params[key])
        except:
            from collections import OrderedDict

            state_dict = params[key]
            new_state_dict = OrderedDict()
            for k, v in state_dict.items():
                name = k[7:]  # remove `module.`
                new_state_dict[name] = v
            # load params
            model[key].load_state_dict(new_state_dict, strict=False)
#             except:
#                 _load(params[key], model[key])
_ = [model[key].eval() for key in model]

bert loaded
bert_encoder loaded
predictor loaded
decoder loaded
text_encoder loaded
predictor_encoder loaded
style_encoder loaded
diffusion loaded
text_aligner loaded
pitch_extractor loaded
mpd loaded
msd loaded
wd loaded


In [7]:
from Modules.diffusion.sampler import DiffusionSampler, ADPM2Sampler, KarrasSchedule

sampler = DiffusionSampler(
    model.diffusion.diffusion,
    sampler=ADPM2Sampler(),
    sigma_schedule=KarrasSchedule(
        sigma_min=0.0001, sigma_max=3.0, rho=9.0
    ),  # empirical parameters
    clamp=False,
)

In [None]:
# text = "Now he cannot find his way back home."
# text = "I am so glad that I have a mom and dad to get through everything in my day."
# text = "kila mmoja alichukua kipande chake na kuhesabu sehemu zake kwa makini"
text = text.strip()
ps = global_phonemizer.phonemize([text])
ps = word_tokenize(ps[0])
ps = " ".join(ps)
tokens = textclenaer(ps)
tokens.insert(0, 0)
tokens = torch.LongTensor(tokens).to(device).unsqueeze(0)

In [None]:
alpha = 0.8
beta = 0.1
diffusion_steps = 10
embedding_scale = 1.0

ref_thalia = compute_style("/home/s44504/StyleTTS2/Demo/en-UK-Thalia/en-UK-Thalia_141.wav")
# ref_victoria = compute_style("/home/s44504/StyleTTS2/Demo/sw-TZ-Victoria/sw-TZ-Victoria_Gawa_chungwa_3.wav")
ref_althaf = compute_style("/home/s44504/StyleTTS2/Demo/id-ID-Althaf/concatenated_audio.wav")

with torch.no_grad():
    input_lengths = torch.LongTensor([tokens.shape[-1]]).to(device)
    text_mask = length_to_mask(input_lengths).to(device)

    t_en = model.text_encoder(tokens, input_lengths, text_mask)
    bert_dur = model.bert(tokens, attention_mask=(~text_mask).int())
    d_en = model.bert_encoder(bert_dur).transpose(-1, -2)

    s_pred = sampler(
        noise=torch.randn((1, 256)).unsqueeze(1).to(device),
        embedding=bert_dur,
        embedding_scale=embedding_scale,
        features=ref_althaf,  # reference from the same speaker as the embedding
        num_steps=diffusion_steps,
    ).squeeze(1)

    ref = s_pred[:, :128]
    s = s_pred[:, 128:]

    ref = alpha * ref + (1 - alpha) * ref_thalia[:, :128]
    s = beta * s + (1 - beta) * ref_thalia[:, 128:]

    d = model.predictor.text_encoder(d_en, s, input_lengths, text_mask)

    x, _ = model.predictor.lstm(d)
    duration = model.predictor.duration_proj(x)

    duration = torch.sigmoid(duration).sum(axis=-1)
    pred_dur = torch.round(duration.squeeze()).clamp(min=1)

    pred_aln_trg = torch.zeros(input_lengths, int(pred_dur.sum().data))
    c_frame = 0
    for i in range(pred_aln_trg.size(0)):
        pred_aln_trg[i, c_frame : c_frame + int(pred_dur[i].data)] = 1
        c_frame += int(pred_dur[i].data)

    # encode prosody
    en = d.transpose(-1, -2) @ pred_aln_trg.unsqueeze(0).to(device)
    if model_params.decoder.type == "hifigan":
        asr_new = torch.zeros_like(en)
        asr_new[:, :, 0] = en[:, :, 0]
        asr_new[:, :, 1:] = en[:, :, 0:-1]
        en = asr_new

    # F0_pred, N_pred = model.predictor.F0Ntrain(en, ref_thalia[:, 128:])
    # F0_pred, N_pred = model.predictor.F0Ntrain(en, ref_althaf[:, 128:])
    F0_pred, N_pred = model.predictor.F0Ntrain(en, s)

    # F0_real, _, F0 = model.pitch_extractor(mel_tensor.unsqueeze(1))
    # F0_real = F0_real.unsqueeze(0)
    # N_real = log_norm(mel_tensor.unsqueeze(1)).squeeze(1)

    asr = t_en @ pred_aln_trg.unsqueeze(0).to(device)
    if model_params.decoder.type == "hifigan":
        asr_new = torch.zeros_like(asr)
        asr_new[:, :, 0] = asr[:, :, 0]
        asr_new[:, :, 1:] = asr[:, :, 0:-1]
        asr = asr_new

    out = model.decoder(asr, F0_pred, N_pred, ref.squeeze().unsqueeze(0))
    # out = model.decoder(asr, F0_real, N_real, ref.squeeze().unsqueeze(0))

wav = out.squeeze().cpu().numpy()[..., :-50]

In [None]:
import soundfile as sf

sf.write(f'Demo/thalia-althaf-ts2s-{str(alpha).replace(".", "_")}-{str(beta).replace(".", "_")}.wav', wav, 24000)

In [8]:
def inference(text, ref_s, alpha=0.3, beta=0.7, diffusion_steps=5, embedding_scale=1):
    text = text.strip()
    ps = global_phonemizer.phonemize([text])
    ps = word_tokenize(ps[0])
    ps = " ".join(ps)
    tokens = textclenaer(ps)
    tokens.insert(0, 0)
    tokens = torch.LongTensor(tokens).to(device).unsqueeze(0)

    with torch.no_grad():
        input_lengths = torch.LongTensor([tokens.shape[-1]]).to(device)
        text_mask = length_to_mask(input_lengths).to(device)

        t_en = model.text_encoder(tokens, input_lengths, text_mask)
        bert_dur = model.bert(tokens, attention_mask=(~text_mask).int())
        d_en = model.bert_encoder(bert_dur).transpose(-1, -2)

        s_pred = sampler(
            noise=torch.randn((1, 256)).unsqueeze(1).to(device),
            embedding=bert_dur,
            embedding_scale=embedding_scale,
            features=ref_s,  # reference from the same speaker as the embedding
            num_steps=diffusion_steps,
        ).squeeze(1)

        s = s_pred[:, 128:]
        ref = s_pred[:, :128]

        ref = alpha * ref + (1 - alpha) * ref_s[:, :128]
        s = beta * s + (1 - beta) * ref_s[:, 128:]

        d = model.predictor.text_encoder(d_en, s, input_lengths, text_mask)

        x, _ = model.predictor.lstm(d)
        duration = model.predictor.duration_proj(x)

        duration = torch.sigmoid(duration).sum(axis=-1)
        pred_dur = torch.round(duration.squeeze()).clamp(min=1)

        pred_aln_trg = torch.zeros(input_lengths, int(pred_dur.sum().data))
        c_frame = 0
        for i in range(pred_aln_trg.size(0)):
            pred_aln_trg[i, c_frame : c_frame + int(pred_dur[i].data)] = 1
            c_frame += int(pred_dur[i].data)

        # encode prosody
        en = d.transpose(-1, -2) @ pred_aln_trg.unsqueeze(0).to(device)
        if model_params.decoder.type == "hifigan":
            asr_new = torch.zeros_like(en)
            asr_new[:, :, 0] = en[:, :, 0]
            asr_new[:, :, 1:] = en[:, :, 0:-1]
            en = asr_new

        F0_pred, N_pred = model.predictor.F0Ntrain(en, s)

        asr = t_en @ pred_aln_trg.unsqueeze(0).to(device)
        if model_params.decoder.type == "hifigan":
            asr_new = torch.zeros_like(asr)
            asr_new[:, :, 0] = asr[:, :, 0]
            asr_new[:, :, 1:] = asr[:, :, 0:-1]
            asr = asr_new

        out = model.decoder(asr, F0_pred, N_pred, ref.squeeze().unsqueeze(0))

    return (
        out.squeeze().cpu().numpy()[..., :-50]
    )  # weird pulse at the end of the model, need to be fixed later

## Synthesize Speech

In [9]:
# text = """ StyleTTS 2 is a text to speech model that leverages style diffusion and adversarial training with large speech language models to achieve human level text to speech synthesis. """
# text = """ StyleTTS 2 adalah model text to speech yang memanfaatkan style diffusion dan pelatihan adversarial dengan model bahasa ucapan besar untuk mencapai sintesis text to speech level manusia. """
# text = """ Halo nama saya Budi dari Jakarta dan saya suka sekali membaca buku tentang sejarah Indonesia. """
text = """ Habari jina langu ni Victoria kutoka Tanzania, na napenda kusoma vitabu vya historia. """

In [16]:
from pathlib import Path
import os
import soundfile as sf


def synthesize_speech(reference_dicts):
    start = time.time()
    noise = torch.randn(1, 1, 256).to(device)
    for k, path in reference_dicts.items():
        try:
            # Convert string path to Path object for easier manipulation
            ref_s = compute_style(path)
            path = Path(path)
            # Create the output directory based on the reference path
            output_directory = path.parent / "synthesized_multilingual_en_id_sw"
            os.makedirs(output_directory, exist_ok=True)

            wav = inference(
                text, ref_s, alpha=0.3, beta=0.1, diffusion_steps=10, embedding_scale=1
            )

            rtf = (time.time() - start) / (len(wav) / 24000)
            print(f"RTF = {rtf:5f}")
            import IPython.display as ipd

            # print(k + " Synthesized:")
            # display(ipd.Audio(wav, rate=24000, normalize=False))

            sf.write(f"{output_directory}/{path.name}", wav, 24000)
        except Exception as e:
            print(e)

        # print("Reference:")
        # display(ipd.Audio(path, rate=24000, normalize=False))

### US MADISON

In [None]:
reference_dicts = {}

dir_path = Path("Demo/en-US-Madison")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [None]:
synthesize_speech(reference_dicts)

### UK THALIA

In [None]:
reference_dicts = {}

dir_path = Path("Demo/en-UK-Thalia")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [None]:
synthesize_speech(reference_dicts)

### AU Zak

In [None]:
reference_dicts = {}

dir_path = Path("Demo/en-AU-Zak")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [None]:
synthesize_speech(reference_dicts)

### ID ALTHAF

In [None]:
reference_dicts = {}

dir_path = Path("Demo/id-ID-Althaf")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [None]:
synthesize_speech(reference_dicts)

### EN Althaf S2S

In [None]:
reference_dicts = {}

dir_path = Path("Demo/en-Althaf-S2S")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [None]:
synthesize_speech(reference_dicts)

### SW Victoria

In [17]:
reference_dicts = {}

dir_path = Path("Demo/sw-TZ-Victoria")

# Iterate through each file in the directory
for k, file_path in enumerate(sorted(dir_path.iterdir())):
    if file_path.is_file():
        reference_dicts[k] = file_path

In [18]:
synthesize_speech(reference_dicts)

RTF = 0.018854
RTF = 0.040019
RTF = 0.056657
RTF = 0.071426
RTF = 0.081993
RTF = 0.107526
