# Autoencoders

A very interesting case emerges when we ask a neural network to reconstruct its input when we impose a bottleneck: a hidden layer with substantially fewer nodes than the input.  This forces the neural network to come up with a compressed representation of the data.  For example, we can imagine taking the MNIST dataset, and generating a neural network with one input layer, three hidden layers, and one output layer.  Then, we'll set the number of nodes in that middle hidden layer to two. This will force the model to come up with a distillation of the 784-dimensional dataset into a 2-D representation, and will also force the training of neural network weights to re-constitute that encoding back into the full 784D representation.  Note that we've seen this kind of thing before in the form of PCA.  However, that relied on a linear transformation, namely the orthogonal rotation of the coordinate system.  A neural network is much more powerful because it can perform non-linear transformations.  Let's see what this can do for us.

First we'll import Torch along with the MNIST dataset.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch

from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

X, y = fetch_openml('mnist_784', version=1, return_X_y=True, cache=True)
X/=255.
y = y.astype(int)
X,X_test,y,y_test = train_test_split(X,y,test_size=10000)

# Extract number of data points, and the height and width of the images for later reshaping
m = X.shape[0]
n = X.shape[1]

h = 28
w = 28

N = 10

X = torch.from_numpy(X)
X_test = torch.from_numpy(X_test)
y = torch.from_numpy(y)
y_test = torch.from_numpy(y_test)

X = X.to(torch.float32)
X_test = X_test.to(torch.float32)
y = y.to(torch.long)
y_test = y_test.to(torch.long)

device = torch.device('cuda:0' if torch.cuda.is_available() else "cpu")

X = X.to(device)
X_test = X_test.to(device)
y = y.to(device)
y_test = y_test.to(device)

In [None]:
from torch.utils.data import TensorDataset

training_data = TensorDataset(X,y)
test_data = TensorDataset(X_test,y_test)

batch_size = 256
train_loader = torch.utils.data.DataLoader(dataset=training_data,
                                           batch_size=batch_size, 
                                           shuffle=True)

batch_size = 256
test_loader = torch.utils.data.DataLoader(dataset=test_data,
                                           batch_size=batch_size, 
                                           shuffle=False)

Next, we can specify some parameters of the network architecture.  For the so-called latent dimension (the bottleneck), we'll use only two nodes, meaning that any representation of a handwritten digit will be a vector in $\mathbb{R}^2$.  For the hidden layers between, we'll use 128 nodes.

$$\Sigma = \frac{1}{m} \sum_{i=1}^m (X - \bar{X})^T (X - \bar{X})$$

In [None]:
latent_dim = 64
intermediate_dim = 64

Next we'll specify our network architecture.  Because we'll want to re-use the weights of various layers in several models (namely functions from the input to the latent variables, from latent variables to outputs, and directly from input to output), I've structured this in such a way that we instantiate all the layers, then arrange them into the needed networks.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self,n,latent_dim,intermediate_dim):
        """
        This method is where you'll want to instantiate parameters.
        we do this by creating two linear transformation functions, l1 and l2, which 
        have encoded in it both the weight matrices W_1 and W_2, and the bias vectors
        """
        super(Encoder,self).__init__()
        self.l1 = nn.Linear(n,intermediate_dim) # Transform from input to hidden layer
        self.l2 = nn.Linear(intermediate_dim,latent_dim)
    
    def forward(self,x):
        """
        This method runs the feedforward neural network.  It takes a tensor of size m x 784,
        applies a linear transformation, applies a sigmoidal activation, applies the second linear transform 
        and outputs the logits.
        """
        a1 = self.l1(x)
        z1 = torch.relu(a1)   
        
        a2 = self.l2(z1)  
        return a2
    
class Decoder(nn.Module):
    def __init__(self,n,latent_dim,intermediate_dim):
        """
        This method is where you'll want to instantiate parameters.
        we do this by creating two linear transformation functions, l1 and l2, which 
        have encoded in it both the weight matrices W_1 and W_2, and the bias vectors
        """
        super(Decoder,self).__init__()
        self.l1 = nn.Linear(latent_dim,intermediate_dim) # Transform from input to hidden layer
        self.l2 = nn.Linear(intermediate_dim,n)
        self.drop = nn.Dropout(p=0.8)
    
    def forward(self,z):
        """
        This method runs the feedforward neural network.  It takes a tensor of size m x 784,
        applies a linear transformation, applies a sigmoidal activation, applies the second linear transform 
        and outputs the logits.
        """
        a1d = self.drop(z)
        a1 = self.l1(a1d)
        z1 = torch.relu(a1)   
        
        a2 = self.l2(z1)  
        return a2

