In [1]:
import torch
import intel_extension_for_pytorch as ipex
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
import logging
import json
from pathlib import Path


from wavlm.WavLM import WavLM, WavLMConfig
from hifigan.models import Generator as HiFiGAN
from hifigan.utils import AttrDict
from matcher import ExNOTVC
import torchaudio

import random
import numpy as np

from IPython.display import clear_output
from tqdm.notebook import tqdm as tqdm

DEVICE = 'xpu'

In [2]:
def xnot_vc(pretrained=True, progress=True, prematched=False, device='xpu') -> ExNOTVC:
    """Assemble an `ExNOTVC` object from a WavLM encoder and a HiFiGAN vocoder.

    `pretrained`, `progress` and `device` are forwarded to both loaders;
    `prematched` is forwarded to `hifigan_wavlm` only and selects the vocoder
    weights trained on prematched data.
    """
    # The vocoder is loaded first, then the encoder.
    vocoder, vocoder_cfg = hifigan_wavlm(
        pretrained=pretrained, progress=progress, prematched=prematched, device=device
    )
    encoder = wavlm_large(pretrained=pretrained, progress=progress, device=device)
    return ExNOTVC(encoder, vocoder, vocoder_cfg, device)


def hifigan_wavlm(pretrained=True, progress=True, prematched=False, device='xpu') -> tuple[HiFiGAN, AttrDict]:
    """Load the HiFiGAN generator used to vocode WavLM features.

    Args:
        pretrained: if True, download and load released generator weights.
        progress: passed to `torch.hub.load_state_dict_from_url` as `progress`.
        prematched: if True, use the weights trained on prematched data.
        device: device the generator is created on and the weights are mapped to.

    Returns:
        A `(generator, h)` tuple: the generator (in eval mode, with weight norm
        removed) and the `AttrDict` built from the JSON config.
    """
    # NOTE: the config path is relative to the current working directory.
    with open('hifigan/config_v1_wavlm.json') as f:
        h = AttrDict(json.load(f))
    device = torch.device(device)

    generator = HiFiGAN(h).to(device)

    if pretrained:
        if prematched:
            url = "https://github.com/bshall/knn-vc/releases/download/v0.1/prematch_g_02500000.pt"
        else:
            print("Загружаем непреметченный")
            url = "https://github.com/bshall/knn-vc/releases/download/v0.1/g_02500000.pt"
        state_dict_g = torch.hub.load_state_dict_from_url(
            url,
            map_location=device,
            progress=progress
        )
        generator.load_state_dict(state_dict_g['generator'])
    generator.eval()
    generator.remove_weight_norm()
    n_params = sum(p.numel() for p in generator.parameters())
    print(f"[HiFiGAN] Generator loaded with {n_params:,d} parameters.")
    return generator, h


def wavlm_large(pretrained=True, progress=True, device='xpu') -> WavLM:
    """Build WavLM-Large and, when `pretrained` is set, load the released checkpoint weights.

    See https://github.com/microsoft/unilm/tree/master/wavlm for details on the model.
    If `torch.xpu` reports no available device, any non-'cpu' `device` is replaced by 'cpu'.
    """
    xpu_missing = not torch.xpu.is_available()
    if xpu_missing and str(device) != 'cpu':
        logging.warning(f"Overriding device {device} to cpu since no GPU is available.")
        device = 'cpu'

    # The checkpoint is fetched even when `pretrained` is False: the model config is read from it.
    ckpt = torch.hub.load_state_dict_from_url(
        "https://github.com/bshall/knn-vc/releases/download/v0.1/WavLM-Large.pt",
        map_location=device,
        progress=progress,
    )

    wavlm_cfg = WavLMConfig(ckpt['cfg'])
    target_device = torch.device(device)
    wavlm = WavLM(wavlm_cfg)
    if pretrained:
        wavlm.load_state_dict(ckpt['model'])
    wavlm = wavlm.to(target_device)
    wavlm.eval()
    param_count = sum([p.numel() for p in wavlm.parameters()])
    print(f"WavLM-Large loaded with {param_count:,d} parameters.")
    return wavlm


In [3]:

xnotvc = xnot_vc() # load the WavLM and HiFiGAN weights; the prematched flag is left at its default (off)




Загружаем непреметченный
Removing weight norm...
[HiFiGAN] Generator loaded with 16,523,393 parameters.


