In [None]:
import torch, torchvision
import numpy as np
import math
import os
import torch.nn.functional as F
import matplotlib.pyplot as plt
import pandas as pd
import torchaudio
import torchvision.utils as vutils
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

from torch import nn
from torchvision import transforms
from torch.nn.utils.rnn import pad_sequence
from torch.cuda.amp import autocast
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from pathlib import Path
from tqdm.auto import tqdm
from torch import amp
from torch.cuda.amp import GradScaler
from torch.utils.data import DataLoader, SubsetRandomSampler, RandomSampler, Dataset
from torch.optim.swa_utils import AveragedModel    
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.nn.functional import pad
from tqdm.notebook import tqdm

device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'

In [None]:
device

In [None]:
config = {
    "dataset": {
        "train": {
            "table": "E:/data/train.csv",
            "data": "E:/data/bare_data/"
        },
        "val": {
            "table": "E:/data/val.csv",
            "data": "E:/data/bare_data/"
        }
    },
    "train": {
        "batch_size": 32,
        "grad_acum": 1,
        "dtype": "float32",
        'shuffle': True,
        'pin_memory': True,
    },
    "val": {
        "batch_size": 32,
        "grad_acum": 1,
        "dtype": "float32",
        'shuffle': False,
        'pin_memory': True,
    },
    "vae": {
        "freq": 16000,
        "lenght": 5,
    },
    "model": {
        "latent_size": 128,
        "epochs": 15,
        "learning_rate": 0.001,
        "freq_scale": 4,
        "time_scale": 4,
    },
    "utils": {
        "n_fft": 800,
    }
}

# VAE

In [None]:
# Класс для перевода объекта в латентное пространство для упрощённой работы с ним, подаём объект -> получаем тензор меток на него, т.е. какими признаками он обладает и с помощью этого можем его сравнивать с другими и обучать модель.

class VAE_Audio(nn.Module):
    def __init__(self,):
        super().__init__()
        # Слой получает признаки из исходного объекта
        self.encoder_input = nn.Sequential(
            nn.Conv2d(1, 64, 3, 1, 1), nn.BatchNorm2d(64), nn.GELU(),
            nn.Conv2d(64, 32, 3, 1, 1), nn.BatchNorm2d(32), nn.GELU(),
        )
        # Слой сжимает объект в латентное подпространство
        self.encoder_squeeze = nn.Sequential(
            nn.Conv2d(32, 32, 3, 2, 1), nn.BatchNorm2d(32), nn.GELU(),
            nn.Conv2d(32, 32, 3, 2, 1), nn.BatchNorm2d(32), nn.GELU(),
        )
        # Нужны, чтобы задать параметры Гауссовского распределения (mu - среднее, logvar - логарифм дисперсии)
        self.encoder_mu = nn.Conv2d(32, 32, 1) # Набор меток, отвечающих за среднее значение признаков
        self.encoder_logvar = nn.Conv2d(32, 32, 1) # Набор меток, отвечающих за то, как широко разбросаны признаки по латентному подпространству 
        
        # Слои восстанавливают размерность до исходной
        self.decoder_unsqueeze = nn.Sequential(
                nn.ConvTranspose2d(32, 32, 3, 2, 1, output_padding=1), nn.BatchNorm2d(32), nn.GELU(),
                nn.ConvTranspose2d(32, 32, 3, 2, 1, output_padding=1), nn.BatchNorm2d(32), nn.GELU(),
        )
        # Слой, отвечающий за выход: получаем объект, максимально похожий на исходный
        self.decoder_output = nn.Sequential(
                 nn.ConvTranspose2d(32, 16, 3, 1, 1), nn.GELU(),
                 nn.BatchNorm2d(16),
                 nn.ConvTranspose2d(16, 1, 3, 1, 1), 
        )
    def encode(self, x): # Функция кодирования объекта в латентное подпространство (пространство меньшей размерности), получает параметры кодирования объекта (mu, logvar)
        x = self.encoder_input(x)
        x = self.encoder_squeeze(x)
        mu = self.encoder_mu(x)
        logvar = self.encoder_logvar(x)
        return mu, logvar
    def decode(self, x): # Декодирует объект в исходное пространство по набору признаков
        x = self.decoder_unsqueeze(x)
        x = self.decoder_output(x)
        return x
    def KLD_loss(self, mu, logvar, q=0.005): # Вычисляет Kullback-Leibler divergence (мера различия между двумя вероятностными распределениями) между предсказанным распределением и стандартным нормальным распределением
        kld = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp())
        kld = torch.clamp(kld, min=q)
        return kld.mean()
    def forward(self, x): # Полное прохождение через VAE
        mu, logvar = self.encode(x)
        std = torch.exp(0.5 * logvar)
        z = mu + std * torch.randn_like(std)
        recon = self.decode(z)
        return recon, z, mu, logvar


# Моделька

In [None]:
# Класс для постепенного зашумления картинки, чтобы диффузионная модель училась как можно лучше благодаря постепенному обучению (от меньшего шума к большему)

