# Downloading cat dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
cd drive/My\ Drive/gan_cats

/content/drive/My Drive/gan_cats


In [None]:
cd Setting up the data

/content/drive/My Drive/gan_cats/Deep-learning-with-cats/Setting up the data


In [None]:
!sh setting_up_script.sh
mv cats_bigger_than_64x64 "../Dataset/cats_64x64"
mv cats_bigger_than_128x128 "../Dataset/cats_128x128"

# WGAN_GP

## Installing and importing libraries

In [None]:
!pip install tensorboardX

In [None]:
from PIL import Image
from torch.utils import data

from IPython import display
import torch
from torch.autograd import Variable
from torch import autograd
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import pandas as pd
import torchvision
from torchvision import datasets, models, transforms 
from utils import Logger
from torchvision import utils
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import time
import os
import copy


In [None]:
base_dir = "output_folder"
if (not os.path.exists(base_dir)):
   os.mkdir(base_dir)
   logs_dir = f"{base_dir}/logs"
   os.mkdir(logs_dir)
   os.mkdir(f"{base_dir}/images")
   os.mkdir(f"{base_dir}/models")
logs_dir = f"{base_dir}/logs"
log_output = open(f"{logs_dir}/log.txt", 'w')

In [None]:
logger = Logger(logs_dir)

In [None]:
use_gpu = torch.cuda.is_available()
device = torch.device("cuda:0" if use_gpu else "cpu")

print("Torch version: ", torch.__version__)
print("GPU Available: {}".format(use_gpu))

Torch version:  1.5.1+cu101
GPU Available: True


In [None]:
transform = transforms.Compose([
	transforms.Resize((64,64)),
	transforms.ToTensor(),
	transforms.Normalize(mean = [0.5, 0.5, 0.5], std = [0.5, 0.5, 0.5])
])

In [None]:
input_folder = "Dataset"
data1 = datasets.ImageFolder(root=input_folder, transform=transform)

In [None]:
batch_size=64
def generate_random_sample():
	while True:
		random_indexes = np.random.choice(data1.__len__(), size=batch_size, replace=False)
		batch = [data1[i][0] for i in random_indexes]
		yield torch.stack(batch, 0)
random_sample = generate_random_sample()

## Model

In [None]:
class Generator(torch.nn.Module):
    def __init__(self, channels):
        super().__init__()
        # Filters [1024, 512, 256]
        # Input_dim = 100
        # Output_dim = C (number of channels)
        self.main_module = nn.Sequential(
            # Z latent vector 100
            nn.ConvTranspose2d(in_channels=100, out_channels=1024, kernel_size=4, stride=1, padding=0),
            nn.BatchNorm2d(num_features=1024),
            nn.ReLU(True),

            # State (1024x4x4)
            nn.ConvTranspose2d(in_channels=1024, out_channels=512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(num_features=512),
            nn.ReLU(True),

            # State (512x8x8)
            nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(num_features=256),
            nn.ReLU(True),

            # State (256x16x16)
            nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(num_features=128),
            nn.ReLU(True),

            # State (128x32x32)
            nn.ConvTranspose2d(in_channels=128, out_channels=channels, kernel_size=4, stride=2, padding=1))
            # output of main module --> Image (Cx64x64)

        self.output = nn.Tanh()

    def forward(self, x):
        x = self.main_module(x)
        return self.output(x)

In [None]:
class Generator(torch.nn.Module):
    def __init__(self, channels):
        super().__init__()
        # Filters [1024, 512, 256]
        # Input_dim = 100
        # Output_dim = C (number of channels)
        self.main_module = nn.Sequential(
            # Z latent vector 100
            nn.ConvTranspose2d(in_channels=100, out_channels=1024, kernel_size=4, stride=1, padding=0),
            nn.SELU(inplace=True),

            # State (1024x4x4)
            nn.ConvTranspose2d(in_channels=1024, out_channels=512, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),

            # State (512x8x8)
            nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),

            # State (256x16x16)
            nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),

            # State (128x32x32)
            nn.ConvTranspose2d(in_channels=128, out_channels=channels, kernel_size=4, stride=2, padding=1))
            # output of main module --> Image (Cx64x64)

        self.output = nn.Tanh()

    def forward(self, x):
        x = self.main_module(x)
        return self.output(x)