2024-08-10 13:32:50,627 - wavlm.WavLM - INFO - WavLM Config: {'extractor_mode': 'layer_norm', 'encoder_layers': 24, 'encoder_embed_dim': 1024, 'encoder_ffn_embed_dim': 4096, 'encoder_attention_heads': 16, 'activation_fn': 'gelu', 'layer_norm_first': True, 'conv_feature_layers': '[(512,10,5)] + [(512,3,2)] * 4 + [(512,2,2)] * 2', 'conv_bias': False, 'feature_grad_mult': 1.0, 'normalize': True, 'dropout': 0.0, 'attention_dropout': 0.0, 'activation_dropout': 0.0, 'encoder_layerdrop': 0.0, 'dropout_input': 0.0, 'dropout_features': 0.0, 'mask_length': 10, 'mask_prob': 0.8, 'mask_selection': 'static', 'mask_other': 0.0, 'no_mask_overlap': False, 'mask_min_space': 1, 'mask_channel_length': 10, 'mask_channel_prob': 0.0, 'mask_channel_selection': 'static', 'mask_channel_other': 0.0, 'no_mask_channel_overlap': False, 'mask_channel_min_space': 1, 'conv_pos': 128, 'conv_pos_groups': 16, 'relative_position_embedding': True, 'num_buckets': 320, 'max_distance': 800, 'gru_rel_pos': True}


WavLM-Large loaded with 315,453,120 parameters.


### Пути к аудиодорожкам с женским голосом и мужским

Domain source - домен с исходным голосом, Domain target - домен с целевым голосом

In [4]:
# Recording of the target voice (the domain the features are transported to).
domain_target_path = "experiments/2300/to_female/female_3570_30.flac"
# Recording of the source voice (the domain the features are transported from).
domain_source_path = "experiments/2300/male_2300_30.flac"


### Кодируем аудиодорожки с помощью WavLM 

In [5]:
# Extract the WavLM feature sequence of each training recording.
features_source = xnotvc.get_features(domain_source_path)
features_target = xnotvc.get_features(domain_target_path)

## **KNNVC TEST** (skip)

In [6]:
# Recording used as the query (input) of the kNN-VC test below.
validation_path = "experiments/2300/valid_long.flac"

In [7]:
ref_wav_paths = [domain_target_path] # target-voice recording(s) used to build the matching set
src_wav_path = validation_path  # recording to convert (the validation file)

In [8]:
# Feature sequence of the recording to convert.
query_seq = xnotvc.get_features(src_wav_path)

In [9]:
# Matching set built from the reference (target-voice) recordings.
matching_set = xnotvc.get_matching_set(ref_wav_paths)

In [10]:
# kNN-VC conversion: match the query features against the matching set with topk=4.
out_wav = xnotvc.match(query_seq, matching_set, topk=4)

torch.Size([880, 1024])


In [12]:
# Save the kNN-VC result; `out_wav[None]` adds a leading axis and 16000 is passed as the sample rate.
torchaudio.save('trash/2300/res.flac', out_wav[None], 16000)

## **END OF TEST**

WavLM представляет каждые ~20 мс звуковой дорожки (шаг 320 отсчётов при частоте 16 кГц) в виде вектора фичей размерностью 1024

In [13]:
print(features_source.shape) # (number of WavLM frames, 1024): one 1024-dimensional feature vector per frame

torch.Size([1493, 1024])


In [14]:
print(features_target.shape) # (number of WavLM frames, 1024) for the target recording

torch.Size([1418, 1024])


### Sampler from voice distributions

Случайным образом выбираем батч векторов фичей

In [15]:
# Shape of the tensor the sampler below draws rows from.
print(features_target.shape)

torch.Size([1418, 1024])


In [16]:
def sample_from_tensor(t, batch_size = 32):
    """Draw a random batch of rows (with replacement) from a feature tensor.

    Args:
        t: torch tensor whose first axis indexes feature vectors,
            e.g. shape (num_frames, feature_dim). Any feature width is accepted.
        batch_size: number of rows to draw.

    Returns:
        A float32 numpy array of shape (batch_size, feature_dim).

    Raises:
        ValueError: from `random.randrange` when `t` has no rows and
            `batch_size` is positive.
    """
    features = t.detach().cpu().numpy()
    num_rows = features.shape[0]
    # Indices come from the `random` module, one draw per sampled row, so
    # seeding `random` makes the batch reproducible.
    indices = np.asarray(
        [random.randrange(0, num_rows) for _ in range(batch_size)],
        dtype=np.intp,
    )
    # A single indexed gather replaces the row-by-row copy into a fixed 1024-wide buffer.
    return features[indices].astype(np.float32)

