<a href="https://colab.research.google.com/github/HerrVonBeloff/AI-YP_24-team-42/blob/main/GAN_experiments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Conditional Generative Adversarial Network  (cGAN)

Модель использует Conditional GAN для генерации изображений логотипов на основе текстовых меток. Предобработка данных включает изменение размера, нормализацию и преобразование текстовых меток в числовые индексы. Архитектура модели включает встраивание текста, генератор и дискриминатор. Тренировочный процесс оптимизирует обе модели с использованием функции потерь BCELoss.

**Общая структура**

##1. Модель

1.1. TextEmbedding

Этот модуль преобразует текстовые метки в векторные представления фиксированной длины:

* Вход: индекс текстовой метки.
* Выход: векторное представление размерности embedding_dim (в данном случае 128).
* Это стандартный подход для работы с категориальными данными, где каждая метка представляет собой уникальный класс.

```
class TextEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TextEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, text):
        return self.embedding(text)
```

1.2. Generator

Генератор создает изображения на основе случайного шума (z) и текстового встраивания:

- Вход:
 - z: случайный шум размерности (batch_size, z_dim).
 - text_embedding: векторное представление текста размерности (batch_size, text_embedding_dim).
- Обработка:
 - Шум и текстовое встраивание объединяются в один тензор.
 - Применяется серия сверточных транспонированных слоев (ConvTranspose2d) для увеличения разрешения изображения.
- Выход: изображение размером (batch_size, 3, 32, 32).

```
class Generator(nn.Module):
    def __init__(self, z_dim=100, text_embedding_dim=128, output_channels=3):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.ConvTranspose2d(z_dim + text_embedding_dim, 256, kernel_size=4, stride=1, padding=0),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            ...
        )

    def forward(self, z, text_embedding):
        z = z.view(z.size(0), z.size(1), 1, 1)
        text_embedding = text_embedding.view(text_embedding.size(0), text_embedding.size(1), 1, 1)
        combined = torch.cat([z, text_embedding], dim=1)
        return self.model(combined)
```

1.3. Discriminator

Дискриминатор оценивает реалистичность изображения с учетом текстового встраивания:

- Вход:
 - Изображение размером (batch_size, 3, 32, 32).
 - Текстовое встраивание размером (batch_size, text_embedding_dim).
- Обработка:
 - Изображение обрабатывается через сверточные слои для извлечения признаков.
 - Текстовое встраивание преобразуется в вектор того же пространственного размера, что и признаки изображения.
 - Объединенные признаки передаются через финальный сверточный слой для получения вероятности реалистичности.
- Выход: скалярное значение (вероятность).

```
class Discriminator(nn.Module):
    def __init__(self, input_channels=3, text_embedding_dim=128):
        super(Discriminator, self).__init__()
        self.image_model = nn.Sequential(
            nn.Conv2d(input_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            ...
        )
        self.text_model = nn.Sequential(
            nn.Linear(text_embedding_dim, 256),
            nn.LeakyReLU(0.2, inplace=True),
        )
        self.final_layer = nn.Sequential(
            nn.Conv2d(256 + 256, 1, kernel_size=4, stride=1, padding=0),
            nn.Sigmoid()
        )

    def forward(self, x, text_embedding):
        image_features = self.image_model(x)
        text_features = self.text_model(text_embedding)
        text_features = text_features.view(text_features.size(0), text_features.size(1), 1, 1)
        text_features = text_features.repeat(1, 1, image_features.size(2), image_features.size(3))
        combined = torch.cat([image_features, text_features], dim=1)
        return self.final_layer(combined).view(-1)
```

## 2. Методы предобработки данных

2.1. Загрузка данных

Используется набор данных iamkaikai/amazing_logos_v4, который содержит изображения логотипов и соответствующие текстовые метки. Для удобства работы создается пользовательский класс LogoDataset.

```
class LogoDataset(Dataset):
    def __init__(self, hf_dataset, transform=None):
        self.dataset = hf_dataset
        self.transform = transform
        self.classes = sorted(list(set(self.dataset['text'])))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = item['image']
        text = item['text']
        label = self.class_to_idx[text]

        if not isinstance(image, Image.Image):
            image = Image.fromarray(image)

        if image.mode != 'RGB':
            image = image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        text_index = text_to_index[text]

        return image, text_index
  ```

- Преобразования изображений:
 - Изменение размера до (32, 32).
 - Нормализация значений пикселей к диапазону [-1, 1]

```
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
```

2.2. Текстовые метки

