# __GAN tutorial__

### 라이브러리 및 데이터 불러오기

In [1]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.autograd import Variable
import pickle
from tqdm import tqdm


# 데이터 전처리 방식을 지정
transform = transforms.Compose([
    transforms.ToTensor(),  # 데이터를 PyTorch의 Tensor 형식으로 바꾼다.
    transforms.Normalize(mean=(0.5,), std=(0.5,))  # 픽셀값 0 ~ 1 -> -1 ~ 1
])

# MNIST 데이터셋을 불러온다. 지정한 폴더에 없을 경우 자동으로 다운로드
mnist = datasets.MNIST(root='data', download=True, transform=transform)

# 데이터를 한번에 batch_size만큼만 가져오는 dataloader를 만든다.
dataloader = DataLoader(mnist, batch_size=60, shuffle=True)

In [2]:
print(torch.__version__)

1.4.0


In [3]:
import os
import imageio

if torch.cuda.is_available():
    use_gpu = True
leave_log = True
if leave_log:
    result_dir = 'GAN_generated_images'
    if not os.path.isdir(result_dir):
        os.mkdir(result_dir)

In [4]:
print(torch.cuda.is_available())
#print(torch.cuda.get_device_name(0))
print(torch.cuda.device_count())
print(torch.cuda.current_device())
print(torch.cuda.device(0))

True
1
0
<torch.cuda.device object at 0x7f279d19b910>


In [5]:
torch.autograd.set_detect_anomaly(mode=True)

<torch.autograd.anomaly_mode.set_detect_anomaly at 0x7f279d19bf90>

### GAN의 생성자(Generator)

TODO: LeakyReLU의 inplace=True일시, 에러발생 수정

In [None]:
"""
# 생성자는 랜덤 벡터 z를 입력으로 받아 가짜 이미지를 출력한다.
class Generator(nn.Module):
    
    # 네트워크 구조
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(in_features=100, out_features=256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(in_features=256, out_features=512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(in_features=512, out_features=1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(in_features=1024, out_features=28*28),
            nn.Tanh()  # 출력값을 픽셀값의 범위인 -1과 1사이로 만들어주기 위해 Tanh를 사용
        )
        
    # (batch_size x 100) 크기의 랜덤 벡터를 받아
    # 이미지를 (batch_size x 1 x 28 x 28) 크기로 출력한다.
    def forward(self, inputs):
        return self.main(inputs).view(-1, 1, 28, 28)
"""

In [7]:
# 생성자는 랜덤 벡터 z를 입력으로 받아 가짜 이미지를 출력한다.
class Generator(nn.Module):
    
    # 네트워크 구조
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(in_features=100, out_features=256),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Linear(in_features=256, out_features=512),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Linear(in_features=512, out_features=1024),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Linear(in_features=1024, out_features=28*28),
            nn.Tanh()  # 출력값을 픽셀값의 범위인 -1과 1사이로 만들어주기 위해 Tanh를 사용
        )
        
    # (batch_size x 100) 크기의 랜덤 벡터를 받아
    # 이미지를 (batch_size x 1 x 28 x 28) 크기로 출력한다.
    def forward(self, inputs):
        return self.main(inputs).view(-1, 1, 28, 28)

#### GAN의 구분자(Discriminator)

In [None]:
"""
# 구분자는 이미지를 입력으로 받아 이미지가 진짜인지 가짜인지 출력한다
class Discriminator(nn.Module):
    
    # 네트워크 구조
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(in_features=28*28, out_features=1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=1024, out_features=512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=512, out_features=256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=256, out_features=1),
            nn.Sigmoid())  # 마지막에는 확률값을 나타내는 숫자 하나를 출력
        
    # (batch_size x 1 x 28 x 28) 크기의 이미지를 받아
    # 이미지가 진짜일 확률 0~1 사이로 출력
    def forward(self, inputs):
        inputs = inputs.view(-1, 28*28)
        return self.main(inputs)
"""

In [8]:
# 구분자는 이미지를 입력으로 받아 이미지가 진짜인지 가짜인지 출력한다
class Discriminator(nn.Module):
    
    # 네트워크 구조
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(in_features=28*28, out_features=1024),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=1024, out_features=512),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=512, out_features=256),
            nn.LeakyReLU(0.2, inplace=False),
            nn.Dropout(inplace=True),
            nn.Linear(in_features=256, out_features=1),
            nn.Sigmoid())  # 마지막에는 확률값을 나타내는 숫자 하나를 출력
        
    # (batch_size x 1 x 28 x 28) 크기의 이미지를 받아
    # 이미지가 진짜일 확률 0~1 사이로 출력
    def forward(self, inputs):
        inputs = inputs.view(-1, 28*28)
        return self.main(inputs)

