In [1]:
import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
import logging
import json
from pathlib import Path


from wavlm.WavLM import WavLM, WavLMConfig
from hifigan.models import Generator as HiFiGAN
from hifigan.utils import AttrDict
from matcher import ExNOTVC
import torchaudio

import random
import numpy as np

from IPython.display import clear_output
from tqdm import tqdm

# Run on the GPU when one is available; otherwise fall back to the CPU so the
# notebook can still be executed on machines without CUDA.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
def xnot_vc(pretrained=True, progress=True, prematched=True, device='cuda') -> ExNOTVC:
    """Assemble an `ExNOTVC` pipeline from a HiFiGAN vocoder and a WavLM encoder.

    The vocoder (together with its config) is loaded first through
    `hifigan_wavlm`, then the encoder through `wavlm_large`. `prematched` is
    forwarded to the vocoder loader only; `pretrained`, `progress` and `device`
    are forwarded to both loaders, and `device` is also handed to `ExNOTVC`.
    """
    vocoder, vocoder_cfg = hifigan_wavlm(
        pretrained=pretrained, progress=progress, prematched=prematched, device=device
    )
    encoder = wavlm_large(pretrained=pretrained, progress=progress, device=device)
    return ExNOTVC(encoder, vocoder, vocoder_cfg, device)


def hifigan_wavlm(pretrained=True, progress=True, prematched=True, device='cuda') -> "tuple[HiFiGAN, AttrDict]":
    """Build the HiFiGAN generator that vocodes WavLM features.

    Args:
        pretrained: when true, download the released generator weights and load them.
        progress: forwarded to `torch.hub.load_state_dict_from_url`.
        prematched: selects the `prematch_g_02500000.pt` checkpoint instead of
            `g_02500000.pt`.
        device: device for the generator and for `map_location`. Falls back to
            the CPU (with a warning) when CUDA is not available, mirroring
            `wavlm_large`.

    Returns:
        `(generator, h)`: the generator in eval mode with weight norm removed,
        and the `AttrDict` config read from `hifigan/config_v1_wavlm.json`.
    """
    if not torch.cuda.is_available() and str(device) != 'cpu':
        logging.warning(f"Overriding device {device} to cpu since no GPU is available.")
        device = 'cpu'
    device = torch.device(device)

    # The config path is resolved relative to the current working directory.
    with open('hifigan/config_v1_wavlm.json') as f:
        h = AttrDict(json.load(f))

    generator = HiFiGAN(h).to(device)

    if pretrained:
        if prematched:
            url = "https://github.com/bshall/knn-vc/releases/download/v0.1/prematch_g_02500000.pt"
        else:
            print("Загружаем непреметченный")
            url = "https://github.com/bshall/knn-vc/releases/download/v0.1/g_02500000.pt"
        state_dict_g = torch.hub.load_state_dict_from_url(
            url,
            map_location=device,
            progress=progress
        )
        generator.load_state_dict(state_dict_g['generator'])
    generator.eval()
    generator.remove_weight_norm()
    print(f"[HiFiGAN] Generator loaded with {sum([p.numel() for p in generator.parameters()]):,d} parameters.")
    return generator, h


def wavlm_large(pretrained=True, progress=True, device='cuda') -> WavLM:
    """Construct WavLM-Large from the checkpoint hosted in the knn-vc release.

    The checkpoint is always downloaded because it carries the model config
    (`checkpoint['cfg']`); its weights are loaded only when `pretrained` is
    true. If CUDA is unavailable and a non-CPU device was requested, a warning
    is logged and the CPU is used instead. The model is returned in eval mode.
    See https://github.com/microsoft/unilm/tree/master/wavlm for details.
    """
    gpu_missing = not torch.cuda.is_available()
    if gpu_missing and str(device) != 'cpu':
        logging.warning(f"Overriding device {device} to cpu since no GPU is available.")
        device = 'cpu'

    weights_url = "https://github.com/bshall/knn-vc/releases/download/v0.1/WavLM-Large.pt"
    checkpoint = torch.hub.load_state_dict_from_url(
        weights_url, map_location=device, progress=progress
    )

    cfg = WavLMConfig(checkpoint['cfg'])
    target_device = torch.device(device)
    model = WavLM(cfg)
    if pretrained:
        model.load_state_dict(checkpoint['model'])
    model = model.to(target_device)
    model.eval()

    n_params = sum([p.numel() for p in model.parameters()])
    print(f"WavLM-Large loaded with {n_params:,d} parameters.")
    return model


In [3]:

# NOTE(review): the original comment said the prematching flag is dropped, but
# the call relies on the defaults (prematched=True) — confirm which is intended.
xnotvc = xnot_vc() # load the weights for WavLM and HiFi-GAN


  WeightNorm.apply(module, name, dim)


