<a href="https://colab.research.google.com/github/UthpalaPitawela/Data_Science_Implementations/blob/main/219383H_GAN_Assignment_U.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pickle as pkl
import shutil

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchvision import datasets, transforms
from torchvision.utils import save_image

from google.colab import files

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

**Create folders to save real and fake images**

In [None]:
!mkdir s0
!mkdir s1


**Load MNIST dataset**

In [None]:
# Convert images to tensors in [0, 1], then shift/scale with mean 0.5 and std 0.5
# so pixel values land in [-1, 1], the same range as the generator's tanh output.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Mini-batch size shared by both loaders (named once instead of repeating the literal).
batch_size = 100

# Get train and test datasets
train_dataset = datasets.MNIST(
    root='data/',
    train=True,
    download=True,
    transform=transform,
)

# download=True so this statement also works when the files are not on disk yet
test_dataset = datasets.MNIST(
    root='data/',
    train=False,
    download=True,
    transform=transform,
)

# Data loaders used to iterate over the train and test datasets
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=True,
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=9912422.0), HTML(value='')))


Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=28881.0), HTML(value='')))


Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=1648877.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=4542.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw



  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


**Define Generator class**




In [None]:

class Generator(nn.Module):
    """Fully connected generator: maps a latent vector to an output squashed into [-1, 1]."""

    def __init__(self, input_size, g_output_size):
        """
        Args:
            input_size: number of features in the latent input vector.
            g_output_size: number of features in the generated output.
        """
        super().__init__()

        # two hidden linear layers followed by the output projection
        self.fc1 = nn.Linear(in_features=input_size, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=512)
        self.fc3 = nn.Linear(in_features=512, out_features=g_output_size)

    def forward(self, x):
        """Return the tanh-activated generator output for latent input ``x``."""
        hidden = x
        # leaky ReLU (negative slope 0.2) after each hidden layer; unlike plain
        # ReLU it keeps a small gradient for negative pre-activations
        for layer in (self.fc1, self.fc2):
            hidden = F.leaky_relu(layer(hidden), negative_slope=0.2)

        # tanh bounds every output value to [-1, 1]
        return torch.tanh(self.fc3(hidden))

**Define Discriminator class**

In [None]:
class Discriminator(nn.Module):
    """Fully connected discriminator that scores flattened images.

    The sigmoid output is trained against a target of 1 for real images and 0
    for generated ones.
    """

    def __init__(self, input_size, d_output_size):
        """
        Args:
            input_size: number of features per flattened input image.
            d_output_size: number of output values per image.
        """
        super(Discriminator, self).__init__()

        # remembered so forward() can flatten inputs without a hard-coded 28*28
        self.input_size = input_size
        # dropout probability applied after every hidden layer
        self.dropout_p = 0.3

        # define hidden linear layers
        self.fc1 = nn.Linear(input_size, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 128)

        # final fully-connected layer
        self.fc5 = nn.Linear(128, d_output_size)

    def forward(self, x):
        """Return one sigmoid score per input image.

        Args:
            x: batch of images; any shape that flattens to (batch, input_size).
        """
        # flatten each image into a vector of input_size features
        x = x.view(-1, self.input_size)

        # all hidden layers: leaky ReLU followed by dropout
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = F.leaky_relu(layer(x), 0.2)
            # F.dropout defaults to training=True, so the module's mode must be
            # passed explicitly or dropout would stay active after .eval()
            x = F.dropout(x, self.dropout_p, training=self.training)

        # final layer: sigmoid used as the activation function for the output layer
        return torch.sigmoid(self.fc5(x))

**Define hyper parameters**

In [None]:

# --- Discriminator hyperparameters ---

# Number of input features per image (784 = 28 * 28, one flattened image)
discriminator_input_size = 784
# Number of discriminator outputs per image
discrimiator_output_size = 1

# --- Generator hyperparameters ---

