<a href="https://colab.research.google.com/github/colinZejda/Summer2023_UCI_ML_Research/blob/main/4_Tutorial_convolutional_autoencoder_on_MNIST_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Autoencoder
*   An unsupervised deep learning algorithm that learns encoded representations of the input data, then reconstructs the same input as output
*   It consists of 2 networks: an encoder and a decoder
*   **Encoder**: compresses a high-dimensional input into a low-dimensional latent code (aka encoded space) to extract the most relevant information from it
*   **Decoder**: decompresses the encoded data, recreates the original input


Goal of autoencoder architectures:
*   to maximize the information retained when encoding, while minimizing the reconstruction error


Reconstruction error:
*   aka reconstruction loss
*   it is the mean-squared error (MSE) between the reconstructed input and the original input when the input is real-valued (continuous). Otherwise, for categorical data we use the cross-entropy (CE) loss function instead.








Implementation in PyTorch
1. import libraries and MNIST dataset
2. define convolutional autoencoder
3. initialize loss function and optimizer
4. train model and evaluate model
5. generate new samples from the latent code
6. visualize the latent space with t-SNE





In [None]:
# IMPORTS
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torch.optim as optim

import torchvision
from torchvision import transforms

In [None]:
# SET UP MNIST DATA
    # train_dataset (MNIST train split) is divided 80/20 into training + validation subsets
    # test_dataset (MNIST test split) is used for testing only, after training is done

data_dir = 'dataset'
train_dataset = torchvision.datasets.MNIST(data_dir, train=True, download=True)   # download datasets
test_dataset = torchvision.datasets.MNIST(data_dir, train=False, download=True)

# ToTensor must be *instantiated*: Compose calls each entry on the image, so passing
# the bare class would try to construct ToTensor(img) and raise a TypeError.
# ToTensor already scales pixel values to [0, 1], so no extra normalization is applied;
# that range is what the decoder's sigmoid output is compared against.
train_transform = transforms.Compose([transforms.ToTensor()])    # define transforms
test_transform = transforms.Compose([transforms.ToTensor()])

train_dataset.transform = train_transform                        # apply transforms to datasets
test_dataset.transform = test_transform

# create 80-20 split of training data into: training + validation data
# (derive the training size from the validation size so the two always sum to m)
m = len(train_dataset)
val_size = int(m * 0.2)
train_size = m - val_size
train_data, val_data = random_split(train_dataset, [train_size, val_size])
batch_size = 256

train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val_data, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)


In [None]:
# DEFINE CONVOLUTIONAL AUTOENCODER
    # composed of 2 classes: one for encoder, another for decoder
    # encoder has 3 conv layers and 2 fc layers, with batch norm layers added as regularizers
    # decoder has same architecture, but in inverse order

class Encoder(nn.Module):
  """Convolutional encoder: image batch -> latent code of size `encoded_space_dim`.

  Args:
    encoded_space_dim: number of features in the output latent vector.
    fc2_input_dim: accepted for interface compatibility; not used by this class.
  """

  def __init__(self, encoded_space_dim, fc2_input_dim):
    super().__init__()

    # Three strided convolutions; the linear head below expects their output
    # to be 32 feature maps of 3x3 (which is what a 1x28x28 input produces).
    conv_layers = [
        nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=2, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=2, padding=1),
        nn.BatchNorm2d(num_features=16),
        nn.ReLU(inplace=True),
        nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=2, padding=0),
        nn.ReLU(inplace=True),
    ]
    self.encoder_cnn = nn.Sequential(*conv_layers)

    # Collapse every dimension after the batch dimension into one vector.
    self.flatten = nn.Flatten(start_dim=1)

    # Two fully connected layers that compress down to the latent size.
    flat_features = 3 * 3 * 32
    hidden_features = 128
    self.encoder_lin = nn.Sequential(
        nn.Linear(flat_features, hidden_features),
        nn.ReLU(inplace=True),
        nn.Linear(hidden_features, encoded_space_dim),
    )

  def forward(self, x):
    """Return the latent code for the image batch `x`."""
    feature_maps = self.encoder_cnn(x)
    flat = self.flatten(feature_maps)
    return self.encoder_lin(flat)


