## 변형 오토인코더
변형 오토인코더(Variational autoencoder)를 좀 더 쉽게 이해할 수 있게 오토인코더와 비교하면서 설명하겠습니다. 오토인코더는 다음 그림과 같이  
`입력(숫자 2)` -> `인코더` -> `압축(차원축소)` -> `디코더` -> `출력(숫자2)`  
이 나오게 하는 방법입니다.  
![](../Static/695_2.jpg)  

오토인코더는 차원을 줄이는 것이 목표이기 때문에 새롭게 생성된 데이터의 확률 분포에는 관심이 없습니다.

반면 변형 오토인코더는 표준편차와 평균을 이용하여 확률 분포를 만들고, 거기에서 샘플링하여 디코더를 통과시킨 후 새로운 데이터를 만들어 냅니다. 즉, 변형 오토인코더는 `입력 데이터와 조금 다른 출력 데이터`를 만들어 내는데, 이때 z라는 가우시안 분포를 이용합니다.(z를 잠재벡터(latent vector)라고 합니다.) 중요한 특성의 파라미터를 담고 있는 z 분포에서 벡터를 랜덤하게 샘플링하고 이 분포의 오차를 이용하여 입력 데이터와 유사한 다양한 데이터를 만들어 내는 것이 변형 오토인코더입니다.  

![](../Static/696_1.jpg)

오토인코더는 데이터 벡터에 대한 차원을 축소하여 실제 이미지와 동일한 이미지를 출력하는 것이 목적이었다면, 변형 오토인코더는 데이터가 만들어지는 확률 분포를 찾아 비슷한 데이터를 생성하는 것이 목적입니다.

변형 오토인코더에서 인코더와 디코더에 대한 네트워크는 다음 그림과 같습니다.  
![](../Static/696_2.jpg)  

* qϕ(z|x) : x를 입력받아 잠재 벡터 z와 대응되는 평균과 분산을 구하는 네트워크로 인코더 네트워크를 의미합니다.

* pθ(x|z) : z를 입력받아 x와 대응되는 평균과 분산을 구하는 네트워크로 디코더 네트워크를 의미합니다.

그럼 인코더 네트워크부터 자세히 살펴보겠습니다.  
![](../Static/697_1.jpg)

이번 예제에서는 텐서보드에서 에포크 진행에 따른 오차를 확인할 예정입니다. 따라서 다음 명령으로 텐서보드를 설치합니다. 텐서보드 사용을 위해 `텐서보드 엑스(tensorboardX)` 라이브러리를 설치해야 합니다.

pip install rensorboardX

텐서보드 엑스는 학습 과정을 시각적으로 확인하고자 할 때 사용하는 도구입니다.

변형 오토인코더에서도 오토인코더에서와 마찬가지로 MNIST 데이터셋을 이용합니다. 먼저 필요한 라이브러리를 호출합니다.

In [1]:
import datetime
import os
from tensorboardX import SummaryWriter

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

import torchvision.datasets as datasets
import torchvision.transforms as transforms

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

MNIST 데이터셋을 내려받은 후 이미지를 텐서로 변환합니다.

In [2]:
transform = transforms.Compose([transforms.ToTensor()])

train_dataset = datasets.MNIST(
    root='./MNIST_DATA/',
    train=True,
    transform=transform,
    download=True,
)

test_dataset = datasets.MNIST(
    root='./MNIST_DATA/',
    train=False,
    transform=transform,
    download=True,
)

train_loader = DataLoader(
    dataset=train_dataset, batch_size=100, shuffle=True, num_workers=4, pin_memory=False
)

test_loader = DataLoader(
    dataset=test_dataset, batch_size=100, shuffle=False, num_workers=4
)



모델의 네트워크를 생성합니다. 네트워크는 오토인코더처럼 인코더와 디코더로 구성됩니다.

In [31]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()
        self.input1 = nn.Linear(input_dim, hidden_dim)
        self.input2 = nn.Linear(hidden_dim, hidden_dim)
        self.mean = nn.Linear(hidden_dim, latent_dim)
        self.var = nn.Linear(hidden_dim, latent_dim)

        self.LeakyReLU = nn.LeakyReLU(0.2)
        self.training = True
    
    def forward(self, x):
        h_ = self.LeakyReLU(self.input1(x))
        h_ = self.LeakyReLU(self.input2(h_))
        mean = self.mean(h_)
        log_var = self.var(h_)
        return mean, log_var # 인코더 네트워크에서 평균과 분산을 반환



