<a href="https://colab.research.google.com/github/EngBaz/Generative-AI-GANs/blob/main/Vanilla_GAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CONNECT TO GOOGLE DRIVE

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')
import sys    
path_to_module = '/content/gdrive/MyDrive/Assignment_2_Generative_models'
sys.path.append(path_to_module)

Mounted at /content/gdrive


#IMPORT THE NECESSARY LIBRARIES

---



In [2]:
#Prerequisites
import os
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.utils import save_image
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pylab
import numpy as np

#Device configuration
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


#THE GENERATOR AND DISCRIMINATOR NETWORK

In [3]:
class Generator_Dense(nn.Module):
    def __init__(self, g_latent_input_dim, g_hidden_dim, g_output_dim,use_batch_norm=False):
      super(Generator_Dense, self).__init__()
      self.linear_1 = nn.Linear(g_latent_input_dim, g_hidden_dim)
      self.linear_2 = nn.Linear(g_hidden_dim, g_hidden_dim)
      self.linear_3 = nn.Linear(g_hidden_dim, g_output_dim)

      self.use_batch_norm = use_batch_norm

      self.bnorm1 = nn.BatchNorm1d(g_hidden_dim)
      self.bnorm2 = nn.BatchNorm1d(g_output_dim)

      self.activation_for_hidden = nn.PReLU()
      self.activation_for_last = nn.Tanh()

    def forward(self, x):
      x = self.linear_1(x)
      if self.use_batch_norm:
        x = self.bnorm1(x)
      x = self.activation_for_hidden(x)
      x = self.linear_2(x)
      if self.use_batch_norm:
        x = self.bnorm1(x)
      x = self.activation_for_hidden(x)
      x = self.linear_2(x)
      if self.use_batch_norm:
        x = self.bnorm1(x)
      x = self.activation_for_hidden(x)
      x = self.linear_3(x)
      if self.use_batch_norm:
        x = self.bnorm2(x)
      x = self.activation_for_last(x)
      return x


class Discriminator_Dense(torch.nn.Module):
      def __init__(self, image_size, hidden_size,use_batch_norm=False):
        super(Discriminator_Dense, self).__init__()
        self.linear_1 = nn.Linear(image_size, hidden_size)
        self.linear_2 = nn.Linear(hidden_size, hidden_size)
        self.linear_3 = nn.Linear(hidden_size, 1)
        self.use_batch_norm = use_batch_norm
        self.bnorm1 = nn.BatchNorm1d(hidden_size)
        self.activation_for_hidden = nn.LeakyReLU(0.2)
        self.activation_for_last = nn.Sigmoid()

      def forward(self, x):
        x = self.linear_1(x)
        if self.use_batch_norm:
          x = self.bnorm1(x)
        x = self.activation_for_hidden(x)
        x = self.linear_2(x)
        if self.use_batch_norm:
          x = self.bnorm1(x)
        x = self.activation_for_hidden(x)
        x = self.linear_2(x)
        if self.use_batch_norm:
          x = self.bnorm1(x)
        x = self.activation_for_hidden(x)
        x = self.linear_3(x)
        x = self.activation_for_last(x)
        return x

#HYPERPARAMETERS

In [4]:
#Hyperparameters
latent_size = 128
hidden_size = 256
image_size = 784
num_epochs = 5
batch_size = 32
save_dir = f'/content/gdrive/MyDrive/Assignment_2_Generative_models/dense_network/models_{latent_size}'
generated_images_dir = save_dir + '/generated/'
sample_dir = save_dir + '/saples/'

#CREATE DIRECTORIES IF THEY DON'T EXIST

In [5]:
if not os.path.exists(sample_dir):
    os.makedirs(sample_dir)

if not os.path.exists(save_dir):
    os.makedirs(save_dir)

if not os.path.exists(generated_images_dir):
    os.makedirs(generated_images_dir)

#READ THE DATASET

When reading the dataset, we imediatelly do two things:


1.   Convert the image to tensor
2.   Normalize image to have 0 mean and 1 standard deviation

We use a batchsize of 32.

In [6]:
#Image processing
transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=(0.5),   # 3 for RGB channels
                                     std=(0.5)),
                ])

#Reading the dataset
mnist = torchvision.datasets.MNIST(root='./data/',
                                   transform=transform,
                                   download=True)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 165785593.92it/s]

Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw






Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 41160616.32it/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 66078341.68it/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 7833276.63it/s]


Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw



#CREATE A DATALOADER

In [7]:
data_loader = torch.utils.data.DataLoader(dataset=mnist,
                                          batch_size=batch_size, 
                                          shuffle=True)

# DEFINE THE GENERATOR AND DISCRIMINATOR MODEL


In [8]:
D = Discriminator_Dense(image_size,hidden_size)
G = Generator_Dense(latent_size,hidden_size,image_size)

#MOVE THE GENERATOR AND DISCRIMINATOR TO GPU


In [9]:
D = D.to(device)
G = G.to(device)