class Decoder(nn.Module):
  """Convolutional decoder mirroring `Encoder`: latent code -> reconstructed image.

  Args:
    encoded_space_dim: number of features in the incoming latent vector.
    fc2_input_dim: accepted for interface compatibility; not used by this class.
  """

  def __init__(self, encoded_space_dim, fc2_input_dim):
    super().__init__()

    # Expand the latent vector back to the flattened 32x3x3 feature volume.
    hidden_features = 128
    flat_features = 3 * 3 * 32
    self.decoder_lin = nn.Sequential(
        nn.Linear(encoded_space_dim, hidden_features),
        nn.ReLU(inplace=True),
        nn.Linear(hidden_features, flat_features),
        nn.ReLU(inplace=True),
    )

    # Reshape the flat vector into (channels, height, width) feature maps.
    self.unflatten = nn.Unflatten(dim=1, unflattened_size=(32, 3, 3))

    # Three transposed convolutions upsample 3x3 -> 7x7 -> 14x14 -> 28x28.
    deconv_layers = [
        nn.ConvTranspose2d(in_channels=32, out_channels=16, kernel_size=3, stride=2, output_padding=0),
        nn.BatchNorm2d(num_features=16),
        nn.ReLU(inplace=True),
        nn.ConvTranspose2d(in_channels=16, out_channels=8, kernel_size=3, stride=2, padding=1, output_padding=1),
        nn.BatchNorm2d(num_features=8),
        nn.ReLU(inplace=True),
        nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=3, stride=2, padding=1, output_padding=1),
    ]
    self.decoder_conv = nn.Sequential(*deconv_layers)

  def forward(self, x):
    """Return the reconstruction for the latent batch `x`, with values in (0, 1)."""
    expanded = self.decoder_lin(x)
    feature_maps = self.unflatten(expanded)
    logits = self.decoder_conv(feature_maps)
    # sigmoid(x) = 1 / (1 + e^-x) squashes every output pixel into (0, 1)
    return torch.sigmoid(logits)


In [None]:
# LOSS FUNCTION AND OPTIMIZER
loss_fn = torch.nn.MSELoss()     # reconstruction loss: MSE between decoded output and input
lr = 1e-3                        # learning rate

torch.manual_seed(0)             # set seed for reproducible randomness

d = 4                            # size of the latent code
encoder = Encoder(encoded_space_dim=d, fc2_input_dim=128)   # initialize the 2 networks
decoder = Decoder(encoded_space_dim=d, fc2_input_dim=128)

# a single optimizer updates the parameters of both networks
params_to_optimize = [
    {'params' : encoder.parameters()},
    {'params' : decoder.parameters()}
]

# NOTE: this name shadows the `torch.optim as optim` module import; it is kept
# because the training cell passes `optim` as the optimizer.
optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-5)  # adam optimizer

# check if GPU is available
# (is_available must be *called*; the bare function object is always truthy,
#  which would select "cuda" even on a CPU-only machine)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# move both encoder and decoder to selected device
encoder.to(device)
decoder.to(device)

In [None]:
# FUNCTION TO TRAIN THE MODEL
def train_epoch(encoder, decoder, dvice, dataloader, loss_fn, optimizer):
  """Train the autoencoder for one pass over `dataloader`.

  Args:
    encoder: module mapping an image batch to a latent code.
    decoder: module mapping a latent code back to an image batch.
    dvice: device the batches are moved to before the forward pass
      (the misspelled name is kept so existing callers keep working).
    dataloader: iterable yielding (image_batch, labels) tuples.
    loss_fn: callable computing the reconstruction loss (output, target).
    optimizer: optimizer holding the encoder and decoder parameters.

  Returns:
    The mean of the per-batch losses as a NumPy scalar.
  """
  # set train mode for both the encoder and decoder
  encoder.train()
  decoder.train()
  train_loss = []

  # iterate the dataloader
  # we don't need labels here, this is unsupervised learning
  # dataloader gives us tuple of (data, labels), we ignore labels with _
  for image_batch, _ in dataloader:
    # use the device passed in, not whatever global `device` happens to exist
    image_batch = image_batch.to(dvice)

    encoded_data = encoder(image_batch)          # encode data
    decoded_data = decoder(encoded_data)         # decode data

    # the target is the input itself: the loss measures how much information
    # was lost by compressing and then decompressing the batch
    loss = loss_fn(decoded_data, image_batch)

    optimizer.zero_grad()                        # backprop
    loss.backward()
    optimizer.step()

    print('\t partial train loss (single batch): %f' %(loss.item()))
    # detach() is a method; it must be called before moving the value to the CPU
    train_loss.append(loss.detach().cpu().numpy())

  return np.mean(train_loss)