인코더 역할은 데이터(x)가 주어졌을 때 디코더가 원래 데이터로 잘 복원할 수 있는 이상적인 확률 분포 p(z|x)를 찾는 것입니다. 변형 오토인코더에서는 이상적인 확률 분포를 찾는데 변분추론을 사용합니다.

이번에는 디코더 네트워크를 정의합니다.

In [32]:
class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        self.hidden1 = nn.Linear(latent_dim, hidden_dim)
        self.hidden2 = nn.Linear(hidden_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, output_dim)
        self.LeakyReLU = nn.LeakyReLU(0.2)

    def forward(self, x):
        h = self.LeakyReLU(self.hidden1(x))
        h = self.LeakyReLU(self.hidden2(h))
        x_hat = torch.sigmoid(self.output(h))
        return x_hat # 디코더 결과는 시그모이드를 통과했으므로, 0~1 값을 갖습니다.

디코더는 추출한 샘플을 입력으로 받아 다시 원본으로 재구축(재생성)하는 역할을 합니다.

이제 평균과 표준편차가 주어졌을 때 잠재 벡터 z를 만들기 위해 `reparameterization()` 이라는 이름으로 함수를 생성해 보겠습니다.

In [33]:
class Model(nn.Module):
    def __init__(self, Encoder, Decoder):
        super(Model, self).__init__()
        self.Encoder = Encoder
        self.Decoder = Decoder

    def reparameterization(self, mean, var):
        epsilon = torch.randn_like(var).to(device=device)
        z = mean + var * epsilon # z값 구하기
        return z
    
    def forward(self, x):
        mean, log_var = self.Encoder(x)
        z = self.reparameterization(mean, torch.exp(0.5 * log_var))
        x_hat = self.Decoder(z)
        return x_hat, mean, log_var # 디코더의 결과와 평균, 표준편차(log를 취한 표준편차)를 반환

* `reparameterization()` 함수는 z 벡터를 샘플링하기 위한 용도입니다. z는 가우시안 분포라고 가정했기 때문에 인코더에서 받아 온 평균과 표준편차를 이용하여 z를 생성합니다. 그리고 z벡터를 디코더에 다시 통과시켜서 입력과 동일한 데이터(x_hat)를 만들어 내는 작업을 합니다.

* 인코더에서 받아 온 평균과 표준편차를 이용하지만 표준편차는 값을 그대로 사용하지 않습니다. 값이 음수가 되지 않도록 로그(log)를 취하는데, 다음과 같은 방식을 취합니다.  
![](../Static/fn2-88.jpg)  
따라서 변수 이름도 `var`에서 `log_var`로 변경했습니다.

필요한 모델의 네트워크(인코더와 디코더) 객체를 초기화합니다.

In [34]:
x_dim = 784
hidden_dim = 400
latent_dim = 200
epochs = 30
batch_size = 100

encoder = Encoder(input_dim=x_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)
decoder = Decoder(latent_dim=latent_dim, hidden_dim=hidden_dim, output_dim=x_dim)

model = Model(Encoder=encoder, Decoder=decoder).to(device=device)

오차를 계산하기 위한 손실 함수를 정의합니다.

In [35]:
def loss_function(x, x_hat, mean, log_var):
    reproduction_loss = nn.functional.binary_cross_entropy(
    x_hat, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mean.pow(2) - log_var.exp())
    return reproduction_loss, KLD


optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


* `loss_function` 오차를 구하는 함수입니다. 변분추론으로  p(z|x)와 q(z) 사이의 `쿨백-라이블러 발산(KLD)`을 계산하고, KLD가 줄어드는 쪽으로 q(z)를 조금씩 업데이트합니다. 즉, 변형 오토인코더에서 손실 함수가 쿨백-라이블러 발산이 됩니다. 즉, 손실 함수에서 반환되는 값을 수식처럼 모두 더하여 사용하는 것이 최종 손실 함수가 됩니다.

이제 모델 학습에 필요한 함수를 정의합니다.

In [36]:
saved_loc = 'scalar/' # 텐서보드에서 사용할 경로
writer = SummaryWriter(saved_loc)

model.train()