class NoiseScheduler:
    def __init__(self, timestamps=100, epochs=100):
        self.steps = timestamps
        self.epochs = epochs
        self.betas = self.cosine_beta_schedule(timestamps)
        self.alpha = 1 - self.betas
        self.alpha_hat = torch.cumprod(self.alpha, dim=0)
    
    @staticmethod 
    def cosine_beta_schedule(timesteps, s=0.008): # Функция, которая с помощью косинуса определяет то, с какой скоростью будет происходить зашумление, т.е. более вогнутая вниз кривая, в отличие от других функций по типу линейных
        steps = timesteps + 1
        x = torch.linspace(0, timesteps, steps)
        a_hat = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi / 2) ** 2
        a_hat = a_hat / a_hat[0]
        betas = 1 - (a_hat[1:] / a_hat[:-1])
        return betas.clamp(1e-5, 0.999)
        
    def __call__(self, step): # Возвращает коэффициент зашумления (beta) на конкретном шаге
        return self.betas[step]
        
    def add_noise(self, image, index): # Функция зашумления картинки, возвращает зашумлённый объект и сам шум
        noise = torch.randn_like(image)
        b = image.shape[0]
        k = self.alpha_hat[index].view(b, 1, 1, 1)
        return torch.sqrt(k) * image + torch.sqrt(1 - k) * noise, noise

    def sample_timestamps(self, iters=10): # Выбирает случайные шаги зашумления для батча объектов
        dev = self.alpha_hat.device
        index = torch.randint(0, self.steps - 1, (iters - 1,), device=dev)
        return torch.cat((index, torch.tensor([100], device=dev)))

    def denoise_step(self, x_t, predicted_noise, t, sigma=0.0): # Функция обратной диффузии, т.е. расшумления картинки
        random_noise = torch.randn_like(x_t)
        if torch.is_tensor(t):
            t_prev = torch.clamp(t - 1, min = 0)
        else:
            t_prev = max(t - 1, 0)
        alpha_cumprod_t = self.alpha_hat[t]
        alpha_cumprod_prev = self.alpha_hat[t_prev]
        x0_pred = (x_t - torch.sqrt(1 - alpha_cumprod_t) * predicted_noise) / torch.sqrt(alpha_cumprod_t)
        direction = torch.sqrt(1 - alpha_cumprod_prev) * predicted_noise
        return torch.sqrt(alpha_cumprod_prev) * x0_pred + direction + sigma * random_noise

    def get_index(self, epoch, batch_size): # Функция получения элемента из батча
        return self.steps-torch.randint(0, self.steps, [batch_size]) - 1

In [None]:
# Класс, в котором происходит объединение признаков, полученных от части изображения и условия на него, в итоге на изображении остаётся совокупность признаков с учтённым условием

# conditions - условия, convolutions - признаки

class ConditionMixingLayer(nn.Module):
    def __init__(self, input_channels, conditioning_length, hidden_size = 8):
        super().__init__()
        self.hidden_size = hidden_size                                          # Сколько признаков храним внутри для перемешивания признаков объекта и заданного условия
        self.cond_proj = nn.Linear(conditioning_length, self.hidden_size)       # Переводит условие в скрытое пространство признаков нужной размерности
        self.conv_proj = nn.Conv2d(input_channels, self.hidden_size, 3, 1, 1)   # Делает признаки и условие совместимыми для дальнейших операций
        self.lin_proj = nn.Linear(self.hidden_size, self.hidden_size)           # Увеличивает гибкость/выразительность признаков
        self.lin1_unproj = nn.Linear(self.hidden_size, self.hidden_size)        # Дополнительный проход №1 для объединения условий и признаков
        self.lin2_unproj = nn.Linear(self.hidden_size, self.hidden_size)        # Дополнительный проход №2 для объединения условий и признаков
        self.conv_unproj = nn.Conv2d(self.hidden_size, input_channels, 3, 1, 1) # Переводит смешанные признаки обратно в исходное число каналов
        self.conv_act = nn.SiLU()                                               # Нужно для нелинейных преобразований признаков
        self.bn1 = nn.BatchNorm2d(input_channels)                               # Стабилизируют распределение признаков
        self.bn2 = nn.BatchNorm2d(input_channels)                               # Стабилизируют распределение признаков
        self.add_a = nn.Linear(self.hidden_size, self.hidden_size)              # Независимые преобразования для признаков
        self.add_b = nn.Linear(self.hidden_size, self.hidden_size)              # Независимые преобразования для условий

    def forward(self, x, c=None, skip=False):
        if len(x.shape)==3:
            x = x.unsqueeze(0)
        b, ch, h, w = x.shape # batch, channels, height, width
        x = self.bn1(x)
        xn = self.conv_proj(x) # [B, N, H, W]
        xn = self.conv_act(xn)
        xn = xn.view(b, h*w, self.hidden_size) # [B, H*W, N]
        xn = self.lin_proj(xn) # [B, H*W, N]
        xn = self.conv_act(xn) # [B, H*W, N]
        if not skip:
            cn = self.cond_proj(c) # [B, N]
            cn = self.conv_act(cn) # [B, N]
            cn = cn.view(b, 1, self.hidden_size)
            xn = self.add_a(xn) # [B, 1, N]
            cn = self.add_b(cn) # [B, H*W, N]
            xn = xn + cn # [B, H*W, N]
        xn = self.lin1_unproj(xn) # [B, H*W, N]
        xn = self.conv_act(xn) # [B, H*W, N]
        xn = self.lin2_unproj(xn) # [B, H*W, N]
        xn = self.conv_act(xn) # [B, H*W, N]
        xn = xn.view(b, self.hidden_size, h, w) # [B, N, H, W]
        xn = self.conv_unproj(xn)
        xn = self.conv_act(xn) # [B, I, H, W]
        xn = self.bn2(xn)
        x = xn + x
        return x

