In [1]:
import os
import random
from typing import Tuple, Dict

import numpy as np
import torch
from torch import optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
from dotenv import load_dotenv
import torch.nn.functional as F

import MD.main as main
import pickle

In [2]:
load_dotenv()
data_path = r'{}'.format(os.environ['DATASET_PATH'])

# Получение данных
id_list = os.listdir(data_path)
batch_size = 256
mfcc_count = 20
target_sr = 16000

In [3]:
with open('voice_params_20pers_20mfccs_with_normalize.pkl', 'rb') as f:
    voice_params = pickle.load(f)

In [3]:
# Получение голосовых признаков
voice_params = {}
pickle_file = r'voice_params_20pers_20mfccs_with_normalize.pkl'
for person_id in id_list:
    files = main.get_audio_for_id(data_path, person_id)
    person_params = []
    for file in files:
        normilize_audio = main.preprocess_audio(file, target_sr=target_sr, segment_length=3)
        person_params.extend([main.get_mfccs(audio, sample_rate=target_sr, n_mfcc=mfcc_count) for audio in normilize_audio])
    voice_params[person_id] = person_params
    print(f'Person {person_id} saved.')
with open(pickle_file, 'wb') as f:
    pickle.dump(voice_params, f)

Person id10001 saved.
Person id10002 saved.
Person id10003 saved.
Person id10004 saved.
Person id10005 saved.
Person id10006 saved.
Person id10007 saved.
Person id10008 saved.
Person id10009 saved.
Person id10010 saved.
Person id10011 saved.
Person id10012 saved.
Person id10013 saved.
Person id10014 saved.
Person id10015 saved.
Person id10016 saved.
Person id10017 saved.
Person id10018 saved.
Person id10019 saved.
Person id10020 saved.


In [9]:
voice_params['id10011'][10].shape

(20, 32)

In [11]:
class VoiceEmbeddingModel(nn.Module):
    def __init__(self, input_size: int = 40, channels_size: int = 128, lstm_out_size: int = 128):
        super(VoiceEmbeddingModel, self).__init__()
        self.conv1 = nn.Conv2d(input_size, channels_size, 5, padding=2)
        self.bn1 = nn.BatchNorm1d(channels_size)
        self.pool = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(channels_size, channels_size, 5, padding=2)
        self.flatten = nn.Flatten()
        # num_channels = channels_size * (128//4)
        self.lstm1 = nn.LSTM(channels_size, lstm_out_size, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(lstm_out_size * 2, 256)
        self.fc2 = nn.Linear(256, 128)  # Предположим, размерность эмбеддинга 128

    def forward(self, x):
        x = self.pool(nn.functional.relu(self.bn1(self.conv1(x))))
        x = self.pool(nn.functional.relu(self.bn1(self.conv2(x))))
        x = x.transpose(1, 2)
        x, _ = self.lstm1(x)
        x = x[:, -1, :]
        # x = self.flatten(x)
        x = self.dropout(nn.functional.relu(self.fc1(x)))
        x = self.fc2(x)
        return x
    
class VoiceEmbeddingCNN(nn.Module):
    def __init__(self):
        super(VoiceEmbeddingCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3), padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(32 * 5 * 8, 128)

    def forward(self, x):
        x = x.unsqueeze(1)  # Добавляем размерность канала
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool(x)
        x = x.view(-1, 32 * 5 * 8)  # Выравниваем в одномерный вектор
        x = F.relu(self.fc1(x))
        return x

In [10]:
class VoicePairsDataset(Dataset):
    def __init__(self, pairs):
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        mfcc1, mfcc2, label = self.pairs[idx]
        return torch.tensor(mfcc1).t(), torch.tensor(mfcc2).t(), torch.tensor([label])

    @staticmethod
    def collate_fn(batch) -> Tuple:
        mfcc1s, mfcc2s, labels = zip(*batch)
        mfcc1s_padded = pad_sequence(mfcc1s, batch_first=True, padding_value=0).transpose(1, 2)
        mfcc2s_padded = pad_sequence(mfcc2s, batch_first=True, padding_value=0).transpose(1, 2)

        labels = torch.stack(labels)

        return mfcc1s_padded, mfcc2s_padded, labels


class VoiceTripletsDataset(Dataset):
    def __init__(self, pairs):
        self.triplets = pairs

    def __len__(self):
        return len(self.triplets)

    def __getitem__(self, idx):
        mfcc1, mfcc2, mfcc3 = self.triplets[idx]
        return torch.tensor(mfcc1).t(), torch.tensor(mfcc2).t(), torch.tensor(mfcc3).t()

    @staticmethod
    def collate_fn(batch) -> Tuple:
        mfcc1s, mfcc2s, mfcc3 = zip(*batch)
        mfcc1s_padded = pad_sequence(mfcc1s, batch_first=True, padding_value=0).transpose(1, 2)
        mfcc2s_padded = pad_sequence(mfcc2s, batch_first=True, padding_value=0).transpose(1, 2)
        mfcc3_padded = pad_sequence(mfcc3, batch_first=True, padding_value=0).transpose(1, 2)

        return mfcc1s_padded, mfcc2s_padded, mfcc3_padded

In [12]:
class ConstrastiveLoss(nn.Module):
    def __init__(self, margin: float = 1.0):
        super(ConstrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1 - label) * torch.pow(euclidean_distance, 0.5) +
                                      (label) * torch.pow(
            torch.clamp(self.margin - euclidean_distance, min=0.0), 0.5))
        loss_contrastive *= 1000
        return loss_contrastive


