# Обучение GAN на MNIST

В этом задании вам предстоит [обучить генератор и дискриминатор](https://arxiv.org/abs/1406.2661) на датасете MNIST.

## Настройка google colab

Для корректной работы ноутбука и отправки заданий в тестирующую систему запустите следующие две ячейки (до начала работы).

In [None]:
%%bash

rm colab_setup.py

wget -q https://raw.githubusercontent.com/hse-cs-ami/coursera-advanced-dl/main/utils/colab_setup.py -O colab_setup.py

In [None]:
import colab_setup

colab_setup.Week02GAN().setup()

In [None]:
from testing import TestWeek02


tester = TestWeek02()

tester.set_email('### YOUR EMAIL ###')
tester.set_token('### YOUR TOKEN ###')

## Необходимые импорты

In [None]:
from collections import defaultdict
from time import perf_counter
from warnings import filterwarnings
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torchvision.transforms as t
from IPython.display import clear_output
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from tqdm import tqdm


filterwarnings('ignore')

sns.set(style='darkgrid')

# Необходимые константы

В этом задании вы будете работать с изображениями из набора данных MNIST, они имеют размер 28 на 28 пикселей.

В качестве гиперпараметра генератора мы будем использовать число 128 - размерность вектора шума, который генератор будет получать на вход.

In [None]:
IMAGE_SIZE = 28
NOISE_DIMENSION = 128

## Генератор

Заполните пропуски в коде, чтобы получился рабочий класс модели Генератора.

Ваш Генератор должен иметь следующую архитектуру:

 - Линейный слой (входной) с размерностью входа равной размерности шума, выхода `N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `N`, выхода `2N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `2N`, выхода `4N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `4N`, выхода `размер изображения` x `размер изображения`
 - Функция активации `Tanh`
 
`N` является гиперпараметром архитектуры и задается в конструкторе аргументом `baze_d`.

Обратите внимание, что на выходе Генератор должен возвращать изображение (то есть тензор размерности `(BC, 1, 28, 28)`, где `BC` это размер батча).

In [None]:
class Generator(nn.Module):
    def __init__(self, baze_d: int = 256):
        super().__init__()
        
        self.layers = ### Ваш код

    def forward(self, x):
        return self.layers(x).view(-1, 1, IMAGE_SIZE, IMAGE_SIZE)

In [None]:
# тестируем то, как вы написали класс генератора
tester.set_email('### YOUR EMAIL ###')
tester.set_token('### YOUR TOKEN ###')

tester.test01(Generator)

## Дискриминатор

Заполните пропуски в коде, чтобы получился рабочий класс модели Дискриминатора.

Ваш Дискриминатор должен иметь следующую архитектуру:

 - Линейный слой (входной) с размерностью входа `размер изображения` x `размер изображения`, выхода `4N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `4N`, выхода `2N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `2N`, выхода `N`
 - Функция активации `LeakyReLU`
 
 - Линейный слой с размерностью входа `N`, выхода `1`
 - Функция активации `Sigmoid`
 
`N` является гиперпараметром архитектуры и задается в конструкторе аргументом `baze_d`.

Обратите внимание, что на входе Дискриминатор получает изображение (то есть тензор размерности `(BC, 1, 28, 28)`, где `BC` это размер батча), а линейный слой ожидает увидеть вектор (то есть тензор размерности `(BC, 28 x 28)`).

In [None]:
class Discriminator(nn.Module):
    def __init__(self, base_d: int = 256):
        super().__init__()

        self.layers = ### Ваш код

    def forward(self, x):
        return self.layers(x.view(-1, 784))

In [None]:
# тестируем то, как вы написали класс дискриминатора
tester.set_email('### YOUR EMAIL ###')
tester.set_token('### YOUR TOKEN ###')

tester.test02(Discriminator)

## Оболочка

Заполните пропуски в коде, чтобы получился рабочий класс оболочки.

Вам нужно заполнить пропуски в функциях `update_generator` и `update_discriminator`.

Каждая из этих функций должна считать ошибку для соответствующей модели и запускать проход назад.

В функции `update_generator` вам нужно посчитать ошибку для Генератора: Генератор должен обмануть Дискриминатор, то есть

$$Loss\left(D\left(G\left(Noise\right)\right), 1\right).$$

В функции `update_discriminator` вам нужно посчитать ошибку для Дискриминатора: Дискриминатор должен распознать выход Генератора и реальные данные:

$$Loss\left(D\left(G\left(Noise\right)\right), 0\right) + Loss\left(D\left(Images\right), 1\right).$$

In [None]:
def make_noise(bs, device, n_features=NOISE_DIMENSION):
    return torch.randn(bs, n_features).to(device)

def make_ones(bs, device):
    return torch.ones(bs, 1).to(device)

def make_zeros(bs, device):
    return torch.zeros(bs, 1).to(device)

In [None]:
class Wrapper:

    def __init__(self) -> None:

        transform: t.Compose = t.Compose(
            [
                t.ToTensor(),
                t.Normalize((0.5), (0.5))
            ]
        )

        dataset = MNIST(root='./data', train=True, download=True, transform=transform)

        batch_size = 64

        self.loader: DataLoader = DataLoader(dataset, shuffle=True, batch_size=batch_size, num_workers=2)

        self.device: torch.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

        self.generator = Generator().to(self.device)
        self.discriminator = Discriminator().to(self.device)

        self.optimizer_generator = Adam(self.generator.parameters(), lr=1e-4)
        self.optimizer_discriminator = Adam(self.discriminator.parameters(), lr=1e-4)
        
        self.loss = nn.BCELoss()

        self.history = defaultdict(list)
        
        self.test_noise = make_noise(100, self.device)

    def run(self, epochs: int = 100):
        total_time = 0

        for epoch in range(epochs):
            # генерируем картинку из шума
            self.image()
            
            # строим графики
            self.plot_stats(epochs)

            # обучаемся
            start = perf_counter()
            self.train_epoch()
            total_time += perf_counter() - start

        # строим финальные графики и печатаем сколько заняло обучение
        self.plot_stats(epochs)
        print(f'Время на обучение: {total_time:.2f} секунд')

    def train_epoch(self) -> None:
        self.generator.train()
        self.discriminator.train()
        
        g_loss = 0
        d_loss = 0

        for images, _ in tqdm(self.loader, desc='Обучение'):
            images = images.to(self.device)
            
            self.set_grad('discriminator', True)
            self.set_grad('generator', False)
            
            self.optimizer_discriminator.zero_grad()
            d_loss += self.update_discriminator(images)
            self.optimizer_discriminator.step()
            
            self.set_grad('generator', True)
            self.set_grad('discriminator', False)
            
            self.optimizer_generator.zero_grad()
            g_loss += self.update_generator(images.size(0))
            self.optimizer_generator.step()
            
        self.history['generator_loss'].append(g_loss / len(self.loader))
        self.history['discriminator_loss'].append(d_loss / len(self.loader))
    
    def update_generator(self, bs):
        loss = ### Ваш код
        
        loss.backward()
        
        return loss.item()
        
    def update_discriminator(self, real_data):
        fake_data = ### Ваш код (используйте .detach())

        real_loss = ### Ваш код
        fake_loss = ### Ваш код
        
        loss = real_loss + fake_loss
        
        loss.backward()

        return loss.item()
    
    def set_grad(self, name: str, requires_grad: bool) -> None:
        if name == 'discriminator':
            for param in self.discriminator.parameters():
                param.requires_grad = requires_grad
        elif name == 'generator':
            for param in self.generator.parameters():
                param.requires_grad = requires_grad
        else:
            raise ValueError(f'Incorrect name {name}')

    @torch.inference_mode()    
    def image(self):
        self.history['images'].append(
            (make_grid(
                self.generator(self.test_noise).cpu(),
                nrow=10,
                pad_value=255
            ).numpy().transpose(1,2,0) * 255).astype('uint8')
        )
    
    def plot_stats(self, epochs) -> None:
        clear_output(wait=True)

        plt.figure(figsize=(10, 10))
        
        plt.title('Ошибка моделей в зависимости от номера эпохи')
        plt.plot(
            range(1, len(self.history['generator_loss']) + 1),
            self.history['generator_loss'],
            label='Генератор', marker='^'
        )
        plt.plot(
            range(1, len(self.history['discriminator_loss']) + 1),
            self.history['discriminator_loss'],
            label='Дискриминатор', marker='^'
        )

        plt.xlim([0.5, epochs + 0.5])

        plt.xlabel('Эпоха')
        plt.ylabel('Ошибка')

        plt.legend()

        plt.show()
        
        plt.figure(figsize=(10, 10))
        
        plt.title('Примеры генерации из тестового шума')
        plt.imshow(self.history['images'][-1])
        
        plt.grid(False)
        plt.axis('off')

        plt.show()

In [None]:
wrapper = Wrapper()

# Запуск обучения двух моделей

Признак качественно обученных моделей - ошибка обеих моделей выходит на плато + изображения, сгенерированные из тестового шума, становятся всё более похожие на реальные цифры.

Подобные результаты можно ждать после 70-100 эпох.

In [None]:
wrapper.run()

In [None]:
# тестируем то, как вы написали оболочку и как хорошо вам удалось обучить ваши модели
tester.set_email('### YOUR EMAIL ###')
tester.set_token('### YOUR TOKEN ###')

tester.test03(wrapper)