## Вариационный автоэнкодер для генерации цифр схожих на датасет MNIST

Автоэнкодер состоит из двух частей - энкодера и декодера. <br>
Задача энкодера сжать входные данные (обычно эти данные обозначают  как **x**) с минимально возможной потерей данных и получить вектор (обычно этот сжатый вектор обозначают  как **z**).
Задача декодера обратная: из сжатого вектора восстановить из **z** исходные данные.

Традиционно автоэнкодеры используют в качестве способа уменьшения размерности 

![Вариационный автоэнкодер, схема](https://lilianweng.github.io/lil-log/assets/images/vae-gaussian.png)

Отличие вариационного автоэнкодера от обычного заключается в способе получения вектора **z**. В вариационном автоэнкодере предполагается, что данные можно охарактеризовать несколькими переменные, которые принадлежат некоторому вероятностному распределению. Зная распределение, можно будет получать (генерировать) данные исключительно из вектора **z**.




Репараметризацию необходимо делать т.к. операция извлечения примера из распределения не дифференцируема. Ниже написан этот способ в виде математической формулы

![alt text](https://cdn-images-1.medium.com/max/1600/1*CEUvzm7vNdSh7cCBgcWxUA.png)

,где $\epsilon$ - случайный шум из нормального распределения;

$\mu$ - среднее значение, полученное от энкодера;

$\sigma$ - стандартное отклонение, полученное от энкодера;






#### Функция потери при обучении

Функция потери состоит из двух  частей:

1.   Потеря восстановления изображения (бинарная кросс-энтропия)
2.   Дивергенция Кульбака-Либлера 

Итоговая функция потери определяется суммой этих двух функций потерь

# Задание

__Создать 2 рализные модели VAE и оценить качество генерации изображений цифр. Построить картинку с плавными переходами между цифрами благодаря семплированию из латентного пространства.__

Создададим 2 модели:
1.   Создадим вариационный автоэнкодер с использованием сверток ([Conv2d](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d)) в энкодере (слои отвечающие за среднее и отклонение остаются полносвязными), и с развертками ([Conv2dTranspose](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html#torch.nn.ConvTranspose2d) в декодере. Размерность скрытого вектора равна двум 

2.  Создадим вариационный автоэнкодер с использованием сверток (Conv2d) в энкодере (слои отвечающие за среднее и отклонение остаются полносвязными), и с развертками ([Upsample](https://pytorch.org/docs/stable/generated/torch.nn.Upsample.html#torch.nn.Upsample), [Conv2d](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d)) в декодере. Размерность скрытого вектора равна двум. [Подробнее](https://distill.pub/2016/deconv-checkerboard/) 

Для построения изображения послепенного перехода цифр необходимо создать сетку из N на N изображений, где по оси Х изменяется значение первого элемента **z**, а по оси Y - второго элемента **z**. Построим такие сети для каждой построенной моделимодели

In [1]:
import torch
import torch.nn as nn
import datetime as dt
import torchvision
import torchvision.transforms as transforms
from tqdm.notebook import tqdm
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import seaborn as sns
import torch.nn.functional as F

## 1 модель:

Загрузим данные:

In [2]:
BATCH_SIZE = 500  # размер батча

transform = transforms.Compose([
    transforms.ToTensor(),
])


train_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform
)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

print("len(train_dataset) =", len(train_dataset))

val_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform
)
val_dataloader= torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print("len(val_dataset) =", len(val_dataset))

len(train_dataset) = 60000
len(val_dataset) = 10000


Фотки mnist - чб размером 28 на 28 пикселей

Создадаим класс энкодера:

In [3]:
class FirstEncoder(nn.Module):
    def __init__(self, latent_dim):
        super(FirstEncoder, self).__init__()
        self.latent_dim = latent_dim
        self.conv1 = nn.Sequential(                                           
            nn.Conv2d(1, 32, kernel_size=(3,3), stride=2, padding=1),      
             nn.BatchNorm2d(32),
             nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1),          
             nn.BatchNorm2d(32),
             nn.ReLU())
            
        self.fc_mu = nn.Sequential(
            nn.Linear(32 * 49, self.latent_dim))
        
        self.fc_var = nn.Sequential(
            nn.Linear(32 * 49, self.latent_dim))


    def forward(self, x):
        x = self.conv1(x)
        x = x.reshape(-1, 32*49)
        mu = self.fc_mu(x)
        logvar = self.fc_var(x)
        return mu, logvar

Создадим класс декодера:

In [4]:
class FirstDecoder(nn.Module):
    def __init__(self, latent_dim):
        super(FirstDecoder, self).__init__()
        self.latent_dim = latent_dim
        self.fc_decoder = nn.Sequential(
            nn.Linear(latent_dim, 32*49),
          
        )
        self.conv1 = nn.Sequential(
           nn.ConvTranspose2d(32, 32, kernel_size=3, stride=2, padding=1, output_padding=1),       
           nn.BatchNorm2d(32),
              nn.ReLU(),
            nn.ConvTranspose2d(32, 1, kernel_size=3, stride=2, padding=1, output_padding=1),     
        )

        self.dropout = nn.Dropout(0.5)       
        
        
    def forward(self, x):
        x = self.fc_decoder(x)
        x = x.reshape(-1,32,7,7)
        x = self.conv1(x)
        
        return x

Создадим класс VAE который внутри себя обращаться будет к двум другим классам FirstEncoder и FirstDecoder

In [6]:
class VAE(nn.Module):
        def __init__(self, latent_dim):
            super(VAE, self).__init__()
            self.latent_dim = latent_dim
            self.encoder = FirstEncoder(latent_dim)
            self.decoder = FirstDecoder(latent_dim)

        def forward(self, x):
            mu, logvar = self.encoder(x)
            z = self.reparametrize(mu, logvar)
            x_hat = self.decoder(z)
            return x_hat, mu, logvar
    
    # Функция выполняющая репараметризацию
        def reparametrize(self, mu, logvar):
            std = torch.exp(0.5 * logvar)
            eps = torch.randn_like(std)
            return eps * std + mu

    # Реализация рандомного семплирования из латентного пространства для визуализации
        def sample(self, num_samples) -> torch.Tensor:
            z = torch.randn(num_samples,
                            self.latent_dim)

            z = z.to(self.encoder.fc_mu[0].weight.device)

            samples = torch.sigmoid(self.decoder(z))

            return samples

Загрузим раcширение для tensorboard: 

In [15]:
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Результаты процесса обучения будем отображать в tensorboard среде. Так как GitHub не отображает расширение jupiter tensorboard, поэтому все результаты будут сохранены в tensorboard.dev и представлены в виде ссылки

[__РЕЗУЛЬТАТЫ TensorBoard.dev__](https://tensorboard.dev/experiment/5eQsWOPrTJyCXq3lZnSLLg/#scalars&_smoothingWeight=0.788)



In [16]:
%reload_ext tensorboard
%tensorboard --logdir 'logs'

Reusing TensorBoard on port 6006 (pid 19548), started 0:01:05 ago. (Use '!kill 19548' to kill it.)

In [17]:
epochs = 200   # число эпох
latent_dim = 2   # Размер латентного простанства

# число рандомных семплов и латентного простанства для визуализации процесса обучения:
num_examples_to_generate = 16

device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = VAE(latent_dim).to(device)

# параметры градиентного спуска и scheduler для постепенного уменьшения величины шага
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.5)

In [18]:
summary_writer = SummaryWriter(comment = ' With ConvTranspose in the decoder part')

    
def generate_and_save_images(model, epoch, file_writer):
    '''
    Генерирует 16 примеров и записывает их в file_writer для
    визуализации в tensorboard
    '''
    with torch.no_grad():
        model.eval()
        predictions = model.sample(16).reshape(-1,1,28,28)
    images = torchvision.utils.make_grid(predictions, 4)
    file_writer.add_image("samples", images, global_step=epoch)

    
# Получим результат до начала обучения - рандомный шум : 
generate_and_save_images(model, 0, summary_writer)  

Определим функции потерь:

In [19]:
def compute_loss(model, x):
    x_hat, mu, logvar = model(x)
    ## сигмоида встроена в функцию потери
    recons_loss = nn.functional.binary_cross_entropy_with_logits(x_hat, x, reduction='sum') 

    kld_loss = -0.5 * (1 + logvar - mu ** 2 - logvar.exp()).sum()

    return recons_loss, kld_loss

Приступим к обучению:

In [None]:
for epoch in tqdm(range(1, epochs+1)):
    model.train()
    totat_len = len(train_dataloader)
    for step, train_x in enumerate(train_dataloader):
        recon_loss, kld_loss = compute_loss(model, train_x[0].to(device))
        summary_writer.add_scalar('train/recon_loss', recon_loss, global_step = epoch * totat_len + step)
        summary_writer.add_scalar('train/kld_loss', kld_loss, global_step = epoch * totat_len + step)
         
        loss = recon_loss + kld_loss  # loss - сумму потерь

        summary_writer.add_scalar('train/loss', loss, global_step = epoch * totat_len + step)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 5 == 0:
        losses = []
        model.eval()
        for test_x in val_dataloader:
            recon_loss, kld_loss = compute_loss(model, test_x[0].to(device))
            loss = recon_loss + kld_loss
            losses.append(loss)
        summary_writer.add_scalar('test/loss', torch.stack(losses).mean(), global_step=epoch)
        generate_and_save_images(model, epoch, summary_writer)
    scheduler.step()
    
summary_writer.close()

  0%|          | 0/200 [00:00<?, ?it/s]

Таким образом, мы получим на tensorboard графики изменения recons_loss, kld_loss и суммарного loss на трейне для каждого шага градиентного спуска и на тесте раз в 5 эпох генерили фотки и получали график изменения суммарного loss

Для тестового датасета получим значения средних на выходе из энкодера. Далее будем визуализировать латентное пространство

In [None]:
transform = transforms.Compose([
    transforms.ToTensor()])

val_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform
)


BATCH_SIZE = len(val_dataset)
val_dataloader= torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

model.eval()
for test_x in val_dataloader:
    x_hat, mu, logvar = model(test_x[0])
    labels = test_x[1]

print(mu)  

In [None]:
mu_numpy.shape

In [None]:
mu_save = mu
logvar_save = logvar
labels_save = labels
lab = labels_save.numpy()
mu_numpy = mu.detach().numpy()

сохраним модель:

In [None]:
torch.save(model, 'models/model_mnist_convtranspose.pth')

Определим цвета для каждой цифры, чтобы визуализировать в дальнейшем распеделение точек в латентном пространстве. Всего на тесте имеется 10000 изображений. <br>
Посмотрим как распределено число фотографий между десятью классами:

In [None]:
col = list(mcolors.TABLEAU_COLORS.keys())
sns.set_theme(style="whitegrid")
plt.figure(figsize=(8, 6), dpi=80)
sns.countplot(x=lab);

Для каждой из 10000 фоток найдем соответствующее положение в латентном пространстве:

In [None]:
plt.figure(figsize=(10, 10), dpi=80)
for i, point in enumerate(mu_numpy[:]):
    nom = lab[i]
    plt.scatter(point[0],point[1],color = col[nom])
plt.show()

Видно, что сеть смогла кластеризовать классы близко друг с другом, сем самым при наличии всего двух осей латентного пространства, модель неплохо справилась с задачей разделения классов без знания самих меток (то есть обучение без учителя)

Построим картинку с плавными переходами между цифрами благодаря семплированию из латентного пространства:

In [None]:
num = 0
N = 15
plt.figure(figsize=(10, 10), dpi=80)
for i in np.linspace(-2, 2, N, endpoint=True):
    for j in np.linspace(2, -2, N, endpoint=True):
        num += 1
        ax = plt.subplot(N, N, num)
        samples = torch.sigmoid(model.decoder(torch.Tensor([[i,j]]))).reshape(28,28)
        ax.imshow(samples.detach(), cmap='gray')
        ax.axis('off')

Сделаем инвертирование цвета, чтобы получить черные цифры на белом фоне:

In [None]:
num = 0
N = 30
plt.figure(figsize=(30, 30), dpi=200)
for i in np.linspace(-2, 2, N, endpoint=True):
    for j in np.linspace(2, -2, N, endpoint=True):
        num += 1
        ax = plt.subplot(N, N, num)
        samples = 1 - torch.sigmoid(model.decoder(torch.Tensor([[i,j]]))).reshape(28,28)
        ax.imshow(samples.detach(), cmap='gray')
        ax.axis('off')

## 2 модель:

Повторно загрузим данные:

In [None]:
BATCH_SIZE = 500

transform = transforms.Compose([
    transforms.ToTensor()])


train_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform
)

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

print("len(train_dataset) =", len(train_dataset))

val_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform
)
val_dataloader= torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print("len(val_dataset) =", len(val_dataset))

