This notebook uses [PyTorch](https://pytorch.org/), a library that automatically differentiates functions and is commonly used to implement neural networks. Because PyTorch computes gradients automatically, you do not need to derive the gradient formula by hand. This comes in especially handy when working with neural networks, whose gradient formulas can be quite complicated, particularly for complex architectures.
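
As a minimal illustration of automatic differentiation, the sketch below differentiates a toy function chosen only for this example, f(x) = x^2 + 3x, and compares the result with the derivative 2x + 3 worked out by hand.

```python
import torch

# Toy function for illustration: f(x) = x^2 + 3x, so f'(x) = 2x + 3.
x = torch.tensor(2.0, requires_grad=True)
f = x ** 2 + 3 * x

# backward() fills x.grad with df/dx evaluated at x = 2.
f.backward()
grad_value = x.grad.item()
print(grad_value)  # 7.0, which matches 2 * 2 + 3
```

The same mechanism is what computes the gradients of a network's loss with respect to all of its weights.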

We recommend going over the [PyTorch tutorial](https://pytorch.org/tutorials/beginner/basics/intro.html) and then the starter code below. For any new functions that you come across, you can look up the documentation [here](https://pytorch.org/docs/stable/index.html). 

We recommend running this notebook on Google Colab to avoid having to install PyTorch and to take advantage of GPUs, which make training faster.
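
To check whether a GPU is actually available, a common pattern is sketched below. The starter code may already select a device on its own, so treat this as an optional sanity check rather than a required step.

```python
import torch

# Pick the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tensors can be created directly on the chosen device.
x = torch.zeros(4, 784, device=device)
print(device, x.device)
```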

## Load packages

In [34]:
# load packages
# make sure to install the package "tqdm" for the progress bar when training.
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.utils import save_image
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import imageio
import matplotlib.image as mpimg
from scipy import ndimage

path_prefix = ""

If you are running on Google Colab, you need to upload the file `autoencoder_starter.py`. To do so, click the folder icon on the left side of the page, which opens a panel that lets you upload files. Note that a file uploaded this way is erased whenever your Python runtime times out or is otherwise restarted, so you will need to upload it again.

If you would like to avoid this, you can create a directory named `CMPT_726Fall2021_A3` in your Google Drive and upload `autoencoder_starter.py` to that directory. Then execute the cell below that calls `drive.mount` to mount your Google Drive.

In [35]:
# Convert images to tensors, then normalize with mean 0.1307 and std 0.3081.
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
# Download the FashionMNIST training set into the data directory under path_prefix.
trainset = torchvision.datasets.FashionMNIST(root='{}/./data'.format(path_prefix), train=True, download=True, transform=transform)

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
path_prefix = "/content/gdrive/MyDrive/CMPT_726Fall2021_A3"

import sys
sys.path.insert(1, path_prefix)
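
Before importing from the starter file, it can help to confirm that the file is where the notebook expects it. The helper below is a hypothetical convenience function, not part of the starter code, and the demo path simply repeats the Drive directory used above.

```python
import os

def starter_file_available(directory, filename="autoencoder_starter.py"):
    """Hypothetical helper: return True if the starter file exists in directory."""
    return os.path.isfile(os.path.join(directory, filename))

# Same Drive directory as in the mount cell; prints False if the upload is missing.
path_prefix_demo = "/content/gdrive/MyDrive/CMPT_726Fall2021_A3"
print(starter_file_available(path_prefix_demo))
```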

## Define your architecture here.

The `Autoencoder` class has several important functions left unimplemented. You are required to implement its two nested classes, `Encoder` and `Decoder`, i.e., the architecture and the forward function of both the encoder and the decoder.


In [37]:
# Q1
class Autoencoder(nn.Module):

    def __init__(self,dim_latent_representation=2):

        super(Autoencoder,self).__init__()

        class Encoder(nn.Module):
            def __init__(self, output_size=2):
                super(Encoder, self).__init__()
                # needs your implementation
                self.fc1 = nn.Linear(784, output_size)
                
            def forward(self, x):
                # needs your implementation
                out = self.fc1(x)
                return out

        class Decoder(nn.Module):
            def __init__(self, input_size=2):
                super(Decoder, self).__init__()
                # needs your implementation
                self.fc3 = nn.Linear(input_size, 784)
                self.sig = nn.Sigmoid()

            def forward(self, z):
                # needs your implementation
                out = self.fc3(z)
                out = self.sig(out)
                return out

        self.encoder = Encoder(output_size=dim_latent_representation)
        self.decoder = Decoder(input_size=dim_latent_representation)

    # Implement this function for the DAE model
    # def add_noise(self, x, noise_type):
    #     if noise_type=='Gaussian':
    #         # return (x with Gaussian noise)
    #     elif noise_type=='Dropout':
    #         return (x with Dropout noise)

    # Implement this function for the VAE model
    # def reparameterise(self, mu, logvar):
    #     if self.training:
    #         # return reparametrized mu
    #     else:
    #         return mu

    def forward(self,x):
        # This function should be modified for the DAE and VAE
        # flatten the input
        shape = x.shape
        x = x.reshape([shape[0], -1])
        x = self.encoder(x)
        x = self.decoder(x)
        # reshape the output
        x = x.reshape(shape)
        # for the VAE forward function should also return mu and logvar
        return x

In [4]:
# Q2
class Autoencoder(nn.Module):

    def __init__(self,dim_latent_representation=2):

        super(Autoencoder,self).__init__()

        class Encoder(nn.Module):
            def __init__(self, output_size=2):
                super(Encoder, self).__init__()
                # needs your implementation
                self.fc1 = nn.Linear(784, 1024)
                self.relu1 = nn.ReLU()
                self.fc2 = nn.Linear(1024, output_size)

            def forward(self, x):
                # needs your implementation
                out = self.fc1(x)
                out = self.relu1(out)
                out = self.fc2(out)
                return out

        class Decoder(nn.Module):
            def __init__(self, input_size=2):
                super(Decoder, self).__init__()
                # needs your implementation
                self.fc3 = nn.Linear(input_size, 1024)
                self.relu3 = nn.ReLU()
                self.fc4 = nn.Linear(1024, 784)
                self.sig = nn.Sigmoid()

            def forward(self, z):
                # needs your implementation
                out = self.fc3(z)
                out = self.relu3(out)
                out = self.fc4(out)
                out = self.sig(out)
                return out

        self.encoder = Encoder(output_size=dim_latent_representation)
        self.decoder = Decoder(input_size=dim_latent_representation)

    # Implement this function for the DAE model
    # def add_noise(self, x, noise_type):
    #     if noise_type=='Gaussian':
    #         # return (x with Gaussian noise)
    #     elif noise_type=='Dropout':
    #         return (x with Dropout noise)

    # Implement this function for the VAE model
    # def reparameterise(self, mu, logvar):
    #     if self.training:
    #         # return reparametrized mu
    #     else:
    #         return mu

    def forward(self,x):
        # This function should be modified for the DAE and VAE
        shape = x.shape
        x = x.reshape([shape[0], -1])
        x = self.encoder(x)
        x = self.decoder(x)
        x = x.reshape(shape)
        # for the VAE forward function should also return mu and logvar
        return x

In [27]:
# Q3
class DAE(nn.Module):

    def __init__(self,dim_latent_representation=30):

        super(DAE,self).__init__()

        class Encoder(nn.Module):
            def __init__(self, output_size=2):
                super(Encoder, self).__init__()
                # needs your implementation
                self.fc1 = nn.Linear(784, output_size)

            def forward(self, x):
                # needs your implementation
                out = self.fc1(x)
                return out

        class Decoder(nn.Module):
            def __init__(self, input_size=2):
                super(Decoder, self).__init__()
                # needs your implementation
                self.fc3 = nn.Linear(input_size, 784)
                self.tan = nn.Tanh()

            def forward(self, z):
                # needs your implementation
                out = self.fc3(z)
                out = self.tan(out)
                return out

        self.encoder = Encoder(output_size=dim_latent_representation)
        self.decoder = Decoder(input_size=dim_latent_representation)

    # Implement this function for the DAE model
    def add_noise(self, x, noise_type):
        if self.training:
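            if noise_type=='Gaussian':
                # Sample noise (mean 1, std 3) and move it to the same device as x.
                # Tensor.to() is not in-place, so its result must be assigned.
                gaussian = torch.normal(1.0, 3.0, size=x.shape).to(x.device)
                # return (x with Gaussian noise)
                return x + gaussian
            elif noise_type=='Dropout':
                # Zero each input element with probability 0.2 (survivors are rescaled).
                dropout = nn.Dropout(0.2)
                x = dropout(x)
                # return (x with Dropout noise)
                return x
            else:
                raise ValueError("Unknown noise_type: {}".format(noise_type))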
        else:
            return x


    def forward(self,x):
        # This function should be modified for the DAE and VAE
        x = self.add_noise(x, 'Gaussian')

        shape = x.shape
        x = x.reshape([shape[0], -1])
        x = self.encoder(x)
        x = self.decoder(x)
        x = x.reshape(shape)
        # for the VAE forward function should also return mu and logvar
        return x
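
The two corruption types named in `add_noise` can be tried out on a toy batch. The sketch below is an illustration under assumed settings (zero-mean Gaussian noise with standard deviation 0.3 and a drop probability of 0.2); these are not the values used in the cell above, and suitable values depend on how the inputs are scaled.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
x = torch.rand(8, 1, 28, 28)  # stand-in batch of images in [0, 1]

# Additive Gaussian noise: randn_like matches x's shape, dtype and device.
noise_std = 0.3  # assumed value for illustration
x_gaussian = x + noise_std * torch.randn_like(x)

# Dropout noise: zero each pixel with probability drop_p.
# F.dropout also rescales the surviving pixels by 1 / (1 - drop_p).
drop_p = 0.2  # assumed value for illustration
x_dropout = F.dropout(x, p=drop_p, training=True)

zero_fraction = (x_dropout == 0).float().mean().item()
print(x_gaussian.shape, x_dropout.shape, zero_fraction)
```

Because `torch.randn_like` creates the noise on the same device as `x`, no separate device transfer is needed.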

In [5]:
# Q4
class VAE(nn.Module):

    def __init__(self,dim_latent_representation=30):

        super(VAE,self).__init__()

        class Encoder(nn.Module):
            def __init__(self, output_size=30):
                super(Encoder, self).__init__()
                # needs your implementation
                self.fc1 = nn.Linear(784, 1024)
                self.relu1 = nn.ReLU()
                self.fc2 = nn.Linear(1024, output_size)
                # Two heads: the mean and the log-variance of the latent distribution.
                self.mu = nn.Linear(output_size, output_size)
                self.var = nn.Linear(output_size, output_size)  # outputs the log-variance

            def forward(self, x):
                # needs your implementation
                out = self.fc1(x)
                out = self.relu1(out)
                out = self.fc2(out)
                out_mu = self.mu(out)
                out_var = self.var(out)

                return out_mu, out_var

        class Decoder(nn.Module):
            def __init__(self, input_size=30):
                super(Decoder, self).__init__()
                # needs your implementation
                self.fc3 = nn.Linear(input_size, 1024)
                self.relu3 = nn.ReLU()
                self.fc4 = nn.Linear(1024, 784)
                # self.sig = nn.Sigmoid()
                self.tan = nn.Tanh()

            def forward(self, z):
                # needs your implementation
                out = self.fc3(z)
                out = self.relu3(out)
                out = self.fc4(out)
                # out = self.sig(out)
                out = self.tan(out)
                return out

        self.encoder = Encoder(output_size=dim_latent_representation)
        self.decoder = Decoder(input_size=dim_latent_representation)


    # Implement this function for the VAE model
    def reparameterise(self, mu, logvar):
        if self.training:
            # Reparameterisation trick: z = mu + eps * std with eps ~ N(0, I)
            std = torch.exp(0.5 * logvar)
            eps = torch.empty_like(std).normal_()
            return mu + eps * std
        else:
            return mu

    def forward(self,x):
        # This function should be modified for the DAE and VAE
        shape = x.shape
        x = x.reshape([shape[0], -1])
        code_mu, code_logvar = self.encoder(x)
        code = self.reparameterise(code_mu, code_logvar)
        recon_x = self.decoder(code)
        recon_x = recon_x.reshape(shape)
        # for the VAE forward function should also return mu and logvar
        return recon_x, code_mu, code_logvar
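
The reparameterisation step can be checked in isolation. The sketch below uses stand-in encoder outputs (all zeros, chosen only for illustration) and confirms that gradients reach both `mu` and `logvar`. It also evaluates the standard closed-form KL divergence to a unit Gaussian, which is the usual reason a VAE's forward function returns `mu` and `logvar`; the loss actually used by `VAE_starter.py` is not shown in this notebook, so this is an assumption about its general form.

```python
import torch

torch.manual_seed(0)
# Stand-in encoder outputs: batch of 4 inputs, 30-dimensional latent space.
mu = torch.zeros(4, 30, requires_grad=True)
logvar = torch.zeros(4, 30, requires_grad=True)

# z = mu + std * eps, with eps ~ N(0, I) and std = exp(logvar / 2).
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
z = mu + eps * std

# The randomness sits in eps, so gradients still reach mu and logvar.
z.sum().backward()

# Closed-form KL(N(mu, std^2) || N(0, I)) per example; zero when mu = 0 and logvar = 0.
with torch.no_grad():
    kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)

print(z.shape, mu.grad.shape, logvar.grad.shape, kl)
```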

## Training the Model

The training loop is provided by the `Autoencoder_Trainer` class from `autoencoder_starter.py`.
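
For orientation, a generic autoencoder training loop looks roughly like the sketch below. This is not the code in `autoencoder_starter.py`: the toy model, the random batch, the Adam optimizer, and the mean squared error loss are all assumptions made for illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
# Toy stand-ins: a linear autoencoder and one batch of random "images" in [0, 1].
model = nn.Sequential(nn.Linear(784, 2), nn.Linear(2, 784), nn.Sigmoid())
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()  # assumed loss; the starter may use a different one
batch = torch.rand(64, 784)

losses = []
for step in range(50):
    optimizer.zero_grad()          # clear gradients from the previous step
    recon = model(batch)           # encode, then decode
    loss = loss_fn(recon, batch)   # reconstruction error against the input
    loss.backward()                # autograd computes all gradients
    optimizer.step()               # update the parameters
    losses.append(loss.item())

print(losses[0], losses[-1])
```

The reconstruction target is the input itself, which is what distinguishes an autoencoder from a supervised model.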

In [None]:
!pip install umap-learn  

In [None]:
from autoencoder_starter import Autoencoder_Trainer
from autoencoder_starter import Plot_Kernel
from VAE_starter import VAE_Trainer

LEARNING_RATE = 1e-3
EPOCH_NUMBER = 10  # the number of epochs and learning rate can be tuned.

# autoencoder = DAE(dim_latent_representation=30)
# autoencoder = VAE(dim_latent_representation=30)
autoencoder = Autoencoder(dim_latent_representation=2)

# trainer = VAE_Trainer(autoencoder_model=autoencoder,learning_rate=LEARNING_RATE,path_prefix=path_prefix)
trainer = Autoencoder_Trainer(autoencoder_model=autoencoder,learning_rate=LEARNING_RATE,path_prefix=path_prefix)

def Plot_Kernel(_model):
    '''
    Plot the learned weights of the encoder's first layer, one 28x28 image per output unit of fc1.
    ----------
    _model: Autoencoder
    '''
    # needs your implementation
    # fc1.weight has shape (out_features, 784); each row is reshaped into one 28x28 kernel.
    ae_kernel = _model.encoder.fc1.weight
    ae_kernel = ae_kernel.cpu().detach().numpy()
    for i in range(ae_kernel.shape[0]):
        w = ae_kernel[i, :].reshape([28, 28])
        plt.imshow(w)
        plt.show()

try:
    for epoch in range(1, EPOCH_NUMBER + 1):
        trainer.train(epoch)
        trainer.validate(epoch)
    # Q3
    # Plot_Kernel(trainer.model)
except (KeyboardInterrupt, SystemExit):
    print("Manual Interruption")

If the dataset download in the cell above fails because the server is down, you can download the data from Canvas instead and put it in the same directory as this notebook and `autoencoder_starter.py`. If you are running this notebook on Google Colab, create a directory called `CMPT_726Fall2021_A3` in your Google Drive and put the data directory you downloaded from Canvas inside it. Then, if you have not already mounted your Google Drive using the earlier block of code, mount it by executing the following block of code.

In [39]:
from google.colab import drive
drive.mount('/content/gdrive')
path_prefix = "/content/gdrive/MyDrive/CMPT_726Fall2021_A3"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


## Visualizing 2D Bottleneck Feature Representations

We can visualize the 2D bottleneck representations of data points and their ground truth class labels with a scatter plot.

The `scatter_plot` function takes the following arguments:
* latent_representations - (N, dimension_latent_representation) numpy array: the bottleneck features
* labels - (N, ) numpy array: the labels of the ground truth classes
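
To make the expected inputs concrete, here is a sketch of a scatter plot with the same two arguments. The function `sketch_scatter_plot` is a hypothetical stand-in written for this example and is not the starter's `scatter_plot`; the data are random placeholders with the documented shapes.

```python
import numpy as np
import matplotlib.pyplot as plt

def sketch_scatter_plot(latent_representations, labels):
    """Hypothetical stand-in: one colour per class in a 2D latent space."""
    fig, ax = plt.subplots(figsize=(6, 6))
    points = ax.scatter(
        latent_representations[:, 0],
        latent_representations[:, 1],
        c=labels, cmap="tab10", s=5,
    )
    fig.colorbar(points, ax=ax, label="class label")
    ax.set_xlabel("latent dimension 1")
    ax.set_ylabel("latent dimension 2")
    return ax

# Random placeholder data with the documented shapes: (N, 2) and (N,).
rng = np.random.default_rng(0)
z_demo = rng.normal(size=(500, 2))
label_demo = rng.integers(0, 10, size=500)
ax = sketch_scatter_plot(z_demo, label_demo)
```

With a trained model, well-separated colour clusters suggest that the bottleneck has captured class-relevant structure.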

In [None]:
# Q1 & Q2
with torch.no_grad():
    model = trainer.model
    model.eval()
    z=[];label=[]
    for x,y in trainer.val_loader:
        shape = x.shape
        x = x.reshape([shape[0], -1])
        z_ = model.encoder(x.to(trainer.device))
        z += z_.cpu().tolist()
        label += y.cpu().tolist()
    z = np.asarray(z)
    label = np.asarray(label)

from autoencoder_starter import scatter_plot
scatter_plot(latent_representations=z,labels=label)

In [None]:
# Q4
with torch.no_grad():
    model = trainer.model
    model.eval()
    z=[];label=[]
    for x,y in trainer.val_loader:
        shape = x.shape
        x = x.reshape([shape[0], -1])
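        # The encoder's second output is the log-variance; in eval mode reparameterise returns the mean.
        z_mu, z_logvar = model.encoder(x.to(trainer.device))
        z_ = model.reparameterise(z_mu, z_logvar)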
        z += z_.cpu().tolist()
        label += y.cpu().tolist()
    z = np.asarray(z)
    label = np.asarray(label)

from VAE_starter import scatter_plot
scatter_plot(latent_representations=z,labels=label)

## Generating Images by Sampling Bottleneck Features

We can generate new images by sampling bottleneck features and passing them through the decoder.

The code below provides an example of how to generate images by sampling bottleneck features. 

The `display_images_in_a_row` function takes the following arguments:
* images: (N,28,28): N images of size 28x28 as a numpy array
* file_path: file path name for where to store the figure
* display: display the image or not
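
To illustrate the expected `(N,28,28)` layout, the sketch below tiles a batch of images side by side with NumPy. The helper `tile_images_in_a_row` is hypothetical and is not how the starter's `display_images_in_a_row` is necessarily implemented; the images are random placeholders.

```python
import numpy as np

def tile_images_in_a_row(images):
    """Hypothetical helper: turn an (N, 28, 28) array into one (28, N * 28) strip."""
    return np.hstack(list(images))

# Seven random placeholder images with the documented shape.
demo_images = np.random.default_rng(0).random((7, 28, 28))
strip = tile_images_in_a_row(demo_images)
print(strip.shape)  # (28, 196)
```

The resulting strip can be shown with `plt.imshow(strip, cmap="gray")`.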


In [None]:
# Q1
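with torch.no_grad():
    # Draw 7 latent codes from a standard normal; the second dimension must match dim_latent_representation (2 here).
    samples = torch.randn(7, 2).to(trainer.device)
    # Decode the sampled codes into flattened images.
    samples = trainer.model.decoder(samples).cpu()

images = samples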

from autoencoder_starter import display_images_in_a_row
display_images_in_a_row(images)

## Reconstructing Images

We first retrieve the validation set and then pick 64 images. The code below takes the first 64, though you could also sample them at random.

We use the autoencoder to reconstruct the images and visualize them below.

In [None]:
images = trainer.get_val_set() # get the entire validation set
total_number = 64
images = images[:total_number]

from autoencoder_starter import display_images_in_a_row
print("Original images")
display_images_in_a_row(images.cpu())

In [None]:
with torch.no_grad():
    images = images.to(trainer.device)
    reconstructed = trainer.model(images).cpu()
print("Reconstructed images")
display_images_in_a_row(reconstructed)