<p style="align: center;"><img align=center src="https://drive.google.com/uc?export=view&id=1TRNaCfYstvcIQqoUSdukYQGF6LuyL7Tv" width=600 height=320/></p>
<h3 style="text-align: center;"><b>Школа глубокого обучения ФПМИ МФТИ</b></h3>

# Домашнее задание. Автоэнкодеры


# Часть 1. Vanilla Autoencoder (9 баллов)

## 1.1. Подготовка данных (0.5 балла)


In [None]:
import numpy as np
from torch.autograd import Variable
from torchvision import datasets, transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data_utils
import torch
import matplotlib.pyplot as plt

import os
import pandas as pd
import skimage.io
from skimage.transform import resize
from IPython import display


%matplotlib inline

In [None]:
def fetch_dataset(attrs_name = "lfw_attributes.txt",
                      images_name = "lfw-deepfunneled",
                      dx=80,dy=80,
                      dimx=64,dimy=64
    ):

    #download if not exists
    if not os.path.exists(images_name):
        print("images not found, donwloading...")
        os.system("wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz -O tmp.tgz")
        print("extracting...")
        os.system("tar xvzf tmp.tgz && rm tmp.tgz")
        print("done")
        assert os.path.exists(images_name)

    if not os.path.exists(attrs_name):
        print("attributes not found, downloading...")
        os.system("wget http://www.cs.columbia.edu/CAVE/databases/pubfig/download/%s" % attrs_name)
        print("done")

    #read attrs
    df_attrs = pd.read_csv("lfw_attributes.txt",sep='\t',skiprows=1,)
    df_attrs = pd.DataFrame(df_attrs.iloc[:,:-1].values, columns = df_attrs.columns[1:])


    #read photos
    photo_ids = []
    for dirpath, dirnames, filenames in os.walk(images_name):
        for fname in filenames:
            if fname.endswith(".jpg"):
                fpath = os.path.join(dirpath,fname)
                photo_id = fname[:-4].replace('_',' ').split()
                person_id = ' '.join(photo_id[:-1])
                photo_number = int(photo_id[-1])
                photo_ids.append({'person':person_id,'imagenum':photo_number,'photo_path':fpath})

    photo_ids = pd.DataFrame(photo_ids)
    # print(photo_ids)
    #mass-merge
    #(photos now have same order as attributes)
    df = pd.merge(df_attrs,photo_ids,on=('person','imagenum'))

    assert len(df)==len(df_attrs),"lost some data when merging dataframes"

    # print(df.shape)
    #image preprocessing
    all_photos =df['photo_path'].apply(skimage.io.imread)\
                                .apply(lambda img:img[dy:-dy,dx:-dx])\
                                .apply(lambda img: resize(img,[dimx,dimy]))

    all_photos = np.stack(all_photos.values)#.astype('uint8')
    all_attrs = df.drop(["photo_path","person","imagenum"],axis=1)

    return all_photos, all_attrs

In [None]:
# The following line fetches you two datasets: images, usable for autoencoder training and attributes.
# Those attributes will be required for the final part of the assignment (applying smiles), so please keep them in mind


data, attrs = fetch_dataset()

In [None]:
backup = data.copy()

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device


Разбейте выборку картинок на train и val, выведите несколько картинок в output, чтобы посмотреть, как они выглядят, и приведите картинки к тензорам pytorch, чтобы можно было скормить их сети:

In [None]:
np.random.shuffle(data)


data_size = data.shape[0]
train_size = data_size // 10 * 7

train_data = torch.FloatTensor(data[0:train_size]).to(device)
val_data = torch.FloatTensor(data[train_size:data_size]).to(device)

print(train_data.shape)
print(val_data.shape)
print(data.shape)

train_loader = data_utils.DataLoader(train_data, batch_size=64)
val_loader = data_utils.DataLoader(val_data, batch_size=64)

In [None]:
plt.figure(figsize=(20,10))
for i in range(6):
    plt.subplot(1, 6, i + 1)
    plt.imshow(train_data[i].cpu())
    print(train_data[i].shape)

## 1.2. Архитектура модели (1.5 балла)
В этом разделе мы напишем и обучем обычный автоэнкодер.



<img src="https://www.notion.so/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F4b8adf79-8e6a-4b7d-9061-8617a00edbb1%2F__2021-04-30__14.53.33.png?table=block&id=56f187b4-279f-4208-b1ed-4bda5f91bfc0&width=2880&userId=3b1b5e32-1cfb-4b0f-8705-5a524a8f56e3&cache=v2" alt="Autoencoder">


^ напомню, что автоэнкодер выглядит вот так

