### Denoising Autoencoders And Where To Find Them

Today we're going to train deep autoencoders and deploy them to faces and search for similar images.

### Download data

In [None]:
#!L
from gfile import download_list

download_list(
    url='https://drive.google.com/file/d/1F96x4LDbsTZGMMq81fZr7aduJCe8N95O',
    filename='celeba.zip',
    target_dir='.'
)

In [None]:
#!L:bash
unzip celeba.zip

In [None]:
#!L
import numpy as np
from matplotlib import pyplot as plt

import torch
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision.utils import make_grid


EPOCHS = 100
BATCH_SIZE = 32
LEARNING_RATE = 1e-3

LATENT_DIMENSION = 4
BATCH_SIZE = 32

device = torch.device("cuda")

## Prepare dataset

In [None]:
#!L
class CropCelebA64:
    
    def __call__(self, pic):
        new_pic = pic.crop((15, 40, 178 - 15, 218 - 30))
        return new_pic

    def __repr__(self):
        return self.__class__.__name__ + '()'

In [None]:
#!L
train_dataset = torchvision.datasets.CelebA(
    root='celeba',
    split='train',
    transform=torchvision.transforms.Compose([
        CropCelebA64(),
        torchvision.transforms.Resize(64),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        
    ]),
)

validation_dataset = torchvision.datasets.CelebA(
    root='celeba',
    split='valid',
    transform=torchvision.transforms.Compose([
        CropCelebA64(),
        torchvision.transforms.Resize(64),
        torchvision.transforms.ToTensor(),
        
    ]),
)

---

In [None]:
#!L
samples = torch.stack([train_dataset[i][0] for i in range(32, 48)], dim=0)

plt.figure(figsize=(10, 10))
plt.imshow(make_grid(samples, nrow=4).permute(1, 2, 0))
plt.show()

### Autoencoder architecture

Let's design autoencoder as a single lasagne network, going from input image through bottleneck into the reconstructed image.

<img src="http://nghiaho.com/wp-content/uploads/2012/12/autoencoder_network1.png" width=640px>



## First step: PCA

Principial Component Analysis is a popular dimensionality reduction method. 

Under the hood, PCA attempts to decompose object-feature matrix $X$ into two smaller matrices: $W$ and $\hat W$ minimizing _mean squared error_:

$$\|(X W) \hat{W} - X\|^2_2 \to_{W, \hat{W}} \min$$
- $X \in \mathbb{R}^{n \times m}$ - object matrix (**centered**);
- $W \in \mathbb{R}^{m \times d}$ - matrix of direct transformation;
- $\hat{W} \in \mathbb{R}^{d \times m}$ - matrix of reverse transformation;
- $n$ samples, $m$ original dimensions and $d$ target dimensions;

In geometric terms, we want to find d axes along which most of variance occurs. The "natural" axes, if you wish.

