<h3 style="text-align: center;"><b>Школа глубокого обучения ФПМИ МФТИ</b></h3>

<h3 style="text-align: center;"><b>Домашнее задание
</b></h3>

# Autoencoders

# Часть 1. Vanilla Autoencoder (10 баллов)

## 1.1 Подготовка данных (0.5 балла)

In [None]:
import os
from PIL import Image

import numpy as np
import pandas as pd

from torch.autograd import Variable
from torchvision import datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision import datasets
from torchvision import transforms as tfs
import torch
import torch.utils.tensorboard as tensorboard
from torchsummary import summary

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme(style="darkgrid", font_scale=1.5)
%matplotlib inline

In [None]:
def read_attributes(attrs_name = "lfw_attributes.txt",
                  images_name = "lfw-deepfunneled"):
    #Download if not exists
    if not os.path.exists(images_name):
        print("images not found, donwloading...")
        os.system("wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz -O tmp.tgz")
        print("extracting...")
        os.system("tar xvzf tmp.tgz && rm tmp.tgz")
        print("done")
        assert os.path.exists(images_name)

    if not os.path.exists(attrs_name):
        print("attributes not found, downloading...")
        os.system("wget http://www.cs.columbia.edu/CAVE/databases/pubfig/download/%s" % attrs_name)
        print("done")

    #Read attrs
    df_attrs = pd.read_csv("lfw_attributes.txt",sep='\t',skiprows=1,) 
    df_attrs = pd.DataFrame(df_attrs.iloc[:,:-1].values, columns = df_attrs.columns[1:])


    #Read photos
    photo_ids = []
    for dirpath, dirnames, filenames in os.walk(images_name):
        for fname in filenames:
            if fname.endswith(".jpg"):
                fpath = os.path.join(dirpath,fname)
                photo_id = fname[:-4].replace('_',' ').split()
                person_id = ' '.join(photo_id[:-1])
                photo_number = int(photo_id[-1])
                photo_ids.append({'person':person_id,'imagenum':photo_number,'photo_path':fpath})

    photo_ids = pd.DataFrame(photo_ids)
    
    #Merge (photos now have same order as attributes)
    df = pd.merge(df_attrs,photo_ids,on=('person','imagenum'))

    assert len(df)==len(df_attrs),"Lost some data when merging dataframes"
    #all_attrs = df.drop(["photo_path","person","imagenum"],axis=1)
    
    return df

In [None]:
%pwd

In [None]:
%cd DLSCourse/Autoencoders/

In [None]:
%pwd

In [None]:
attrs = read_attributes()

In [None]:
attrs.head()

Разбейте выборку картинок на train и val, выведите несколько картинок в output, чтобы посмотреть, как они выглядят, и приведите картинки к тензорам pytorch, чтобы можно было скормить их сети:

Напишем класс датасета, для того, чтобы не загружать все картинки в оперативную память

In [None]:
class FacesDataset(Dataset):
  def __init__(self, filenames, size):
    self.filenames = filenames
    self.size = size
    self.transform = tfs.Compose([
                                  tfs.CenterCrop(110),
                                  tfs.Resize(size=self.size),
                                  tfs.ToTensor(),
                                  tfs.Normalize(mean=0, std=1)
                                  ])
  def __len__(self):
    return len(self.filenames)

  def __getitem__(self, idx):
    if torch.is_tensor(idx):
      idx = idx.tolist()
    
    image_filename = self.filenames[idx]
    image = Image.open(image_filename)
    image.load()
    image = self.transform(image)
    return image


Разделим непосредственно таблицу атрибутов

In [None]:
train_attrs, valid_attrs = train_test_split(attrs, train_size=0.9, shuffle=False)

In [None]:
print("Train attributes shape: ", train_attrs.shape)
print("Valid attributes shape: ", valid_attrs.shape)

Создадим непосредственно датасеты для обучающей и валидацинной выборки

In [None]:
train_set = FacesDataset(train_attrs["photo_path"].values, size=128)
valid_set = FacesDataset(valid_attrs["photo_path"].values, size=128)

In [None]:
def show_images(ground_truth, reconstructions=None, 
                first_title="Source", second_title="Reconstruction"):
  if reconstructions is None:
    size = 1
  else: 
    size = 2
  fig = plt.figure(figsize=(5 * ground_truth.shape[0], size * 5))
  for i, image in enumerate(ground_truth):
    #plt.title("Ground truth")
    plt.subplot(size, ground_truth.shape[0], i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.title(first_title)
    plt.imshow(image.permute((1, 2, 0)).numpy())

  if reconstructions is not None:
    for i, image in enumerate(reconstructions):
    #plt.title("Ground truth")
      plt.subplot(size, ground_truth.shape[0], ground_truth.shape[0] + i + 1)
      plt.grid(False)
      plt.xticks([])
      plt.yticks([])
      plt.title(second_title)
      plt.imshow(image.permute((1, 2, 0)).cpu().detach().numpy())
  plt.ioff()
  return fig

In [None]:
examples = torch.stack([train_set[i] for i in np.random.randint(0, len(train_set), size=5)])

In [None]:
show_images(examples);

In [None]:
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=128, shuffle=False)

## 1.2 Архитектура модели (1.5 балла)
В этом разделе напишем и обучим автоэнкодер
<img src="https://www.notion.so/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F4b8adf79-8e6a-4b7d-9061-8617a00edbb1%2F__2021-04-30__14.53.33.png?table=block&id=56f187b4-279f-4208-b1ed-4bda5f91bfc0&width=2880&userId=3b1b5e32-1cfb-4b0f-8705-5a524a8f56e3&cache=v2" alt="Autoencoder">

In [None]:
#Latent space's dimension
LATENT_DIM = 512

In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, padding=0):
    super().__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.padding = padding
    self.conv = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, 
                          kernel_size=3, padding=self.padding)
    self.bn = nn.BatchNorm2d(num_features=self.out_channels)
    
  def forward(self, x):
    x = self.conv(x)
    x = self.bn(x)
    x = F.elu(x)
    return x

In [None]:
class Autoencoder(nn.Module):
  def __init__(self, latent_dim=LATENT_DIM):
    super(Autoencoder, self).__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        #nn.Conv2d(in_channels=8, out_channels=16, kernel_size=2, stride=2),

        ConvBlock(in_channels=3, out_channels=32), # 128 -> 126
        ConvBlock(in_channels=32, out_channels=32), # 126 -> 124
        nn.MaxPool2d(kernel_size=2, stride=2), # 124 -> 62
        ConvBlock(in_channels=32, out_channels=64), # 62 -> 60
        ConvBlock(in_channels=64, out_channels=64), # 60 -> 58
        nn.MaxPool2d(kernel_size=2, stride=2), # 58 -> 29
        ConvBlock(in_channels=64, out_channels=128), # 29 -> 27
        ConvBlock(in_channels=128, out_channels=128), # 27 -> 25
        nn.MaxPool2d(kernel_size=2, stride=2), # 25 -> 12
        ConvBlock(in_channels=128, out_channels=256), # 12 -> 10
        ConvBlock(in_channels=256, out_channels=256), # 10 -> 8
        nn.MaxPool2d(kernel_size=2, stride=2), # 8 -> 4
        ConvBlock(in_channels=256, out_channels=256), # 4 -> 2
        nn.Conv2d(in_channels=256, out_channels=512, kernel_size=2, stride=2),
        nn.ELU(),
        nn.Flatten()
    )

    self.decoder = nn.Sequential(
        nn.Unflatten(dim=1, unflattened_size=(512, 1, 1)),
        nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 1 -> 2
        ConvBlock(in_channels=256, out_channels=256, padding=1),
        nn.ConvTranspose2d(in_channels=256, out_channels=256, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 2 -> 4
        ConvBlock(in_channels=256, out_channels=256, padding=1),
        ConvBlock(in_channels=256, out_channels=128, padding=1), 
        nn.ConvTranspose2d(in_channels=128, out_channels=128, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        ConvBlock(in_channels=128, out_channels=128, padding=1),
        ConvBlock(in_channels=128, out_channels=64, padding=1),
        nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 8 -> 16
        ConvBlock(in_channels=64, out_channels=64, padding=1),
        ConvBlock(in_channels=64, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 16 -> 32
        ConvBlock(in_channels=32, out_channels=32, padding=1),
        ConvBlock(in_channels=32, out_channels=16, padding=1),
        nn.ConvTranspose2d(in_channels=16, out_channels=16, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 32 -> 64
        ConvBlock(in_channels=16, out_channels=16, padding=1),
        ConvBlock(in_channels=16, out_channels=3, padding=1),
        nn.ConvTranspose2d(in_channels=3, out_channels=3, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 64 -> 128
        nn.Sigmoid()
    )

    

  def forward(self, x):
    latent_code = self.encoder(x)
    reconstruction = self.decoder(latent_code)
    return reconstruction, latent_code

In [None]:
class AutoencoderV2(torch.nn.Module):
  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        ConvBlock(in_channels=3, out_channels=16),
        ConvBlock(in_channels=16, out_channels=16),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=16, out_channels=32),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=32, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=128),
        nn.Flatten(),
        nn.Linear(in_features=4*4*128, out_features=self.latent_dim),
        nn.ELU()

    )
    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=4*4*128),
        nn.ELU(),
        nn.Unflatten(dim=1, unflattened_size=(128, 4, 4)),
        nn.ConvTranspose2d(in_channels=128, out_channels=128, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        ConvBlock(in_channels=128, out_channels=64, padding=1),
        ConvBlock(in_channels=64, out_channels=64, padding=1),
        nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3, # 8 -> 16
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=64, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 16 -> 32
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=32, out_channels=16, padding=1),
        nn.ConvTranspose2d(in_channels=16, out_channels=16, kernel_size=3, # 32 -> 64
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=16, out_channels=8, padding=1),
        nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=3, # 64 -> 128
                           stride=2, padding=1, output_padding=1),
        nn.Sigmoid()

    )

  def forward(self, sample):
    latent = self.encoder(sample)
    reconstructed = self.decoder(latent)
    return reconstructed, latent

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Training device: ", device)

