In [None]:
#import random module
import random

# import torch modules
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils

#import numpy and matplotlib
import numpy as np
import matplotlib.pyplot as plt

# Seed Python's `random` module and PyTorch with one fixed value so that
# repeated runs draw the same random numbers, then report the seed used.
manualSeed = 999
random.seed(manualSeed)
torch.manual_seed(manualSeed)
print("Random Seed: ", manualSeed)

# Some image files in the dataset may be truncated (incomplete on disk).
# Setting LOAD_TRUNCATED_IMAGES = True lets PIL load them instead of raising an error.
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True


In [None]:
# Root folder of the prepared training images (passed to ImageFolder below)
dataroot = "Dataset_location"

# Number of DataLoader workers used to load and preprocess images in parallel
workers = 4

# Number of GPUs to use; set to 0 to force training on the CPU
ngpu = 1

# Train on the first CUDA device when one is available and requested, otherwise on the CPU
device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")
print(device)

# Uncomment to inspect the available GPU and its peak allocated memory
# torch.cuda.get_device_properties(device)
# torch.cuda.max_memory_allocated(device=device)

# Batch size; choose it according to the available GPU memory
batch_size = 12

# Side length (in pixels) that real images are resized / centre-cropped to, and
# the number of image channels.
# NOTE(review): the Generator in this notebook upsamples a 1x1 latent input as
# 1 -> 4 -> 13 -> 40 -> 121 -> 364 -> 730 -> 1462, so generated images are
# 1462x1462 while real images are 1536x1536. The Discriminator reduces both
# sizes to a 1x1 output, so training runs, but consider making the two match.
image_size = 1536
nc = 3

# Length of the latent noise vector fed to the generator
nz = 100

# Base number of feature maps in the generator and the discriminator
ngf = 32
ndf = 32

# Number of passes over the dataset; tune it to the batch size and dataset size
num_epochs = 1666

# ImageFolder dataset that resizes, crops, converts to tensor and normalises every image
dataset = dset.ImageFolder(root=dataroot,
                           transform=transforms.Compose([
                               transforms.Resize(image_size),
                               transforms.CenterCrop(image_size),
                               transforms.ToTensor(),
                               # ToTensor yields values in [0, 1]; mean 0.5 / std 0.5 rescales
                               # them to [-1, 1], the output range of the generator's final
                               # Tanh. (The previous mean 0.1 / std 0.1 mapped real images to
                               # [-1, 9], a range the generator can never produce.)
                               transforms.Normalize((0.5,) * nc, (0.5,) * nc),
                           ]))

# DataLoader that shuffles the dataset and yields mini-batches of `batch_size` images
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                         shuffle=True, num_workers=workers)