Текстовые метки преобразуются в числовые индексы с помощью словаря text_to_index. Это позволяет использовать их как входные данные для модели.

## 3. Тренировочный процесс


3.1. Цикл обучения

- Дискриминатор:
 - Оценивает реальные изображения и метки как "реальные" (метка 1).
 - Оценивает сгенерированные изображения как "фальшивые" (метка 0).
 - Суммарная функция потерь:

```
loss_discriminator = loss_real + loss_fake
```
- Генератор:
 - Пытается обмануть дискриминатор, заставляя его считать сгенерированные изображения "реальными".
 - Функция потерь:

 ```
loss_generator = criterion(output_generator, real_labels)
```

3.2. Сохранение контрольных точек
Каждую эпоху сохраняются:

- Генератор и дискриминатор.
- Оптимизаторы.
- Сгенерированные образцы для визуальной оценки.


##4. Заключение

4.1. Сильные стороны модели

- Использование текстовых меток делает модель условной, что повышает качество генерации.

- Архитектура генератора и дискриминатора хорошо подходит для задачи генерации изображений небольшого размера.

4.2. Недостатки

- Размер изображений ограничен (32, 32), что может быть недостаточно для сложных логотипов.

- Необходимость больших объемов данных для обучения.

#Реализация

Шаг 1: Установка необходимых библиотек

In [None]:
!pip install datasets torch torchvision pillow -q


[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Шаг 2: Импорт библиотек

In [None]:
!pip install datasets -q
!python -m spacy download en_core_web_md -q


[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_md')



[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from datasets import load_dataset
from PIL import Image
import os
import torchvision
from torch.optim.lr_scheduler import StepLR

Шаг 3: Конфигурация устройства

In [None]:
import torch
print(torch.version.cuda)
print(torch.cuda.is_available())

12.1
True


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


Шаг 4: Загрузка данных

In [None]:
dataset = load_dataset("iamkaikai/amazing_logos_v4", split="train")

text_to_index = {text: idx for idx, text in enumerate(set(dataset['text']))}
index_to_text = {idx: text for text, idx in text_to_index.items()}

Loading dataset shards:   0%|          | 0/29 [00:00<?, ?it/s]

Шаг 5: Создание пользовательского класса Dataset

In [None]:
class LogoDataset(Dataset):
    def __init__(self, hf_dataset, transform=None):
        self.dataset = hf_dataset
        self.transform = transform
        self.classes = sorted(list(set(self.dataset['text'])))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = item['image']
        text = item['text']
        label = self.class_to_idx[text]

        if not isinstance(image, Image.Image):
            image = Image.fromarray(image)

        if image.mode != 'RGB':
            image = image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        text_index = text_to_index[text]

        return image, text, text_index

Шаг 6: Определение преобразований

In [None]:
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

Шаг 7: Создание DataLoader

In [None]:
logo_dataset = LogoDataset(dataset, transform=transform)
dataloader = DataLoader(logo_dataset, batch_size=64, shuffle=True)

Шаг 8: Определение моделей

In [None]:
class TextEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, spatial=4):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.spatial = spatial
        self.embedding_dim = embedding_dim

        self.proj = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim * 2),
            nn.ReLU(True),
            nn.Linear(embedding_dim * 2, embedding_dim * spatial * spatial),
            nn.BatchNorm1d(embedding_dim * spatial * spatial),
            nn.ReLU(True)
        )

    def forward(self, text):
        x = self.embedding(text)                   # (B, 128)
        x = self.proj(x)                           # (B, 2048)
        return x.view(x.size(0), self.embedding_dim, self.spatial, self.spatial)