In [None]:
dim_code = 16 # выберите размер латентного вектора
in_feat = 64 * 64 * 3

Реализуем autoencoder. Архитектуру (conv, fully-connected, ReLu, etc) можете выбирать сами. Экспериментируйте!

In [None]:
from copy import deepcopy

class Autoencoder(nn.Module):
    def __init__(self):

        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(in_feat, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, dim_code)
        )

        self.decoder = nn.Sequential(
            nn.Linear(dim_code, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, in_feat)
        )

    def forward(self, x):

        x = x.flatten(1, 3)

        x = self.encoder(x)
        latent_code = x

        x = self.decoder(x).view(-1, 64, 64, 3)
        reconstruction = torch.sigmoid(x)


        return reconstruction, latent_code

## 1.3 Обучение (2 балла)

Осталось написать код обучения автоэнкодера. При этом было бы неплохо в процессе иногда смотреть, как автоэнкодер реконструирует изображения на данном этапе обучения. Наример, после каждой эпохи (прогона train выборки через автоэекодер) можно смотреть, какие реконструкции получились для каких-то изображений val выборки.

А, ну еще было бы неплохо выводить графики train и val лоссов в процессе тренировки =)

In [None]:
from tqdm import tqdm
import time

In [None]:
criterion = nn.CrossEntropyLoss()
checker = Autoencoder().to(device)
optimizer = optim.Adam(autoencoder.parameters())


epochs = 15

im_to_show = 5

sample = val_data[torch.randperm(val_data.shape[0])[:5]]

t_losses = []
v_losses = []

for epoch in tqdm(range(epochs)):
    t_l = []
    v_l = []
    for k, loader in enumerate([train_loader, val_loader]):
        for batch in loader:
            batch = batch.to(device)
            if k == 0:
                checker.train()
                optimizer.zero_grad()
                pred, lc = checker(batch)
                loss = criterion(pred.flatten(1,3), batch.flatten(1,3))
                # print(loss.item())
                t_l.append(loss.item())
                loss.backward()
                optimizer.step()

            if k == 1:
                with torch.no_grad():
                    checker.eval()
                    pred, lc = checker(batch)
                    loss = criterion(pred.flatten(1,3), batch.flatten(1,3))
                    v_l.append(loss.item())


    t_losses.append(np.mean(t_l))
    v_losses.append(np.mean(v_l))

#     with torch.no_grad():
#         autoencoder.eval()
#         plt.figure(figsize=(10,3))
#         plt.title('epoch' + str(epoch + 1))
#         for i in range(im_to_show):
#             plt.subplot(2, im_to_show, i + 1)
#             plt.imshow(np.array(sample[i].cpu() * 255).astype(np.uint8))
#             plt.subplot(2, im_to_show, i + im_to_show + 1)
#             pr, _ = autoencoder(sample[i][None, :])
#             plt.imshow(np.array(pr.cpu()[0]))

#         display.clear_output(wait=True)
#         display.display(plt.gcf())

# display.clear_output(wait=True)
# plt.figure(figsize=(12,6))
# plt.plot(range(epochs), t_losses, label='train')
# plt.plot(range(epochs), v_losses, label='val')
# plt.legend()

Давайте посмотрим, как наш тренированный автоэекодер кодирует и восстанавливает картинки:

In [None]:
aboba = val_data[torch.randperm(val_data.shape[0])[26:31]]
plt.figure(figsize=(10,3))
plt.title('epoch' + str(epoch + 1))
with torch.no_grad():
    autoencoder.eval()
    for i in range(im_to_show):
        plt.subplot(2, im_to_show, i + 1)
        plt.imshow(np.array(aboba[i].cpu()))
        plt.subplot(2, im_to_show, i + im_to_show + 1)
        pr, _ = autoencoder(aboba[i][None, :])
        plt.imshow(np.array(pr.cpu()[0]))

Not bad, right?

## 1.4. Sampling (2 балла)

Давайте теперь будем не просто брать картинку, прогонять ее через автоэекодер и получать реконструкцию, а попробуем создать что-то НОВОЕ

Давайте возьмем и подсунем декодеру какие-нибудь сгенерированные нами векторы (например, из нормального распределения) и посмотрим на результат реконструкции декодера:

__Подсказка:__Е сли вместо лиц у вас выводится непонятно что, попробуйте посмотреть, как выглядят латентные векторы картинок из датасета. Так как в обучении нейронных сетей есть определенная доля рандома, векторы латентного слоя могут быть распределены НЕ как `np.random.randn(25, <latent_space_dim>)`. А чтобы у нас получались лица при запихивании вектора декодеру, вектор должен быть распределен так же, как латентные векторы реальных фоток. Так что в таком случае придется рандом немного подогнать.

