In [None]:
# Clone the NeMo repository so that its scripts can be run from `code_dir`.
# If you are running locally and want to use your own local NeMo code,
# comment out the `git clone` line below and set `code_dir` to your local path.
code_dir = 'NeMoTTS'
!git clone https://github.com/NVIDIA/NeMo.git {code_dir}

In [None]:
import json
import os
from collections import Counter

import librosa
import numpy as np
import soundfile as sf
import torch
from textgrid import TextGrid
from tqdm import tqdm

### Data Preprocessing

In [None]:
# Sampling rate (Hz) used when loading audio and passed on to the NeMo scripts.
sample_rate = 22050

# Output locations, one per kind of artifact.
data_dir = "dataset/EmoV-DB_sr22"  # manifests and audio
supp_dir = "NeMoTTS_sup_data"      # supplementary files
logs_dir = "NeMoTTS_logs"          # training logs
mels_dir = "NeMoTTS_mels"          # mel-spectrograms for vocoder training

In [None]:
# Resolve every output location to an absolute path, then make sure it exists.
data_dir, supp_dir, logs_dir, mels_dir = (
    os.path.abspath(location)
    for location in (data_dir, supp_dir, logs_dir, mels_dir)
)
for location in (data_dir, supp_dir, logs_dir, mels_dir):
    os.makedirs(location, exist_ok=True)

