# Lab 2, Nearest Neighbour (NN)
by Domrachev Ivan, B20-RO-01

In [78]:
import torch
import torch.nn as nn
import numpy as np
from matplotlib import pyplot as plt

import torch
import torchvision
import torchvision.transforms as transforms
from tqdm.notebook import tqdm 

from collections.abc import Iterable, Callable

## Part 1. Finishing lab task

Here, the task is to continue considering a toy example from the lab and test the second test data and consider another metric.

In [79]:
# Generating train data
train_data = np.array([
    np.eye(4) + np.eye(4)[::-1],
    np.ones((4,4))
])
train_data[1, 1:3, 1:3] = 0
print(train_data[0])
print(train_data[1])

train_labels = np.array([0, 1])

[[1. 0. 0. 1.]
 [0. 1. 1. 0.]
 [0. 1. 1. 0.]
 [1. 0. 0. 1.]]
[[1. 1. 1. 1.]
 [1. 0. 0. 1.]
 [1. 0. 0. 1.]
 [1. 1. 1. 1.]]


In [80]:
# Generating test data
test_data = np.copy(train_data)
test_data[0, ((1,2)), ((2,1)),] = 0
test_data[1, ((1, 3)), ((3, 2)),] = 0
test_data

array([[[1., 0., 0., 1.],
        [0., 1., 0., 0.],
        [0., 0., 1., 0.],
        [1., 0., 0., 1.]],

       [[1., 1., 1., 1.],
        [1., 0., 0., 0.],
        [1., 0., 0., 1.],
        [1., 1., 0., 1.]]])

In [81]:
def KNN(
        train_ds: Iterable[np.array, np.array], 
        test_data: np.array, 
        dist_func: Callable[[np.array, np.array], np.array],
        K: int = 1
) -> int:
    """Predicts class of test_data given train_data
    
        Keyword arguments:
        train_ds -- (train_data, train_labels), the actual dataset
        test_data -- the data to be tested
        dist_func -- function to measure the distance
        
        Returns:
        Class prediction, according to the train_labels"""
    train_data, train_labels = train_ds
    distance = dist_func(train_data, test_data)
    
    if K == 1:
        return train_labels[distance.argmin()]
    else:
        # Get K indices of K least elements
        closest_classes = train_labels[np.argpartition(-distance, K)[:K]]
        # Return the most frequent inxed. If there are several,
        # return the first one (a.k.a. the smallest)
        return train_labels[
            closest_classes[closest_classes.argmax()]
        ]

In [82]:
def abs_dist(train_data: np.array, test_data: np.array) -> np.array:
    """ Measures distance as absolute distance between entries 
        
        Keyword arguments:
        train_data -- the actual dataset
        test_data -- the data to be tested
            
        Returns:
        Absolute distance for each train_data entry"""
    return np.abs(
        train_data - test_data
    ).sum(
        axis = tuple(
            i for i in range(1, train_data.ndim)
        )
    )


def mult_dist(train_data: np.array, test_data: np.array) -> np.array:
    """ Measures distance as matrix multiplication 
        
        Keyword arguments:
        train_data -- the actual dataset
        test_data -- the data to be tested
            
        Returns:
        Multiplication distance for each train_data entry"""
    # Apparently, one single multiplication is longer to compute
    # than many small ones (this becomes important for CIFAR10)
    sum = 0
    for train_entry in train_data:
        sum += (train_entry.T @ test_data.T).sum()
    
    return sum

In [83]:
# Making predictions for the data
pred1 = [
    KNN(
        (train_data, train_labels), 
        test_data_i, 
        abs_dist
    ) for test_data_i in test_data
]
pred2 = [
    KNN(
        (train_data, train_labels), 
        test_data_i, 
        mult_dist
    ) for test_data_i in test_data
]

pred1_labeled = ["cross" if pred1_i else "circle" for pred1_i in pred1]
pred2_labeled = ["cross" if pred2_i else "circle" for pred2_i in pred2]

pred1_labeled, pred2_labeled

(['circle', 'cross'], ['circle', 'circle'])

Overall, one could say that absolute distance metric (`abs_dist`) works better for this task, because it shows better performance for the second test entry. It is correct for the first metric, but incorrect for the second

## Part 2. Testing 2 KNN implementation

### 2.1. Implementing 2-KNN manually

First, one need to load CIFAR10 dataset:

In [84]:
transform = transforms.Compose([
    transforms.ToTensor(),
    # This normalization values are precomputed for CIFAR-10 dataset,
    # refer to: https://github.com/kuangliu/pytorch-cifar/issues/19
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2471, 0.2435, 0.2616)),
])

# Train
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, 
                                          shuffle=True, num_workers=2)

# Test 
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset,
                                         shuffle=False, num_workers=2)

# List of classes
classes = trainset.classes

Files already downloaded and verified
Files already downloaded and verified


Let's convert the trainset and testset to the numpy arrays and test it's performance with our CNN implementation

In [85]:
train_data_np = trainset.data
train_labels_np = np.array(trainset.targets)
test_data_np = testset.data[:100]
test_labels_np = np.array(testset.targets)[:100]

In [86]:
# ~16 seconds
pred1 = []
test_loop_1 = tqdm(enumerate(test_data_np, 0), total=len(test_data_np), desc="Test absolute value")
for i, input in test_loop_1:
    pred1.append(
        KNN(
            (train_data_np, train_labels_np), 
            input, 
            abs_dist
        )
    )

Test absolute value:   0%|          | 0/100 [00:00<?, ?it/s]