In [None]:
encoder = Encoder(n,latent_dim,intermediate_dim)
encoder.to(device)

decoder = Decoder(n,latent_dim,intermediate_dim)
decoder.to(device)

for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
    layer.weight.data[:] = 0

criterion = torch.nn.MSELoss()

optimizer = torch.optim.Adam([e for e in encoder.parameters()]+[p for p in decoder.parameters()],lr=1e-3)

gamma = 1.0e-2
epochs = 50
# Loop over the data
for epoch in range(epochs):
    encoder.train()
    decoder.train()
    # Loop over each subset of data
    l = 0
    n_batches = 0
    for d,_ in train_loader:

        # Zero out the optimizer's gradient buffer
        optimizer.zero_grad()
        
        # Make a prediction based on the model
        latent = encoder(d)
        reconstruction = decoder(latent)
        
        # Compute the loss
        loss = criterion(reconstruction,d)
        for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
            loss += gamma*torch.mean(layer.weight**2)
        
        # Use backpropagation to compute the derivative of the loss with respect to the parameters
        loss.backward()
        
        # Use the derivative information to update the parameters
        optimizer.step()
        
        l += loss.item()
        n_batches += 1
    print(l/n_batches)

This takes a minute to train, but once its done, we can look at the filters that we produce, just like we did with PCA (and with classifying MLPs)

In [None]:
# Encoder weights (Good features to extract)
fig,axs = plt.subplots(nrows=1,ncols=5,figsize=(10,10))
for i,ax in enumerate(axs):    
    i = np.random.randint(intermediate_dim)
    ax.imshow([e for e in encoder.parameters()][0][i,:].detach().cpu().numpy().reshape((28,28)))#,vmin=-0.3,vmax=0.4)

plt.show()

We can also look at how effective our autoencoder is: how well do we map back to the inputs.

In [None]:
reconstruction = decoder(encoder(X_test))
index = 1
fig,axs = plt.subplots(ncols=2)
#index = np.random.randint(len(reconstruction))
axs[0].imshow(reconstruction[index].reshape(28,28).detach().cpu().numpy(),vmin=0,vmax=1)
axs[1].imshow(X_test[index].reshape(28,28).cpu().numpy(),vmin=0,vmax=1)

It does just OK.  But we are trying to distill all the variability in the MNIST dataset down into two numbers.  Let's look at those two numbers:

In [None]:
z_intermediate = encoder(X_test).detach().cpu().numpy()
plt.scatter(z_intermediate[:,0],z_intermediate[:,1],c=y_test.detach().cpu().numpy())
plt.colorbar()
plt.xlabel('z_0')
plt.ylabel('z_1')
fig = plt.gcf()
fig.set_size_inches(12,12)
plt.show()

