In [None]:
import random

import numpy as np
import torch

# Single seed shared by every random number generator the notebook relies on.
SEED = 42

# Seeding only Python's `random` leaves PyTorch (weight initialisation,
# DataLoader shuffling, random transforms) and NumPy unseeded, so runs would
# still differ. Seed all three; `random.seed` stays last so the cell has no
# displayed output.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime as dt

import torch
from torch import optim, nn
from torch.utils.data import DataLoader, TensorDataset, Dataset, random_split
from torchvision.utils import make_grid
from torchvision import transforms as T
from torchvision import models, datasets
from random import randint

from tqdm import tqdm
import os

In [None]:
# Prefer the GPU whenever PyTorch can see one, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")
# Bare expression so the notebook displays which device was picked.
device

In [None]:

train_data = datasets.FashionMNIST('./', train=True, download=True)

# Raw uint8 pixels of the whole training split as one (num_images, 28, 28)
# array. Reading the dataset's backing tensor once avoids decoding every
# sample into a PIL image just to concatenate them again.
x = train_data.data.numpy()

# The images are single-channel, so one global mean / std is enough.
# Dividing by 255 rescales the statistics to the [0, 1] range that
# T.ToTensor() produces before T.Normalize is applied.
# .tolist() turns the NumPy scalars into plain Python floats.
mean = (np.mean(x) / 255).tolist()
std = (np.std(x) / 255).tolist()

In [None]:
# Show the dataset statistics that are later passed to T.Normalize.
print(f'mean = {mean}, std = {std}')

In [None]:
def imshow(img):
    """Display a channels-first image tensor with matplotlib.

    Args:
        img: tensor laid out as (channels, height, width). It may live on any
            device and may still be attached to the autograd graph.
    """
    # A plain .numpy() raises for tensors that require grad or sit on the GPU;
    # detach() and cpu() are no-ops for tensors that are already safe.
    npimg = img.detach().cpu().numpy()
    # matplotlib wants channels last: (H, W, C).
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()
    
def show_batch(dataloader):
    """Plot the first batch yielded by `dataloader` as one tiled image."""
    batch_images, _ = next(iter(dataloader))
    # torchvision.utils.make_grid tiles the batch into a single image tensor.
    grid = make_grid(batch_images)
    imshow(grid)
    
def show_image(dataloader):
    """Show one randomly chosen image from the first batch, then print its label and shape."""
    batch_images, batch_labels = next(iter(dataloader))
    # randint is inclusive on both ends, hence the -1.
    pick = randint(0, len(batch_images) - 1)
    chosen = batch_images[pick]
    imshow(chosen)
    print(f'Label: {batch_labels[pick]}, Shape: {chosen.shape}')
    

In [None]:
# Image pre-processing pipelines.
#
# Both pipelines resize to 32x32, convert to a tensor and normalise with the
# dataset `mean` / `std` computed above. Only the training pipeline adds
# random augmentation (horizontal flip, rotation of up to 15 degrees).
IMAGE_SIZE = (32, 32)

train_transform = T.Compose([
    T.Resize(IMAGE_SIZE),
    T.RandomHorizontalFlip(),
    T.RandomRotation(15),
    T.ToTensor(),
    T.Normalize(mean, std),
])

test_transform = T.Compose([
    T.Resize(IMAGE_SIZE),
    T.ToTensor(),
    T.Normalize(mean, std),
])


In [None]:
# Mini-batch size for the training loader; the test loader uses batch_size*2.
batch_size = 128

In [None]:
# Training split: augmented by `train_transform` and reshuffled every epoch.
trainset = datasets.FashionMNIST(
    "./", train=True, download=True, transform=train_transform)
train_loader = DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)

# Held-out split: deterministic transform, fixed order, and a batch twice as
# large as the training batch.
testset = datasets.FashionMNIST(
    "./", train=False, download=True, transform=test_transform)
test_loader = DataLoader(
    testset,
    batch_size=batch_size * 2,
    num_workers=2,
    pin_memory=True,
)