def train(epoch, model, train_loader, optimizer):
    train_loss = 0
    for batch_idx, (x, _) in enumerate(train_loader):
        x = x.view(batch_size, x_dim)
        x = x.to(device)

        optimizer.zero_grad()
        x_hat, mean, log_var = model(x)
        BCE, KLD = loss_function(x, x_hat, mean, log_var)
        loss = BCE + KLD
        writer.add_scalar("Train/Reconstruction Error", BCE.item(), batch_idx + epoch *
                          (len(train_loader.dataset)/batch_size))
        writer.add_scalar("Train/KL-Divergence", KLD.item(), batch_idx + epoch *
                          (len(train_loader.dataset)/batch_size))
        writer.add_scalar("Train/Total Loss", loss.item(), batch_idx + epoch *
                          (len(train_loader.dataset)/batch_size))

        train_loss += loss.item()
        loss.backward()
        optimizer.step()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\t Loss: {:.6f}'.format(
                  epoch, batch_idx * len(x), len(train_loader.dataset),
                  100. * batch_idx / len(train_loader),
                  loss.item() / len(x)))
            
    print("======> Epoch: {} Average loss: {:.4f}".format(
          epoch, train_loss / len(train_loader.dataset)))



In [37]:
def test(epoch, model, test_loader):
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for batch_idx, (x, _) in enumerate(test_loader):
            x = x.view(batch_size, x_dim)
            x = x.to(device)
            x_hat, mean, log_var = model(x)
            BCE, KLD = loss_function(x, x_hat, mean, log_var)
            loss = BCE + KLD

            writer.add_scalar("Test/Reconstruction Error", BCE.item(), batch_idx +
                              epoch * (len(test_loader.dataset)/batch_size))
            writer.add_scalar("Test/KL-Divergence", KLD.item(), batch_idx + epoch *
                              (len(test_loader.dataset)/batch_size))
            writer.add_scalar("Test/Total Loss", loss.item(), batch_idx + epoch *
                              (len(test_loader.dataset)/batch_size))
            test_loss += loss.item()

            if batch_idx == 0:
                n = min(x.size(0), 8)
                comparison = torch.cat(
                    [x[:n], x_hat.view(batch_size, x_dim)[:n]])
                grid = torchvision.utils.make_grid(comparison.cpu())
                writer.add_image(
                    "Test image - Above: Real data, below: reconstruction data", grid, epoch)


In [38]:
from tqdm.auto import tqdm
for epoch in tqdm(range(0, epochs)):
    train(epoch, model, train_loader, optimizer)
    test(epoch, model, test_loader)
    print("\n")
writer.close()


  0%|          | 0/30 [00:00<?, ?it/s]



  3%|▎         | 1/30 [00:05<02:47,  5.78s/it]





  7%|▋         | 2/30 [00:11<02:44,  5.88s/it]





 10%|█         | 3/30 [00:17<02:39,  5.89s/it]





 13%|█▎        | 4/30 [00:24<02:39,  6.14s/it]





 17%|█▋        | 5/30 [00:31<02:42,  6.48s/it]





 20%|██        | 6/30 [00:38<02:40,  6.68s/it]





 23%|██▎       | 7/30 [00:45<02:34,  6.70s/it]





 27%|██▋       | 8/30 [00:52<02:30,  6.82s/it]





 30%|███       | 9/30 [00:59<02:24,  6.90s/it]





 33%|███▎      | 10/30 [01:06<02:19,  6.98s/it]





 37%|███▋      | 11/30 [01:14<02:18,  7.30s/it]





 40%|████      | 12/30 [01:22<02:17,  7.62s/it]





 43%|████▎     | 13/30 [01:31<02:17,  8.08s/it]





 47%|████▋     | 14/30 [01:40<02:12,  8.28s/it]





 50%|█████     | 15/30 [01:49<02:05,  8.37s/it]





 53%|█████▎    | 16/30 [01:58<02:00,  8.58s/it]





 57%|█████▋    | 17/30 [02:07<01:53,  8.72s/it]





 60%|██████    | 18/30 [02:16<01:47,  9.00s/it]





 63%|██████▎   | 19/30 [02:27<01:43,  9.36s/it]





 67%|██████▋   | 20/30 [02:37<01:37,  9.75s/it]





 70%|███████   | 21/30 [02:49<01:32, 10.33s/it]





 73%|███████▎  | 22/30 [03:02<01:28, 11.08s/it]





 77%|███████▋  | 23/30 [03:16<01:24, 12.06s/it]





 80%|████████  | 24/30 [03:33<01:21, 13.61s/it]





 83%|████████▎ | 25/30 [03:48<01:09, 13.82s/it]





 87%|████████▋ | 26/30 [03:59<00:52, 13.09s/it]





 90%|█████████ | 27/30 [04:09<00:36, 12.13s/it]





 93%|█████████▎| 28/30 [04:18<00:22, 11.15s/it]





 97%|█████████▋| 29/30 [04:27<00:10, 10.46s/it]





100%|██████████| 30/30 [04:36<00:00,  9.20s/it]