The dots are colored by digit.  There are clear distinctions between groups, but still quite a bit of overlap.  Nonetheless, we could make a pretty good classifier out of this (in fact this is what we're doing with an MLP).  

We can also pick values of our latent variables, and generate a new digit using it, which is pretty neat.

In [None]:
z = torch.from_numpy(np.array([0.,-4])).to(torch.float32).to(device)
x_pred = decoder(z).reshape((28,28)).detach().cpu().numpy()
plt.imshow(x_pred)
plt.show()


Unfortunately, it's easy to get off the map.  If we wanted to randomly generate handwritten digits by sampling $z$, we'd have a hard time, because the distribution of z values doesn't follow anything specific.  This problem becomes especially acute in high latent-dimension (if the plot were 16D); there would be a very large separation between each of the classes, thus although sevens and ones should be "close" to each other, in high latent space, they tend not to be.

In [None]:
latent_dim = 128
intermediate_dim = 256

encoder = Encoder(n,latent_dim,intermediate_dim)
encoder.to(device)

decoder = Decoder(n,latent_dim,intermediate_dim)
decoder.to(device)

#for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
#    layer.weight.data[:] = 0

criterion = torch.nn.MSELoss()

optimizer = torch.optim.Adam([e for e in encoder.parameters()]+[p for p in decoder.parameters()],lr=1e-3)

gamma = 1.0e-4
epochs = 50
# Loop over the data
for epoch in range(epochs):
    #model.train()
    # Loop over each subset of data
    l = 0
    n_batches = 0
    for d,_ in train_loader:

        # Zero out the optimizer's gradient buffer
        optimizer.zero_grad()
        
        # Make a prediction based on the model
        latent = encoder(d)
        reconstruction = decoder(latent)
        
        # Compute the loss
        loss = criterion(reconstruction,d)
        for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
            loss += gamma*torch.mean(layer.weight**2)
        
        # Use backpropagation to compute the derivative of the loss with respect to the parameters
        loss.backward()
        
        # Use the derivative information to update the parameters
        optimizer.step()
        
        l += loss.item()
        n_batches += 1
    print(l/n_batches)

In [None]:
from mpl_toolkits.mplot3d import Axes3D

z_intermediate = encoder(X_test).detach().cpu().numpy()
fig = plt.figure()
fig.set_size_inches(20,20)
ax = fig.add_subplot(111, projection='3d')
p = ax.scatter(z_intermediate[:,0],z_intermediate[:,1],z_intermediate[:,2],c=y_test.detach().cpu().numpy(),alpha=0.8)
fig.colorbar(p)
plt.show()

In [None]:
fig,axs = plt.subplots(ncols=2)
index = np.random.randint(len(reconstruction))
axs[0].imshow(reconstruction[index].reshape(28,28).detach().cpu().numpy(),vmin=0,vmax=1)
axs[1].imshow(d[index].reshape(28,28).cpu().numpy(),vmin=0,vmax=1)

# Denoising autoencoder

The utility of this may not yet seem clear: we've developed a more expressive version of PCA.  However we can do some things that are more creative in this framework.  For example: 

In [None]:
mask = torch.cuda.FloatTensor(10, 10).uniform_() > 0.8
print(mask)

In [None]:
latent_dim = 32
intermediate_dim = 256

encoder = Encoder(n,latent_dim,intermediate_dim)
encoder.to(device)

decoder = Decoder(n,latent_dim,intermediate_dim)
decoder.to(device)

#for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
#    layer.weight.data[:] = 0

criterion = torch.nn.MSELoss()

optimizer = torch.optim.Adam([e for e in encoder.parameters()]+[p for p in decoder.parameters()],lr=1e-3)

gamma = 1.0e-4
epochs = 50
# Loop over the data
for epoch in range(epochs):
    #model.train()
    # Loop over each subset of data
    l = 0
    n_batches = 0
    for d,_ in train_loader:

        # Zero out the optimizer's gradient buffer
        optimizer.zero_grad()
        
        std = 0.2
        mask = torch.cuda.FloatTensor(*d.size()).uniform_() > rate
        d_noisy = d*mask
        
        # Make a prediction based on the model
        latent = encoder(d_noisy)
        reconstruction = decoder(latent)
        
        # Compute the loss
        loss = criterion(reconstruction,d)
        for layer in [encoder.l1,encoder.l2,decoder.l1,decoder.l2]:
            loss += gamma*torch.mean(layer.weight**2)
        
        # Use backpropagation to compute the derivative of the loss with respect to the parameters
        loss.backward()
        
        # Use the derivative information to update the parameters
        optimizer.step()
        
        l += loss.item()
        n_batches += 1
    print(l/n_batches)

In [None]:
mask = torch.cuda.FloatTensor(*X_test.size()).uniform_() > rate
X_test_noisy = X_test*mask
reconstruction = decoder(encoder(X_test_noisy))

fig,axs = plt.subplots(ncols=3,figsize=(10,10))
index = np.random.randint(len(reconstruction))
axs[0].imshow(reconstruction[index].reshape(28,28).detach().cpu().numpy(),vmin=0,vmax=1)
axs[1].imshow(X_test_noisy[index].reshape(28,28).cpu().numpy(),vmin=0,vmax=1)
axs[2].imshow(X_test[index].reshape(28,28).cpu().numpy(),vmin=0,vmax=1)