# Convolutional Autoencoder with K-Means

In [1]:
import torch
import numpy as np
from torchvision import datasets
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F


# to import CIFAR-10 as torch tensor
transform = transforms.ToTensor()

# load the training and test datasets
train_data = datasets.CIFAR10(root='data', train=True,
                                   download=True, transform=transform)
test_data = datasets.CIFAR10(root='data', train=False,
                                  download=True, transform=transform)

Files already downloaded and verified
Files already downloaded and verified


## training and testing dataloaders

In [2]:
num_workers = 0
# how many samples per batch to load
batch_size = 50
lr = 1e-3
# how many epochs for training
num_epochs = 40

# prepare data loaders
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, num_workers=num_workers)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, num_workers=num_workers)

## autoencoder neural network design
* input layer: 1 channel, 28 pixels wide, 28 pixels long
* conv 1 layer: 16 channel, 10 pixels wide, 10 pixels long
* pool 1 layer: 16 channel, 5 pixels wide, 5 pixels long
* conv 2 layer: 32 channel, 3 pixels wide, 3 pixels long
* pool 2 layer: 32 channel, 2 pixels wide, 2 pixels long

* latent layer: 128 input, 10 output
* upscale layer: 10 input, 128 output

* deconv 3 layer: 16 channel, 5 pixels wide, 5 pixels long
* deconv 2 layer: 8 channel, 15 pixels wide, 15 pixels long
* deconv 1 layer: 1 channel, 28 pixels wide, 28 pixels long

In [3]:
class cae(nn.Module):
    def __init__(self):
        super(cae, self).__init__()
        # convolutional encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),            # [batch, 12, 16, 16]
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),           # [batch, 24, 8, 8]
            nn.ReLU(),
            nn.Conv2d(24, 48, 4, stride=2, padding=1),           # [batch, 48, 4, 4]
            nn.ReLU(),
            nn.Conv2d(48, 96, 4, stride=2, padding=1),           # [batch, 96, 2, 2]
            nn.ReLU(),
        )
        #autoencoder bottle neck
        #self.latent = nn.Sequential(
        #    nn.Linear(48*4*4, 200),
        #    nn.ReLU(True),
        #)
        #self.upscale = nn.Sequential(
        #    nn.Linear(200, 48*4*4),
        #    nn.ReLU(True),
        #)
        # convolutional decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(96, 48, 4, stride=2, padding=1),  # [batch, 48, 4, 4]
            nn.ReLU(),
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),  # [batch, 24, 8, 8]
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),  # [batch, 12, 16, 16]
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),   # [batch, 3, 32, 32]
            nn.Sigmoid(),
        )
        
    # forward learning path
    def forward(self, x):
        bottle_neck = self.encoder(x)
        #bottle_neck = self.latent(x.view(-1, 48*4*4))
        
        #x = self.upscale(bottle_neck)
        x = self.decoder(bottle_neck)
        return x, bottle_neck.view(-1, 96*2*2)

## CAE model instantiation

In [4]:
# build a CAE model
model = cae()
# set loss function
criterion = nn.MSELoss()
# choose an optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=lr,
                             weight_decay=1e-7)
print(model)

cae(
  (encoder): Sequential(
    (0): Conv2d(3, 12, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(12, 24, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(24, 48, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): Conv2d(48, 96, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(96, 48, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(48, 24, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(24, 12, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): ConvTranspose2d(12, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): Sigmoid()
  )
)


## training

In [None]:
# Training the model
for epoch in range(num_epochs):
    for data in train_loader:
        img, _ = data
        # forward path
        output, _ = model(img)
        loss = criterion(output, img)
        # back propagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # log
    print('epoch [{}/{}], loss:{:.4f}'
          .format(epoch+1, num_epochs, loss.item()))

epoch [1/40], loss:0.0160
epoch [2/40], loss:0.0107
epoch [3/40], loss:0.0094
epoch [4/40], loss:0.0084
epoch [5/40], loss:0.0077
epoch [6/40], loss:0.0072
epoch [7/40], loss:0.0068
epoch [8/40], loss:0.0062
epoch [9/40], loss:0.0059
epoch [10/40], loss:0.0054
epoch [11/40], loss:0.0052
epoch [12/40], loss:0.0049
epoch [13/40], loss:0.0050
epoch [14/40], loss:0.0047
epoch [15/40], loss:0.0045
epoch [16/40], loss:0.0042
epoch [17/40], loss:0.0041
epoch [18/40], loss:0.0039
epoch [19/40], loss:0.0039
epoch [20/40], loss:0.0038
epoch [21/40], loss:0.0036
epoch [22/40], loss:0.0036
epoch [23/40], loss:0.0034
epoch [24/40], loss:0.0034
epoch [25/40], loss:0.0033
epoch [26/40], loss:0.0032
epoch [27/40], loss:0.0033
epoch [28/40], loss:0.0035
epoch [29/40], loss:0.0030
epoch [30/40], loss:0.0030
epoch [31/40], loss:0.0030


## visualizing reconstructed results
### NOTE TO SELF: to be automated through DV

In [None]:
import matplotlib.pyplot as plt

dataiter = iter(test_loader)
images, _ = dataiter.next()

# get sample outputs
output, _ = model(images)
# prep images for display
images = images.numpy()# output is resized into a batch of iages
output = output.view(batch_size, 3, 32, 32)
# use detach when it's an output that requires_grad
output = output.detach().numpy()

# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(25,4))

# input images on top row, reconstructions on bottom
for images, row in zip([images, output], axes):
    for img, ax in zip(images, row):
        ax.imshow(img.transpose((1, 2, 0)))
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
plt.show()

## reconstruct the entire dataset
### NOTE TO SELF: eval to be used to stop regularization

In [None]:
# run the dataset through the trained model
for image_index, data in enumerate(train_loader):
    images, itr_labels = data
    # forward pass: compute embedded outputs by passing inputs to the model
    decoded, encoded = model(images)
    if not image_index:
        #First group encoded in new array
        embedded = encoded.detach().numpy()
        #first group decoded
        disembedded = decoded.detach().numpy()
        #labels
        labels = itr_labels.detach().numpy()
        continue
    #stacking the remaining data
    embedded = np.vstack((embedded, encoded.detach().numpy()))
    disembedded = np.vstack((disembedded, decoded.detach().numpy()))
    labels = np.hstack((labels, itr_labels.detach().numpy()))
    print(embedded.shape)
    print(disembedded.shape)

## clustering model instantiation and training

In [None]:
from sklearn.cluster import KMeans

# create a k-means model to cluster the embedded features
clustering_model = KMeans(n_clusters=10, tol = 1e-4, max_iter = 400).fit(embedded)

## visualizing results and performance evaluation

In [None]:
import import_ipynb
import DataVisuals as dv


view_results = dv.DataVisuals(disembedded.reshape(-1, 3, 32, 32), labels, clustering_model.labels_)

In [None]:
embedded.shape

In [None]:
view_results.cm()

In [None]:
view_results.scat(embedded)

In [None]:
metric = dv.Metrics(labels, clustering_model.labels_)
nmi = metric.nmi()
ari = metric.ari()
acc = metric.acc()
print('NMI = {:.4f} \nARI = {:.4f} \nACC = {:.4f}'.format(nmi, ari, acc))

## storing a copy of learned weights in hard drive

In [None]:
labels.shape