In [None]:
!nvidia-smi

In [None]:
#Set GPU
import os
import random
os.environ['CUDA_VISIBLE_DEVICES'] = '0' #Set GPU number

In [None]:
#Load Packages
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import imageio
from IPython.display import HTML

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torchvision.utils import make_grid

In [None]:
USE_CUDA = torch.cuda.is_available()
print(USE_CUDA)

In [None]:
if not os.path.exists('./checkpoint'):
  os.mkdir('./checkpoint')
if not os.path.exists('./dataset'):
  os.mkdir('./dataset')
if not os.path.exists('./img'):
  os.mkdir('./img')
if not os.path.exists('./img/real'):
  os.mkdir('./img/real')
if not os.path.exists('./img/fake'):
  os.mkdir('./img/fake')

In [None]:
# visualize the first image from the torch tensor
def vis_image(image):
    plt.imshow(image[0].detach().cpu().numpy(),cmap='gray')
    plt.show()


In [None]:
def save_gif(training_progress_images, images):
    '''
        training_progress_images: list of training images generated each iteration
        images: image that is generated in this iteration
    '''
    img_grid = make_grid(images.data)
    img_grid = np.transpose(img_grid.detach().cpu().numpy(), (1, 2, 0))
    img_grid = 255. * img_grid 
    img_grid = img_grid.astype(np.uint8)
    training_progress_images.append(img_grid)
    imageio.mimsave('./img/training_progress.gif', training_progress_images)
    return training_progress_images

In [None]:
# visualize gif file
def plot_gif(training_progress_images, plot_length=10):
    plt.close()
    fig = plt.figure()
    
    total_len = len(training_progress_images)
    for i in range(plot_length):
        im = plt.imshow(training_progress_images[int(total_len/plot_length)*i])
        plt.show()

In [None]:
def save_image_list(dataset, real):
  if real:
    base_path = './img/real'
  else:
    base_path = './img/fake'

  dataset_path = []

  for i in range(len(dataset)):
    save_path = f'{base_path}/image_{i}.png'
    dataset_path.append(save_path)
    vutils.save_image(dataset[i], save_path)

  return base_path

In [None]:
dataroot = './training_blk/'
from PIL import Image
def custom_loader(path):
    with open(path, 'rb') as f:
        img = Image.open(f)
        return img.convert('RGBA')

#dataset = dset.ImageFolder(root = dataroot, loader = custom_loader)
#dataset = dset.ImageFolder(root = dataroot, transform = transforms.Compose([transforms.Resize(64), transforms.CenterCrop(64), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5,0.5))]))
dataset = dset.ImageFolder(root = dataroot, transform = transforms.Compose([transforms.Resize(64), transforms.CenterCrop(64), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5, 0.5), (0.5, 0.5,0.5, 0.5))]), loader = custom_loader)
#dataset = torch.utils.data.TensorDataset(tensor_a, transform = transforms.Compose([transforms.Resize(64), transforms.CenterCrop(64), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5,0.5))]))

dataloader = torch.utils.data.DataLoader(dataset, batch_size = 128, shuffle = True, num_workers = 2)
device = torch.device('cuda:0')

In [None]:
for i, data in enumerate(dataset):
    #print(i, "  Data: ", data)
    print(data[0].shape)


In [None]:
#Intialize convolutional, convolutional-transpose from a Normal distribution (0, 0.02) and batch normalization layers to meet this criteria
def weights_init(m):
  classname = m.__class__.__name__
  if classname.find('Conv') != -1:
    nn.init.normal_(m.weight.data, 0.0, 0.02)
  elif classname.find('BatchNorm') != -1:
    nn.init.normal_(m.weight.data, 1.0, 0.02)
    nn.init.constant_(m.bias.data, 0)

In [None]:
# Num of channels in training image (In this case, RGB-3)
nc = 4
# Size of z latent vector (Size of generator input)
nz = 100
# Size of feature maps in generator
ngf = 64
# Size of feature maps in discriminator
ndf = 64

num_epochs = 150
lr = 0.0002
# Beta1 hyperparam for Adam potimizers
beta1 = 0.5
# Number of GPUs available.
ngpu = 1

