In [13]:
!wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
!rm -r cifar-10-batches-py
!tar xvzf cifar-10-python.tar.gz
!rm cifar-10-python.tar.gz

--2019-10-10 18:57:33--  https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
Translacja www.cs.toronto.edu (www.cs.toronto.edu)... 128.100.3.30
Łączenie się z www.cs.toronto.edu (www.cs.toronto.edu)|128.100.3.30|:443... połączono.
Żądanie HTTP wysłano, oczekiwanie na odpowiedź... 200 OK
Długość: 170498071 (163M) [application/x-gzip]
Zapis do: `cifar-10-python.tar.gz'


2019-10-10 18:59:37 (1,32 MB/s) - zapisano `cifar-10-python.tar.gz' [170498071/170498071]

cifar-10-batches-py/
cifar-10-batches-py/data_batch_4
cifar-10-batches-py/readme.html
cifar-10-batches-py/test_batch
cifar-10-batches-py/data_batch_3
cifar-10-batches-py/batches.meta
cifar-10-batches-py/data_batch_2
cifar-10-batches-py/data_batch_5
cifar-10-batches-py/data_batch_1


In [1]:
import numpy as np
import pickle
from numpy.linalg import norm


# Load the CIFAR-10 batches into float64 arrays and into lists of
# (pixel_row, label) tuples used by the k-NN code further down.
#
# SECURITY NOTE: pickle.load can execute arbitrary code while unpickling.
# Only load batch files that come from a trusted source.
#
# The arrays are kept as float64 (exactly what appending onto np.empty(...)
# produced before). NOTE(review): the batches presumably store pixels as
# unsigned 8-bit integers; if so, float storage also keeps `a - b` in the
# distance metrics from wrapping around - confirm against the dataset docs.

train_batches = []
train_batch_labels = []

# Only the first training batch is read; widen the range to use more batches.
for i in range(1, 2):
    with open('cifar-10-batches-py/data_batch_{}'.format(i), "rb") as f:
        batch = pickle.load(f, encoding="bytes")
    train_batches.append(batch[b'data'])
    train_batch_labels.append(np.asarray(batch[b'labels']))

# Concatenate once instead of re-copying the growing array for every batch.
# The empty float64 seed keeps the result dtype/shape well-defined even when
# no batch was read.
train_data = np.concatenate([np.empty((0, 3072))] + train_batches, axis=0)
train_labels = np.concatenate([np.empty((0))] + train_batch_labels)

# Build the sample list once, AFTER all batches are loaded. Doing this inside
# the batch loop over the accumulated arrays would append the earlier batches
# again for every additional batch.
CIFAR_TRAIN_DATA = list(zip(train_data, train_labels))

with open("cifar-10-batches-py/test_batch", "rb") as f:
    test_batch = pickle.load(f, encoding="bytes")

test_data = np.concatenate([np.empty((0, 3072)), test_batch[b'data']], axis=0)
test_labels = np.concatenate([np.empty((0)), np.asarray(test_batch[b'labels'])])
CIFAR_TEST_DATA = list(zip(test_data, test_labels))

In [2]:
def metric_l2(a, b):
    """Return the norm of ``a - b`` using NumPy's default norm order.

    For 1-D inputs this is the Euclidean (L2) distance between the vectors.
    """
    difference = a - b
    return np.linalg.norm(difference, ord=None)

def metric_l1(a, b):
    """Return the order-1 norm of ``a - b``.

    For 1-D inputs this is the Manhattan (L1) distance, i.e. the sum of the
    absolute coordinate differences.
    """
    difference = a - b
    return np.linalg.norm(difference, ord=1)

In [3]:
from heapq import nsmallest
from itertools import groupby, tee


def label(t):
    """Return the element at index 1 of ``t`` (the label of a ``(x, label)`` pair)."""
    label_index = 1
    return t[label_index]

"""Project grouping to suitable form, allowing easy extraction of the label: 
    (size of group, -min distance, label)"""
def project_group(t):
    """Summarise one label group as a tuple that can be ranked with ``max``.

    Args:
        t: A ``(label, members)`` pair as produced by ``itertools.groupby``,
            where ``members`` is an iterable of ``(distance, label)`` items.

    Returns:
        ``(member_count, -min_distance, label)``. Comparing these tuples
        prefers the largest group first and, between equally large groups,
        the one whose closest member is nearest (hence the negated distance).

    Raises:
        ValueError: If the group has no members.
    """
    group_label, members = t[0], t[1]
    # Materialise the distances once; the group iterator can only be read once
    # and both the count and the minimum are needed.
    distances = [member[0] for member in members]
    return (len(distances), -min(distances), group_label)