Removing weight norm...
[HiFiGAN] Generator loaded with 16,523,393 parameters.
WavLM-Large loaded with 315,453,120 parameters.


In [4]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.metrics import mean_squared_error

### Пути к аудиодорожкам с женским голосом и мужским

Domain source - домен с исходным голосом, Domain target - домен с целевым голосом

In [29]:
# Audio files that define the target-voice and source-voice domains.
domain_target_path = "experiments/2300/to_male/male_8455.flac"
domain_source_path = "experiments/2300/male_2300.flac"


### Кодируем аудиодорожки с помощью WavLM 

In [30]:
# Encode both recordings into feature sequences. The source features are moved
# to the CPU; the target features stay on whatever device get_features returns.
features_source = xnotvc.get_features(domain_source_path).to("cpu")
features_target = xnotvc.get_features(domain_target_path)

In [31]:
from sklearn.model_selection import train_test_split

# Regression targets: the result of matching the source features against the
# target features (presumably a kNN match — TODO confirm against ExNOTVC.match).
target = xnotvc.match(features_source, features_target).to('cpu')
# Split the data into training (80%) and test (20%) sets
X_train, X_test, y_train, y_test = train_test_split(features_source, target, test_size=0.2, random_state=42)


In [32]:
# Ridge regression (alpha=0.1) from source features to the matched targets.
model = Ridge(alpha=0.1).fit(X_train, y_train)

# Score the fitted model on the held-out split.
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse:.4f}")

Mean Squared Error: 1.0462


In [33]:
# Ordinary least squares on the same split; this rebinds `model`, so the later
# cells use the LinearRegression estimator.
model = LinearRegression().fit(X_train, y_train)

# Score the fitted model on the held-out split.
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse:.4f}")

Mean Squared Error: 1.0462


In [None]:
# Inspect the shape of the held-out predictions.
y_pred.shape

In [34]:
def regression_transform(source_tensor, regressor=None):
    """Map a sequence of feature vectors through a fitted regressor in one call.

    Args:
        source_tensor: 2-D array-like or torch tensor of shape
            (n_frames, n_features). Tensors are detached and copied to the CPU.
        regressor: object exposing a scikit-learn style `predict`. Defaults to
            the notebook-level `model`.

    Returns:
        float64 numpy array with one predicted row per input frame.
    """
    if regressor is None:
        regressor = model  # notebook-level estimator fitted in an earlier cell
    if isinstance(source_tensor, torch.Tensor):
        source_tensor = source_tensor.detach().cpu().numpy()
    features = np.asarray(source_tensor)
    if features.shape[0] == 0:
        # scikit-learn estimators reject inputs with zero samples.
        return np.empty((0, features.shape[-1]), dtype=np.float64)
    # predict() expects a 2-D (n_samples, n_features) array, so the whole
    # sequence is passed at once instead of one 1-D row at a time.
    return np.asarray(regressor.predict(features), dtype=np.float64)


In [34]:
validation_path = "experiments/2300/valid_long.flac"  # path to the female-voice validation file
features_valid = xnotvc.get_features(validation_path).to("cpu")  # array of feature vectors, on the CPU for scikit-learn
# Transformed array. DEVICE is used instead of a hard-coded "cuda" so this cell
# follows the notebook-wide device setting.
feautures_transformed = torch.Tensor(model.predict(features_valid)).to(DEVICE)
# Alternative: feautures_transformed = regression_transform(features_valid)
# The features are already on DEVICE, so no further transfer is needed here.
out_wav = xnotvc.vocode(feautures_transformed[None]).cpu().squeeze()
torchaudio.save('experiments/2300/to_male/results/linreg/linreg_2300_8455.flac', out_wav[None], 16000)

## **KNNVC TEST** (skip)

In [6]:
# Recording that is converted in the kNN-VC comparison below.
validation_path = "experiments/2300/valid_long.flac"

In [7]:
# Reference recordings (target voice) and the query recording for the comparison.
ref_wav_paths = [domain_target_path] #target
src_wav_path = validation_path  #validation 

In [8]:
# Features of the query recording.
query_seq = xnotvc.get_features(src_wav_path)

In [9]:
# Matching set built from the reference recordings.
matching_set = xnotvc.get_matching_set(ref_wav_paths)

In [None]:
# NOTE(review): elsewhere in this notebook the result of xnotvc.match is used as
# a feature tensor, yet this value is written out as audio in the next cell —
# confirm what match returns for this call.
out_wav = xnotvc.match(query_seq, matching_set, topk=4)