In [None]:
criterion = nn.MSELoss(reduction="mean")
autoencoder_mse = Autoencoder().to(device)
optimizer = optim.Adam(autoencoder_mse.parameters(), lr=1e-3)

In [None]:
summary(model=autoencoder_mse, input_size=(3, 128, 128))

## Обучение (2 балла)
Осталось написать код обучения автоэнкодера. При этом было бы неплохо в процессе иногда смотреть, как автоэнкодер реконструирует изображения на данном этапе обучения. Наример, после каждой эпохи (прогона train выборки через автоэекодер) можно смотреть, какие реконструкции получились для каких-то изображений val выборки.

А, ну еще было бы неплохо выводить графики train и val лоссов в процессе тренировки =)

In [None]:
def train_epoch(model, criterion, optimizer, train_loader, summary_writer=None):
  train_losses_epoch = []

  model.train()
  for i, batch in enumerate(train_loader):
    optimizer.zero_grad()
    reconstruction, latent_code = model(batch.to(device).float())
    loss = criterion(reconstruction, batch.to(device).float())
    train_losses_epoch.append(loss.item())
    if summary_writer is not None:
      summary_writer.add_scalar("Epoch. Train loss", loss.item(), i)
    loss.backward()
    optimizer.step()
  
  return train_losses_epoch

In [None]:
def valid_epoch(model, criterion, optimizer, valid_loader, summary_writer=None):
  valid_losses_epoch = []

  model.eval()
  with torch.no_grad():
    for i, batch in enumerate(valid_loader):
      reconstruction, latent_code = model(batch.to(device).float())
      loss = criterion(reconstruction, batch.to(device).float())
      valid_losses_epoch.append(loss.item())
      if summary_writer is not None:
        summary_writer.add_scalar("Epoch. Valid loss", loss.item(), i)

  return valid_losses_epoch

In [None]:
def visualize(examples, reconstructions, train_losses, valid_losses):
  #plt.ion()
  figure = plt.figure(constrained_layout=True, figsize=(32, 8))
  subfigs = figure.subfigures(1, 2, wspace=0.07)
  axs_left = subfigs[0].subplots(2, 5) 
  ax = subfigs[1].subplots(1, 1)
  for j in range(5):
      axs_left[0, j].clear()
      axs_left[0, j].imshow(examples[j].permute((1, 2, 0)).numpy())
      axs_left[1, j].clear()
      axs_left[1, j].imshow(reconstructions[j].permute((1, 2, 0)).cpu().numpy())
      for i in range(2):
        axs_left[i, j].set_xticks([])
        axs_left[i, j].set_yticks([])
      axs_left[0, j].set_title("Source")
      axs_left[1, j].set_title("Reconstruction")

  ax.clear()
  ax.plot(train_losses, label="Train")
  ax.plot(valid_losses, label="Validation")
  ax.set_title("Training AE", fontsize=18) 
  ax.set_xlabel("Epoch", fontsize=14)
  ax.set_ylabel("Loss value", fontsize=14)
  ax.legend()
  figure.canvas.draw()
  figure.canvas.flush_events()
  plt.show()
  return figure


In [None]:
def fit(model, criterion, optimizer, train_loader, valid_loader, epochs, summary_writer=None):
  train_losses, valid_losses = [], []

  pbar = tqdm(range(epochs))
  pbar.set_description("Epoch 1")
  for epoch in pbar:
    if epoch != 0:
      pbar.set_description(f"Epoch {epoch + 1}. \
      Train loss: {round(train_losses[-1], 4)}. \
      Valid loss: {round(valid_losses[-1], 4)}")

    train_losses_epoch = train_epoch(model, criterion, optimizer, 
                                     train_loader, summary_writer)
    valid_losses_epoch = valid_epoch(model, criterion, optimizer, 
                                     valid_loader, summary_writer)
    
    train_losses.append(np.mean(train_losses_epoch))
    valid_losses.append(np.mean(valid_losses_epoch))

    examples = torch.stack([valid_set[i] for i in np.random.randint(0, len(valid_set), size=5)])
    model.eval()
    with torch.no_grad():
      reconstructions, latent_codes = model(examples.to(device).float())

    figure = visualize(examples, reconstructions, train_losses, valid_losses)

    if summary_writer is not None:
      summary_writer.add_scalars("Training", {"Train" : train_losses[-1],
                                             "Valid" : valid_losses[-1]}, epoch)
      if (epoch + 1) % 5 == 0:
        summary_writer.add_figure(f"Reconstruction. Epoch {epoch + 1}", figure)
    
  return train_losses, valid_losses

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter("Vanilla AE v.1. Experiments/MSELoss")

In [None]:
train_losses_mse, valid_losses_mse = fit(autoencoder_mse, criterion, optimizer, train_loader, valid_loader, EPOCHS, writer)

In [None]:
plt.figure(figsize=(15, 10))
plt.plot(train_losses_mse, label="Train")
plt.plot(valid_losses_mse, label="Valid")
plt.xlabel("Epoch")
plt.ylabel("Loss value")
plt.title("Training Vanilla AE (MSE)")
plt.legend()
plt.show()

In [None]:
criterion = nn.MSELoss()
autoencoder_v2_mse = AutoencoderV2().to(device)
optimizer = optim.Adam(autoencoder_v2_mse.parameters(), lr=0.001)

In [None]:
summary(model=autoencoder_v2_mse, input_size=(3, 128, 128))

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter("Vanilla AE v.2. Experiments/MSELoss")

In [None]:
train_losses_v2_mse, valid_losses_v2_mse = fit(autoencoder_v2_mse, criterion, optimizer, train_loader, valid_loader, EPOCHS, writer)

In [None]:
plt.figure(figsize=(15, 10))
plt.plot(train_losses_v2_mse, label="Train")
plt.plot(valid_losses_v2_mse, label="Valid")
plt.xlabel("Epoch")
plt.ylabel("Loss value")
plt.title("Training Vanilla AE")
plt.show()

Давайте посмотрим, как наш тренированный автоэекодер кодирует и восстанавливает картинки:

### Autoencoder v.1

In [None]:
examples = torch.stack([valid_set[i] for i in range(5)])
reconstructions, latent_codes = autoencoder_mse(examples.to(device).float())
show_images(examples, reconstructions);

### Autoencoder v.2

In [None]:
examples = torch.stack([valid_set[i] for i in range(5)])
reconstructions, latent_codes = autoencoder_v2_mse(examples.to(device).float())
show_images(examples, reconstructions);

## 1.4. Sampling (2 балла)

Давайте теперь будем не просто брать картинку, прогонять ее через автоэекодер и получать реконструкцию, а попробуем создать что-то НОВОЕ

Давайте возьмем и подсунем декодеру какие-нибудь сгенерированные нами векторы (например, из нормального распределения) и посмотрим на результат реконструкции декодера:

__Подсказка:__Е сли вместо лиц у вас выводится непонятно что, попробуйте посмотреть, как выглядят латентные векторы картинок из датасета. Так как в обучении нейронных сетей есть определенная доля рандома, векторы латентного слоя могут быть распределены НЕ как `np.random.randn(25, <latent_space_dim>)`. А чтобы у нас получались лица при запихивании вектора декодеру, вектор должен быть распределен так же, как латентные векторы реальных фоток. Так что в таком случае придется рандом немного подогнать.

### Autoencoder v.1

Попробуем сэмплировать из стандартного нормального распределения