"""Gets label for nearest neighbour in train_data"""
def nearest_neighbour_label(train_data, metric, target, k=1):
    """Predict the label of ``target`` by voting among its ``k`` nearest neighbours.

    Args:
        train_data: Iterable of ``(sample, label)`` pairs.
        metric: Callable ``metric(sample, target)`` returning a distance.
        target: The sample to classify.
        k: Number of nearest neighbours that take part in the vote.

    Returns:
        The label with the most votes among the ``k`` nearest samples. Ties
        are broken in favour of the label whose closest member is nearest,
        then by the larger label (see ``project_group``).

    Raises:
        ValueError: If ``k`` is smaller than 1 or ``train_data`` is empty.
    """
    if k < 1:
        raise ValueError(f"k must be at least 1, got {k!r}")

    # (distance, label) pairs; nsmallest keeps only the k closest ones.
    scored = ((metric(sample[0], target), sample[1]) for sample in train_data)
    first_k = nsmallest(k, scored)
    if not first_k:
        raise ValueError("train_data must contain at least one sample")

    # groupby only merges adjacent items, so sort by label first.
    first_k.sort(key=label)
    groups = groupby(first_k, key=label)
    return max(map(project_group, groups))[-1]

In [4]:
# Only every SKIP_STEP-th test sample is classified, to keep the run time down.
SKIP_STEP = 20


def cifar_test_metric(metric, train_data, test_data, k=1):
    """Estimate k-NN accuracy (in percent) on a subsample of ``test_data``.

    Every ``SKIP_STEP``-th item of ``test_data`` (the 20th, 40th, ...) is
    classified with ``nearest_neighbour_label`` and compared with its label.

    Args:
        metric: Distance callable passed on to ``nearest_neighbour_label``.
        train_data: Iterable of ``(sample, label)`` training pairs.
        test_data: Iterable of ``(sample, label)`` pairs to evaluate on.
        k: Number of neighbours used for the vote.

    Returns:
        Percentage of correctly classified samples among those actually
        evaluated, or ``0.0`` when ``test_data`` holds fewer than
        ``SKIP_STEP`` items (nothing was evaluated).
    """
    correct = 0
    evaluated = 0

    for position, test_data_obj in enumerate(test_data, start=1):
        if position % SKIP_STEP != 0:
            continue
        evaluated += 1
        predicted = nearest_neighbour_label(train_data, metric, test_data_obj[0], k=k)
        correct += (predicted == test_data_obj[1])

    # Divide by the number of classified samples rather than rescaling the
    # total by SKIP_STEP: the rescaling is only exact when the total is a
    # multiple of SKIP_STEP and can otherwise report more than 100 %.
    if evaluated == 0:
        return 0.0
    return correct / evaluated * 100

In [5]:
import random


def merge_chunks(chunks, i):
    """Concatenate every chunk except the one at position ``i``.

    Args:
        chunks: Sequence of index arrays (e.g. the output of ``np.array_split``).
        i: Position of the chunk to leave out.

    Returns:
        A flat 1-D array with the remaining chunks in their original order.
        When no chunk remains, an empty ``int32`` array is returned.
    """
    kept = [np.ravel(chunk) for j, chunk in enumerate(chunks) if j != i]
    # The empty int32 seed keeps the "nothing left" case well-defined; a single
    # concatenate avoids re-copying the growing result for every chunk.
    return np.concatenate([np.array([], dtype=np.int32)] + kept)


