In [None]:
#!sudo apt install sox libsndfile1 ffmpeg
#!pip3 install wget unidecode pynini==2.1.4
#!pip3 install git+https://github.com/NVIDIA/NeMo.git@v1.12.0#egg=nemo_toolkit[all]
#!wget https://raw.githubusercontent.com/NVIDIA/NeMo/main/nemo_text_processing/install_pynini.sh
#!bash install_pynini.sh

In [None]:
import requests
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
import shutil
import os
from bs4 import BeautifulSoup
import soundfile as sf
import string
import json
import re
import num2words

class bcolors:
    """ANSI escape sequences for colouring and styling terminal output.

    Concatenate one of the codes before a piece of text and ``ENDC`` after it
    to limit the styling to that text.
    """

    # Reset sequence: ends whatever styling is currently active.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colour codes (SGR 91-96), named after their intended use.
    FAIL = "\033[91m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OKBLUE = "\033[94m"
    HEADER = "\033[95m"
    OKCYAN = "\033[96m"

# Substrings that exclude a voice-line URL from the dataset when they occur in it.
blocklist = ["potato", "_ding_", "00_part1_entry-6"]
# Directory (relative to the working directory) that receives the downloaded
# audio files and the generated manifest.json.
audio_dir = 'audio'
# Number of worker threads used for the parallel download.
download_threads = 64

def prep(args, overwrite=False):
    """Create the audio directory and download every file described by ``args``.

    Args:
        args: Iterable of ``(url, filename)`` pairs, forwarded unchanged to
            ``download_parallel``.
        overwrite: If false (the default) and the audio directory already
            exists, nothing is downloaded. If true, an existing directory is
            deleted and everything is downloaded again.
    """
    if os.path.exists(audio_dir):
        if not overwrite:
            # An existing directory is taken to mean the data is already there.
            print("Data already downloaded")
            return

        print("Deleting previously downloaded audio")
        shutil.rmtree(audio_dir)

    os.mkdir(audio_dir)
    download_parallel(args)

def remove_punctuation(str):
    """Return ``str`` with every character of ``string.punctuation`` removed."""
    # NOTE: the parameter shadows the builtin ``str``, so ``maketrans`` below is
    # looked up on the argument itself rather than on the builtin type.
    deletion_table = str.maketrans('', '', string.punctuation)
    return str.translate(deletion_table)
    
def audio_duration(fn):
    """Return the duration of the audio file ``fn`` in seconds.

    Args:
        fn: Path of an audio file that ``soundfile`` can open.

    Returns:
        Number of frames divided by the sample rate.
    """
    # The context manager closes the underlying file handle; without it one
    # handle per clip stays open until garbage collection.
    with sf.SoundFile(fn) as f:
        return f.frames / f.samplerate

def download_file(args):
    """Download a single file into ``audio_dir``.

    Args:
        args: ``(url, filename)`` pair; ``filename`` is the name the file gets
            inside ``audio_dir``.

    Returns:
        ``(filename, ok)`` where ``ok`` is ``True`` if the file was fetched and
        written, and ``False`` if anything went wrong (best effort, never raises
        for download or write errors).
    """
    url, filename = args[0], args[1]

    try:
        # The timeout keeps a stalled connection from blocking a worker forever.
        response = requests.get(url, timeout=60)
        # Treat HTTP error statuses as failures so an error page is never
        # stored under an audio file name.
        response.raise_for_status()
        with open(os.path.join(audio_dir, filename), "wb") as out_file:
            out_file.write(response.content)
        return filename, True
    except Exception:
        # Deliberately broad (the download is best effort), but unlike a bare
        # ``except`` this lets KeyboardInterrupt / SystemExit propagate.
        return filename, False

def download_parallel(args):
    """Download all ``(url, filename)`` pairs concurrently and report each result.

    Args:
        args: Iterable of ``(url, filename)`` pairs, each passed to
            ``download_file``.

    One line is printed per file as soon as its download finishes: a green
    check mark on success, a red cross on failure.
    """
    # The pool is used as a context manager so its worker threads are released.
    # All results are consumed inside the block, i.e. before the pool is torn down.
    with ThreadPool(download_threads) as pool:
        for filename, succeeded in pool.imap_unordered(download_file, args):
            if succeeded:
                print(bcolors.OKGREEN + "[" + u'\u2713' + "] " + bcolors.ENDC + filename)
            else:
                print(bcolors.FAIL + "[" + u'\u2715' + "] " + bcolors.ENDC + filename)

def _scrape_voice_lines(html):
    """Extract ``(url, filename, text)`` for every usable voice line in ``html``.

    A link is kept when it points to an ``https`` ``.wav`` file, sits inside an
    ``<li>`` that has an ``<i>`` transcript, the transcript contains none of
    ``[``, ``]`` or ``$``, the URL has not been seen before and the URL matches
    no ``blocklist`` entry. Digits in the transcript are spelled out as words.
    """
    soup = BeautifulSoup(html, 'html.parser')
    seen_urls = set()  # set membership keeps de-duplication O(1) per link
    voice_lines = []

    for link_item in soup.find_all('a'):
        url = link_item.get("href", None)
        if not url or "https:" not in url or ".wav" not in url:
            continue

        list_item = link_item.find_parent("li")
        if list_item is None:
            # Audio link outside of a list entry: there is no transcript to pair it with.
            continue

        ital_item = list_item.find_all('i')
        if not ital_item:
            continue

        text = ital_item[0].text.replace('"', '')
        if "[" in text or "]" in text or "$" in text:
            continue
        if url in seen_urls or any(s in url for s in blocklist):
            continue

        seen_urls.add(url)
        filename = url[url.rindex("/")+1:]
        text = text.replace('*', '')
        text = re.sub(r"(\d+)", lambda x: num2words.num2words(int(x.group(0))), text)
        voice_lines.append((url, filename, text))

    return voice_lines


def main():
    """Scrape the GLaDOS voice lines, download the audio and write the manifest.

    The manifest is written to ``<audio_dir>/manifest.json`` with one JSON
    object per line holding ``audio_filepath``, the lower-cased ``text`` and the
    ``duration`` in seconds. Clips whose audio file cannot be read (for example
    after a failed download) are reported and left out of the manifest. The
    total duration of the listed audio is printed in minutes at the end.

    Raises:
        requests.HTTPError: If the wiki page request returns an error status.
    """
    r = requests.get("https://theportalwiki.com/wiki/GLaDOS_voice_lines", timeout=60)
    r.raise_for_status()

    # Non-ASCII characters are dropped from the page before it is parsed.
    voice_lines = _scrape_voice_lines(r.text.encode('utf-8').decode('ascii', 'ignore'))

    print("Found " + str(len(voice_lines)) + " urls")

    prep([(url, filename) for url, filename, _ in voice_lines])

    total_audio_time = 0
    with open(os.path.join(audio_dir, "manifest.json"), 'w') as outFile:
        for _, filename, text in voice_lines:
            audio_path = os.path.join(audio_dir, filename)
            try:
                duration = audio_duration(audio_path)
            except (RuntimeError, OSError) as err:
                # A missing or corrupt file must not abort the whole manifest.
                print("Skipping " + filename + ": " + str(err))
                continue

            item = {
                "audio_filepath": audio_path,
                "text": text.lower(),
                "duration": duration,
            }
            total_audio_time = total_audio_time + duration
            outFile.write(json.dumps(item, ensure_ascii=True, sort_keys=True) + "\n")

    print(str(total_audio_time/60.0) + " min")

# Run the scrape, download and manifest-writing pipeline when this cell executes.
main()

In [None]:
# Print the first manifest record as a quick sanity check.
!head -n 1 ./audio/manifest.json

In [None]:
# Hold out the last two manifest records as the validation set ...
!cat ./audio/manifest.json | tail -n 2 > ./manifest_validation.json
# ... and train on everything else (relies on `head` accepting a negative
# line count, as GNU head does, to drop the last two lines).
!cat ./audio/manifest.json | head -n -2 > ./manifest_train.json

In [None]:
import os

# Resolve the user's home directory in pure Python instead of shelling out to
# `echo $HOME`; this also works where the shell escape or $HOME is unavailable.
home_path = os.path.expanduser("~")
print(home_path)

In [None]:
import os
import json

import torch
import IPython.display as ipd
from matplotlib.pyplot import imshow
from matplotlib import pyplot as plt

from nemo.collections.tts.models import FastPitchModel
# Instantiating the pretrained model is expected to place its .nemo archive in
# the local NeMo cache, which is searched below.
FastPitchModel.from_pretrained("tts_en_fastpitch")

import shutil
from pathlib import Path

nemo_cache_dir = Path(f"{home_path}/.cache/torch/NeMo/")
# Sorted so the choice is deterministic if several cached copies exist.
nemo_files = sorted(nemo_cache_dir.glob("**/tts_en_fastpitch_align.nemo"))
if not nemo_files:
    raise FileNotFoundError(
        f"No tts_en_fastpitch_align.nemo found under {nemo_cache_dir}"
    )
print(f"Copying {nemo_files[0]} to ./")
# Copy on disk instead of reading the whole checkpoint into memory first.
shutil.copyfile(nemo_files[0], "./tts_en_fastpitch_align.nemo")

In [None]:
# Fetch the FastPitch and HiFi-GAN fine-tuning scripts from the NeMo v1.12.0 tag
# into the working directory.
!wget https://raw.githubusercontent.com/nvidia/NeMo/v1.12.0/examples/tts/fastpitch_finetune.py
!wget https://raw.githubusercontent.com/NVIDIA/NeMo/v1.12.0/examples/tts/hifigan_finetune.py
    
# Fetch the matching FastPitch and HiFi-GAN configuration files into ./conf.
!mkdir -p conf
!cd conf \
&& wget https://raw.githubusercontent.com/nvidia/NeMo/v1.12.0/examples/tts/conf/fastpitch_align_v1.05.yaml \
&& wget https://raw.githubusercontent.com/NVIDIA/NeMo/v1.12.0/examples/tts/conf/hifigan/hifigan.yaml \
&& cd ..

In [None]:
# Additional files passed to the fine-tuning command: the CMU phoneme
# dictionary, the heteronym list and the LJSpeech text-normalization whitelist
# (all taken from the NeMo v1.12.0 tag and stored in ./tts_dataset_files).
!mkdir -p tts_dataset_files && cd tts_dataset_files \
&& wget https://raw.githubusercontent.com/NVIDIA/NeMo/v1.12.0/scripts/tts_dataset_files/cmudict-0.7b_nv22.08 \
&& wget https://raw.githubusercontent.com/NVIDIA/NeMo/v1.12.0/scripts/tts_dataset_files/heteronyms-052722 \
&& wget https://raw.githubusercontent.com/NVIDIA/NeMo/v1.12.0/nemo_text_processing/text_normalization/en/data/whitelist/lj_speech.tsv \
&& cd ..

In [None]:
# Fine-tune FastPitch on the GLaDOS manifests, starting from the copied
# ./tts_en_fastpitch_align.nemo weights. Results go to ./glados_out; training
# runs for 100 epochs with validation every 25 epochs, batch size 12, on one device.
# `~model.optim.sched` is a Hydra override that removes the scheduler entry from the config.
# NOTE(review): pitch_mean / pitch_std are hard-coded here; presumably they
# should describe this dataset's pitch statistics -- TODO confirm.
!(python3 fastpitch_finetune.py --config-name=fastpitch_align_v1.05.yaml \
    train_dataset=./manifest_train.json \
    validation_datasets=./manifest_validation.json \
    sup_data_path=./fastpitch_sup_data \
    phoneme_dict_path=tts_dataset_files/cmudict-0.7b_nv22.08 \
    heteronyms_path=tts_dataset_files/heteronyms-052722 \
    whitelist_path=tts_dataset_files/lj_speech.tsv \
    exp_manager.exp_dir=./glados_out \
    +init_from_nemo_model=./tts_en_fastpitch_align.nemo \
    trainer.max_epochs=100 \
    trainer.check_val_every_n_epoch=25 \
    model.train_ds.dataloader_params.batch_size=12 model.validation_ds.dataloader_params.batch_size=12 \
    model.n_speakers=1 model.pitch_mean=121.9 model.pitch_std=23.1 \
    model.pitch_fmin=30 model.pitch_fmax=512 model.optim.lr=2e-4 \
    ~model.optim.sched model.optim.name=adam trainer.devices=1 trainer.strategy=null \
    +model.text_tokenizer.add_blank_at=true \
)

In [None]:
from nemo.collections.tts.models import HifiGanModel
from nemo.collections.tts.models import FastPitchModel

# Load the pretrained HiFi-GAN vocoder, switch it to evaluation mode and move
# it to the GPU in one step.
vocoder = HifiGanModel.from_pretrained("tts_hifigan").eval().cuda()

In [None]:
def infer(spec_gen_model, vocoder_model, str_input, speaker=None):
    """
    Turn a text string into a spectrogram and a waveform.

    The text is parsed by the spectrogram generator, the resulting tokens are
    converted to a spectrogram, and the vocoder converts that spectrogram to audio.
    Gradient tracking is disabled for all three model calls.

    Args:
        spec_gen_model: Spectrogram generator model (FastPitch in our case)
        vocoder_model: Vocoder model (HiFiGAN in our case)
        str_input: Text input for the synthesis
        speaker: Optional speaker ID; when given it is passed to the generator
            as a one-element long tensor on the generator's device.

    Returns:
        ``(spectrogram, audio)``. Tensors are returned as CPU NumPy arrays, and a
        3-dimensional spectrogram is reduced to its first entry.
    """
    def _to_numpy(value):
        # Only tensors are converted; anything else is passed through untouched.
        return value.to('cpu').numpy() if isinstance(value, torch.Tensor) else value

    with torch.no_grad():
        tokens = spec_gen_model.parse(str_input)
        speaker_tensor = (
            None
            if speaker is None
            else torch.tensor([speaker]).long().to(device=spec_gen_model.device)
        )
        spectrogram = spec_gen_model.generate_spectrogram(tokens=tokens, speaker=speaker_tensor)
        audio = vocoder_model.convert_spectrogram_to_audio(spec=spectrogram)

    if spectrogram is not None:
        spectrogram = _to_numpy(spectrogram)
        if len(spectrogram.shape) == 3:
            # Drop the leading dimension and keep only the first spectrogram.
            spectrogram = spectrogram[0]

    return spectrogram, _to_numpy(audio)

def get_best_ckpt_from_last_run(
        base_dir,
        new_speaker_id,
        duration_mins,
        mixing_enabled,
        original_speaker_id,
        model_name="FastPitch"
    ):
    """Return the path of the ``*-last.ckpt`` checkpoint of the most recent run.

    Runs are the sub-directories of ``<base_dir>/glados_out/<model_name>``; the
    one that sorts last by path is treated as the most recent. Despite the
    function name, the *last* checkpoint of that run is returned, not the best
    scoring one.

    Args:
        base_dir: Directory that contains the ``glados_out`` experiment folder.
        new_speaker_id: Unused; kept so existing callers keep working.
        duration_mins: Unused; kept so existing callers keep working.
        mixing_enabled: Unused; kept so existing callers keep working.
        original_speaker_id: Unused; kept so existing callers keep working.
        model_name: Name of the model sub-directory inside ``glados_out``.

    Returns:
        The checkpoint path as a string.

    Raises:
        ValueError: If there is no run directory, or the most recent run has no
            ``*-last.ckpt`` file.
    """
    model_dir = Path(base_dir) / "glados_out" / model_name

    exp_dirs = sorted(entry for entry in model_dir.iterdir() if entry.is_dir())
    if not exp_dirs:
        # Fail with a descriptive message instead of an IndexError on [-1].
        raise ValueError(f"There are no experiment directories in {model_dir}.")

    last_checkpoint_dir = exp_dirs[-1] / "checkpoints"
    last_ckpt = list(last_checkpoint_dir.glob('*-last.ckpt'))

    if len(last_ckpt) == 0:
        raise ValueError(f"There is no last checkpoint in {last_checkpoint_dir}.")

    return str(last_ckpt[0])

In [None]:
new_speaker_id = 6097
duration_mins = 5
mixing = False
original_speaker_id = "ljspeech"

last_ckpt = get_best_ckpt_from_last_run("./", new_speaker_id, duration_mins, mixing, original_speaker_id)
print(last_ckpt)

spec_model = FastPitchModel.load_from_checkpoint(last_ckpt)
spec_model.eval().cuda()

# Only need to set speaker_id if there is more than one speaker
speaker_id = None
if mixing:
    speaker_id = 1

num_val = 2  # Number of validation samples
val_records = []
with open("manifest_validation.json", "r") as f:
    for i, line in enumerate(f):
        val_records.append(json.loads(line))
        if len(val_records) >= num_val:
            break
            
for val_record in val_records:
    print("Real validation audio")
    ipd.display(ipd.Audio(val_record['audio_filepath'], rate=22050))
    print(f"SYNTHESIZED FOR -- Speaker: {new_speaker_id} | Dataset size: {duration_mins} mins | Mixing:{mixing} | Text: {val_record['text']}")
    spec, audio = infer(spec_model, vocoder, val_record['text'], speaker=speaker_id)
    ipd.display(ipd.Audio(audio, rate=22050))
    %matplotlib inline
    imshow(spec, origin="lower", aspect="auto")
    plt.show()