In [None]:
num_samples = 25
z = torch.tensor(np.random.randn(num_samples, LATENT_DIM)).float().to(device)
output = autoencoder_mse.decoder(z).cpu().detach()
for i in range(num_samples // 5):
  show_images(output[i*5:(i+1)*5, :, :, :], first_title="Generated")


Теперь попробуем привести распределение к нормальному с параметрами, вычисленными по латентным векторам обучающей выборки

In [None]:
autoencoder_mse.eval()
with torch.no_grad():
  latent_vectors = []
  for image in train_loader.dataset:
    reconstructions, latent = autoencoder_mse(image[None, :, :, :].to(device).float())
    latent_vectors.append(latent.cpu().detach().numpy().squeeze())

In [None]:
len(latent_vectors)

In [None]:
latent_vectors = np.stack(latent_vectors)
latent_vectors.shape

In [None]:
latent_mean = latent_vectors.mean(axis=0)
latent_std = latent_vectors.std(axis=0)

In [None]:
latent_mean.shape, latent_std.shape

In [None]:
num_samples = 25
z = torch.tensor(np.random.normal(latent_mean, latent_std, (num_samples, LATENT_DIM))).float().to(device)
output = autoencoder_mse.decoder(z).cpu().detach()
for i in range(num_samples // 5):
  show_images(output[i*5:(i+1)*5, :, :, :], first_title="Generated")


### Autoencoder v.2

Попробуем сэмплировать из стандартного нормального распределения

In [None]:
num_samples = 25
z = torch.tensor(np.random.randn(num_samples, LATENT_DIM)).float().to(device)
output = autoencoder_v2_mse.decoder(z).cpu().detach()
for i in range(num_samples // 5):
  show_images(output[i*5:(i+1)*5, :, :, :], first_title="Generated")


Теперь попробуем привести распределение к нормальному с параметрами, вычисленными по латентным векторам обучающей выборки

In [None]:
autoencoder_v2_mse.eval()
with torch.no_grad():
  latent_vectors = []
  for image in train_loader.dataset:
    reconstructions, latent = autoencoder_v2_mse(image[None, :, :, :].to(device).float())
    latent_vectors.append(latent.cpu().detach().numpy().squeeze())

In [None]:
len(latent_vectors)

In [None]:
latent_vectors = np.stack(latent_vectors)
latent_vectors.shape

In [None]:
latent_mean = latent_vectors.mean(axis=0)
latent_std = latent_vectors.std(axis=0)

In [None]:
latent_mean.shape, latent_std.shape

In [None]:
num_samples = 25
z = torch.tensor(np.random.normal(latent_mean, latent_std, (num_samples, LATENT_DIM))).float().to(device)
output = autoencoder_mse.decoder(z).cpu().detach()
for i in range(num_samples // 5):
  show_images(output[i*5:(i+1)*5, :, :, :], first_title="Generated")


### Вывод

Хотелось бы отметить, что вторая версия лучше восстанавливает картинки, но первая лучше справляется с генерацией картинок

## Time to make fun! (4 балла)
Давайте научимся пририсовывать людям улыбки =)

<img src="https://i.imgur.com/tOE9rDK.png" alt="linear" width="700" height="400">

План такой:

1. Нужно выделить "вектор улыбки": для этого нужно из выборки изображений найти несколько (~15) людей с улыбками и столько же без.

Найти людей с улыбками вам поможет файл с описанием датасета, скачанный вместе с датасетом. В нем указаны имена картинок и присутствубщие атрибуты (улыбки, очки...)

2. Вычислить латентный вектор для всех улыбающихся людей (прогнать их через encoder) и то же для всех грустненьких

3. Вычислить, собственно, вектор улыбки -- посчитать разность между средним латентным вектором улыбающихся людей и средним латентным вектором грустных людей

4. А теперь приделаем улыбку грустному человеку: добавим полученный в пункте 3 вектор к латентному вектору грустного человека и прогоним полученный вектор через decoder. Получим того же человека, но уже не грустненького!

In [None]:
attrs.head()

Конечно же желательно рассматривать людей одного пола при формировании улыбки. Рассмотрим улыбающихся и грустных мужчин  

### Autoencoder v.1

In [None]:
smiling_men_indices = train_attrs[train_attrs["Male"] > 1].sort_values(by="Smiling", ascending=False).index[:30]

In [None]:
sad_men_indices = train_attrs[train_attrs["Male"] > 1].sort_values(by="Smiling", ascending=True).index[:30]

In [None]:
smiling_examples = torch.stack([train_set[i] for i in smiling_men_indices])
sad_examples = torch.stack([train_set[i] for i in sad_men_indices])
show_images(smiling_examples[:10], sad_examples[:10], first_title="Smile", second_title="Sad");

In [None]:
_, smiling_men_vectors = autoencoder_mse(smiling_examples.to(device).float())
_, sad_men_vectors = autoencoder_mse(sad_examples.to(device).float())

In [None]:
smile_vector = smiling_men_vectors.mean(axis=0) - sad_men_vectors.mean(axis=0)

In [None]:
smile_vector.shape

In [None]:
changed_men = autoencoder_mse.decoder(sad_men_vectors + smile_vector)

In [None]:
changed_men.shape

In [None]:
show_images(sad_examples[:10], changed_men[:10], first_title="Source", second_title="Changed");

###| Autorncoder v.2

In [None]:
smiling_examples = torch.stack([train_set[i] for i in smiling_men_indices])
sad_examples = torch.stack([train_set[i] for i in sad_men_indices])
show_images(smiling_examples[:10], sad_examples[:10], first_title="Smile", second_title="Sad");

In [None]:
_, smiling_men_vectors = autoencoder_v2_mse(smiling_examples.to(device).float())
_, sad_men_vectors = autoencoder_v2_mse(sad_examples.to(device).float())

In [None]:
smile_vector = smiling_men_vectors.mean(axis=0) - sad_men_vectors.mean(axis=0)

In [None]:
smile_vector.shape

In [None]:
changed_men = autoencoder_v2_mse.decoder(sad_men_vectors + smile_vector)

In [None]:
changed_men.shape

In [None]:
show_images(sad_examples[:10], changed_men[:10], first_title="Source", second_title="Changed");

Вуаля! Вы восхитительны!

Теперь вы можете пририсовывать людям не только улыбки, но и много чего другого -- закрывать/открывать глаза, пририсовывать очки... в общем, все, на что хватит фантазии и на что есть атрибуты в `all_attrs`:)

### Вывод

Первая версия лучше справилась с добавлением улыбки (лица все таки более правдоподобные и без помех)

# Часть 2: Variational Autoencoder (10 баллов)

Займемся обучением вариационных автоэнкодеров — проапгрейженной версии AE. Обучать будем на датасете MNIST, содержащем написанные от руки цифры от 0 до 9

In [None]:
BATCH_SIZE = 1000
train_set = datasets.MNIST(root="./MNIST/", train=True, transform=tfs.ToTensor(), download=True)
test_set = datasets.MNIST(root="./MNIST/", train=False, transform=tfs.ToTensor(), download=False)

train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

## 2.1 Архитектура модели и обучение (2 балла)

In [None]:
LATENT_DIM = 8

Реализуем VAE. Архитектуру (conv, fully-connected, ReLu, etc) можете выбирать сами. Рекомендуем пользоваться более сложными моделями, чем та, что была на семинаре:) Экспериментируйте!

In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, padding=0):
    super().__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.padding = padding
    self.conv = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, 
                          kernel_size=3, padding=self.padding)
    self.bn = nn.BatchNorm2d(num_features=self.out_channels)
    
  def forward(self, x):
    x = self.conv(x)
    x = self.bn(x)
    x = F.relu(x)
    return x