In [None]:
torch.normal(mean=1, std=2, size=(16,))

In [None]:
elem = val_data[170]

plt.figure(figsize=(2,2))
plt.imshow(np.array(elem.cpu()))
plt.show()

autoencoder.eval()
pred, lc = autoencoder(elem[None, :])
with torch.no_grad():
    plt.figure(figsize=(2,2))
    plt.imshow(np.array(pred[0].cpu()))
    plt.show()


print(lc)

In [None]:
with torch.no_grad():
    value = torch.normal(mean=0, std=1, size=(16,))
    x = value[None, :].cuda()
    x = autoencoder.decoder(x).view(-1, 64, 64, 3)
    reconstruction = torch.sigmoid(x)
    plt.figure(figsize=(2,2))
    plt.imshow(np.array(reconstruction.cpu()[0]))
    plt.show()

In [None]:
attrs.head()

## 1.5 Time to make fun! (3 балла)

Давайте научимся пририсовывать людям улыбки =)

In [None]:
a = [1,2,3,4]
a[-2:]

<img src="https://i.imgur.com/tOE9rDK.png" alt="linear" width="700" height="400">

План такой:

1. Нужно выделить "вектор улыбки": для этого нужно из выборки изображений найти несколько (~15) людей с улыбками и столько же без.

Найти людей с улыбками вам поможет файл с описанием датасета, скачанный вместе с датасетом. В нем указаны имена картинок и присутствубщие атрибуты (улыбки, очки...)

2. Вычислить латентный вектор для всех улыбающихся людей (прогнать их через encoder) и то же для всех грустненьких

3. Вычислить, собственно, вектор улыбки -- посчитать разность между средним латентным вектором улыбающихся людей и средним латентным вектором грустных людей

4. А теперь приделаем улыбку грустному человеку: добавим полученный в пункте 3 вектор к латентному вектору грустного человека и прогоним полученный вектор через decoder. Получим того же человека, но уже не грустненького!

In [None]:
n_elements = 700
half_n = 350


smile = attrs.sort_values('Smiling')[-n_elements:].sort_values('Male')
sad = attrs.sort_values('Smiling')[:n_elements].sort_values('Male')

women_smile = smile[:half_n].index
men_smile = smile[-half_n:].index

women_sad = sad[:half_n].index
men_sad = sad[-half_n:].index

autoencoder.eval()
wsm_elems = torch.FloatTensor(backup[women_smile]).to(device)
msm_elems = torch.FloatTensor(backup[men_smile]).to(device)
wsd_elems = torch.FloatTensor(backup[women_sad]).to(device)
msd_elems = torch.FloatTensor(backup[men_sad]).to(device)

_, lc_wsm = autoencoder(wsm_elems)
_, lc_wsd = autoencoder(wsd_elems)
_, lc_msm = autoencoder(msm_elems)
_, lc_msd = autoencoder(msd_elems)

men_diff = torch.mean(lc_msm - lc_msd, -2)
women_diff = torch.mean(lc_wsm - lc_wsd, -2)


n_test = 5

test = attrs.sort_values('Smiling')[n_elements+4:n_elements+4+n_test]

test_men = torch.FloatTensor(backup[
    test[test['Male'] > 0].index
]).to(device)
test_women = torch.FloatTensor(backup[
    test[test['Male'] <= 0].index
]).to(device)


pr_m, lcs_m = autoencoder(test_men)
pr_w, lcs_w = autoencoder(test_women)


subjects_m = torch.sigmoid(autoencoder.decoder(lcs_m + men_diff).view(-1, 64, 64, 3))
subjects_w = torch.sigmoid(autoencoder.decoder(lcs_w + women_diff).view(-1, 64, 64, 3))

n_m = subjects_m.shape[0]
n_w = subjects_w.shape[0]

with torch.no_grad():
    for i in range(n_m):
        plt.subplot(3, n_m, i + 1)
        plt.imshow(np.array(test_men[i].cpu()))
        plt.subplot(3, n_m, i + n_m +1)
        plt.imshow(np.array(pr_m[i].cpu()))
        plt.subplot(3, n_m, i + 2 * n_m + 1)
        plt.imshow(np.array(subjects_m[i].cpu()))
    plt.show()
    for i in range(n_w):
        plt.subplot(3, n_w, i + 1)
        plt.imshow(np.array(test_women[i].cpu()))
        plt.subplot(3, n_w, i + n_w +1)
        plt.imshow(np.array(pr_w[i].cpu()))
        plt.subplot(3, n_w, i + 2 * n_w + 1)
        plt.imshow(np.array(subjects_w[i].cpu()))
    plt.show()