Класс энкодера не изменился:

In [None]:
class Encoder(nn.Module):
    def __init__(self, latent_dim):
        super(Encoder, self).__init__()
        self.latent_dim = latent_dim
        self.conv1 = nn.Sequential(                                           
             nn.Conv2d(1, 32, kernel_size=(3,3), stride=2, padding=1),      
             nn.BatchNorm2d(32),
             nn.ReLU(),
             nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1),          
             nn.BatchNorm2d(32),
             nn.ReLU())

        self.fc_mu = nn.Sequential(
             nn.Linear(32 * 49, self.latent_dim))
        
        self.fc_var = nn.Sequential(
             nn.Linear(32 * 49, self.latent_dim))


    def forward(self, x):
        
        x = self.conv1(x)
        x = x.reshape(-1, 32*49)
        mu = self.fc_mu(x)
        logvar = self.fc_var(x)
        return mu, logvar

Декодер теперь реализуется через upsample_bilinear и свертки:

In [None]:
class Decoder(nn.Module):
    def __init__(self, latent_dim):
        super(Decoder, self).__init__()
        self.latent_dim = latent_dim
        self.fc_decoder = nn.Sequential(
            nn.Linear(latent_dim, 32*49))
        
        self.conv1 = nn.Conv2d(32, 32, kernel_size=3, padding=1)       
        self.bn1 = nn.BatchNorm2d(32)
        self.act = nn.ReLU() 
        self.conv2 = nn.Conv2d(32, 1, kernel_size=3, padding=1)   
        
        
    def forward(self, x):
        x = self.fc_decoder(x)
        x = x.reshape(-1,32,7,7)
        x = F.upsample_bilinear(x, size=14)
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.act(x)
        x = F.upsample_bilinear(x, size=28)
        x = self.conv2(x)
        
        
        return x

