<a href="https://colab.research.google.com/github/AlexeyProvorov/Generative/blob/master/Attention_diffusers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
# === Импорт необходимых библиотек ===

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# === Параметры ===

BATCH_SIZE = 64      # Размер батча
IMAGE_SIZE = 28      # Размер изображений (для MNIST 28x28)
CHANNELS = 1         # Количество каналов (для MNIST 1 канал)
NUM_EPOCHS = 5       # Количество эпох обучения
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === Подготовка данных ===

# Трансформации для данных
transform = transforms.Compose([
    transforms.ToTensor(),                       # Преобразуем изображения в тензоры
    transforms.Normalize((0.5,), (0.5,))         # Нормализуем данные к диапазону [-1, 1]
])

# Загрузка датасета MNIST
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# === Вспомогательные функции для диффузионной модели ===

def linear_beta_schedule(timesteps):
    """
    Генерирует линейное расписание beta от beta_start до beta_end.
    """
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start, beta_end, timesteps)

def get_index_from_list(vals, t, x_shape):
    """
    Извлекает значения по индексу t и преобразует их в нужную форму.
    """
    batch_size = t.shape[0]
    out = vals.gather(-1, t.cpu())
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)

# === Реализация механизма внимания ===

class SelfAttention(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        # Определяем ключ, запрос и значение
        self.query = nn.Conv2d(in_channels, in_channels // 8, kernel_size=1)
        self.key = nn.Conv2d(in_channels, in_channels // 8, kernel_size=1)
        self.value = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        self.gamma = nn.Parameter(torch.zeros(1))  # Параметр для масштабирования выхода

    def forward(self, x):
        """
        Прямой проход механизма внимания.
        """
        batch_size, C, width, height = x.size()

        # Преобразуем x для вычисления запросов, ключей и значений
        proj_query = self.query(x).view(batch_size, -1, width * height)  # [B, C', N]
        proj_key = self.key(x).view(batch_size, -1, width * height)      # [B, C', N]
        proj_value = self.value(x).view(batch_size, -1, width * height)  # [B, C, N]

        # Вычисляем матрицу внимания
        energy = torch.bmm(proj_query.permute(0, 2, 1), proj_key)        # [B, N, N]
        attention = torch.softmax(energy, dim=-1)                        # [B, N, N]

        # Применяем внимание к значениям
        out = torch.bmm(proj_value, attention.permute(0, 2, 1))          # [B, C, N]
        out = out.view(batch_size, C, width, height)                     # [B, C, W, H]

        # Возвращаем выход с учетом параметра gamma
        out = self.gamma * out + x
        return out

# === Определение модели UNet с вниманием ===

class UNet(nn.Module):
    def __init__(self, timesteps):
        super().__init__()

        # Количество каналов на различных уровнях
        self.down_channels = [64, 128, 256]   # Каналы для downsampling
        self.up_channels = [256, 128, 64]     # Каналы для upsampling
        self.timesteps = timesteps

        # Начальный слой
        self.input_conv = nn.Conv2d(CHANNELS, self.down_channels[0], kernel_size=3, padding=1)

        # Слои для спускающейся части (Encoder)
        self.down_layers = nn.ModuleList()
        for i in range(len(self.down_channels) - 1):
            self.down_layers.append(nn.Sequential(
                nn.Conv2d(self.down_channels[i], self.down_channels[i+1], kernel_size=3, stride=2, padding=1),
                nn.ReLU(),
                SelfAttention(self.down_channels[i+1])  # Добавляем слой внимания
            ))

        # Боттлнек
        self.bottleneck = nn.Sequential(
            nn.Conv2d(self.down_channels[-1], self.down_channels[-1], kernel_size=3, padding=1),
            nn.ReLU()
        )

        # Слои для восходящей части (Decoder)
        self.up_layers = nn.ModuleList()
        for i in range(len(self.up_channels) - 1):
            self.up_layers.append(nn.Sequential(
                nn.ConvTranspose2d(self.up_channels[i], self.up_channels[i+1], kernel_size=4, stride=2, padding=1),
                nn.ReLU(),
                SelfAttention(self.up_channels[i+1])  # Добавляем слой внимания
            ))

        # Выходной слой
        self.output_conv = nn.Conv2d(self.up_channels[-1], CHANNELS, kernel_size=3, padding=1)

    def forward(self, x, t):
        """
        Прямой проход модели UNet с учетом времени t.
        """
        # Встраиваем временной шаг t (здесь можно добавить реализацию встраивания времени)
        t_emb = self.time_embedding(t)

        # Начальный сверточный слой
        x = self.input_conv(x)

        # Сохраняем промежуточные слои для skip-connection
        skip_connections = []

        # Спускающаяся часть (Encoder)
        for layer in self.down_layers:
            skip_connections.append(x)  # Сохраняем x до понижения разрешения
            x = layer(x)

        # Боттлнек
        x = self.bottleneck(x)

        # Переворачиваем список skip_connections для удобства
        skip_connections = skip_connections[::-1]

        # Восходящая часть (Decoder)
        for i, layer in enumerate(self.up_layers):
            x = layer(x)
            # Проверяем, чтобы размеры совпадали
            if x.shape != skip_connections[i].shape:
                x = nn.functional.interpolate(x, size=skip_connections[i].shape[2:])
            x = x + skip_connections[i]  # Добавляем соответствующий skip-connection

        # Выходной слой
        x = self.output_conv(x)
        return x

    def time_embedding(self, t):
        """
        Встраивание временного шага t.
        (Здесь можно добавить реализацию встраивания времени, например, синусоидальное встраивание)
        """
        return t

# === Определение диффузионной модели ===

class DiffusionModel:
    def __init__(self, model, timesteps=1000):
        self.model = model
        self.timesteps = timesteps
        self.betas = linear_beta_schedule(timesteps).to(DEVICE)
        self.alphas = 1.0 - self.betas
        self.alpha_hat = torch.cumprod(self.alphas, dim=0)

    def add_noise(self, x0, t):
        """
        Добавляет шум к изображению x0 на шаге t.
        """
        sqrt_alpha_hat = torch.sqrt(get_index_from_list(self.alpha_hat, t, x0.shape))
        sqrt_one_minus_alpha_hat = torch.sqrt(1 - get_index_from_list(self.alpha_hat, t, x0.shape))
        eps = torch.randn_like(x0)
        xt = sqrt_alpha_hat * x0 + sqrt_one_minus_alpha_hat * eps
        return xt, eps

    def p_losses(self, x0, t):
        """
        Вычисляет потери модели на заданном шаге t.
        """
        xt, eps = self.add_noise(x0, t)
        eps_pred = self.model(xt, t)
        return nn.functional.mse_loss(eps_pred, eps)

    def sample(self, img_shape):
        """
        Генерирует новое изображение путем обратного процесса диффузии.
        """
        with torch.no_grad():
            x = torch.randn(img_shape).to(DEVICE)
            for t in reversed(range(self.timesteps)):
                t_tensor = torch.tensor([t]).to(DEVICE)
                eps_pred = self.model(x, t_tensor)
                beta_t = self.betas[t]
                alpha_t = self.alphas[t]
                alpha_hat_t = self.alpha_hat[t]

                if t > 0:
                    noise = torch.randn_like(x)
                else:
                    noise = torch.zeros_like(x)

                x = (1 / torch.sqrt(alpha_t)) * (x - (beta_t / torch.sqrt(1 - alpha_hat_t)) * eps_pred) + torch.sqrt(beta_t) * noise
        return x

# === Обучение модели ===

# Инициализируем модель и диффузионный процесс
timesteps = 1000
model = UNet(timesteps).to(DEVICE)
diffusion = DiffusionModel(model, timesteps=timesteps)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Цикл обучения
for epoch in range(NUM_EPOCHS):
    for step, (images, _) in enumerate(train_loader):
        images = images.to(DEVICE) * 2 - 1  # Приводим изображения к диапазону [-1, 1]

        batch_size = images.size(0)
        t = torch.randint(0, timesteps, (batch_size,), device=DEVICE).long()  # Случайные временные шаги

        # Вычисляем потери
        loss = diffusion.p_losses(images, t)

        # Обновляем параметры модели
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Печатаем информацию каждые 100 шагов
        if step % 100 == 0:
            print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Step [{step}/{len(train_loader)}], Loss: {loss.item():.4f}")

# === Генерация новых изображений ===

# Генерируем новые изображения после обучения
generated_images = diffusion.sample((BATCH_SIZE, CHANNELS, IMAGE_SIZE, IMAGE_SIZE))

# Преобразуем изображения обратно в диапазон [0, 1] для визуализации
generated_images = (generated_images + 1) / 2

# Отображаем несколько сгенерированных изображений

def show_images(images):
    grid = torchvision.utils.make_grid(images, nrow=8)
    plt.figure(figsize=(15,15))
    plt.imshow(grid.permute(1, 2, 0).cpu().numpy(), cmap='gray')
    plt.axis('off')
    plt.show()

show_images(generated_images[:64])


Epoch [1/5], Step [0/938], Loss: 1.7074


KeyboardInterrupt: 