In [None]:
def read_jsonl(path):
    """Load a JSON-lines file and separate each record's text from its emotion tag.

    Blank lines are skipped. Every record's "text" is split on the literal
    "[EMOTION]" marker: the part before the first marker (stripped) becomes
    "text" and the part after the last marker (stripped) becomes "emotion".
    When the marker is absent, both fields end up holding the whole stripped text.

    Args:
        path: Path of the UTF-8 encoded JSONL file.

    Returns:
        A list of dicts, one per non-blank line, updated in place as described.
    """
    records = []
    with open(path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            if raw_line.strip():
                records.append(json.loads(raw_line))

    marker = "[EMOTION]"
    for record in tqdm(records):
        pieces = record["text"].split(marker)
        record["text"] = pieces[0].strip()
        record["emotion"] = pieces[-1].strip()

    return records

# Folder holding the EMOV-DB JSONL splits. Set the EMOV_DB_ROOT environment
# variable to point at your own copy; the fallback is the path this notebook
# was originally written against.
emov_root = os.environ.get("EMOV_DB_ROOT", "/home/alina/datasets/EMOV-DB/EMOV")
train_path = os.path.join(emov_root, "train_data.jsonl")
test_path = os.path.join(emov_root, "test_data.jsonl")

# The test split doubles as the validation set for the rest of the notebook.
train_data = read_jsonl(train_path)
valid_data = read_jsonl(test_path)

In [None]:
# data_dir was already made absolute when the output directories were created,
# so applying abspath again leaves it unchanged.
data_dir = os.path.abspath(data_dir)

# Sub-directory (under the data root) that holds each emotion's recordings.
_EMOTION_SUBDIR = {
    "neutral": "2",
    "anger": "2",
    "amused": "1",
    "sleepiness": "1",
}


def add_full_path(example, target_emotion="neutral", base_dir=None):
    """Keep only entries of the target emotion and expand their audio path.

    The original filter compared against the misspelt label "nuetral", which
    no entry carries, so every example was dropped. The label is now a
    parameter that defaults to "neutral".

    Args:
        example: Dict with at least "emotion" and "audio_path" keys. It is
            modified in place when kept.
        target_emotion: Emotion label to keep; change it to train on a
            different emotion.
        base_dir: Root folder of the audio tree. Defaults to the notebook-level
            `data_dir`.

    Returns:
        The same `example` dict with "audio_path" rewritten to
        `<base_dir>/<subdir>/<emotion>/audio/<audio_path>`, or None when the
        entry's emotion is not `target_emotion`. An entry whose emotion has no
        known sub-directory is returned with its path untouched.
    """
    emotion = example["emotion"]
    if emotion != target_emotion:
        return None

    subdir = _EMOTION_SUBDIR.get(emotion)
    if subdir is not None:
        if base_dir is None:
            base_dir = data_dir
        example["audio_path"] = os.path.join(
            base_dir, subdir, emotion, "audio", example["audio_path"]
        )
    return example

# Expand the audio paths and keep only the entries add_full_path did not reject
# (it returns None for entries it filters out).
train_data = [kept for kept in map(add_full_path, train_data) if kept is not None]
valid_data = [kept for kept in map(add_full_path, valid_data) if kept is not None]

In [None]:
from joblib import Parallel, delayed
import librosa
from tqdm import tqdm

def process_entry(entry):
    """Turn a dataset entry into a manifest record with path and duration.

    Removes the "target" field if present, copies "audio_path" into
    "audio_filepath", loads the audio at the notebook's `sample_rate` and
    stores its duration (as reported by librosa) under "duration".

    Args:
        entry: Dict with at least an "audio_path" key; modified in place.

    Returns:
        The same dict with "audio_filepath" and "duration" added.
    """
    # pop() rather than del: an entry that has no "target" (for example when
    # this cell is run a second time on already-processed data) must not raise.
    entry.pop('target', None)
    entry['audio_filepath'] = entry['audio_path']
    y, sr = librosa.load(entry['audio_path'], sr=sample_rate)
    entry['duration'] = librosa.get_duration(y=y, sr=sr)
    return entry

# Run process_entry over each split with joblib (n_jobs=-1); the tqdm bar
# advances as entries are handed to the workers.
train_jobs = (
    delayed(process_entry)(item)
    for item in tqdm(train_data, desc="Processing train split..")
)
train_data = Parallel(n_jobs=-1)(train_jobs)

valid_jobs = (
    delayed(process_entry)(item)
    for item in tqdm(valid_data, desc="Processing val split..")
)
valid_data = Parallel(n_jobs=-1)(valid_jobs)

In [1]:
def save_manifest(data, path):
    """Write `data` to `path` as JSON lines.

    Each item is serialized as one JSON document on its own line. The file is
    UTF-8 encoded and non-ASCII characters are written as-is (not escaped).

    Args:
        data: Iterable of JSON-serializable items.
        path: Destination file; overwritten if it exists.
    """
    with open(path, 'w', encoding='utf-8') as manifest:
        manifest.writelines(
            json.dumps(record, ensure_ascii=False) + '\n' for record in data
        )

In [None]:
# Manifest files live next to the audio, inside data_dir.
train_manifest = os.path.join(data_dir, "train_manifest.json")
val_manifest = os.path.join(data_dir, "val_manifest.json")

for split_records, manifest_path in (
    (train_data, train_manifest),
    (valid_data, val_manifest),
):
    save_manifest(split_records, manifest_path)

In [None]:
# Run NeMo's extract_sup_data.py from the cloned repo on the training manifest,
# requesting the "pitch" and "align_prior_matrix" supplementary data and writing
# it to supp_dir (+overwrite_sup_data=True replaces anything already there).
# NOTE(review): only the training manifest is passed here; the script presumably
# also reports the pitch statistics copied into the PITCH_* constants - confirm.
!cd {code_dir} && python scripts/dataset_processing/tts/extract_sup_data.py \
      manifest_filepath={train_manifest} \
      sup_data_path={supp_dir} \
      +sup_data_types='["pitch", "align_prior_matrix"]' \
      +overwrite_sup_data=True \
      dataset.sample_rate={sample_rate} \
      dataset.n_fft=1024 \
      dataset.win_length=1024 \
      dataset.hop_length=256 \
      +dataloader_params.num_workers=8

In [None]:
# Pitch statistics per emotion, as (mean, std, min, max).
# Presumably taken from the supplementary-data extraction runs - confirm.
_PITCH_STATS_BY_EMOTION = {
    "Angry": (274.6726989746094, 73.51393127441406, 107.0, 566.0),
    "Amused": (257.8951416015625, 74.55020904541016, 86.0, 569.0),
    "Sleepiness": (330.51446533203125, 98.4043960571289, 94.0, 714.0),
    "Neutral": (221.4080047607422, 65.20470428466797, 59.0, 442.0),
}

# The Neutral values are the ones in effect for the training command.
PITCH_MEAN, PITCH_STD, PITCH_MIN, PITCH_MAX = _PITCH_STATS_BY_EMOTION["Neutral"]

In [None]:
# Both text-normalization resources ship inside the cloned repo under
# scripts/tts_dataset_files; resolve them to absolute paths.
tts_files_dir = os.path.join(code_dir, "scripts", "tts_dataset_files")
phoneme_dict_path = os.path.abspath(os.path.join(tts_files_dir, "cmudict-0.7b_nv22.10"))
heteronyms_path = os.path.abspath(os.path.join(tts_files_dir, "heteronyms-052722"))

In [None]:
# Fine-tune FastPitch from the pretrained "tts_en_fastpitch" model for 4000
# steps, using the manifests, supplementary data, phoneme/heteronym files and
# PITCH_* statistics defined in the cells above. Logs go to logs_dir under the
# run name below; `experiment_name` in the checkpoint-loading cell repeats it.
# The per-step batch size is 8 with accumulate_grad_batches=4 (the "batch_32"
# in the run name presumably refers to 8 x 4 - confirm).
# NOTE(review): trainer.check_val_every_n_epoch is passed twice (25, then 5);
# presumably the later value wins - confirm and drop one of them.
!(cd {code_dir} && python examples/tts/fastpitch_finetune.py \
  --config-name="fastpitch_align_v1.05.yaml" \
  train_dataset={train_manifest} \
  validation_datasets={val_manifest} \
  sup_data_path={supp_dir} \
  phoneme_dict_path={phoneme_dict_path}\
  heteronyms_path={heteronyms_path} \
  name="exp0032_Neutral_st_4000_lr_2e_4_batch_32_align" \
  exp_manager.exp_dir={logs_dir} \
  +init_from_pretrained_model="tts_en_fastpitch" \
  +trainer.max_steps=4000 ~trainer.max_epochs \
  trainer.check_val_every_n_epoch=25 \
  ~model.optim.sched \
  model.train_ds.dataloader_params.batch_size=8 \
  ++trainer.accumulate_grad_batches=4 \
  model.validation_ds.dataloader_params.batch_size=24 \
  model.n_speakers=1 \
  model.pitch_mean={PITCH_MEAN} \
  model.pitch_std={PITCH_STD} \
  model.pitch_fmin={PITCH_MIN} \
  model.pitch_fmax={PITCH_MAX} \
  model.optim.lr=2e-4 \
  model.optim.name=adam \
  trainer.devices=1 \
  trainer.strategy=auto \
  +model.text_tokenizer.add_blank_at=True \
  ++exp_manager.create_tensorboard_logger=True \
  ++exp_manager.create_wandb_logger=False \
  +exp_manager.checkpoint_callback_params.save_top_k=1 \
  ++exp_manager.checkpoint_callback_params.monitor=val_loss \
  +exp_manager.checkpoint_callback_params.mode=min \
  +exp_manager.checkpoint_callback_params.save_last=True \
  trainer.check_val_every_n_epoch=5 \
  trainer.log_every_n_steps=5 \
)

### Inference with HiFi-GAN

In [None]:
from nemo.collections.asr.parts.preprocessing.features import WaveformFeaturizer
from nemo.collections.tts.models import FastPitchModel
from nemo.collections.tts.models import HifiGanModel
from collections import defaultdict
import IPython.display as ipd
import matplotlib.pyplot as plt
from pathlib import Path

In [None]:
experiment_name = "exp0032_Neutral_st_4000_lr_2e_4_batch_32_align"
experiment_dir = Path(logs_dir) / experiment_name

# Use the last sub-directory in sorted order (presumably one per training run,
# named so that sorting puts the newest last - confirm against your logs_dir).
run_dirs = sorted(p for p in experiment_dir.iterdir() if p.is_dir())
if not run_dirs:
    raise FileNotFoundError(f"No run directories found under {experiment_dir}")
last_checkpoint_dir = run_dirs[-1] / "checkpoints"


def _val_loss_of(ckpt_path):
    """Return the float that follows 'val_loss=' in a checkpoint file name."""
    return float(ckpt_path.stem.split("val_loss=")[1].split("-")[0])


# Pick the checkpoint whose file name carries the lowest validation loss.
val_loss_checkpoints = list(last_checkpoint_dir.glob("*val_loss=*.ckpt"))
if not val_loss_checkpoints:
    raise FileNotFoundError(
        f"No '*val_loss=*.ckpt' checkpoints found in {last_checkpoint_dir}"
    )
best_fastpitch_checkpoint = min(val_loss_checkpoints, key=_val_loss_of)

model = FastPitchModel.load_from_checkpoint(str(best_fastpitch_checkpoint))

# Export the selected checkpoint as a .nemo file next to the run directories.
nemo_output_path = experiment_dir / "Neutral_FastPitch_best.nemo"
model.save_to(str(nemo_output_path))

print(f"Best checkpoint used: {best_fastpitch_checkpoint}")
print(f"Model saved to: {nemo_output_path}")

In [None]:
# Waveform featurizer configured with the notebook's sample rate.
wave_model = WaveformFeaturizer(sample_rate=sample_rate)

# Restore the exported FastPitch model, switch it to eval mode and move it to the GPU.
spec_model = FastPitchModel.restore_from(nemo_output_path)
spec_model = spec_model.eval().cuda()

# Same treatment for the pretrained HiFi-GAN vocoder.
vocoder_model = HifiGanModel.from_pretrained("tts_en_hifigan")
vocoder_model = vocoder_model.eval().cuda()

In [None]:
def generate_and_save_sample(
    text: str,
    speaker_id: int,
    spec_gen_model,
    vocoder_model,
    sample_rate: int,
    output_dir: str,
    base_name: str,
):
    """Synthesize `text` and save the audio, the input text and a spectrogram plot.

    Three files are written into `output_dir` (created if missing):
    `{base_name}_gen.wav`, `{base_name}_text.txt` and `{base_name}_spec.png`.

    Args:
        text: Sentence to synthesize.
        speaker_id: Currently unused; the spectrogram generator is called
            with `speaker=None`.
        spec_gen_model: Model providing `parse(text)` and
            `generate_spectrogram(...)`.
        vocoder_model: Model providing `convert_spectrogram_to_audio(spec=...)`.
        sample_rate: Sample rate written into the WAV header.
        output_dir: Destination folder.
        base_name: File-name prefix for the three outputs.

    Relies on the notebook-level imports `torch`, `np`, `sf` and `plt`.
    """
    os.makedirs(output_dir, exist_ok=True)

    tokens = spec_gen_model.parse(text)

    # Inference only: no gradients needed for either model.
    with torch.no_grad():
        spectrogram = spec_gen_model.generate_spectrogram(
            tokens=tokens,
            speaker=None,
            reference_spec=None,
            reference_spec_lens=None
        )
        audio = vocoder_model.convert_spectrogram_to_audio(spec=spectrogram)

    audio = audio.squeeze().cpu().numpy().astype(np.float32)
    # Scale down only when the signal would clip in 16-bit PCM.
    peak = np.max(np.abs(audio))
    if peak > 1.0:
        audio = audio / peak

    wav_path = os.path.join(output_dir, f"{base_name}_gen.wav")
    sf.write(wav_path, audio, samplerate=sample_rate, format='WAV', subtype='PCM_16')

    # Explicit UTF-8 so non-ASCII text is written identically on every platform.
    with open(os.path.join(output_dir, f"{base_name}_text.txt"), "w", encoding="utf-8") as f:
        f.write(text)

    # Use an explicit figure handle so exactly this figure is saved and closed,
    # even if saving fails.
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        image = ax.imshow(spectrogram[0].cpu().numpy(), origin="lower", aspect="auto")
        fig.colorbar(image, ax=ax)
        ax.set_title(f"Spectrogram for {base_name}")
        fig.tight_layout()
        fig.savefig(os.path.join(output_dir, f"{base_name}_spec.png"))
    finally:
        plt.close(fig)


In [None]:
# Synthesize every neutral validation sentence into <logs_dir>/<experiment_name>/eval.
eval_dir = f"{logs_dir}/{experiment_name}/eval"

# Iterate the list directly (not enumerate) so tqdm can show the total.
for val_record in tqdm(valid_data, desc="Generating eval samples"):
    if val_record["emotion"] != "neutral":
        continue
    generate_and_save_sample(
        text=val_record['text'],
        speaker_id=0,
        spec_gen_model=spec_model,
        vocoder_model=vocoder_model,
        # Reuse the configured rate instead of repeating the literal 22050.
        sample_rate=sample_rate,
        output_dir=eval_dir,
        # Output prefix: the audio file name without its ".wav" suffix.
        base_name=val_record['audio_path'].split('/')[-1].replace('.wav', ''),
    )