<a href="https://colab.research.google.com/github/NikNord174/Neural_Networks_for_Science/blob/master/EX14_Encoders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Импорты, функции из занятия

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as dset
import torchvision.transforms as transforms


import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")

from itertools import chain, product

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

In [None]:
def plot_manifold(latent_r, labels=None, alpha=0.5):
    plt.figure(figsize=(10,10))
    if labels is None:
      plt.scatter(latent_r[:, 0], latent_r[:, 1], cmap="tab10", alpha=0.9)
    else:
      plt.scatter(latent_r[:, 0], latent_r[:, 1], c=labels, cmap="tab10", alpha=0.9)
      plt.colorbar()
    plt.show()

# plotting reconstructed and noised images
def plot_digits(*args, invert_colors=True, digit_size = 28, name=None):
    args = [x.squeeze() for x in args]
    n = min([x.shape[0] for x in args])
    figure = np.zeros((digit_size * len(args), digit_size * n))

    for i in range(n):
        for j in range(len(args)):
            figure[j * digit_size: (j + 1) * digit_size,
                   i * digit_size: (i + 1) * digit_size] = args[j][i].squeeze()

    if invert_colors:
        figure = 1-figure

    plt.figure(figsize=(2*n, 
                        2*len(args))
              )
    
    plt.imshow(figure,
               cmap='Greys_r',
               clim=(0,1))
    
    plt.grid(False)
    ax = plt.gca()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    if name is not None:
        plt.savefig(name)
    plt.show()

def train(enc, 
          dec,
          loader, 
          optimizer, 
          single_pass_handler, 
          loss_handler,
          epoch, 
          log_interval=500):
    for batch_idx, (data, lab) in enumerate(loader):
        batch_size = data.size(0)
        optimizer.zero_grad()
        data = data.to(device)
        lab = lab.to(device)

        latent, output = single_pass_handler(encoder, decoder, data, lab)

        loss = loss_handler(data, output, latent)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(loader.dataset),
                100. * batch_idx / len(loader), loss.item()))

# return reconstructed image, use to compute loss          
def ae_pass_handler(encoder, decoder, data, *args, **kwargs):
    latent = encoder(data)
    recons = decoder(latent)
    return latent, recons

# loss function  
def ae_loss_handler(data, recons, *args, **kwargs):
    return F.binary_cross_entropy(recons, data)

# return result in numpy to visualization
def run_eval(encoder, 
             decoder, 
             loader, 
             single_pass_handler,
             return_real=True, 
             return_recontr=True,
             return_latent=True,
             return_labels=True):
  
    if return_real:
        real = []
    if return_recontr:
        reconstr = []
    if return_latent:
        latent = []
    if return_labels:
        labels = []
    with torch.no_grad():
        for batch_idx, (data, lab) in enumerate(loader):  
            if return_labels:
                labels.append(lab.numpy())
            if return_real:
                real.append(data.numpy())
            
            data = data.to(device)
            lab = lab.to(device)
            rep, rec = single_pass_handler(encoder, decoder, data, lab)
            if return_latent:
                latent.append(rep.to('cpu').numpy())
            if return_recontr:
                reconstr.append(rec.to('cpu').numpy())
    
    result = {}
    if return_real:
        real = np.concatenate(real)
        result['real'] = real.squeeze()
    if return_latent:
        latent = np.concatenate(latent)
        result['latent'] = latent
    if return_recontr:
        reconstr = np.concatenate(reconstr)
        result['reconstr'] = reconstr.squeeze()
    if return_labels:
        labels = np.concatenate(labels)
        result['labels'] = labels
    return result


In [None]:
#handson-ml2

import matplotlib as mpl
def plot_percent_hist(ax, data, bins):
    counts, _ = np.histogram(data, bins=bins)
    widths = bins[1:] - bins[:-1]
    x = bins[:-1] + widths / 2
    ax.bar(x, counts / len(data), width=widths*0.8)
    ax.xaxis.set_ticks(bins)
    ax.yaxis.set_major_formatter(mpl.ticker.FuncFormatter(
        lambda y, position: "{}%".format(int(np.round(100 * y)))))
    ax.grid(True)

  