In [11]:
# Write the result to disk; 16000 is passed as the sample rate.
torchaudio.save('experiments/2300/to_female/results/knnvc_2300_3570_30_long.flac', out_wav[None], 16000)

## **END OF TEST**

WavLM каждые ~20-30 мс звуковой дорожки представляет в виде вектора фичей размерностью 1024

In [None]:
print(features_source.shape) #[track length divided by ~20-30 ms, 1024] = array of 7971 vectors of dimension 1024

In [None]:
print(features_target.shape) #array of 5473 vectors of dimension 1024

### Sampler from voice distributions

Случайным образом выбираем батч векторов фичей

In [None]:
# Number of target feature vectors available to the sampler.
print(features_target.shape)

In [29]:
def sample_from_tensor(t, batch_size = 32):
    """Draw `batch_size` rows of `t` uniformly at random, with replacement.

    Args:
        t: 2-D torch tensor of shape (n_rows, n_features), on any device.
        batch_size: number of rows to draw.

    Returns:
        float32 numpy array of shape (batch_size, n_features).

    Raises:
        ValueError: from `random.randrange` when `t` has no rows and
            `batch_size` is positive.
    """
    n_rows = t.shape[0]
    # Indices still come from `random.randrange`, so seeding `random` gives the
    # same sequence of draws as before.
    indices = [random.randrange(0, n_rows) for _ in range(batch_size)]
    index = torch.tensor(indices, dtype=torch.long, device=t.device)
    # Select the rows before leaving the device: only the batch is copied to
    # host memory, not the whole feature tensor on every call.
    rows = t.detach().index_select(0, index)
    return rows.cpu().numpy().astype(np.float32)

In [None]:
# Shape check for a batch sampled from the source features.
sample_from_tensor(features_source).shape

In [None]:
# Shape check for a batch sampled from the target features.
sample_from_tensor(features_target).shape

### Функция стоимости

In [18]:
def sq_cost(X,Y):
    """Per-sample mean squared difference between `X` and `Y`.

    All dimensions after the first are flattened and averaged, so the result
    holds one value per row of the batch.
    """
    squared_diff = torch.square(X - Y)
    per_sample = torch.flatten(squared_diff, start_dim=1)
    return torch.mean(per_sample, dim=1)

# Transport cost used by the training objective.
COST = sq_cost

### Define NNs for the transport map $T: \mathbb{R}^{1024} \to \mathbb{R}^{1024}$ (generator) and the potential $f: \mathbb{R}^{1024} \to \mathbb{R}$ (discriminator).
Both are medium-sized multilayer perceptrons with ReLU activations.