In [9]:
## 생성자와 구분자 객체 만들기
G = Generator()
D = Discriminator()

if use_gpu:
    G.cuda()
    D.cuda()

In [10]:
## 손실 함수와 최적화 기법 지정
# Binaary Cross Entropy loss
criterion = nn.BCELoss()

# 생성자의 매개 변수를 최적화하는 Adam optimizer
G_optimizer = Adam(G.parameters(), lr=0.0002, betas=(0.5, 0.999))
# 구분자의 매개 변수를 최적화하는 Adam optimizer
D_optimizer = Adam(D.parameters(), lr=0.0002, betas=(0.5, 0.999))

In [11]:
# 학습 결과 시각화하기
%matplotlib inline
from matplotlib import pyplot as plt
import numpy as np

def square_plot(data, path):
    """Take an array of shape (n, height, width) or (n, height, width, 3)
       and visualizer each (height, width) thing in a grid of size approx. sqrt(n) by sqrt(n)"""
    
    if type(data) == list:
        data = np.concatenate(data)
    # normalize data for display
    data = (data - data.min()) / (data.max() - data.min())
    
    # force the number of filters to be squere
    n = int(np.ceil(np.sqrt(data.shape[0])))
    
    padding = (((0, n ** 2 - data.shape[0]) ,
                (0, 1), (0, 1))  # add some space between filters
               + ((0, 0),) * (data.ndim - 3))  # don't pad the last dimension (if there is one)
    data = np.pad(data, padding, mode='constant', constant_values=1)  # pad with ones (white)
    
    # tilethe filters into an image
    data = data.reshape((n, n) + data.shape[1:]).transpose((0, 2, 1, 3) + tuple(range(4, data.ndim + 1)))

    data = data.reshape((n * data.shape[1], n * data.shape[3]) + data.shape[4:])
    
    plt.imsave(path, data, cmap='gray')

In [12]:
if leave_log:
    train_hist = {}
    train_hist['D_losses'] = []
    train_hist['G_losses'] = []
    generated_images = []
    
z_fixed = Variable(torch.randn(5 * 5, 100), volatile=True)
if use_gpu:
    z_fixed = z_fixed.cuda()

  import sys


### 모델 학습