def plot_activations_histogram(activations, height=1, n_bins=10):
    activation_means = activations.mean(axis=0)
    
    mean = activation_means.mean()
    bins = np.linspace(0, 1, n_bins + 1)

    fig, [ax1, ax2] = plt.subplots(figsize=(10, 3), nrows=1, ncols=2, sharey=True)
    plot_percent_hist(ax1, activations.ravel(), bins)
    ax1.plot([mean, mean], [0, height], "k--", label="Overall Mean = {:.2f}".format(mean))
    ax1.legend(loc="upper center", fontsize=14)
    ax1.set_xlabel("Activation")
    ax1.set_ylabel("% Activations")
    ax1.axis([0, 1, 0, height])
    plot_percent_hist(ax2, activation_means, bins)
    ax2.plot([mean, mean], [0, height], "k--")
    ax2.set_xlabel("Neuron Mean Activation")
    ax2.set_ylabel("% Neurons")
    ax2.axis([0, 1, 0, height])



In [None]:
class AddGaussianNoise(): # add noise
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


In [None]:
# autoencoder model
class Encoder(nn.Module):
    def __init__(self, latent_size):
        super().__init__()
        self.latent_size = latent_size
        hidden_dims = [32, 64, 128, 256, 512]

        # Build Encoder
        modules = []
        in_channels = 1
        for h_dim in hidden_dims[:-1]:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels=in_channels,
                              out_channels=h_dim,
                              kernel_size= 3, stride= 2 , padding  = 1),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            in_channels = h_dim

        modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels=256,
                              out_channels=512,
                              kernel_size= 1),
                    nn.BatchNorm2d(512),
                    nn.LeakyReLU())
        )
        modules.append(nn.Flatten())
        modules.append(nn.Linear(hidden_dims[-1] * 4, latent_size))

        self.encoder = nn.Sequential(*modules)      
    
    def forward(self, x):
        x = self.encoder(x)
        return x
        
class Decoder(nn.Module):
    def __init__(self, latent_size):
        super().__init__()

        hidden_dims = [512, 256, 128, 64, 32]
        self.linear = nn.Linear(in_features=latent_size, 
                                out_features=hidden_dims[0])
        
        modules = []
        for i in range(len(hidden_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(hidden_dims[i],
                                       hidden_dims[i + 1],
                                       kernel_size=3,
                                       stride = 2,
                                       padding=1,
                                       output_padding=1),
                    nn.BatchNorm2d(hidden_dims[i + 1]),
                    nn.LeakyReLU())
            )


        modules.append(nn.Sequential(
                            nn.ConvTranspose2d(hidden_dims[-1],
                                               hidden_dims[-1],
                                               kernel_size=3,
                                               stride=2,
                                               padding=1,
                                               output_padding=1),
                            nn.BatchNorm2d(hidden_dims[-1]),
                            nn.LeakyReLU(),
                            nn.Conv2d(hidden_dims[-1], out_channels= 1,
                                      kernel_size= 7, padding= 1),
                            nn.Sigmoid()))

        self.decoder = nn.Sequential(*modules)   

        
    def forward(self, x):
        x = self.linear(x)
        x = x.view(-1, 512, 1, 1)
        x = self.decoder(x)
        return x

# Задание 1. Автоэнкодер для KMNIST c MSE-loss


В этом задании обучайте автоэнкодер на датасете KMNIST:

* Используйте энкодер и декодер из лекции
* Уберите из декодера сигмоиду в конце
* Используйте MSE-loss между реальным и восстановленным изображением



Как выглядит латентное представление? 

Разделяются ли в нем классы? 

Далее:


*   Обучите автоэнкодер с размером латентного слоя 30
*   Продемонстрируйте восстановление автоэнкодером переданных ему изображений

Чем восстановление отличается от восстановления автоенкодером, обученном в лекции? 
 
Что не нравится в полученных востановленных изображениях?

Напишите выводы

In [None]:
root = './data'

train_set = dset.KMNIST(root=root, 
                       train=True, 
                       transform=torchvision.transforms.ToTensor(),
                       download=True)
test_set = dset.KMNIST(root=root, 
                      train=False,
                      transform=torchvision.transforms.ToTensor(),
                      download=True)

test_noise_set = dset.KMNIST(root=root, 
                      train=False,
                      transform=torchvision.transforms.Compose([
                          torchvision.transforms.ToTensor(),
                          AddGaussianNoise(0., 0.30)
                      ]),
                      download=True)
train_loader = torch.utils.data.DataLoader(
    train_set, 
    batch_size=64,
    shuffle=True)

test_loader = torch.utils.data.DataLoader(
    train_set, 
    batch_size=64,
    shuffle=False)

