# This cat does not exist
__Суммарное количество баллов: 10__

__Решение отправлять на `ml.course.practice@gmail.com`__

__Тема письма: `[ML][MS][HW06] <ФИО>`, где вместо `<ФИО>` указаны фамилия и имя__

Цель этого задания - создать котов, которых не существует. В ходе данного задания вы обучите DCGAN и VAE, которые являются одними из первых генеративных моделей. Для этого задания вам наверняка потребуется GPU с CUDA, поэтому рекомендуется использовать Google Colab.

In [None]:
import torch
from torch import nn
from torch.optim import Adam
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os 
import cv2
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import random
import numpy as np

%matplotlib inline

In [None]:
def random_noise(batch_size, channels, side_size):
    return torch.randn(batch_size, channels, side_size, side_size).cuda()

def imagewide_average(x):
    return x.mean(dim=(-1, -2))

In [None]:
def visualise(imgs, rows=2, pic_save_path=None):
    imgs = (imgs.transpose(1, 3) + 1) / 2
    imgs = torch.cat([imgs[i::rows] for i in range(rows)], dim=1)
    cols = len(imgs)
    imgs = (torch.cat(list(imgs), dim=1)).cpu().numpy()[:, :, ::-1]
    plt.figure(figsize=(cols * 1.5, rows * 1.5))
    plt.imshow(imgs)
    plt.axis('off')
    plt.tight_layout()
    if pic_save_path is not None:
        plt.savefig(pic_save_path)
    plt.show()

In [None]:
class MyDataset(Dataset):
    def __init__(self, path_to_dataset="cat_136", h_size=64, w_size=64, to_flip=False, **kwargs):
        self.photo_names = os.listdir(path_to_dataset)
        self.path_base = path_to_dataset
        self.h_size = h_size
        self.w_size = w_size
        self.to_flip = to_flip
    
    def __getitem__(self, index):
        path = self.path_base + "/" + self.photo_names[index]
        img = cv2.imread(path) # 136 x 136
        crop_rate = 8
        x_crop = random.randint(0, crop_rate)
        y_crop = random.randint(0, crop_rate)
        img = img[x_crop:img.shape[0] - crop_rate + x_crop, y_crop:img.shape[1] - crop_rate + y_crop]

        if self.to_flip:
            p = np.random.uniform()
            img = img[:, ::-1] if p >= 0.5 else img

        img = cv2.resize(img, (self.h_size, self.w_size), interpolation=cv2.INTER_CUBIC)
        return 2 * torch.tensor(img).float().transpose(0, 2) / 255. - 1
    
    def __len__(self):
        return len(self.photo_names)

In [None]:
dataset = MyDataset()
visualise(torch.cat([dataset[i].unsqueeze(0) for i in [3, 15, 182, 592, 394, 2941]], dim=0))

### Задание 1 (2 балла)
Для начала реализуем генератор для нашего DCGAN. Предлагается использовать следующую архитектуру:

![](imgs/DCGAN.png)

Для ее реализации вам потребуются модули `nn.BatchNorm2d`, `nn.Conv2d`, `nn.ConvTranspose2D`, `nn.ReLU`, а также функция `F.interpolate`.

#### Методы
* `__init__` - принимает на вход `start_size`, `latent_channels`, `start_channels` и `upsamplings`. Первые два аргумента отвечают за размер случайного шума, из которого в последствии будет сгенерирована картинка. `start_channels` отвечает за то, сколько каналов должно быть в картинке перед тем, как к ней будут применены upsampling блоки. `upsamplings` - это количество upsampling блоков, которые должны быть применены к картинке. В каждом таком локе количество каналов уменьшается в два раза.


* `forward` - принимает на вход `batch_size`, генерирует `batch_size` картинок из случайного шума.