Определим класс VAE, который внутри себя обращаться будет к двум другим классам Encoder и Decoder

In [None]:
class VAE(nn.Module):
        def __init__(self, latent_dim):
            super(VAE, self).__init__()
            self.latent_dim = latent_dim
            self.encoder = Encoder(latent_dim)
            self.decoder = Decoder(latent_dim)

        def forward(self, x):
            mu, logvar = self.encoder(x)
            z = self.reparametrize(mu, logvar)
            x_hat = self.decoder(z)
            return x_hat, mu, logvar
    
    # Функция выполняющая репараметризацию
        def reparametrize(self, mu, logvar):
            std = torch.exp(0.5 * logvar)
            eps = torch.randn_like(std)
            return eps * std + mu

        def sample(self, num_samples) -> torch.Tensor:
            z = torch.randn(num_samples,
                            self.latent_dim)

            z = z.to(self.encoder.fc_mu[0].weight.device)

            samples = torch.sigmoid(self.decoder(z))

            return samples

In [None]:
epochs = 200   # число эпох
latent_dim = 2  # Размер латентного простанства
num_examples_to_generate = 16  

# число рандомных семплов и латентного простанства для визуализации процесса обучения:
model = VAE(latent_dim).to(device)

# параметры градиентного спуска и scheduler для постепенного уменьшения величины шага
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.5)