In [None]:
######################---------Genrator----------#######################
class Generator(nn.Module):
  def __init__(self, ngpu):
    super(Generator, self).__init__()
    self.ngpu = ngpu
    self.main = nn.Sequential(
        #Input latent Z
        nn.ConvTranspose2d(nz, ngf*8, 4, 1, 0, bias = False),
        nn.BatchNorm2d(ngf*8),
        nn.ReLU(True),
        #1st state (ngf*8 *4 *4)
        nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias = False),
        nn.BatchNorm2d(ngf*4),
        nn.ReLU(True),
        # state size. (ngf*4) x 8 x 8
        nn.ConvTranspose2d( ngf * 4, ngf * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ngf * 2),
        nn.ReLU(True),
        # state size. (ngf*2) x 16 x 16
        nn.ConvTranspose2d( ngf * 2, ngf, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ngf),
        nn.ReLU(True),
        #state size(ngf) X 32 X 32
        nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias = False),
        nn.Tanh()
    )
  def forward(self, input):
    output = self.main(input)
    return output
    


In [None]:
######################---------Genrator----------#######################
class Generator(nn.Module):
  def __init__(self, ngpu):
    super(Generator, self).__init__()
    self.ngpu = ngpu
    self.main = nn.Sequential(
        #Input latent Z
        nn.ConvTranspose2d(nz, ngf*16, 4, 1, 0, bias = False),
        nn.BatchNorm2d(ngf*16),
        nn.ReLU(True),
        #0st state (ngf*16 *4 *4)
        nn.ConvTranspose2d(ngf*16, ngf*8, 4, 2, 1, bias = False),
        nn.BatchNorm2d(ngf*8),
        nn.ReLU(True),
        #1st state (ngf*8 *8 *8)
        nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias = False),
        nn.BatchNorm2d(ngf*4),
        nn.ReLU(True),
        # state size. (ngf*4) x 16 x 16
        nn.ConvTranspose2d( ngf * 4, ngf * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ngf * 2),
        nn.ReLU(True),
        # state size. (ngf*2) x 32  x 32
        nn.ConvTranspose2d( ngf * 2, ngf, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ngf),
        nn.ReLU(True),
        #state size(ngf) X 64 X 64
        nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias = False),
        nn.Tanh()
    )
  def forward(self, input):
    output = self.main(input)
    return output

In [None]:
##Create Generator
netG = Generator(ngpu).cuda()

if (device.type == 'cuda') and (ngpu>1):
  netG = nn.DataParallel(netG, list(range(ngpu)))
netG.apply(weights_init)

print(netG)

In [None]:
######################---------Discriminator----------#######################

class Discriminator(nn.Module):
  def __init__(self, ngpu):
    super(Discriminator, self).__init__()
    self.ngpu = ngpu
    self.main = nn.Sequential(
        # nc * 64 * 64 is input from training set & Generator
        nn.Conv2d(nc, ndf, 4, 2, 1, bias = False),
        nn.LeakyReLU(0.2, inplace = True),
         # state size. (ndf) x 32 x 32
        nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 2),
        nn.LeakyReLU(0.2, inplace=True),
        # state size. (ndf*2) x 16 x 16
        nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 4),
        nn.LeakyReLU(0.2, inplace=True),
        # state size. (ndf*4) x 8 x 8
        nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 8),
        nn.LeakyReLU(0.2, inplace=True),
        # state size , (ndf*8) *4 *4
        nn.Conv2d(ndf*8, 1, 4, 1, 0, bias = False),
        nn.Sigmoid()
    )
  def forward(self, input):
    output = self.main(input)
    return output.view(-1, 1).squeeze(1)


In [None]:
######################---------Discriminator----------#######################
class Discriminator(nn.Module):
  def __init__(self, ngpu):
    super(Discriminator, self).__init__()
    self.ngpu = ngpu
    self.main = nn.Sequential(
        # nc * 128 * 128 is input from training set & Generator
        nn.Conv2d(nc, ndf, 4, 2, 1, bias = False),
        nn.LeakyReLU(0.2, inplace = True),
         # state size. (ndf) x 64 x 64
        nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 2),
        nn.LeakyReLU(0.2, inplace=True),
        # state size. (ndf*2) x 32 x 32
        nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 4),
        nn.LeakyReLU(0.2, inplace=True),
        # state size. (ndf*4) x 16 x 16
        nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 8),
        nn.LeakyReLU(0.2, inplace=True),
        #state size. (ndf*8) x 8 x8
        nn.Conv2d(ndf * 8, ndf * 16, 4, 2, 1, bias=False),
        nn.BatchNorm2d(ndf * 16),
        nn.LeakyReLU(0.2, inplace=True),
        # state size , (ndf*16) *4 *4
        nn.Conv2d(ndf*16, 1, 4, 1, 0, bias = False),
        nn.Sigmoid()
    )
  def forward(self, input):
    output = self.main(input)
    return output.view(-1, 1).squeeze(1)


