<a href="https://colab.research.google.com/github/Saadkhalid913/ML-Practice/blob/main/AutoencoderPractice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [59]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.autograd import Variable
import torchvision
import os 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [60]:
# ---- CONSTANTS ---- 
batch_size = 128 
image_size = (1,28,28)

encoder_input_shape = 784
encoder_first_hidden_layer = 400 
latent_dimention_size = 6


In [61]:
# array of transformation to be applied to the training set 
transforms = torchvision.transforms.Compose([
              torchvision.transforms.ToTensor()
])

train_set = torchvision.datasets.MNIST(root="./data",
                                      transform=transforms,
                                      train=True,
                                      download = True
                                      )

train_dataloader = DataLoader(dataset=train_set,
                              batch_size=batch_size,
                              shuffle=True,
                              )

test_set = torchvision.datasets.MNIST(root="./data",
                                      transform=transforms,
                                      train=False,
                                      download = True
                                      )

test_dataloader = DataLoader(dataset=test_set,
                              batch_size=batch_size,
                              shuffle=True,
                              )

train_set = torchvision.datasets.MNIST(root="./data",
                                      transform=transforms,
                                      train=True,
                                      download = True
                                      )

train_dataloader = DataLoader(dataset=train_set,
                              batch_size=batch_size,
                              shuffle=True,
                              )


In [62]:
# create a directory for the results 
sample_dir = "results"
if not os.path.exists(sample_dir):
  os.mkdir(sample_dir)

In [63]:
class VAE(nn.Module):
  def __init__(self):
    # initialize to use all the pytorch features 
    super(VAE, self).__init__()


    self.fc1 = nn.Linear(encoder_input_shape, encoder_first_hidden_layer)
    self.log_var = nn.Linear(encoder_first_hidden_layer, latent_dimention_size) # log var (refer to paper)
    self.mean = nn.Linear(encoder_first_hidden_layer, latent_dimention_size) # mean of distribution

    self.dec_fc1 = nn.Linear(latent_dimention_size, encoder_first_hidden_layer) # (latent dim => hidden dim)
    self.dec_fc2 = nn.Linear(encoder_first_hidden_layer, encoder_input_shape) # (hidden dim => 784)


  def encode(self, x):
    # layer 1 
    x = self.fc1(x)
    x = F.relu(x)

    # Mean and Logvar (for reparamaterization trick)
    mu = self.mean(x)
    log_var = self.log_var(x)
    return (mu, log_var)

  def reparameterize(self, mean, log_var):
    # refer back for why we do this 
    standard_deviation = torch.exp(log_var / 2)
    noise = torch.randn_like(standard_deviation)
    
    return mean + standard_deviation * noise 

  def decode(self, x):
    x = self.dec_fc1(x)
    x = F.relu(x)
    x = self.dec_fc2(x)
    x = F.sigmoid(x)
    return x 

 
 





In [64]:
model = VAE().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0015)

In [66]:
def loss(output, ground_truth, mu, log_var):
    BCE = F.binary_cross_entropy(input=output, target=ground_truth.view(-1, encoder_input_shape), reduction="sum")
    KL_div = 0.5 * torch.sum(log_var.exp() + mu.pow(2) - 1 - log_var, 1)
    kld_sum = torch.sum(KL_div)

    return BCE + kld_sum

def train(epoch):
    model.train()
    total_loss = 0
    for i, (image, _) in enumerate(train_dataloader):
      mu, log_var = model.encode(image.view(-1, encoder_input_shape))
      image = image.to(device)
      x = model.reparameterize(mu, log_var)
      output = model.decode(x)
      optimizer.zero_grad()
      batch_loss = loss(output, image, mu, log_var)
      batch_loss.backward()
      optimizer.step()
      total_loss += batch_loss.item()

    print(f"Total loss for Epoch #{epoch} is {total_loss / (len(train_dataloader))}")
    if (epoch % 5 == 0):
      test(epoch)


def test(epoch: int):
  model.eval()
  total_loss = 0
  with torch.no_grad():
    for i, (image, _) in enumerate(test_dataloader):
      mu, log_var = model.encode(image.view(-1, encoder_input_shape))
      latent = model.reparameterize(mu, log_var)
      output = model.decode(latent)
      test_loss = loss(output, image, mu, log_var)
      total_loss += test_loss.item()
      if (i == 0):
        comparision = torch.cat([image[: 5], output.view(batch_size, 1, 28,28)[: 5]])

        # the image needs to be saved to the cpu before we can save it 
        torchvision.utils.save_image(comparision.cpu(), "results/reconstruction_" + str(epoch) + ".png", nrow=5)






for i in range(0,15):
  train(i)



Total loss for Epoch #0 is 16082.171518939898
Total loss for Epoch #1 is 15887.149834671509
Total loss for Epoch #2 is 15729.898906000133
Total loss for Epoch #3 is 15603.97225646322
Total loss for Epoch #4 is 15500.036241088086
Total loss for Epoch #5 is 15415.952868886594
Total loss for Epoch #6 is 15337.231678521455
Total loss for Epoch #7 is 15268.845559451625
Total loss for Epoch #8 is 15209.991785630997
Total loss for Epoch #9 is 15157.950938666045
Total loss for Epoch #10 is 15107.614986590484
Total loss for Epoch #11 is 15069.081167127531
Total loss for Epoch #12 is 15034.110959571562
Total loss for Epoch #13 is 14994.139546408582
Total loss for Epoch #14 is 14965.689625949493