In [None]:
# FUNCTION TO EVALUATE THE MODEL
def test_epoch(encoder, decoder, device, dataloader, loss_fn):

  encoder.eval()                  # set evaluation mode
  decoder.eval()

  with torch.no_grad():           # don't change weights, aka don't track gradients

    conc_out = list()             # lists to store outputs for each batch
    conc_label = list()

    for image_batch, _ in dataloader:

      image_batch = image_batch.to(device)       # move tensor to proper device

      encoded_data = encoder(image_batch)        # encode + decode data
      decoded_data = decoder(encoded_data)

      conc_out.append(decoded_data.cpu())        # append network output and original image to lists
      conc_label.append(image_batch.cpu())

    # create single tensor with all values in the lists
    conc_out = torch.cat(conc_out)
    conc_label = torch.cat(conc_label)

    # evaluate global loss
    val_loss = loss_fn(conc_out, conc_label)

  return val_loss.data


In [None]:
# FUNCTION TO PLOT IMAGES
# to see if the autoencoder is learning from the input images
def plot_ae_outputs(encoder, decoder, n=10):
  """Plot one test image per digit above its reconstruction.

  Reads the module-level `test_dataset` and `device`. For each digit
  0..n-1 the first test image with that label is shown in the top row and
  the autoencoder's reconstruction of it in the bottom row.

  Args:
    encoder: module mapping an image batch to a latent code.
    decoder: module mapping a latent code back to an image batch.
    n: number of digit classes (columns) to plot.
  """
  # evaluation mode does not change between columns, so set it once
  encoder.eval()
  decoder.eval()

  plt.figure(figsize=(16, 4.5))
  targets = test_dataset.targets.numpy()
  # index of the first test image for each digit label
  t_idx = {i : np.where(targets == i)[0][0] for i in range(n)}

  with torch.no_grad():
    for i in range(n):

      # unsqueeze(0) adds the batch dimension the networks expect
      img = test_dataset[t_idx[i]][0].unsqueeze(0).to(device)
      rec_img = decoder(encoder(img))                              # obtain reconstructed image

      # top row: the original image
      #   cpu() copies the tensor to CPU memory, squeeze() drops the
      #   singleton dimensions, numpy() gives the ndarray imshow needs
      ax = plt.subplot(2, n, i+1)
      plt.imshow(img.cpu().squeeze().numpy(), cmap='gist_gray')
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)

      if i == n//2:
        ax.set_title('Original images')

      # bottom row: the reconstructed image
      ax = plt.subplot(2, n, i+1+n)
      plt.imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray')
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)

      if i == n//2:
        ax.set_title('Reconstructed images')

  plt.show()




In [None]:
# PERFORM TRAINING + EVALUATION
num_epochs = 30
diz_loss = {'train_loss':[], 'val_loss':[]}   # per-epoch loss history

for epoch in range(num_epochs):

  train_loss = train_epoch(encoder, decoder, device, train_loader, loss_fn, optim)
  # validate on the held-out validation split (not the training data);
  # test_epoch takes no optimizer because it never updates weights.
  # .item() turns the returned tensor into a plain float for logging/plotting.
  val_loss = test_epoch(encoder, decoder, device, valid_loader, loss_fn).item()

  print('\n Epoch {}/{} \t train loss {} \t val loss {}'.format(epoch+1, num_epochs, train_loss, val_loss))

  diz_loss['train_loss'].append(train_loss)
  diz_loss['val_loss'].append(val_loss)

  # show the current reconstructions after every epoch
  plot_ae_outputs(encoder, decoder, n=10)

In [None]:
# FINAL EVAL-- USING TEST SET
    # reconstruction loss over the whole test set, shown as a plain float
final_test_loss = test_epoch(encoder, decoder, device, test_loader, loss_fn)
final_test_loss.item()

In [None]:
# RECONSTRUCTION LOSSES DECREASE OVER EPOCHS (VISUALIZATION)
    # plot both curves: the validation loss is recorded during training too