In [16]:
### 모델 학습을 위한 반복문
# 데이터셋을 100번 돌며 학습한다.
for epoch in range(10):  # range(100)
    
    if leave_log:
        D_losses = []
        G_losses = []
    
    # 한번에 batch_size만큼 데이터를 가져온다.
    for real_data, _ in dataloader:
        batch_size = real_data.size(0)
        
        # 데이터를 pytorch의 변수로 변환한다.
        real_data = Variable(real_data)

        ### 구분자 학습시키기

        # 이미지가 진짜일 때 정답 값은 1이고 가짜일 때는 0이다.
        # 정답지에 해당하는 변수를 만든다.
        target_real = Variable(torch.ones(batch_size, 1))
        target_fake = Variable(torch.zeros(batch_size, 1))
         
        if use_gpu:
            real_data, target_real, target_fake = real_data.cuda(), target_real.cuda(), target_fake.cuda()
            
        # 진짜 이미지를 구분자에 넣는다.
        D_result_from_real = D(real_data)
        # 구분자의 출력값이 정답지인 1에서 멀수록 loss가 높아진다.
        D_loss_real = criterion(D_result_from_real, target_real)

        # 생성자에 입력으로 줄 랜덤 벡터 z를 만든다.
        z = Variable(torch.randn((batch_size, 100)))
        
        if use_gpu:
            z = z.cuda()
            
        # 생성자로 가짜 이미지를 생성한다.
        fake_data = G(z)
        
        # 생성자가 만든 가짜 이미지를 구분자에 넣는다.
        D_result_from_fake = D(fake_data)
        # 구분자의 출력값이 정답지인 0에서 멀수록 loss가 높아진다.
        D_loss_fake = criterion(D_result_from_fake, target_fake)
        
        # 구분자의 loss는 두 문제에서 계산된 loss의 합이다.
        D_loss = D_loss_real + D_loss_fake
        
        # 구분자의 매개 변수의 미분값을 0으로 초기화한다.
        D.zero_grad()
        # 역전파를 통해 매개 변수의 loss에 대한 미분값을 계산한다.
        D_loss.backward()
        # 최적화 기법을 이용해 구분자의 매개 변수를 업데이트한다.
        D_optimizer.step()
        
        if leave_log:
            #D_losses.append(D_loss.data[0])
            D_losses.append(D_loss.data)  # 수정 - PyTorch>=0.5, the index of 0-dim tensor is invalid


        # train generator G

        ### 생성자 학습시키기
        
        # 생성자에 입력으로 줄 랜덤 벡터 z를 만든다.
        z = Variable(torch.randn((batch_size, 100)))
        
        if use_gpu:
            z = z.cuda()
        
        # 생성자로 가짜 이미지를 생성한다.
        fake_data = G(z)
        # 생성자가 만든 가짜 이미지를 구분자에 넣는다.
        D_result_from_fake = D(fake_data)
        # 생성자의 입장에서 구분자의 출력값이 1에서 멀수록 loss가 높아진다.
        G_loss = criterion(D_result_from_fake, target_real)
        
        # 생성자의 매개 변수의 미분값을 0으로 초기화한다.
        G.zero_grad()
        # 역전파를 통해 매개 변수의 loss에 대한 미분값을 계산한다.
        G_loss.backward()
        # 최적화 기법을 이용해 생성자의 매개 변수를 업데이트한다.
        G_optimizer.step()
        
        if leave_log:
            #G_losses.append(G_loss.data[0])
            G_losses.append(G_loss.data)  # 수정 - PyTorch>=0.5, the index of 0-dim tensor is invalid
            
    if leave_log:
        true_positive_rate = (D_result_from_real > 0.5).float().mean().data  # 수정 .data[0] to .data
        true_negative_rate = (D_result_from_fake < 0.5).float().mean().data  # 수정 .data[0] to .data
        base_message = ("Epoch: {epoch:<3d} D Loss: {d_loss:<8.6} G Loss: {g_loss:<8.6} "
                        "True Positive Rate: {tpr:<5.1%} True Negative Rate: {tnr:<5.1%}"
                       )
        message = base_message.format(
                    epoch=epoch,
                    d_loss=sum(D_losses)/len(D_losses),
                    g_loss=sum(G_losses)/len(G_losses),
                    tpr=true_positive_rate,
                    tnr=true_negative_rate
        )
        print(message)
    
    if leave_log:
        fake_data_fixed = G(z_fixed)
        image_path = result_dir + '/epochs/epoch{}.png'.format(epoch)
        square_plot(fake_data_fixed.view(25, 28, 28).cpu().data.numpy(), path=image_path)
        generated_images.append(image_path)
    
    if leave_log:
        train_hist['D_losses'].append(torch.mean(torch.FloatTensor(D_losses)))
        train_hist['G_losses'].append(torch.mean(torch.FloatTensor(G_losses)))

torch.save(G.state_dict(), "GAN_output/gan_generator.pkl")
torch.save(D.state_dict(), "GAN_output/gan_discriminator.pkl")
with open('GAN_output/gan_train_history.pkl', 'wb') as f:
    pickle.dump(train_hist, f)

generated_image_array = [imageio.imread(generated_image) for generated_image in generated_images]
imageio.mimsave(result_dir + '/GAN_generation.gif', generated_image_array, fps=5)

Epoch: 0   D Loss: 0.619918 G Loss: 2.28332  True Positive Rate: 85.0% True Negative Rate: 100.0%
Epoch: 1   D Loss: 0.801668 G Loss: 1.78568  True Positive Rate: 71.7% True Negative Rate: 88.3%
Epoch: 2   D Loss: 0.995826 G Loss: 1.3685   True Positive Rate: 78.3% True Negative Rate: 96.7%
Epoch: 3   D Loss: 1.0769   G Loss: 1.20336  True Positive Rate: 73.3% True Negative Rate: 73.3%
Epoch: 4   D Loss: 1.11182  G Loss: 1.14009  True Positive Rate: 70.0% True Negative Rate: 85.0%
Epoch: 5   D Loss: 1.11716  G Loss: 1.126    True Positive Rate: 53.3% True Negative Rate: 75.0%
Epoch: 6   D Loss: 1.13535  G Loss: 1.10088  True Positive Rate: 38.3% True Negative Rate: 71.7%
Epoch: 7   D Loss: 1.14134  G Loss: 1.082    True Positive Rate: 51.7% True Negative Rate: 80.0%
Epoch: 8   D Loss: 1.17078  G Loss: 1.03105  True Positive Rate: 61.7% True Negative Rate: 66.7%
Epoch: 9   D Loss: 1.17432  G Loss: 1.02913  True Positive Rate: 41.7% True Negative Rate: 71.7%