In [17]:
# Sanity check: a default-sized batch drawn from the source features.
sample_from_tensor(features_source).shape

(32, 1024)

In [18]:
# Sanity check: a default-sized batch drawn from the target features.
sample_from_tensor(features_target).shape

(32, 1024)

### Функция стоимости

In [19]:
def sq_cost(X, Y):
    """Return, for each sample in the batch, the mean squared difference between X and Y.

    All axes after the first are flattened before averaging, so the result
    has one value per batch element.
    """
    squared_diff = torch.square(X - Y)
    per_sample = torch.flatten(squared_diff, start_dim=1)
    return per_sample.mean(dim=1)

# Cost function used by the training loop.
COST = sq_cost

### Define NNs for transport map ***T*** : R^1024 -> R^1024 (generator) and potential ***f*** : R^1024 -> R (discriminator).
Medium size multilayer perceptrons with ReLU.

In [41]:
import itertools

class NegAbs(nn.Module):
    """Parameter-free layer mapping every element x to -|x|, so outputs are never positive."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        magnitude = torch.abs(input)
        return torch.neg(magnitude)

# Transport map T: R^1024 -> R^1024.
# Seven (Linear -> in-place ReLU) stages followed by a final linear projection.
T = nn.Sequential(
    *[
        layer
        for _ in range(7)
        for layer in (nn.Linear(1024, 1024), nn.ReLU(True))
    ],
    nn.Linear(1024, 1024),
).to(DEVICE)

# Potential f: R^1024 -> R.
# Seven (Linear -> in-place ReLU) stages, a scalar linear head, then NegAbs,
# which keeps the output non-positive.
f = nn.Sequential(
    *[
        layer
        for _ in range(7)
        for layer in (nn.Linear(1024, 1024), nn.ReLU(True))
    ],
    nn.Linear(1024, 1),
    NegAbs(),
).to(DEVICE)

def weight_reset(m):
    """Re-initialise the parameters of `m` if it is a `Conv2d` or `Linear` layer.

    Only `m` itself is inspected, so calling this directly on a container
    does nothing; use `container.apply(weight_reset)` to reach every sub-layer.
    """
    resettable = (nn.Conv2d, nn.Linear)
    if not isinstance(m, resettable):
        return
    m.reset_parameters()


# Report the number of scalar parameters in each network.
t_param_count = sum(p.numel() for p in T.parameters())
f_param_count = sum(p.numel() for p in f.parameters())
print('T params:', t_param_count)
print('f params:', f_param_count)

T params: 8396800
f params: 7348225


Оптимизаторы

In [42]:

# One Adam optimizer per network, both with the same learning rate and weight decay.
T_opt = torch.optim.Adam(T.parameters(), lr=1e-4, weight_decay=1e-10)
f_opt = torch.optim.Adam(f.parameters(), lr=1e-4, weight_decay=1e-10)

In [15]:
# Training hyperparameters
T_ITERS = 10       # number of T updates per single f update
MAX_ITERS = 20001  # number of outer training steps


def to_torch(x):
    """Wrap `x` in a `torch.Tensor` and move it to `DEVICE`."""
    return torch.Tensor(x).to(DEVICE)

In [16]:
# One random batch from each domain, converted to tensors on DEVICE.
X = to_torch(sample_from_tensor(features_source))
Y = to_torch(sample_from_tensor(features_target))

In [17]:
# Sanity check: per-sample cost between the random source batch and the random target batch.
sq_cost(X,Y)

tensor([16.7152, 21.0695, 10.0319, 16.4119, 19.3818, 18.5900, 17.9818, 12.2870,
        15.0837,  9.7668, 17.7889, 21.6511, 19.4071, 13.0706, 21.2476, 12.2096,
        12.4429, 14.6963, 20.2154, 17.1835, 17.8451,  9.5174, 14.4012, 17.2628,
        16.7337, 17.0727,  9.0887, 17.7961, 15.4787, 17.3981, 16.1379, 14.5723],
       device='cuda:0')

## Обучение

Сбрасываем веса

In [18]:
# `weight_reset` only re-initialises the module it is handed, and only if that
# module is a Conv2d/Linear layer. T and f are `nn.Sequential` containers, so
# the function has to be applied to every sub-module via `Module.apply`;
# calling `weight_reset(T)` directly resets nothing.
T.apply(weight_reset)
f.apply(weight_reset);  # trailing ';' suppresses the module repr in the cell output

Параметр W из алгоритма неполного транспорта

In [19]:
# Multiplier applied to f(Y) in the potential's loss in the training loop.
W=20

Алгоритм неполного оптимального транспорта

In [21]:
# XNOT training loop: every outer step performs T_ITERS updates of the
# transport map T followed by one update of the potential f.

for step in tqdm(range(MAX_ITERS)):
    # Losses of the current outer step only; both lists are rebuilt every step.
    T_loss_ar = []
    f_loss_ar = []

    # --- T optimization ---
    T.train(True); f.eval()
    for t_iter in range(T_ITERS):
        X = to_torch(sample_from_tensor(features_source))
        T_X = T(X)  # one forward pass, shared by both loss terms
        T_loss = COST(X, T_X).mean() - f(T_X).mean()
        T_loss_ar.append(T_loss.item())  # plain Python float, used for logging only
        T_opt.zero_grad(); T_loss.backward(); T_opt.step()

    # --- f optimization ---
    T.eval(); f.train(True)
    X, Y = to_torch(sample_from_tensor(features_source)), to_torch(sample_from_tensor(features_target))
    # T is not updated in this step, so its output is computed without
    # building an autograd graph through T.
    with torch.no_grad():
        T_X = T(X)
    f_loss = f(T_X).mean() - (W * f(Y)).mean()
    f_loss_ar.append(f_loss.item())
    f_opt.zero_grad(); f_loss.backward(); f_opt.step()

    # Refresh the displayed losses every 100 outer steps.
    if step % 100 == 0:
        clear_output(wait=True)
        print("Step", step)
        print("T_loss", T_loss_ar)
        print("f_loss", f_loss_ar)




Step 20000
T_loss [np.float32(3.4607031), np.float32(3.1610453), np.float32(3.1301904), np.float32(2.980997), np.float32(2.8818643), np.float32(2.8462367), np.float32(3.0384846), np.float32(3.2547486), np.float32(3.2168407), np.float32(3.273753)]
f_loss [array(-0.04195407, dtype=float32)]


In [22]:
# Persist the weights of the trained transport map T.
torch.save(T.state_dict(), "checkpoints/xnot_vc_2300_3570_30s_20k_W_20.pth")

## Транспорт в новый домен

### Функция, которая заменяет каждый исходный вектор фичей на преобразованный

In [23]:
def xnot_transform(source_tensor):
    """Map every feature vector of `source_tensor` through the transport map `T`.

    Args:
        source_tensor: tensor of shape (num_frames, feature_dim), where
            feature_dim matches the input width of `T`.

    Returns:
        A float CPU tensor with one transformed row per input row.
    """
    # Run on whatever device T lives on, so CPU inputs are accepted as well.
    t_device = next(T.parameters()).device
    with torch.no_grad():
        # One batched forward pass instead of a Python loop over rows.
        transported = T(source_tensor.to(t_device))
    return transported.float().cpu()


### Загружаем звуковую дорожку с женским голосом, которой не было в тренировочной выборке и формируем массив из векторов фичей с помощью WavLM

In [24]:
validation_path = "experiments/2300/valid_long.flac" # path to the validation recording (NOTE(review): the original comment called it a female voice — confirm which speaker this file contains)
features_valid = xnotvc.get_features(validation_path) # build the array of feature vectors

In [25]:
print(features_valid.shape) # input array: (number of frames, 1024)


torch.Size([880, 1024])


Формируем преобразованный массив

In [26]:
feautures_transformed = xnot_transform(features_valid) # transformed array

In [27]:
print(feautures_transformed.shape) # the transformed array should have the same shape as the input array

torch.Size([880, 1024])


In [28]:
# Peek at one transformed feature vector.
print(feautures_transformed[5])

tensor([ 1.5977,  1.7348, -2.3709,  ...,  0.5059, -3.1327, -1.8742])


## Вокодинг

Применяем метод для вокодинга фичей с помощью HiFi GAN

In [29]:
out_wav = xnotvc.vocode(feautures_transformed[None].to(DEVICE)).cpu().squeeze() # vocode the XNOT-transformed features
        

In [30]:
# Shape of the vocoded waveform tensor.
print(out_wav.shape)

torch.Size([281600])


Сохраняем файл с аудиодорожкой

In [31]:
# Save the converted audio; `out_wav[None]` adds a leading axis and 16000 is passed as the sample rate.
torchaudio.save('xnotvc_2300_3570_30_long_W20.flac', out_wav[None], 16000)