In [None]:
class Discriminator(torch.nn.Module):
    def __init__(self, channels):
        super().__init__()
        # Filters [256, 512, 1024]
        # Input_dim = channels (Cx64x64)
        # Output_dim = 1
        self.main_module = nn.Sequential(
            # Omitting batch normalization in critic because our new penalized training objective (WGAN with gradient penalty) is no longer valid
            # in this setting, since we penalize the norm of the critic's gradient with respect to each input independently and not the enitre batch.
            # There is not good & fast implementation of layer normalization --> using per instance normalization nn.InstanceNorm2d()
            # Image (Cx64x64)
            nn.Conv2d(in_channels=channels, out_channels=128, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(128, affine=True),
            nn.LeakyReLU(0.2, inplace=True),

            # State (128x32x32)
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(256, affine=True),
            nn.LeakyReLU(0.2, inplace=True),
            
            # State (256x16x16)
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(512, affine=True),
            nn.LeakyReLU(0.2, inplace=True),

            # State (512x8x8)
            nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(1024, affine=True),
            nn.LeakyReLU(0.2, inplace=True))
            # output of main module --> State (1024x4x4)

        self.output = nn.Sequential(
            # The output of D is no longer a probability, we do not apply sigmoid at the output of D.
            nn.Conv2d(in_channels=1024, out_channels=1, kernel_size=4, stride=1, padding=0))


    def forward(self, x):
        x = self.main_module(x)
        return self.output(x)

    def feature_extraction(self, x):
        # Use discriminator for feature extraction then flatten to vector of 16384
        x = self.main_module(x)
        return x.view(-1, 1024*4*4)

In [None]:
class Discriminator(torch.nn.Module):
    def __init__(self, channels):
        super().__init__()
        # Filters [256, 512, 1024]
        # Input_dim = channels (Cx64x64)
        # Output_dim = 1
        self.main_module = nn.Sequential(
            # Omitting batch normalization in critic because our new penalized training objective (WGAN with gradient penalty) is no longer valid
            # in this setting, since we penalize the norm of the critic's gradient with respect to each input independently and not the enitre batch.
            # There is not good & fast implementation of layer normalization --> using per instance normalization nn.InstanceNorm2d()
            # Image (Cx64x64)
            nn.Conv2d(in_channels=channels, out_channels=128, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),

            # State (128x32x32)
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),
            
            # State (256x16x16)
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True),

            # State (512x8x8)
            nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=4, stride=2, padding=1),
            nn.SELU(inplace=True))
            # output of main module --> State (1024x4x4)

        self.output = nn.Sequential(
            # The output of D is no longer a probability, we do not apply sigmoid at the output of D.
            nn.Conv2d(in_channels=1024, out_channels=1, kernel_size=4, stride=1, padding=0))


    def forward(self, x):
        x = self.main_module(x)
        return self.output(x)

    def feature_extraction(self, x):
        # Use discriminator for feature extraction then flatten to vector of 16384
        x = self.main_module(x)
        return x.view(-1, 1024*4*4)

In [None]:
def calculate_gradient_penalty( real_images, fake_images):
        eta = torch.FloatTensor(batch_size,1,1,1).uniform_(0,1)
        eta = eta.expand(batch_size, real_images.size(1), real_images.size(2), real_images.size(3))
        eta = eta.to(device)
        
        interpolated = eta * real_images + ((1 - eta) * fake_images)
        interpolated = interpolated.to(device)

        interpolated = Variable(interpolated, requires_grad=True)

        # calculate probability of interpolated examples
        out_interpolated = discriminator(interpolated)

        # calculate gradients of probabilities with respect to examples
        gradients = autograd.grad(outputs=out_interpolated, inputs=interpolated,
                                             grad_outputs=torch.ones(out_interpolated.size()).to(device),
                                            create_graph=True, retain_graph=True)[0]

        grad_penalty = ((gradients.norm(2, dim=1).norm(2, dim=1).norm(2, dim=1) - 1) ** 2).mean() * lambda_gp
        return grad_penalty