In [None]:
# Visual sanity check: one random (augmented, normalised) training image.
show_image(train_loader)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision

    
class Autoencoder(nn.Module):
    """Small convolutional autoencoder.

    Three stride-2 convolutions halve the spatial size three times; three
    stride-2 transposed convolutions mirror them. The shape comments below
    are for the 32x32 inputs used in this notebook.

    Args:
        in_channels: number of image channels (1 for FashionMNIST).
        out_activation: optional module applied to the reconstruction.
            Defaults to no activation. The notebook trains against
            mean/std-normalised images, whose values fall outside [0, 1], so
            the previous hard-wired Sigmoid could never reach the target.
            Pass ``nn.Sigmoid()`` to reproduce the old behaviour, e.g. when
            loading a checkpoint trained with it. Parameter names in the
            state dict are the same either way.
    """

    def __init__(self, in_channels=1, out_activation=None):
        super().__init__()
        # Input / output size: [batch, in_channels, 32, 32]
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 12, 4, stride=2, padding=1),   # [batch, 12, 16, 16]
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),            # [batch, 24, 8, 8]
            nn.ReLU(),
            nn.Conv2d(24, 48, 4, stride=2, padding=1),            # [batch, 48, 4, 4]
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),   # [batch, 24, 8, 8]
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),   # [batch, 12, 16, 16]
            nn.ReLU(),
            nn.ConvTranspose2d(12, in_channels, 4, stride=2, padding=1),  # [batch, in_channels, 32, 32]
            # Identity keeps the layer count stable when no activation is used.
            out_activation if out_activation is not None else nn.Identity(),
        )

    def forward(self, x):
        """Return ``(reconstruction, embedding)``.

        The embedding is the encoder output flattened to one row per sample
        (48*4*4 = 768 features for 32x32 inputs).
        """
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        # Flatten everything except the batch dimension. A hard-coded
        # view(-1, 768) would silently mix samples for other input sizes.
        return decoded, torch.flatten(encoded, start_dim=1)


def test():
    net = Autoencoder()
    net = net.to(device)
    item = iter(train_loader)
    img, _ = next(item)

    img = img[0]
    img = img[None, :]
    img = img.to(device)
    img_cons, rep  = net(img)
    std_t = torch.tensor(std).to(device)
    mean_t = torch.tensor(mean).to(device)
    img_cons = img_cons * std_t + mean_t
    img = img * std_t + mean_t
    # print(rep)
    print(rep.shape, "  ", img_cons.shape)
    temp_r = imshow(img.cpu()[0])
    imshow(img_cons.detach().cpu()[0])

In [None]:
# Run the smoke test: prints the embedding/reconstruction shapes and shows both images.
test()

In [None]:
# summary(Autoencoder(512), input_size=(1, 3, 32, 32))

In [None]:
# Phase name -> loader, as consumed by the training loop.
# NOTE(review): 'val' and 'test' are the same loader, so the checkpoint is
# selected on the test split. Confirm whether a separate validation split
# was intended.
dataloaders = {'train': train_loader, 'val': test_loader, 'test': test_loader}

# Phase name -> number of samples behind the corresponding loader.
dataset_sizes = {
    phase: len(loader.dataset) for phase, loader in dataloaders.items()
}

In [None]:
import time
import copy
from torch import autograd

