# Aquire dataset

## Download dataset

In [1]:
import os

In [2]:
if not os.path.exists('./logs'):
    os.makedirs('./logs')
if not os.path.exists('./datasets'):
    os.makedirs('./datasets')

아래의 코드를 실행하면 데이터셋 다운로드가 진행됩니다.

오류가 날 경우에는 assignment.ipynb 파일이 있는 위치에서 bash ./download_cyclegan_dataset.sh horse2zebra 명령어를 입력해주세요.

!bash ./download_cyclegan_dataset.sh horse2zebra

# Model definition & Hyperparameter

![cyclegan.png](./cyclegan.png)

In [3]:
import torch.nn as nn
import torch.nn.functional
import torch
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from torchvision.utils import make_grid, save_image
from torch.optim.lr_scheduler import StepLR
from torchsummary import summary

import numpy as np

In [4]:
img_size = 256 # 이미지 사이즈 
channels = 3
ngf = 32 # G channels after first layer
ndf = 64 # D channels after first layer

epochs = 200 # 200번이 충분하지만, 시간단축을 위해 15번으로 조정
batch_size = 4 # batch size
lambda_X = 10
lambda_Y = 10
lambda_identity_X = 0.5
lambda_identity_Y = 0.5
lr = 0.0002 # learning rate
betas = (0.5, 0.999)

mean_init = 0.0
std_init = 0.02

In [5]:
# Cuda stuff
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("Device is " + str(device) + ".")

Device is cuda.


# CycleGAN Model

In [6]:
# ResidualBlock 설계

class ResidualBlock(nn.Module):
    def __init__(self, c):
        super(ResidualBlock, self).__init__()
        
        block = [nn.ReflectionPad2d(1),
                 nn.Conv2d(c, c, 3, 1, 0),
                 nn.InstanceNorm2d(c),
                 nn.ReLU(),
                 nn.ReflectionPad2d(1),
                 nn.Conv2d(c, c, 3, 1, 0),
                 nn.InstanceNorm2d(c)]
        
        self.block = nn.Sequential(*block)
        
    
    def forward(self, x):
        # Hint : 미리 정의해놓은 Residual Block을 Forward하는 코드를 추가해주세요.
        x = self.block(x)
        return x

In [7]:
# Generator 설계

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        
        # Encoding
        model = []
        model += [nn.ReflectionPad2d(4),
                  nn.Conv2d(3, ngf, 9, 1, 0),
                  nn.InstanceNorm2d(ngf),
                  nn.ReLU()]
        model += [nn.Conv2d(ngf, ngf*2, 4, 2, 1),
                  nn.InstanceNorm2d(ngf*2),
                  nn.ReLU()]
        model += [nn.Conv2d(ngf*2, ngf*4, 4, 2, 1),
                  nn.InstanceNorm2d(ngf*4),
                  nn.ReLU()]
        
        # Transformation
        for i in range(6):
            model += [ResidualBlock(ngf*4)]   # Hint : 채널 수를 그대로 유지하면서 반복시켜주는 residual block
        
        # Decoding
        model += [nn.ConvTranspose2d(ngf*4, ngf*2, 4, 2, 1), # Hint : 줄여준 H * W 를 다시 반대로 늘려주는 과정
                  nn.InstanceNorm2d(ngf*2),
                  nn.ReLU()]
        model += [nn.ConvTranspose2d(ngf*2, ngf, 4, 2, 1), 
                  nn.InstanceNorm2d(ngf),
                  nn.ReLU()]
        model += [nn.ReflectionPad2d(4),
                  nn.Conv2d(ngf, 3, 9, 1, 0),
                  nn.Tanh()]
        
        self.model = nn.Sequential(*model)

    def weight_init(self, mean, std):
        for m in self._modules:
            normal_init(self._modules[m], mean, std)

    def forward(self, x):
        return self.model(x)