In [None]:
class VAE_CNN(nn.Module):

  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        ConvBlock(in_channels=1, out_channels=32), #28 -> 26
        ConvBlock(in_channels=32, out_channels=32), #26 -> 24
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2),
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(), # 24 -> 12
        ConvBlock(in_channels=32, out_channels=32), # 12 -> 10
        ConvBlock(in_channels=32, out_channels=32), # 10 -> 8
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2), # 8 -> 4
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(in_features=4*4*32, out_features=2*self.latent_dim),
    )

    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=32*4*4),
        nn.ReLU(),
        nn.Unflatten(dim=1, unflattened_size=(32, 4, 4)),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 8 -> 16
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32), # 16 -> 14
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 14 -> 28
                           stride=2, padding=1, output_padding=1),
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=1, padding=1),
        nn.Sigmoid()
    )

  def reparameterize(self, mu, log_var):
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    sample = std * eps + mu
    return sample

  def get_latent_vector(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    return z

  def forward(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    reconstruction = self.decoder(z)
    return reconstruction, mu, log_var

  def sample(self, z):
    generated = self.decoder(z)
    return generated


In [None]:
class VAE_CNN(nn.Module):

  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        nn.Conv2d(in_channels=1, out_channels=8, kernel_size=4), # 28 -> 25
        nn.BatchNorm2d(num_features=8),
        nn.ReLU(),
        nn.Conv2d(in_channels=8, out_channels=16, kernel_size=4), # 25 -> 22
        nn.BatchNorm2d(num_features=16),
        nn.ReLU(),
        nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4), # 22 -> 19
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=4), # 19 -> 16
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2), # 16 -> 8
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2), # 8 -> 4
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(in_features=32*4*4, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=2*self.latent_dim),
    )

    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=128),
        nn.ReLU(),
        nn.Linear(in_features=128, out_features=32*4*4),
        nn.Unflatten(dim=1, unflattened_size=(32, 4, 4)),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 8 -> 16
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32), # 16 -> 14
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 14 -> 28
                           stride=2, padding=1, output_padding=1),
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=1, padding=1),
        nn.Sigmoid()
    )

  def reparameterize(self, mu, log_var):
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    sample = std * eps + mu
    return sample

  def get_latent_vector(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    return z

  def forward(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    reconstruction = self.decoder(z)
    return reconstruction, mu, log_var

  def sample(self, z):
    generated = self.decoder(z)
    return generated


In [None]:
class VAE_FC(nn.Module):

  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        nn.Flatten(),
        nn.Linear(in_features=28*28, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=128),
        nn.ReLU(),
        nn.Linear(in_features=128, out_features=2*self.latent_dim)
    )

    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=64),
        nn.ReLU(),
        nn.Linear(in_features=64, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=28*28),
        nn.Sigmoid(),
        nn.Unflatten(dim=1, unflattened_size=(1, 28, 28))
    )

  def reparameterize(self, mu, log_var):
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    sample = std * eps + mu
    return sample

  def get_latent_vector(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    return z

  def forward(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    reconstruction = self.decoder(z)
    return reconstruction, mu, log_var

  def sample(self, z):
    generated = self.decoder(z)
    return generated


In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
vae_cnn = VAE_CNN().to(device)

In [None]:
summary(model=vae, input_size=(1, 28, 28))

In [None]:
vae_fc = VAE_FC().to(device)

In [None]:
summary(model=vae_fc, input_size=(1, 28, 28))

Определим лосс и его компоненты для VAE:

Надеюсь, вы уже прочитали материал в towardsdatascience (или еще где-то) про VAE и знаете, что лосс у VAE состоит из двух частей: KL и log-likelihood.

Общий лосс будет выглядеть так:

$$\mathcal{L} = -D_{KL}(q_{\phi}(z|x)||p(z)) + \log p_{\theta}(x|z)$$

Формула для KL-дивергенции:

$$D_{KL} = -\frac{1}{2}\sum_{i=1}^{dimZ}(1+log(\sigma_i^2)-\mu_i^2-\sigma_i^2)$$

В качестве log-likelihood возьмем привычную нам кросс-энтропию.

In [None]:
def KL_divergence(mu, log_var):
  loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
  return loss

def log_likelihood(x, reconstruction):
  loss = nn.BCELoss(reduction="sum")
  return loss(reconstruction, x)

def loss_vae(x, reconstruction, mu, log_var):
  return KL_divergence(mu, log_var) + log_likelihood(x, reconstruction)

Обучим модель:

In [None]:
def train_epoch(model, criterion, optimizer, train_loader, summary_writer=None):
  train_losses_epoch = []

  model.train()
  for i, (batch, _) in enumerate(train_loader):
    optimizer.zero_grad()
    reconstruction, mu, log_var = model(batch.to(device).float())
    loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
    train_losses_epoch.append(loss.item())
    if summary_writer is not None:
      summary_writer.add_scalar("Epoch. Train loss", loss.item(), i)
    loss.backward()
    optimizer.step()
  
  return train_losses_epoch

In [None]:
def valid_epoch(model, criterion, optimizer, valid_loader, summary_writer=None):
  valid_losses_epoch = []

  model.eval()
  with torch.no_grad():
    for i, (batch, _) in enumerate(valid_loader):
      reconstruction, mu, log_var = model(batch.to(device).float())
      loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
      valid_losses_epoch.append(loss.item())
      if summary_writer is not None:
        summary_writer.add_scalar("Epoch. Valid loss", loss.item(), i)

  return valid_losses_epoch

In [None]:
def visualize(examples, reconstructions, train_losses, valid_losses):
  #plt.ion()
  figure = plt.figure(constrained_layout=True, figsize=(32, 8))
  subfigs = figure.subfigures(1, 2, wspace=0.07)
  axs_left = subfigs[0].subplots(2, 5) 
  ax = subfigs[1].subplots(1, 1)
  for j in range(5):
      axs_left[0, j].clear()
      axs_left[0, j].imshow(examples[j].view(28, 28).numpy())
      axs_left[1, j].clear()
      axs_left[1, j].imshow(reconstructions[j].view(28, 28).cpu().numpy())
      for i in range(2):
        axs_left[i, j].set_xticks([])
        axs_left[i, j].set_yticks([])
      axs_left[0, j].set_title("Source")
      axs_left[1, j].set_title("Reconstruction")

  ax.clear()
  ax.plot(train_losses, label="Train")
  ax.plot(valid_losses, label="Validation")
  ax.set_title("Training AE", fontsize=18) 
  ax.set_xlabel("Epoch", fontsize=14)
  ax.set_ylabel("Loss value", fontsize=14)
  ax.legend()
  #figure.canvas.draw()
  #figure.canvas.flush_events()
  plt.show()
  return figure


In [None]:
def fit(model, criterion, optimizer, train_loader, valid_loader, epochs, summary_writer=None):
  train_losses, valid_losses = [], []

  pbar = tqdm(range(epochs))
  pbar.set_description("Epoch 1")
  for epoch in pbar:
    if epoch != 0:
      pbar.set_description(f"Epoch {epoch + 1}. \
      Train loss: {round(train_losses[-1], 4)}. \
      Valid loss: {round(valid_losses[-1], 4)}")

    train_losses_epoch = train_epoch(model, criterion, optimizer, 
                                     train_loader, summary_writer)
    valid_losses_epoch = valid_epoch(model, criterion, optimizer, 
                                     valid_loader, summary_writer)
    
    train_losses.append(np.mean(train_losses_epoch))
    valid_losses.append(np.mean(valid_losses_epoch))

    examples = torch.stack([valid_loader.dataset[i][0] 
                            for i in np.random.randint(0, len(valid_loader.dataset), size=5)])
    model.eval()
    with torch.no_grad():
      reconstructions, _, _ = model(examples.to(device).float())

    figure = visualize(examples, reconstructions, train_losses, valid_losses)

    if summary_writer is not None:
      summary_writer.add_scalars("Training", {"Train" : train_losses[-1],
                                             "Valid" : valid_losses[-1]}, epoch)
      if (epoch + 1) % 5 == 0:
        summary_writer.add_figure(f"Reconstruction. Epoch {epoch + 1}", figure)
    
  return train_losses, valid_losses

### CNN VAE

In [None]:
vae_cnn = VAE_CNN().to(device)
criterion = loss_vae
optimizer = optim.Adam(params=vae_cnn.parameters(), lr=0.001)

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter(log_dir="VAE Experiments/CNN")

In [None]:
train_losses, valid_losses = fit(vae_cnn, criterion, optimizer, 
                                 train_loader, test_loader, EPOCHS, writer)

Давайте посмотрим, как наш тренированный VAE кодирует и восстанавливает картинки:

In [None]:
def show_images(ground_truth, reconstructions=None, 
                first_title="Source", second_title="Reconstruction"):
  if reconstructions is None:
    size = 1
  else: 
    size = 2
  fig = plt.figure(figsize=(5 * ground_truth.shape[0], size * 5))
  for i, image in enumerate(ground_truth):
    #plt.title("Ground truth")
    plt.subplot(size, ground_truth.shape[0], i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.title(first_title)
    plt.imshow(image.view(28, 28).numpy())

  if reconstructions is not None:
    for i, image in enumerate(reconstructions):
    #plt.title("Ground truth")
      plt.subplot(size, ground_truth.shape[0], ground_truth.shape[0] + i + 1)
      plt.grid(False)
      plt.xticks([])
      plt.yticks([])
      plt.title(second_title)
      plt.imshow(image.view(28, 28).cpu().detach().numpy())
  plt.ioff()
  return fig

In [None]:
examples = torch.stack([test_set[i][0] for i in range(5)])
reconstructions, _, _ = vae_cnn(examples.to(device).float())
show_images(examples, reconstructions);

Давайте попробуем проделать для VAE то же, что и с обычным автоэнкодером -- подсунуть decoder'у из VAE случайные векторы из нормального распределения и посмотреть, какие картинки получаются:

In [None]:
z = torch.tensor(np.array([np.random.normal(0, 1, LATENT_DIM) for i in range(10)])).float().to(device)
output = vae_cnn.sample(z).cpu().detach()
show_images(output, first_title="Generated");

### FC VAE

In [None]:
vae_fc = VAE_FC().to(device)
criterion = loss_vae
optimizer = optim.Adam(params=vae_fc.parameters(), lr=0.001)

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter(log_dir="VAE Experiments/FC")

In [None]:
train_losses, valid_losses = fit(vae_fc, criterion, optimizer, 
                                 train_loader, test_loader, EPOCHS, writer)

In [None]:
examples = torch.stack([test_set[i][0] for i in range(5)])
reconstructions, _, _ = vae_fc(examples.to(device).float())
show_images(examples, reconstructions);

In [None]:
z = torch.tensor(np.array([np.random.normal(0, 1, LATENT_DIM) for i in range(10)])).float().to(device)
output = vae_fc.sample(z).cpu().detach()
show_images(output, first_title="Generated");

## 2.2. Latent Representation (2 балла)

Давайте посмотрим, как латентные векторы картинок лиц выглядят в пространстве.
Ваша задача -- изобразить латентные векторы картинок точками в двумерном просторанстве. 

Это позволит оценить, насколько плотно распределены латентные векторы изображений цифр в пространстве. 

Плюс давайте сделаем такую вещь: покрасим точки, которые соответствуют картинкам каждой цифры, в свой отдельный цвет

Подсказка: красить -- это просто =) У plt.scatter есть параметр c (color), см. в документации.


Итак, план:
1. Получить латентные представления картинок тестового датасета
2. С помощтю `TSNE` (есть в `sklearn`) сжать эти представления до размерности 2 (чтобы можно было их визуализировать точками в пространстве)
3. Визуализировать полученные двумерные представления с помощью `matplotlib.scatter`, покрасить разными цветами точки, соответствующие картинкам разных цифр.

In [None]:
from sklearn.manifold import TSNE

In [None]:
loader = DataLoader(test_set, batch_size=100, num_workers=2)
vae.eval()
latent_vectors_cnn, latent_vectors_fc = [], []
with torch.no_grad():
  for batch, _ in loader:
    output_cnn = vae_cnn.get_latent_vector(batch.to(device).float())
    output_fc = vae_fc.get_latent_vector(batch.to(device).float())
    latent_vectors_cnn.append(output_cnn)
    latent_vectors_fc.append(output_fc)
latent_vectors_cnn = torch.cat(latent_vectors_cnn)
latent_vectors_fc = torch.cat(latent_vectors_fc)

In [None]:
tsne = TSNE()

In [None]:
view_cnn = tsne.fit_transform(latent_vectors_cnn.cpu().detach().numpy())
view_fc = tsne.fit_transform(latent_vectors_fc.cpu().detach().numpy())

In [None]:
labels = torch.cat([y for _, y in loader])

In [None]:
figure, (ax1, ax2) = plt.subplots(1, 2, figsize=(30, 15))
sns.scatterplot(view_cnn[:, 0], view_cnn[:, 1], hue=labels, palette=sns.color_palette(), ax=ax1);
sns.scatterplot(view_fc[:, 0], view_fc[:, 1], hue=labels, palette=sns.color_palette(), ax=ax2);
ax1.set_title("VAE CNN latent vectors distrubution")
ax2.set_title("VAE FC latent vectors distrubution")

Что вы думаете о виде латентного представления?

В целом результат довольно хороший. При использовании CNN VAE восстановление и генерация картинок проходит лучше, чем при использовании FC VAE, однако во втором случае латентные вектора каждого класса более четко разделяются.

## 2.3. Conditional VAE (6 баллов)


Мы уже научились обучать обычный AE на датасете картинок и получать новые картинки, используя генерацию шума и декодер. 
Давайте теперь допустим, что мы обучили AE на датасете MNIST и теперь хотим генерировать новые картинки с числами с помощью декодера (как выше мы генерили рандомные лица). 
И вот нам понадобилось сгенерировать цифру 8, и мы подставляем разные варианты шума, но восьмерка никак не генерится:(

Хотелось бы добавить к нашему AE функцию "выдай мне рандомное число из вот этого вот класса", где классов десять (цифры от 0 до 9 образуют десять классов).  Conditional AE — так называется вид автоэнкодера, который предоставляет такую возможность. Ну, название "conditional" уже говорит само за себя.

И в этой части задания мы научимся такие обучать.

### Архитектура

На картинке ниже представлена архитектура простого Conditional VAE.

По сути, единственное отличие от обычного -- это то, что мы вместе с картинкой в первом слое энкодера и декодера передаем еще информацию о классе картинки. 

То есть, в первый (входной) слой энкодера подается конкатенация картинки и информации о классе (например, вектора из девяти нулей и одной единицы). В первый слой декодера подается конкатенация латентного вектора и информации о классе.


![alt text](https://sun9-63.userapi.com/impg/Mh1akf7mfpNoprrSWsPOouazSmTPMazYYF49Tw/djoHNw_9KVA.jpg?size=1175x642&quality=96&sign=e88baec5f9bb91c8443fba31dcf0a4df&type=album)

![alt text](https://sun9-73.userapi.com/impg/UDuloLNKhzTBYAKewgxke5-YPsAKyGOqA-qCRg/MnyCavJidxM.jpg?size=1229x651&quality=96&sign=f2d21bfacc1c5755b76868dc4cfef39c&type=album)



На всякий случай: это VAE, то есть, latent у него все еще состоит из mu и sigma

Таким образом, при генерации новой рандомной картинки мы должны будем передать декодеру сконкатенированные латентный вектор и класс картинки.

P.S. Также можно передавать класс картинки не только в первый слой, но и в каждый слой сети. То есть на каждом слое конкатенировать выход из предыдущего слоя и информацию о классе.

In [None]:
class CVAE_CNN(nn.Module):

  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        nn.Conv2d(in_channels=1, out_channels=8, kernel_size=4), # 28 -> 25
        nn.BatchNorm2d(num_features=8),
        nn.ReLU(),
        nn.Conv2d(in_channels=8, out_channels=16, kernel_size=4), # 25 -> 22
        nn.BatchNorm2d(num_features=16),
        nn.ReLU(),
        nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4), # 22 -> 19
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=4), # 19 -> 16
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2), # 16 -> 8
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=2, stride=2), # 8 -> 4
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(in_features=32*4*4, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=2*self.latent_dim),
    )

    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=128),
        nn.ReLU(),
        nn.Linear(in_features=128, out_features=32*4*4),
        nn.Unflatten(dim=1, unflattened_size=(32, 4, 4)),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 8 -> 16
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=32), # 16 -> 14
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 14 -> 28
                           stride=2, padding=1, output_padding=1),
        nn.BatchNorm2d(num_features=32),
        nn.ReLU(),
        ConvBlock(in_channels=32, out_channels=1, padding=1),
        nn.Sigmoid()
    )

  def encode(self, x, class_num):
    output = self.encoder(torch.cat(x, class_num))
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    return mu, log_var, class_num

  def gaussian_sampler(self, mu, log_var):
    if self.training:
      std = torch.exp(0.5 * log_var)
      eps = torch.randn_like(mu)
      return std * eps + mu
    else:
      return mu

  def decode(self, z, class_num):
    reconstruction = self.decoder(torch.cat(z, class_num))
    return reconstruction

  def forward(self, x, class_num):
    mu, log_var, class_num = self.encode(x, class_num)
    z = self.gaussian_sampler(mu. log_var)
    reconstruction = self.decode(z, class_num)
    return mu, log_var, reconstruction