In [None]:
class Generator(nn.Module):
    def __init__(self, start_size=2, latent_channels=32, start_channels=1024, upsamplings=6, **kwargs):
        super().__init__()
        self.start_size = start_size
        self.latent_channels = latent_channels
        self.start_channels = start_channels
        self.upsamplings = upsamplings

        self.prep = nn.Conv2d(self.latent_channels, self.start_channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.up = nn.Sequential(
            *[self._up(self.start_channels // 2 ** i) for i in range(self.upsamplings)]
        )
        self.finale = self._finale(self.start_channels // 2 ** self.upsamplings)
    
    def _up(self, c_in):
        c_out = c_in // 2
        return nn.Sequential(
            nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(c_out, affine=False),
            nn.ReLU()
            )
    
    def _finale(self, c_in):
        return nn.Sequential(
            nn.Conv2d(c_in, 3, kernel_size=1, stride=1, padding=0, bias=False),
            nn.Tanh()
            )

    def forward(self, batch_size: int):
        out = torch.randn((batch_size, self.latent_channels, self.start_size, self.start_size)).cuda()

        out = self.prep(out)
        out = self.up(out)
        out = self.finale(out)
        return out

### Задание 2 (2 балла)
Для начала реализуем дискриминатор для нашего DCGAN. Предлагается использовать следующую архитектуру:

![](imgs/Disc_DCGAN.png)

Для ее реализации вам потребуются модули `nn.BatchNorm2d`, `nn.Conv2d`, `nn.ReLU` и `nn.Sigmoid`.

#### Методы
* `__init__` - принимает на вход `start_channels` и `downsamplings`. `start_channels` определяет количество каналов, которые должны быть в изображении перед применением downsampling блоков.


* `forward` - принимает на вход `x` - тензор с картинками. Возвращает вектор с размерностью `batch_size`.

In [None]:
class Discriminator(nn.Module):
    def __init__(self, w_size, h_size, downsamplings=6, start_channels=8, **kwargs):
        super().__init__()
        self.downsamplings = downsamplings
        self.start_channels = start_channels
        self.prep = nn.Conv2d(3, start_channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.down = nn.Sequential(
            *[self._down(self.start_channels * 2 ** i) for i in range(self.downsamplings)]
        )
        self.finale = self._finale(self.start_channels * 2 ** self.downsamplings, w_size, h_size)

    def _down(self, c_in):
        c_out = c_in * 2
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(c_out, affine=False),
            # nn.Dropout(p=0.25),
            nn.ReLU()
            )
    
    def _finale(self, c_in,w_size, h_size):
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(c_in * w_size * h_size // 2 ** (2 * self.downsamplings), 1),
            nn.Sigmoid()
        )
        
    def forward(self, X):
        out = self.prep(X)
        out = self.down(out)
        return self.finale(out)

In [None]:
def train_gan(config):
    name = config["name"]
    if not os.path.exists(f"models/gan/{name}"):
        os.mkdir(f"models/gan/{name}")
        os.mkdir(f"models/gan/{name}/generators")
        os.mkdir(f"models/gan/{name}/discriminators")
    gen_dir = f"models/gan/{name}/generators"
    disc_dir = f"models/gan/{name}/discriminators"

    epochs = config["epochs"]
    batch_size = config["batch_size"]

    generator = Generator(**config).cuda()
    discriminator = Discriminator(**config).cuda()
    visualise_every = 10

    lr = config["lr"]
    gen_optim = Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    disc_optim = Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

    dataset = MyDataset(**config)

    t = tqdm(range(epochs))
    for ep in t:
        dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
        total_batches = 0
        gen_loss_avg = 0
        disc_loss_avg = 0

        for i, batch in enumerate(dataloader):
            if len(batch) < batch_size:
                continue
            total_batches += 1
            # Positive update
            batch = batch.cuda()
            pred = discriminator(batch)
            loss = F.binary_cross_entropy(pred, torch.ones_like(pred))
            disc_optim.zero_grad()
            loss.backward()
            disc_optim.step()
            disc_loss_avg += loss.item()

            # Negative update
            batch = generator(batch_size).detach()
            pred = discriminator(batch)
            loss = F.binary_cross_entropy(pred, torch.zeros_like(pred))
            disc_optim.zero_grad()
            loss.backward()
            disc_optim.step()
            disc_loss_avg += loss.item()

            # Generator update
            batch = generator(batch_size)
            pred = discriminator(batch)
            loss = F.binary_cross_entropy(pred, torch.ones_like(pred))
            gen_optim.zero_grad()
            loss.backward()
            gen_optim.step()
            gen_loss_avg += loss.item()
        
        if (ep + 1) % visualise_every == 0:
            print(f"Epoch {ep + 1} | Discriminator loss: {disc_loss_avg / total_batches} | Generator loss: {gen_loss_avg / total_batches}")
            if config["pic_save_path"]:
                pic_path = config["pic_save_path"] + "/" + config["name"] + f"/{ep + 1}.png"
            with torch.no_grad():
                visualise(generator(4), rows=2, pic_save_path=pic_path)
            torch.save(generator, f"{gen_dir}/{ep + 1}")
            torch.save(discriminator, f"{disc_dir}/{ep + 1}")

        t.set_description(f"Dloss = {disc_loss_avg / total_batches:.4f} | Gloss = {gen_loss_avg / total_batches:.4f}")
    return generator

In [None]:
cat = {
    "name": "cat_128",
    "pic_save_path": "pics/gan",
    "path_to_dataset": "cat_136",
    "h_size": 128,
    "w_size": 128,
    "epochs": 300,
    "batch_size": 8,
    "lr": 1e-4,
    "to_flip": True,
}
gen = train_gan(cat)
with torch.no_grad():
    visualise(gen(100), rows=10)

In [None]:
# goose = {
#     "name": "goose_128",
#     "pic_save_path": "pics/gan",
#     "path_to_dataset": "goose",
#     "h_size": 128,
#     "w_size": 128,
#     "epochs": 1000,
#     "batch_size": 8,
#     "lr": 1e-4,
#     "to_flip": True,
# }
# gen = train_gan(goose)
# with torch.no_grad():
#     visualise(gen(100), rows=10)

In [None]:
for epoch in [10, 50, 100, 150, 200, 250, 260]:
    print(f"Epoch: {epoch}")
    gen = torch.load(f"models/gan/cat_128/generators/{epoch}")
    gen.eval()
    with torch.no_grad():
        visualise(gen(27), rows=3)

In [None]:
for epoch in [10, 100, 200, 400, 600, 700]:
    print(f"Epoch: {epoch}")
    gen = torch.load(f"models/gan/goose_128/generators/{epoch}")
    gen.eval()
    with torch.no_grad():
        visualise(gen(27), rows=3)

In [None]:
gen = None

### Задание 3 (5 баллов)
Теперь посмотрим на другую модель: Variational Autoencoder. В отличии от GAN, в котором генератор пытается себя обмануть дискриминатор, а дискриминатор старается не быть обманутым, VAE решает задачу реконструкции элемента множества X с применением регуляризации в латентном пространстве. 

Полностью архитектура выглядит так:
![](imgs/VAE.png)

Из нее можно выделить две части: Encoder (по изображению возвращает mu и sigma) и Decoder (по случайному шуму восстанавливает изображение). На высоком уровне VAE можно представить так:

![](imgs/VAE_highlevel.png)

В данном задании вам необходимо реализовать полную архитектуру VAE.

#### Методы
* `__init__` - принимает на вход `img_size`, `downsamplings`, `latent_size`, `linear_hidden_size`, `down_channels` и `up_channels`. `img_size` - размер стороны входного изображения. `downsamplings` - количество downsampling (и upsampling) блоков. `latent_size` - размер латентного пространства, в котором в который будет закодирована картинка. `linear_hidden_size` количество нейронов на скрытом слое полносвязной сети в конце encoder'а. Для полносвязной сети decoder'а это число стоит умножить на 2. `down_channels` - количество каналов, в которое будет преобразовано трехцветное изображение перед применением `downsampling` блоков. `up_channels` - количество каналов, которое должно получиться после применения всех upsampling блоков.

* `forward` - принимает на вход `x`. Считает распределение $N(\mu, \sigma^2)$ и вектор $z \sim N(\mu, \sigma^2)$. Возвращает $x'$ - восстановленную из вектора $z$ картинку и $D_{KL}(N(\mu, \sigma^2), N(0, 1)) = 0.5 \cdot (\sigma^2 + \mu^2 - \log \sigma^2 - 1)$.

* `encode` - принимает на вход `x`. Возвращает вектор из распределения $N(\mu, \sigma^2)$.

* `decode` - принимает на вход `z`. Возвращает восстановленную по вектору картинку.


#### Если хочется улучшить качество
https://arxiv.org/pdf/1906.00446.pdf

In [None]:
class VAE(nn.Module):
    def __init__(self, img_size=128, downsamplings=5, latent_size=512, down_channels=8, up_channels=16):
        super().__init__()
        self.latent_size = latent_size
        self.encoder = nn.Sequential(
            nn.Conv2d(3, down_channels, kernel_size=1, stride=1, padding=0),
            *[self._down(down_channels * 2 ** i) for i in range(downsamplings)],
            nn.Conv2d(down_channels * 2 ** downsamplings, latent_size * 2, kernel_size=1, stride=1, padding=0)
        )

        dec_chn = up_channels * 2 ** downsamplings
        self.decoder = nn.Sequential(
            nn.Conv2d(latent_size, dec_chn, kernel_size=1, stride=1, padding=0),
            *[self._up(dec_chn // 2 ** i) for i in range(downsamplings)],
            nn.Conv2d(up_channels, 3, kernel_size=1, stride=1, padding=0),
            nn.Tanh()
        )

    def _down(self, c_in):
        c_out = c_in * 2
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(c_out),
            # nn.Dropout(),
            nn.ReLU()
            )

    def _up(self, c_in):
        c_out = c_in // 2
        return nn.Sequential(
            nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(c_out),
            nn.ReLU()
            )

    def forward(self, x):
        mu, sigma = torch.split(self.encoder(x), self.latent_size, dim=1)
        sigma = torch.exp(sigma)
        z = mu + sigma * torch.randn_like(sigma)
        kld = 0.5 * (sigma ** 2 + mu ** 2 - 2 * torch.log(sigma) - 1)
        return self.decoder(z), kld
    
    def encode(self, x):
        mu, sigma = torch.split(self.encoder(x), self.latent_size, dim=1)
        z = mu + torch.exp(sigma) * torch.randn_like(sigma)
        return z
    
    def decode(self, z):
        return self.decoder(z)

In [None]:
def train_vae(config):
    name = config["name"]
    if not os.path.exists(f"models/vae/{name}"):
        os.mkdir(f"models/vae/{name}")
    model_dir = f"models/vae/{name}"

    vae = VAE(downsamplings=7)
    vae.cuda()

    epochs = config["epochs"]
    batch_size = config["batch_size"]
    vae_optim = Adam(vae.parameters(), lr=config["lr"])

    dataset = MyDataset(**config)

    test_imgs_1 = torch.cat([dataset[i].unsqueeze(0) for i in (0, 34, 76, 1509)])
    test_imgs_2 = torch.cat([dataset[i].unsqueeze(0) for i in (734, 123, 512, 3634)])

    t = tqdm(range(epochs))

    for ep in t:
        dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
        total_batches = 0
        rec_loss_avg = 0
        kld_loss_avg = 0

        for i, batch in enumerate(dataloader):
            if len(batch) < batch_size:
                continue
            total_batches += 1
            x = batch.cuda()
            x_rec, kld = vae(x)
            img_elems = float(np.prod(list(batch.size())))
            kld_loss = kld.sum() / batch_size
            rec_loss = ((x_rec - x)**2).sum() / batch_size
            loss = rec_loss + 0.1 * kld_loss # https://openreview.net/forum?id=Sy2fzU9gl
            vae_optim.zero_grad()
            loss.backward()
            vae_optim.step()
            kld_loss_avg += kld_loss.item()
            rec_loss_avg += rec_loss.item()
        
        if (ep + 1) % 10 == 0:
            print(f"Epoch {ep+1} | Reconstruction loss: {rec_loss_avg / total_batches} | KLD loss: {kld_loss_avg / total_batches}")
            if config["pic_save_path"]:
                pic_path = config["pic_save_path"] + "/" + config["name"] + f"/{ep + 1}.png"
            torch.save(vae, f"{model_dir}/{ep + 1}")
            with torch.no_grad():
                z_1 = vae.encode(test_imgs_1.cuda())
                z_2 = vae.encode(test_imgs_2.cuda())
                x_int = []
                for i in range(9):
                    z = (i * z_1 + (8 - i) * z_2) / 8
                    x_int.append(vae.decode(z))
                x_int = torch.cat(x_int)
                visualise(x_int, rows=len(test_imgs_1), pic_save_path=pic_path)
                z_rand = torch.randn_like(z_1)
                x_int = vae.decode(z_rand)
                visualise(x_int, rows=len(test_imgs_1)//2, pic_save_path=pic_path)
        t.set_description(f"Rloss = {rec_loss_avg / total_batches:.4f} | KLDloss = {kld_loss_avg / total_batches:.4f}")
    return vae

In [None]:
# cat = {
#     "name": "cat_128",
#     "path_to_dataset": "cat_136",
#     "pic_save_path": "pics/vae",
#     "h_size": 128,
#     "w_size": 128,
#     "epochs": 1000,
#     "batch_size": 8,
#     "lr": 1e-4,
#     "to_flip": True,
# }
# vae_cat = train_vae(cat)

In [None]:
cat = {
    "name": "cat_128",
    "path_to_dataset": "cat_136",
    "pic_save_path": "pics/vae",
    "h_size": 128,
    "w_size": 128,
    "epochs": 1000,
    "batch_size": 8,
    "lr": 1e-4,
    "to_flip": True,
}
dataset = MyDataset(**cat)


In [None]:
for epoch in [10, 200, 400, 600, 800, 1000]:
    print(f"Epoch: {epoch}")
    vae = torch.load(f"models/vae/cat_128/{epoch}")
    vae.eval()
    with torch.no_grad():
        x_int = vae.decode(torch.randn((27, 512, 1, 1)).cuda())
        visualise(x_int, rows=3)

In [None]:
vae = torch.load(f"models/vae/cat_128/1000")
vae.eval()
with torch.no_grad():
    x_int = vae.decode(torch.randn((100, 512, 1, 1)).cuda())
    visualise(x_int, rows=10)

Странно, то тут котов сильнее размазывает, т.е. при загрузки модельки, чем при генерации в ноутбуке сразу после обучения в этом же ноутбуке

Вот для примера, как оно было для того же кол-ва эпох в конце:

![](pics/vae_old_1000.png)

In [None]:
vae = None