plt.figure(figsize=(10, 8))
plt.semilogy(diz_loss['train_loss'], label='Train')
# float() accepts both plain numbers and 0-d tensors stored in the history
plt.semilogy([float(v) for v in diz_loss['val_loss']], label='Valid')
plt.xlabel('Epoch')
plt.ylabel('Average Loss')
plt.legend()
plt.show()

In [None]:
# GENERATE NEW SAMPLES FROM RANDOM LATENT CODE
    # we will sample randomly from a normal distribution with the mean and std of the encoded data
    # then we'll pass the samples to the decoder to create reconstructed images

def show_image(img):
  """Display a (channels, height, width) image tensor with matplotlib."""
  # imshow expects channels last, so reorder the axes to (height, width, channels)
  channels_last = np.transpose(img.numpy(), (1, 2, 0))
  plt.imshow(channels_last)

encoder.eval()
decoder.eval()

with torch.no_grad():

  # calc mean and std of latent (encoded/compressed) code, generated taking in test images as inputs
  # (use the built-in next(); iterators have no .next() method in Python 3)
  images, _ = next(iter(test_loader))
  images = images.to(device)
  latent = encoder(images)
  latent = latent.cpu()

  # per-dimension statistics over the batch
  mean = latent.mean(dim=0)
  print(mean)
  std = (latent - mean).pow(2).mean(dim=0).sqrt()
  print(std)

  # sample latent vectors from the normal distribution with those statistics
  latent = torch.randn(128, d) * std + mean

  # reconstruct images from the random latent vectors
  latent = latent.to(device)
  img_recon = decoder(latent)
  img_recon = img_recon.cpu()

  # show the first 100 generated images as a grid with 10 images per row
  fig, ax = plt.subplots(figsize=(20, 8.5))
  show_image(torchvision.utils.make_grid(img_recon[:100], 10, 5))
  plt.show()

# some of the digits make no sense
# this is bc the autoencoder latent space is extremely irregular, which is why close points in the latent space can produce different and meaningless patterns
# this is why the autoencoder isn't great for generative purposes


In [None]:
# VISUALIZE LATENT SPACE WITH t-SNE

# first, create encoded samples using the test set
    # each row holds the latent variables of one test image plus its label
from tqdm import tqdm

# evaluation mode and no-grad do not change per sample, so set them once
encoder.eval()
encoded_samples = []
with torch.no_grad():
  for img, label in tqdm(test_dataset):
    # unsqueeze(0) adds the batch dimension the encoder expects
    encoded_img = encoder(img.unsqueeze(0).to(device))

    # flatten the (1, d) code into d scalars and store them by name
    encoded_img = encoded_img.flatten().cpu().numpy()
    encoded_sample = {f"Enc. Variable {i}":enc for i, enc in enumerate(encoded_img)}
    encoded_sample['label'] = label
    encoded_samples.append(encoded_sample)
encoded_samples = pd.DataFrame(encoded_samples)
encoded_samples


# then, plot the latent space representation using the plotly express library
    # scatterplot of the first two latent variables, one dot per test image,
    # colored by digit, so we can see where each digit lies and how they overlap
import plotly.express as px
fig = px.scatter(encoded_samples, x='Enc. Variable 0', y='Enc. Variable 1', color=encoded_samples.label.astype(str), opacity=0.7)
# the figure is not the last expression of the cell, so it has to be shown explicitly
fig.show()


# then, use t-SNE to reduce the latent code to 2 dimensions for plotting
    # n_components = 2 because we want a 2D visualization
from sklearn.manifold import TSNE

# t-SNE is fitted on the latent variables only, so the label column is left out
latent_features = encoded_samples.drop(columns=['label'])
tsne = TSNE(n_components=2)
tsne_results = tsne.fit_transform(latent_features)

# one dot per test image, colored by its digit label
digit_labels = encoded_samples['label'].astype(str)
axis_names = {'0': 'tsne-2d-one', '1': 'tsne-2d-two'}
fig = px.scatter(tsne_results, x=0, y=1, color=digit_labels, labels=axis_names)
fig.show()
# we can very clearly see that each digit distinguishes itself from another (with some exceptions)

  0%|          | 0/10000 [00:00<?, ?it/s]


TypeError: ignored