In [None]:
generator = Generator(3)
discriminator = Discriminator(3)

G_load = 'output_folder/models/G_18001.pth'
D_load = 'output_folder/models/D_18001.pth'

if G_load != '':
  	generator.load_state_dict(torch.load(G_load))
if D_load != '':
	discriminator.load_state_dict(torch.load(D_load))

generator.to(device)
discriminator.to(device)

n_iters = 15000
critic_iter = 5
lambda_gp = 10
b1 = 0.5
b2 = 0.999
lr = 1e-4
batch_size = 64

optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(b1, b2))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(b1, b2))

In [None]:
import time
start = time.time()

one = torch.tensor(1, dtype=torch.float)
one_neg = one * -1

one = one.to(device)
one_neg = one_neg.to(device)

for g_iter in range(12851,n_iters):
    for p in discriminator.parameters():
                p.requires_grad = True
   
    for d_iter in range(critic_iter):
                discriminator.zero_grad()

                images = random_sample.__next__()
                images = Variable(images).to(device)          

                d_loss_real = discriminator(images)
                d_loss_real = d_loss_real.mean()
                d_loss_real.backward(one_neg)

                # Train with fake images
                z = torch.rand((batch_size, 100, 1, 1))
                z = Variable(z).to(device)

                fake_images = generator(z)
                d_loss_fake = discriminator(fake_images)
                d_loss_fake = d_loss_fake.mean()
                d_loss_fake.backward(one)

                # Train with gradient penalty
                gradient_penalty = calculate_gradient_penalty(images.data, fake_images.data)
                gradient_penalty.backward()


                d_loss = d_loss_fake - d_loss_real + gradient_penalty
                Wasserstein_D = d_loss_real - d_loss_fake
                optimizer_D.step()
                print(f'  Discriminator iteration: {d_iter + 1}/{critic_iter}, loss_fake: {d_loss_fake}, loss_real: {d_loss_real}')

    # Generator update
    for p in discriminator.parameters():
                p.requires_grad = False  # to avoid computation

    generator.zero_grad()
    z = Variable(torch.randn(batch_size, 100, 1, 1)).to(device)
    fake_images = generator(z)
    g_loss = discriminator(fake_images)
    g_loss = g_loss.mean()
    g_loss.backward(one_neg)
    g_cost = -g_loss
    optimizer_G.step()

    display.clear_output(True)
    logger.log(Wasserstein_D.item(), d_loss.item(), g_loss.item(), g_iter + 1)

    #log_value('errD', Wasserstein_D.item(), g_iter)
    #log_value('errD_penalty', d_loss.item(), g_iter)
    #log_value('errG', g_loss.item(), g_iter)
    print(f' iteration: {g_iter + 1}/{n_iters}')
    print(f' Discriminator :loss_fake: {d_loss_fake}, loss_real: {d_loss_real}    Generator :  g_loss: {g_loss}')
    
    z = Variable(torch.randn(batch_size, 100, 1, 1)).to(device)
    samples = generator(z)
    samples = samples.mul(0.5).add(0.5)
    samples = samples.data.cpu()[:16]
    grid = utils.make_grid(samples)
    logger.log_images(samples, 16, g_iter + 1)
                
    if (g_iter) % 50 == 0:
          torch.save(generator.state_dict(), '%s/models/G_%d.pth' % ("output_folder",g_iter+1))
          torch.save(discriminator.state_dict(), '%s/models/D_%d.pth' % ("output_folder", g_iter+1))


end = time.time()
print('Time of training-{}'.format((end - start)))

In [None]:
torch.save(generator.state_dict(), '%s/models/G_%d.pth' % ("output_folder",g_iter+1))
torch.save(discriminator.state_dict(), '%s/models/D_%d.pth' % ("output_folder", g_iter+1))