In [None]:
summary_writer = SummaryWriter(comment = ' With Upsampling in the decoder part')

# Получим результат до начала обучения - рандомный шум :     
generate_and_save_images(model, 0, summary_writer)

In [None]:
for epoch in tqdm(range(1, epochs+1)):
    model.train()
    totat_len = len(train_dataloader)
    for step, train_x in enumerate(train_dataloader):
        recon_loss, kld_loss = compute_loss(model, train_x[0].to(device))
        summary_writer.add_scalar('train/recon_loss', recon_loss, global_step = epoch * totat_len + step)
        summary_writer.add_scalar('train/kld_loss', kld_loss, global_step = epoch * totat_len + step)
        loss = recon_loss + kld_loss

        summary_writer.add_scalar('train/loss', loss, global_step = epoch * totat_len + step)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 5 == 0:
        losses = []
        model.eval()
        for test_x in val_dataloader:
            recon_loss, kld_loss = compute_loss(model, test_x[0].to(device))
            loss = recon_loss + kld_loss
            losses.append(loss)
        summary_writer.add_scalar('test/loss', torch.stack(losses).mean(), global_step=epoch)
        generate_and_save_images(model, epoch, summary_writer)
    scheduler.step()
    
summary_writer.close()

Таким образом, мы получим на tensorboard графики изменения recons_loss, kld_loss и суммарного loss на трейне для каждого шага градиентного спуска и на тесте раз в 5 эпох генерили фотки и получали график изменения суммарного loss