In [None]:
class NegAbs(nn.Module):
    """Element-wise activation x -> -|x|; its output is never positive."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        return torch.abs(input).neg()

def _hidden_stack(width=1024, depth=7):
    """Return `depth` consecutive (Linear(width, width), in-place ReLU) pairs as a flat list."""
    layers = []
    for _ in range(depth):
        layers.append(nn.Linear(width, width))
        layers.append(nn.ReLU(True))
    return layers


# Transport map: seven hidden Linear+ReLU pairs followed by a linear output
# layer of width 1024.
T = nn.Sequential(
    *_hidden_stack(),
    nn.Linear(1024, 1024),
).to(DEVICE)

# Potential: the same hidden stack, a scalar output layer, and NegAbs so the
# value is never positive.
f = nn.Sequential(
    *_hidden_stack(),
    nn.Linear(1024, 1),
    NegAbs(),
).to(DEVICE)

def weight_reset(m):
    """Re-initialize every Linear / Conv2d layer found in `m`.

    `m` may be a single layer or a container such as `nn.Sequential`, so both
    `weight_reset(net)` and `net.apply(weight_reset)` reset all matching
    layers.
    """
    # modules() yields `m` itself followed by all of its descendants; checking
    # only `m` would silently skip the layers held inside a container.
    for module in m.modules():
        if isinstance(module, (nn.Conv2d, nn.Linear)):
            module.reset_parameters()


# Report the total number of parameters in each network.
for label, net in (('T params:', T), ('f params:', f)):
    print(label, np.sum([np.prod(p.shape) for p in net.parameters()]))

Оптимизаторы

In [20]:

# One Adam optimizer per network, with the same learning rate and weight decay.
T_opt = torch.optim.Adam(T.parameters(), lr=1e-4, weight_decay=1e-10)
f_opt = torch.optim.Adam(f.parameters(), lr=1e-4, weight_decay=1e-10)

In [48]:
# Training schedule
T_ITERS = 10  # T updates per 1 f update
MAX_ITERS = 30001  # upper bound of the outer training loop


def to_torch(x):
    """Convert `x` with `torch.Tensor(x)` and move the result to DEVICE."""
    return torch.Tensor(x).to(DEVICE)

In [30]:
# One random batch from each domain, used below to try out the cost function.
X = to_torch(sample_from_tensor(features_source))
Y = to_torch(sample_from_tensor(features_target))

In [None]:
# Per-sample cost between the two sampled batches.
sq_cost(X,Y)

## Обучение

Сбрасываем веса

In [49]:
# nn.Module.apply calls the function on every submodule as well as on the
# module itself, so each Linear layer inside T and f is re-initialized.
# The trailing semicolon suppresses the module repr in the cell output.
T.apply(weight_reset); f.apply(weight_reset);

Параметр W из алгоритма неполного транспорта

In [50]:
# Weight applied to the potential's values on the target batch in the f loss.
W=1

Алгоритм неполного оптимального транспорта

In [None]:
# XNOT training loop: T_ITERS updates of the transport map T alternate with a
# single update of the potential f.

for step in tqdm(range(MAX_ITERS)):

    # T optimization (f is only evaluated here; f_opt is not stepped)
    T.train(True); f.eval()
    for t_iter in range(T_ITERS):
        X = to_torch(sample_from_tensor(features_source))
        T_X = T(X)  # one forward pass shared by both loss terms
        T_loss = COST(X, T_X).mean() - f(T_X).mean()
        T_opt.zero_grad(); T_loss.backward(); T_opt.step()

    # f optimization
    T.eval(); f.train(True)
    X = to_torch(sample_from_tensor(features_source))
    with torch.no_grad():
        # T is not updated in this step, so no autograd graph through T is needed.
        T_X = T(X)
    # Alternative: Y = to_torch(sample_from_tensor(features_target))
    # The target batch is the result of matching X against the target features.
    Y_G = xnotvc.match(X, features_target)
    Y = Y_G.clone()
    f_loss = f(T_X).mean() - (W * f(Y)).mean()
    f_opt.zero_grad(); f_loss.backward(); f_opt.step()

    if step % 100 == 0:
        clear_output(wait=True)
        print("Step", step)




In [22]:
# NOTE(review): the file name mentions "20k" and "W_20" while the cells above
# set MAX_ITERS = 30001 and W = 1 — confirm the name matches the run being saved.
checkpoint_path = Path("checkpoints/xnot_vc_2300_3570_30s_20k_W_20.pth")
# Create the directory first so that saving works in a fresh checkout.
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(T.state_dict(), checkpoint_path)

## Транспорт в новый домен

### Функция, которая заменяет каждый исходный вектор фичей на преобразованный

In [52]:
def xnot_transform(source_tensor, transport=None):
    """Push a batch of feature vectors through the transport map in one forward pass.

    Args:
        source_tensor: tensor of shape (n_frames, n_features).
        transport: module (or callable with `parameters()`) to apply. Defaults
            to the notebook-level transport map `T`.

    Returns:
        Gradient-free CPU tensor with one transformed row per input frame.
    """
    if transport is None:
        transport = T  # notebook-level transport network
    # Move the input to the device of the network's parameters, if it has any.
    first_param = next(transport.parameters(), None)
    if first_param is not None:
        source_tensor = source_tensor.to(first_param.device)
    with torch.no_grad():
        # A single batched call replaces one forward pass per frame; the
        # output width comes from the network instead of a hard-coded 1024.
        return transport(source_tensor).cpu()


### Загружаем звуковую дорожку с женским голосом, которой не было в тренировочной выборке, и формируем массив из векторов фичей с помощью WavLM

In [53]:
validation_path = "experiments/2300/valid_long.flac" #Path to the female-voice file used for validation
features_valid = xnotvc.get_features(validation_path) #Build the array of feature vectors

In [None]:
print(features_valid.shape) #Source array: 646 vectors of dimension 1024


Формируем преобразованный массив

In [55]:
feautures_transformed = xnot_transform(features_valid) #Transformed array

In [None]:
print(feautures_transformed.shape) #The transformed array must have the same shape as the source one = 646x1024

In [None]:
# Spot-check one transformed feature vector.
print(feautures_transformed[5])

## Вокодинг

Применяем метод для вокодинга фичей с помощью HiFi GAN

In [56]:
out_wav = xnotvc.vocode(feautures_transformed[None].to(DEVICE)).cpu().squeeze() #Vocode the feature vectors transformed with XNOT
        

In [None]:
# Shape of the vocoder output after squeeze().
print(out_wav.shape)

Сохраняем файл с аудиодорожкой

In [57]:
# Write the result to the working directory; 16000 is passed as the sample rate.
torchaudio.save('otknn female_30.flac', out_wav[None], 16000)