<a href="https://colab.research.google.com/github/Kabanosk/ml/blob/main/wallpaper-generation-using-GAN-training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#%matplotlib inline
import argparse
import os
import random
import time

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import pickle
from mpl_toolkits.axes_grid1 import ImageGrid

# Fixed seed for Python's `random` module and PyTorch's default generator.
# NOTE(review): NumPy's RNG is not seeded here — confirm nothing relies on np.random.
manualSeed = 999
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
# Last expression of the cell: the returned torch Generator object is what gets displayed.
torch.manual_seed(manualSeed)

Random Seed:  999


<torch._C.Generator at 0x7fc9241ecc30>

In [None]:
# Mount Google Drive (Colab-only API) at /content/drive.
# The relative 'drive/MyDrive/...' paths used in later cells presumably resolve
# through this mount when the working directory is /content — TODO confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### First we must prepare the data

In [None]:
# Root folder of the training images, passed to ImageFolder below.
dataroot = "drive/MyDrive/input"
# Tensor -> PIL image converter, used later when saving generated samples.
t = transforms.ToPILImage()

# Number of worker processes handed to the DataLoader (num_workers).
workers = 2

# Mini-batch size used by the DataLoader.
batch_size = 256

# Side length the training images are resized and centre-cropped to.
image_size = 64

# Number of image channels (generator output / discriminator input).
nc = 3

# Number of channels of the latent noise vector fed to the generator.
nz = 100

# Base number of feature maps in the generator (multiplied by 8/4/2/1 per layer).
ngf = 64

# Base number of feature maps in the discriminator (multiplied by 1/2/4/8 per layer).
ndf = 64

# Upper bound of the epoch range in the training loop.
num_epochs = 5000

# Learning rate passed to both Adam optimizers.
lr = 0.0002

# First beta coefficient passed to both Adam optimizers.
beta1 = 0.5

# Number of GPUs: 0 selects the CPU, >1 wraps the networks in DataParallel.
ngpu = 1