test_noised_dataloader = torch.utils.data.DataLoader(
    torch.utils.data.Subset(test_noise_set, list(range(64))),
    batch_size=64, 
    shuffle=False)



In [None]:
# enter your code here


# Задание 2. Разреженный autoencoder с KL-loss



На занятии мы обсуждали, что разреженный автоэнкодер можно делать двумя путями - при помощи L1 и при помощи KL лосса. На занятии мы сделали с L1  лоссом. 

Ваша задача состоит в том, чтобы реализовать разреженный автоэнкодер с KL-лоссом. 

$$KL(P||Q) =  p(x) \log \dfrac {p(x)} {\hat{p}(x) + \epsilon} + (1 - p(x)) \log \dfrac {(1 - p(x))} {1 - \hat{p}(x) + \epsilon} $$
Обучите автоэнкодер с требованием, чтобы активировалось не более 10% нейронов. 

Учтите, что лосс надо считать по активациям, распределенным от 0 до 1 - то есть надо выполнить преобразование, аналогичное тому, что мы делали ранее



Напоминаем, что в этом случае мы:
 1. Усредняем активации по батчу
 2. Для каждого нейрона таким образом получаем среднюю "вероятность" активироваться
 3. Эта вероятность вряд ли будет в точности равна 0 или 1, но в выражение для KL-loss в знаменатель на всякий случай, добавляем малое число epsilon.
 4. Подсчитанный лосс усредняем по всем нейронам слоя
 5. Сделайте выводы


Постройте графики:

 1. Того, как восстанавливает автоэнкодер полученные изображения
 2. Средних активаций для каждого класса 
 3. Распределения силы активаций в целом и средней силы активации каждого нейрона





В следующих заданиях будет использоваться обычный MNIST

In [None]:

root = './data'

train_set = dset.MNIST(root=root, 
                       train=True, 
                       transform=torchvision.transforms.ToTensor(),
                       download=True)
test_set = dset.MNIST(root=root, 
                      train=False,
                      transform=torchvision.transforms.ToTensor(),
                      download=True)

test_noise_set = dset.MNIST(root=root, 
                      train=False,
                      transform=torchvision.transforms.Compose([
                          torchvision.transforms.ToTensor(),
                          AddGaussianNoise(0., 0.30)
                      ]),
                      download=True)
train_loader = torch.utils.data.DataLoader(
    train_set, 
    batch_size=64,
    shuffle=True)

test_loader = torch.utils.data.DataLoader(
    train_set, 
    batch_size=64,
    shuffle=False)

test_noised_dataloader = torch.utils.data.DataLoader(
    torch.utils.data.Subset(test_noise_set, list(range(64))),
    batch_size=64, 
    shuffle=False)



In [None]:
def to_01_activation(latent):
  activations = (torch.sigmoid(latent.abs()) - 0.5) * 2
  return activations

def sparse_kl_loss(latent, p=0.10, eps=10e-5):
  activations = to_01_activation(latent)
  phat = ... 
  loss = ...
  return loss.mean()


def sparse_ae_pass_handler(encoder, decoder, data, *args, **kwargs):
    latent = encoder(data)
    recons = decoder(latent)
    return latent, recons

def sparse_ae_loss_handler(data, recons, latent, beta=0.1, *args, **kwargs):
    return F.binary_cross_entropy(recons, data) + sparse_kl_loss(latent)

In [None]:
latent_size = 16 * 16

learning_rate = 1e-4
encoder = Encoder(latent_size=latent_size)
decoder = Decoder(latent_size=latent_size)


encoder = encoder.to(device)
decoder = decoder.to(device)

optimizer = optim.Adam(chain(encoder.parameters(),
                            decoder.parameters()
                           ),
                      lr=learning_rate)

In [None]:
# enter your code here


# Задание 3.1 Сэмплирование из обычного VAE



В случае с обычным VAE мы визуализировали только центры распределений, соответствувющих цифрам. Хотелось бы получить более полное представление о том, как выглядит латентное пространство. 

Задача:
 1. Обучить VAE из лекции (latent_size = 2)
 2. Визуализировать латентные представления, который приходят на вход в decoder. Для этого можно воспользоваться функцией vae_reparametrization, возможно, переписав ее на numpy (один из вариантов)
 3. Из каждого распределения, соответствующего объекту надо сэмплировать несколько раз(хотя бы 10). Чтобы точки на полученном графике не полностью перекрывались, используйте параметр alpha из sns.scatterplot или plot_manifold из лекции