In [None]:
class CVAE_FC(nn.Module):

  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        nn.Linear(in_features=28*28 + 10, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=128),
        nn.ReLU(),
        nn.Linear(in_features=128, out_features=2*self.latent_dim)
    )

    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim + 10, out_features=64),
        nn.ReLU(),
        nn.Linear(in_features=64, out_features=256),
        nn.ReLU(),
        nn.Linear(in_features=256, out_features=28*28),
        nn.Sigmoid(),
        nn.Unflatten(dim=1, unflattened_size=(1, 28, 28))
    )

  def encode(self, x, class_num):
    output = self.encoder(torch.cat((x.view(-1, 28*28), 
                                     F.one_hot(class_num, 10).view(-1, 10)), 1)) \
                                     .view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    return mu, log_var, class_num

  def gaussian_sampler(self, mu, log_var):
    if self.training:
      std = torch.exp(0.5 * log_var)
      eps = torch.randn_like(mu)
      return std * eps + mu
    else:
      return mu

  def decode(self, z, class_num):
    reconstruction = self.decoder(torch.cat((z, F.one_hot(class_num, 10).view(-1, 10)), 1))
    return reconstruction

  def forward(self, x, class_num):
    mu, log_var, class_num = self.encode(x, class_num)
    z = self.gaussian_sampler(mu, log_var)
    reconstruction = self.decode(z, class_num)
    return mu, log_var, reconstruction

### Обучение

In [None]:
def train_epoch(model, criterion, optimizer, train_loader, summary_writer=None):
  train_losses_epoch = []

  model.train()
  for i, (batch, labels) in enumerate(train_loader):
    optimizer.zero_grad()
    mu, log_var, reconstruction = model(batch.to(device).float(), labels.to(device))
    loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
    train_losses_epoch.append(loss.item())
    if summary_writer is not None:
      summary_writer.add_scalar("Epoch. Train loss", loss.item(), i)
    loss.backward()
    optimizer.step()
  
  return train_losses_epoch

In [None]:
def valid_epoch(model, criterion, optimizer, valid_loader, summary_writer=None):
  valid_losses_epoch = []

  model.eval()
  with torch.no_grad():
    for i, (batch, labels) in enumerate(valid_loader):
      mu, log_var, reconstruction = model(batch.to(device).float(), labels.to(device))
      loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
      valid_losses_epoch.append(loss.item())
      if summary_writer is not None:
        summary_writer.add_scalar("Epoch. Valid loss", loss.item(), i)

  return valid_losses_epoch

In [None]:
def visualize(examples, labels, reconstructions, train_losses, valid_losses):
  #plt.ion()
  figure = plt.figure(constrained_layout=True, figsize=(32, 8))
  subfigs = figure.subfigures(1, 2, wspace=0.07)
  axs_left = subfigs[0].subplots(2, 5) 
  ax = subfigs[1].subplots(1, 1)
  for j in range(5):
      axs_left[0, j].clear()
      axs_left[0, j].imshow(examples[j].view(28, 28).numpy())
      axs_left[1, j].clear()
      axs_left[1, j].imshow(reconstructions[j].view(28, 28).cpu().numpy())
      for i in range(2):
        axs_left[i, j].set_xticks([])
        axs_left[i, j].set_yticks([])
      axs_left[0, j].set_title(f"Source. Label {labels[j]}")
      axs_left[1, j].set_title("Reconstruction")

  ax.clear()
  ax.plot(train_losses, label="Train")
  ax.plot(valid_losses, label="Validation")
  ax.set_title("Training AE", fontsize=18) 
  ax.set_xlabel("Epoch", fontsize=14)
  ax.set_ylabel("Loss value", fontsize=14)
  ax.legend()
  #figure.canvas.draw()
  #figure.canvas.flush_events()
  plt.show()
  return figure


