In [30]:
import argparse
import json
import librosa
import os
import random
import torch
import torchaudio
from tqdm import tqdm

from unitspeech.unitspeech import UnitSpeech
from unitspeech.encoder import Encoder
from unitspeech.speaker_encoder.ecapa_tdnn import ECAPA_TDNN_SMALL
from unitspeech.textlesslib.textless.data.speech_encoder import SpeechEncoder
from unitspeech.util import HParams, fix_len_compatibility, process_unit, generate_path, sequence_mask
from unitspeech.vocoder.env import AttrDict
from unitspeech.vocoder.meldataset import mel_spectrogram
from unitspeech.vocoder.models import BigVGAN

In [31]:
# Notebook-level settings: checkpoint/config locations and fine-tuning options.
# This is a plain dict, so entries are read by key (args.get(...) / args[...]).
args = dict(
    reference_path="path_to_reference_audio",
    encoder_path="unitspeech/checkpoints/unit_encoder.pt",
    decoder_path="unitspeech/checkpoints/pretrained_decoder.pt",
    speaker_encoder_path="unitspeech/speaker_encoder/checkpts/speaker_encoder.pt",
    config_path="unitspeech/checkpoints/finetune.json",
    output_decoder_path="unitspeech/outputs/finetuned_decoder.pt",
    n_iters=500,
    learning_rate=2e-5,
    fp16_run=False,
)

In [32]:
# Display the path of the JSON config file that is opened and parsed below.
args.get("config_path")

'unitspeech/checkpoints/finetune.json'

In [33]:
# Read the JSON config file as text, then parse it into a plain dict.
with open(args.get("config_path"), mode="r") as config_file:
    data = config_file.read()

config = json.loads(data)

In [34]:
# Wrap the parsed config in HParams; later cells read nested values as
# attributes (e.g. hps.data.n_units, hps.train.out_size_second).
hps = HParams(**config)
hps

{'data': {'n_units': 1000, 'n_feats': 80, 'n_fft': 1024, 'hop_length': 256, 'win_length': 1024, 'sampling_rate': 22050, 'mel_fmin': 0.0, 'mel_fmax': 8000.0}, 'encoder': {'n_channels': 192, 'filter_channels': 768, 'n_layers': 6, 'kernel_size': 3, 'p_dropout': 0.1, 'n_heads': 2, 'window_size': 4}, 'decoder': {'dim': 128, 'dim_mults': [1, 2, 4, 8], 'pe_scale': 1000, 'beta_min': 0.05, 'beta_max': 20.0, 'spk_emb_dim': 256}, 'train': {'out_size_second': 2, 'vocoder_config_path': 'unitspeech/vocoder/checkpts/bigvgan-config.json', 'vocoder_ckpt_path': 'unitspeech/vocoder/checkpts/bigvgan.pt'}}

## Main

In [35]:
# Number of mel frames covered by `out_size_second` seconds of audio
# (seconds * samples-per-second // samples-per-frame).
frames_per_segment = (
    hps.train.out_size_second * hps.data.sampling_rate // hps.data.hop_length
)
# One fewer than the number of decoder dim multipliers; presumably the number
# of down-sampling stages in the decoder — TODO confirm against fix_len_compatibility.
num_downsamplings = len(hps.decoder.dim_mults) - 1

segment_size = fix_len_compatibility(frames_per_segment, num_downsamplings)
segment_size

176

In [36]:
# Number of discrete units, read from the data section of the config;
# used further down as `n_vocab` when building the unit Encoder.
num_units = hps.data.n_units
num_units

1000

### Speaker Encoder

- Load the pre-trained speaker encoder model
- Set it to evaluation mode: no training is needed -> KEEP FROZEN

In [37]:
# Build the ECAPA-TDNN speaker encoder on top of WavLM-large features and load
# its pre-trained weights. The model is only used for inference, so it is moved
# to the GPU and switched to eval mode.
spk_embedder = ECAPA_TDNN_SMALL(feat_dim=1024,
                                feat_type="wavlm_large", # NOTE: see other feature types
                                config_path=None)

# `args` is a plain dict, so the checkpoint path must be looked up by key
# (attribute access such as `args.speaker_encoder_path` raises AttributeError).
# The map_location callable returns each storage unchanged, i.e. the checkpoint
# is deserialised on the CPU and only moved to the GPU by `.cuda()` below.
state_dict = torch.load(args.get("speaker_encoder_path"),
                        map_location=lambda storage, loc: storage)

# strict=False: keys that do not match between checkpoint and model are ignored
# instead of raising.
spk_embedder.load_state_dict(state_dict['model'],
                             strict=False)
_ = spk_embedder.cuda().eval()

Using cache found in /home/astanea/.cache/torch/hub/s3prl_s3prl_main
  from .autonotebook import tqdm as notebook_tqdm
