In [1]:
import os
import torchaudio
import torch
from torchaudio.transforms import MelSpectrogram
from sklearn.model_selection import train_test_split

# Dataset location and derived directories.
# The corpus root can be overridden with the VCTK_DATA_DIR environment variable so the
# notebook is not tied to one machine; the original absolute path stays as the default.
DATA_DIR = os.environ.get(
    "VCTK_DATA_DIR",
    "/home/davidlimcher/projects/nowadays/synthetic_media/practice_4/VCTK-Corpus",
)
AUDIO_DIR = os.path.join(DATA_DIR, "wav48")
MEL_DIR = os.path.join(DATA_DIR, "mel")

# Make sure the output directory for the mel spectrograms exists.
os.makedirs(MEL_DIR, exist_ok=True)

def extract_mel_spectrogram(file_path, sample_rate=16000, n_mels=64):
    """Load an audio file and convert it to a mel spectrogram.

    Args:
        file_path: Path of the audio file to read.
        sample_rate: Sample rate the audio is resampled to before the transform.
        n_mels: Number of mel bands of the spectrogram.

    Returns:
        The mel spectrogram with the channel axis removed, or ``None`` if anything
        went wrong (the error is printed, not raised, so one bad file does not stop
        a whole preprocessing run).
    """
    try:
        # Try to read the audio file.
        waveform, sr = torchaudio.load(file_path)
        # Downmix multi-channel audio to mono. Without this, the final squeeze(0)
        # is a no-op for stereo files and the result keeps a leading channel axis,
        # giving spectrograms of inconsistent rank.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        # Resample only when the file's rate differs from the target rate.
        if sr != sample_rate:
            resample = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate)
            waveform = resample(waveform)
        # Compute the mel spectrogram.
        mel_spectrogram = MelSpectrogram(sample_rate=sample_rate, n_mels=n_mels)(waveform)
        return mel_spectrogram.squeeze(0)
    except Exception as e:
        print(f"Ошибка при обработке файла {file_path}: {e}")
        return None

def process_dataset(audio_dir, mel_dir):
    """Convert every ``.wav`` file under ``audio_dir`` to a saved mel spectrogram.

    ``audio_dir`` is expected to contain one sub-directory per speaker; the
    sub-directory name is used as the class label.

    Args:
        audio_dir: Root directory with one folder of ``.wav`` files per speaker.
        mel_dir: Existing directory where ``<speaker>_<file stem>.pt`` files are written.

    Returns:
        A list of ``(mel_file_path, speaker)`` tuples, one per successfully
        converted file. Files whose conversion failed are left out.
    """
    data = []
    # Sorted listings make the order of the returned list reproducible;
    # os.listdir returns entries in arbitrary order.
    for speaker in sorted(os.listdir(audio_dir)):
        speaker_path = os.path.join(audio_dir, speaker)
        # Skip stray files (README, .DS_Store, ...) sitting next to the speaker folders;
        # listing them as directories would raise NotADirectoryError.
        if not os.path.isdir(speaker_path):
            continue
        for file in sorted(os.listdir(speaker_path)):
            if not file.endswith(".wav"):
                continue
            mel = extract_mel_spectrogram(os.path.join(speaker_path, file))
            if mel is None:  # conversion failed; the helper already reported it
                continue
            # splitext strips only the final extension, so names containing extra
            # dots keep their full stem and cannot collide with each other.
            stem = os.path.splitext(file)[0]
            mel_file = os.path.join(mel_dir, f"{speaker}_{stem}.pt")
            torch.save(mel, mel_file)
            data.append((mel_file, speaker))
    return data

# Run the preprocessing step: writes one .pt file per utterance and returns (path, speaker) pairs.
data = process_dataset(AUDIO_DIR, MEL_DIR)

# Split into training and test sets, stratified by speaker.
# A fixed seed keeps the split identical across kernel restarts, so results stay
# comparable and test files never leak into training between runs.
SPLIT_SEED = 42
if data:
    labels = [label for _, label in data]
    train_data, test_data = train_test_split(
        data, test_size=0.2, stratify=labels, random_state=SPLIT_SEED
    )
    print(f"Обучающая выборка: {len(train_data)}, Тестовая выборка: {len(test_data)}")
else:
    # Nothing was converted; train_data / test_data stay undefined in this case.
    print("Не удалось создать данные для обучения и тестирования.")


Обучающая выборка: 35393, Тестовая выборка: 8849


In [2]:
import torch.nn as nn