In [None]:
def fit(model, criterion, optimizer, train_loader, valid_loader, epochs, summary_writer=None):
  train_losses, valid_losses = [], []

  pbar = tqdm(range(epochs))
  pbar.set_description("Epoch 1")
  for epoch in pbar:
    if epoch != 0:
      pbar.set_description(f"Epoch {epoch + 1}. \
      Train loss: {round(train_losses[-1], 4)}. \
      Valid loss: {round(valid_losses[-1], 4)}")

    train_losses_epoch = train_epoch(model, criterion, optimizer, 
                                     train_loader, summary_writer)
    valid_losses_epoch = valid_epoch(model, criterion, optimizer, 
                                     valid_loader, summary_writer)
    
    train_losses.append(np.mean(train_losses_epoch))
    valid_losses.append(np.mean(valid_losses_epoch))

    indices = np.random.randint(0, len(valid_loader.dataset), size=5)
    examples = torch.stack([valid_loader.dataset[i][0] for i in indices])
    labels = torch.tensor([valid_loader.dataset[i][1] for i in indices])

    model.eval()
    with torch.no_grad():
      _, _, reconstructions = model(examples.to(device).float(), labels.to(device))

    figure = visualize(examples, labels, reconstructions, train_losses, valid_losses)

    if summary_writer is not None:
      summary_writer.add_scalars("Training", {"Train" : train_losses[-1],
                                             "Valid" : valid_losses[-1]}, epoch)
      if (epoch + 1) % 5 == 0:
        summary_writer.add_figure(f"Reconstruction. Epoch {epoch + 1}", figure)
    
  return train_losses, valid_losses

In [None]:
EPOCHS = 100
cvae_fc = CVAE_FC().to(device)
optimizer = optim.Adam(cvae_fc.parameters())
criterion = loss_vae

In [None]:
writer = tensorboard.SummaryWriter("./CVAE Experiments")

In [None]:
train_losses, valid_losses = fit(cvae_fc, criterion, optimizer, train_loader, test_loader, EPOCHS, writer)

### Sampling


Тут мы будем сэмплировать из CVAE. Это прикольнее, чем сэмплировать из простого AE/VAE: тут можно взять один и тот же латентный вектор и попросить CVAE восстановить из него картинки разных классов!
Для MNIST вы можете попросить CVAE восстановить из одного латентного вектора, например, картинки цифры 5 и 7.

Давайте будем семплировать из одних векторов цифры всех классов

In [None]:
z = torch.FloatTensor(np.array([np.random.randn(LATENT_DIM) for i in range(10)])).to(device)
labels = torch.tensor(np.arange(0, 10))
print(z.shape, labels.shape)

In [None]:
figure, axs = plt.subplots(10, 10, figsize=(10, 10), 
                           gridspec_kw = {'wspace':0, 'hspace':0})
for i in range(10):
  for j in range(10):
    axs[i, j].imshow(cvae_fc.decode(z[i].view(1, -1).to(device), 
                                    labels[j].to(device))\
                     .view(28, 28).cpu().detach().numpy())
    axs[i, j].set_xticks([])
    axs[i, j].set_yticks([])

Splendid! Вы великолепны!


### Latent Representations

Давайте посмотрим, как выглядит латентное пространство картинок в CVAE и сравним с картинкой для VAE =)

Опять же, нужно покрасить точки в разные цвета в зависимости от класса.

In [None]:
loader = DataLoader(test_set, batch_size=100, num_workers=2)
cvae_fc.eval()
latent_vectors_fc = []
with torch.no_grad():
  for batch, labels in loader:
    mu, log_var, _ = cvae_fc.encode(batch.to(device).float(), labels.to(device))
    latent_vectors_fc.append(cvae_fc.gaussian_sampler(mu, log_var))
latent_vectors_fc = torch.cat(latent_vectors_fc)

In [None]:
tsne = TSNE()

In [None]:
view_fc = tsne.fit_transform(latent_vectors_fc.cpu().detach().numpy())

In [None]:
labels = torch.cat([y for _, y in loader])

In [None]:
plt.figure(figsize=(15, 10))
sns.scatterplot(view_fc[:, 0], view_fc[:, 1], hue=labels, palette=sns.color_palette());
plt.title("CVAE FC latent vectors distrubution")

Что вы думаете насчет этой картинки? Отличается от картинки для VAE?

Отличие явно заметно. Теперь нет кластеров по классам. Здесь скорее уже идет распределение по стилистике изображения цифры, а их довольно много и они могу пересекаться, поэтому латентные вектора распределены так "беспорядочно"

# BONUS 1: Denoising

## Внимание! За бонусы доп. баллы не ставятся, но вы можете сделать их для себя.

У автоэнкодеров, кроме сжатия и генерации изображений, есть другие практические применения. Про одно из них эта бонусная часть задания.

Автоэнкодеры могут быть использованы для избавления от шума на фотографиях (denoising). Для этого их нужно обучить специальным образом: input картинка будет зашумленной, а выдавать автоэнкодер должен будет картинку без шума. 
То есть, loss-функция AE останется той же (MSE между реальной картинкой и выданной), а на вход автоэнкодеру будет подаваться зашумленная картинка.

<a href="https://ibb.co/YbRJ1nZ"><img src="https://i.ibb.co/0QD164t/Screen-Shot-2020-06-04-at-4-49-50-PM.png" alt="Screen-Shot-2020-06-04-at-4-49-50-PM" border="0"></a>

Для этого нужно взять ваш любимый датасет (датасет лиц из первой части этого задания или любой другой) и сделать копию этого датасета с шумом. 

В питоне шум можно добавить так:

In [None]:
noise_factor = 0.5
X_noisy = X + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=X.shape) 

## Подготовка данных

In [None]:
def read_attributes(attrs_name = "lfw_attributes.txt",
                  images_name = "lfw-deepfunneled"):
    #Download if not exists
    if not os.path.exists(images_name):
        print("images not found, donwloading...")
        os.system("wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz -O tmp.tgz")
        print("extracting...")
        os.system("tar xvzf tmp.tgz && rm tmp.tgz")
        print("done")
        assert os.path.exists(images_name)

    if not os.path.exists(attrs_name):
        print("attributes not found, downloading...")
        os.system("wget http://www.cs.columbia.edu/CAVE/databases/pubfig/download/%s" % attrs_name)
        print("done")

    #Read attrs
    df_attrs = pd.read_csv("lfw_attributes.txt",sep='\t',skiprows=1,) 
    df_attrs = pd.DataFrame(df_attrs.iloc[:,:-1].values, columns = df_attrs.columns[1:])


    #Read photos
    photo_ids = []
    for dirpath, dirnames, filenames in os.walk(images_name):
        for fname in filenames:
            if fname.endswith(".jpg"):
                fpath = os.path.join(dirpath,fname)
                photo_id = fname[:-4].replace('_',' ').split()
                person_id = ' '.join(photo_id[:-1])
                photo_number = int(photo_id[-1])
                photo_ids.append({'person':person_id,'imagenum':photo_number,'photo_path':fpath})

    photo_ids = pd.DataFrame(photo_ids)
    
    #Merge (photos now have same order as attributes)
    df = pd.merge(df_attrs,photo_ids,on=('person','imagenum'))

    assert len(df)==len(df_attrs),"Lost some data when merging dataframes"
    #all_attrs = df.drop(["photo_path","person","imagenum"],axis=1)
    
    return df

In [None]:
attrs = read_attributes()

In [None]:
attrs.head()

In [None]:
class Noise():
  def __init__(self, noise_factor=0.5, mean=0, std=1):
    self.noise_factor = noise_factor
    self.mean=mean
    self.std = std

  def __call__(self, sample):
    noisy_sample = sample + self.noise_factor * np.random.normal(self.mean, self.std, sample.shape)
    return noisy_sample


In [None]:
class FacesDataset(Dataset):
  def __init__(self, filenames, size, noise=False):
    self.filenames = filenames
    self.size = size
    self.noise = noise
    transforms_array = [
                        tfs.CenterCrop(110),
                        tfs.Resize(size=self.size),
                        tfs.ToTensor(),
                        tfs.Normalize(mean=0, std=1)
                        ]
    if self.noise:
      transforms_array.append(Noise(noise_factor=0.15))
    self.transform = tfs.Compose(transforms_array)
    
  def __len__(self):
    return len(self.filenames)

  def __getitem__(self, idx):
    if torch.is_tensor(idx):
      idx = idx.tolist()
    
    image_filename = self.filenames[idx]
    image = Image.open(image_filename)
    image.load()
    image = self.transform(image)
    return image


Разделим непосредственно таблицу атрибутов

In [None]:
train_attrs, valid_attrs = train_test_split(attrs, train_size=0.9, shuffle=False)

In [None]:
print("Train attributes shape: ", train_attrs.shape)
print("Valid attributes shape: ", valid_attrs.shape)

Создадим непосредственно датасеты для обучающей и валидацинной выборки

In [None]:
train_set = FacesDataset(train_attrs["photo_path"].values, size=128)
valid_set = FacesDataset(valid_attrs["photo_path"].values, size=128)

