<a href="https://colab.research.google.com/github/KaveeshBaddage/DataScienceImpl/blob/main/Generative_Adversarial_Network_(GAN)_for_MNIST_219315E.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import required libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.utils import save_image
from torchvision.transforms import ToTensor
from torchvision import datasets, transforms

import numpy as np

# to save Generator, Discriminator
import pickle as pkl

#to create and download the zipped files
import shutil
from google.colab import files

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load MNIST train and test data 


In [2]:
# define the data transformer
transform = transforms.Compose([transforms.ToTensor(),
  transforms.Normalize((0.5,), (0.5,))
])

# get train dataset
train_dataset = datasets.MNIST(
    root='data/',
    train=True,
    download=True,
    transform=transform
    )  
print("train_dataset size ->",train_dataset.data.size())

# Dataset stores the samples and their corresponding labels, and DataLoader wraps an iterable around the Dataset to enable easy access to the samples.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, 
    batch_size=100,
    shuffle=True
    # num_workers=0
    )

# get test dataset
test_dataset = datasets.MNIST(
    root='data/', 
    train=False, 
    transform=transform
    )
print("test_dataset size ->",test_dataset.data.size())

#DataLoader wraps an iterable around the test_dataset to enable easy access to the samples.
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, 
    batch_size=100,
    # num_workers=0
    shuffle=True
    )

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=9912422.0), HTML(value='')))


Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=28881.0), HTML(value='')))


Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=1648877.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=4542.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw

train_dataset size -> torch.Size([60000, 28, 28])
test_dataset size -> torch.Size([10000, 28, 28])


  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


# Define Discriminator and Generator classes

In [6]:
# defining Discriminator class