In [None]:
# Класс, который учится улучшать/дополнять исходные признаки. ВАЖНО: Отличие от MixingLayer в том, что в данном классе происходит улучшение и преобразование, а в Mixing происходит перемешиванние с конкретными признаками
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, 3, padding=1)
        self.norm1 = nn.GroupNorm(1, channels, affine=True)
        self.act = nn.SiLU()
        self.conv2 = nn.Conv2d(channels, channels, 3, padding=1)
        self.norm2 = nn.GroupNorm(1, channels, affine=True)
        nn.init.kaiming_normal_(self.conv1.weight)
        nn.init.zeros_(self.conv1.bias)
        nn.init.kaiming_normal_(self.conv2.weight)
        nn.init.zeros_(self.conv2.bias)
        nn.init.ones_(self.norm1.weight)
        nn.init.zeros_(self.norm1.bias)
        nn.init.ones_(self.norm2.weight)
        nn.init.zeros_(self.norm2.bias)

    def forward(self, x):
        h = self.conv1(x)
        h = self.norm1(h)
        h = self.act(h)
        h = self.conv2(h)
        h = self.norm2(h)
        return x + h


# Класс, который позволяет каждой позиции в данных учиться искать важную информацию. Позволяет одному патчу видеть признаки других, а не только соседних. Например, в нашем контексте одно слово ищет схожие для правильного понимания:
# король + женщина = королева, и т.п.
# Q - Query (Запрос), K - Key (Ключ), V - Value (Значение)
# Query - что мне нужно узнать, чтобы получить как можно больше информации о себе?
# Key - насколько я подхожу запросу?
# Value - какая у меня есть информация для других запросов?
# Cross-Attention - с добавлением условия (например текст об объекте), Self-Attention - на основе объекта
class OurAttentionLayer(nn.Module):
    def __init__(self, patch_size, channels_in, hidden_dim, emb_size=1, cross=False):
        super().__init__()
        self.cross = cross                                               # Флаг, есть ли внешнее условие
        self.hidden_dim = hidden_dim                                     # Размерность Q, K, V
        self.channels_in = channels_in                                   # Кол-во входных каналов
        self.patch_size = patch_size                                     # Кол-во элементов в патче
        self.emb_size = emb_size                                         # Размерность условия
        self.key_proj = nn.Linear(patch_size, hidden_dim)                # Переводит каждый патч в скрытое пространство
        self.value_proj = nn.Linear(patch_size, hidden_dim)              # Переводит каждый патч в скрытое пространство
        self.norm = nn.LayerNorm([channels_in, patch_size])              # Нормализация для стабилизации выходных признаков
        if cross:
            self.cond_to_channel_proj = nn.Linear(emb_size, channels_in) # Переводит условие в пространство каналов
            self.cond_to_attn_proj = nn.Linear(emb_size, hidden_dim)     # Переводит условие в пространство attention
            self.query_proj = nn.Linear(hidden_dim, hidden_dim)          # Преобразует промежуточный query в финальный вектор запросов
        else:
            self.query_proj = nn.Linear(patch_size, hidden_dim)          # Проецирует запросы в пространство attention, и делает запросы там, т.к. нет внешнего условия
        self.output_proj = nn.Linear(hidden_dim, patch_size)             # Переводит результат из пространства обратно в patch
        self.softmax = nn.Softmax(dim=-1)                                # Для получения attention коэффициентов
        self.dscale = 1 / (hidden_dim ** 0.5)                            # Множитель нормировки для устойчивости
    def forward(self, image, text = None, ret_attn_qkv=False):
        image = image.contiguous()                                       # Приведение к непрерывному виду
        keys = self.key_proj(image)                                      # Описание каждого патча, чтобы понять, насколько он подходит запросу
        values = self.value_proj(image)                                  # Хранит, какие значения передать при выборе данного изображения
        if self.cross and text is None:                                  # Заглушка (если дали внешнее условие, а его нет, берём случайный объект текста)
            text = torch.rand(1, self.emb_size, device=image.device)
        if self.cross:
            cond_channels = self.cond_to_channel_proj(text)              # Технические детали по преобразованию к нужному виду
            cond_attention = self.cond_to_attn_proj(text)                # Технические детали по преобразованию к нужному виду
            cond_query_mixed = torch.einsum("...sc,...sn->...cn", cond_channels, cond_attention) # Смешанная матрица признаков
            queries = self.query_proj(cond_query_mixed)                  # Технические детали по преобразованию к нужному виду
            queries = queries.unsqueeze(2).unsqueeze(3).expand_as(keys)  # Технические детали по преобразованию к нужному виду
        else:
            queries = self.query_proj(image)
        attn_scores = torch.einsum("...jn,...cn->...cj", queries, keys)         # Считаем, насколько запрос похож на ключ
        attn_weights = self.softmax(attn_scores * self.dscale)                  # Нормировка
        attn_output = torch.einsum("...ic,...cn->...in", attn_weights, values)  # Берём взвешенные метки values
        output = self.output_proj(attn_output)                                  # Переводим результат обратно в патч
        output = output + image
        output = output.permute(0, 2, 3, 1, 4)
        output = self.norm(output)
        output = output.permute(0, 3, 1, 2, 4)
        if ret_attn_qkv:
            return output, queries, keys, values
        return output

