<a href="https://colab.research.google.com/github/adasegroup/ML2022_seminars/blob/master/seminar14/seminar_dim_reduction_autoencoding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Seminar: Dimensionality reduction. Autoencoding.

__Autoencoders__ or how it was on the lectures - replicational neural networks - are unsupervised artificial neural network that learns how to efficiently compress and encode data then learns how to reconstruct the data back from the reduced encoded representation to a representation that is as close to the original input as possible.

It is good for:
* redusing noise in the data
* building stable networks for data invariants
* data compression

The encoder brings the data from a high dimensional input to a __Latent space__ or *bottleneck* layer, where the number of neurons is the smallest. Then, the decoder takes this encoded input and converts it back to the original input shape — in our case an image. The latent space is the space in which the data lies in the bottleneck layer.

<img src="https://www.compthree.com/images/blog/ae/ae.png" alt="Drawing" style="width: 700px;" />

In [1]:
# imports
from IPython.display import Image
import matplotlib.pyplot as plt


from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.utils.data as torch_data

In [None]:
import numpy as np
from sklearn.datasets import fetch_olivetti_faces
from sklearn.model_selection import train_test_split


data = fetch_olivetti_faces(shuffle=True, random_state= 42 ).data
target = fetch_olivetti_faces(shuffle=True, random_state= 42).target

X_train, X_test, y_train, y_test = train_test_split(data, target, stratify=target, test_size=0.33, random_state=42)


#data
plt.figure(figsize=(12, 5))
for i in range(40):
    plt.subplot(4, 10, i + 1)
    plt.imshow(data[i].reshape((64, 64)), cmap='gray')
    plt.axis('off')

#### 1. Write `torch` compatible dataset and dataloader

In [3]:
class FacesData(torch_data.Dataset):
    def __init__(self, X, y):
        super(FacesData, self).__init__()
        self.X = torch.from_numpy(X)
        self.y = y.astype(int)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx].unsqueeze(0), self.y[idx]

In [None]:
train_dset = FacesData(X_train, y_train)
test_dset = FacesData(X_test, y_test)

tsr_im, cls = train_dset[5]
plt.imshow(tsr_im.view(64, 64))

#### 2. Write Autoencoder model as object of  `torch.nn.Module` class

It takes as input encoder and decoder (it will be small neural networks).

