# From PCA to Autoencoders: Unsupervised representation learning

ISAE-Supaero filière SDD

Florent FOREST

forest@lipn.univ-paris13.fr

---

## Preparation

In [None]:
!git clone https://github.com/FlorentF9/Supaero-mlautoencoders.git

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
import numpy as np
import matplotlib.pyplot as plt
print('Torch version ', torch.__version__)

In [None]:
# check is CUDA is available
cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
device

In [None]:
batch_size = 128
train_loader = DataLoader(MNIST('../data', train=True, download=True, transform=transforms.ToTensor()), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(MNIST('../data', train=False, transform=transforms.ToTensor()), batch_size=batch_size, shuffle=True)

## Let's code 1

<div style="color: #191; font-size: 16px; background-color: #dfd; padding: 20px; border-radius: 15px">
<p><b>Exercise</b></p>
<p>Implement a multi-layer (MLP) autoencoder using PyTorch by completing the following code snippet.</p>
<ol>
    <li>Try changing the number of layers, units per layer, and activation function, and observe the impact on the loss function.</li>
    <li>The following code uses a <b>gaussian MLP</b>, i.e. <b>linear output activation</b> + <b>mean squared error (MSE) loss</b>. Could we also use a Bernoulli distribution, with a <b>sigmoid output activation</b> + <b>cross-entropy loss</b>?</li>
    <li>Visualize the latent space using t-SNE, and compare it with the original data.</li>
</ol>
    
<p>If you have time left, you can also implement a <b>convolutional autoencoder</b>. You might guess it, convolutional and pooling layers will replace the fully-connected layers in the encoder. And what about the decoder?</p>
</div>

In [None]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        self.encoder = nn.Sequential(
            nn.Linear(784, 200),
            nn.ReLU(True),
            nn.Linear(200, 10)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(10, 200),
            nn.ReLU(True),
            nn.Linear(200, 784)
        )

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        return self.decode(self.encode(x))

### Training and evaluation

In [None]:
model = Autoencoder().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

loss_function = nn.MSELoss(reduction='sum')
#loss_function = nn.BCELoss(reduction='sum')

In [None]:
# training
epochs = 30
train_losses = []
test_losses = []

for epoch in range(epochs):
    model.train()
    train_loss = 0
    for i, (X_batch, _) in enumerate(train_loader):
        X_batch = X_batch.to(device).view(-1, 784)
        # forward
        X_pred = model(X_batch)
        loss = loss_function(X_pred, X_batch)
        # backward
        optimizer.zero_grad()
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    train_losses.append(train_loss / len(train_loader.dataset))
            
    print('Epoch: {} - Train loss: {}'.format(epoch, train_losses[-1]))

In [None]:
plt.figure(figsize=(8, 6))
plt.plot(train_losses, label='Train')
plt.legend();

In [None]:
# evaluation
model.eval()

test_loss = 0
with torch.no_grad():
    for i, (X_batch, _) in enumerate(test_loader):
        X_batch = X_batch.to(device).view(-1, 784)
        # forward
        X_pred = model(X_batch)
        test_loss += loss_function(X_pred, X_batch).item()
        
        if i == 0:
            n = min(X_batch.size(0), 8)
            comparison = torch.cat([X_batch.view(batch_size, 1, 28, 28)[:n], X_pred.view(batch_size, 1, 28, 28)[:n]])
            img = comparison.cpu().numpy()

print('Test loss: {}'.format(test_loss / len(test_loader.dataset)))

In [None]:
# visualize original VS reconstructed samples
n = img.shape[0] // 2
fig, ax = plt.subplots(2, n, figsize=(12, 3))
for i in range(n):
    ax[0][i].imshow(img[i, 0], cmap='gray')
    ax[1][i].imshow(img[n + i, 0], cmap='gray')
    ax[0][i].axis('off')
    ax[1][i].axis('off')
plt.subplots_adjust(hspace=0.0, wspace=0.0)

### Latent space visualization

In [None]:
from sklearn.manifold import TSNE

In [None]:
test_list = list(iter(test_loader))
X_test = np.vstack([x[0].view(-1, 784).numpy() for x in test_list])
y_test = np.vstack([x[1].view(-1, 1).numpy() for x in test_list])
del test_list
X_test.shape, y_test.shape

In [None]:
# take a smaller sample for running t-SNE
N = 1000
sample_idx = np.random.permutation(np.arange(X_test.shape[0]))[:N]
X_sample = X_test[sample_idx]
y_sample = y_test[sample_idx].squeeze(1)

In [None]:
tsne = TSNE(n_components=2)
# apply t-SNE on raw samples
raw_tsne = tsne.fit_transform(X_sample)

In [None]:
with torch.no_grad():
    Z_sample = model.encode(torch.Tensor(X_sample).to(device)).cpu().numpy()

In [None]:
# apply t-SNE on encoded samples
ae_tsne = tsne.fit_transform(Z_sample)

In [None]:
_, ax = plt.subplots(1, 2, figsize=(20,10))
for label in range(10):
    ax[0].scatter(raw_tsne[y_sample == label, 0], raw_tsne[y_sample == label, 1], label=label, cmap='Accent')
    ax[1].scatter(ae_tsne[y_sample == label, 0], ae_tsne[y_sample == label, 1], label=label, cmap='Accent')
ax[0].legend()
ax[0].set_title('t-SNE visualization of raw MNIST')
ax[1].legend()
ax[1].set_title('t-SNE visualization of MNIST in AE latent space');

### Impact on $k$-means clustering

In [None]:
from sklearn.utils.linear_assignment_ import linear_assignment

def _contingency_matrix(y_true, y_pred):
    w = np.zeros((y_true.max() + 1, y_pred.max() + 1), dtype=np.int64)
    for c, k in zip(y_true, y_pred):
        w[c, k] += 1  # w[c, k] = number of c-labeled samples in cluster k
    return w

def clustering_accuracy(y_true, y_pred):
    """Unsupervised clustering accuracy.

    Can only be used if the number of target classes in y_true is equal to the number of clusters in y_pred.

    Parameters
    ----------
    y_true : array, shape = [n]
        true labels.
    y_pred : array, shape = [n]
        predicted cluster ids.

    Returns
    -------
    accuracy : float in [0,1] (higher is better)
    """
    y_true = y_true.astype(np.int64)
    y_pred = y_pred.astype(np.int64)
    w = _contingency_matrix(y_true, y_pred).T
    ind = linear_assignment(w.max() - w)
    return np.sum([w[i, j] for i, j in ind]) / y_true.size

In [None]:
from sklearn.cluster import KMeans
y_pred = KMeans(n_clusters=10, n_jobs=-1).fit_predict(X_test)

with torch.no_grad():
    Z_test = model.encode(torch.Tensor(X_test).to(device)).cpu().numpy()
y_pred_ae = KMeans(n_clusters=10, n_jobs=-1).fit_predict(Z_test)

In [None]:
clustering_accuracy(y_test, y_pred)

In [None]:
clustering_accuracy(y_test, y_pred_ae)

---

## Let's code 2

<div style="color: #191; font-size: 16px; background-color: #dfd; padding: 20px; border-radius: 15px">
<p><b>Exercise</b></p>
<p>Implement a variational autoencoder (VAE) using PyTorch by completing the following code snippet.</p>
<ol>
    <li>Implement the reparameterization.</li>
    <li>Implement the VAE loss function. As in the standard AE, the reconstruction error can be either a <b>mean squared error (MSE)</b> (gaussian decoder) or a <b>binary cross-entropy loss</b> (Bernoulli decoder)</li>
    <li>Visualize the latent space using t-SNE, and compare it with the original data.</li>
</ol>
    
<p>If you have time left, you can also implement a <b>convolutional VAE</b>
</div>

In [None]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()

        self.encoder = nn.Sequential(
            nn.Linear(784, 400),
            nn.ReLU(True)
        )
        self.mu = nn.Linear(400, 20)
        self.logvar = nn.Linear(400, 20)
        
        self.decoder = nn.Sequential(
            nn.Linear(20, 400),
            nn.ReLU(True),
            nn.Linear(400, 784)
        )

    def encode(self, x):
        h = self.encoder(x)
        return self.mu(h), self.logvar(h)

    def decode(self, z):
        return self.decoder(z)

    def reparameterize(self, mu, logvar):
        # TODO
        raise NotImplementedError

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

### Training and evaluation

In [None]:
model = VAE().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

def loss_function(decoded_x, x, mu, logvar):
    
    raise NotImplementedError
    
    # reconstruction error (MSE or BCE)
    RE = 0.0 # TODO
    
    # Kullback-Leibler divergence error
    # see VAE paper: Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # https://arxiv.org/abs/1312.6114
    KLD = 0.0 # TODO

    return RE + KLD

In [None]:
# training
epochs = 30
train_losses = []
test_losses = []

for epoch in range(epochs):
    model.train()
    train_loss = 0
    for i, (X_batch, _) in enumerate(train_loader):
        X_batch = X_batch.to(device).view(-1, 784)
        # forward
        X_pred, mu, logvar = model(X_batch)
        loss = loss_function(X_pred, X_batch, mu, logvar)
        # backward
        optimizer.zero_grad()
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    train_losses.append(train_loss / len(train_loader.dataset))
            
    print('Epoch: {} - Train loss: {}'.format(epoch, train_losses[-1]))

In [None]:
plt.figure(figsize=(8, 6))
plt.plot(train_losses, label='Train')
plt.legend();

In [None]:
# evaluation
model.eval()

test_loss = 0
with torch.no_grad():
    for i, (X_batch, _) in enumerate(test_loader):
        X_batch = X_batch.to(device).view(-1, 784)
        # forward
        X_pred, mu, logvar = model(X_batch)
        test_loss += loss_function(X_pred, X_batch, mu, logvar).item()
        
        if i == 0:
            n = min(X_batch.size(0), 8)
            comparison = torch.cat([X_batch.view(batch_size, 1, 28, 28)[:n], X_pred.view(batch_size, 1, 28, 28)[:n]])
            img = comparison.cpu().numpy()

print('Test loss: {}'.format(test_loss / len(test_loader.dataset)))

In [None]:
# visualize original VS reconstructed samples
n = img.shape[0] // 2
fig, ax = plt.subplots(2, n, figsize=(12, 3))
for i in range(n):
    ax[0][i].imshow(img[i, 0], cmap='gray')
    ax[1][i].imshow(img[n + i, 0], cmap='gray')
    ax[0][i].axis('off')
    ax[1][i].axis('off')
plt.subplots_adjust(hspace=0.0, wspace=0.0)

In [None]:
# generate some samples
n_samples = 8
with torch.no_grad():
    z = torch.randn(n_samples, 20).to(device)
    sample = model.decode(z).view(-1, 28, 28).cpu().numpy()

_, ax = plt.subplots(1, n_samples, figsize=(12, 4))
for i in range(n_samples):
    ax[i].imshow(sample[i], cmap='gray')
    ax[i].axis('off')

### Latent space visualization

In [None]:
with torch.no_grad():
    Z_sample = model.encode(torch.Tensor(X_sample).to(device))[0].cpu().numpy()

In [None]:
# apply t-SNE on encoded samples
vae_tsne = tsne.fit_transform(Z_sample)

In [None]:
_, ax = plt.subplots(1, 2, figsize=(20,10))
for label in range(10):
    ax[0].scatter(raw_tsne[y_sample == label, 0], raw_tsne[y_sample == label, 1], label=label, cmap='Accent')
    ax[1].scatter(vae_tsne[y_sample == label, 0], vae_tsne[y_sample == label, 1], label=label, cmap='Accent')
ax[0].legend()
ax[0].set_title('t-SNE visualization of raw MNIST')
ax[1].legend()
ax[1].set_title('t-SNE visualization of MNIST in VAE latent space');

### Impact on $k$-means clustering

In [None]:
y_pred = KMeans(n_clusters=10, n_jobs=-1).fit_predict(X_test)

with torch.no_grad():
    Z_test = model.encode(torch.Tensor(X_test).to(device))[0].cpu().numpy()  
y_pred_vae = KMeans(n_clusters=10, n_jobs=-1).fit_predict(Z_test)

In [None]:
clustering_accuracy(y_test, y_pred)

In [None]:
clustering_accuracy(y_test, y_pred_vae)