def train_model(model, criterion, optimizer, scheduler, num_epochs=25,
                checkpoint_path='./fashion_mnist_ae_tiny.pth',
                detect_anomaly=False):
    """Train an autoencoder and keep the weights with the lowest validation loss.

    Reads the notebook globals ``dataloaders`` (must provide 'train' and
    'val'), ``device`` and ``tqdm``.

    Args:
        model: module whose forward returns ``(reconstruction, embedding)``.
        criterion: loss called as ``criterion(reconstruction, inputs)``.
        optimizer: optimizer over ``model``'s parameters.
        scheduler: learning-rate scheduler, stepped once per epoch after the
            training phase.
        num_epochs: number of epochs to run.
        checkpoint_path: file that receives ``{'model', 'loss', 'epoch'}``
            every time the validation loss improves.
        detect_anomaly: wrap the backward pass in ``autograd.detect_anomaly``.
            Off by default because it is a debugging aid that slows every
            step; enable it only while hunting NaN gradients.

    Returns:
        ``model`` with the best validation weights loaded.

    Raises:
        ValueError: if a phase's loader yields no samples.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is equivalent to eval()

            running_loss = 0.0
            samples_seen = 0

            for inputs, _ in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)

                # Track history only while training.
                with torch.set_grad_enabled(is_train):
                    reconstructed_img, _ = model(inputs)
                    loss = criterion(reconstructed_img, inputs)

                    if is_train:
                        optimizer.zero_grad()
                        if detect_anomaly:
                            with autograd.detect_anomaly():
                                loss.backward()
                        else:
                            loss.backward()
                        optimizer.step()

                # The criterion returns a batch mean; weight it by the batch
                # size so the epoch figure is a true per-sample average.
                batch_n = inputs.size(0)
                running_loss += loss.item() * batch_n
                samples_seen += batch_n

            if is_train:
                scheduler.step()

            if samples_seen == 0:
                raise ValueError(f'dataloaders[{phase!r}] yielded no samples')
            # Dividing by the samples actually seen stays correct even when
            # the loader does not cover the whole dataset.
            epoch_loss = running_loss / samples_seen

            print(f'{phase} Loss: {epoch_loss}')

            # Checkpoint whenever validation improves.
            if phase == 'val' and epoch_loss < best_loss:
                print('Saving..')
                state = {
                    'model': model.state_dict(),
                    'loss': epoch_loss,
                    'epoch': epoch,
                }
                torch.save(state, checkpoint_path)
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val loss: {best_loss:.4f}')

    # Load the best model weights before returning.
    model.load_state_dict(best_model_wts)
    return model

In [None]:
from torch.optim import lr_scheduler
# Fresh autoencoder placed on the selected device.
model_ft = Autoencoder().to(device)

# Reconstruction objective: mean squared error between output and input.
criterion = nn.MSELoss()

# SGD with momentum and weight decay; the learning rate is multiplied by 0.2
# at epochs 60, 120 and 160.
optimizer_ft = optim.SGD(
    model_ft.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=5e-4,
)
exp_lr_scheduler = lr_scheduler.MultiStepLR(
    optimizer_ft,
    milestones=[60, 120, 160],
    gamma=0.2,
)


In [None]:
# Train for 200 epochs; train_model returns the model with its best-validation weights loaded.
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=200)

In [None]:
import cv2
# Qualitative check of the trained model on one training image.
# `device` is the one selected near the top of the notebook. Forcing it to
# 'cuda' here would crash on CPU-only machines and override that choice for
# every later cell.
model_ft.eval()
images, _ = next(iter(train_loader))
img = images[:1].to(device)  # slice keeps the batch dimension

# Inference only, so no autograd graph is needed.
with torch.no_grad():
    img_cons, rep = model_ft(img)

# Undo the normalisation so both images are back in pixel-intensity space.
std_t = torch.tensor(std).to(device)
mean_t = torch.tensor(mean).to(device)
img_cons = img_cons * std_t + mean_t
img = img * std_t + mean_t

print(rep.shape, "  ", img_cons.shape)
imshow(img.cpu()[0])
imshow(img_cons.cpu()[0])
print(rep)

In [None]:
train_image_embeddings = []
train_image_labels = []

def populate_embedding_lable_list(model):
    """Fill the module-level embedding and label lists from the training loader.

    Runs ``model`` in eval mode over ``dataloaders['train']`` and stores one
    embedding tensor and one label per sample in ``train_image_embeddings``
    and ``train_image_labels``. The two lists are index-aligned.

    Both lists are rebuilt from scratch on every call. Appending to whatever
    was there before would duplicate every sample on a second call, and would
    fail outright once ``train_image_labels`` has been converted to a tensor.

    NOTE(review): the training loader shuffles and applies random
    augmentation, so the embeddings describe augmented images in shuffled
    order. Confirm that this is intended.

    Args:
        model: module whose forward returns ``(reconstruction, embedding)``.
    """
    global train_image_embeddings, train_image_labels

    train_image_embeddings = []
    train_image_labels = []

    model.eval()
    # Pure inference: no gradients needed.
    with torch.no_grad():
        for inputs, labels in tqdm(dataloaders['train']):
            _, hidden_rep = model(inputs.to(device))
            # extend() iterates over the batch dimension: one entry per sample.
            train_image_embeddings.extend(hidden_rep)
            train_image_labels.extend(labels)

In [None]:
# Collect one embedding and one label per training sample from the trained model.
populate_embedding_lable_list(model_ft)

In [None]:
# Collapse the per-sample label list into a single 1-D tensor.
train_image_labels = torch.tensor(train_image_labels).flatten()

# Stack the per-sample embedding vectors into one tensor with a row per sample.
# (Either tensor can be persisted with torch.save if it needs to be reused.)
train_image_embeddings_tensor = torch.stack(train_image_embeddings, dim=0)

In [None]:
# Sanity check: number of distinct classes in the dataset and in the collected
# labels, followed by the shapes of the collected tensors.
# torch.unique compares by value. Building a set from list(tensor) hashes each
# 0-d tensor by identity, so it would report the number of samples instead of
# the number of classes.
print(len(torch.unique(torch.as_tensor(train_loader.dataset.targets))))
print(len(torch.unique(train_image_labels)))
print(train_image_labels.shape, " ", train_image_embeddings_tensor.shape)

In [None]:
from collections import Counter

def find_indices(list_to_check, item_to_find):
    """Return, in ascending order, every position whose element equals `item_to_find`."""
    matches = []
    for position, element in enumerate(list_to_check):
        if element == item_to_find:
            matches.append(position)
    return matches
  
# Per-sample labels as a plain list so positions can be searched by value.
image_label_list = list(train_image_labels.numpy())

# For each of the 10 classes, draw 40 sample positions without replacement.
classes_to_index = {}
for key in range(10):
    candidates = find_indices(image_label_list, key)
    classes_to_index[key] = random.sample(candidates, 40)

print(classes_to_index[0])

In [None]:
import itertools
# Merge the per-class samples into one ascending list of sample positions.
final_indices = sorted(itertools.chain.from_iterable(classes_to_index.values()))

In [None]:
# Accumulator indexed [class_i][class_j][k]: k=0 holds the running sum of
# cosine similarities, k=1 the number of pairs that contributed to it.
similarity_matrix = np.zeros((10,10,2))

In [None]:
# Cosine similarity between two 1-D embedding vectors (hence dim=0).
cosine_sim = nn.CosineSimilarity(dim=0, eps=1e-8)
print(train_image_embeddings[0].shape)
# Spot check on two arbitrary samples. The embeddings are 1-D, so no transpose
# is needed; `.T` on a non-2-D tensor is a deprecated no-op.
cosine_sim(train_image_embeddings[2957], train_image_embeddings[20364])

In [None]:
# Visit every unordered pair of selected samples exactly once and accumulate
# its cosine similarity into the class-by-class accumulator.
num_selected = len(final_indices)
for pos_a in tqdm(range(num_selected)):
    idx_a = final_indices[pos_a]
    label_a = train_image_labels[idx_a]
    vec_a = train_image_embeddings[idx_a]

    for idx_b in final_indices[pos_a + 1:]:
        label_b = train_image_labels[idx_b]
        score = cosine_sim(vec_a, train_image_embeddings[idx_b]).detach().item()

        # Every pair is recorded under [a][b] ...
        similarity_matrix[label_a][label_b][0] += score
        similarity_matrix[label_a][label_b][1] += 1

        # ... and cross-class pairs are mirrored so the matrix stays symmetric.
        if label_a != label_b:
            similarity_matrix[label_b][label_a][0] += score
            similarity_matrix[label_b][label_a][1] += 1

In [None]:
# Mean cosine similarity per class pair: accumulated sum divided by pair count.
avg_sim = similarity_matrix[:, :, 0] / similarity_matrix[:, :, 1]

# NOTE(review): the diagonal (same-class similarity) gets a fixed 0.008 bonus.
# The reason is not visible in this notebook; confirm it is intended, since
# it biases every class towards matching itself.
avg_sim[np.diag_indices(10)] += 0.008


In [None]:
import torch.nn.functional as F
# Softmax over dim=1 turns each row of average similarities into a distribution that sums to 1.
x_distribution = F.softmax(torch.tensor(avg_sim), dim = 1)

In [None]:
# Display the distribution row for class 0.
x_distribution[0]


In [None]:
# Count the classes whose highest-probability entry is the class itself,
# i.e. the rows whose argmax lies on the diagonal.
row_winners = torch.argmax(x_distribution, dim=1)
count = int((row_winners == torch.arange(len(x_distribution))).sum())
print(count)

In [None]:
# Persist the class-by-class average similarity matrix.
# NOTE(review): avg_sim is a NumPy array, so torch.save pickles it as-is;
# confirm the consumer can load it (torch.load with weights_only=True may refuse non-tensor objects).
torch.save(avg_sim, './tiny_ae_fashion_mnist.pt')