2024-03-31 13:50:40 | INFO | s3prl.util.download | Requesting URL: https://huggingface.co/s3prl/converted_ckpts/resolve/main/wavlm_large.pt
2024-03-31 13:50:40 | INFO | s3prl.util.download | Using URL's local file: /home/astanea/.cache/s3prl/download/f2d5200177fd6a33b278b7b76b454f25cd8ee866d55c122e69fccf6c7467d37d.wavlm_large.pt
2024-03-31 13:50:44 | INFO | s3prl.upstream.wavlm.WavLM | WavLM Config: {'extractor_mode': 'layer_norm', 'encoder_layers': 24, 'encoder_embed_dim': 1024, 'encoder_ffn_embed_dim': 4096, 'encoder_attention_heads': 16, 'activation_fn': 'gelu', 'layer_norm_first': True, 'conv_feature_layers': '[(512,10,5)] + [(512,3,2)] * 4 + [(512,2,2)] * 2', 'conv_bias': False, 'feature_grad_mult': 1.0, 'normalize': True, 'dropout': 0.0, 'attention_dropout': 0.0, 'activation_dropout': 0.0, 'encoder_layerdrop': 0.0, 'dropout_input': 0.0, 'dropout_features': 0.0, '

: 

### Unit extractor -> from textlesslib

- The speech encoder should only generate units; load it and use it as is: FREEZE

In [None]:
# Unit extractor from textlesslib: a dense speech model followed by a k-means
# quantizer that maps the dense features to discrete units.
dense_model_name = "mhubert-base-vp_en_es_fr"
quantizer_name = "kmeans"
vocab_size = 1000  # TODO: 1000 might be n_units hyperparameter from data.n_units

unit_extractor = SpeechEncoder.by_name(
    dense_model_name=dense_model_name,
    quantizer_model_name=quantizer_name,
    vocab_size=vocab_size,
    deduplicate=True,
    need_f0=False,
)

# Used for inference only: move to the GPU and switch to eval mode.
_ = unit_extractor.cuda().eval()

2024-03-31 13:24:21 | INFO | fairseq.tasks.hubert_pretraining | current directory is /home/astanea/dev/UnitSpeech
2024-03-31 13:24:21 | INFO | fairseq.tasks.hubert_pretraining | HubertPretrainingTask Config {'_name': 'hubert_pretraining', 'data': '/checkpoint/annl/s2st/data/voxpopuli/mHuBERT/en_es_fr', 'fine_tuning': False, 'labels': ['km'], 'label_dir': '/checkpoint/wnhsu/experiments/hubert/kmeans/mhubert_vp_en_es_fr_it2_400k/en_es_fr.layer9.km500', 'label_rate': 50.0, 'sample_rate': 16000, 'normalize': False, 'enable_padding': False, 'max_keep_size': None, 'max_sample_size': 250000, 'min_sample_size': 32000, 'single_target': False, 'random_crop': True, 'pad_audio': False}
2024-03-31 13:24:21 | INFO | fairseq.models.hubert.hubert | HubertModel Config: {'_name': 'hubert', 'label_rate': 50.0, 'extractor_mode': default, 'encoder_layers': 12, 'encoder_embed_dim': 768, 'encoder_ffn_embed_dim': 3072, 'encoder_attention_heads': 12, 'activation_fn': gelu, 'layer_type': transformer, 'dropout':

In [None]:
# TODO: See loading wav, and preprocessing: normalization, extract speaker embedding

In [None]:
def normalize_mel_spec(mel):
    """Min-max scale a mel-spectrogram to the interval [-1, 1].

    The minimum and maximum are taken along the last dimension (with
    ``keepdim=True``), so every slice along that dimension is scaled
    independently of the others.

    Args:
        mel: Tensor holding the mel-spectrogram. The last dimension must be
            non-empty.

    Returns:
        A tensor of the same shape as ``mel`` in which every slice along the
        last dimension spans exactly [-1, 1]. A constant slice (zero range)
        maps to -1 everywhere instead of NaN.
    """
    mel_min = mel.min(-1, keepdim=True)[0]
    mel_max = mel.max(-1, keepdim=True)[0]

    mel_range = mel_max - mel_min
    # Guard against division by zero for constant slices: dividing by 1 there
    # leaves (mel - mel_min) == 0, which then maps to -1.
    mel_range = torch.where(mel_range == 0, torch.ones_like(mel_range), mel_range)

    # Normalise exactly once. Applying the formula a second time to the already
    # scaled tensor (with the original min/max) pushes values out of [-1, 1].
    return (mel - mel_min) / mel_range * 2 - 1

In [None]:
def get_speaker_embedding(spk_embedder, wav):
    """Compute a unit-norm speaker embedding for a waveform.

    Args:
        spk_embedder: Callable (the speaker encoder) applied to ``wav``.
        wav: Input passed unchanged to ``spk_embedder``.

    Returns:
        The embedder output divided by its norm. The norm is taken over the
        whole output tensor, not per row. No gradients are tracked.
    """
    with torch.no_grad():
        raw_embedding = spk_embedder(wav)
        total_norm = raw_embedding.norm()
        normalized_embedding = raw_embedding / total_norm

    return normalized_embedding

In [None]:
def get_features():
    """Load the reference audio and extract the features derived from it.

    Reads the audio file at ``args["reference_path"]`` and relies on the
    notebook-level ``hps``, ``spk_embedder`` and ``unit_extractor`` objects.
    Requires a CUDA device, since tensors are moved to the GPU.

    Returns:
        tuple: ``(mel, spk_embedding, unit, duration)`` where

        * ``mel`` is the normalised mel-spectrogram on the GPU,
        * ``spk_embedding`` is the unit-norm speaker embedding,
        * ``unit`` and ``duration`` are the two values returned by
          ``process_unit`` for the extracted units.
    """
    # Load at the configured sampling rate so the waveform always matches the
    # rate handed to mel_spectrogram, rather than relying on librosa's default
    # happening to equal the config value.
    wav, sr = librosa.load(args.get("reference_path"), sr=hps.data.sampling_rate)
    wav = torch.FloatTensor(wav).unsqueeze(0) # Add batch dimension: (1, num_samples)

    # (batch_dim, n_mels, n_frames)
    mel = mel_spectrogram(wav,
                          hps.data.n_fft,
                          hps.data.n_feats,
                          hps.data.sampling_rate,
                          hps.data.hop_length,
                          hps.data.win_length,
                          hps.data.mel_fmin,
                          hps.data.mel_fmax,
                          center=False)
    mel = normalize_mel_spec(mel).cuda()

    # Resample from the loading rate to 16 kHz before running the speaker
    # encoder and the unit extractor.
    resample_fn = torchaudio.transforms.Resample(sr, 16000).cuda()
    wav = resample_fn(wav.cuda())

    # Extract speaker embedding with Ecapa-TDNN
    spk_embedding = get_speaker_embedding(spk_embedder, wav)

    # Extract units with dense model (wav is already on the GPU at this point)
    encoded = unit_extractor(wav)
    unit, duration = process_unit(encoded, hps.data.sampling_rate, hps.data.hop_length)

    # Hand the computed features back to the caller; without this the function
    # returns None and all of the work above is discarded.
    return mel, spk_embedding, unit, duration

### Unit Encoder

- Training the UnitEncoder model => freeze the decoder (GradTTS), which should be trained a priori, then adapt the unit encoder so that it represents the units in the same latent space as the text encoder does.

In [None]:
# Build the UnitSpeech model from the decoder section of the config.
unitspeech = UnitSpeech(n_feats=hps.data.n_feats, **hps.decoder)

# torch.load calls map_location with (storage, location); returning the storage
# unchanged keeps the checkpoint on the CPU until `.cuda()` below.
decoder_dict = torch.load(
    args.get("decoder_path"),
    map_location=lambda storage, loc: storage,
)
unitspeech.load_state_dict(decoder_dict['model'])

# Move to the GPU and switch to eval mode.
_ = unitspeech.cuda().eval()

In [None]:
# Adam optimizer over the UnitSpeech parameters, using the configured learning rate.
# NOTE(review): the notebook text says the decoder is frozen and the unit
# encoder is the model being trained, but this optimizer updates
# `unitspeech.parameters()`. If the unit encoder is the training target, the
# optimizer should be created from `unit_encoder.parameters()` after that model
# has been built — confirm the intent.
optimizer = torch.optim.Adam(params=unitspeech.parameters(),
                             lr=args.get("learning_rate"))

# `args` is a plain dict, so the flag is read by key (attribute access such as
# `args.fp16_run` raises AttributeError). `scaler` is always defined — None
# when mixed precision is off — so a re-run never picks up a stale scaler.
scaler = torch.cuda.amp.GradScaler() if args.get("fp16_run") else None

In [None]:
# Unit encoder built from the `encoder` section of the config, with a
# vocabulary of `num_units` entries and `n_feats` output features.
# NOTE(review): no checkpoint is loaded here even though `args` has an
# "encoder_path" entry — confirm that starting from freshly initialised
# weights is intended.
unit_encoder = Encoder(
    n_vocab=num_units,
    n_feats=hps.data.n_feats,
    **hps.encoder
)
# Move to the GPU and put the module in train mode.
_ = unit_encoder.cuda().train()

In [None]:
# Device of the unit encoder, read from its first parameter.
device = next(unit_encoder.parameters()).device
device

device(type='cuda', index=0)

In [None]:
for epoch in range(1, hps.train.n_epochs + 1):
    unitspeech.eval()
    unit_encoder.train()