In [None]:
class VAEEncoder(Encoder):
    def __init__(self, latent_size):
        if latent_size % 2 != 0:
            raise Exception("Latent size for VAEEncoder must be even")
        super().__init__(latent_size)

def vae_split(latent):
    size = latent.shape[1] // 2
    mu = latent[:, :size]
    log_var = latent[:, size:]
    return mu, log_var

def vae_reparametrize(mu, log_var):
    sigma = torch.exp(0.5 * log_var)
    z = torch.randn(mu.shape[0],
                    mu.shape[1]).cuda()
    return z * sigma + mu 

def vae_pass_handler(encoder, decoder, data, *args, **kwargs):
    latent = encoder(data)
    mu, log_var = vae_split(latent)
    sample = vae_reparametrize(mu, log_var)
    recons = decoder(sample)
    return latent, recons

def kld_loss(mu, log_var):
    var = log_var.exp()
    kl_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - var, dim = 1),
                          dim = 0)  
    return kl_loss

def vae_loss_handler(data, recons, latent, kld_weight=0.005, *args, **kwargs):
    mu, log_var = vae_split(latent)
    kl_loss = kld_loss(mu, log_var)
    return kld_weight * kl_loss + F.binary_cross_entropy(recons, data)

In [None]:
# enter your code here


## Задание 3.2. Визуализация латетного пространства


Для модели из предыдущего занятия, можно получить распределение классов цифр на плоскости, типа такого:

<img src="https://edunet.kea.su/repo/src/L14_Encoders/img/vae_sampling.png" alt="alttext" style="width: 500px;"/>

Вашей задачей будет получить похожее изображение, используя модель из задания выше. Используйте побольше изображений в каждом ряду (скажем, 50). Иначе все цифры можете не увидеть

In [None]:
# enter your code here

# Задание 4. Перенос стиля при помощи CVAE



Обучите CVAE из лекции (latent_size=2)

Посмотрим на результат применения нескольких разных стилей семерки, для других цифр


<img src="https://edunet.kea.su/repo/src/L14_Encoders/img/style_transfer.png" alt="alttext" style="width: 500px;"/>. 

Задача:

Реализовать визуализацию стилей тройки, для других цифр

Для этого нужно выбрать 10 разных троек, желательно брать случайные 10 троек из датасета(seed зафиксируем)




# Задание 5. Conditional adversarial autoencoder



У нас есть AAE. У нас есть Conditional Autoencoders.

Осталось скрестить. Конкретно - теперь будем передавать метку декодеру.

Что произойдет, если не передавать метку дискриминатору, но при этом передавать метку декодеру? 

Правильно, автоэнкодеру может оказаться выгоднее игнорировать метку и "зарезервировать" под каждую цифру только часть латентного пространства. 
Мы так не хотим. Потому необходимо добавить в дискриминатор метку - получим "условный" дискриминатор

Остается еще один вопрос - что передавать ему в качестве метки, когда мы передаем ему сэмплы из равномерного распределения? 

Будем просто передавать ему равное число меток разных классов. Можно просто сделать так, что число положительных примеров было кратно 10 (рекомендую), можно выбирать метки случайно. 

Таким образом, надо:

1. Реализовать "условный" дискриминатор
2. Поменять цикл обучения так, чтобы использовать условный декодер и условный дискриминатор.
3. Для обученного автоэнкодера посэмплировать 4 и еще одну любую выбранную вами цифру

4. **Допбаллы**: можете проверить, что будет, если использовать обычный дискриминатор. Действительно ли нейросеть каждому объекту начнет сопоставлять лишь часть пространства?

Может быть полезным, еще больше увеличить размер батча с реальными изображениями. Для батча размера 128 на 10 классов изображений получится, что на каждой итерации для латентного пространства каждой цифры используется порядка 12 изображений, что не очень много.

Равномерное распределение можете взять на отрезке от -1 до 1

In [None]:
def generate_uniform(shape): # U[-1, 1]
  return torch.rand( *shape ) * 2 - 1

In [None]:
class CDiscriminator(nn.Module):
    def __init__(self, latent_size):
        super().__init__()
        self.discriminator = nn.Sequential(nn.Linear(latent_size + 10, 512),
                                           nn.BatchNorm1d(512),
                                           nn.LeakyReLU(0.2),
                                           nn.Linear(512, 256),
                                           nn.BatchNorm1d(256),
                                           nn.LeakyReLU(0.2),
                                           nn.Linear(256, 1),
                                           nn.Sigmoid())
        
    def forward(self, x, lab):
        x = ...
        x = self.discriminator(x)
        return x