In [8]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        
        model = []
        model += [nn.Conv2d(3, ndf, 4, 2, 1),   # outputchannel : ndf, kernel: 4, stride:2 , padding : 1
                  nn.LeakyReLU(0.2)]
        
        in_channels = ndf
        out_channels = ndf*2
        for i in range(2):
            # Hint : 어떤 변수가 input channel이 되고, 어떤 변수가 output channel이 되나요?
            model += [nn.Conv2d(in_channels, out_channels, 4, 2, 1),     
                      nn.InstanceNorm2d(out_channels),
                      nn.LeakyReLU(0.2)]
            # Hint : 매 반복마다 channel 수가 두배가 되도록 하려면?
            in_channels = out_channels           
            out_channels = out_channels*2

        model += [nn.Conv2d(in_channels, out_channels, 4, 1, 1),
                  nn.InstanceNorm2d(out_channels),
                  nn.LeakyReLU(0.2)]
        
        model += [nn.Conv2d(out_channels, 1, 4, 1, 1)]
        
        self.model = nn.Sequential(*model)

    def weight_init(self, mean, std):
        for m in self._modules:
            normal_init(self._modules[m], mean, std)
        
    def forward(self, x):
        return self.model(x)
    
def normal_init(m, mean, std):
    if isinstance(m, nn.ConvTranspose2d) or isinstance(m, nn.Conv2d):
        m.weight.data.normal_(mean, std)
        m.bias.data.zero_()

# Data Load

In [9]:
# Dataset Code

import os
from PIL import Image
import random

class UnallignedDataset(Dataset):
    def __init__(self, root, transform, phase='train'):
        dir_A = os.path.join(root, phase + 'A')
        dir_B = os.path.join(root, phase + 'B')
        
        self.A_paths = [os.path.join(dir_A, f) for f in os.listdir(dir_A)]
        self.B_paths = [os.path.join(dir_B, f) for f in os.listdir(dir_B)]
        self.A_size = len(self.A_paths)
        self.B_size = len(self.B_paths)
        
        self.transform = transform
        
    def __getitem__(self, index):
        A_path = self.A_paths[index % self.A_size]
        B_path = self.B_paths[random.randint(0, self.B_size - 1)]
        
        A_img = Image.open(A_path).convert('RGB')
        B_img = Image.open(B_path).convert('RGB')

        A = self.transform(A_img)
        B = self.transform(B_img)
        return A, B
    
    def __len__(self):
        return max(self.A_size, self.B_size)

In [10]:
# 학습을 돕기 위한 추가 테크닉 (과제를 위해 알아야할 필요는 없음) (참고: https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/issues/75)

class ImagePool():
    def __init__(self, pool_size):
        self.pool_size = pool_size
        self.images = []
        
    def get(self, img):
        if len(self.images) < self.pool_size:
            self.images.append(img)
            return img
        else:
            p = random.random()
            if p > 0.5:
                idx = random.randint(0, self.pool_size-1)
                tmp = self.images[idx]
                self.images[idx] = img
                return tmp
            else:
                return img

# Training

In [11]:
os.getcwd()

'C:\\Users\\ironm\\OneDrive\\GitHub\\Samsung-AI-KAIST\\Assignment_0731'

In [12]:
G = Generator().to(device)
F = Generator().to(device)
D_X = Discriminator().to(device)
D_Y = Discriminator().to(device)
G.weight_init(mean_init, std_init)
F.weight_init(mean_init, std_init)
D_X.weight_init(mean_init, std_init)
D_Y.weight_init(mean_init, std_init)
G.train()
F.train()
D_X.train()
D_Y.train()

root_dir = os.getcwd() # this line is added becasue I use Windows.

transform = transforms.Compose([transforms.Resize(img_size), transforms.CenterCrop(img_size), transforms.ToTensor(), transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))])
train_loader = torch.utils.data.DataLoader(dataset=UnallignedDataset(root_dir+os.sep+'datasets'+os.sep+'horse2zebra', transform), 
                                           batch_size=batch_size, 
                                           shuffle=True, # Hint : dataloader 내의 data 들이 뒤섞여 있기를 바란다면 어떤 옵션을 추가하나요?
                                           pin_memory=True, 
                                          # num_workers=1  # This cause error on Windows
                                          )
test_loader = torch.utils.data.DataLoader(dataset=UnallignedDataset(root_dir+os.sep+'datasets'+os.sep+'horse2zebra', transform, phase='test'), 
                                           batch_size=batch_size, 
                                           shuffle=False, # Hint : dataloader 내의 data 들이 뒤섞여 있기를 바란다면 어떤 옵션을 추가하나요?
                                           pin_memory=True, 
                                          # num_workers=1   # This cause error on Windows
                                         )