# Класс для обработки картинки поблочно, для удобства нужно преобразование к квадратичной форме
class PatchImage(nn.Module):
    def __init__(self, patch_size, reverse=False):
        super().__init__()
        self.patch_size = patch_size         # Число элементов в одном блоке
        self.n = int(self.patch_size**(0.5)) # Размер стороны блока
        assert (self.n ** 2) == patch_size, "Size isn't a full square!"
        self.reverse = reverse
    def forward(self, x):
        n = self.n
        if self.reverse:
            b, c, h, w, s = x.shape
            x = torch.reshape(x, (b, c, h, w, n, n))
            x = torch.transpose(x, -2, -3)
            x = torch.reshape(x, (b, c, h * n, w * n))
            return x
        b, c, h, w = x.shape
        x = torch.reshape(x, (b, c, h // n, n, w // n, n))
        x = torch.transpose(x, -2, -3)
        x = torch.reshape(x, (b, c, h // n, w // n, n * n))
        return x

In [None]:
class Word_Encoder(nn.Module):
    def __init__(self, alphabet, emb_size, max_word_size=256):
        super().__init__()
        self.alphabet = list(alphabet) + ["<pad>", "<stress>", "<unk>"]
        self.emb_size = emb_size
        self.max_word_size = max_word_size
        self.embeddings = nn.Embedding(len(self.alphabet), emb_size)
        self.pos_embeddings = nn.Embedding(max_word_size, emb_size)

        self.get_idx = {char: idx for idx, char in enumerate(self.alphabet)}
        self.pad_idx = self.get_idx["<pad>"]
        self.stress_idx = self.get_idx["<stress>"]
        self.unk_idx = self.get_idx["<unk>"]
        self.device = self.embeddings.weight.device

    def tokenize(self, text):
        if isinstance(text, str):
            text = [text]
        tokenized = []
        for word in text:
            word_idxs = []
            i = 0
            n = len(word)
            while i < n:
                if word[i] == "<" and i + 8 < n and word[i:i+8] == "<stress>":
                  word_idxs.append(self.stress_idx)
                  i += 8
                else:
                    char = word[i]
                    if char in self.get_idx:
                        word_idxs.append(self.get_idx[char])
                    else:
                        word_idxs.append(self.unk_idx)
                    i += 1

            tokenized.append(word_idxs)
        max_len = max(len(word) for word in tokenized)
        padded = []
        for word in tokenized:
            padded_word = word
            if len(word) < max_len:
                padded_word += [self.pad_idx] * (max_len - len(word))
            padded.append(padded_word)

        return torch.tensor(padded, dtype=torch.long, device=self.device)

    def forward(self, x):
        batch, n = x.shape
        pos = torch.arange(n, device=x.device).unsqueeze(0).expand(batch, n)
        x = self.embeddings(x) + self.pos_embeddings(pos)
        return x

# Класс, с помощью которого модель определяет, насколько сильное сейчас зашумление, т.е. для каждого шага зашумления она хранит набор признаков
class Noise_Encoder(nn.Module):
    def __init__(self, emb_size, timestamps = 1000):
        super().__init__()
        self.embeddings = nn.Embedding(timestamps, emb_size)
    def forward(self, x):
        self.device = x.device
        return self.embeddings(x)

# Класс, с помощью которого модель определяет, где каждый элемент находится во времени (нужно, чтобы понимать где конец, а где начало слова, или где звук по времени)
class Time_Encoder(nn.Module): 
    def __init__(self, in_channels, out_channels, max_time_size=1024,): 
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, (3, 7), (1, 3), (1, 3))
        self.pos_embs =  nn.Embedding(max_time_size, out_channels)
        self.max_time_size = max_time_size
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.conv2 = nn.Conv2d(in_channels, out_channels, (3, 7), (1, 3), (1, 3))
        self.conv3 = nn.Conv2d(out_channels, out_channels, (3, 7), padding=(1, 3))
        self.act = nn.SiLU()
    def forward(self, image):
        x = self.conv1(image)
        b, c, h, w = x.shape
        y = self.conv2(image)
        time = torch.arange(w, device=image.device).expand(b, w)
        pos = self.pos_embs(time) # [b, w, out]
        pos = torch.permute(pos, [0, 2, 1]).unsqueeze(1)
        x = torch.permute(x, [0, 2, 1, 3])
        pos = pos.expand_as(x)
        x = x + pos
        y = self.act(y)
        x = torch.permute(x, [0, 2, 1, 3])
        z = self.conv3(x) + y
        return z

In [None]:
# Класс для хранения синусоидных меток о позиции во времени
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
    def forward(self, t):
        half = self.dim // 2
        emb = math.log(10000) / (half - 1)
        freqs = torch.exp(torch.arange(half, device=t.device) * -emb)
        pts = t.float().unsqueeze(1) * freqs.unsqueeze(0)
        return torch.cat([pts.sin(), pts.cos()], dim=1)


class TTS_diffusion(nn.Module):
    def __init__(self, input_channels = 1, hidden_dims = 32, alphabet = "абвгдеёжзийклмнопрстуфхцчшщъыьэюя", emb_size_word = 128, emb_size_noise = 64,
                       noise_steps = 100, max_word_size = 256, max_time_size = 2048):
        super().__init__()
        
        self.input_scaler = nn.Conv2d(input_channels, hidden_dims, kernel_size=1)  # Сворачивает mel спектрограмму до нужного числа каналов                      
        self.precode = nn.Sequential(  # Набор слоёв, которые сначала сжимают, а потом разжимают изображение попутно добавляя информацию
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 2, 1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 2, 1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.ConvTranspose2d(hidden_dims, hidden_dims, 3, 2, 1, output_padding=1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.ConvTranspose2d(hidden_dims, hidden_dims, 3, 2, 1, output_padding=1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
            )
        
        self.time_enc = Time_Encoder(hidden_dims, hidden_dims, max_time_size) # Каждому столбцу даёт позиционную метку
        self.post_time =  nn.Sequential(
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
        )
        
        self.word_enc = Word_Encoder(alphabet, emb_size_word, max_word_size) # Переводит текст в вектор индексов
        self.noise_enc = Noise_Encoder(emb_size_noise, noise_steps) # Признаки на шаге диффузии

        # Далее у нас есть несколько слоёв attention, формально первый из них (шумовой) ищет на каком шаге находятся зашумления, и как это должно повлиять на расшумление
        # Второй attention ищет непосредственно связи в тексте, изучая, какой звук сопоставить
        # Также есть self attention, чтобы патчи внутри повзаимодействовали сами с собой
        # После этого идёт обратное преобразование патчей в целое изображение
        self.patch_img1_size = 16
        self.patch_img1 = PatchImage(self.patch_img1_size)
        self.atten_noise1 = OurAttentionLayer(self.patch_img1_size, hidden_dims, hidden_dims, emb_size_noise, True)
        self.atten_word1 = OurAttentionLayer(self.patch_img1_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.atten_word2 = OurAttentionLayer(self.patch_img1_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.satten1 = OurAttentionLayer(self.patch_img1_size, hidden_dims, hidden_dims, 1, False)
        self.unpatch_img1 = PatchImage(self.patch_img1_size, True)
        self.main_block1 = nn.Sequential(
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 2, 1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 2, 1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
            )
        self.patch_img2_size = 16
        self.patch_img2 = PatchImage(self.patch_img2_size)
        self.atten_word3 = OurAttentionLayer(self.patch_img2_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.satten2 = OurAttentionLayer(self.patch_img2_size, hidden_dims, hidden_dims, 1, False)
        self.atten_word4 = OurAttentionLayer(self.patch_img2_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.unpatch_img2 = PatchImage(self.patch_img2_size, True)
        self.main_block2 = nn.Sequential(
                nn.ConvTranspose2d(hidden_dims, hidden_dims, 3, 2, 1, output_padding=1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.ConvTranspose2d(hidden_dims, hidden_dims, 3, 2, 1, output_padding=1), nn.GroupNorm(8, hidden_dims), nn.SiLU(),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
        )
        self.patch_img3_size = 16
        self.patch_img3 = PatchImage(self.patch_img3_size)
        self.atten_noise2 = OurAttentionLayer(self.patch_img3_size, hidden_dims, hidden_dims, emb_size_noise, True)
        self.atten_word5 = OurAttentionLayer(self.patch_img3_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.atten_word6 = OurAttentionLayer(self.patch_img3_size, hidden_dims, hidden_dims, emb_size_word, True)
        self.satten3 = OurAttentionLayer(self.patch_img3_size, hidden_dims, hidden_dims, 1, False)
        self.unpatch_img3 = PatchImage(self.patch_img3_size, True)
        self.main_block3 = nn.Sequential(
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
                nn.Conv2d(hidden_dims, hidden_dims, 3, 1, 1), nn.SiLU(), nn.GroupNorm(8, hidden_dims),
        )
        self.output_scaler = nn.Conv2d(hidden_dims, input_channels, kernel_size=1)
        self.out_scaler_conv1 = nn.Conv2d(in_channels=hidden_dims, out_channels=input_channels, kernel_size=1)
        self.out_scaler_conv2 = nn.Conv2d(in_channels=input_channels, out_channels=input_channels, kernel_size=1)
    def drop(self, layer, x, *args):
        if self.training and torch.rand(1, device=x.device).item() < 0.05:
            return x
        return layer(x, *args)
    def forward(self, x, text, noise):
        B, C, H0, W0 = x.shape
        x = self.input_scaler(x)
        words = self.word_enc(text)
        sh = self.noise_enc(noise)
        if sh.dim() == 2:
            sh = sh.unsqueeze(1)
        x = self.time_enc(x)
        x = self.post_time(x)
        x = self.precode(x)
        
        x = self.resize_to_square(x, self.patch_img1_size)
        x = self.patch_img1(x)
        x = self.drop(self.atten_noise1, x, sh) # ✓ p = 0.1
        x = self.atten_word1(x, words)
        x = self.atten_word2(x, words)
        x = self.satten1(x)
        x = self.unpatch_img1(x)
        x = self.main_block1(x)

        x = self.resize_to_square(x, self.patch_img2_size)
        x = self.patch_img2(x)
        x = self.atten_word3(x, words)
        x = self.atten_word4(x, words)
        x = self.satten2(x)
        x = self.unpatch_img2(x)
        x = self.main_block2(x)
        x = self.resize_to_square(x, self.patch_img3_size)
        x = self.patch_img3(x)
        x = self.drop(self.atten_noise2, x, sh) # ✓ p = 0.1
        x = self.atten_word5(x, words)
        x = self.atten_word6(x, words)
        x = self.satten3(x)
        x = self.unpatch_img3(x)
        x = self.main_block3(x)
        
        x = self.output_scaler(x)
        x = torch.nn.functional.interpolate(x, size=(H0, W0), mode='bilinear', align_corners=False)
        return x
    def resize_to_square(self, x, patch_size):
        n = int(math.sqrt(patch_size))
        B,C,H,W = x.shape
        s = max(H, W)
        s = ((s + n - 1) // n) * n
        return F.interpolate(x, (s, s), mode='bilinear', align_corners=False)

# Тренер

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR, SequentialLR
from torch.optim.swa_utils import AveragedModel
from torch.cuda.amp import GradScaler, autocast
import random

class AverageMeter:
    def __init__(self,):
        self.arr = []
    def __call__(self, item, n = 1):
        if n <= 1:
            self.arr.extend([item])
        else:
            self.arr.extend([item] * n)
    def __str__(self,) -> str:
        return str(np.mean(np.array(self.arr)))
    def zero(self,):
        self.arr=[]


class TTS_Trainer:
    def __init__(self, model, vae, train_dl, val_dl, noise_steps=100, epochs=100):
        self.model, self.vae = model, vae
        self.tdl, self.vdl = train_dl, val_dl
        self.epochs = epochs
        self.device = next(model.parameters()).device
        self.opt = torch.optim.AdamW(model.parameters(), lr=1e-4, betas=(0.9,0.99))
        warm = torch.optim.lr_scheduler.LinearLR(self.opt, 0.2, 1.0, total_iters=2000)
        decay = torch.optim.lr_scheduler.CosineAnnealingLR(self.opt, T_max=len(train_dl)*epochs, eta_min=1e-5)
        self.lr_sched = torch.optim.lr_scheduler.SequentialLR(self.opt, [warm, decay], milestones=[2000])
        self.noise_sched = NoiseScheduler(noise_steps, epochs)
        self._set_cosine_schedule(noise_steps)
        self.noise_sched.alpha_hat = self.noise_sched.alpha_hat.to(self.device)
        self.ema = AveragedModel(self.model, avg_fn=lambda e,p,_: e*0.999 + p*0.001)
        
    def _set_cosine_schedule(self, steps, s: float = 0.008):
        t = torch.arange(steps+1, dtype=torch.float32) / steps
        alphas = torch.cos((t + s) / (1 + s) * math.pi / 2) ** 2
        alphas = alphas / alphas[0]
        betas = 1 - (alphas[1:] / alphas[:-1])
        betas = betas.clamp(max=0.999)
        self.noise_sched.betas = betas.to(self.device)
        self.noise_sched.alpha = 1.0 - betas
        self.noise_sched.alpha_hat = torch.cumprod(self.noise_sched.alpha, dim=0)
        print("β[:10] =", betas[:10].cpu().numpy())

    def _q_sample(self, z0, t, eps):
        a = self.noise_sched.alpha_hat[t].view(-1,1,1,1)
        return a.sqrt()*z0 + (1-a).sqrt()*eps

    def train_loop(self, epoch=0):
        self.model.train();  self.vae.eval()
        for p in self.vae.parameters(): p.requires_grad_(False)
        pbar = tqdm(self.tdl, desc=f"Train {epoch:02d}", ncols=1440)
        cnt = 0
        for mel, texts in pbar:
            B = mel.size(0)
            mel = mel.to(self.device).float()
            with torch.no_grad():
                mu, logvar = self.vae.encode(mel)
                std = torch.exp(0.5 * logvar)
                z = mu + std * torch.randn_like(std)
                z = z - z.mean(dim=[2,3], keepdim=True)
            t = torch.randint(0, self.noise_sched.steps, (B,), device=self.device)
            eps = torch.randn_like(mu)
            x_t = self._q_sample(z, t, eps)
            ids = self.model.word_enc.tokenize(texts).to(self.device)
            input_for_model = x_t
            target = eps
            pred = self.model(input_for_model, ids, t)
            if pred.shape != target.shape:
                pred = F.interpolate(pred, size=target.shape[-2:], mode="bilinear")
            beta_t = self.noise_sched.betas[t].view(-1,1,1,1)   # если хочешь вариант β²
            w_t = beta_t * beta_t   
            mse = ((pred - target)**2 * w_t).mean()
            l2 = mse.sqrt()
            loss = mse
            cnt += 1
            self.opt.zero_grad()
            loss.backward()
            total_norm = torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            if (cnt % 100 == 0):
                print(f"step {cnt:4d}  loss (MSE): {loss.item()}   L2: {l2.item()}   grad_norm: {total_norm}")
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.opt.step()
            self.lr_sched.step()
            self.ema.update_parameters(self.model)
            pbar.set_postfix(mse=f"{loss.item()}", l2=f"{l2.item()}")
        pbar.close()

    @torch.no_grad()
    def val_loop(self, max_batches=40):
        self.model.eval();  self.vae.eval()
        tot_mse, tot_l2, n = 0.0, 0.0, 0
        for i,(mel,texts) in enumerate(self.vdl):
            if i >= max_batches: break
            B = mel.size(0);  mel = mel.to(self.device).float()
            mu, logvar = self.vae.encode(mel)
            std = torch.exp(0.5 * logvar)
            z = mu + std * torch.randn_like(std)
            z = z - z.mean(dim=[2,3], keepdim=True)
            t = torch.randint(0, self.noise_sched.steps, (B,), device=self.device)
            eps = torch.randn_like(mu)
            x_t = self._q_sample(z, t, eps)
            pred = self.model(x_t, self.model.word_enc.tokenize(texts).to(self.device), t)
            if pred.shape != eps.shape:
                pred = F.interpolate(pred, eps.shape[-2:], mode="bilinear")
            beta_t = self.noise_sched.betas[t].view(-1,1,1,1)   # если хочешь вариант β²
            w_t = beta_t * beta_t   
            mse = ((pred - eps)**2 * w_t).mean().item()
            l2 = mse ** 0.5
            tot_mse += mse * B
            tot_l2 += l2 * B
            n += B
        val_mse = tot_mse / n if n else float("nan")
        val_l2 = tot_l2 / n if n else float("nan")
        print(f"[val] ε-L2 = {val_l2}   MSE = {val_mse}")
        return val_mse, val_l2


    @torch.no_grad()
    def draw_diffusion(self, save="diff.png", steps=(199,140,80,0)):
        self.ema.eval();  self.vae.eval()
        mel0, _ = next(iter(self.vdl))
        mel0 = mel0.to(self.device).float()
        mu,_ = self.vae.encode(mel0)
        C, H, W = mu.shape[1:]
        v = torch.randn(1, C, H, W, device=self.device)
        imgs = []
        for t in reversed(range(self.noise_sched.steps)):
            eps = self.ema.module(v, torch.zeros(1,1,dtype=torch.long,device=self.device), torch.tensor([t], device=self.device))
            v = self.noise_sched.denoise_step(v, eps, t, sigma=0.0)
            if t in steps:
                imgs.append((t, self.vae.decode(v).squeeze().cpu()))
        cols = len(imgs)
        fig, axes=plt.subplots(1, cols, figsize=(2 * cols, 2))
        for i, (tt, im) in enumerate(imgs):
            axes[i].imshow(im,aspect='auto')
            axes[i].set_title(f"t={tt}"); axes[i].axis('off')
        plt.tight_layout();  plt.savefig(save); plt.close(fig)

In [None]:
mel_spec = torchaudio.transforms.MelSpectrogram(
    sample_rate = 16000,
    n_fft = 800,
    hop_length = 200,     
    win_length  = 800,
    n_mels = 80,
)


def wav_to_mel(wav):
    spec = mel_spec(wav)      
    spec = torch.log(spec + 1e-6)
    T = spec.shape[-1]
    MAX_FRAMES = 80
    if T < MAX_FRAMES:
        spec = F.pad(spec, (0, MAX_FRAMES - T))
    else:
        spec = spec[..., :MAX_FRAMES]
    return spec  

class AudioDataset(Dataset):
    def __init__(self, csv_file, audio_dir, target_sr=16000, length_sec=None, transform=None):
        self.table = pd.read_csv(csv_file)
        self.audio_dir = Path(audio_dir)
        self.sr = target_sr
        self.length = int(target_sr * length_sec) if length_sec else None
        self.transform = transform

    def load_wav(self, path):
        wav, sr = torchaudio.load(path)
        if wav.shape[0] > 1:
            wav = wav.mean(dim=0, keepdim=True)
        if sr != self.sr:
            wav = torchaudio.functional.resample(wav, sr, self.sr)
        return wav

    def pad_trim(self, wav):
        if self.length is None:
            return wav
        cur = wav.shape[-1]
        if cur > self.length:
            wav = wav[..., : self.length]
        elif cur < self.length:
            wav = torch.nn.functional.pad(wav, (0, self.length - cur))
        return wav

    def __getitem__(self, idx):
        row = self.table.iloc[idx]
        rel_path = row["path"]
        text = row["sentence"]
        wav = self.load_wav(self.audio_dir / rel_path)
        wav = self.pad_trim(wav).float()
        mel = wav_to_mel(wav)
        if self.transform:
            mel = self.transform(mel)
        return mel, text

    def __len__(self):
        return len(self.table)


def audio_collate(batch):
    mels, texts = zip(*batch)
    return torch.stack(mels), list(texts)                 


from torch.utils.data import DataLoader, SubsetRandomSampler

def build_dataloader(cfg, split, transform=None, workers=4, limit=10_000):
    d  = cfg["dataset"][split]
    ds = AudioDataset(
        d["table"], d["data"],
        cfg["vae"]["freq"], cfg["vae"]["lenght"],
        transform,
    )
    if limit and limit < len(ds):
        idx = np.random.choice(len(ds), limit, replace=False)
        ds = torch.utils.data.Subset(ds, idx)    
    sampler = None

    return DataLoader(
        ds,
        batch_size = cfg[split]["batch_size"],
        shuffle = (split == "train"),
        sampler = sampler,
        num_workers = workers,
        pin_memory = cfg[split]["pin_memory"],
        collate_fn = audio_collate,
    )
def compute_mel_mean_std(dataloader):
    mel_sum, mel_sq_sum, n_elem = 0.0, 0.0, 0
    for mel, _ in tqdm(dataloader):
        mel = mel.float()
        mel_sum += mel.sum().item()
        mel_sq_sum += (mel ** 2).sum().item()
        n_elem += mel.numel()
    mean = mel_sum / n_elem
    var = mel_sq_sum / n_elem - mean ** 2
    std = var ** 0.5
    return mean, std

train_dl = build_dataloader(config, "train", transform=None, workers=0, limit=10000)
mel_mean, mel_std = compute_mel_mean_std(train_dl)
print(f"MEL mean: {mel_mean}, MEL std: {mel_std}")

class MelNormalize:
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std
    def __call__(self, mel):
        return (mel - self.mean) / self.std
        
mel_norm = MelNormalize(mel_mean, mel_std)


In [None]:
import torch.backends.cudnn as cudnn
cudnn.benchmark = True
torch.set_float32_matmul_precision("high")

vae = VAE_Audio().to(device)
params_vae = torch.load(str(Path.home() / "Downloads" / "TTSVAE_v4.5.pt"), map_location=device, weights_only=True)
vae.load_state_dict(params_vae)
vae.eval()

train_dataloader = build_dataloader(config, "train", transform = mel_norm, workers=0, limit=10000)
val_dataloader = build_dataloader(config, "val", transform = mel_norm, workers=0, limit=None)

In [None]:
from speechbrain.inference.vocoders import HIFIGAN
from speechbrain.utils.fetching import LocalStrategy

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

vocoder = HIFIGAN.from_hparams(
    source="speechbrain/tts-hifigan-libritts-16kHz",
    run_opts={"device": DEVICE},
    savedir="pretrained_models/hifigan_16k_80",
    local_strategy=LocalStrategy.COPY
).to(DEVICE).eval()

if hasattr(vocoder, "hifigan") and hasattr(vocoder.hifigan, "remove_weight_norm"):
    vocoder.hifigan.remove_weight_norm()
elif hasattr(vocoder, "remove_weight_norm"):
    vocoder.remove_weight_norm()


In [None]:
import os, re, torch, torchaudio
from pathlib import Path

SR, N_FFT, HOP, WIN, N_MELS = 16_000, 800, 200, 800, 80

@torch.no_grad()
def vocode(mel_log: torch.Tensor) -> torch.Tensor:
    if mel_log.dim() == 4:
        mel_log = mel_log.squeeze(1)
    wav = vocoder(mel_log.to(DEVICE))
    wav = wav.squeeze(1)
    wav = wav / wav.abs().amax(dim=1, keepdim=True).clamp_min(1e-6)
    return wav.cpu()

@torch.no_grad()
def text_to_speech(text: str, tts, vae, sched, temp = 0.5, device=None):
    device = device or next(tts.parameters()).device
    ids = tts.word_enc.tokenize([text]).to(device)
    dummy = torch.zeros(1, 1, 80, 80, device=device)
    zshape = vae.encode(dummy)[0].shape
    z = torch.randn(zshape, device=device) * temp
    for t in reversed(range(sched.steps)):
        eps = tts(z, ids, torch.tensor([t], device=device))
        if eps.shape[-2:] != z.shape[-2:]:
            eps = F.interpolate(eps, z.shape[-2:], mode="bilinear")
        z = sched.denoise_step(z, eps, t, sigma=0.0 if t < sched.steps * 0.2 else 1e-4)
    mel_out = vae.decode(z)
    mel_out = mel_out * mel_std + mel_mean
    wav = vocode(mel_out)
    return wav[0]      

def slugify(text, length=16):
    txt = re.sub(r"\s+", "_", text.lower())
    return re.sub(r"[^\w\d_]+", "", txt)[:length]

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tts_model = TTS_diffusion(
    input_channels=32,
    hidden_dims=128,
    alphabet="абвгдеёжзийклмнопрстуфхцчшщъыьэюяАБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ",
    emb_size_word=64,
    emb_size_noise=32,
    noise_steps=300,
    max_word_size=256,
    max_time_size=2048
).to(DEVICE)

trainer = TTS_Trainer(
    model = tts_model,
    vae = vae,
    train_dl = train_dataloader,
    val_dl = val_dataloader,
    noise_steps = 300,
    epochs = 30
)

Path("./samples").mkdir(exist_ok=True)

for epoch in range(trainer.epochs):
    print(f"\nEpoch {epoch + 1}/{trainer.epochs}")
    trainer.train_loop(epoch=epoch)
    val_mse, val_l2 = trainer.val_loop()
    print(f"ε-L2 val = {val_l2}  MSE = {val_mse}")
    trainer.draw_diffusion(f"diff_ep{epoch + 1}.png")
    trainer.ema.eval()
    for phrase in ["привет", "здравствуйте"]:
        wav = text_to_speech(phrase, trainer.ema.module, trainer.vae, trainer.noise_sched, temp=0.5, device=DEVICE)
        fn = f"./samples/ep{epoch+1:04d}_{slugify(phrase)}.mp3"
        torchaudio.save(fn, wav.unsqueeze(0), SR)
        print("OK", fn)
    torch.save(trainer.model.state_dict(), f"TTS_Diffusion_11.{epoch + 1:02d}.pt")


In [None]:
import matplotlib.pyplot as plt
plt.plot(trainer.noise_sched.alpha_hat.cpu().numpy())
plt.title("Alpha_hat Schedule"); plt.show()

In [None]:
trainer.ema.eval()
text = "привет"
wav = text_to_speech(text, trainer.ema.module, trainer.vae, trainer.noise_sched, temp=0.7, device=DEVICE)

wav = wav / wav.abs().max().clamp_min(1e-6)
torchaudio.save(f"sample_{slugify(text)}.wav", wav.unsqueeze(0), SR)
print(f"test_{slugify(text)}.wav")