Для тестового датасета получим значения средних на выходе из энкодера. Далее будем визуализировать латентное пространство

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
])


val_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform
)
print("len(val_dataset) =", len(val_dataset))

BATCH_SIZE = len(val_dataset)
val_dataloader= torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

model.eval()
for test_x in val_dataloader:
    x_hat, mu, logvar = model(test_x[0])
    labels = test_x[1]
    
print(mu)  

In [None]:
mu_save = mu
logvar_save = logvar
labels_save = labels
lab = labels_save.numpy()
mu_numpy = mu.detach().numpy()

сохраним модель:

In [None]:
torch.save(model, 'models/model_mnist_upsampling.pth')

Для каждой из 10000 фоток найдем соответствующее положение в латентном пространстве:

In [None]:
plt.figure(figsize=(10, 10), dpi=80)
for i, point in enumerate(mu_numpy[:]):
    nom = lab[i]
    plt.scatter(point[0],point[1],color = col[nom])
plt.show()

Видно, что сеть смогла кластеризовать классы близко друг с другом, сем самым при наличии всего двух осей латентного пространства, модель неплохо справилась с задачей разделения классов без знания самих меток (то есть обучение без учителя)

Построим картинку с плавными переходами между цифрами благодаря семплированию из латентного пространства:

In [None]:
num = 0
N = 30
plt.figure(figsize=(30, 30), dpi=200)
for i in np.linspace(-2, 2, N, endpoint=True):
    for j in np.linspace(2, -2, N, endpoint=True):
        num += 1
        ax = plt.subplot(N, N, num)
        samples = 1 - torch.sigmoid(model.decoder(torch.Tensor([[i,j]]))).reshape(28,28)
        ax.imshow(samples.detach(), cmap='gray')
        ax.axis('off')

In [None]:
# Загрузил через терминал рузультат tensorboard:
'''
tensorboard dev upload --logdir logs \
    --name "experiment" \
    --description "200 epochs VAE"
'''

__ВЫВОД:__