#DEFINE THE OPTIMIZER AND THE LOSS

In [10]:
criterion = nn.BCELoss()
d_optimizer = torch.optim.Adam(D.parameters(), lr=0.0002)
g_optimizer = torch.optim.Adam(G.parameters(), lr=0.0002)

# TRAINING LOOP

Before going in the the main training loop we need to define some utility functions:

---



Here we define the opposite of the normalization, remember that when we first read the dataset, we normalized it (mean=0, std =1), now we do the opposite so that when saving the images we get the correct image.

In [11]:
def denorm(x):
    out = (x + 1) / 2
    return out.clamp(0, 1)

In PyTorch, for every mini-batch during the training phase, we typically want to explicitly set the gradients to zero before starting to do backpropragation (i.e., updating the weights and biases) because PyTorch accumulates the gradients on subsequent backward passes. This accumulating behaviour is convenient while training RNNs or when we want to compute the gradient of the loss summed over multiple mini-batches. So, the default action has been set to accumulate (i.e. sum) the gradients on every loss.backward() call.

Because of this, when you start your training loop, ideally you should zero out the gradients so that you do the parameter update correctly. Otherwise, the gradient would be a combination of the old gradient, which you have already used to update your model parameters, and the newly-computed gradient. It would therefore point in some other direction than the intended direction towards the minimum (or maximum, in case of maximization objectives).

In [12]:
def reset_grad():
    d_optimizer.zero_grad()
    g_optimizer.zero_grad()

Next, we define NumPy arrays for saving the losses and scores, they are initially zero.

In [13]:
d_losses = np.zeros(num_epochs)
g_losses = np.zeros(num_epochs)
real_scores = np.zeros(num_epochs)
fake_scores = np.zeros(num_epochs)

# The `Training` loop

In [14]:
def generate_images(generator_model_dir,number_of_images,save_dir,latent_size = 64,save_combined=False):

  #Loading pretrained model from the drive directory
  if torch.cuda.is_available():
    G = torch.jit.load(generator_model_dir,map_location='cuda')
    G.to(device)
  else:
    G = torch.jit.load(generator_model_dir,map_location='cpu')

  G.eval()

  #Generating random noise to pass it to the Generator
  z = torch.randn(number_of_images, latent_size).to(device)
  z = Variable(z)

  #Creating fake images from the distribution
  fake_images = G(z)
  fake_images = fake_images.view(fake_images.size(0), 1, 28, 28)
  

  #Save sampled images
  fake_images = fake_images.view(fake_images.size(0), 1, 28, 28)
  #Save_image(denorm(fake_images.data), os.path.join(save_dir, 'fake_images-{i}.png'))
  if save_combined:
    torchvision.utils.save_image(denorm(fake_images.data), f'{save_dir}/combined.png')
  for i in range(fake_images.size(0)):
           torchvision.utils.save_image(denorm(fake_images.data)[i, :, :, :], f'{save_dir}/{i}.png')