In [None]:
# Pre-processing applied to every image: resize, centre-crop to a square of
# side `image_size`, convert to a tensor and normalise each channel with
# mean 0.5 / std 0.5.
preprocess = transforms.Compose([
    transforms.Resize(image_size),
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

dataset = dset.ImageFolder(root=dataroot, transform=preprocess)

# Shuffled mini-batches, loaded by `workers` background processes.
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=workers,
)

# Train on the first GPU when CUDA is available and at least one GPU is requested.
use_cuda = torch.cuda.is_available() and ngpu > 0
device = torch.device("cuda:0" if use_cuda else "cpu")

# Optional preview of one batch of training images (disabled):
# real_batch = next(iter(dataloader))
# plt.figure(figsize=(8,8))
# plt.axis("off")
# plt.title("Training Images")
# plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=2, normalize=True).cpu(),(1,2,0)))

### Weight initialization
##### From the DCGAN paper (Normal distribution with mean=0, stdev=0.02)

In [None]:
def weights_init(m):
    """Re-initialise a single module in place; meant for `net.apply(...)`.

    Modules whose class name contains 'Conv' get weights drawn from
    N(0, 0.02). Modules whose class name contains 'BatchNorm' get weights
    drawn from N(1, 0.02) and a zero bias. Any other module is left untouched.
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return

    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

### Making a Deep Convolutional Generative Adversarial Network
##### Let's start with the Generator model

In [None]:
class Generator(nn.Module):
    """DCGAN generator mapping a latent tensor to an image.

    The network is a stack of transposed convolutions: a first block that
    takes `nz` input channels to `ngf * 8` feature maps, three stride-2
    blocks that halve the number of feature maps each time, and a final
    stride-2 transposed convolution to `nc` channels followed by Tanh, so
    outputs lie in [-1, 1].

    Args:
        ngpu: number of GPUs; only stored on the instance.
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        def up_block(in_ch, out_ch, stride, padding):
            # Transposed conv (kernel 4, no bias) -> batch norm -> in-place ReLU.
            return [
                nn.ConvTranspose2d(in_ch, out_ch, 4, stride, padding, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(True),
            ]

        layers = [
            *up_block(nz, ngf * 8, 1, 0),
            *up_block(ngf * 8, ngf * 4, 2, 1),
            *up_block(ngf * 4, ngf * 2, 2, 1),
            *up_block(ngf * 2, ngf, 2, 1),
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh(),
        ]
        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run `input` through the sequential stack and return the result."""
        return self.main(input)


##### Create the Generator model

In [None]:
# Build the generator and move it to the training device.
netG = Generator(ngpu).to(device)

# Wrap in DataParallel only when several GPUs were requested and CUDA is in use.
if ngpu > 1 and device.type == 'cuda':
    netG = nn.DataParallel(netG, list(range(ngpu)))

# Re-initialise every sub-module with weights_init.
netG.apply(weights_init)

print(netG)

Generator(
  (main): Sequential(
    (0): ConvTranspose2d(100, 512, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
    (9): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU(inplace=True)
    (12): ConvTranspose2d(64, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (13): Tanh()
  )
)


##### Then we must make the Discriminator model

In [None]:
class Discriminator(nn.Module):
    """DCGAN discriminator mapping an image to a single score in (0, 1).

    The network is a stack of stride-2 convolutions that double the number
    of feature maps from `ndf` up to `ndf * 8`, with batch norm, LeakyReLU
    and dropout in the middle blocks, followed by a final convolution to one
    channel, a LeakyReLU(0.1) and a Sigmoid.

    NOTE(review): the LeakyReLU placed directly before the Sigmoid scales
    negative pre-activations by 0.1, which limits how close to 0 the output
    can get. It is kept as is so existing checkpoints and training behaviour
    are unchanged — confirm whether it is intentional.

    Args:
        ngpu: number of GPUs; only stored on the instance.
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        def down_block(in_ch, out_ch, drop_p):
            # Stride-2 conv (kernel 4, no bias) -> batch norm -> LeakyReLU -> dropout.
            return [
                nn.Conv2d(in_ch, out_ch, 4, 2, 1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2, inplace=True),
                nn.Dropout(drop_p),
            ]

        layers = [
            # First block has neither batch norm nor dropout.
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            *down_block(ndf, ndf * 2, 0.5),
            *down_block(ndf * 2, ndf * 4, 0.5),
            *down_block(ndf * 4, ndf * 8, 0.2),
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Sigmoid(),
        ]
        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run `input` through the sequential stack and return the result."""
        return self.main(input)

##### Let's create it

In [None]:
# Build the discriminator and move it to the training device.
netD = Discriminator(ngpu).to(device)

# Wrap in DataParallel only when several GPUs were requested and CUDA is in use.
if ngpu > 1 and device.type == 'cuda':
    netD = nn.DataParallel(netD, list(range(ngpu)))

# Re-initialise every sub-module with weights_init.
netD.apply(weights_init)

print(netD)

Discriminator(
  (main): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Dropout(p=0.5, inplace=False)
    (6): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Dropout(p=0.5, inplace=False)
    (10): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (11): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): LeakyReLU(negative_slope=0.2, inplace=True)
    (13): Dropout(p=0.2, inplace=False)
    (14)

### Loss functions and Optimizers
##### I will use the Binary Cross Entropy loss function and the Adam optimizer

In [None]:
# Binary cross-entropy between discriminator outputs and the real/fake targets.
criterion = nn.BCELoss()

# A batch of 256 latent vectors of shape (nz, 1, 1), created on the training device.
fixed_noise = torch.randn(256, nz, 1, 1, device=device)

# Target values used for real and generated images respectively.
real_label, fake_label = 1., 0.

### Get changes from last training
##### Load the saved networks and the last saved epoch number, then create new optimizers for them

In [None]:
# Resume from the last checkpoint written by the training loop, if there is one.
netG_path = 'drive/MyDrive/saved_models/d50netG.pkl'
netD_path = 'drive/MyDrive/saved_models/d50netD.pkl'
epoch_path = 'drive/MyDrive/saved_models/current_epoch.txt'

checkpoint_paths = (netG_path, netD_path, epoch_path)
missing_paths = [p for p in checkpoint_paths if not os.path.exists(p)]

if not missing_paths:
    # SECURITY: pickle.load can execute arbitrary code; only load checkpoints
    # that this notebook produced itself.
    # `with` closes each file handle deterministically instead of leaking it.
    with open(netG_path, 'rb') as f:
        netG = pickle.load(f)
    with open(netD_path, 'rb') as f:
        netD = pickle.load(f)
    with open(epoch_path, 'r') as f:
        # Stored as text by the training loop; parse it once here.
        current_epoch = int(f.read().strip())
elif len(missing_paths) == len(checkpoint_paths):
    # First run: keep the freshly initialised netG / netD created above.
    print('No checkpoint found in drive/MyDrive/saved_models; starting from epoch 0.')
    current_epoch = 0
else:
    # A partial checkpoint is more likely a Drive problem than a fresh start;
    # stop here rather than overwrite the files that do exist.
    raise FileNotFoundError(f'Incomplete checkpoint, missing: {missing_paths}')

# The optimizers are always created anew: their state is not part of the checkpoint.
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))

### Training

In [None]:
print("Starting Training Loop...")

# Make sure the checkpoint and sample folders exist before the first save.
os.makedirs('drive/MyDrive/saved_models', exist_ok=True)
os.makedirs('drive/MyDrive/output', exist_ok=True)

num_batches = len(dataloader)
t0 = time.time()

for epoch in range(int(current_epoch), num_epochs):
    for i, data in enumerate(dataloader, 0):

        # ---- Discriminator update: real batch ----
        netD.zero_grad()

        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        output = netD(real_cpu).view(-1)
        errD_real = criterion(output, label)
        errD_real.backward()
        D_x = output.mean().item()

        # ---- Discriminator update: generated batch ----
        noise = torch.randn(b_size, nz, 1, 1, device=device)
        fake = netG(noise)
        label.fill_(fake_label)

        # detach() keeps this backward pass from reaching the generator.
        output = netD(fake.detach()).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Generator update: generated images are labelled as real ----
        netG.zero_grad()
        label.fill_(real_label)

        output = netD(fake).view(-1)
        errG = criterion(output, label)
        errG.backward()
        optimizerG.step()

        # Log every 5th batch and the last batch of the epoch
        # (enumerate is 0-based, so the last index is num_batches - 1).
        if i % 5 == 0 or i == num_batches - 1:
            t1 = time.time()
            print(f'[{epoch}/{num_epochs}][{i}/{len(dataloader)}]'
                  f'\tLoss_D: {round(errD.item(), 4)}\tLoss_G: {round(errG.item(), 4)}'
                  f'\tD(x): {round(D_x, 2)}\tD(G(z)): {round(D_G_z1, 2)} \t {round(t1-t0, 2)} s')
            t0 = time.time()

        # Checkpoint on the first batch of every 10th epoch. The epoch number
        # written here is the one the resume cell restarts from.
        if epoch % 10 == 0 and i == 0:
            # saving state; `with` flushes and closes every file right away
            with open('drive/MyDrive/saved_models/d50netG.pkl', 'wb') as f:
                pickle.dump(netG, f)
            with open('drive/MyDrive/saved_models/d50netD.pkl', 'wb') as f:
                pickle.dump(netD, f)

            with open('drive/MyDrive/saved_models/current_epoch.txt', 'w') as f:
                f.write(str(epoch))

            print(
                f"\n########################################\n"
                f"###   State was saved on {epoch} epoch  ###\n"
                f"########################################\n"
            )

            # saving picture generated by network at current epoch
            sample_noise = torch.randn(1, nz, 1, 1, device=device)
            with torch.no_grad():  # no autograd graph is needed for a preview
                sample = netG(sample_noise).cpu()[0]
            # The generator ends in Tanh, so values are in [-1, 1]; ToPILImage
            # expects float tensors in [0, 1]. Undo the (0.5, 0.5) normalisation.
            sample = sample.add(1).div(2).clamp(0, 1)
            t(sample).save(f'drive/MyDrive/output/fake_epoch_{int(epoch)}.jpg')


Starting Training Loop...
[2290/5000][0/38]	Loss_D: 0.1713	Loss_G: 2.9524	D(x): 0.99	D(G(z)): 0.15 	 188.38 s

########################################
###   State was saved on 2290 epoch  ###
########################################

[2290/5000][5/38]	Loss_D: 0.1233	Loss_G: 2.4482	D(x): 0.96	D(G(z)): 0.06 	 3.48 s
[2290/5000][10/38]	Loss_D: 0.1567	Loss_G: 2.676	D(x): 0.96	D(G(z)): 0.1 	 7.25 s
[2290/5000][15/38]	Loss_D: 0.2781	Loss_G: 2.1147	D(x): 1.0	D(G(z)): 0.23 	 8.61 s
[2290/5000][20/38]	Loss_D: 0.3319	Loss_G: 2.0649	D(x): 1.0	D(G(z)): 0.23 	 9.12 s
[2290/5000][25/38]	Loss_D: 0.0788	Loss_G: 2.5144	D(x): 0.98	D(G(z)): 0.05 	 6.83 s
[2290/5000][30/38]	Loss_D: 0.0771	Loss_G: 3.1203	D(x): 0.98	D(G(z)): 0.05 	 9.62 s
[2290/5000][35/38]	Loss_D: 0.2501	Loss_G: 1.8088	D(x): 0.93	D(G(z)): 0.14 	 7.26 s
[2291/5000][0/38]	Loss_D: 0.0705	Loss_G: 2.2624	D(x): 1.0	D(G(z)): 0.06 	 6.18 s
[2291/5000][5/38]	Loss_D: 0.179	Loss_G: 1.8633	D(x): 1.0	D(G(z)): 0.16 	 6.15 s
[2291/5000][10/38]	Loss_D: 0

In [None]:
# Save the trained generator under a name that records batch size and epoch budget.
# `with` guarantees the file is flushed and closed once the dump finishes.
with open(f'drive/MyDrive/saved_models/model_{batch_size}_{num_epochs}_d50netG.pkl', 'wb') as f:
    pickle.dump(netG, f)

In [None]:
# Number of wallpapers to generate.
n = 1024

t = transforms.ToPILImage()
os.makedirs('drive/MyDrive/output', exist_ok=True)

fixed_noise = torch.randn(n, nz, 1, 1, device=device)

# Run the generator once for the whole batch instead of once per saved image,
# and without building an autograd graph.
with torch.no_grad():
    fakes = netG(fixed_noise).cpu()

# The generator ends in Tanh, so values are in [-1, 1]; ToPILImage expects
# float tensors in [0, 1]. Undo the (0.5, 0.5) normalisation first.
fakes = fakes.add(1).div(2).clamp(0, 1)

for i, fake in enumerate(fakes):
    # Upscale each sample to 640x400 before writing it to Drive.
    t(fake).resize((640, 400)).save(f'drive/MyDrive/output/fake_{i}.jpg')

In [None]:
# Generate 64 samples for the preview grid with a single forward pass.
fixed_noise = torch.randn(64, nz, 1, 1, device=device)
with torch.no_grad():
    fakes = netG(fixed_noise).cpu()

# Map the Tanh output from [-1, 1] to the [0, 1] range ToPILImage expects,
# then convert every sample to a PIL image.
images = [t(fake) for fake in fakes.add(1).div(2).clamp(0, 1)]

In [None]:
# Show the generated images in an 8x8 grid without axis decorations.
fig = plt.figure(figsize=(8, 8))
grid = ImageGrid(fig, 111, nrows_ncols=(8, 8), axes_pad=0.1)

for cell_ax, picture in zip(grid, images):
    cell_ax.axis('off')
    cell_ax.imshow(picture)

plt.show()

In [None]:
# Load a previously saved generator.
# SECURITY: pickle.load can execute arbitrary code; only load files this
# project produced itself.
with open('drive/MyDrive/saved_models/model_256_1000.pkl', 'rb') as f:
    model = pickle.load(f)

In [None]:
# Generate with the loaded model and keep the first sample of the batch.
with torch.no_grad():
    fake = model(fixed_noise).cpu()[0]
# Map the output from [-1, 1] to the [0, 1] range ToPILImage expects
# (assumes the loaded model ends in Tanh like the Generator defined in this notebook — TODO confirm).
fake = fake.add(1).div(2).clamp(0, 1)
# Last expression of the cell: the PIL image is displayed.
t(fake)