In [None]:
## Create Discriminator
netD = Discriminator(ngpu).cuda()

if (device.type == 'cuda') and (ngpu>1):
  netD = nn.DataParallel(netD, list(range(ngpu)))

netD.apply(weights_init)

print(netD)

In [None]:
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1InzR1qylS3Air4IvpS9CoamqJ0r9bqQg' -O inception.py

In [None]:
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1AtTxnuasIaSTTmI9MI7k8ugY8KJ1cw3Y' -O fid_score.py

In [None]:
from fid_score import calculate_fid_given_paths  # The code is downloaded from github

In [None]:
print(device)

In [None]:
#BCELoss function
criterion = nn.BCELoss()

# Get Z from random noise
fixed_noise = torch.randn(64, nz, 1, 1).cuda()

#Set real or fake label
real_label = 1
fake_label = 0

#Setup Optimizer
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas = (beta1, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr = lr, betas = (beta1, 0.999))

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()

In [None]:
#Training Loop

img_list = []
G_losses = []
D_losses = []
training_progress_images_list = []
iters = 0

print("Start Training!!!")

for epoch in range(num_epochs):
  for i, data in enumerate(dataloader, 0):

    # Set Initiating grad
    netD.zero_grad()
    real_cpu = data[0].cuda()
    b_size = real_cpu.size(0)
    label = torch.full((b_size,), real_label, dtype=torch.float).cuda()
    output = netD(real_cpu)
    errD_real = criterion(output, label)
    errD_real.backward()
    D_x = output.mean().item()

 
    #Train with fake batch
    noise = torch.randn(b_size, nz, 1, 1).cuda()
    #Generate Fake image by G
    fake = netG(noise)
    label.fill_(fake_label)
    # Discriminate fake images by D
    output = netD(fake.detach())
    #Calculate loss
    errD_fake = criterion(output,label)
    errD_fake.backward()
    D_G_z1 = output.mean().item()
    
    #Add gradient from all real & all fake batches: log(D(x)) + (1-log(D(G(z))))
    errD = errD_real+ errD_fake
    # Update D
    optimizerD.step()

    #Train G
    netG.zero_grad()
    label.fill_(real_label)
    # Following updated result of D, G tries to learn cheating D
    output = netD(fake)
    errG = criterion(output, label)
    errG.backward()
    D_G_z2 = output.mean().item()
    #Update G

    optimizerG.step()

    if i % 300 == 0:
      print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'% (epoch, num_epochs, i, len(dataloader), errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
      fake = netG(fixed_noise)
      training_progress_images_list = save_gif(training_progress_images_list, fake)  # Save fake image while training!
      
      # Check pointing for every epoch
      torch.save(netG.state_dict(), './checkpoint/netG_epoch_%d.pth' % (epoch))
      torch.save(netD.state_dict(), './checkpoint/netD_epoch_%d.pth' % (epoch))

In [None]:
dataroot = './training_blk/'

test_dataset = dset.ImageFolder(root= dataroot,
                                           transform=transforms.Compose(
                                           [transforms.Resize(64), transforms.CenterCrop(64),
                                            transforms.ToTensor(),
                                            transforms.Normalize((0.5, 0.5, 0.5, 0.5), (0.5, 0.5,0.5, 0.5))
                        ]), loader = custom_loader )

dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=1000, shuffle=True, num_workers=2)

for i, (data, _) in enumerate(dataloader):
    real_dataset = data
    break
    
noise = torch.randn(1000, nz, 1, 1).cuda()
fake_dataset = netG(noise)

In [None]:
real_image_path_list = save_image_list(real_dataset, True)
fake_image_path_list = save_image_list(fake_dataset, False)

In [None]:
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
#%%capture
fig = plt.figure(figsize=(8,8))
plt.axis("off")
ims = [[plt.imshow(np.transpose(i,(1,2,0)), animated=True)] for i in img_list]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

HTML(ani.to_jshtml())