def cross_validation(data, split=4, k=1, seed=None):
    """Run ``split``-fold cross-validation of k-NN with the L1 and L2 metrics.

    The samples are shuffled, divided into ``split`` folds, and each fold is
    used once as the evaluation set while the remaining folds form the
    training set.

    Args:
        data: Sequence of ``(sample, label)`` pairs.
        split: Number of folds.
        k: Number of neighbours used for the vote.
        seed: Optional seed for the shuffle. With ``None`` (the default) the
            module-level ``random`` generator is used, as before; pass a value
            to make the fold assignment reproducible.

    Returns:
        ``{"l2": [...], "l1": [...]}`` with one accuracy (in percent, as
        returned by ``cifar_test_metric``) per fold and metric.
    """
    # Keep the samples in a plain list: they are (ndarray, label) tuples, and
    # recent NumPy releases refuse to build an array from such ragged input.
    samples = list(data)
    indices = list(range(len(samples)))
    rng = random if seed is None else random.Random(seed)
    rng.shuffle(indices)
    chunks = np.array_split(np.array(indices), split)
    results = {"l2": [], "l1": []}

    for i, held_out in enumerate(chunks):
        test_data = [samples[j] for j in held_out]
        train_data = [samples[j] for j in merge_chunks(chunks, i)]

        results["l2"].append(cifar_test_metric(metric_l2, train_data, test_data, k=k))
        results["l1"].append(cifar_test_metric(metric_l1, train_data, test_data, k=k))

    return results


In [6]:
# Candidate neighbour counts, and per-metric results keyed by k:
# ACCURACIES[metric][k] == {"avg": mean fold accuracy, "res": per-fold accuracies}
CROSS_VALIDATE_K = [1, 3, 5, 7]
ACCURACIES = {"l1": {}, "l2": {}}


for k in CROSS_VALIDATE_K:
    result = cross_validation(CIFAR_TRAIN_DATA, k=k)
    print(f"Done cross-validation for {k} nearest neighbours")

    for m in ("l1", "l2"):
        fold_scores = result[m]
        ACCURACIES[m][k] = dict(avg=np.average(fold_scores), res=fold_scores)

Done cross-validation for 1 nearest neighbours
Done cross-validation for 3 nearest neighbours
Done cross-validation for 5 nearest neighbours
Done cross-validation for 7 nearest neighbours


In [7]:
# Maps each metric name (the keys of ACCURACIES) to the k returned by print_results.
BEST_K = {}

def print_results(results):
    """Print the per-k cross-validation table and return the best k.

    Args:
        results: Mapping ``k -> {"avg": mean accuracy, "res": fold accuracies}``.

    Returns:
        The k with the highest average accuracy; on equal averages the
        larger k wins because ``(avg, k)`` tuples are compared.
    """
    ranking = []
    for k, entry in results.items():
        avg = entry['avg']
        res = entry['res']
        ranking.append((avg, k))
        per_fold = "\t".join(str(round(f, 2)) for f in res)
        print(f"k = {k}, avg = {avg}\t: " + per_fold)

    best_k = max(ranking)[1]
    print()

    print(f"Best k: {best_k}")

    return best_k
    
        
# Print each metric's cross-validation table and remember its best k.
for metric in ACCURACIES:
    print(f"Results for metric {metric}:\n")
    best_k_for_metric = print_results(ACCURACIES[metric])
    BEST_K[metric] = best_k_for_metric
    print("\n")

Results for metric l1:

k = 1, avg = 28.400000000000002	: 29.6	24.0	32.0	28.0
k = 3, avg = 30.400000000000002	: 35.2	31.2	25.6	29.6
k = 5, avg = 34.800000000000004	: 40.0	36.0	32.8	30.4
k = 7, avg = 33.2	: 37.6	24.0	26.4	44.8

Best k: 5


Results for metric l2:

k = 1, avg = 30.2	: 33.6	24.0	33.6	29.6
k = 3, avg = 26.799999999999997	: 24.0	25.6	24.0	33.6
k = 5, avg = 31.599999999999998	: 33.6	31.2	30.4	31.2
k = 7, avg = 29.6	: 28.8	25.6	22.4	41.6

Best k: 5




In [8]:
# Evaluate on the test set with the k selected by cross-validation, per metric.
l2_best_k = BEST_K["l2"]
l2_accuracy = cifar_test_metric(metric_l2, CIFAR_TRAIN_DATA, CIFAR_TEST_DATA, k=l2_best_k)
print("\nL2: Selected k for k-NearestNeighbors =", l2_best_k, " accuracy =", round(l2_accuracy, 3))


l1_best_k = BEST_K["l1"]
l1_accuracy = cifar_test_metric(metric_l1, CIFAR_TRAIN_DATA, CIFAR_TEST_DATA, k=l1_best_k)
print("\nL1: Selected k for k-NearestNeighbors =", l1_best_k, " accuracy =", round(l1_accuracy, 3))



L2: Selected k for k-NearestNeighbors = 5  accuracy = 29.6

L1: Selected k for k-NearestNeighbors = 5  accuracy = 32.2