X_pool = ImagePool(50)
Y_pool = ImagePool(50)

mse_criterion = nn.MSELoss()
l1_criterion = nn.L1Loss()

GF_optimizer = torch.optim.Adam(list(G.parameters()) + list(F.parameters()), lr=lr, betas=betas)
D_X_optimizer = torch.optim.Adam(D_X.parameters(), lr=lr, betas=betas)
D_Y_optimizer = torch.optim.Adam(D_Y.parameters(), lr=lr, betas=betas)

GF_scheduler = StepLR(GF_optimizer, 1, lr/100.0)
D_X_scheduler = StepLR(D_X_optimizer, 1, lr/100.0)
D_Y_scheduler = StepLR(D_Y_optimizer, 1, lr/100.0)

In [13]:
summary(G, (3, 256, 256))
summary(D_X, (3, 256, 256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
   ReflectionPad2d-1          [-1, 3, 264, 264]               0
            Conv2d-2         [-1, 32, 256, 256]           7,808
    InstanceNorm2d-3         [-1, 32, 256, 256]               0
              ReLU-4         [-1, 32, 256, 256]               0
            Conv2d-5         [-1, 64, 128, 128]          32,832
    InstanceNorm2d-6         [-1, 64, 128, 128]               0
              ReLU-7         [-1, 64, 128, 128]               0
            Conv2d-8          [-1, 128, 64, 64]         131,200
    InstanceNorm2d-9          [-1, 128, 64, 64]               0
             ReLU-10          [-1, 128, 64, 64]               0
  ReflectionPad2d-11          [-1, 128, 66, 66]               0
           Conv2d-12          [-1, 128, 64, 64]         147,584
   InstanceNorm2d-13          [-1, 128, 64, 64]               0
             ReLU-14          [-1, 128,

실습시간에 사용했던 CycleGAN 코드와는 약간 다르지만, Train 코드의 구조는 같습니다. 빈칸으로 뚫어놨던 부분은 실습시간에서도 다뤘던 부분이니 코드를 잘 읽어보고 풀어주세요. (G와 F는 Generator / D_X와 D_Y는 Discriminator입니다.)

In [14]:
def mean(lst):
    return sum(lst)/len(lst)

# Prepare some test data, 5 of each kind
test_data = [(x.to(device), y.to(device)) for i, (x, y) in enumerate(test_loader) if i<5]

# Define target vectors
fake_target = 0.0
real_target = 1.0
for epoch in range(epochs):
    G_gan_loss_epoch = []
    G_cycle_loss_epoch = []
    G_ident_loss_epoch = []
    D_X_gan_loss_epoch = []
    
    # Linear lr decay
    if epoch > 99:
        GF_scheduler.step()
        D_X_scheduler.step()
        D_Y_scheduler.step()
        
    for i, (X, Y) in enumerate(train_loader):
        X = X.to(device)
        Y = Y.to(device)
        #########################################################
        # Update generators
        #########################################################
        GF_optimizer.zero_grad()
        
        # Translate from X to Y, check D_Y output
        G_out = G(X)
        D_Y_out = D_Y(G_out.detach())
        G_gan_loss = mse_criterion(D_Y_out, torch.ones_like(D_Y_out).to(device))
        
        # Translate from Y to X, check D_X output
        F_out = F(Y)
        D_X_out = D_X(F_out.detach())
        F_gan_loss = mse_criterion(D_X_out, torch.ones_like(D_X_out).to(device))
        
        # Translate from X to Y to X, check reconstruction error
        X_recon = F(G_out)
        G_cycle_loss = l1_criterion(X_recon, X) * lambda_X
        
        # Translate from Y to X to Y, check reconstruction error
        Y_recon = G(F_out)
        F_cycle_loss = l1_criterion(Y_recon, Y) * lambda_Y
        
        # Translate a picture from Y from X to Y, should be Y
        Y_ident = G(Y)
        G_ident_loss = l1_criterion(Y_ident, Y) * lambda_identity_X * lambda_X
        
        # Translate a picture from X from Y to X, should be X
        X_ident = F(X)
        F_ident_loss = l1_criterion(X_ident, X) * lambda_identity_X * lambda_Y
        
        # Hint : Generator를 학습시키기 위해 어떤 Loss들을 사용했나요?? (Generator G와 F를 한번에 학습시키는 Loss입니다)
        GF_loss = G_cycle_loss + F_cycle_loss + G_ident_loss + F_ident_loss + G_gan_loss + F_gan_loss 
        GF_loss.backward()
        GF_optimizer.step()
        
        #########################################################
        # Update discriminators
        # D_Y, minimize L_D_Y = E_y (D(y) - 1) ^2 + E_x (D(x))^2
        #########################################################
        D_Y_optimizer.zero_grad()
        
        # Test D_Y with fake and real input
        G_out = Y_pool.get(G_out)
        D_Y_out_fake = D_Y(G_out.detach())
        D_Y_out_real = D_Y(Y)
        # Calculate loss
        D_Y_loss_fake = mse_criterion(D_Y_out_fake, torch.zeros_like(D_Y_out_fake).to(device))
        D_Y_loss_real = mse_criterion(D_Y_out_real, torch.ones_like(D_Y_out_real).to(device))
        D_Y_gan_loss = (D_Y_loss_real + D_Y_loss_fake)*0.5
        
        D_Y_gan_loss.backward() # Hint : back propagation 해주기
        D_Y_optimizer.step() # Hint : optimizer가 한 스텝 나아가기
        
        #########################################################
        # D_X, minimize L_D_X = E_x (D(x) - 1) ^2 + E_y (D(y))^2
        #########################################################
        D_X_optimizer.zero_grad()
        
        # Test D_X with fake and real input
        F_out = X_pool.get(F_out)
        D_X_out_fake = D_X(F_out.detach())
        D_X_out_real = D_X(X)
        # Calculate loss
        D_X_loss_fake = mse_criterion(D_X_out_fake, torch.zeros_like(D_X_out_fake).to(device))
        D_X_loss_real = mse_criterion(D_X_out_real, torch.ones_like(D_X_out_real).to(device))
        D_X_gan_loss = (D_X_loss_real + D_X_loss_fake)*0.5
        
        D_X_gan_loss.backward() # Hint : back propagation 해주기
        D_X_optimizer.step() # Hint : optimizer가 한 스텝 나아가기
        
        # Save losses
        G_gan_loss_epoch.append(G_gan_loss.item())
        G_cycle_loss_epoch.append(G_cycle_loss.item())
        G_ident_loss_epoch.append(G_ident_loss.item())
        D_X_gan_loss_epoch.append(D_X_gan_loss.item())
        
        # Do some test output every 100 batches
        if i % 100 == 0:
            checkname = 'Epoch [%d/%d], Batch [%d/%d]' % (epoch+1, epochs, i, len(train_loader))
            savename = './logs/Epoch%dBatch%d' % (epoch+1, i)
            print(checkname)
            
            image_tensor = None
            # Generate test outputs
            
            with torch.no_grad():
                G.eval()
                F.eval()
                for X, Y in test_data:
                    G_out = G(X)
                    F_out = F(Y)
                    if image_tensor is None:
                        image_tensor = torch.cat((X, G_out, Y, F_out), 0)
                    else:
                        image_tensor = torch.cat((image_tensor, X, G_out, Y, F_out), 0)
                G.train()
                F.train()
            save_image(image_tensor, savename + '.png', nrow=4, padding=50)
            
#             save_image(image_tensor, './i.' nrow=4, padding=2, normalize=True)
#             writer.add_image('test_images', image, i+epoch*len(train_loader))
    
    # Calculate mean
    G_gan_loss_epoch = mean(G_gan_loss_epoch)
    G_cycle_loss_epoch = mean(G_cycle_loss_epoch)
    G_ident_loss_epoch = mean(G_ident_loss_epoch)
    G_loss_epoch = G_gan_loss_epoch + G_cycle_loss_epoch + G_ident_loss_epoch
    D_X_gan_loss_epoch = mean(D_X_gan_loss_epoch)


IndentationError: unexpected indent (<ipython-input-14-ba86301c80e7>, line 125)

In [None]:
torch.save(G.state_dict(), 'G.pt')
torch.save(F.state_dict(), 'F.pt')
torch.save(D_X.state_dict(), 'D_X.pt')
torch.save(D_Y.state_dict(), 'D_Y.pt')

In [None]:
save_image(image_tensor, './i.jpg', nrow=4, padding=2, normalize=True)