In [None]:
import os
import numpy as np
import glob
import PIL.Image as Image
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from time import time

from torch.autograd import Variable
import torch.optim as optim
from IPython.display import clear_output

plt.rcParams['figure.figsize'] = [18, 6]

In [None]:
# Project-local helper modules; on Colab they have to be uploaded next to the notebook first.
try:
  import data
  import dataset
  import utils
except ImportError as err:
  # Catch only a failed import: a bare `except` would also hide real errors raised
  # inside the helper modules (e.g. a SyntaxError) behind the same reminder.
  print("Remember to load the python files to colab")
  print("Import failed: %s" % err)

In [None]:
# Download the CIFAR-10 python archive, extract it, then delete the archive.
# NOTE(review): each `!` line runs in its own subshell, so the `!cd` lines below do not
# change the notebook's working directory; the archive is fetched and extracted in the
# current directory rather than in ../data. Use `%cd` if a directory change is really
# intended -- confirm against the relative path used when the data is loaded later.
!cd ..
!mkdir data
!cd data
!wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
!tar -xvzf cifar-10-python.tar.gz
!rm cifar-10-python.tar.gz

In [None]:
!pip install -U albumentations

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

In [None]:
# Select the compute device first, then report which one will be used.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    print("The code will run on GPU.")
else:
    print("The code will run on CPU. Go to Edit->Notebook Settings and choose GPU as the hardware accelerator")

In [None]:
# Load the train and test splits from the extracted "cifar-10-batches-py" directory
# (relative to the current working directory) through the project-local `data` module.
# NOTE(review): the structure of the two returned objects is defined in data.py, which is
# not visible here.
train_data, test_data = data.load_cifar_10_data("cifar-10-batches-py")

In [None]:
batch_size = 128

# Corruption pipeline: Gaussian noise (var_limit 30-70, p=1) followed by coarse dropout of
# 2 to 5 holes of at most 2x2 pixels.
# Alternatives that were tried and left disabled: A.MultiplicativeNoise(), A.Blur(p=1).
# (An earlier torchvision pipeline -- Resize((32, 32)), ToTensor(), Normalize((0.5,), (0.5,)) --
# was replaced by the albumentations pipelines below.)
transform = A.Compose(
    [
        A.GaussNoise(var_limit=(30, 70), mean=0, p=1),
        A.CoarseDropout(max_height=2, max_width=2, min_holes=2, max_holes=5),
    ]
)

# Preprocessing pipeline: normalise every channel with mean 0.5 / std 0.5, then convert
# the image to a torch tensor.
preprocess_transform = A.Compose(
    [
        A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ToTensorV2(),
    ]
)

# Both splits receive the same two pipelines.
# NOTE(review): presumably the dataset yields (corrupted input, clean target) pairs --
# confirm in dataset.py, which is not visible here.
trainset = dataset.Cifar10AutoEncoderDataset(train_data, preprocess_transform, transform)
testset = dataset.Cifar10AutoEncoderDataset(test_data, preprocess_transform, transform)

train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=1)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=True, num_workers=1)

In [None]:
def denormalize(img):
  """Turn a normalised channel-first image into a channel-last integer image for plotting.

  Undoes a per-channel normalisation with mean 0.5 and std 0.5 on 0-255 pixel data,
  i.e. maps values in [-1, 1] back onto [0, 255].

  Args:
    img: image of shape (3, H, W); a torch tensor (on any device, with or without
      gradient tracking) or anything ``np.asarray`` accepts.

  Returns:
    Integer numpy array of shape (H, W, 3) with values in [0, 255].
  """
  if torch.is_tensor(img):
    # detach + cpu so model outputs that still track gradients or live on the GPU work too
    array = img.detach().cpu().numpy()
  else:
    array = np.asarray(img)
  image = np.transpose(array, (1, 2, 0))  # (C, H, W) -> (H, W, C)
  MEAN = 255 * np.array([0.5, 0.5, 0.5])
  STD = 255 * np.array([0.5, 0.5, 0.5])
  image = image * STD + MEAN
  # Round instead of truncating (float error would otherwise turn e.g. 0.99999 into 0),
  # and clip so out-of-range values cannot leave the displayable 0-255 range.
  return np.clip(np.rint(image), 0, 255).astype(int)