# Задание 6. Детекция аномалий с помощью Autoencoder

Представим следующую ситуацию, нам нужна система, которая принимает изображение сетчатки глаза, если же изображение не является фотографией сетчатки глаза, то нам нужно сообщить пользователю об ошибке и не принимать это изображение.

Идея заключается в следующем: если автоэнкодер может выучить внутреннее представление данных,например фотографий сетчатки глаза, то при восстановлении данных, которые не являются фотографией сетчатки глаза, ошибка будет существенно больше. Установив порог этой ошибки мы сможем отделять нужные фотографии от не нужных.



*   Обучите автоэнкодер на фотографиях сетчатки глаза(RetinaMNIST)
*   Подайте в автоэнкодер другое изображение(можно взять BloodMNIST)
*   Посчитайте ошибку восстановления для разных датасетов
*   Установите порог(значение ошибки) для определения класса фотографии(сетчатка глаза или нет)
*   Проведите тесты 
*   Напишите выводы



In [None]:
from IPython.display import clear_output
!pip install --upgrade git+https://github.com/MedMNIST/MedMNIST.git
clear_output()

In [None]:
import medmnist
from medmnist import INFO, Evaluator

In [None]:
data_flag = 'retinamnist'
info = INFO[data_flag]

DataClass = getattr(medmnist, info['python_class'])

# preprocessing
data_transform = transforms.Compose([transforms.Grayscale(num_output_channels=1),transforms.ToTensor()])

# load the data
train_dataset = DataClass(split='train', transform=data_transform, download=True)
test_dataset = DataClass(split='test', transform=data_transform, download=True)

# encapsulate data into dataloader form
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

# load the data
data_flag = 'bloodmnist'
info = INFO[data_flag]
DataClass = getattr(medmnist, info['python_class'])

test_dataset_1 = DataClass(split='test', transform=data_transform, download=True)
test_loader_1 = torch.utils.data.DataLoader(dataset=test_dataset_1, batch_size=1, shuffle=False)

In [None]:
# enter your code here

# Задание 7. AAE и латентное пространство



На самом деле, идея с равномерными распределением не очень хороша - мы строго отделили их друг от друга, что принуждает наш автоэнкодер либо разбивать даже похожие цифры, если они относятся к разным классам, либо игнорировать дискриминатор и получать за это штраф.

Приятнее в данном случае использовать распределения, которые определены на области от плюс до минус бесконечности. 

Расположить их на таком расстоянии, чтобы в принципе они пересекались, но очень редко. Для этой цели нам подойдут нормальные распределения.

Сделаем сетку только из 9 нормальных распределений - все равно мы уже не раз наблюдали, что 4 и 9 очень трудно отличимы, потому кластеров у нас максимально 9

In [None]:
def generate_on_grid(size_for_bound, grid, generator):
    samples = []
    for boundaries in grid:
        s = generator(size_for_bound, *boundaries)
        samples.append(s)
    sample = torch.cat(samples, dim=0)
    return sample

def generator_normal(size, xmin, xmax, ymin, ymax):
    mean_x = (xmin + xmax) / 2
    mean_y = (ymin + ymax) / 2
    std_x = (xmax - mean_x) / 2.5
    std_y = (ymax - mean_y) / 2.5
    x = torch.FloatTensor(size, 1).normal_(mean_x, std_x)
    y = torch.FloatTensor(size, 1).normal_(mean_y, std_y)
    return torch.cat([x, y], dim=1)

In [None]:
from itertools import product
grids = [ (x1, x2, x3, x4) for (x1, x2), (x3, x4) in  product([[-1.5, -0.5], [-0.5, 0.5], [0.5, 1.5]], 
                                                           [[-1.5, -0.5], [-0.5, 0.5], [0.5, 1.5]])]
#grids.append((-0.5, 0.5, -2.5, -1.5))
plot_manifold(generate_on_grid(516, grids, generator_normal).numpy())

Обучите AAE переводить изображения в приведенное латентное пространство. Учтите, что может потребоваться много эпох для достижения приемлемого результата.

Возможно, для того, чтобы добиться сходимости модели, надо поиграть с числом реальных объектов, которые подаются нейросети на вход 