Также создадим зашумленные датасеты

In [None]:
train_set_noisy = FacesDataset(train_attrs["photo_path"].values, size=128, noise=True)
valid_set_noisy = FacesDataset(valid_attrs["photo_path"].values, size=128, noise=True)

In [None]:
def show_images(ground_truth, reconstructions=None, 
                first_title="Source", second_title="Reconstruction"):
  if reconstructions is None:
    size = 1
  else: 
    size = 2
  fig = plt.figure(figsize=(5 * ground_truth.shape[0], size * 5))
  for i, image in enumerate(ground_truth):
    #plt.title("Ground truth")
    plt.subplot(size, ground_truth.shape[0], i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.title(first_title)
    plt.imshow(image.permute((1, 2, 0)).numpy())

  if reconstructions is not None:
    for i, image in enumerate(reconstructions):
    #plt.title("Ground truth")
      plt.subplot(size, ground_truth.shape[0], ground_truth.shape[0] + i + 1)
      plt.grid(False)
      plt.xticks([])
      plt.yticks([])
      plt.title(second_title)
      plt.imshow(image.permute((1, 2, 0)).cpu().detach().numpy())
  plt.ioff()
  return fig

Примеры оригинальных изображений

In [None]:
examples = torch.stack([train_set[i] for i in np.random.randint(0, len(train_set), size=5)])

In [None]:
show_images(examples);

Примеры зашумленных изображений

In [None]:
examples = torch.stack([train_set_noisy[i] for i in np.random.randint(0, len(train_set), size=5)])

In [None]:
show_images(examples);

In [None]:
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=128, shuffle=False)

In [None]:
train_loader_noisy = DataLoader(train_set_noisy, batch_size=128, shuffle=True)
valid_loader_noisy = DataLoader(valid_set_noisy, batch_size=128, shuffle=False)

## Архитектура

In [None]:
#Latent space's dimension
LATENT_DIM = 512

In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, padding=0):
    super().__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.padding = padding
    self.conv = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, 
                          kernel_size=3, padding=self.padding)
    self.bn = nn.BatchNorm2d(num_features=self.out_channels)
    
  def forward(self, x):
    x = self.conv(x)
    x = self.bn(x)
    x = F.elu(x)
    return x

In [None]:
class NoiseLambda(nn.Module):
  def __init__(self, noise_factor=0.5, mean=0, std=1):
    super().__init__()
    self.noise_factor = noise_factor
    self.mean=mean
    self.std = std

  def forward(self, sample):
    noisy_sample = sample + self.noise_factor * \
    torch.tensor(np.random.normal(self.mean, self.std, sample.shape), device=device).float()
    return noisy_sample

In [None]:
class Autoencoder(torch.nn.Module):
  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim
    self.noise_flag = True
    self.noise = NoiseLambda()

    self.encoder = nn.Sequential(
        ConvBlock(in_channels=3, out_channels=16),
        ConvBlock(in_channels=16, out_channels=16),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=16, out_channels=32),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=32, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=128),
        nn.Flatten(),
        nn.Linear(in_features=4*4*128, out_features=self.latent_dim),
        nn.ELU()

    )
    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=4*4*128),
        nn.ELU(),
        nn.Unflatten(dim=1, unflattened_size=(128, 4, 4)),
        nn.ConvTranspose2d(in_channels=128, out_channels=128, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        ConvBlock(in_channels=128, out_channels=64, padding=1),
        ConvBlock(in_channels=64, out_channels=64, padding=1),
        nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3, # 8 -> 16
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=64, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 16 -> 32
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=32, out_channels=16, padding=1),
        nn.ConvTranspose2d(in_channels=16, out_channels=16, kernel_size=3, # 32 -> 64
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=16, out_channels=8, padding=1),
        nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=3, # 64 -> 128
                           stride=2, padding=1, output_padding=1),
        nn.Sigmoid()

    )
  
  def change_noise(self, flag):
    self.noise_flag = flag

  def forward(self, sample):
    if self.noise_flag:
      sample = self.noise(sample)
    latent = self.encoder(sample)
    reconstructed = self.decoder(latent)
    return reconstructed, latent

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Training device: ", device)

In [None]:
autoencoder = Autoencoder().to(device)

In [None]:
summary(autoencoder, input_size=(3, 128, 128))

## Обучение

In [None]:
def train_epoch(model, criterion, optimizer, train_loader, summary_writer=None):
  train_losses_epoch = []

  model.train()
  for i, batch in enumerate(train_loader):
    optimizer.zero_grad()
    reconstruction, latent_code = model(batch.to(device).float())
    loss = criterion(reconstruction, batch.to(device).float())
    train_losses_epoch.append(loss.item())
    if summary_writer is not None:
      summary_writer.add_scalar("Epoch. Train loss", loss.item(), i)
    loss.backward()
    optimizer.step()
  
  return train_losses_epoch

In [None]:
def valid_epoch(model, criterion, optimizer, valid_loader, summary_writer=None):
  valid_losses_epoch = []

  model.eval()
  with torch.no_grad():
    for i, batch in enumerate(valid_loader):
      reconstruction, latent_code = model(batch.to(device).float())
      loss = criterion(reconstruction, batch.to(device).float())
      valid_losses_epoch.append(loss.item())
      if summary_writer is not None:
        summary_writer.add_scalar("Epoch. Valid loss", loss.item(), i)

  return valid_losses_epoch

In [None]:
def visualize(examples, reconstructions, train_losses, valid_losses):
  #plt.ion()
  figure = plt.figure(constrained_layout=True, figsize=(32, 8))
  subfigs = figure.subfigures(1, 2, wspace=0.07)
  axs_left = subfigs[0].subplots(2, 5) 
  ax = subfigs[1].subplots(1, 1)
  for j in range(5):
      axs_left[0, j].clear()
      axs_left[0, j].imshow(examples[j].permute((1, 2, 0)).numpy())
      axs_left[1, j].clear()
      axs_left[1, j].imshow(reconstructions[j].permute((1, 2, 0)).cpu().numpy())
      for i in range(2):
        axs_left[i, j].set_xticks([])
        axs_left[i, j].set_yticks([])
      axs_left[0, j].set_title("Source")
      axs_left[1, j].set_title("Reconstruction")

  ax.clear()
  ax.plot(train_losses, label="Train")
  ax.plot(valid_losses, label="Validation")
  ax.set_title("Training AE", fontsize=18) 
  ax.set_xlabel("Epoch", fontsize=14)
  ax.set_ylabel("Loss value", fontsize=14)
  ax.legend()
  figure.canvas.draw()
  figure.canvas.flush_events()
  plt.show()
  return figure


In [None]:
def fit(model, criterion, optimizer, train_loader, valid_loader, epochs, summary_writer=None):
  train_losses, valid_losses = [], []

  pbar = tqdm(range(epochs))
  pbar.set_description("Epoch 1")
  for epoch in pbar:
    if epoch != 0:
      pbar.set_description(f"Epoch {epoch + 1}. \
      Train loss: {round(train_losses[-1], 4)}. \
      Valid loss: {round(valid_losses[-1], 4)}")

    train_losses_epoch = train_epoch(model, criterion, optimizer, 
                                     train_loader, summary_writer)
    valid_losses_epoch = valid_epoch(model, criterion, optimizer, 
                                     valid_loader, summary_writer)
    
    train_losses.append(np.mean(train_losses_epoch))
    valid_losses.append(np.mean(valid_losses_epoch))

    examples = torch.stack([valid_set_noisy[i] for i in np.random.randint(0, len(valid_set), size=5)])
    model.eval()
    with torch.no_grad():
      reconstructions, latent_codes = model(examples.to(device).float())

    figure = visualize(examples, reconstructions, train_losses, valid_losses)

    if summary_writer is not None:
      summary_writer.add_scalars("Training", {"Train" : train_losses[-1],
                                             "Valid" : valid_losses[-1]}, epoch)
      if (epoch + 1) % 5 == 0:
        summary_writer.add_figure(f"Reconstruction. Epoch {epoch + 1}", figure)
    
  return train_losses, valid_losses

In [None]:
optimizer = optim.Adam(autoencoder.parameters())
criterion = nn.MSELoss(reduction="mean")

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter("AE Denoising Experiments")

In [None]:
train_losses_mse, valid_losses_mse = fit(autoencoder, criterion, optimizer, train_loader, valid_loader, EPOCHS, writer)

## Результат

In [None]:
examples = torch.stack([valid_set_noisy[i] for i in np.random.randint(0, len(valid_set_noisy), size=5)])

In [None]:
denoised, _ = autoencoder(examples.to(device).float())

In [None]:
show_images(examples, denoised, first_title="Source", second_title="Denoised");

# BONUS 2: Image Retrieval

## Внимание! За бонусы доп. баллы не ставятся, но вы можете сделать их для себя.

Давайте представим, что весь наш тренировочный датасет -- это большая база данных людей. И вот мы получили картинку лица какого-то человека с уличной камеры наблюдения (у нас это картинка из тестового датасета) и хотим понять, что это за человек. Что нам делать? Правильно -- берем наш VAE, кодируем картинку в латентное представление и ищем среди латентныз представлений лиц нашей базы самые ближайшие!

План:

1. Получаем латентные представления всех лиц тренировочного датасета
2. Обучаем на них LSHForest `(sklearn.neighbors.LSHForest)`, например, с `n_estimators=50`
3. Берем картинку из тестового датасета, с помощью VAE получаем ее латентный вектор
4. Ищем с помощью обученного LSHForest ближайшие из латентных представлений тренировочной базы
5. Находим лица тренировочного датасета, которым соответствуют ближайшие латентные представления, визуализируем!

In [None]:
#Latent space's dimension
LATENT_DIM = 512

In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, padding=0):
    super().__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.padding = padding
    self.conv = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, 
                          kernel_size=3, padding=self.padding)
    self.bn = nn.BatchNorm2d(num_features=self.out_channels)
    
  def forward(self, x):
    x = self.conv(x)
    x = self.bn(x)
    x = F.elu(x)
    return x

In [None]:
class VAE(torch.nn.Module):
  def __init__(self, latent_dim=LATENT_DIM):
    super().__init__()
    self.latent_dim = latent_dim

    self.encoder = nn.Sequential(
        ConvBlock(in_channels=3, out_channels=16),
        ConvBlock(in_channels=16, out_channels=16),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=16, out_channels=32),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=32, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=64),
        nn.MaxPool2d(kernel_size=2, stride=2),
        ConvBlock(in_channels=64, out_channels=128),
        nn.Flatten(),
        nn.Linear(in_features=4*4*128, out_features=2*self.latent_dim),
        nn.ELU()

    )
    self.decoder = nn.Sequential(
        nn.Linear(in_features=self.latent_dim, out_features=4*4*128),
        nn.ELU(),
        nn.Unflatten(dim=1, unflattened_size=(128, 4, 4)),
        nn.ConvTranspose2d(in_channels=128, out_channels=128, kernel_size=3, 
                           stride=2, padding=1, output_padding=1), # 4 -> 8
        ConvBlock(in_channels=128, out_channels=64, padding=1),
        ConvBlock(in_channels=64, out_channels=64, padding=1),
        nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3, # 8 -> 16
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=64, out_channels=32, padding=1),
        nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, # 16 -> 32
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=32, out_channels=16, padding=1),
        nn.ConvTranspose2d(in_channels=16, out_channels=16, kernel_size=3, # 32 -> 64
                           stride=2, padding=1, output_padding=1),
        ConvBlock(in_channels=16, out_channels=8, padding=1),
        nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=3, # 64 -> 128
                           stride=2, padding=1, output_padding=1),
        nn.Sigmoid()

    )

  def reparameterize(self, mu, log_var):
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    sample = std * eps + mu
    return sample

  def get_latent_vector(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    return z

  def forward(self, x):
    output = self.encoder(x).view(-1, 2, self.latent_dim)
    mu = output[:, 0, :]
    log_var = output[:, 1, :]
    z = self.reparameterize(mu, log_var)
    reconstruction = self.decoder(z)
    return reconstruction, mu, log_var

  def sample(self, z):
    generated = self.decoder(z)
    return generated

In [None]:
def train_epoch(model, criterion, optimizer, train_loader, summary_writer=None):
  train_losses_epoch = []

  model.train()
  for i, batch in enumerate(train_loader):
    optimizer.zero_grad()
    reconstruction, mu, log_var = model(batch.to(device).float())
    loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
    train_losses_epoch.append(loss.item())
    if summary_writer is not None:
      summary_writer.add_scalar("Epoch. Train loss", loss.item(), i)
    loss.backward()
    optimizer.step()

  return train_losses_epoch

In [None]:
def valid_epoch(model, criterion, optimizer, valid_loader, summary_writer=None):
  valid_losses_epoch = []

  model.eval()
  with torch.no_grad():
    for i, batch in enumerate(valid_loader):
      reconstruction, mu, log_var = model(batch.to(device).float())
      loss = criterion(batch.to(device).float(), reconstruction, mu, log_var)
      valid_losses_epoch.append(loss.item())
      if summary_writer is not None:
        summary_writer.add_scalar("Epoch. Valid loss", loss.item(), i)

  return valid_losses_epoch

In [None]:
def visualize(examples, reconstructions, train_losses, valid_losses):
  #plt.ion()
  figure = plt.figure(constrained_layout=True, figsize=(32, 8))
  subfigs = figure.subfigures(1, 2, wspace=0.07)
  axs_left = subfigs[0].subplots(2, 5) 
  ax = subfigs[1].subplots(1, 1)
  for j in range(5):
      axs_left[0, j].clear()
      axs_left[0, j].imshow(examples[j].permute(1, 2, 0).numpy())
      axs_left[1, j].clear()
      axs_left[1, j].imshow(reconstructions[j].permute(1, 2, 0).cpu().numpy())
      for i in range(2):
        axs_left[i, j].set_xticks([])
        axs_left[i, j].set_yticks([])
      axs_left[0, j].set_title("Source")
      axs_left[1, j].set_title("Reconstruction")

  ax.clear()
  ax.plot(train_losses, label="Train")
  ax.plot(valid_losses, label="Validation")
  ax.set_title("Training AE", fontsize=18) 
  ax.set_xlabel("Epoch", fontsize=14)
  ax.set_ylabel("Loss value", fontsize=14)
  ax.legend()
  #figure.canvas.draw()
  #figure.canvas.flush_events()
  plt.show()
  return figure


In [None]:
def fit(model, criterion, optimizer, train_loader, valid_loader, epochs, summary_writer=None):
  train_losses, valid_losses = [], []

  pbar = tqdm(range(epochs))
  pbar.set_description("Epoch 1")
  for epoch in pbar:
    if epoch != 0:
      pbar.set_description(f"Epoch {epoch + 1}. \
      Train loss: {round(train_losses[-1], 4)}. \
      Valid loss: {round(valid_losses[-1], 4)}")

    train_losses_epoch = train_epoch(model, criterion, optimizer, 
                                     train_loader, summary_writer)
    valid_losses_epoch = valid_epoch(model, criterion, optimizer, 
                                     valid_loader, summary_writer)
    
    train_losses.append(np.mean(train_losses_epoch))
    valid_losses.append(np.mean(valid_losses_epoch))

    examples = torch.stack([valid_loader.dataset[i]
                            for i in np.random.randint(0, len(valid_loader.dataset), size=5)])
    model.eval()
    with torch.no_grad():
      reconstructions, _, _ = model(examples.to(device).float())

    figure = visualize(examples, reconstructions, train_losses, valid_losses)

    if summary_writer is not None:
      summary_writer.add_scalars("Training", {"Train" : train_losses[-1],
                                             "Valid" : valid_losses[-1]}, epoch)
      if (epoch + 1) % 5 == 0:
        summary_writer.add_figure(f"Reconstruction. Epoch {epoch + 1}", figure)
    
  return train_losses, valid_losses

In [None]:
vae = VAE()
criterion = loss_vae
optimizer = optim.Adam(params=vae.parameters(), lr=0.001)

In [None]:
vae = vae.to(device)

In [None]:
summary(vae, input_size=(3, 128, 128))

In [None]:
EPOCHS = 100
writer = tensorboard.SummaryWriter(log_dir="VAE Retrieval Experiments")

In [None]:
train_losses, valid_losses = fit(vae, criterion, optimizer, 
                                 train_loader, valid_loader, EPOCHS, writer)

In [None]:
torch.save(vae.state_dict(), "model_state.pt")

In [None]:
vae.load_state_dict(torch.load("model_state.pt"))

Немного кода вам в помощь: (feel free to delete everything and write your own)

In [None]:
torch.cuda.empty_cache()

In [None]:
!nvidia-smi

In [None]:
vae.eval()
with torch.no_grad():
  codes = [vae.get_latent_vector(batch.to(device).float()) for batch in train_loader]

In [None]:
codes = torch.cat(codes)

In [None]:
codes.shape

In [None]:
#обучаем LSHForest
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors().fit(codes.cpu().detach().numpy())

In [None]:
def get_similar(image, n_neighbors=5):
  # функция, которая берет тестовый image и с помощью метода kneighbours у LSHForest ищет ближайшие векторы
  # прогоняет векторы через декодер и получает картинки ближайших людей

  code = vae.get_latent_vector(image.view(-1, 3, 128, 128).to(device).float())

  (distances,), (idx,) = nn.kneighbors(code.cpu().detach().numpy(), n_neighbors=n_neighbors)

  return distances, idx

In [None]:
def show_similar(image):

  # функция, которая принимает тестовый image, ищет ближайшие к нему и визуализирует результат
    
    distances, neighbors_indices = get_similar(image, n_neighbors=11)
    
    plt.figure(figsize=(16,12))
    plt.subplot(3,4,1)
    plt.imshow(image.cpu().numpy().transpose([1,2,0]))
    plt.xticks([])
    plt.yticks([])
    plt.title("Original image")
    
    for i in range(11):
        plt.subplot(3,4,i+2)
        plt.imshow(train_set[neighbors_indices[i]].cpu().numpy().transpose([1,2,0]))
        plt.xticks([])
        plt.yticks([])
        plt.title("Dist=%.3f"%distances[i])
    plt.show()

In [None]:
#print(np.random.randint(0, len(valid_set), 1).item())
example = valid_set[np.random.randint(0, len(valid_set), 1).item()]
show_similar(example)