In [None]:
# Draw one training batch and preview its first 21 samples: one figure for the first
# element of the batch (`images`) and one for the second (`labels`).
images, labels = next(iter(train_loader))

for batch in (images, labels):
    plt.figure()
    for idx in range(21):
        plt.subplot(5, 7, idx + 1)
        plt.imshow(denormalize(batch[idx]))
        plt.axis('off')

Old version: it has a very large latent space, which makes it more like a U-net than an autoencoder.
Here, the bottleneck encodes $4*4*128=2048$ features, which is close to the $32*32*3=3072$ pixel values of the input image.

In [None]:
### Old version, has a very big latent space - more like a unet than autoencoder

class ConvAutoEncoder(nn.Module):
    """Convolutional autoencoder with a wide, 128-channel bottleneck.

    Three stride-2 convolutions halve the spatial size (32 -> 16 -> 8 -> 4 for a
    32 x 32 input) and three transposed convolutions double it back. The final
    1x1 convolution maps to 3 channels and is followed by tanh.
    """

    def __init__(self):
        super().__init__()

        # encoder (downsampling): 3x3 feature convs, each followed by a stride-2 "pool" conv
        self.enc_conv0 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool0 = nn.Conv2d(32, 32, kernel_size=2, stride=2, padding=0)    # 32 -> 16
        self.enc_conv1 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool1 = nn.Conv2d(64, 64, kernel_size=2, stride=2, padding=0)    # 16 -> 8
        self.enc_conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.Conv2d(128, 128, kernel_size=2, stride=2, padding=0)  # 8 -> 4

        # bottleneck: keeps all 128 channels at 4 x 4
        self.bottleneck_conv = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        # decoder (upsampling): transposed conv doubles the size, 3x3 conv reduces channels
        self.upsample0 = nn.ConvTranspose2d(128, 128, kernel_size=2, stride=2)  # 4 -> 8
        self.dec_conv0 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)    # 8 -> 16
        self.dec_conv1 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(32, 32, kernel_size=2, stride=2)    # 16 -> 32
        self.dec_conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.dec_conv_fin = nn.Conv2d(32, 3, kernel_size=1, padding=0)

    def forward(self, x):
        """Run the encoder, bottleneck and decoder; the output passes through tanh."""
        act = F.leaky_relu

        # encoder
        feat_full = act(self.enc_conv0(x))
        feat_half = act(self.enc_conv1(self.pool0(feat_full)))
        feat_quarter = act(self.enc_conv2(self.pool1(feat_half)))

        # bottleneck
        latent = act(self.bottleneck_conv(self.pool2(feat_quarter)))

        # decoder
        up_quarter = act(self.dec_conv0(self.upsample0(latent)))
        up_half = act(self.dec_conv1(self.upsample1(up_quarter)))
        up_full = act(self.dec_conv2(self.upsample2(up_half)))
        return F.tanh(self.dec_conv_fin(up_full))

This version is actually more like an autoencoder: it has a bottleneck that encodes $4*4*16 = 256$ features, which is a compression from the $32*32*3 = 3072$ pixel values of the input image (not a giant one, but okay given the variety of the images).

