### Шумоподавляющие автоэнкодеры и что с ними делать

В этом примере мы будем обучать глубокие автоэнкодеры и применять их к изображениям лиц, в том числе для поиска похожих.

Изображения лиц представлены датасетом [lfw](http://vis-www.cs.umass.edu/lfw/).

In [None]:
import numpy as np
from lfw_dataset import fetch_lfw_dataset
from sklearn.model_selection import train_test_split
X, attr = fetch_lfw_dataset(use_raw=True,dimx=38,dimy=38)
X = X.astype('float32') / 256.0

img_shape = X.shape[1:]

X_train, X_test = train_test_split(X, test_size=0.1,random_state=42)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.title('sample image')
for i in range(6):
    plt.subplot(2,3,i+1)
    plt.imshow(X[i])

print("X shape:",X.shape)
print("attr shape:",attr.shape)

### Архитектура автоэнкодера

Представим автоэнкодер как две последовательных сети: энкодер и декодер.

<img src="http://nghiaho.com/wp-content/uploads/2012/12/autoencoder_network1.png" width=640px>

In [None]:
import torch
from torch import nn
from torch.autograd import Variable
from tqdm import tqdm
import sys

use_cuda = torch.cuda.is_available()

## Первый шаг: PCA

Метод главных компонент (Principal component analysis, или PCA) - популярный метод снижения размерности. 

Основой метода является такое разложение матрицы "объект-признак" $X$ на две матрицы меньшего размера: $W$ и $\hat W$, которое минимизирует _среднеквадратичную ошибку_:

$$\|(X W) \hat{W} - X\|^2_2 \to_{W, \hat{W}} \min$$
- $X \in \mathbb{R}^{n \times m}$ - Матрица объектов (**центрированная**);
- $W \in \mathbb{R}^{m \times d}$ - Матрица прямого преобразования;
- $\hat{W} \in \mathbb{R}^{d \times m}$ - Матрица обратного преобразования;
- $n$ - количество примеров, $m$ - оригинальная размерность и $d$ - целевая размерность;

В геометрическом смысле, мы хотим выделить $d$ осей, вдоль которых находится большая часть дисперсии. Эти оси также иногда называются "естественными".

![](https://upload.wikimedia.org/wikipedia/commons/thumb/9/90/PCA_fish.png/256px-PCA_fish.png)


PCA также может быть рассмотрен как особый случай автоэнкодера.

* __Энкодер__: X -> Linear(d нейронов) -> код
* __Декодер__: код -> Linear(m нейронов) -> X

Где Linear обозначает полносвязный слой с линейной активационной функцией:   $f(X) = W \cdot X + \vec b $

Замечание: в данных слоях смещение предназначено для вычета средного, или "центрирования" матрицы.

In [None]:
class Encoder(nn.Module):
    def __init__(self, img_shape, code_size=32):
        super(Encoder, self).__init__()
        self.enc = <your code here>
    
    def forward(self, input):
        input = input.view(input.size(0), -1)
        input = self.enc(input)
        return input
    
    
class Decoder(nn.Module):
    def __init__(self, img_shape, code_size=32):
        super(Decoder, self).__init__()
        self.dec = <your code here>
        
    def forward(self, input):
        input = self.dec(input)
        input = input.view((input.size(0),) + img_shape)
        return input

Объединяем их в одну модель

In [None]:
class Autoencoder(nn.Module):
    """
    Here we define a simple linear autoencoder as described above.
    We also flatten and un-flatten data to be compatible with image shapes
    """
    def __init__(self, img_shape, code_size=32):
        super(Autoencoder, self).__init__()
        self.encoder = <your code here>
        self.decoder = <your code here>
        
    def forward(self, input):
        input = self.encoder(input)
        input = self.decoder(input)
        return input

In [None]:
batch_size = 50
code_size = 32

autoencoder = Autoencoder(img_shape, code_size)

inp_image = Variable(torch.FloatTensor((batch_size,) + img_shape))
inp_code = Variable(torch.FloatTensor(batch_size, code_size))

if use_cuda:
    autoencoder.cuda()
    inp_image.cuda()
    inp_code.cuda()
    
autoenc_opt = torch.optim.Adamax(autoencoder.parameters())
autoenc_loss = <your code here> # mean square error

In [None]:
def iterate_minibatches(data, batch_size = 32, verbose = True):
    indices = np.random.permutation(np.arange(len(data)))
    batches = range(0, len(data), batch_size)
    if verbose: 
        batches = tqdm(batches)
    return (torch.from_numpy(data[indices[start_idx:start_idx + batch_size]]) for start_idx in batches)

def compute_loss(x_batch):
    autoencoder.eval()
    inp_image.data.resize_(x_batch.size()).copy_(x_batch)
    rec = autoencoder(inp_image)
    err = autoenc_loss(rec, inp_image)
    autoencoder.train()
    return err.data[0]

def evaluate(x):
    val_losses = list(map(compute_loss, iterate_minibatches(x,verbose=False)))
    return np.mean(val_losses)

In [None]:
for epoch in range(32):
    losses = []
    for x_batch in iterate_minibatches(X_train,batch_size=batch_size):
        autoencoder.zero_grad()
        inp_image.data.resize_(x_batch.size()).copy_(x_batch)
        rec = autoencoder(inp_image)
        err = autoenc_loss(rec, inp_image)
        err.backward()
        autoenc_opt.step()
        losses.append(err.data[0])
    print("#%i, Train loss: %.7f"%(epoch+1,np.mean(losses)),flush=True)

    print("#%i, Test loss: %.7f"%(epoch+1,evaluate(X_test)),flush=True)

In [None]:
def visualize(img,encoder,decoder):
    """Draws original, encoded and decoded images"""
    inp_image.data.resize_(img[None].shape).copy_(torch.from_numpy(img[None]))
    code = encoder(inp_image)
    reco = decoder(code).data.numpy()[0]
    code = code.data.numpy()[0]

    plt.subplot(1,3,1)
    plt.title("Original")
    plt.imshow(img)

    plt.subplot(1,3,2)
    plt.title("Code")
    plt.imshow(code.reshape([code.shape[-1]//2,-1]))

    plt.subplot(1,3,3)
    plt.title("Reconstructed")
    plt.imshow(reco.clip(0,1))
    plt.show()


In [None]:
score = evaluate(X_test)
print("Final MSE:",score)

encoder = list(autoencoder.children())[0]
decoder = list(autoencoder.children())[1]

for i in range(5):
    img = X_test[i]
    visualize(img,encoder,decoder)

### Отправляемся глубже

PCA уже даёт неплохие результаты, но мы в состоянии добиться лучшего. В этот раз мы построим глубокий автоэнкодер... добавив больше слоёв.

В частности, энкодер и декодер оба должны иметь как минимум 3 слоя. Допускается использование любой активационной функции и любого количества нейронов в слоях (кроме слоя с кодом). Сеть не должна быть слишком большой, чтобы её тренировка занимала не так много времени.

![layers](https://pbs.twimg.com/media/CYggEo-VAAACg_n.png:small)

Проверьте:
* Не должно быть скрытых слоёв размером меньше, чем слой с кодом (выходной слой энкодера).
* Не забывайте добавлять нелинейность между полносвязными слоями.
* Использование свёрточных сетей допускается, но не является необходимым. Чтобы отменить операцию свёртки используйте nn.ConvTranspose2d, операцию подвыборки (pooling) - nn.UpsamplingBilinear2d.
* Добавление активационной функции после слоя с кодом допускается, но не является строго необходимым.

In [None]:
class DeepEncoder(nn.Module):
    def __init__(self, img_shape, code_size=32):
        super(DeepEncoder, self).__init__()
        self.enc = nn.Sequential(
            < Your code here: define encoder architecture as per 
            instructions above>
        )
    
    def forward(self, input):
        input = input.view(input.size(0), -1)
        input = self.enc(input)
        return input
    
    
class DeepDecoder(nn.Module):
    def __init__(self, img_shape, code_size=32):
        super(DeepDecoder, self).__init__()
        self.dec = nn.Sequential(
            < Your code here: define decoder architecture as per 
            instructions above>
        )
        
    def forward(self, input):
        input = self.dec(input)
        input = input.view((input.size(0),) + img_shape)
        return input
    
    
class DeepAutoencoder(nn.Module):
    def __init__(self, img_shape, code_size=32):
        super(DeepAutoencoder, self).__init__()
        self.encoder = DeepEncoder(img_shape, code_size)
        self.decoder = Decoder(img_shape, code_size)
        
    def forward(self, input):
        input = self.encoder(input)
        input = self.decoder(input)
        return input

In [None]:
#Check autoencoder shapes along different code_sizes
for code_size in [1,8,32,128,512,1024]:
    encoder = DeepEncoder(img_shape, code_size)
    decoder = DeepDecoder(img_shape, code_size)
    encoder_layers = list(encoder.children())[0]
    decoder_layers = list(decoder.children())[0]
    print("Testing code size %i" % code_size)
    assert list(encoder_layers.children())[-1].out_features==code_size, "encoder must output a code of required size"
    assert list(decoder_layers.children())[-1].out_features==np.prod(img_shape), "decoder must output an image of valid shape"
    assert len(list(encoder.parameters()))>=6,     "encoder must contain at least 3 dense layers"
    assert len(list(decoder.parameters()))>=6,     "decoder must contain at least 3 dense layers"
    
    for layer in encoder_layers:
        if type(layer) == nn.Linear:
            assert layer.out_features >= code_size, "Encoder layer %s is smaller than bottleneck"%(layer)
    
    for layer in decoder_layers:
        if type(layer) == nn.Linear:
            assert layer.out_features >= code_size, "Decoder layer %s is smaller than bottleneck"%(layer)

print("All tests passed!")

__Подсказка:__ Если вы получаете ошибку "Encoder layer is smaller than bottleneck", используйте переменную code_size при объявлении промежуточных слоёв. 

Например, такой слой может содержать code_size*2 нейронов.

In [None]:
batch_size = 50
code_size = 32

autoencoder = DeepAutoencoder(img_shape, code_size)

inp_image = Variable(torch.FloatTensor((batch_size,) + img_shape))
inp_code = Variable(torch.FloatTensor(batch_size, code_size))

if use_cuda:
    autoencoder.cuda()
    inp_image.cuda()
    inp_code.cuda()
    
autoenc_opt = torch.optim.Adamax(autoencoder.parameters())
autoenc_loss = nn.MSELoss()

Обучение может потребовать около 20 минут.

In [None]:
for epoch in range(50):
    losses = []
    for x_batch in iterate_minibatches(X_train,batch_size=batch_size):
        autoencoder.zero_grad()
        inp_image.data.resize_(x_batch.size()).copy_(x_batch)
        rec = autoencoder(inp_image)
        err = autoenc_loss(rec, inp_image)
        err.backward()
        autoenc_opt.step()
        losses.append(err.data[0])
    print("#%i, Train loss: %.7f"%(epoch+1,np.mean(losses)),flush=True)

    print("#%i, Test loss: %.7f"%(epoch+1,evaluate(X_test)),flush=True)

In [None]:
reconstruction_mse = evaluate(X_test)
assert reconstruction_mse <= 0.01, "Compression is too lossy. See tips below."
print("Final MSE:", reconstruction_mse)

encoder = list(autoencoder.children())[0]
decoder = list(autoencoder.children())[1]

for i in range(5):
    img = X_test[i]
    visualize(img,encoder,decoder)

__Подсказка:__ Если вы получаете ошибку "Compression to lossy", вот несколько вещей, которые можно попробовать:

* Убедитесь, что процесс обучения сошёлся. Некоторые архитектуры могут потребовать для этого гораздо больше, чем 32 эпохи. Процесс обучения и метрика могут колебаться, но рано или поздно её значания станут достаточно хорошими для прохождения проверки. Можете тренировать сеть столько, сколько для этого потребуется.

* Сложность архитектуры. Если у вас уже, например, 152 слоя, и вы всё ещё не проходите проверку, начните с чего-то более простого и постепенно усложняйте архитекруру.

* Архитектура. Вы можете использовать любую комбинацию слоёв (включая свёрточные, нормализационные и так далее) до тех пор, пока __выходной слой энкодера содержит только 32 числа на обучающий пример__.

## Арифметика скрытого представления

In [None]:
for _ in range(5):
    image1,image2 = X_test[np.random.randint(0,len(X_test),size=2)]
    code1, code2 = encoder(Variable(torch.from_numpy(np.stack([image1,image2])))).data.numpy()

    plt.figure(figsize=[10,4])
    for i,a in enumerate(np.linspace(0,1,num=7)):

        output_code = code1*(1-a) + code2*(a)
        output_code = Variable(torch.from_numpy(output_code[None]))
        output_image = decoder(output_code).data.numpy()[0]

        plt.subplot(1,7,i+1)
        plt.imshow(output_image)
        plt.title("a=%.2f"%a)
        
    plt.show()

## Шумоподавляющие автоэнкодера

Превратим нашу модель в шумоподавляющий автоэнкодер.

Мы сохраним архитектуру модели неизменной, но внесём изменения в процесс её работы. В частности, мы внесём случайную ошибку в её входные данные перед началом каждой эпохи.

Есть много методов зашумления. Мы имплементируем два популярных: добавление гауссового шума и дропаута.

In [None]:
def apply_gaussian_noise(X,sigma=0.1):
    """
    adds noise from normal distribution with standard deviation sigma
    :param X: image tensor of shape [batch,height,width,3]
    """
    <Your code here>
    return X + noise
    

In [None]:
#noise tests
theoretical_std = (X[:100].std()**2 + 0.5**2)**.5
our_std = apply_gaussian_noise(X[:100],sigma=0.5).std()
assert abs(theoretical_std - our_std) < 0.01, "Standard deviation does not match it's required value. Make sure you use sigma as std."
assert abs(apply_gaussian_noise(X[:100],sigma=0.5).mean() - X[:100].mean()) < 0.01, "Mean has changed. Please add zero-mean noise"

In [None]:
plt.subplot(1,4,1)
plt.imshow(X[0])
plt.subplot(1,4,2)
plt.imshow(apply_gaussian_noise(X[:1],sigma=0.01)[0])
plt.subplot(1,4,3)
plt.imshow(apply_gaussian_noise(X[:1],sigma=0.1)[0])
plt.subplot(1,4,4)
plt.imshow(apply_gaussian_noise(X[:1],sigma=0.5)[0])

In [None]:
batch_size=50
code_size=512

autoencoder = DeepAutoencoder(img_shape, code_size=code_size)

inp_image = Variable(torch.FloatTensor((batch_size,) + img_shape))
inp_code = Variable(torch.FloatTensor(batch_size, code_size))

if use_cuda:
    autoencoder.cuda()
    inp_image.cuda()
    inp_code.cuda()
    
autoenc_opt = torch.optim.Adamax(autoencoder.parameters())
autoenc_loss = nn.MSELoss()

In [None]:
for epoch in range(50):
    print("Epoch %i/50, Generating corrupted samples..."%epoch)
    X_train_noise = apply_gaussian_noise(X_train)
    X_test_noise = apply_gaussian_noise(X_test)
    losses = []
    
    for x_batch in iterate_minibatches(X_train_noise,batch_size=batch_size):
        autoencoder.zero_grad()
        inp_image.data.resize_(x_batch.size()).copy_(x_batch)
        rec = autoencoder(inp_image)
        err = autoenc_loss(rec, inp_image)
        err.backward()
        autoenc_opt.step()
        losses.append(err.data[0])
    print("#%i, Train loss: %.7f"%(epoch+1,np.mean(losses)),flush=True)

    print("#%i, Test loss: %.7f"%(epoch+1,evaluate(X_test_noise)),flush=True)

__Замечание:__ Если обучение не сошлось, увеличьте количество итераций.

__Дополнительное задание:__ Замените гауссов шум на перекрытие случайных прямогугольников на изображении.

In [None]:
denoising_mse = evaluate(X_test)
print("Final MSE:", denoising_mse)

encoder = list(autoencoder.children())[0]
decoder = list(autoencoder.children())[1]

for i in range(5):
    img = X_test[i]
    visualize(img,encoder,decoder)

### Поиск изображений при помощи автоэнкодеров

Мы только что обучили сеть, которая преобразует изображение само в себя с ошибкой. Эта задача не столько полезна сама по себе, но она обладает рядом полезных побочных эффектов. Изучим их в действии.

В первую очередь мы можем осуществлять поиск изображений. Имея изображения, мы хотим найти похожие используя код в латентном пространстве.

Чтобы ускорить процесс поиска, мы будем использовать Locality-Sensitive Hashing с закодированными векторами. Мы будем использовать имплементацию из scikit-learn для простоты. В реальных условиях, вам скорее всего потребуется использовать [специализированные библиотеки](https://erikbern.com/2015/07/04/benchmark-of-approximate-nearest-neighbor-libraries.html) для большей производительности и поддержки дополнительных настроек.

In [None]:
def compute_codes(images):
    codes = np.zeros([images.shape[0], code_size])
    for i, image in enumerate(images):
        image = Variable(torch.from_numpy(image[None]))
        codes[i] = encoder(image).data.numpy()[0]
    return codes

images = X_train
codes = <encode all images>
assert len(codes) == len(images)

In [None]:
from sklearn.neighbors import LSHForest
lshf = LSHForest(n_estimators=50).fit(codes)

In [None]:
def get_similar(image, n_neighbors=5):
    assert image.ndim==3,"image must be [batch,height,width,3]"
    
    image = Variable(torch.from_numpy(image[None]))
    code = encoder(image).data.numpy()[0]
    
    (distances,),(idx,) = lshf.kneighbors(code,n_neighbors=n_neighbors)
    
    return distances,images[idx]

In [None]:
def show_similar(image):
    
    distances,neighbors = get_similar(image,n_neighbors=11)
    
    plt.figure(figsize=[8,6])
    plt.subplot(3,4,1)
    plt.imshow(image)
    plt.title("Original image")
    
    for i in range(11):
        plt.subplot(3,4,i+2)
        plt.imshow(neighbors[i])
        plt.title("Dist=%.3f"%distances[i])
    plt.show()

In [None]:
#smiles
show_similar(X_test[2])

In [None]:
#ethnicity
show_similar(X_test[500])

In [None]:
#glasses
show_similar(X_test[66])

Есть и много других применений автоэнкодеров.

Тем не менее, они не совсем подходят для генерации изображений. Для этого лучше использовать генеративно-состязательные сети.