class CNNEncoder(nn.Module):
    """Convolutional encoder that maps a spectrogram to a fixed-size embedding.

    Two conv/ReLU/max-pool stages are followed by adaptive average pooling to a
    16x16 grid, flattening and a linear projection. For a 64x64 input the two
    max-pools already yield a 16x16 map, so the adaptive pooling is an identity
    there; for any other spatial size it brings the map to the 16x16 grid the
    linear layer was sized for, instead of failing on a shape mismatch.

    Args:
        feature_dim: Size of the output embedding.
    """

    def __init__(self, feature_dim=128):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            # Parameter-free, so state_dict keys of the conv layers are unchanged.
            nn.AdaptiveAvgPool2d((16, 16)),
            nn.Flatten()
        )
        self.fc = nn.Linear(64 * 16 * 16, feature_dim)

    def forward(self, x):
        """Encode a batch of spectrograms.

        Args:
            x: Tensor of shape ``(batch, 1, height, width)``; a 3-D
                ``(batch, height, width)`` tensor is also accepted and gets the
                single channel axis inserted.

        Returns:
            Tensor of shape ``(batch, feature_dim)``.
        """
        if x.dim() == 3:
            # Batched spectrograms without a channel axis: add the single channel.
            x = x.unsqueeze(1)
        x = self.conv_layers(x)
        return self.fc(x)


In [3]:
class RNNEncoder(nn.Module):
    """LSTM encoder that maps a feature sequence to a fixed-size embedding.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        feature_dim: Size of the output embedding. Defaults to 128, the value that
            used to be hard-coded, and mirrors ``CNNEncoder``'s ``feature_dim``.
    """

    def __init__(self, input_dim=64, hidden_dim=128, num_layers=2, feature_dim=128):
        super().__init__()
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, feature_dim)

    def forward(self, x):
        """Encode a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, time, input_dim)`` (the LSTM is batch-first).

        Returns:
            Tensor of shape ``(batch, feature_dim)`` computed from the LSTM output
            at the last time step.
        """
        x, _ = self.rnn(x)
        # Only the output of the final time step is projected to the embedding.
        return self.fc(x[:, -1, :])


In [4]:
import random

def load_mel_spectrogram(file_path):
    """Read back a mel spectrogram that was stored with ``torch.save``.

    Args:
        file_path: Path of the saved file.

    Returns:
        Whatever object ``torch.load`` restores from ``file_path``.
    """
    # torch.load unpickles the file, so it should only be pointed at trusted files.
    mel = torch.load(file_path)
    return mel

def get_triplet_samples(train_data):
    """Endless generator of random (anchor, positive, negative) spectrogram triplets.

    Args:
        train_data: Sequence of ``(file_path, class_label)`` tuples.

    Yields:
        ``(anchor, positive, negative)`` as returned by ``load_mel_spectrogram``:
        anchor and positive come from different files of the same class, negative
        from a file of a different class.

    Raises:
        ValueError: If ``train_data`` has fewer than two classes, or no class with
            at least two distinct files. Because this is a generator, the error
            surfaces on the first ``next()`` call, not when the generator is created.
    """
    # Group file paths by class label.
    class_dict = {}
    for file_path, label in train_data:
        class_dict.setdefault(label, []).append(file_path)

    if len(class_dict) < 2:
        raise ValueError(
            "get_triplet_samples needs at least two classes to draw a negative sample"
        )

    # Anchors may only come from classes that can supply a *different* positive file;
    # otherwise the positive search below would never terminate.
    usable_labels = {
        label for label, paths in class_dict.items() if len(set(paths)) > 1
    }
    anchor_pool = [item for item in train_data if item[1] in usable_labels]
    if not anchor_pool:
        raise ValueError(
            "get_triplet_samples needs at least one class with two distinct files "
            "to draw a positive sample"
        )

    # Built once instead of on every iteration.
    all_labels = list(class_dict)

    while True:
        # Random anchor.
        anchor_file, anchor_label = random.choice(anchor_pool)

        # Positive: another file of the anchor's class (guaranteed to exist).
        positive_file = random.choice(class_dict[anchor_label])
        while positive_file == anchor_file:
            positive_file = random.choice(class_dict[anchor_label])

        # Negative: any file of a different class.
        negative_label = random.choice(
            [label for label in all_labels if label != anchor_label]
        )
        negative_file = random.choice(class_dict[negative_label])

        # Load the three spectrograms and hand them out.
        anchor = load_mel_spectrogram(anchor_file)
        positive = load_mel_spectrogram(positive_file)
        negative = load_mel_spectrogram(negative_file)

        yield anchor, positive, negative

In [5]:
import torch.optim as optim
import torch.nn.functional as F

def triplet_loss(anchor, positive, negative, margin=1.0):
    """Margin-based triplet loss on cosine similarities.

    The per-sample term is ``max(0, margin - sim(anchor, positive) + sim(anchor, negative))``,
    so it reaches zero once the positive is more similar to the anchor than the
    negative by at least ``margin``. The mean over the batch is returned.
    """
    sim_to_positive = F.cosine_similarity(anchor, positive)
    sim_to_negative = F.cosine_similarity(anchor, negative)
    hinge = (margin - sim_to_positive + sim_to_negative).clamp(min=0)
    return hinge.mean()

from torch.utils.data import DataLoader