# Length of the latent vector given to the generator
z_size = 100
# Number of generator output features (784 = 28 * 28, one flattened generated image)
generator_output_size = 784

# Build both networks, then move each one to the selected device
D = Discriminator(discriminator_input_size, discrimiator_output_size)
D = D.to(device)
G = Generator(z_size, generator_output_size)
G = G.to(device)

print("Discriminator", D)
print("Generator", G)

Discriminator Discriminator(
  (fc1): Linear(in_features=784, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (fc4): Linear(in_features=256, out_features=128, bias=True)
  (fc5): Linear(in_features=128, out_features=1, bias=True)
)
Generator Generator(
  (fc1): Linear(in_features=100, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=784, bias=True)
)


In [None]:
# Binary cross-entropy between predicted probabilities and 0/1 targets.
# BCELoss applies no activation itself; it relies on the discriminator's sigmoid output.
criterion = nn.BCELoss()

# One Adam optimizer per network, both using the same learning rate.
learning_rate = 2e-4
G_optimizer = optim.Adam(params=G.parameters(), lr=learning_rate)
D_optimizer = optim.Adam(params=D.parameters(), lr=learning_rate)

**Train the discriminator**

In [None]:
def D_train(x):
    """Run one discriminator update on a batch of real images.

    Uses the module-level ``D``, ``G``, ``criterion``, ``D_optimizer``,
    ``z_size`` and ``device``.

    Args:
        x: batch of real images; flattened to (batch, 28*28) before use.

    Returns:
        float: sum of the real-image and fake-image BCE losses for this step.
    """
    D.zero_grad()

    # take the batch size from the data so a short or differently sized batch
    # still gets labels of the matching shape
    batch_size = x.size(0)

    # train discriminator with real images (target 1)
    x_real = x.view(-1, 28*28).to(device)
    y_real = torch.ones(batch_size, 1, device=device)
    D_real_loss = criterion(D(x_real), y_real)

    # train discriminator with fake images (target 0)
    z = torch.randn(batch_size, z_size, device=device)
    # detach: only D is updated here, so gradients need not flow back into G
    x_fake = G(z).detach()
    y_fake = torch.zeros(batch_size, 1, device=device)
    D_fake_loss = criterion(D(x_fake), y_fake)

    # gradient backprop & optimize ONLY D's parameters
    D_loss = D_real_loss + D_fake_loss
    D_loss.backward()
    D_optimizer.step()

    return D_loss.item()

**Train the generator**

In [None]:
def G_train(x):
    """Run one generator update.

    Uses the module-level ``D``, ``G``, ``criterion``, ``G_optimizer``,
    ``z_size`` and ``device``.

    Args:
        x: current batch of real images; only its batch size is used, so the
            generator is trained on as many samples as the discriminator saw.

    Returns:
        float: generator BCE loss for this step.
    """
    G.zero_grad()

    batch_size = x.size(0)
    z = torch.randn(batch_size, z_size, device=device)
    # target 1: the generator is rewarded when D scores its output as real
    y = torch.ones(batch_size, 1, device=device)

    G_loss = criterion(D(G(z)), y)

    # gradient backprop & optimize ONLY Generator's parameters
    # (D also accumulates gradients here; D_train zeroes them before its next step)
    G_loss.backward()
    G_optimizer.step()

    return G_loss.item()

In [None]:
n_epoch = 200
for epoch in range(1, n_epoch + 1):
    D_losses = []
    G_losses = []
    for x, _ in train_loader:
        # one discriminator step followed by one generator step per batch
        D_losses.append(D_train(x))
        G_losses.append(G_train(x))

    # report the mean per-batch losses of this epoch
    mean_d_loss = torch.mean(torch.FloatTensor(D_losses))
    mean_g_loss = torch.mean(torch.FloatTensor(G_losses))
    print('[%d/%d]: loss_d: %.3f, loss_g: %.3f' % (epoch, n_epoch, mean_d_loss, mean_g_loss))

[1/200]: loss_d: 0.370, loss_g: 3.546
[2/200]: loss_d: 0.402, loss_g: 3.164
[3/200]: loss_d: 0.427, loss_g: 3.142
[4/200]: loss_d: 0.473, loss_g: 3.005
[5/200]: loss_d: 0.500, loss_g: 2.893
[6/200]: loss_d: 0.518, loss_g: 2.672
[7/200]: loss_d: 0.508, loss_g: 2.782
[8/200]: loss_d: 0.609, loss_g: 2.372
[9/200]: loss_d: 0.640, loss_g: 2.332
[10/200]: loss_d: 0.612, loss_g: 2.459
[11/200]: loss_d: 0.651, loss_g: 2.259
[12/200]: loss_d: 0.692, loss_g: 2.188
[13/200]: loss_d: 0.704, loss_g: 2.134
[14/200]: loss_d: 0.733, loss_g: 2.000
[15/200]: loss_d: 0.743, loss_g: 2.004
[16/200]: loss_d: 0.710, loss_g: 2.078
[17/200]: loss_d: 0.756, loss_g: 2.047
[18/200]: loss_d: 0.763, loss_g: 1.932
[19/200]: loss_d: 0.754, loss_g: 1.970
[20/200]: loss_d: 0.748, loss_g: 2.006
[21/200]: loss_d: 0.772, loss_g: 1.913
[22/200]: loss_d: 0.767, loss_g: 1.946
[23/200]: loss_d: 0.768, loss_g: 1.933
[24/200]: loss_d: 0.799, loss_g: 1.885
[25/200]: loss_d: 0.799, loss_g: 1.887
[26/200]: loss_d: 0.807, loss_g: 1

**Download the discriminator and generator**

In [None]:
# Pickle each trained network to its own file: the generator first, then the discriminator
for model, path in ((G, 'G.pkl'), (D, 'D.pkl')):
    with open(path, 'wb') as f:
        pkl.dump(model, f)

# download files into the local machine
for path in ('D.pkl', 'G.pkl'):
    files.download(path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

**Create the fake/s1 image set**

In [None]:

sample_size = 200

# Draw `sample_size` latent vectors of dimension `z_size` from a standard normal,
# the same distribution D_train/G_train sample from (torch.randn) during training.
z = torch.randn(sample_size, z_size)

print(z[0].size())

G.eval() # eval mode

# Generate every image in a single forward pass; sampling needs no gradients.
with torch.no_grad():
    fake_images = G(z.to(device)).view(-1, 1, 28, 28)

# get the grid image
save_image(fake_images, '/content/grid.png', normalize=True)

for i, latent in enumerate(z):
    # save the latent vector related to the image
    # ('w' so re-running the cell overwrites the file instead of appending to it)
    with open('/content/s1/%d.txt' % i, 'w') as f:
        f.write(np.array2string(latent.numpy(), separator=','))

    # save the image generated from this latent vector
    save_image(fake_images[i], '/content/s1/%d.png' % i, normalize=True)


# create a zip file containing the generated image files and latent vectors
shutil.make_archive('s1', 'zip', 's1')
files.download('s1.zip')

**Create the real/s0 image set**

In [None]:

# Number of real training images to export.
# NOTE(review): indices 0..100 inclusive are exported, i.e. 101 images - confirm
# whether 100 was intended.
num_real_images = 101

for idx in range(num_real_images):
    img, _ = train_dataset[idx]

    # save the image's pixel values as text
    # ('w' so re-running the cell overwrites the file instead of appending to it)
    with open('/content/s0/%d.txt' % idx, 'w') as f:
        f.write(np.array2string(img.numpy(), separator=','))

    save_image(img, '/content/s0/%d.png' % idx, normalize=True)

# create a zip file with the real image files and their pixel-value text files
shutil.make_archive('s0', 'zip', 's0')
files.download('s0.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

**Here, folders need to be created with labels and then uploaded for the classifier**

In [None]:
# Unpack the s1 archive into /content/fake and the s0 archive into /content/real
!unzip '/content/s1.zip' -d '/content/fake'
!unzip '/content/s0.zip' -d '/content/real'

Archive:  /content/s1.zip
   creating: /content/fake/s1/0/
 extracting: /content/fake/s1/0/1.png  
  inflating: /content/fake/s1/0/1.txt  
 extracting: /content/fake/s1/0/21.png  
  inflating: /content/fake/s1/0/21.txt  
 extracting: /content/fake/s1/0/34.png  
  inflating: /content/fake/s1/0/34.txt  
 extracting: /content/fake/s1/0/37.png  
  inflating: /content/fake/s1/0/37.txt  
 extracting: /content/fake/s1/0/51.png  
  inflating: /content/fake/s1/0/51.txt  
 extracting: /content/fake/s1/0/56.png  
  inflating: /content/fake/s1/0/56.txt  
 extracting: /content/fake/s1/0/63.png  
  inflating: /content/fake/s1/0/63.txt  
 extracting: /content/fake/s1/0/68.png  
  inflating: /content/fake/s1/0/68.txt  
 extracting: /content/fake/s1/0/69.png  
  inflating: /content/fake/s1/0/69.txt  
 extracting: /content/fake/s1/0/75.png  
  inflating: /content/fake/s1/0/75.txt  
 extracting: /content/fake/s1/0/81.png  
  inflating: /content/fake/s1/0/81.txt  
   creating: /content/fake/s1/1/
 extract

In [None]:
s0_path = '/content/real/s0'
s1_path = '/content/fake/s1'


def _load_image_folder(root):
    """Open `root` as an ImageFolder whose images are converted to grayscale tensors."""
    return datasets.ImageFolder(
        root=root,
        transform=transforms.Compose([transforms.Grayscale(), transforms.ToTensor()]),
    )


# images under s0_path are used for training, images under s1_path for testing
train_data = _load_image_folder(s0_path)
test_data = _load_image_folder(s1_path)


**Get the data to the data loader**

In [None]:
from torch.utils.data import DataLoader

# One shuffled loader per split: 100 images per batch, one worker process each.
loaders = {
    split: DataLoader(dataset, batch_size=100, shuffle=True, num_workers=1)
    for split, dataset in (('train', train_data), ('test', test_data))
}
loaders

{'test': <torch.utils.data.dataloader.DataLoader at 0x7f2ee9d8cc50>,
 'train': <torch.utils.data.dataloader.DataLoader at 0x7f2ee9d8ced0>}

**Classifier** 

In [None]:
class CNN(nn.Module):
    """Two conv / ReLU / max-pool stages followed by a linear layer with 10 outputs."""

    def __init__(self):
        super().__init__()

        # stage 1: 1 -> 36 channels; 5x5 kernel, stride 1, padding 2, then 2x2 max pooling
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 36, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # stage 2: 36 -> 32 channels with the same kernel, stride, padding and pooling
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=36, out_channels=32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # fully connected layer, output 10 classes
        # NOTE(review): 32 * 7 * 7 input features presumably corresponds to 28x28 input images - confirm
        self.out = nn.Linear(in_features=32 * 7 * 7, out_features=10)

    def forward(self, x):
        """Return ``(output, flat_features)`` for a batch of images.

        ``flat_features`` is the flattened output of the second conv stage,
        returned alongside the class scores for visualization.
        """
        features = self.conv2(self.conv1(x))
        # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        flat_features = features.view(features.size(0), -1)
        return self.out(flat_features), flat_features


In [None]:
# Instantiate the classifier (it is not moved to `device` here) and print its layer structure
cnn=CNN()
print(cnn)

CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 36, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(36, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (out): Linear(in_features=1568, out_features=10, bias=True)
)


**Loss function**

In [None]:
# Cross-entropy loss, applied to the classifier's raw output scores and the integer labels
loss_func = nn.CrossEntropyLoss()
loss_func

CrossEntropyLoss()

**Optimizer**

In [None]:
# Adam optimizer over all classifier parameters with learning rate 0.01
optimizer = optim.Adam(cnn.parameters(), lr=0.01)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 0.01
    weight_decay: 0
)

**Train data**

In [None]:
num_epochs = 10

def train(num_epochs, cnn, loaders):
    """Train ``cnn`` on ``loaders['train']``, printing loss and accuracy for every batch.

    Uses the module-level ``loss_func`` and ``optimizer``.

    Args:
        num_epochs: number of passes over the training loader.
        cnn: model whose forward pass returns a tuple with the class scores first.
        loaders: dict with a 'train' entry that yields (images, labels) batches.
    """
    cnn.train()

    # number of batches per epoch, used only for the progress message
    total_step = len(loaders['train'])

    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            output = cnn(images)[0]
            loss = loss_func(output, labels)

            # fraction of this batch whose highest-scoring class equals the label
            predicted = torch.max(output, 1)[1]
            accuracy = (predicted == labels).sum().item() / float(labels.size(0))

            # clear gradients for this training step
            optimizer.zero_grad()
            # backpropagation, compute gradients
            loss.backward()
            # apply gradients
            optimizer.step()

            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
            print('Training Acc. %.2f' % accuracy)

train(num_epochs, cnn, loaders)

Epoch [1/10], Step [1/2], Loss: 0.7915
Training Acc. 0.71
Epoch [1/10], Step [2/2], Loss: 0.5134
Training Acc. 0.75
Epoch [2/10], Step [1/2], Loss: 0.5696
Training Acc. 0.78
Epoch [2/10], Step [2/2], Loss: 0.1315
Training Acc. 1.00
Epoch [3/10], Step [1/2], Loss: 0.4532
Training Acc. 0.86
Epoch [3/10], Step [2/2], Loss: 0.2690
Training Acc. 1.00
Epoch [4/10], Step [1/2], Loss: 0.3545
Training Acc. 0.89
Epoch [4/10], Step [2/2], Loss: 0.0521
Training Acc. 1.00
Epoch [5/10], Step [1/2], Loss: 0.3530
Training Acc. 0.88
Epoch [5/10], Step [2/2], Loss: 0.2591
Training Acc. 0.88
Epoch [6/10], Step [1/2], Loss: 0.2291
Training Acc. 0.92
Epoch [6/10], Step [2/2], Loss: 0.2582
Training Acc. 0.88
Epoch [7/10], Step [1/2], Loss: 0.2529
Training Acc. 0.91
Epoch [7/10], Step [2/2], Loss: 0.1457
Training Acc. 1.00
Epoch [8/10], Step [1/2], Loss: 0.1835
Training Acc. 0.94
Epoch [8/10], Step [2/2], Loss: 0.2483
Training Acc. 0.88
Epoch [9/10], Step [1/2], Loss: 0.1390
Training Acc. 0.95
Epoch [9/10], 

**Test data**

In [None]:
def test():
  cnn.eval()
  with torch.no_grad():
    correct=0
    total=0
    for images, labels in loaders['test']:
      test_output,last_layer=cnn(images)
      pred_y=torch.max(test_output,1)[1].data.squeeze()
      accuracy=(pred_y==labels).sum().item()/float(labels.size(0))
      pass
    print('Test Acc %.2f' % accuracy)
    pass


test()

Test Acc 0.82


**Save the classifier**

In [None]:
# Serialize the trained classifier object to disk with pickle
classifier_path = 'C.pkl'
with open(classifier_path, 'wb') as out_file:
    pkl.dump(cnn, out_file)


# download the classifier model
files.download(classifier_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>