Вуаля! Вы восхитительны!

Теперь вы можете пририсовывать людям не только улыбки, но и много чего другого -- закрывать/открывать глаза, пририсовывать очки... в общем, все, на что хватит фантазии и на что есть атрибуты в `all_attrs`:)

# Часть 2: Variational Autoencoder (10 баллов)

Займемся обучением вариационных автоэнкодеров — проапгрейженной версии AE. Обучать будем на датасете MNIST, содержащем написанные от руки цифры от 0 до 9

In [None]:
batch_size = 32
# MNIST Dataset
train_dataset = datasets.MNIST(root='./mnist_data/', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='./mnist_data/', train=False, transform=transforms.ToTensor(), download=False)

# Data Loader (Input Pipeline)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

## 2.1 Архитектура модели и обучение (2 балла)

Реализуем VAE. Архитектуру (conv, fully-connected, ReLu, etc) можете выбирать сами. Рекомендуем пользоваться более сложными моделями, чем та, что была на семинаре:) Экспериментируйте!

In [None]:
in_feat = 28 * 28
latent_dim = 64

In [None]:
class VAE(nn.Module):
    def __init__(self):

        super().__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1), # 28 - 28
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.MaxPool2d(2), # 28 - 14
            nn.Conv2d(16, 64, kernel_size=5), # 14 - 10
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2), # 10 - 5
            nn.Conv2d(64, 128, kernel_size=5), # 5 - 1
            nn.ReLU(),
            nn.Flatten(1, 3),
            nn.Linear(128, latent_dim * 2)
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2), # 1 - 3
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2), # 3 - 7
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2), # 7 - 14
            nn.ReLU(),
            nn.BatchNorm2d(8),
            nn.ConvTranspose2d(8, 1, kernel_size=2, stride=2), # 14 - 28
        )

    def encode(self, x):
        x = self.encoder(x).view(-1, 2, latent_dim)

        mu = x[:, 0, :]
        logsigma = x[:, 1, :]

        return mu, logsigma

    def gaussian_sampler(self, mu, logsigma):
        if self.training:
            std = torch.exp(0.5 * logsigma)
            eps = torch.randn_like(std)
            return eps * std + mu
        else:
            # на инференсе возвращаем не случайный вектор из нормального распределения, а центральный -- mu.
            # на инференсе выход автоэнкодера должен быть детерминирован.
            return mu

    def decode(self, z):
        reconstruction = self.decoder(z.view(-1, 64, 1, 1))

        return reconstruction

    def forward(self, x):
        mu, logsigma = self.encode(x)
        sample = self.gaussian_sampler(mu, logsigma)
        reconstruction = self.decode(sample)
#         reconstruction = torch.sigmoid(reconstruction)
        return mu, logsigma, reconstruction

Определим лосс и его компоненты для VAE:

Надеюсь, вы уже прочитали материал в towardsdatascience (или еще где-то) про VAE и знаете, что лосс у VAE состоит из двух частей: KL и log-likelihood.

Общий лосс будет выглядеть так:

$$\mathcal{L} = -D_{KL}(q_{\phi}(z|x)||p(z)) + \log p_{\theta}(x|z)$$

Формула для KL-дивергенции:

$$D_{KL} = -\frac{1}{2}\sum_{i=1}^{dimZ}(1+log(\sigma_i^2)-\mu_i^2-\sigma_i^2)$$

В качестве log-likelihood возьмем привычную нам кросс-энтропию.

In [None]:
def KL_divergence(mu, logsigma):
    """
    часть функции потерь, которая отвечает за "близость" латентных представлений разных людей
    """
    loss = - 1 / 2 * torch.sum(1 + logsigma - mu ** 2 - torch.exp(logsigma))
    return loss

def log_likelihood(x, reconstruction):
    """
    часть функции потерь, которая отвечает за качество реконструкции (как mse в обычном autoencoder)
    """
#     loss = nn.CrossEntropyLoss()
#     return loss(x.flatten(1,3), reconstruction.flatten(1,3))
    loss = nn.MSELoss(size_average=False)
    return loss(x.flatten(1,3), reconstruction.flatten(1,3))

def loss_vae(x, mu, logsigma, reconstruction, beta):
    return log_likelihood(x, reconstruction) + beta * KL_divergence(mu, logsigma)

И обучим модель:

In [None]:
criterion = loss_vae
vaencoder = VAE().to(device)
optimizer = torch.optim.Adam(vaencoder.parameters())


epochs = 10
im_to_show = 5

tt = []
vv = []