In [None]:
# Function to initialize the weights of the generator and discriminator layers
def weights_init(m):
    """Re-initialise the parameters of a single module in place.

    Intended to be passed to ``nn.Module.apply`` so it is called on every
    sub-module. The module is classified by its class name:

    * names containing ``'Conv'``: weights drawn from N(0.0, 0.02)
    * names containing ``'BatchNorm'``: weights drawn from N(1.0, 0.02),
      biases set to 0
    * anything else is left untouched.

    Args:
        m: the module to initialise.
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return

    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)

In [None]:
# Define the generator network G
class Generator(nn.Module):
    """Transposed-convolution generator mapping a latent vector to an image.

    The network is six ``ConvTranspose2d -> BatchNorm2d -> SELU`` stages
    followed by a final ``ConvTranspose2d -> Tanh``. All convolutions use a
    4x4 kernel, no padding and no bias. The channel widths come from the
    module-level ``nz`` (latent size), ``ngf`` (base feature maps) and ``nc``
    (image channels), read when the object is constructed.

    For an input of shape ``(N, nz, 1, 1)`` the spatial size grows
    1 -> 4 -> 13 -> 40 -> 121 -> 364 -> 730 -> 1462, and the final Tanh keeps
    the output values in [-1, 1].

    Args:
        ngpu: number of GPUs requested; only stored on the instance.
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        # (input channels, output channels, stride) of each hidden stage
        hidden_stages = [
            (nz, ngf * 32, 1),        # Layer 1
            (ngf * 32, ngf * 16, 3),  # Layer 2
            (ngf * 16, ngf * 8, 3),   # Layer 3
            (ngf * 8, ngf * 4, 3),    # Layer 4
            (ngf * 4, ngf * 2, 3),    # Layer 5
            (ngf * 2, ngf, 2),        # Layer 6
        ]

        layers = []
        for in_ch, out_ch, stride in hidden_stages:
            # ConvTranspose2d(in, out, kernel size, stride, padding, bias on/off)
            layers.append(nn.ConvTranspose2d(in_ch, out_ch, 4, stride, 0, bias=False))
            layers.append(nn.BatchNorm2d(out_ch))   # batch normalization
            layers.append(nn.SELU(inplace=True))    # activation

        # Layer 7: project to image channels, squash with tanh
        layers.append(nn.ConvTranspose2d(ngf, nc, 4, 2, 0, bias=False))
        layers.append(nn.Tanh())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the layer stack and return the result."""
        return self.main(input)

In [None]:
# Build the generator and move its parameters to the selected device
netG = Generator(ngpu).to(device)

# Spread batches over several GPUs only when CUDA is in use and more than one was requested
if device.type == 'cuda' and ngpu > 1:
    netG = nn.DataParallel(netG, device_ids=list(range(ngpu)))

# Re-initialise every sub-module's weights with weights_init
netG.apply(weights_init)

# Show the resulting architecture
print(netG)

In [None]:
# Define the discriminator network D
class Discriminator(nn.Module):
    """Convolutional discriminator mapping an image to a probability-like score.

    The network is ``Conv2d -> SELU``, then five
    ``Conv2d -> BatchNorm2d -> SELU`` stages, then a final
    ``Conv2d -> Sigmoid`` with a single output channel. All convolutions use a
    4x4 kernel, no padding and no bias. The channel widths come from the
    module-level ``nc`` (image channels) and ``ndf`` (base feature maps), read
    when the object is constructed.

    Args:
        ngpu: number of GPUs requested; only stored on the instance.
    """

    def __init__(self, ngpu):
        super().__init__()
        self.ngpu = ngpu

        # Layer 1: no batch normalization on the input stage.
        # Conv2d(in, out, kernel size, stride, padding, bias on/off);
        # inplace=True makes SELU overwrite its input instead of allocating a copy.
        layers = [
            nn.Conv2d(nc, ndf, 4, 2, 0, bias=False),
            nn.SELU(inplace=True),
        ]

        # Layers 2-6: (input width multiplier, output width multiplier, stride)
        hidden_stages = (
            (1, 2, 2),
            (2, 4, 3),
            (4, 8, 3),
            (8, 16, 3),
            (16, 32, 3),
        )
        for in_mult, out_mult, stride in hidden_stages:
            layers += [
                nn.Conv2d(ndf * in_mult, ndf * out_mult, 4, stride, 0, bias=False),
                nn.BatchNorm2d(ndf * out_mult),
                nn.SELU(inplace=True),
            ]

        # Layer 7: collapse to one channel and squash to (0, 1) with a sigmoid
        layers += [
            nn.Conv2d(ndf * 32, 1, 4, 1, 0, bias=False),
            nn.Sigmoid(),
        ]

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the layer stack and return the result."""
        return self.main(input)

In [None]:
# Build the discriminator and move its parameters to the selected device
netD = Discriminator(ngpu).to(device)

# Spread batches over several GPUs only when CUDA is in use and more than one was requested
if device.type == 'cuda' and ngpu > 1:
    netD = nn.DataParallel(netD, device_ids=list(range(ngpu)))

# Re-initialise every sub-module's weights with weights_init
netD.apply(weights_init)

# Show the resulting architecture
print(netD)

In [None]:
# Training hyperparameters, loss and optimizers

# Soft targets for the binary cross-entropy loss: real images are labelled 0.9
# and generated images 0.1
real_label = 0.9
fake_label = 0.1

# Binary cross-entropy between the discriminator's output and the target labels
criterion = nn.BCELoss()

# Separate learning rates for the generator and the discriminator
lrG = 0.00025
lrD = 0.00015