In [15]:
total_step = len(data_loader)
for epoch in range(num_epochs):
    for i, (images, _) in enumerate(data_loader):
        images = images.view(batch_size, -1).to(device)
        images = Variable(images)
        #Create the labels which are later used as input for the BCE loss
        real_labels = torch.ones(batch_size, 1).to(device)
        real_labels = Variable(real_labels)
        fake_labels = torch.zeros(batch_size, 1).to(device)
        fake_labels = Variable(fake_labels)

       
        #Train Discriminator
        #Compute BCE_Loss using real images where BCE_Loss(x, y) = - y * log(D(x)) - (1-y) * log(1 - D(x))
        #Second term of the loss is always zero since real_labels == 1
        outputs = D(images)
        d_loss_real = criterion(outputs, real_labels)
        real_score = outputs
        
        #Compute BCELoss using fake images
        #First term of the loss is always zero since fake_labels == 0
        z = torch.randn(batch_size, latent_size).to(device)
        z = Variable(z)
        fake_images = G(z)
        outputs = D(fake_images)
        d_loss_fake = criterion(outputs, fake_labels)
        fake_score = outputs
        
        #If D is trained very well, then don't update
        d_loss = d_loss_real + d_loss_fake
        reset_grad()
        d_loss.backward()
        d_optimizer.step()
        

        #Training the Generator
        #Compute loss with fake images
        z = torch.randn(batch_size, latent_size).to(device)
        z = Variable(z)
        fake_images = G(z)
        outputs = D(fake_images)
        
        #Train G to maximize log(D(G(z)) instead of minimizing log(1-D(G(z)))
        g_loss = criterion(outputs, real_labels)
        
  
        reset_grad()
        g_loss.backward()
        g_optimizer.step()
        

        #Adding the losses and scores to the predefind NumPy arrays
        d_losses[epoch] = d_losses[epoch]*(i/(i+1.)) + d_loss.data*(1./(i+1.))
        g_losses[epoch] = g_losses[epoch]*(i/(i+1.)) + g_loss.data*(1./(i+1.))
        real_scores[epoch] = real_scores[epoch]*(i/(i+1.)) + real_score.mean().data*(1./(i+1.))
        fake_scores[epoch] = fake_scores[epoch]*(i/(i+1.)) + fake_score.mean().data*(1./(i+1.))
        
        if (i+1) % 200 == 0:
            print('Epoch [{}/{}], Step [{}/{}], d_loss: {:.4f}, g_loss: {:.4f}, D(x): {:.2f}, D(G(z)): {:.2f}' 
                  .format(epoch, num_epochs, i+1, total_step, d_loss.data, g_loss.data, 
                          real_score.mean().data, fake_score.mean().data))
    
    #Save real images
    if (epoch+1) == 10:
        images = images.view(images.size(0), 1, 28, 28)
        save_image(denorm(images.data), os.path.join(sample_dir, 'real_images.png'))
    
    #Save sampled images
    fake_images = fake_images.view(fake_images.size(0), 1, 28, 28)
    save_image(denorm(fake_images.data), os.path.join(sample_dir, 'fake_images-{}.png'.format(epoch+1)))
    
    #Save and plot statistics
    np.save(os.path.join(save_dir, 'd_losses.npy'), d_losses)
    np.save(os.path.join(save_dir, 'g_losses.npy'), g_losses)
    np.save(os.path.join(save_dir, 'fake_scores.npy'), fake_scores)
    np.save(os.path.join(save_dir, 'real_scores.npy'), real_scores)
    
    plt.figure()
    pylab.xlim(0, num_epochs + 1)
    plt.plot(range(1, num_epochs + 1), d_losses, label='d loss')
    plt.plot(range(1, num_epochs + 1), g_losses, label='g loss')    
    plt.legend()
    plt.savefig(os.path.join(save_dir, 'loss.pdf'))
    plt.close()

    plt.figure()
    pylab.xlim(0, num_epochs + 1)
    pylab.ylim(0, 1)
    plt.plot(range(1, num_epochs + 1), fake_scores, label='fake score')
    plt.plot(range(1, num_epochs + 1), real_scores, label='real score')    
    plt.legend()
    plt.savefig(os.path.join(save_dir, 'accuracy.pdf'))
    plt.close()

    #Save the model at checkpoints
    if (epoch+1) % 10 == 0:
        G_scripted = torch.jit.script(G) # Export to TorchScript
        G_scripted.save(os.path.join(save_dir, 'G_scripted_{}.pt'.format(epoch+1)))
        D_scripted = torch.jit.script(D) # Export to TorchScript
        D_scripted.save(os.path.join(save_dir, 'D_scripted_{}.pt'.format(epoch+1)))


        generate_images(os.path.join(save_dir, 'G_scripted_{}.pt'.format(epoch+1)),batch_size, generated_images_dir, latent_size = latent_size)

        torch.save(G.state_dict(), os.path.join(save_dir, 'G--{}.ckpt'.format(epoch+1)))
        torch.save(D.state_dict(), os.path.join(save_dir, 'D--{}.ckpt'.format(epoch+1)))

#Save the model checkpoints 
torch.save(G.state_dict(), 'G.ckpt')
torch.save(D.state_dict(), 'D.ckpt')

Epoch [0/5], Step [200/1875], d_loss: 0.8771, g_loss: 2.2448, D(x): 0.70, D(G(z)): 0.27
Epoch [0/5], Step [400/1875], d_loss: 0.3744, g_loss: 3.8256, D(x): 0.87, D(G(z)): 0.16
Epoch [0/5], Step [600/1875], d_loss: 0.0497, g_loss: 5.3526, D(x): 0.98, D(G(z)): 0.03
Epoch [0/5], Step [800/1875], d_loss: 0.6924, g_loss: 2.7342, D(x): 0.70, D(G(z)): 0.18
Epoch [0/5], Step [1000/1875], d_loss: 0.2765, g_loss: 5.0606, D(x): 0.93, D(G(z)): 0.16
Epoch [0/5], Step [1200/1875], d_loss: 0.7874, g_loss: 4.8226, D(x): 0.80, D(G(z)): 0.26
Epoch [0/5], Step [1400/1875], d_loss: 2.2872, g_loss: 1.0852, D(x): 0.38, D(G(z)): 0.50
Epoch [0/5], Step [1600/1875], d_loss: 1.7313, g_loss: 1.7404, D(x): 0.47, D(G(z)): 0.30
Epoch [0/5], Step [1800/1875], d_loss: 1.4655, g_loss: 1.3718, D(x): 0.65, D(G(z)): 0.56
Epoch [1/5], Step [200/1875], d_loss: 0.1629, g_loss: 3.1403, D(x): 0.96, D(G(z)): 0.11
Epoch [1/5], Step [400/1875], d_loss: 0.8977, g_loss: 1.9031, D(x): 0.68, D(G(z)): 0.34
Epoch [1/5], Step [600/1875