for epoch in tqdm(range(epochs)):

    show_indexes = torch.randint(10000, size=(im_to_show, ))
    show_elems = test_dataset.data[show_indexes].to(device).float().view(im_to_show, 1, 28, 28)

    t_l = []
    v_l = []
    
    for k, loader in enumerate([train_loader, test_loader]):
        print('---', epoch, k)
        for batch, label in loader:
            batch = batch.to(device)
            if k == 0:
                vaencoder.train()
                optimizer.zero_grad()
                mu, logsigma, reconstruction = vaencoder(batch)
                loss = criterion(batch, mu, logsigma, reconstruction, 1)
                t_l.append(loss.item())
                loss.backward()
                optimizer.step()

            if k == 1:
                with torch.no_grad():
                    vaencoder.eval()
                    mu, logsigma, reconstruction = vaencoder(batch)
                    loss = criterion(batch, mu, logsigma, reconstruction, 1)
                    v_l.append(loss.item())

    tt.append(np.mean(t_l))
    vv.append(np.mean(v_l))

    with torch.no_grad():
        autoencoder.eval()

        _, _, rec = vaencoder(show_elems)

        plt.figure(figsize=(5,2))
        for i in range(im_to_show):
            plt.subplot(2, im_to_show, i + 1)
            plt.imshow(np.array(show_elems[i][0].cpu()))
            plt.subplot(2, im_to_show, i + im_to_show + 1)
            plt.imshow(np.array(rec[i][0].cpu()))

        display.clear_output(wait=True)
        display.display(plt.gcf())

Давайте посмотрим, как наш тренированный VAE кодирует и восстанавливает картинки:

Давайте попробуем проделать для VAE то же, что и с обычным автоэнкодером -- подсунуть decoder'у из VAE случайные векторы из нормального распределения и посмотреть, какие картинки получаются:

In [None]:
# вспомните про замечание из этого же пункта обычного AE про распределение латентных переменных
z = torch.normal(mean=0, std=1, size=(10, 64, )).to(device)
vaencoder.eval()
pred = vaencoder.decode(z)


for i in range(10):
    plt.subplot(1, 10, i + 1)
    plt.imshow(pred[i][0].cpu().detach().numpy())

## 2.2. Latent Representation (2 балла)

Давайте посмотрим, как латентные векторы картинок лиц выглядят в пространстве.
Ваша задача -- изобразить латентные векторы картинок точками в двумерном просторанстве.

Это позволит оценить, насколько плотно распределены латентные векторы изображений цифр в пространстве.

Плюс давайте сделаем такую вещь: покрасим точки, которые соответствуют картинкам каждой цифры, в свой отдельный цвет

Подсказка: красить -- это просто =) У plt.scatter есть параметр c (color), см. в документации.


Итак, план:
1. Получить латентные представления картинок тестового датасета
2. С помощтю `TSNE` (есть в `sklearn`) сжать эти представления до размерности 2 (чтобы можно было их визуализировать точками в пространстве)
3. Визуализировать полученные двумерные представления с помощью `matplotlib.scatter`, покрасить разными цветами точки, соответствующие картинкам разных цифр.

In [None]:
from sklearn.manifold import TSNE

In [None]:
teda = test_dataset.data
tela = test_dataset.targets
size = len(tela)
indexes = [
    (tela == i).nonzero().view(-1) for i in range(10)
]

In [None]:
with torch.no_grad():
    autoencoder.eval()
    
    for n, ind in enumerate(indexes):
        values = teda[ind][:, None, :, :].to(device)
        sample = autoencoder.gaussian_sampler(*autoencoder.encode(values.float())).cpu().detach().numpy()
        sample_embadded = TSNE(2).fit_transform(sample)
        plt.figure(figsize=(5, 5))
        plt.scatter(sample_embadded[:, 0], sample_embadded[:, 1], alpha=0.2)
        plt.show()
        

Что вы думаете о виде латентного представления?

__Congrats v2.0!__

## 2.3. Conditional VAE (6 баллов)