class Discriminator(nn.Module):

    def __init__(self, input_size, d_output_size):

        # access methods from parent class nn.Module
        super(Discriminator, self).__init__()
        
        # Hidden linear layers
        self.fc1 = nn.Linear(input_size, 1024)
        self.fc2 = nn.Linear(self.fc1.out_features, self.fc1.out_features//2)
        self.fc3 = nn.Linear(self.fc2.out_features, self.fc2.out_features//2)
        
        # Final output layer
        self.fc4 = nn.Linear(self.fc3.out_features, d_output_size)
        
    def forward(self, x):
        # flatten image
        x = x.view(-1, 28*28) #  reshape the tensor to 28*28 tensor
        # all hidden layers
        x = F.leaky_relu(self.fc1(x), 0.2) # (input, negative_slope=0.2) This function returns x if it receives any positive input, but for any negative value of x, it returns a really small value which is 0.02 times x
        x = F.dropout(x, 0.3) #  “deactivates” few neurons in the neural network randomly in the probability  in order to avoid the problem of overfitting.
        x = F.leaky_relu(self.fc2(x), 0.2)
        x = F.dropout(x, 0.3)
        x = F.leaky_relu(self.fc3(x), 0.2)
        x = F.dropout(x, 0.3)
        # final layer
        output = torch.sigmoid(self.fc4(x)) # squishes any real number into a range between 0 and 1


        return output

In [4]:
# defining generator class 1

class Generator(nn.Module):

    def __init__(self, input_size, g_output_size):
        super(Generator, self).__init__()
        
        # Hidden linear layers
        self.fc1 = nn.Linear(input_size, 256)
        self.fc2 = nn.Linear(self.fc1.out_features, self.fc1.out_features*2)
        self.fc3 = nn.Linear(self.fc2.out_features, self.fc1.out_features*2)
        
        # Final output layer
        self.fc4 = nn.Linear(self.fc3.out_features, g_output_size)

    def forward(self, x):
        # all hidden layers
        x = F.leaky_relu(self.fc1(x), 0.2) # (input, negative_slope=0.2)
        x = F.leaky_relu(self.fc2(x), 0.2)
        x = F.leaky_relu(self.fc3(x), 0.2)
        output = torch.tanh(self.fc4(x)) #Returns a new tensor with the hyperbolic tangent of the elements of self.fc4(x)

        return output

# Define Hyper Parameters and initiate the Discriminator and Generator

In [7]:

# Discriminator hyperparams

# Size of input image to Discriminator (28*28)
input_size = 784
# Size of discriminator output for real or fake images
d_output_size = 1


# Generator hyperparams

#Generator input  latent vector size
z_size = 100
# Discriminator output size
g_output_size = 784



D = Discriminator(input_size, d_output_size).to(device)
G = Generator(z_size, g_output_size).to(device)

print(D)
print(G)

Discriminator(
  (fc1): Linear(in_features=784, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (fc4): Linear(in_features=256, out_features=1, bias=True)
)
Generator(
  (fc1): Linear(in_features=100, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=512, bias=True)
  (fc4): Linear(in_features=512, out_features=784, bias=True)
)


In [8]:
# Measures the Binary Cross Entropy between the target and the output
criterion = nn.BCELoss() 

# Set learning rate and optimizers
lr = 0.0002 
G_optimizer = optim.Adam(G.parameters(), lr = lr)
D_optimizer = optim.Adam(D.parameters(), lr = lr)

# Define model training functions

In [9]:
def Train_Discriminator(x):
    
    # clear out the gradients of all Variables since no need to keep the gradient calculation result of the previous batch
    D.zero_grad()

    # train discriminator on real
    x_real, y_real = x.view(-1, 28*28), torch.ones(100, 1)
    x_real, y_real = Variable(x_real.to(device)), Variable(y_real.to(device))

    D_output = D(x_real)
    D_real_loss = criterion(D_output, y_real)
    D_real_score = D_output

    # train discriminator on facke
    z = Variable(torch.randn(100, 100).to(device))
    x_fake, y_fake = G(z), Variable(torch.zeros(100, 1).to(device))

    D_output = D(x_fake)
    D_fake_loss = criterion(D_output, y_fake)
    D_fake_score = D_output

    # gradient backprop & optimize ONLY D's parameters
    D_loss = D_real_loss + D_fake_loss
    D_loss.backward()
    D_optimizer.step()
        
    return  D_loss.data.item()

In [10]:
def Train_Generator(x):
    
    # clear out the gradients of all Variables since no need to keep the gradient calculation result of the previous batch
    G.zero_grad()

    z = Variable(torch.randn(100, 100).to(device))
    y = Variable(torch.ones(100, 1).to(device))

    G_output = G(z)
    D_output = D(G_output)
    G_loss = criterion(D_output, y)

    # gradient backprop & optimize ONLY G's parameters
    G_loss.backward()
    G_optimizer.step()
        
    return G_loss.data.item()

# Train the models

In [11]:
n_epoch = 200
for epoch in range(1, n_epoch+1):           
    D_losses, G_losses = [], []
    for batch_idx, (x, _) in enumerate(train_loader):
        D_losses.append(Train_Discriminator(x))
        G_losses.append(Train_Generator(x))

    print('[%d/%d]: loss_d: %.3f, loss_g: %.3f' % (
            (epoch), n_epoch, torch.mean(torch.FloatTensor(D_losses)), torch.mean(torch.FloatTensor(G_losses))))

[1/200]: loss_d: 1.055, loss_g: 2.368
[2/200]: loss_d: 0.905, loss_g: 2.059
[3/200]: loss_d: 0.741, loss_g: 2.219
[4/200]: loss_d: 0.440, loss_g: 2.963
[5/200]: loss_d: 0.426, loss_g: 3.106
[6/200]: loss_d: 0.436, loss_g: 3.283
[7/200]: loss_d: 0.449, loss_g: 2.958
[8/200]: loss_d: 0.584, loss_g: 2.604
[9/200]: loss_d: 0.576, loss_g: 2.548
[10/200]: loss_d: 0.606, loss_g: 2.518
[11/200]: loss_d: 0.620, loss_g: 2.425
[12/200]: loss_d: 0.695, loss_g: 2.151
[13/200]: loss_d: 0.718, loss_g: 2.086
[14/200]: loss_d: 0.766, loss_g: 1.925
[15/200]: loss_d: 0.816, loss_g: 1.809
[16/200]: loss_d: 0.810, loss_g: 1.816
[17/200]: loss_d: 0.840, loss_g: 1.819
[18/200]: loss_d: 0.810, loss_g: 1.891
[19/200]: loss_d: 0.794, loss_g: 1.897
[20/200]: loss_d: 0.778, loss_g: 1.901
[21/200]: loss_d: 0.826, loss_g: 1.824
[22/200]: loss_d: 0.884, loss_g: 1.656
[23/200]: loss_d: 0.874, loss_g: 1.687
[24/200]: loss_d: 0.903, loss_g: 1.598
[25/200]: loss_d: 0.914, loss_g: 1.577
[26/200]: loss_d: 0.916, loss_g: 1

# Save and download the models

In [12]:
# Save the Generator
with open('G.pkl', 'wb') as f:
    pkl.dump(G, f)

# Save the Discriminator
with open('D.pkl', 'wb') as f:
    pkl.dump(D, f)


# download files into the local machine
files.download('D.pkl')
files.download('G.pkl')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Create and annotate a fake data set

In [13]:
#create directory to save generated images and files
!mkdir fake_digits

#create 100 latent vectors which has 100  dimension

sample_size=100

z = np.random.uniform(-1, 1, size=(sample_size, z_size))
z = torch.from_numpy(z).float()

G.eval() # eval mode

# set image directory root location
root = '/content'

#get the grid image
grid_view = G(z.to(device))
save_image(grid_view.view(grid_view.size(0), 1, 28, 28), root + r'/grid.png', normalize = True) 


for i, image_tensor in enumerate(z):
    image_tensor = image_tensor.detach()

    # save the latent vector, related to the image
    f = open(root + r'/fake_digits/%d.txt' %i,'a')
    f.write(np.array2string(image_tensor.numpy(), separator=','))
    f.close()

    # create image using latent vector
    rand_image = G(image_tensor.to(device)) 

    # save image
    rand_image = rand_image.view(-1, 28) # convert [784] tensor in to [28,28] tensor to create image
    save_image(rand_image, root + r'/fake_digits/%d.png' %i, normalize = True)


#create and download zip file contains generated image files and Latent vectors
shutil.make_archive('fake_digits', 'zip', 'fake_digits')
files.download('fake_digits.zip')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Create random digit images From the real MNIST test data set

In [14]:
#create directory to save generated images and test files
!mkdir real_images

# extract 100 random digit images from the real MNIST test data set

for idx, (img, _) in enumerate(test_dataset):

    # save the latent vector, related to the image
    f = open(root + r'/real_images/%d.txt' %idx,'a')
    f.write(np.array2string(img.numpy(), separator=','))
    f.close()

    # save the image
    save_image(img, '/content/real_images/%d.png' %idx, normalize = True)

    # stop the iteration after creating 100 images
    if idx == 100: 
        break   

#create and download a zip file contains generated image files and Latent vectors
shutil.make_archive('real_images', 'zip', 'real_images')
files.download('real_images.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Prepare S0 and S1 

In [16]:
#upload image zip files after grouping thm according to its label and extract them
!unzip '/content/Real_Images.zip' -d '/content/S0-real_images'
!unzip '/content/Fake_Digits.zip' -d '/content/S1-Fake_Digits'

Archive:  /content/Real_Images.zip
   creating: /content/S0-real_images/Real_Images/
   creating: /content/S0-real_images/Real_Images/0/
  inflating: /content/S0-real_images/Real_Images/0/3.png  
  inflating: /content/S0-real_images/Real_Images/0/3.txt  
  inflating: /content/S0-real_images/Real_Images/0/10.png  
  inflating: /content/S0-real_images/Real_Images/0/10.txt  
  inflating: /content/S0-real_images/Real_Images/0/13.png  
  inflating: /content/S0-real_images/Real_Images/0/13.txt  
  inflating: /content/S0-real_images/Real_Images/0/25.png  
  inflating: /content/S0-real_images/Real_Images/0/25.txt  
  inflating: /content/S0-real_images/Real_Images/0/28.png  
  inflating: /content/S0-real_images/Real_Images/0/28.txt  
  inflating: /content/S0-real_images/Real_Images/0/55.png  
  inflating: /content/S0-real_images/Real_Images/0/55.txt  
  inflating: /content/S0-real_images/Real_Images/0/69.png  
  inflating: /content/S0-real_images/Real_Images/0/69.txt  
  inflating: /content/S0-

In [17]:
real_image_path ='/content/S0-real_images/Real_Images'
fake_image_path ='/content/S1-Fake_Digits/Fake_Digits'

# get data set to the runtime

train_data=datasets.ImageFolder(
    root=real_image_path,
    transform=transforms.Compose([transforms.Grayscale(),transforms.ToTensor()])
)


test_data=datasets.ImageFolder(
    root=fake_image_path,
    transform=transforms.Compose([transforms.Grayscale(),transforms.ToTensor()])
)

In [18]:
from torch.utils.data import DataLoader
loaders = {
    'S0' : torch.utils.data.DataLoader(train_data, 
                                          batch_size=100, 
                                          shuffle=True, 
                                          num_workers=1),
    
    'S1'  : torch.utils.data.DataLoader(test_data, 
                                          batch_size=100, 
                                          shuffle=True, 
                                          num_workers=1),
}
loaders

{'S0': <torch.utils.data.dataloader.DataLoader at 0x7f9cfc07b4d0>,
 'S1': <torch.utils.data.dataloader.DataLoader at 0x7f9cfc07bd10>}

# Define Classifier

In [19]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=1,              
                out_channels=36,  #16          
                kernel_size=5,              
                stride=1,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
            nn.MaxPool2d(kernel_size=2),    
        )
        self.conv2 = nn.Sequential(         
            nn.Conv2d(36, 32, 5, 1, 2),     
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        )
        # fully connected layer, output 10 classes
        self.out = nn.Linear(32 * 7 * 7, 10)

        
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        x = x.view(x.size(0), -1)       
        output = self.out(x)
        return output, x    # return x for visualization

In [20]:
cnn=CNN()
print(cnn)

CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 36, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(36, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (out): Linear(in_features=1568, out_features=10, bias=True)
)


In [21]:
#define loss function for classifier
loss_func = nn.CrossEntropyLoss()
loss_func

CrossEntropyLoss()

In [22]:
# define optimizer for classifier
optimizer = optim.Adam(cnn.parameters(), lr=0.01)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 0.01
    weight_decay: 0
)

# Train the Classifier

In [24]:
num_epochs = 10

#Remember to run on GPU

def train(num_epochs, cnn, loaders):
    
    cnn.train()
        
    # Train the model
    total_step = len(loaders['S0'])
        
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['S0']):
            
            # gives batch data, normalize x when iterate train_loader
            b_x = Variable(images)   # batch x
            b_y = Variable(labels)   # batch y
            output = cnn(b_x)[0]               
            loss = loss_func(output, b_y)
            
            # clear gradients for this training step   
            optimizer.zero_grad()           
            
            # backpropagation, compute gradients 
            loss.backward()    
            # apply gradients             
            optimizer.step()                
            
            
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
            pass
        pass
    pass

train(num_epochs, cnn, loaders)

Epoch [1/10], Step [1/1], Loss: 0.1353
Epoch [2/10], Step [1/1], Loss: 0.0601
Epoch [3/10], Step [1/1], Loss: 0.0509
Epoch [4/10], Step [1/1], Loss: 0.0472
Epoch [5/10], Step [1/1], Loss: 0.0137
Epoch [6/10], Step [1/1], Loss: 0.0025
Epoch [7/10], Step [1/1], Loss: 0.0042
Epoch [8/10], Step [1/1], Loss: 0.0067
Epoch [9/10], Step [1/1], Loss: 0.0039
Epoch [10/10], Step [1/1], Loss: 0.0005


In [28]:
# Save the Classifier
with open('C.pkl', 'wb') as f:
    pkl.dump(cnn, f)


# download files into the local machine
files.download('C.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Testing accuracy of the Classifier model

In [25]:
def test():
  cnn.eval()
  with torch.no_grad():
    correct=0
    total=0
    for images, labels in loaders['S1']:
      test_output,last_layer=cnn(images)
      pred_y=torch.max(test_output,1)[1].data.squeeze()
      accuracy=(pred_y==labels).sum().item()/float(labels.size(0))
      pass
    print('Test Acc %.2f' % accuracy)
    pass


test()

Test Acc 0.83


# Print prediction and actual numbers

In [27]:
sample = next(iter(loaders['S1']))
imgs, lbls = sample
actual_number = lbls[:100].numpy()
test_output,last_layer=cnn(imgs[:100])
pred_y=torch.max(test_output,1)[1].data.numpy().squeeze()

print(f'Prediction number {pred_y}')
print(f'Actual number {actual_number}')

Prediction number [3 0 1 0 6 6 7 0 9 3 3 9 3 3 1 9 3 3 9 9 3 5 7 0 2 6 9 9 7 0 6 9 6 3 3 9 2
 9 9 3 3 3 3 0 9 1 1 3 1 0 3 6 3 7 9 6 7 8 5 4 0 9 9 3 3 0 3 3 6]
Actual number [3 0 1 0 6 6 7 0 9 3 3 4 8 8 1 4 3 3 9 9 8 5 7 0 2 6 9 9 7 0 6 9 6 3 3 8 1
 7 9 3 3 3 3 0 9 1 1 3 1 0 8 6 8 7 9 6 7 8 5 4 0 9 9 5 3 0 3 8 6]