class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        positive_distance = F.pairwise_distance(anchor, positive)
        negative_distance = F.pairwise_distance(anchor, negative)
        losses = F.relu(positive_distance - negative_distance + self.margin)
        return losses.mean()

In [13]:
def cosine_similarity(vec1, vec2):
    return torch.nn.functional.cosine_similarity(vec1.unsqueeze(0), vec2.unsqueeze(0))

In [14]:
def train_model(model: nn.Module, dataloaders: Dict, criterion, lr=0.001,
                epoches: int = 25, device: str = 'cuda') -> nn.Module:
    """
    Обучает модель и выводит информацию о процессе обучения.
   :param model: torch.nn.Module -  Модель для обучения.
   :param dataloaders: dict - Словарь содержащий 'train' и 'val' DataLoader.
   :param criterion: torch.nn.modules.loss - Функция потерь.
   :param lr: float
   :param epoches: int - Количество эпох обучения.
   :param device: str - Устройство для обучения ('cuda' или 'cpu').
    :return: nn.Module - Обученная модель.
    """
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_model_wts = model.state_dict()
    best_acc = 0.0
    for epoch in range(epoches):
        print(f'Epoch {epoch + 1}/{epoches}')
        print('-' * 10)

        # Каждая эпоха имеет фазу обучения и валидации
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Установка модели в режим обучения
            else:
                model.eval()  # Установка модели в режим оценки

            running_loss = 0.0
            running_corrects = 0

            # Итерация по данным.
            for inputs1, inputs2, inputs3 in dataloaders[phase]:
                inputs1 = inputs1.to(device)
                inputs2 = inputs2.to(device)
                inputs3 = inputs3.to(device)

                # Обнуление градиентов параметров
                optimizer.zero_grad()

                # Прямой проход
                with torch.set_grad_enabled(phase == 'train'):
                    outputs1 = model(inputs1)
                    outputs2 = model(inputs2)
                    outputs3 = model(inputs3)
                    loss = criterion(outputs1, outputs2, outputs3)

                    # Обратное распространение и оптимизация только в фазе обучения
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Статистика
                running_loss += loss.item() * inputs1.size(0)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects / len(dataloaders[phase].dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Копирование модели, если она показала лучшую точность
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()
        print()
    print(f'Лучшая точность валидации: {best_acc:.4f}')

    # Загрузка лучших весов модели
    model.load_state_dict(best_model_wts)
    return model

In [15]:
data_pairs = main.create_triplets(voice_params)
random.shuffle(data_pairs)
# Разделение на обучающую и валидационную выборки
val_size = int(0.2 * len(data_pairs))
data_train = data_pairs[val_size:]
data_val = data_pairs[:val_size]

dataset_train = VoiceTripletsDataset(data_train)
dataset_val = VoiceTripletsDataset(data_val)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True,
                              collate_fn=VoiceTripletsDataset.collate_fn)
dataloader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=True,
                            collate_fn=VoiceTripletsDataset.collate_fn)
dataloaders = {'train': dataloader_train,
               'val': dataloader_val}

In [16]:
model = VoiceEmbeddingCNN()
model = train_model(model, dataloaders, TripletLoss(), epoches=10)
torch.save(model.state_dict(), f'speak_rec_20_256_128_15epo_triplets_002.pth')

Epoch 1/10
----------


KeyboardInterrupt: 

In [None]:
torch.save(model.state_dict(), f'speak_rec_20_256_128_15epo_triplets_001.pth')

In [14]:
model = VoiceEmbeddingModel(15).to('cuda')
model.load_state_dict(torch.load(f'speak_rec_15_256_128_10epo.pth'))

<All keys matched successfully>

In [30]:
for a, b, c in dataloader_train:
    batch = (a, b, c)
    break

RuntimeError: stack expects each tensor to be equal size, but got [283, 20] at entry 0 and [197, 20] at entry 1

In [11]:
v1 = main.get_voice_mfccs(main.get_audio_for_id(data_path, id_list[10])[0], n_mfcc=15)


In [12]:
model.eval()