Мы уже научились обучать обычный AE на датасете картинок и получать новые картинки, используя генерацию шума и декодер.
Давайте теперь допустим, что мы обучили AE на датасете MNIST и теперь хотим генерировать новые картинки с числами с помощью декодера (как выше мы генерили рандомные лица).
И вот нам понадобилось сгенерировать цифру 8, и мы подставляем разные варианты шума, но восьмерка никак не генерится:(

Хотелось бы добавить к нашему AE функцию "выдай мне рандомное число из вот этого вот класса", где классов десять (цифры от 0 до 9 образуют десять классов).  Conditional AE — так называется вид автоэнкодера, который предоставляет такую возможность. Ну, название "conditional" уже говорит само за себя.

И в этой части задания мы научимся такие обучать.

### Архитектура

На картинке ниже представлена архитектура простого Conditional VAE.

По сути, единственное отличие от обычного -- это то, что мы вместе с картинкой в первом слое энкодера и декодера передаем еще информацию о классе картинки.

То есть, в первый (входной) слой энкодера подается конкатенация картинки и информации о классе (например, вектора из девяти нулей и одной единицы). В первый слой декодера подается конкатенация латентного вектора и информации о классе.


![alt text](https://sun9-63.userapi.com/impg/Mh1akf7mfpNoprrSWsPOouazSmTPMazYYF49Tw/djoHNw_9KVA.jpg?size=1175x642&quality=96&sign=e88baec5f9bb91c8443fba31dcf0a4df&type=album)

![alt text](https://sun9-73.userapi.com/impg/UDuloLNKhzTBYAKewgxke5-YPsAKyGOqA-qCRg/MnyCavJidxM.jpg?size=1229x651&quality=96&sign=f2d21bfacc1c5755b76868dc4cfef39c&type=album)



На всякий случай: это VAE, то есть, latent у него все еще состоит из mu и sigma

Таким образом, при генерации новой рандомной картинки мы должны будем передать декодеру сконкатенированные латентный вектор и класс картинки.

P.S. Также можно передавать класс картинки не только в первый слой, но и в каждый слой сети. То есть на каждом слое конкатенировать выход из предыдущего слоя и информацию о классе.

In [None]:
class_dim = 10
in_feat = 28 * 28
latent_dim = 64

In [None]:
class CVAE(nn.Module):
    def __init__(self):
        # <определите архитектуры encoder и decoder
        # помните, у encoder должны быть два "хвоста",
        # т.е. encoder должен кодировать картинку в 2 переменные -- mu и logsigma>
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(in_feat + class_dim, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, latent_dim * 2)
        )

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + class_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, in_feat)
        )
        
    def get_class_vector(self, num, class_num):
        clazz = torch.zeros(num, class_dim)
        for i in range(num):
            clazz[i, class_num[i]] = 1
        return clazz.float().to(device)

    def encode(self, x, class_num):
        x = torch.cat([
            x.flatten(1),
            self.get_class_vector(x.shape[0], class_num)
        ], dim=1)
        
        x = self.encoder(x).view(-1, 2, latent_dim)

        mu = x[:, 0, :]
        logsigma = x[:, 1, :]

        return mu, logsigma

    def gaussian_sampler(self, mu, logsigma):
        if self.training:
            std = torch.exp(0.5 * logsigma)
            eps = torch.randn_like(std)
            return eps * std + mu
        else:
            # на инференсе возвращаем не случайный вектор из нормального распределения, а центральный -- mu.
            # на инференсе выход автоэнкодера должен быть детерминирован.
            return mu

    def decode(self, z, class_num):
        z = torch.cat([
            z,
            self.get_class_vector(z.shape[0], class_num)
        ], dim=1)
        
        reconstruction = self.decoder(z).view(-1, 1, 28, 28)

        return reconstruction

    def forward(self, x, class_num):
        mu, logsigma = self.encode(x, class_num)
        sample = self.gaussian_sampler(mu, logsigma)
        reconstruction = self.decode(sample, class_num)
        return mu, logsigma, reconstruction

In [None]:
criterion = loss_vae
autoencoder = CVAE().to(device)
optimizer = torch.optim.Adam(autoencoder.parameters())


epochs = 5
im_to_show = 5

tt = []
vv = []


for epoch in tqdm(range(epochs)):

    show_indexes = torch.randint(10000, size=(im_to_show, ))
    show_elems = test_dataset.data[show_indexes].to(device).float().view(im_to_show, 1, 28, 28)
    show_ind = test_dataset.targets[show_indexes].to(device)

    t_l = []
    v_l = []
    
    for k, loader in enumerate([train_loader, test_loader]):
        print('---', epoch, k)
        for batch, label in loader:
            batch = batch.to(device)
            if k == 0:
                autoencoder.train()
                optimizer.zero_grad()
                mu, logsigma, reconstruction = autoencoder(batch, label)
                loss = criterion(batch, mu, logsigma, reconstruction, 1)
                t_l.append(loss.item())
                loss.backward()
                optimizer.step()

            if k == 1:
                with torch.no_grad():
                    autoencoder.eval()
                    mu, logsigma, reconstruction = autoencoder(batch, label)
                    loss = criterion(batch, mu, logsigma, reconstruction, 1)
                    v_l.append(loss.item())

    tt.append(np.mean(t_l))
    vv.append(np.mean(v_l))

    with torch.no_grad():
        autoencoder.eval()