In [None]:
# ~7.5 minutes
pred2 = []
test_loop = tqdm(enumerate(test_data_np, 0), total=len(test_data_np), desc="Test multiplication")
for i, input in test_loop:
    pred2.append(
        KNN(
            (train_data_np, train_labels_np), 
            input, 
            mult_dist
        )
    )

Test:   0%|          | 0/100 [00:00<?, ?it/s]

In [None]:
pred1 = np.array(pred1)
pred2 = np.array(pred2)

In [None]:
acc1 = (pred1 == test_labels_np).sum() / len(pred1)
acc2 = (pred2 == test_labels_np).sum() / len(pred2)

print(f"Accuracy of absolute distance: {acc1}")
print(f"Accuracy of product distance: {acc2}")

Accuracy of absolute distance: 0.07
Accuracy of product distance: 0.07


As one could see, the accuracy is quite low without encoding

### 2.2. Testing encoded data 

For a prepared encoder, I've decided to borrow a RESNET model, trained on CIFAR-10 from [here](https://github.com/huyvnphan/PyTorch_CIFAR10/tree/master).

In [None]:
# !git submodule add https://github.com/huyvnphan/PyTorch_CIFAR10.git

In [69]:
from PyTorch_CIFAR10.cifar10_models.resnet import resnet50

model = resnet50()
ckpt = torch.load("data/resnet50.pt")
model.load_state_dict(ckpt)
encoding_model = torch.nn.Sequential(*(list(model.children())[:-1]));

In [77]:
trainset_encoded = []
with torch.no_grad():
    encoding_model.eval()  # evaluation mode
    test_loop = tqdm(enumerate(trainloader, 0), total=len(trainloader), desc="Train encoding")
    for i, inputs in test_loop:
        trainset_encoded.append(encoding_model(inputs[0]))
trainset_encoded[0]

Train encoding:   0%|          | 0/50000 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [75]:
testset_encoded = []
with torch.no_grad():
    encoding_model.eval()  # evaluation mode
    test_loop = tqdm(enumerate(testloader, 0), total=len(testloader), desc="Test encoding")
    for i, inputs in test_loop:
        testset_encoded.append(encoding_model(inputs[0]))
testset_encoded[0]

Test encoding:   0%|          | 0/10000 [00:00<?, ?it/s]

[tensor([[[[ 0.5189,  0.5347,  0.6299,  ...,  0.1856,  0.0110, -0.1477],
          [ 0.4236,  0.4078,  0.5347,  ...,  0.1697, -0.0049, -0.1001],
          [ 0.4078,  0.4078,  0.5189,  ...,  0.2173,  0.0745, -0.0842],
          ...,
          [-0.9095, -1.3221, -1.4967,  ..., -1.3856, -1.7824, -1.3539],
          [-1.0206, -1.2110, -1.4332,  ..., -1.5760, -1.5284, -1.6713],
          [-1.1317, -1.0999, -1.2745,  ..., -1.6078, -1.4491, -1.6554]],

         [[-0.1765, -0.1926, -0.1121,  ..., -0.4503, -0.5147, -0.6114],
          [-0.1765, -0.2087, -0.1443,  ..., -0.4503, -0.5147, -0.5630],
          [-0.2087, -0.2248, -0.1926,  ..., -0.4020, -0.4503, -0.5469],
          ...,
          [ 0.0167, -0.3698, -0.5630,  ..., -0.4181, -0.9496, -0.6114],
          [-0.1121, -0.3376, -0.6114,  ..., -0.6597, -0.6597, -0.9496],
          [-0.2571, -0.2893, -0.5469,  ..., -0.7402, -0.6275, -0.9013]],

         [[-0.9723, -1.0022, -0.9423,  ..., -1.1671, -1.1671, -1.2121],
          [-0.9423, -1.1072, 

TypeError: conv2d() received an invalid combination of arguments - got (list, Parameter, NoneType, tuple, tuple, tuple, int), but expected one of:
 * (Tensor input, Tensor weight, Tensor bias, tuple of ints stride, tuple of ints padding, tuple of ints dilation, int groups)
      didn't match because some of the arguments have invalid types: (!list of [Tensor, Tensor]!, !Parameter!, !NoneType!, !tuple of (int, int)!, !tuple of (int, int)!, !tuple of (int, int)!, int)
 * (Tensor input, Tensor weight, Tensor bias, tuple of ints stride, str padding, tuple of ints dilation, int groups)
      didn't match because some of the arguments have invalid types: (!list of [Tensor, Tensor]!, !Parameter!, !NoneType!, !tuple of (int, int)!, !tuple of (int, int)!, !tuple of (int, int)!, int)


In [None]:
trainset_encoded = [entry.numpy() for entry in trainset_encoded]
testset_encoded = [entry.numpy() for entry in testset_encoded]

In [None]:
def cosine_similarity(train_data: np.array, test_data: np.array) -> np.array:
    """ Measures cosine similarity between inputs
        
        Keyword arguments:
        train_data -- the actual dataset
        test_data -- the data to be tested
            
        Returns:
        Cosine similarity for each train_data entry"""
       
    return train_data.dot(test_data) / (train_data.norm()*test_data.norm())

In [None]:
pred_encoded = []
test_loop = tqdm(enumerate(test_data_np, 0), total=len(test_data_np), desc="Test")
for i, input in test_loop:
    pred2.append(
        KNN(
            (trainset_encoded, testset_encoded), 
            input, 
            cosine_similarity
        )
    )