# Генератор
class Generator(nn.Module):
    def __init__(self, z_dim=100, text_embedding_dim=128, output_channels=3):
        super().__init__()
        self.z_dim = z_dim
        self.text_embedding_dim = text_embedding_dim
        self.spatial = 4

        self.z_preprocess = nn.Sequential(
            nn.Conv2d(z_dim, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(True)
        )

        # Нормализация шума
        self.initial_conv = nn.Sequential(
            nn.Conv2d(128 + text_embedding_dim, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True)
        )

        self.upsample_blocks = nn.Sequential(
            nn.Upsample(scale_factor=2),  # 4 → 8
            nn.Conv2d(512, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(True),

            nn.Upsample(scale_factor=2),  # 8 → 16
            nn.Conv2d(256, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),

            nn.Upsample(scale_factor=2),  # 16 → 32
            nn.Conv2d(128, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(True),

            nn.Upsample(scale_factor=2),  # 32 → 64
            nn.Conv2d(64, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(True),

            nn.Upsample(scale_factor=2),  # 64 → 128
            nn.Conv2d(32, 16, 3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),

            nn.Upsample(scale_factor=2),  # 128 → 256
            nn.Dropout2d(0.2),
            nn.Conv2d(16, output_channels, 3, padding=1),
            nn.Tanh()
        )


    def forward(self, z, text_embedding):
        B = z.size(0)
        z = z.view(B, self.z_dim, self.spatial, self.spatial)
        z = self.z_preprocess(z)
        x = torch.cat([z, text_embedding], dim=1)
        x = self.initial_conv(x)
        return self.upsample_blocks(x)


class MinibatchStdDev(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        B, C, H, W = x.size()
        std = torch.std(x, dim=0, keepdim=True)  # (1, C, H, W)
        mean_std = std.mean().expand(B, 1, H, W)  # (B, 1, H, W)
        return torch.cat([x, mean_std], dim=1)  # (B, C+1, H, W)


# Дискриминатор
class Discriminator(nn.Module):
    def __init__(self, input_channels=3, text_embedding_dim=128, spatial=4):
        super().__init__()
        self.spatial = spatial
        self.image_model = nn.Sequential(
            nn.Conv2d(input_channels, 16, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(16, 32, 4, 2, 1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(32, 64, 4, 2, 1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(64, 128, 4, 2, 1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(128, 256, 4, 2, 1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(256, 512, 4, 2, 1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
        )

        # Обработка текстового эмбеддинга как spatial карты
        self.text_proj = nn.Sequential(
            nn.Conv2d(text_embedding_dim, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
        )

        # Финальный классификатор + MinibatchStdDev чтобы отслеживать однообразие
        self.minibatch = MinibatchStdDev()
        self.final_layer = nn.Sequential(
            nn.Conv2d(512 + 512 + 1, 1, 4, 1, 0),  # добавлен 1 канал!
            #nn.Sigmoid() # не нужна, так как теперь criterion = nn.BCEWithLogitsLoss()
        )


    def forward(self, x, text_embedding):
        B = x.size(0)

        # Пропускаем картинку через CNN
        image_features = self.image_model(x)  # (B, 512, 4, 4)

        # Обработка текста через свёртки
        text_features = self.text_proj(text_embedding)  # (B, 512, 4, 4)

        # Объединяем изображение и текст
        combined = torch.cat([image_features, text_features], dim=1)  # (B, 1024, 4, 4)

        # Для MinibatchStdDev
        combined = self.minibatch(combined)

        # Финальный прогноз
        return self.final_layer(combined).view(-1)

Шаг 9: Инициализация моделей и оптимизаторов

In [None]:
# Инициализация моделей
vocab_size = len(text_to_index)
text_embedding = TextEmbedding(vocab_size=vocab_size, embedding_dim=128).to(device)
generator = Generator(z_dim=100, text_embedding_dim=128).to(device)
discriminator = Discriminator(text_embedding_dim=128).to(device)

# Инициализация весов
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

generator.apply(weights_init)
discriminator.apply(weights_init)

# Оптимизаторы и функция потерь
lr = 0.0002
optimizerG = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizerD = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
#criterion = nn.BCELoss()
#criterion = nn.BCEWithLogitsLoss()

# Шедулеры
schedulerG = StepLR(optimizerG, step_size=10, gamma=0.5)
schedulerD = StepLR(optimizerD, step_size=10, gamma=0.5)

# Параметры тренировки
num_epochs = 5
fixed_noise = torch.randn(64, 100 * 4 * 4, device=device)
fixed_text = torch.randint(0, vocab_size, (64,), device=device)

# Создание выходных каталогов
os.makedirs("output/samples_training", exist_ok=True)
os.makedirs("output/checkpoints", exist_ok=True)

In [None]:
z = torch.randn(4, 100 * 4 * 4, device=device)
text = torch.randint(0, vocab_size, (4,), device=device)
text_emb = text_embedding(text)  # (4, 128, 4, 4)
fake_images = generator(z, text_emb)
print(f"Размер сгенерированных изображений: {fake_images.shape}")
out = discriminator(fake_images, text_emb)
print(f"Размер выходов дискриминатора: {out.shape}")


Размер сгенерированных изображений: torch.Size([4, 3, 256, 256])
Размер выходов дискриминатора: torch.Size([4])


Шаг 10: Тренировочный цикл

Метрики

In [None]:
import numpy as np
import torch.nn.functional as F
from torchvision.models import inception_v3
from torchvision import transforms
import torchvision.models as models

from scipy.linalg import sqrtm

In [None]:
def inception_score(images, batch_size=32, splits=10):
    """
    Расчёт Inception для набора картинок.

    Args:
        images (torch.Tensor): Тензор (N, 3, H, W).
        batch_size (int): Размер батчей для InceptionV3.
        splits (int): Количество разбиений для расчёта IS.

    Returns:
        float: Inception Score.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    N = len(images)

    # Загрузка модели InceptionV3
    inception_model = inception_v3(
        weights=models.Inception_V3_Weights.DEFAULT, transform_input=False
    ).to(device)
    inception_model.eval()

    # Предобработка картинок - убрал нормализацию - наш вход уже нормализован
    transform = transforms.Compose(
        [
            transforms.Resize(
                (299, 299)
            )  # Необходимо, так как InceptionV3 обучалась именно на таких размерах
        ]
    )
    images = torch.stack([transform(img) for img in images])

    # Расчёт предсказаний
    preds = []
    for i in range(0, N, batch_size):
        batch = images[i : i + batch_size].to(device)
        with torch.no_grad():
            preds.append(F.softmax(inception_model(batch), dim=1))
    preds = torch.cat(preds, dim=0).cpu().numpy()

    # Расчёт Inception Score
    split_scores = []
    for k in range(splits):
        part = preds[k * (N // splits) : (k + 1) * (N // splits), :]
        py = np.mean(part, axis=0)
        scores = [np.sum(p * (np.log(p) - np.log(py))) for p in part]
        split_scores.append(np.exp(np.mean(scores)))

    return np.mean(split_scores), np.std(split_scores)

In [None]:
def calculate_fid(real_images, generated_images, batch_size=32):
    """
    Вычисление Frechet Inception Distance (FID).

    Args:
        real_images (torch.Tensor): Тензор реальных изображений (N, 3, H, W).
        generated_images (torch.Tensor): Тензор сгенерированных изображений (M, 3, H, W).
        batch_size (int): Размер батча для модели InceptionV3, которая и рассчитывает метрики.

    Returns:
        float: FID.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    inception_model = inception_v3(
        weights=models.Inception_V3_Weights.DEFAULT, transform_input=False
    ).to(device)
    inception_model.eval()

    def get_activations(images):
        """Извлечение фич из InceptionV3."""
        activations = []
        for i in range(0, len(images), batch_size):
            batch = images[i : i + batch_size].to(device)
            with torch.no_grad():
                features = inception_model(batch).detach()
                activations.append(features.cpu())
        return torch.cat(activations, dim=0).numpy()

    # Обработка images - убрал нормализацию - наш вход уже нормализован
    transform = transforms.Compose(
        [
            transforms.Resize(
                (299, 299)
            )  # Необходимо, так как InceptionV3 обучалась именно на таких размерах
        ]
    )
    real_images = torch.stack([transform(img) for img in real_images])
    generated_images = torch.stack([transform(img) for img in generated_images])

    # Извлечение активаций (как я понимаю, из слоя нейросети InceptionV3)
    act_real = get_activations(real_images)
    act_gen = get_activations(generated_images)

    # Расчёт статистик
    mu_real, sigma_real = act_real.mean(axis=0), np.cov(act_real, rowvar=False)
    mu_gen, sigma_gen = act_gen.mean(axis=0), np.cov(act_gen, rowvar=False)

    # Расчёт FID
    diff = mu_real - mu_gen
    covmean = sqrtm(sigma_real @ sigma_gen).real
    fid = diff.dot(diff) + np.trace(sigma_real + sigma_gen - 2 * covmean)

    return fid

In [None]:
# Установка CLIP и зависимостей
!pip install ftfy regex tqdm -q
!pip install git+https://github.com/openai/CLIP.git -q



[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip

[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import clip

model, preprocess = clip.load("ViT-B/32", device=device)


In [None]:
def evaluate_clip_scores(images: list[Image.Image], texts: list[str]) -> dict:
    assert len(images) == len(texts)

    image_inputs = torch.stack([preprocess(img) for img in images]).to(device)
    text_inputs = clip.tokenize(texts, truncate=True).to(device)

    with torch.no_grad():
        image_features = model.encode_image(image_inputs)
        text_features = model.encode_text(text_inputs)

    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)

    similarities = (image_features * text_features).sum(dim=-1)
    scores = similarities.tolist()

    return {
        "clip-i (mean)": sum(scores) / len(scores),
        "clip-i (all)": scores,
        "clip-t (mean)": sum(scores) / len(scores),
        "clip-t (all)": scores
    }


Цикл

In [None]:
# from tqdm.notebook import tqdm
# import pandas as pd
# import os
# from copy import deepcopy

# os.makedirs("output/samples_training", exist_ok=True)
# os.makedirs("output/checkpoints", exist_ok=True)

# metrics_data = pd.DataFrame()

# num_generator_steps = 3  # число шагов генератора на один шаг дискриминатора
# ema_generator = deepcopy(generator)  # EMA генератор
# ema_decay = 0.999  # коэффициент EMA

# # Тренировочный цикл
# for epoch in tqdm(range(num_epochs)):
#     for i, (real_images, text_metr, text_labels) in enumerate(dataloader):
#         real_images = real_images.to(device)
#         text_labels = text_labels.to(device)
#         batch_size = real_images.size(0)

#         # Label smoothing - сглашивание меток, чтобы они стали мерой уверенности
#         real_labels = torch.full((batch_size,), 0.9, device=device)
#         fake_labels = torch.zeros(batch_size, device=device)

#         text_embeddings = text_embedding(text_labels)
#         text_embeddings_detached = text_embeddings.detach()

#         # Обучение дискриминатора
#         discriminator.zero_grad()
#         noise = torch.randn(batch_size, 100 * 4 * 4, device=device)  # 🔧 заменили 100 → 100 * 4 * 4
#         fake_images = generator(noise, text_embeddings_detached)
#         output_real = discriminator(real_images, text_embeddings_detached)
#         output_fake = discriminator(fake_images.detach(), text_embeddings_detached)
#         loss_real = criterion(output_real, real_labels)
#         loss_fake = criterion(output_fake, fake_labels)
#         loss_discriminator = loss_real + loss_fake
#         loss_discriminator.backward()
#         optimizerD.step()

#         # Обучение генератора несколько раз
#         for _ in range(num_generator_steps):
#             generator.zero_grad()
#             noise = torch.randn(batch_size, 100 * 4 * 4, device=device)  # 🔧 тоже увеличено
#             fake_images = generator(noise, text_embeddings_detached)
#             output_generator = discriminator(fake_images, text_embeddings_detached)
#             loss_generator = criterion(output_generator, real_labels)
#             loss_generator.backward()
#             optimizerG.step()

#             # Обновление EMA весов - чтобы избежать взрыва градиентов
#             with torch.no_grad():
#                 for ema_param, param in zip(ema_generator.parameters(), generator.parameters()):
#                     ema_param.data = ema_decay * ema_param.data + (1.0 - ema_decay) * param.data

#         # Метрики
#         if i % 100 == 0:
#             model.eval()
#             with torch.no_grad():
#                 num_samples = min(32, fake_images.size(0), real_images.size(0), len(text_metr))
#                 fake_batch = fake_images[:num_samples].cpu()
#                 real_batch = real_images[:num_samples].cpu()

#                 # CLIP
#                 pil_images = [transforms.ToPILImage()(img) for img in fake_batch]
#                 texts = [label for label in text_metr[:num_samples]]
#                 scores = evaluate_clip_scores(pil_images, texts)
#                 print(f"[Epoch {epoch} | Step {i}] CLIP-I: {scores['clip-i (mean)']:.4f} | CLIP-T: {scores['clip-t (mean)']:.4f}")

#                 # IS / FID
#                 try:
#                     is_mean, is_std = inception_score(fake_batch)
#                     fid_score = calculate_fid(real_batch, fake_batch)
#                     print(f"[Epoch {epoch} | Step {i}] IS: {is_mean:.4f} ± {is_std:.4f} | FID: {fid_score:.4f}")
#                 except Exception as e:
#                     print(f"Error calculating IS/FID: {e}")
#                     is_mean, is_std, fid_score = float('nan'), float('nan'), float('nan')

#                 # Метрики → датафрейм + сохранение
#                 new_row = pd.DataFrame([{
#                     "epoch": epoch,
#                     "step": i,
#                     "g_loss": loss_generator.item(),
#                     "d_loss": loss_discriminator.item(),
#                     "clip_i": scores["clip-i (mean)"],
#                     "clip_t": scores["clip-t (mean)"],
#                     "inception_score_mean": is_mean,
#                     "inception_score_std": is_std,
#                     "fid_score": fid_score
#                 }])
#                 metrics_data = pd.concat([metrics_data, new_row], ignore_index=True)
#                 metrics_data.to_csv("output/metrics.csv", index=False)

#             model.train()

#             # Вывод прогресса
#             print(f"[Epoch {epoch}/{num_epochs}] [Batch {i}/{len(dataloader)}] "
#                   f"Loss D: {loss_discriminator.item():.4f}, Loss G: {loss_generator.item():.4f}")

#     # Сохранение сгенерированных образцов
#     with torch.no_grad():
#         fixed_text_embeddings = text_embedding(fixed_text).detach()
#         fixed_images = generator(fixed_noise, fixed_text_embeddings).detach().cpu()
#         grid = torchvision.utils.make_grid(fixed_images, nrow=8, normalize=True)
#         torchvision.utils.save_image(grid, f"output/samples_training/fake_epoch_{epoch}.png")

#     # Сохранение контрольных точек модели
#     torch.save({
#         'epoch': epoch,
#         'generator_state_dict': generator.state_dict(),
#         'discriminator_state_dict': discriminator.state_dict(),
#         'optimizerG_state_dict': optimizerG.state_dict(),
#         'optimizerD_state_dict': optimizerD.state_dict()
#     }, f"output/checkpoints/model_epoch_{epoch}.pth")

# print("Training complete.")


## С wandb

In [None]:
!pip install wandb -q
import wandb
import os
import uuid


[notice] A new release of pip is available: 25.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
key = ""
wandb.login(key=key) # Авторизация по токену

# Пытаемся загрузить или создать run id, чтобы перезапускать его же
if os.path.exists("wandb_run_id.txt"):
    with open("wandb_run_id.txt", "r") as f:
        run_id = f.read().strip()
else:
    run_id = str(uuid.uuid4())
    with open("wandb_run_id.txt", "w") as f:
        f.write(run_id)

# Создаёт сессию логирования, указывает название эксперимента и конфиг
wandb.init(
    project="hse_first_project_2025",
    name="cGAN",
    id=run_id,
    resume="allow",
    config={
        "epochs": 20,
        "batch_size": 64,
        "learning_rate": 0.0002,
    }
)

In [None]:
def compute_gradient_penalty(D, real_samples, fake_samples, text_embeddings):
    alpha = torch.rand(real_samples.size(0), 1, 1, 1, device=real_samples.device)
    interpolates = (alpha * real_samples + (1 - alpha) * fake_samples).requires_grad_(True)

    d_interpolates = D(interpolates, text_embeddings)

    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        create_graph=True,
        retain_graph=True,
        only_inputs=True
    )[0]

    gradients = gradients.view(gradients.size(0), -1)
    gp = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gp

# CLIP similarity
def clip_similarity_loss(images: torch.Tensor, texts: list[str], model, preprocess, device):
    model.eval()
    image_inputs = torch.stack([preprocess(transforms.ToPILImage()(img)) for img in images]).to(device)
    text_inputs = clip.tokenize(texts, truncate=True).to(device)

    with torch.no_grad():
        image_features = model.encode_image(image_inputs)
        text_features = model.encode_text(text_inputs)

    image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)

    similarity = (image_features * text_features).sum(dim=-1)  # cosine similarity
    return 1 - similarity.mean()  # чем ниже, тем ближе к смыслу

In [None]:
from tqdm.notebook import tqdm
import pandas as pd
import os
from copy import deepcopy

os.makedirs("output/samples_training", exist_ok=True)
os.makedirs("output/checkpoints", exist_ok=True)

metrics_data = pd.DataFrame()

global_step = 0  # для wandb логирования
num_generator_steps = 3  # число шагов генератора на один шаг дискриминатора
ema_generator = deepcopy(generator) # EMA генератор
for param in ema_generator.parameters():
    param.requires_grad = False
ema_generator.eval()
ema_decay = 0.999  # коэффициент EMA

# Тренировочный цикл
for epoch in tqdm(range(num_epochs)):
    for i, (real_images, text_metr, text_labels) in enumerate(dataloader):
        real_images = real_images.to(device)
        text_labels = text_labels.to(device)
        batch_size = real_images.size(0)

        # Label smoothing - сглаживание меток, чтобы они стали мерой уверенности
        real_labels = torch.full((batch_size,), 0.9, device=device)
        fake_labels = torch.rand(batch_size, device=device) * 0.1  # смягчённые фейковые метки

        text_embeddings = text_embedding(text_labels)
        text_embeddings_detached = text_embeddings.detach()

        # Обучение дискриминатора (WGAN)
        discriminator.zero_grad()
        noise = torch.randn(batch_size, 100 * 4 * 4, device=device)
        fake_images = generator(noise, text_embeddings_detached)
        output_real = discriminator(real_images, text_embeddings_detached)
        output_fake = discriminator(fake_images.detach(), text_embeddings_detached)

        # Считаем градиентный штраф
        gradient_penalty = compute_gradient_penalty(discriminator, real_images.data, fake_images.data, text_embeddings_detached)

        # Общий лосс WGAN-GP
        lambda_gp = 10
        loss_discriminator = -(output_real.mean() - output_fake.mean()) + lambda_gp * gradient_penalty

        # Проверка на NaN/Inf
        if torch.isnan(loss_discriminator) or torch.isinf(loss_discriminator):
            print("NaN или Inf в дискриминаторе — пропускаем итерацию")
            continue

        loss_discriminator.backward()
        optimizerD.step()


        # Обучение генератора несколько раз
        total_loss_generator = 0
        for _ in range(num_generator_steps):
            generator.zero_grad()
            noise = torch.randn(batch_size, 100 * 4 * 4, device=device)
            fake_images = generator(noise, text_embeddings_detached)
            output_generator = discriminator(fake_images, text_embeddings_detached)

            # CLIP-guided loss
            texts = [text_metr[j] for j in range(batch_size)]  # берём оригинальные текстовые строки
            clip_loss = clip_similarity_loss(fake_images.detach().cpu(), texts, model, preprocess, device)
            lambda_clip = 2.0  # коэффициент влияния CLIP

            # лосс для WGAN для генератора + смысловая привязка
            loss_generator = -output_generator.mean() + lambda_clip * clip_loss

            # Проверка на NaN/Inf
            if torch.isnan(loss_generator) or torch.isinf(loss_generator):
                print("NaN или Inf в генераторе — пропускаем итерацию")
                continue

            total_loss_generator += loss_generator.item()
            loss_generator.backward()
            optimizerG.step()

            # Обновление EMA весов - чтобы избежать взрыва градиентов
            with torch.no_grad():
                for ema_param, param in zip(ema_generator.parameters(), generator.parameters()):
                    ema_param.data = ema_decay * ema_param.data + (1.0 - ema_decay) * param.data

        avg_loss_generator = total_loss_generator / num_generator_steps

        # Метрики
        if i % 100 == 0:
            generator.eval()
            discriminator.eval()
            with torch.no_grad():
                num_samples = min(32, fake_images.size(0), real_images.size(0), len(text_metr))
                fake_batch = fake_images[:num_samples].cpu()
                real_batch = real_images[:num_samples].cpu()

                # CLIP
                pil_images = [transforms.ToPILImage()(img) for img in fake_batch]
                texts = [label for label in text_metr[:num_samples]]
                scores = evaluate_clip_scores(pil_images, texts)
                print(f"[Epoch {epoch} | Step {i}] CLIP-I: {scores['clip-i (mean)']:.4f} | CLIP-T: {scores['clip-t (mean)']:.4f}")

                # IS / FID
                try:
                    is_mean, is_std = inception_score(fake_batch)
                    fid_score = calculate_fid(real_batch, fake_batch)
                    print(f"[Epoch {epoch} | Step {i}] IS: {is_mean:.4f} ± {is_std:.4f} | FID: {fid_score:.4f}")
                except Exception as e:
                    print(f"Error calculating IS/FID: {e}")
                    is_mean, is_std, fid_score = float('nan'), float('nan'), float('nan')

                # Метрики → датафрейм + сохранение
                new_row = pd.DataFrame([{
                    "epoch": epoch,
                    "step": i,
                    "g_loss": avg_loss_generator,
                    "d_loss": loss_discriminator.item(),
                    "clip_i": scores["clip-i (mean)"],
                    "clip_t": scores["clip-t (mean)"],
                    "inception_score_mean": is_mean,
                    "inception_score_std": is_std,
                    "fid_score": fid_score
                }])
                metrics_data = pd.concat([metrics_data, new_row], ignore_index=True)
                metrics_data.to_csv("output/metrics.csv", index=False)

                wandb.log({
                    "epoch": epoch,
                    "g_loss": avg_loss_generator,
                    "d_loss": loss_discriminator.item(),
                    "clip_i": scores["clip-i (mean)"],
                    "clip_t": scores["clip-t (mean)"],
                    "inception_score_mean": is_mean,
                    "inception_score_std": is_std,
                    "fid_score": fid_score
                }, step=global_step)
                global_step += 1

            generator.train()
            discriminator.train()

            # Вывод прогресса
            print(f"[Epoch {epoch}/{num_epochs}] [Batch {i}/{len(dataloader)}] "
                  f"Loss D: {loss_discriminator.item():.4f}, Loss G: {avg_loss_generator:.4f}")

    # Сохранение сгенерированных образцов
    with torch.no_grad():
        fixed_text_embeddings = text_embedding(fixed_text).detach()
        fixed_images = generator(fixed_noise, fixed_text_embeddings).detach().cpu()
        grid = torchvision.utils.make_grid(fixed_images, nrow=8, normalize=True)
        torchvision.utils.save_image(grid, f"output/samples_training/fake_epoch_{epoch}.png")

    # Сохранение контрольных точек модели
    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'optimizerG_state_dict': optimizerG.state_dict(),
        'optimizerD_state_dict': optimizerD.state_dict()
    }, f"output/checkpoints/model_epoch_{epoch}.pth")

    schedulerG.step()
    schedulerD.step()

print("Training complete.")

wandb.finish()


  0%|          | 0/5 [00:00<?, ?it/s]

[Epoch 0 | Step 0] CLIP-I: 0.1323 | CLIP-T: 0.1323
[Epoch 0 | Step 0] IS: 1.2019 ± 0.0860 | FID: 1542.3689
[Epoch 0/5] [Batch 0/6208] Loss D: 403.8373, Loss G: 0.2753
[Epoch 0 | Step 100] CLIP-I: 0.1559 | CLIP-T: 0.1559
[Epoch 0 | Step 100] IS: 1.2320 ± 0.0625 | FID: 1485.5283
[Epoch 0/5] [Batch 100/6208] Loss D: -11.7725, Loss G: 3.8261
[Epoch 0 | Step 200] CLIP-I: 0.1745 | CLIP-T: 0.1745
[Epoch 0 | Step 200] IS: 1.7206 ± 0.2833 | FID: 1391.2139
[Epoch 0/5] [Batch 200/6208] Loss D: 1.7089, Loss G: -1.5034
[Epoch 0 | Step 300] CLIP-I: 0.1509 | CLIP-T: 0.1509
[Epoch 0 | Step 300] IS: 1.6080 ± 0.2098 | FID: 1289.6542
[Epoch 0/5] [Batch 300/6208] Loss D: -0.6364, Loss G: 2.8035
[Epoch 0 | Step 400] CLIP-I: 0.1529 | CLIP-T: 0.1529
[Epoch 0 | Step 400] IS: 1.5886 ± 0.2008 | FID: 1476.6733
[Epoch 0/5] [Batch 400/6208] Loss D: -0.1490, Loss G: 5.8667
[Epoch 0 | Step 500] CLIP-I: 0.1712 | CLIP-T: 0.1712
[Epoch 0 | Step 500] IS: 1.4489 ± 0.1518 | FID: 1427.5407
[Epoch 0/5] [Batch 500/6208] Loss

KeyboardInterrupt: 

In [None]:
wandb.finish()

0,1
clip_i,▁▄▆▃▆▄▇▅▅▅▆▇▇▇▇▆▇▆▅▆▅▆▆▆▆▆▆▇▄▆▆▇▇▆▆▆▅▇▅█
clip_t,▅▁▂▅▇█▄▄▄▄▇▁▅▄▅▇▆▆▆▆▅▆▅▅▄▅▆▅▆▇▆▅▇▆▄▆▄▆▄█
d_loss,█▃▃▃▃▃▃▃▃▃▃▃▃▃▃▂▃▂▂▃▃▃▃▃▂▂▃▃▂▂▂▃▃▂▃▁▂▃▂▂
epoch,▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁██
fid_score,█▇▆▅▇▄▄▆▇█▅▄▇▃▄▄▅▄▆▅▄▅▆▄▅▄▅▅▅▃▂▄▄▆▃▄▃▃▇▁
g_loss,▁▁▁▁▁▁▁▁▁▁▁▂▂▂▂▂▂▃▃▄▄▄▄▄▄▄▄▆▅▅▅▆▆▆▆▇▇███
inception_score_mean,▁▄▄▃▄▆▄▅▃▆▆▇▆▅▅▅▅▆▆▅▆▅▅▅▇▆█▇▇▅▆▆▆▅▅▄▆▅▆▆
inception_score_std,▁▁▅▄▃▃▄▅▄▅▂▃▂▆▅▄▅▅▆▄▅▆▄▄▅▄▇▅▂█▃▃▆▃▄▆▃▅▅▅

0,1
clip_i,0.19047
clip_t,0.19047
d_loss,-28.29402
epoch,1.0
fid_score,972.72233
g_loss,149.56443
inception_score_mean,1.79096
inception_score_std,0.2524