f
        _, _, rec = autoencoder(show_elems, show_ind)

        plt.figure(figsize=(5,2))
        for i in range(im_to_show):
            plt.subplot(2, im_to_show, i + 1)
            plt.imshow(np.array(show_elems[i][0].cpu()))
            plt.subplot(2, im_to_show, i + im_to_show + 1)
            plt.imshow(np.array(rec[i][0].cpu()))

        display.clear_output(wait=True)
        display.display(plt.gcf())

### Sampling


Тут мы будем сэмплировать из CVAE. Это прикольнее, чем сэмплировать из простого AE/VAE: тут можно взять один и тот же латентный вектор и попросить CVAE восстановить из него картинки разных классов!
Для MNIST вы можете попросить CVAE восстановить из одного латентного вектора, например, картинки цифры 5 и 7.

In [None]:
with torch.no_grad():
    autoencoder.eval()
    pred = autoencoder.decode(
        torch.normal(mean=0, std=1, size=(10, latent_dim)).to(device), 
        torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
#         torch.ones(10).int()
    )


    for i in range(10):
        plt.subplot(1, 10, i + 1)
        plt.imshow(pred[i][0].cpu().detach().numpy())

Splendid! Вы великолепны!


# BONUS 1: Denoising

## Внимание! За бонусы доп. баллы не ставятся, но вы можете сделать их для себя.

У автоэнкодеров, кроме сжатия и генерации изображений, есть другие практические применения. Про одно из них эта бонусная часть задания.

Автоэнкодеры могут быть использованы для избавления от шума на фотографиях (denoising). Для этого их нужно обучить специальным образом: input картинка будет зашумленной, а выдавать автоэнкодер должен будет картинку без шума.
То есть, loss-функция AE останется той же (MSE между реальной картинкой и выданной), а на вход автоэнкодеру будет подаваться зашумленная картинка.

<a href="https://ibb.co/YbRJ1nZ"><img src="https://i.ibb.co/0QD164t/Screen-Shot-2020-06-04-at-4-49-50-PM.png" alt="Screen-Shot-2020-06-04-at-4-49-50-PM" border="0"></a>

Для этого нужно взять ваш любимый датасет (датасет лиц из первой части этого задания или любой другой) и сделать копию этого датасета с шумом.

В питоне шум можно добавить так:

In [None]:
np.random.shuffle(backup)

data = torch.Tensor(backup).permute(0, 3, 1, 2).float()

data_size = data.shape[0]
train_size = data_size // 10 * 7

train_data = torch.FloatTensor(data[0:train_size]).to(device)
val_data = torch.FloatTensor(data[train_size:data_size]).to(device)

noise_factor = 0.5

train_noisy = train_data + noise_factor * torch.normal(mean=0, std=1, size=train_data.shape).to(device)
val_noisy = val_data + noise_factor * torch.normal(mean=0, std=1, size=val_data.shape).to(device)

tn_loader = data_utils.DataLoader(
    torch.cat([
        train_noisy[:, None, :, :, :],
        train_data[:, None, :, :, :]
    ], dim=1),
    batch_size=64)

vn_loader = data_utils.DataLoader(
    torch.cat([
        val_noisy[:, None, :, :, :],
        val_data[:, None, :, :, :]
    ], dim=1),
    batch_size=64)

In [None]:
data.shape

In [None]:
from copy import deepcopy

class Denoiser(nn.Module):
    def __init__(self):

        super().__init__()

        self.encoder = nn.Sequential(
            # 64
            nn.Conv2d(3, 16, kernel_size=3), # 62
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.MaxPool2d(2), # 31
            nn.Conv2d(16, 32, kernel_size=4), # 28
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2), # 14
            nn.Conv2d(32, 64, kernel_size=3), # 12
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2), # 6
            nn.Conv2d(64, 64, kernel_size=3), # 4
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2), # 2
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=1), # 4
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.ConvTranspose2d(32, 32, kernel_size=4, stride=1), # 7
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2), # 14
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.ConvTranspose2d(16, 8, kernel_size=5, stride=2), # 31
            nn.ReLU(),
            nn.BatchNorm2d(8),
            nn.ConvTranspose2d(8, 3, kernel_size=4, stride=2), # 64
        )

    def forward(self, x):

        x = self.encoder(x)
        latent_code = x

        x = self.decoder(x)
        return x, latent_code

In [None]:
val_data.shape

In [None]:
!pip install torchsummary

In [None]:
from torchsummary import summary

In [None]:
criterion = nn.MSELoss()
autoencoder = Denoiser().to(device)
optimizer = optim.Adam(autoencoder.parameters())


summary(autoencoder, (3, 64, 64))

In [None]:
epochs = 15
im_to_show = 5

t_losses = []
v_losses = []