In [6]:
class MyFirstAE(nn.Module):
    def __init__(self, encoder, decoder):
        super(MyFirstAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        """
        Take a mini-batch as an input, encode it to the latent space and decode back to the original space
        x_out = decoder(encoder(x))
        :param x: torch.tensor, (MB, x_dim)
        :return: torch.tensor, (MB, x_dim)
        """
        z =  ### YOUR CODE HERE ###
        x_out = ### YOUR CODE HERE ###
        return x_out

#### 3. Define Encoder and Decoder, whey will be symmetrical

You should define variable for bottelneck layer - `hid` and for number of neurons in the whole network  - `ss`.

In [7]:
ss = 128

samples, sample_size = data.shape

encoder = lambda hid: nn.Sequential(
                        nn.Linear(sample_size, ss*4),
                        nn.LeakyReLU(inplace=True),
                        nn.Dropout(0.2),
                        nn.Linear(ss*4, ss*2),
                        nn.LeakyReLU(inplace=True),
                        nn.Dropout(0.2),
                        nn.Linear(ss*2, hid)
                        )

decoder =  lambda hid: nn.Sequential(
                        nn.Linear(hid, ss*2),
                        nn.LeakyReLU(inplace=True),
                        nn.Dropout(0.2),
                        nn.Linear(ss*2, ss*4),
                        nn.LeakyReLU(inplace=True),
                        nn.Dropout(0.2),
                        nn.Linear(ss*4, sample_size),
                        nn.Sigmoid(),
                        )

#### 4. Defining criterion, optimizer, scheduler and data loaders

Choose criterion, it will be `nn.MSEloss` for now, optimizer and scheduler for optimizer

In [8]:
device = 'cuda:0'
#device = 'cpu'

net = MyFirstAE(encoder(50), decoder(50))
criterion = nn.MSELoss()
optimizer =  ### YOUR CODE HERE ###  # Adam with lr = 1e-3
scheduler =  ### YOUR CODE HERE ###    # StepLR with step_size=50, gamma=0.7


train_loader = torch_data.DataLoader(train_dset, batch_size=100, shuffle=True)
val_loader = torch_data.DataLoader(test_dset, batch_size=100, shuffle=False)

#### 5. The main part - write `train` for the network

Train will take the batch, send to the devise, encode and decode it and calculate the loss.

In [9]:
def train(epochs, net, criterion, optimizer, train_loader, val_loader,scheduler=None, verbose=True, save_dir=None):
    net.to(device)
    for epoch in range(1, epochs+1):
        net.train()
        for X, _ in train_loader:

            X = X.to(device)

            out = net(X)
            loss = criterion(out, X)

            optimizer.zero_grad()

            loss.backward()
            optimizer.step()

        net.eval()
        for X, _ in val_loader:
            X = X.to(device)
            out = net(X)
            val_loss = criterion(out, X)

        if scheduler is not None:
            scheduler.step()
        freq = max(epochs//20,1)
        if verbose and epoch%freq==0:
            print('Epoch {}/{} || Loss:  Train {:.4f} | Validation {:.4f}'.format(epoch, epochs, loss.item(), val_loss.item()))

    if save_dir is not None:
        torch.save(model.state_dict(), os.path.join(save_dir, 'model.pth'))

#### 5. Enjoy the training

In [None]:
# for `MSE` loss lets get < 0.011 on validation, with AE "bottleneck" = 50

train(300, net, criterion, optimizer, train_loader, val_loader, scheduler)

In [None]:

fig, ax = plt.subplots(ncols=10, nrows=2, figsize=(20, 5))

n_pics = 64

for i in range(10):
    im = train_dset[i][0]
    rec = net(im.reshape(1,n_pics**2).to(device))[0]
    ax[0, i].imshow(im[0].reshape(n_pics,n_pics), cmap = "gray");
    ax[1, i].imshow(rec.detach().cpu().reshape(n_pics,n_pics), cmap = "gray");
    ax[0, i].axis('off')
    ax[1, i].axis('off')

In [12]:
from matplotlib import offsetbox
from matplotlib.offsetbox import AnnotationBbox, OffsetImage

def plot_embedding(X, y, images_small=None, title=None):
    """
    Nice plot on first two components of embedding with Offsets.

    """
    # take only first two columns
    X = X[:, :2]
    # scaling
    x_min, x_max = np.min(X, 0), np.max(X, 0)
    X = (X - x_min) / (x_max - x_min)
    plt.figure(figsize=(13,8))
    ax = plt.subplot(111)

    for i in range(X.shape[0] - 1):
        plt.text(X[i, 0], X[i, 1], str(y[i]),
                 color=plt.cm.RdGy(y[i]),
                 fontdict={'weight': 'bold', 'size': 12})
        if images_small is not None:
            imagebox = OffsetImage(images_small[i], zoom=.4, cmap = 'gray')
            ab = AnnotationBbox(imagebox, (X[i, 0], X[i, 1]),
                xycoords='data')
            ax.add_artist(ab)

    if hasattr(offsetbox, 'AnnotationBbox'):
        # only print thumbnails with matplotlib > 1.0
        shown_images = np.array([[1., 1.]])
        for i in range(X.shape[0]):
            dist = np.sum((X[i] - shown_images) ** 2, 1)
            if np.min(dist) < 4e-1:
                # don't show points that are too close
                continue
    if title is not None:
        plt.title(title)

In [None]:
from sklearn.decomposition import PCA
X_projected = PCA(50).fit_transform(data)
data_pic = data.reshape((-1, 64, 64))
plot_embedding(X_projected, target, data_pic, "PCA. Projection on two components out of 50 first ")

In [None]:
X_projected = net.encoder(torch.Tensor(data).to(device)).cpu().detach().numpy()
data_pic = data.reshape((-1, 64, 64))
plot_embedding(X_projected, target, data_pic, "Projection AE with 50 latent features, MSE = 0.0087")