# Adam optimizers, one per network, sharing the same beta coefficients
beta1 = 0.5
optimizerD = optim.Adam(netD.parameters(), lr=lrD, betas=(beta1, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=lrG, betas=(beta1, 0.999))

# One latent vector drawn up front and then kept fixed
# (presumably for inspecting the generator's progress on the same input - TODO confirm)
fixed_noise = torch.randn(1, nz, 1, 1, device=device)


In [None]:
# Function to plot the gradient flow through the generator during training
def plot_grad_flow(named_parameters):
    """Plot the mean absolute gradient of each non-bias trainable parameter.

    Draws one line on the current matplotlib figure, with one point per
    parameter in iteration order, so repeated calls overlay their lines.

    Parameters whose name contains ``"bias"`` or that do not require
    gradients are ignored. Parameters that have no gradient yet
    (``p.grad is None``, e.g. when called before ``backward()``) are skipped
    rather than raising ``AttributeError``.

    Args:
        named_parameters: iterable of ``(name, parameter)`` pairs, such as
            the result of ``model.named_parameters()``.
    """
    ave_grads = []
    layers = []
    for n, p in named_parameters:
        if not p.requires_grad or "bias" in n:
            continue
        if p.grad is None:
            # Nothing has been back-propagated into this parameter yet
            continue
        layers.append(n)
        ave_grads.append(p.grad.abs().mean().item())

    plt.plot(ave_grads, alpha=0.3, color="b")
    # Zero baseline for reference
    plt.hlines(0, 0, len(ave_grads) + 1, linewidth=1, color="k")
    plt.xticks(range(len(ave_grads)), layers, rotation="vertical")
    plt.xlim(left=0, right=len(ave_grads))
    plt.xlabel("Layers")
    plt.ylabel("average gradient")
    plt.title("Gradient flow")
    plt.grid(True)

In [None]:
# Per-iteration generator and discriminator losses, kept for plotting after training
G_losses = []
D_losses = []
iters = 0

# Print statistics (and overlay a gradient-flow line) every `log_every` batches.
# Plotting on every single iteration would add one line per step to the same
# figure for the whole run, which grows memory and slows training.
log_every = 200

# Iterate over the requested number of epochs
for epoch in range(num_epochs):
    # Iterate over the mini-batches of the dataloader
    for i, data in enumerate(dataloader):

        #----------------------Discriminator Training -------------------------------

        # Reset the discriminator gradients
        netD.zero_grad()

        # Move the batch of real images to the training device
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)

        # Target labels for the real images
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        # Forward the real images through the discriminator
        output = netD(real_cpu).view(-1)

        # Loss of the discriminator on the real batch
        errD_real = criterion(output, label)

        # Back-propagate to accumulate the discriminator gradients
        errD_real.backward()

        # Mean discriminator output on the real batch (.item() gives a Python float)
        D_x = output.mean().item()

        # Sample a batch of latent noise vectors
        noise = torch.randn(b_size, nz, 1, 1, device=device)

        # Generate a batch of fake images
        fake = netG(noise)

        # Target labels for the fake images (reuses the label tensor)
        label.fill_(fake_label)

        # Forward the fake images through the discriminator; detach() keeps this
        # backward pass from reaching the generator
        output = netD(fake.detach()).view(-1)

        # Loss of the discriminator on the fake batch
        errD_fake = criterion(output, label)

        # Back-propagate; gradients add to those of the real batch
        errD_fake.backward()

        # Mean discriminator output on the fake batch, before the discriminator update
        D_G_z1 = output.mean().item()

        # Total discriminator loss over the real and fake batches
        errD = errD_real + errD_fake

        # Update the discriminator weights
        optimizerD.step()

        #--------------------------------- Generator training ---------------------------------

        # Reset the generator gradients
        netG.zero_grad()

        # The generator is trained to make the discriminator output the real label
        label.fill_(real_label)

        # Forward the fake images through the freshly updated discriminator
        output = netD(fake).view(-1)

        # Generator loss: fake batch scored against the real labels
        errG = criterion(output, label)

        # Back-propagate through the discriminator into the generator
        errG.backward()

        # Overlay the generator's gradient flow on logging iterations only
        # (comment out to disable plotting)
        if i % log_every == 0:
            plot_grad_flow(netG.named_parameters())

        # Mean discriminator output on the fake batch, after the discriminator update
        D_G_z2 = output.mean().item()

        # Update the generator weights
        optimizerG.step()

        # Save the losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Print training statistics at the logging interval
        if i % log_every == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
            # The CUDA memory counters only apply when training on a GPU; skip
            # them when `device` has fallen back to the CPU
            if device.type == 'cuda':
                print(torch.cuda.memory_allocated(device=device)/10**6,"MB / ",torch.cuda.max_memory_allocated(device=device)/10**6,"MB")
        iters += 1

In [None]:
# Persist the state dicts of both networks and both optimizers.
# Insertion order of the dict is the order the files are written in.
# NOTE: 'OptimizerD.pt' is capitalised differently from the other file names;
# it is kept as-is so existing loading code keeps working.
checkpoints = {
    'netG.pt': netG,
    'netD.pt': netD,
    'optimizerG.pt': optimizerG,
    'OptimizerD.pt': optimizerD,
}
for checkpoint_path, stateful in checkpoints.items():
    torch.save(stateful.state_dict(), checkpoint_path)

# the saved models can be loaded to resume training later 