class TripletDataset(torch.utils.data.Dataset):
    """Map-style dataset that serves random (anchor, positive, negative) triplets.

    The index passed to ``__getitem__`` is ignored: every access draws the next
    random triplet from ``get_triplet_samples``. ``__len__`` reports the number of
    training files, so one pass over the dataset draws that many triplets.

    Args:
        train_data: Sequence of ``(file_path, class_label)`` tuples.
    """

    def __init__(self, train_data):
        self.train_data = train_data
        # The triplet generator is created on first access and then reused.
        # Creating a new one per item would regroup the whole training list by
        # class for every single sample.
        self._triplets = None

    def __getstate__(self):
        # Generators cannot be pickled; drop it so the dataset can still be sent to
        # DataLoader worker processes, which then create their own on first access.
        state = self.__dict__.copy()
        state["_triplets"] = None
        return state

    def __getitem__(self, index):
        if self._triplets is None:
            self._triplets = get_triplet_samples(self.train_data)
        anchor, positive, negative = next(self._triplets)
        return anchor, positive, negative

    def __len__(self):
        return len(self.train_data)

# The saved spectrograms have different numbers of frames (utterances differ in length),
# so the default collate function cannot stack them into a batch. Every spectrogram is
# therefore brought to a fixed number of frames and given a channel axis before stacking.
# 64 frames matches the 64x64 input that CNNEncoder's linear layer (64 * 16 * 16) implies.
NUM_FRAMES = 64


def _fit_frames(mel, num_frames=NUM_FRAMES):
    """Centre-crop or zero-pad ``mel`` along its last axis to ``num_frames``, then add a leading channel axis."""
    # assumes mel is (n_mels, frames), as saved by the preprocessing step for mono audio
    total = mel.size(-1)
    if total >= num_frames:
        start = (total - num_frames) // 2
        fixed = mel[..., start:start + num_frames]
    else:
        fixed = mel.new_zeros((*mel.shape[:-1], num_frames))
        fixed[..., :total] = mel
    return fixed.unsqueeze(0)


def collate_triplets(batch):
    """Turn a list of (anchor, positive, negative) spectrograms into three stacked batch tensors."""
    anchors, positives, negatives = zip(*batch)
    return tuple(
        torch.stack([_fit_frames(mel) for mel in group])
        for group in (anchors, positives, negatives)
    )


train_dataset = TripletDataset(train_data)
train_loader = DataLoader(
    train_dataset, batch_size=32, shuffle=True, collate_fn=collate_triplets
)

def train_model(model, train_loader, epochs=20):
    """Train ``model`` with the triplet loss using Adam (learning rate 0.001).

    Args:
        model: Module that maps a batch of inputs to embeddings.
        train_loader: Iterable of ``(anchor, positive, negative)`` batches that
            also supports ``len()``.
        epochs: Number of passes over ``train_loader``.

    After each epoch the mean loss per batch is printed. Nothing is returned.
    """
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    model.train()

    def run_step(batch):
        """Run one optimisation step on a triplet batch and return its loss as a float."""
        anchor, positive, negative = batch
        embeddings = [model(part) for part in (anchor, positive, negative)]
        loss = triplet_loss(*embeddings)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        return loss.item()

    for epoch in range(epochs):
        total_loss = sum(run_step(batch) for batch in train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_loader)}")



In [None]:
from clearml import Task
import wandb

# Model definition: the CNN encoder with its default embedding size.
model = CNNEncoder()

# ClearML: register this run as a task in the "Speaker Verification" project.
task = Task.init(project_name="Speaker Verification", task_name="Train Encoder")

# Weights & Biases: start a run and watch the model.
# NOTE(review): entity="username" looks like a placeholder, and the recorded cell output
# stops at the interactive API-key prompt — confirm credentials are configured before running.
wandb.init(project="speaker-verification", entity="username")
wandb.watch(model)

# TensorBoard: open an event writer under ./logs.
# NOTE(review): nothing in the visible code writes to `writer` or closes it, and
# train_model only prints the loss — presumably metric logging still has to be wired in.
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter(log_dir="./logs")

# Start training.
train_model(model, train_loader, epochs=20)


ClearML Task: created new task id=6dd6d50e8dbf4e88b4c042e309950b61
2024-10-20 00:26:10,661 - clearml.Task - INFO - Storing jupyter notebook directly as code
ClearML results page: https://app.clear.ml/projects/d1ee835952e9416b8a4c75b3bf98727c/experiments/6dd6d50e8dbf4e88b4c042e309950b61/output/log


wandb: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
wandb: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
wandb: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

- src/
  - data/          # Скрипты для загрузки и предобработки данных
  - models/        # CNN и RNN энкодеры
  - training/      # Скрипты для обучения и мониторинга
  - utils/         # Вспомогательные функции
- notebooks/       # Jupyter ноутбуки для экспериментов
- README.md        # Описание проекта, выводы и графики
- requirements.txt # Список зависимостей