![](https://upload.wikimedia.org/wikipedia/commons/thumb/9/90/PCA_fish.png/256px-PCA_fish.png)


PCA can also be seen as a special case of an autoencoder.

* __Encoder__: X -> Dense(d units) -> code
* __Decoder__: code -> Dense(m units) -> X

Where Dense is a fully-connected layer with linear activaton:   $f(X) = W \cdot X + \vec b $


Note: the bias term in those layers is responsible for "centering" the matrix i.e. substracting mean.

**Hint**: you may need nn.Flatten

In [None]:
#!L
class PCAAutoEncoder(nn.Module):
    """
    Here we define a simple linear autoencoder as described above.
    We also flatten and un-flatten data to be compatible with image shapes
    """
    
    def __init__(self, code_size=32):
        super(PCAAutoEncoder, self).__init__()
        
        self.enc = # <Your code: define encoder layer>
        self.dec = # <Your code: define decoder layer>
    
    def batch_loss(self, batch):
        reconstruction = # <Your code: define reconstruction object>
        return torch.mean((batch - reconstruction) ** 2)

### Train the model

As usual, iterate minibatches of data and call train_step, then evaluate loss on validation data.

In [None]:
#!L
def train(model, dataset, num_epoch=32):
    model.to(device)
    optimizer = optim.Adamax(model.parameters(), lr=0.002)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
    
    for epoch in range(num_epoch):
        losses = []
        
        for i, (batch, _) in enumerate(dataloader):
            optimizer.zero_grad()
            loss = model.batch_loss(batch.to(device))
            loss.backward()
            losses.append(loss.detach().cpu().numpy())
            optimizer.step()
        
        print(f"#{epoch + 1}, Train loss: {np.mean(losses)}")

In [None]:
#!L
def visualize(img, model):
    """Draws original, encoded and decoded images"""
    code = model.enc(img[None].to(device)
    reco = model.dec(code)

    plt.subplot(1, 3, 1)
    plt.title("Original")
    plt.imshow(img.cpu().numpy().transpose([1, 2, 0]).clip(0, 1))

    plt.subplot(1, 3, 2)
    plt.title("Code")
    plt.imshow(code.cpu().detach().numpy().reshape([code.shape[-1] // 2, -1]))

    plt.subplot(1, 3, 3)
    plt.title("Reconstructed")
    plt.imshow(reco[0].cpu().detach().numpy().transpose([1, 2, 0]).clip(0, 1))
    plt.show()


In [None]:
#!L
aenc = PCAAutoEncoder()
train(aenc, train_dataset, 40)

In [None]:
#!L
dataloader_test = DataLoader(validation_dataset, batch_size=BATCH_SIZE)
scores = []

for i, (batch, _) in enumerate(dataloader_test):
    scores.append(aenc.batch_loss(batch.to(device)).data.cpu().numpy())

print(np.mean(scores))

In [None]:
#!L
for i in range(5):
    img = validation_dataset[i][0]
    visualize(img, aenc)


### Going deeper

PCA is neat but surely we can do better. This time we want you to build a deep autoencoder by... stacking more layers.

In particular, your encoder and decoder should be at least 3 layers deep each. You can use any nonlinearity you want and any number of hidden units in non-bottleneck layers provided you can actually afford training it.

![layers](https://pbs.twimg.com/media/CYggEo-VAAACg_n.png:small)

A few sanity checks:
* There shouldn't be any hidden layer smaller than bottleneck (encoder output).
* Don't forget to insert nonlinearities between intermediate dense layers.
* Convolutional layers are good idea. To undo convolution use nn.Upsample + nn.Conv2d
* Adding activation after bottleneck is allowed, but not strictly necessary.

In [None]:
#!L
class DeepPCAAutoEncoder(nn.Module):
    def __init__(self, code_size=32):
        super(DeepPCAAutoEncoder, self).__init__()
        
        self.enc = #<Your code: define encoder as per instructions above>
        self.dec = #<Your code: define decoder as per instructions above>
    
    def batch_loss(self, batch):
        reconstruction = #<Your code: define reconstruction object>
        return torch.mean((batch - reconstruction)**2)

In [None]:
#!L
aenc_deep = DeepPCAAutoEncoder()
train(aenc_deep, train_dataset, 50)

Training may take long, it's okay.

**Check autoencoder shapes along different code_sizes. Check architecture of you encoder-decoder network is correct**

In [None]:
#!L
def get_dim(layer): return np.prod(layer.output_shape[1:])


for code_size in [1, 8, 32, 128, 512, 1024]:
    help_tensor = next(iter(DataLoader(train_dataset, batch_size=BATCH_SIZE)))
    model = DeepPCAAutoEncoder(code_size).to(device)
    encoder_out = model.enc(help_tensor.to(device))
    decoder_out = model.dec(encoder_out)

    print("Testing code size %i" % code_size)

    assert encoder_out.shape[1:] == torch.Size(
        [code_size]), "encoder must output a code of required size"
    assert decoder_out.shape[1:] == img_shape, "decoder must output an image of valid shape"
    assert len(list(model.dec.children())) >= 6,  "decoder must contain at least 3 dense layers"

print("All tests passed!")

__Hint:__ if you're getting "Encoder layer is smaller than bottleneck" error, use code_size when defining intermediate layers. 

For example, such layer may have code_size*2 units.

** Lets check you model's score. You should beat value of 0.005 **

In [None]:
#!L
dataloader_test = DataLoader(validation_dataset, batch_size=BATCH_SIZE)
scores = []
for i, (batch) in enumerate(dataloader_test):
    scores.append(aenc_deep.batch_loss(batch.to(device)).data.cpu().numpy())
    encoder_out = aenc_deep.enc(batch.to(device))

reconstruction_mse = np.mean(scores)

assert reconstruction_mse <= 0.005, "Compression is too lossy. See tips below."
assert len(encoder_out.shape) == 2 and encoder_out.shape[1] == 32, \
    "Make sure encoder has code_size units"

print("Final MSE:", reconstruction_mse)
for i in range(5):
    img = validation_dataset[i][0]
    visualize(img, aenc_deep)

__Tips:__ If you keep getting "Compression to lossy" error, there's a few things you might try:

* Make sure it converged. Some architectures need way more than 32 epochs to converge. They may fluctuate a lot, but eventually they're going to get good enough to pass. You may train your network for as long as you want.

* Complexity. If you already have, like, 152 layers and still not passing threshold, you may wish to start from something simpler instead and go in small incremental steps.

* Architecture. You can use any combination of layers (including convolutions, normalization, etc) as long as __encoder output only stores 32 numbers per training object__. 

A cunning learner can circumvent this last limitation by using some manual encoding strategy, but he is strongly recommended to avoid that.

## Denoising AutoEncoder

Let's now make our model into a denoising autoencoder.

We'll keep your model architecture, but change the way it trains. In particular, we'll corrupt it's input data randomly before each epoch.

There are many strategies to apply noise. We'll implement two popular one: adding gaussian noise and using dropout.

In [None]:
#!L
def apply_gaussian_noise(X,sigma=0.1):
    """
    adds noise from normal distribution with standard deviation sigma
    :param X: image tensor of shape [batch,height,width,3]
    """
        
    #<Your code: define noise>
        
    return X + noise
    

**noise tests**

In [None]:
#!L
X = torch.stack([train_dataset[i][0] for i in range(100)], dim=0)
theoretical_std = (X.std() ** 2 + 0.5 ** 2) ** .5
our_std = apply_gaussian_noise(X, sigma=0.5).std()

assert abs(theoretical_std - our_std) < 0.01, \
    "Standard deviation does not match it's required value. Make sure you use sigma as std."
assert abs(apply_gaussian_noise(X, sigma=0.5).mean() - X.mean()) < 0.01, \
    "Mean has changed. Please add zero-mean noise"

In [None]:
#!L
plt.figure(figsize=(14, 4))
plt.subplot(1, 4, 1)
plt.imshow(X[0].permute([1, 2, 0]))
plt.subplot(1, 4, 2)
plt.imshow(apply_gaussian_noise(X[:1], sigma=0.01)[0].permute([1, 2, 0]).clip(0, 1))
plt.subplot(1, 4, 3)
plt.imshow(apply_gaussian_noise(X[:1], sigma=0.1)[0].permute([1, 2, 0]).clip(0, 1))
plt.subplot(1, 4, 4)
plt.imshow(apply_gaussian_noise(X[:1], sigma=0.5)[0].permute([1, 2, 0]).clip(0, 1))

In [None]:
#!L
def train_noise(model, dataset, num_epoch=50):
    # <Your code: define train function for denoising autoencoder as train function above>
    # <Think carefully, what should be ground-truth image for computing loss function>

__Note:__ You may change the way the training with noise is done, if you want. For example, you may change Dataloader or batch_loss function in model and leave train function unchanged.

In [None]:
#!L
aenc = PCAAutoEncoder()
train_noise(aenc, train_dataset, 50)

__Note:__ if it hasn't yet converged, increase the number of iterations.

__Bonus:__ replace gaussian noise with masking random rectangles on image.

**Let's evaluate!!!**

In [None]:
#!L
dataloader_test = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=True)
scores = []

for i, (batch) in enumerate(dataloader_test):
    scores.append(aenc.batch_loss(batch.to(device)).data.cpu().numpy())
    encoder_out = aenc.enc(batch.to(device))

reconstruction_mse = np.mean(scores)

print("Final MSE:", reconstruction_mse)
for i in range(5):
    img = validation_dataset[i][0]
    visualize(img, aenc)

### Image retrieval with autoencoders

So we've just trained a network that converts image into itself imperfectly. This task is not that useful in and of itself, but it has a number of awesome side-effects. Let's see it in action.

First thing we can do is image retrieval aka image search. We we give it an image and find similar images in latent space. 

To speed up retrieval process, we shall use Locality-Sensitive Hashing on top of encoded vectors. We'll use scikit-learn's implementation for simplicity. In practical scenario, you may want to use [specialized libraries](https://erikbern.com/2015/07/04/benchmark-of-approximate-nearest-neighbor-libraries.html) for better performance and customization.

In [None]:
#!L
# encodes batch of images into a codes

codes =  # <Your code:encode all images in train_dataset>

In [None]:
#!L
assert codes.shape[0] == len(train_dataset)

In [None]:
#!L
from sklearn.neighbors import LSHForest
lshf = LSHForest(n_estimators=50).fit(codes.detach().cpu().numpy())

In [None]:
#!L
def get_similar(image, n_neighbors=5):
    assert len(image.shape) == 3, "image must be [batch,height,width,3]"

    code =  # <Your code: encode image into latent code>

    (distances,), (idx,) =  # <Your code: using lshf.kneighbors find nearest neighbors>

    return distances, train_dataset[idx][0]

In [None]:
#!L
def show_similar(image):

    distances, neighbors = get_similar(image, n_neighbors=11)

    plt.figure(figsize=[8, 6])
    plt.subplot(3, 4, 1)
    plt.imshow(image.cpu().numpy().transpose([1, 2, 0]))
    plt.title("Original image")

    for i in range(11):
        plt.subplot(3, 4, i+2)
        plt.imshow(neighbors[i].cpu().numpy().transpose([1, 2, 0]))
        plt.title("Dist=%.3f" % distances[i])
    plt.show()

In [None]:
#!L
show_similar(validation_dataset[2][0])

In [None]:
#!L
show_similar(validation_dataset[500][0])

In [None]:
#!L
show_similar(validation_dataset[66][0])

## Cheap image morphing


Here you should take two full-sized objects, code it and obtain intermediate object by decoding an intermixture code.

$Code_{mixt} = a1\cdot code1 + a2\cdot code2$

In [None]:
#!L
for _ in range(5):
    image1, image2 =  # <Your code:choose two image randomly>

    code1, code2 =  # <Your code:decode it>

    plt.figure(figsize=[10, 4])
    for i, a in enumerate(np.linspace(0, 1, num=7)):

        output_code =  # <Your code:define intermixture code>

        output_image = aenc.dec(output_code[None])[0]
        plt.subplot(1, 7, i+1)
        plt.imshow(output_image.cpu().detach().numpy().transpose([1, 2, 0]))
        plt.title("a=%.2f" % a)

    plt.show()

Of course there's a lot more you can do with autoencoders.

If you want to generate images from scratch, however, we recommend you our honor track seminar about generative adversarial networks.