In [None]:
class ConvAutoEncoder2(nn.Module):
    """Convolutional autoencoder whose bottleneck squeezes 128 channels down to 16.

    Same encoder/decoder layout as the wide variant (three stride-2 convolutions
    down, three transposed convolutions up), but the 4 x 4 bottleneck goes
    128 -> 16 -> 128 channels. The output passes through tanh.
    """

    def __init__(self):
        super().__init__()

        # encoder (downsampling)
        self.enc_conv0 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool0 = nn.Conv2d(32, 32, 2, stride=2)      # 32 -> 16
        self.enc_conv1 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool1 = nn.Conv2d(64, 64, 2, stride=2)      # 16 -> 8
        self.enc_conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool2 = nn.Conv2d(128, 128, 2, stride=2)    # 8 -> 4

        # bottleneck: compress to 16 channels, then expand back to 128
        self.bottleneck_conv1 = nn.Conv2d(128, 16, 3, padding=1)
        self.bottleneck_conv2 = nn.Conv2d(16, 128, 3, padding=1)

        # decoder (upsampling)
        self.upsample0 = nn.ConvTranspose2d(128, 128, 2, stride=2)  # 4 -> 8
        self.dec_conv0 = nn.Conv2d(128, 64, 3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, 2, stride=2)    # 8 -> 16
        self.dec_conv1 = nn.Conv2d(64, 32, 3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(32, 32, 2, stride=2)    # 16 -> 32
        self.dec_conv2 = nn.Conv2d(32, 32, 3, padding=1)
        self.dec_conv_fin = nn.Conv2d(32, 3, 1)

    def forward(self, x):
        """Pass ``x`` through the layers one step at a time and return the tanh output."""
        # encoder
        h = F.leaky_relu(self.enc_conv0(x))
        h = self.pool0(h)
        h = F.leaky_relu(self.enc_conv1(h))
        h = self.pool1(h)
        h = F.leaky_relu(self.enc_conv2(h))
        h = self.pool2(h)

        # bottleneck
        h = F.leaky_relu(self.bottleneck_conv1(h))
        h = F.leaky_relu(self.bottleneck_conv2(h))

        # decoder
        h = self.upsample0(h)
        h = F.leaky_relu(self.dec_conv0(h))
        h = self.upsample1(h)
        h = F.leaky_relu(self.dec_conv1(h))
        h = self.upsample2(h)
        h = F.leaky_relu(self.dec_conv2(h))
        return F.tanh(self.dec_conv_fin(h))

[This](https://towardsdatascience.com/aligning-hand-written-digits-with-convolutional-autoencoders-99128b83af8b) article suggests that it is better to use upsampling/maxpooling instead of deconvolutions and convolutions with stride. Additionally, it shows that it is best to use fully connected layers for the bottleneck. I will test that here.

**Results**

Upsampling/max pooling results in much worse training. The fully connected bottleneck seems to give similar results to the convolutional one. In the end, I will test the skip connections on both versions.

In [None]:
class ConvAutoEncoder3(nn.Module):
    """Convolutional autoencoder with a fully connected bottleneck.

    The encoder output (128 channels at 4 x 4 for a 32 x 32 input) is flattened
    to 2048 values, compressed to 256 by a linear layer, expanded back to 2048
    and reshaped to 128 x 4 x 4 for the decoder. The output passes through tanh.
    (An earlier convolutional 128 -> 16 -> 128 bottleneck was replaced by the
    linear layers.)
    """

    def __init__(self):
        super().__init__()

        # encoder (downsampling)
        self.enc_conv0 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool0 = nn.Conv2d(32, 32, kernel_size=2, stride=2)    # 32 -> 16
        self.enc_conv1 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool1 = nn.Conv2d(64, 64, kernel_size=2, stride=2)    # 16 -> 8
        self.enc_conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.Conv2d(128, 128, kernel_size=2, stride=2)  # 8 -> 4

        # bottleneck: flatten 4*4*128 = 2048 values, squeeze to 256, expand back
        self.flatten = nn.Flatten()
        self.bottleneck1 = nn.Linear(2048, 256)
        self.bottleneck2 = nn.Linear(256, 2048)

        # decoder (upsampling)
        self.upsample0 = nn.ConvTranspose2d(128, 128, kernel_size=2, stride=2)  # 4 -> 8
        self.dec_conv0 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)    # 8 -> 16
        self.dec_conv1 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(32, 32, kernel_size=2, stride=2)    # 16 -> 32
        self.dec_conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.dec_conv_fin = nn.Conv2d(32, 3, kernel_size=1)

    def forward(self, x):
        """Encode, squeeze through the linear bottleneck, decode, and apply tanh."""
        act = F.leaky_relu

        # encoder
        enc0 = act(self.enc_conv0(x))
        enc1 = act(self.enc_conv1(self.pool0(enc0)))
        enc2 = act(self.enc_conv2(self.pool1(enc1)))

        # bottleneck
        code = act(self.bottleneck1(self.flatten(self.pool2(enc2))))
        expanded = act(self.bottleneck2(code))
        grid = expanded.reshape(-1, 128, 4, 4)  # back to a channel-first feature map

        # decoder
        dec0 = act(self.dec_conv0(self.upsample0(grid)))
        dec1 = act(self.dec_conv1(self.upsample1(dec0)))
        dec2 = act(self.dec_conv2(self.upsample2(dec1)))
        return F.tanh(self.dec_conv_fin(dec2))

Autoencoder with skip connections

In [None]:
class ConvAutoEncoderSkip(nn.Module):
    """Convolutional autoencoder with a 16-channel bottleneck and skip connections.

    Each decoder stage concatenates the upsampled features with the encoder
    features of the same spatial size along the channel axis, which is why the
    decoder convolutions take twice as many input channels as the upsampling
    layers produce. The output passes through tanh.
    """

    def __init__(self):
        super().__init__()

        # encoder (downsampling)
        self.enc_conv0 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool0 = nn.Conv2d(32, 32, 2, stride=2)      # 32 -> 16
        self.enc_conv1 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool1 = nn.Conv2d(64, 64, 2, stride=2)      # 16 -> 8
        self.enc_conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool2 = nn.Conv2d(128, 128, 2, stride=2)    # 8 -> 4

        # bottleneck: 128 -> 16 -> 128 channels at 4 x 4
        self.bottleneck_conv1 = nn.Conv2d(128, 16, 3, padding=1)
        self.bottleneck_conv2 = nn.Conv2d(16, 128, 3, padding=1)

        # decoder (upsampling); conv inputs = upsampled channels + skip channels
        self.upsample0 = nn.ConvTranspose2d(128, 128, 2, stride=2)  # 4 -> 8
        self.dec_conv0 = nn.Conv2d(128 + 128, 64, 3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, 2, stride=2)    # 8 -> 16
        self.dec_conv1 = nn.Conv2d(64 + 64, 32, 3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(32, 32, 2, stride=2)    # 16 -> 32
        self.dec_conv2 = nn.Conv2d(32 + 32, 32, 3, padding=1)
        self.dec_conv_fin = nn.Conv2d(32, 3, 1)

    def forward(self, x):
        """Encode, pass through the bottleneck, decode with skip connections, apply tanh."""
        # encoder -- keep every stage, the decoder reuses them as skips
        skip32 = F.leaky_relu(self.enc_conv0(x))
        skip16 = F.leaky_relu(self.enc_conv1(self.pool0(skip32)))
        skip8 = F.leaky_relu(self.enc_conv2(self.pool1(skip16)))

        # bottleneck
        squeezed = F.leaky_relu(self.bottleneck_conv1(self.pool2(skip8)))
        latent = F.leaky_relu(self.bottleneck_conv2(squeezed))

        # decoder: upsample, concatenate the matching encoder stage, convolve
        merged8 = torch.cat((self.upsample0(latent), skip8), dim=1)
        dec8 = F.leaky_relu(self.dec_conv0(merged8))
        merged16 = torch.cat((self.upsample1(dec8), skip16), dim=1)
        dec16 = F.leaky_relu(self.dec_conv1(merged16))
        merged32 = torch.cat((self.upsample2(dec16), skip32), dim=1)
        dec32 = F.leaky_relu(self.dec_conv2(merged32))
        return F.tanh(self.dec_conv_fin(dec32))

Version with fully connected bottleneck

In [None]:
class ConvAutoEncoderSkip2(nn.Module):
    """Skip-connection autoencoder with a fully connected bottleneck.

    The encoder output is flattened to 2048 values, squeezed to 256 by a linear
    layer, expanded back and reshaped to 128 x 4 x 4. Each decoder stage then
    concatenates the upsampled features with the encoder features of the same
    spatial size. The output passes through tanh.
    """

    def __init__(self):
        super().__init__()

        # encoder (downsampling)
        self.enc_conv0 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool0 = nn.Conv2d(32, 32, kernel_size=2, stride=2)    # 32 -> 16
        self.enc_conv1 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool1 = nn.Conv2d(64, 64, kernel_size=2, stride=2)    # 16 -> 8
        self.enc_conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.Conv2d(128, 128, kernel_size=2, stride=2)  # 8 -> 4

        # bottleneck: 4*4*128 = 2048 -> 256 -> 2048
        self.flatten = nn.Flatten()
        self.bottleneck1 = nn.Linear(2048, 256)
        self.bottleneck2 = nn.Linear(256, 2048)

        # decoder (upsampling); conv inputs include the concatenated skip channels
        self.upsample0 = nn.ConvTranspose2d(128, 128, kernel_size=2, stride=2)  # 4 -> 8
        self.dec_conv0 = nn.Conv2d(256, 64, kernel_size=3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)    # 8 -> 16
        self.dec_conv1 = nn.Conv2d(128, 32, kernel_size=3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(32, 32, kernel_size=2, stride=2)    # 16 -> 32
        self.dec_conv2 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.dec_conv_fin = nn.Conv2d(32, 3, kernel_size=1)

    def forward(self, x):
        """Encode, pass through the linear bottleneck, decode with skips, apply tanh."""
        act = F.leaky_relu

        # encoder -- every stage is reused by the decoder
        enc_a = act(self.enc_conv0(x))
        enc_b = act(self.enc_conv1(self.pool0(enc_a)))
        enc_c = act(self.enc_conv2(self.pool1(enc_b)))

        # bottleneck
        vector = self.flatten(self.pool2(enc_c))
        vector = act(self.bottleneck1(vector))
        vector = act(self.bottleneck2(vector))
        feature_map = vector.reshape(-1, 128, 4, 4)

        # decoder with channel-wise skip concatenation
        dec_c = act(self.dec_conv0(torch.cat((self.upsample0(feature_map), enc_c), dim=1)))
        dec_b = act(self.dec_conv1(torch.cat((self.upsample1(dec_c), enc_b), dim=1)))
        dec_a = act(self.dec_conv2(torch.cat((self.upsample2(dec_b), enc_a), dim=1)))
        return F.tanh(self.dec_conv_fin(dec_a))

In [None]:
def _show_reconstructions(inputs, outputs, targets, title, count=6):
    """Plot `count` input / output / target triplets as a 3-row grid with `title` on top."""
    rows = (('Input', inputs), ('Output', outputs), ('Target', targets))
    for col in range(count):
        for row, (label, batch) in enumerate(rows):
            plt.subplot(3, count, row * count + col + 1)
            plt.imshow(denormalize(batch[col]))
            plt.title(label)
            plt.axis('off')
    plt.suptitle(title)
    plt.show()


def train(model, opt, epochs):
    """Train `model` with MSE loss and preview reconstructions after every epoch.

    Relies on the notebook-level `train_loader`, `test_loader`, `device` and
    `denormalize`. One batch is drawn from `test_loader` up front and reused as
    the preview batch for every epoch.

    Args:
        model: network mapping an input batch to a prediction shaped like the target.
        opt: optimizer that updates the parameters being trained.
        epochs: number of passes over `train_loader`.

    Returns:
        List with the mean per-sample training loss of each epoch.
    """
    criterion = torch.nn.MSELoss()

    X_val, Y_val = next(iter(test_loader))
    n_preview = min(6, len(X_val))  # the preview grid shows at most six samples

    history = []
    for epoch in range(epochs):
        tic = time()
        print('* Epoch %d/%d' % (epoch+1, epochs))

        loss_sum = 0.0
        n_samples = 0
        model.train()  # train mode
        for X_batch, Y_batch in train_loader:
            X_batch = X_batch.to(device)
            Y_batch = Y_batch.to(device)

            # set parameter gradients to zero
            opt.zero_grad()

            Y_pred = model(X_batch)
            loss = criterion(Y_pred, Y_batch)  # forward-pass
            loss.backward()  # backward-pass
            opt.step()  # update weights

            # .item() detaches the value so the autograd graph is not kept alive;
            # weighting by batch size makes the epoch figure a true per-sample mean
            # even when the last batch is smaller.
            loss_sum += loss.item() * len(X_batch)
            n_samples += len(X_batch)

        avg_loss = loss_sum / max(n_samples, 1)
        history.append(avg_loss)
        toc = time()
        print(' - loss: %f' % avg_loss)
        print(' - time: %.1fs' % (toc - tic))

        # show intermediate results
        model.eval()  # testing mode
        with torch.no_grad():  # no gradients are needed for the preview
            predicted = model(X_val.to(device)).cpu()
        clear_output(wait=True)
        _show_reconstructions(
            X_val, predicted, Y_val,
            '%d / %d - loss: %f' % (epoch+1, epochs, avg_loss),
            count=n_preview,
        )

    return history

In [None]:
# Baseline run: ConvAutoEncoder, 30 epochs, Adam with its default settings.
model = ConvAutoEncoder().to(device)
train(model, opt=optim.Adam(model.parameters()), epochs=30)

In [None]:
# ConvAutoEncoder2 run: 30 epochs, Adam with its default settings.
model2 = ConvAutoEncoder2().to(device)
train(model2, opt=optim.Adam(model2.parameters()), epochs=30)

In [None]:
# ConvAutoEncoder3 run: 30 epochs, Adam with its default settings.
model3 = ConvAutoEncoder3().to(device)
train(model3, opt=optim.Adam(model3.parameters()), epochs=30)

In [None]:
# After removing fully connected layer
# NOTE(review): ConvAutoEncoder3 as defined in this notebook still contains the linear
# bottleneck layers, so on a fresh Restart & Run All this cell repeats the previous run
# and overwrites `model3` -- presumably it was recorded against an edited class; confirm.
model3 = ConvAutoEncoder3().to(device)
train(model3, opt=optim.Adam(model3.parameters()), epochs=30)

In [None]:
# After removing alternative downsampling/upsampling
# NOTE(review): ConvAutoEncoder3 is instantiated exactly as in the earlier `model3` cells,
# so on a fresh Restart & Run All this is another identical run that overwrites `model3`
# -- presumably it was recorded against an edited class; confirm.
model3 = ConvAutoEncoder3().to(device)
train(model3, opt=optim.Adam(model3.parameters()), epochs=30)

In [None]:
# ConvAutoEncoderSkip run: 30 epochs, Adam with its default settings.
modelSkip = ConvAutoEncoderSkip().to(device)
train(modelSkip, opt=optim.Adam(modelSkip.parameters()), epochs=30)

In [None]:
# ConvAutoEncoderSkip2 run: 30 epochs, Adam with its default settings.
# The model passed to train() must be the one whose parameters the optimizer holds;
# previously `modelSkip` was passed with an optimizer over `modelSkip2`'s parameters,
# so `modelSkip2` was never trained.
modelSkip2 = ConvAutoEncoderSkip2().to(device)
train(modelSkip2, optim.Adam(modelSkip2.parameters()), 30)