VoiceEmbeddingModel(
  (conv1): Conv1d(15, 128, kernel_size=(5,), stride=(1,), padding=(2,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(128, 128, kernel_size=(5,), stride=(1,), padding=(2,))
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (lstm1): LSTM(128, 128, batch_first=True)
  (fc1): Linear(in_features=128, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=128, bias=True)
)

In [27]:
roma1 = main.get_voice_mfccs(r'D:\University\Диссерт\val_data\Roma1.ogg', clear=True, clear_output=r'output/Roma1.wav',
                             n_mfcc=15)
roma1 = torch.tensor(roma1, dtype=torch.float32).unsqueeze(0).to('cuda')
roma1_emb = model(roma1)

In [28]:
roma2 = main.get_voice_mfccs(r'D:\University\Диссерт\val_data\Roma2.ogg', clear=True, clear_output=r'output/Roma2.wav',
                             n_mfcc=15)
roma2 = torch.tensor(roma2, dtype=torch.float32).unsqueeze(0).to('cuda')
roma2_emb = model(roma2)

In [29]:
rad1 = main.get_voice_mfccs(r'D:\University\Диссерт\val_data\Rad1.ogg', clear=True, clear_output=r'output/Rad1.wav',
                            n_mfcc=15)
rad1 = torch.tensor(rad1, dtype=torch.float32).unsqueeze(0).to('cuda')
rad1_emb = model(rad1)

In [38]:
rad2 = main.get_voice_mfccs(r'D:\University\Диссерт\val_data\Rad2.ogg', clear=True, clear_output=r'output/Rad2.wav',
                            n_mfcc=15)
rad2 = torch.tensor(rad2, dtype=torch.float32).unsqueeze(0).to('cuda')
rad2_emb = model(rad2)

In [18]:
rad1_emb

tensor([[ -2.0123,  -2.2304,  -8.5127, -27.8939,  18.5335, -27.9814,  24.6854,
         -29.6719,  14.2017, -18.7953,  27.9606, -33.8146, -13.9903,   0.9342,
         -13.6428,  30.0798, -33.7939,   1.9956,  20.6993,  -8.6234, -29.2436,
           1.1553,   2.4182,  -2.7796, -29.0598,  36.0511,  -1.2846,  -8.5150,
         -35.1723, -18.6078,  25.3065,  36.7703,  29.6898,  -6.5807, -27.0767,
          29.6256,  22.5598, -12.4937,  -3.4577,   6.7036, -21.6700, -30.9447,
         -29.9405,  -6.7488, -30.6491,   4.3517,  -2.4798,  23.6850,  21.4136,
           9.1717,   6.6804, -23.5795,  23.3602,  23.4931,  -5.4534, -23.1248,
          25.0040,  10.1666, -12.6213,  15.0363,   1.2163,  15.8564, -22.6861,
           2.8412,  33.3507,  10.9805,  25.6619,  12.7693, -38.1582,  -0.4479,
         -24.5357,   6.2112, -14.3070,  27.1434,   0.1968,  16.3499,  21.7442,
         -21.3088, -28.4449,  25.0744,  25.1865, -36.4896,   0.8688,   6.8741,
           1.0673,  -4.8335,  18.6386, -21.3199, -20

In [32]:
cosine_similarity(roma2_emb, roma1_emb)

tensor([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1.]], device='cuda:0', grad_fn=<SumBackward1>)

In [20]:
embedding1 = torch.tensor([1.0, 2.0, 3.0])
embedding2 = torch.tensor([4.0, 5.0, 6.0])
cosine_similarity(embedding1, embedding2)

tensor([0.9746])

In [42]:
torch.norm(rad1_emb - roma2_emb)

tensor(7.5912, device='cuda:0', grad_fn=<LinalgVectorNormBackward0>)

In [43]:
rad2_emb

tensor([[ -1.3423,  -2.7562,  -8.6419, -27.2519,  18.2805, -27.3712,  23.5874,
         -28.8533,  14.0961, -18.4135,  27.2655, -32.8835, -13.7695,   1.5922,
         -13.4152,  29.1782, -32.8263,   1.3149,  20.3020,  -8.8157, -28.6114,
           0.4546,   2.9151,  -3.2998, -28.4159,  34.9340,  -0.6875,  -8.6741,
         -33.7749, -18.3857,  24.1713,  35.5761,  28.9477,  -5.7407, -26.5015,
          28.8841,  21.9906, -12.5852,  -2.6302,   6.4142, -20.8082, -30.0826,
         -29.2466,  -7.0453, -29.8049,   3.6251,  -1.7733,  23.2055,  21.0578,
           9.0684,   6.8325, -23.0020,  22.9182,  23.0082,  -4.6133, -22.5401,
          24.2017,  10.3489, -12.7686,  14.3108,   1.7776,  15.4300, -22.2818,
           2.1289,  32.1716,  11.0348,  25.2061,  12.7681, -36.9366,  -1.0287,
         -24.0626,   6.5162, -14.1568,  26.4261,  -0.5287,  16.0960,  20.7529,
         -20.9946, -27.2554,  23.9601,  24.5917, -35.4519,   0.2399,   7.1683,
           0.3806,  -4.1184,  18.4323, -20.9762, -20

In [48]:
F.pairwise_distance(rad2_emb, roma1_emb)

tensor([3.9091], device='cuda:0', grad_fn=<NormBackward1>)

0