show_indexes = torch.randint(3945, size=(im_to_show, ))
show_sample = val_data[show_indexes]
show_noisy = val_noisy[show_indexes]

for epoch in tqdm(range(epochs)):
    t_l = []
    v_l = []
    for k, loader in enumerate([tn_loader, vn_loader]):
        for element in loader:
            element = element.to(device)
            
            real = element[:, 1].view(-1, 3, 64, 64)
            noisy = element[:, 0].view(-1, 3, 64, 64)
            
            
            if k == 0:
                autoencoder.train()
                optimizer.zero_grad()
                pred, _ = autoencoder(noisy)
                loss = criterion(pred.flatten(1), real.flatten(1))
                # print(loss.item())
                t_l.append(loss.item())
                loss.backward()
                optimizer.step()

            if k == 1:
                with torch.no_grad():
                    autoencoder.eval()
                    pred, _ = autoencoder(noisy)
                    loss = criterion(pred.flatten(1), real.flatten(1))
                    v_l.append(loss.item())


    t_losses.append(np.mean(t_l))
    v_losses.append(np.mean(v_l))
    
    if epoch % 5 == 0:

        with torch.no_grad():
            autoencoder.eval()

            pred, _ = autoencoder(show_noisy.to(device))

            plt.figure(figsize=(10,3))

            for i in range(im_to_show):
                plt.subplot(3, im_to_show, i + 1)
                plt.imshow(show_sample[i].permute(1, 2, 0).cpu().detach().numpy())

                plt.subplot(3, im_to_show, i + im_to_show + 1)
                plt.imshow(show_noisy[i].permute(1, 2, 0).cpu().detach().numpy())

                plt.subplot(3, im_to_show, i + 2 * im_to_show + 1)
                plt.imshow(pred[i].permute(1, 2, 0).cpu().detach().numpy())

            display.clear_output(wait=True)
            display.display(plt.gcf())

# plt.figure(figsize=(12,6))
# plt.plot(range(epochs), t_losses, label='train')
# plt.plot(range(epochs), v_losses, label='val')
# plt.legend()

In [None]:
with torch.no_grad():
    autoencoder.eval()
    a, _ = autoencoder(val_noisy[6:11])
    for i in range(5):
        plt.figure(figsize=(2,2))
        plt.imshow(a[i].permute(1, 2, 0).cpu().detach().numpy())
        plt.show()

# BONUS 2: Image Retrieval

## Внимание! За бонусы доп. баллы не ставятся, но вы можете сделать их для себя.

Давайте представим, что весь наш тренировочный датасет -- это большая база данных людей. И вот мы получили картинку лица какого-то человека с уличной камеры наблюдения (у нас это картинка из тестового датасета) и хотим понять, что это за человек. Что нам делать? Правильно -- берем наш VAE, кодируем картинку в латентное представление и ищем среди латентныз представлений лиц нашей базы самые ближайшие!

План:

1. Получаем латентные представления всех лиц тренировочного датасета
2. Обучаем на них LSHForest `(sklearn.neighbors.LSHForest)`, например, с `n_estimators=50`
3. Берем картинку из тестового датасета, с помощью VAE получаем ее латентный вектор
4. Ищем с помощью обученного LSHForest ближайшие из латентных представлений тренировочной базы
5. Находим лица тренировочного датасета, которым соответствуют ближайшие латентные представления, визуализируем!

Немного кода вам в помощь: (feel free to delete everything and write your own)

In [None]:
codes = autoencoder.encoder(train_data)

In [None]:
# обучаем LSHForest
from sklearn.neighbors import LSHForest
lshf = LSHForest(n_estimators=50).fit(codes)

In [None]:
def get_similar(image, n_neighbors=5):
  # функция, которая берет тестовый image и с помощью метода kneighbours у LSHForest ищет ближайшие векторы
  # прогоняет векторы через декодер и получает картинки ближайших людей

  code = <получение латентного представления image>

  (distances,),(idx,) = lshf.kneighbors(code, n_neighbors=n_neighbors)

  return distances, X_train[idx]

In [None]:
def show_similar(image):

  # функция, которая принимает тестовый image, ищет ближайшие к нему и визуализирует результат

    distances,neighbors = get_similar(image,n_neighbors=11)

    plt.figure(figsize=[8,6])
    plt.subplot(3,4,1)
    plt.imshow(image.cpu().numpy().transpose([1,2,0]))
    plt.title("Original image")

    for i in range(11):
        plt.subplot(3,4,i+2)
        plt.imshow(neighbors[i].cpu().numpy().transpose([1,2,0]))
        plt.title("Dist=%.3f"%distances[i])
    plt.show()