# K-nearest neighbours

##### Imports

In [1]:
import os
import time

import numpy as np
from scipy.io import wavfile

# sklearn
from sklearn import neighbors
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

# visualization
import matplotlib.pyplot as plt
import seaborn

# torch
import torch
from torchvision import datasets, transforms

# Dataset
from torch.utils.data import DataLoader, Dataset




##### AudioDataset class

In [2]:
sliced_dataset = "../data/short_audio_dataset"
sliced_dataset_lenght = 16050
original_dataset = "../data/audio_dataset"
original_dataset_lenght = 80249


class AudioDataset(Dataset):
    """In-memory dataset of zero-padded wav clips, one class label per clip.

    Every file found below ``root_folder`` is read with ``scipy.io.wavfile``
    and right-padded with zeros so that all clips end up with the same number
    of samples. The label of a clip is the name of the first folder below
    ``root_folder`` that contains it, minus its first two characters
    (presumably an index prefix such as ``"0_"`` -- confirm against the
    dataset layout). Labels are translated to integer ids by ``class_map``.

    Parameters
    ----------
    drop_both : bool
        When true, clips whose label is ``"both"`` are not loaded.
    root_folder : str, optional
        Folder to scan. Defaults to ``original_dataset``.
    max_length : int, optional
        Minimum number of samples to pad to. Defaults to the length recorded
        for ``original_dataset`` / ``sliced_dataset`` when ``root_folder`` is
        one of them, otherwise to 0 (i.e. the longest clip found decides).

    Attributes
    ----------
    data : list of numpy.ndarray
        The padded clips, all with the same length along axis 0.
    labels : list of str
        The label of each clip, aligned with ``data``.
    """

    def __init__(self, drop_both=False, root_folder=None, max_length=None):
        if root_folder is None:
            root_folder = original_dataset
        if max_length is None:
            known_lengths = {
                original_dataset: original_dataset_lenght,
                sliced_dataset: sliced_dataset_lenght,
            }
            max_length = known_lengths.get(root_folder, 0)

        self.class_map = {"both": 0, "esben" : 1, "peter": 2}
        self.data = []
        self.labels = []
        for subdir, dirs, files in os.walk(root_folder):
            # os.walk yields entries in arbitrary order; sorting makes the
            # sample order (and any seeded split made from it) reproducible.
            dirs.sort()
            # Derive the class folder relative to the root so the result does
            # not depend on how deep the root is or on the path separator.
            class_folder = os.path.relpath(subdir, root_folder).split(os.sep)[0]
            label = class_folder[2:]
            if drop_both and label == "both":
                continue
            for file_name in sorted(files):
                # wavfile.read returns two values; the first one is not needed here.
                _, wav = wavfile.read(os.path.join(subdir, file_name))
                if wav.shape[0] > max_length:
                    max_length = wav.shape[0]
                    print("Found wav with more length than specified max one, new max is:", wav.shape[0])
                self.labels.append(label)
                self.data.append(wav)

        # Pad only after the scan: max_length may have grown while reading, and
        # padding on the fly would leave earlier clips shorter than later ones.
        self.data = [self._pad_to_length(wav, max_length) for wav in self.data]
        print("Max length of wav files:", max_length)

    @staticmethod
    def _pad_to_length(wav, length):
        """Zero-pad ``wav`` at the end of its first axis up to ``length`` samples."""
        # Any further axes are left untouched.
        pad_width = [(0, length - wav.shape[0])] + [(0, 0)] * (wav.ndim - 1)
        return np.pad(wav, pad_width)

    def __len__(self):
        """Return the number of loaded clips."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(wav_tensor, class_id)`` for clip ``idx``.

        ``class_id`` is a one-element tensor holding the ``class_map`` id of the
        clip's label; a label missing from ``class_map`` raises ``KeyError``.
        """
        wav = self.data[idx]
        label = self.labels[idx]
        class_id = self.class_map[label]
        wav_tensor = torch.from_numpy(wav)
        class_id = torch.tensor([class_id])
        return wav_tensor, class_id


##### Initialize the Dataset

In [3]:
# Build the dataset, then hold out 20 % of the clips for validation
# (fixed seed so the split is the same on every run).
dataset = AudioDataset()
train_data, val_data, train_labels, val_labels = train_test_split(
    dataset.data,
    dataset.labels,
    test_size=0.2,
    random_state=42,
)

Max length of wav files: 80249


##### Define knn algorithm

In [13]:
def knn_param_search(train_data, train_labels, test_data, test_labels, 
                     metrics=('manhattan', 'euclidean', 'chebyshev'), 
                     ks=(1, 3, 5, 10, 25, 50, 100, 250), 
                     n_train=None, n_test=None, algorithm='brute'):
    """
    Grid-search k-NN hyper parameters and print the accuracy of every setting.

    For each combination of distance metric and ``k`` a
    ``KNeighborsClassifier`` is fitted on the training samples and scored on
    the test samples; accuracy and prediction time are printed.

    Parameters
    ----------
    train_data, test_data : array-like
        Samples with the sample index on the first axis. Any further axes
        (e.g. image height/width/channels) are flattened into one feature axis.
    train_labels, test_labels : array-like
        One label per sample.
    metrics : iterable of str
        Distance metrics to try.
    ks : iterable of int
        Neighbour counts to try. Values larger than the number of training
        samples are skipped with a message instead of raising.
    n_train, n_test : int, optional
        Keep only the first ``n_train`` / ``n_test`` samples for faster iteration.
    algorithm : str
        Passed through to ``KNeighborsClassifier``.

    Returns
    -------
    dict
        Maps ``(metric, k)`` to the accuracy as a fraction in [0, 1].
        Skipped settings are absent.
    """
    x_train = np.asarray(train_data)
    y_train = np.asarray(train_labels)
    x_test = np.asarray(test_data)
    y_test = np.asarray(test_labels)

    # subsample the dataset
    if n_train:
        x_train, y_train = x_train[:n_train], y_train[:n_train]
    if n_test:
        x_test, y_test = x_test[:n_test], y_test[:n_test]

    # The classifier only accepts (n_samples, n_features) input, so collapse
    # everything after the sample axis; labels become a flat vector so the
    # element-wise comparison with the predictions cannot broadcast.
    x_train = x_train.reshape(len(x_train), -1)
    x_test = x_test.reshape(len(x_test), -1)
    y_train = y_train.reshape(-1)
    y_test = y_test.reshape(-1)

    results = {}
    for metric in metrics:
        print(f'Metric: {metric}')
        for k in ks:
            if k > len(x_train):
                # scikit-learn rejects n_neighbors > number of fitted samples.
                print(f'\tk: {k:3d} Skipped: only {len(x_train)} training samples')
                continue

            print(f'\tk: {k:3d} Training', end='')
            classifier = neighbors.KNeighborsClassifier(k, algorithm=algorithm, metric=metric)
            classifier = classifier.fit(x_train, y_train)

            print(f'\r\tk: {k:3d} Testing', end='')
            start = time.perf_counter()
            labels = classifier.predict(x_test)
            duration = time.perf_counter() - start

            accuracy = float((labels == y_test).mean())
            results[(metric, k)] = accuracy
            print(f'\r\tk: {k:3d} Accuracy: {accuracy * 100:.2f} %, Duration: {duration:.2f} s')
    return results

##### Call the KNN Algorithm 

In [15]:
# Score k-NN on the audio validation split for a single neighbour count.
knn_param_search(
    train_data=train_data,
    train_labels=train_labels,
    test_data=val_data,
    test_labels=val_labels,
    ks=[3],
)

Metric: manhattan
	k:   2 Accuracy: 40.00 %, Duration: 0.77 s
Metric: euclidean
	k:   2 Accuracy: 36.67 %, Duration: 0.10 s
Metric: chebyshev
	k:   2 Accuracy: 36.67 %, Duration: 0.62 s


#### Nearest neighbours on MNIST


In [None]:
def show_nearest_neighbours(train_data, train_labels, test_data, test_labels,
                            n_examples=20, n_neighbours=10, seed=None, 
                            scale=1., labelnames=tuple(range(10))):
    """
    Plot randomly chosen test images above their nearest training images.

    The figure has one column per example: the first row shows the test image
    (titled with its label), the rows below show its ``n_neighbours`` nearest
    training images under the euclidean distance of the flattened pixels.
    A neighbour is titled with its label only when that label differs from the
    label of the test image.

    Parameters
    ----------
    train_data, test_data : array-like
        Images, sample index on the first axis.
    train_labels, test_labels : array-like
        Integer labels, used as indices into ``labelnames``.
    n_examples : int
        Number of test images to draw (sampled with replacement).
    n_neighbours : int
        Number of neighbours shown per test image.
    seed : int, optional
        Seed for the random choice of test images.
    scale : float
        Multiplier for the figure size.
    labelnames : sequence
        Display name for every label.
    """
    x_train = np.array(train_data)
    y_train = np.array(train_labels).reshape(-1)
    x_test = np.array(test_data)
    y_test = np.array(test_labels)

    rand = np.random.RandomState(seed=seed)
    example_idx = rand.randint(0, len(x_test), n_examples)
    x_test, y_test = x_test[example_idx], y_test[example_idx].reshape(-1)

    # Flatten each image to one feature vector for the classifier.
    d_data = int(np.prod(x_test.shape[1:]))
    classifier = neighbors.KNeighborsClassifier(algorithm='brute', metric='euclidean')
    classifier = classifier.fit(x_train.reshape((-1, d_data)), y_train)
    # Only the neighbour indices are needed, not the distances.
    neighbour_idx = classifier.kneighbors(x_test.reshape(-1, d_data),
                                          n_neighbors=n_neighbours,
                                          return_distance=False)

    # squeeze=False keeps `axs` two-dimensional even for a single column,
    # so the [row, column] indexing below always works.
    fig, axs = plt.subplots(n_neighbours + 1, n_examples, 
                            figsize=(n_examples * 0.5 * scale, 
                                     n_neighbours * 0.75 * scale),
                            squeeze=False)
    cmap = 'gray' if len(x_test.shape) == 3 else None
    for i in range(n_examples):
        ax = axs[0, i]
        ax.imshow(x_test[i], cmap=cmap)
        ax.set_title(labelnames[y_test[i]])
        if i == 0:
            ax.set_ylabel('inp')
        for j in range(n_neighbours):
            ax = axs[j + 1, i]
            n_idx = neighbour_idx[i, j]
            y = y_train[n_idx]
            ax.imshow(x_train[n_idx], cmap=cmap)
            ax.set_title(labelnames[y] if y != y_test[i] else '')
            if i == 0:
                ax.set_ylabel(f'n{j+1}')
    for ax in axs.reshape(-1):
        ax.set_xticks([])
        ax.set_yticks([])
    plt.tight_layout()
  
  
  

In [None]:
# No earlier cell defines the MNIST datasets used below, so load them here
# (same root folder as the CIFAR-10 download further down).
mnist_train_dataset = datasets.MNIST('../', train=True, download=True)
mnist_test_dataset = datasets.MNIST('../', train=False, download=True)

show_nearest_neighbours(mnist_train_dataset.data, mnist_train_dataset.targets, 
                        mnist_test_dataset.data, mnist_test_dataset.targets, 
                        seed=0)

Looking at the nearest neighbours, it becomes apparent that for many of the test images, an almost identical image is in the training set.

To us, the nearest neighbour 'mistakes' (n1-n10 with another marked number) may seem odd, but remember that the distance is measured in pixel space.

Let's have a look at what kind of mistakes the model makes.

In [None]:
def plot_confusion_matrix(gt, pred, only_show_mistakes=False, 
                          labelnames='auto'):
    """
    Draw the confusion matrix of ``pred`` against ``gt`` as a heatmap.

    Each cell shows its share of all counted samples, formatted as a percentage.

    Parameters
    ----------
    gt : array-like
        Ground-truth labels.
    pred : array-like
        Predicted labels, aligned with ``gt``.
    only_show_mistakes : bool
        When true, only samples with ``gt != pred`` are counted.
    labelnames : 'auto' or sequence
        Tick labels for both axes, one per class in sorted class order.
        With ``'auto'`` the class values themselves are used.
    """
    gt = np.array(gt)
    pred = np.array(pred)

    # Fix the class list before filtering: otherwise classes that never occur
    # among the mistakes vanish from the matrix and the remaining rows/columns
    # get positional tick labels that no longer match the real classes.
    classes = np.unique(np.concatenate([gt.reshape(-1), pred.reshape(-1)]))

    if only_show_mistakes:
        mask = gt != pred
        gt, pred = gt[mask], pred[mask]

    if gt.size == 0:
        # Nothing to count (e.g. no mistakes at all): show an all-zero matrix.
        conf_matrix = np.zeros((len(classes), len(classes)), dtype=int)
    else:
        conf_matrix = confusion_matrix(gt, pred, labels=classes)

    total = conf_matrix.sum()
    shares = conf_matrix / total if total else conf_matrix.astype(float)

    if isinstance(labelnames, str) and labelnames == 'auto':
        ticklabels = classes.tolist()
    else:
        ticklabels = labelnames

    plt.figure(figsize=(10, 8))
    ax = seaborn.heatmap(shares, annot=True, fmt='.2%', 
                         cmap='Blues', cbar=False,
                         xticklabels=ticklabels, yticklabels=ticklabels)
    ax.set_ylabel('ground truth')
    ax.set_xlabel('predicted')

In [None]:
# NOTE(review): no earlier cell assigns `y_test` or `labels` at notebook level
# (inside `knn_param_search` they are only local variables); the first
# notebook-level assignments are in the CIFAR-10 evaluation cell further down.
# On a fresh kernel this cell therefore raises NameError.
# TODO: add the evaluation cell that is meant to produce them.
plot_confusion_matrix(y_test, labels)

In [None]:
# Same matrix, restricted to the misclassified samples.
# NOTE(review): `y_test` and `labels` are not assigned at notebook level by any
# earlier cell, so this raises NameError on a fresh kernel -- TODO define them first.
plot_confusion_matrix(y_test, labels, only_show_mistakes=True)

In [None]:
def show_mistake_examples(x_test, y_test, labels, labelnames=tuple(range(10)), 
                          rows=4, cols=7, seed=None):
    """
    Show a random grid of misclassified images, titled ``truth (prediction)``.

    Parameters
    ----------
    x_test : array-like
        Test images, sample index on the first axis.
    y_test : array-like
        Ground-truth labels, used as indices into ``labelnames``.
    labels : array-like
        Predicted labels, aligned with ``y_test``.
    labelnames : sequence
        Display name for every label.
    rows, cols : int
        Grid size; ``rows * cols`` mistakes are drawn with replacement.
    seed : int, optional
        Seed for the random choice of mistakes.

    Prints a message and returns without plotting when there are no mistakes.
    """
    x_test = np.array(x_test)
    y_test = np.array(y_test)
    labels = np.array(labels)

    mask = labels != y_test
    if not np.any(mask):
        # randint(0, 0, ...) below would raise on an empty selection.
        print('No misclassified examples to show.')
        return
    x_test, y_test, labels = x_test[mask], y_test[mask], labels[mask]

    idx = np.random.RandomState(seed=seed).randint(0, len(x_test), rows * cols)
    x_test, y_test, labels = x_test[idx], y_test[idx], labels[idx]

    cmap = 'gray' if x_test.ndim == 3 else None
    # squeeze=False: always get a 2-D array of axes, even for a 1x1 grid.
    fig, axs = plt.subplots(rows, cols, figsize=(cols, rows * 1.0), squeeze=False)
    for i, ax in enumerate(axs.reshape(-1)):
        ax.imshow(x_test[i], cmap=cmap)
        gt, pred = labelnames[y_test[i]], labelnames[labels[i]]
        ax.set_title(f'{gt} ({pred})')
        ax.axis('off')
    plt.tight_layout()
    print('ground truth (predicted)')
    plt.show()

In [None]:
# NOTE(review): `x_test`, `y_test` and `labels` are not assigned at notebook
# level by any earlier cell (first assigned in the CIFAR-10 evaluation cell
# further down), so this raises NameError on a fresh kernel -- TODO define them first.
show_mistake_examples(x_test, y_test, np.array(labels))

# CIFAR 10

In [None]:
# Fetch both CIFAR-10 splits into the parent folder (train first, then test).
cifar10_train_dataset, cifar10_test_dataset = (
    datasets.CIFAR10('../', train=is_train, download=True)
    for is_train in (True, False)
)

In [None]:
cifar10_classes = cifar10_train_dataset.classes

# Preview the first nine test images with their ground-truth class.
fig, axs = plt.subplots(3, 3)
for i, ax in enumerate(axs.reshape(-1)):
    # No cmap here: the images are RGB, and imshow ignores cmap for RGB data.
    ax.imshow(cifar10_test_dataset.data[i], interpolation='none')
    ax.set_title("Ground Truth: {}".format(cifar10_classes[cifar10_test_dataset.targets[i]]))
    ax.set_xticks([])
    ax.set_yticks([])
# Lay out once, after every subplot has its title, instead of on each iteration.
fig.tight_layout()

In [None]:
# KNeighborsClassifier only accepts 2-D (n_samples, n_features) input, so the
# 32x32x3 images are flattened before the search (same reshape as in the
# full evaluation below).
knn_param_search(
    cifar10_train_dataset.data.reshape((-1, 32 * 32 * 3)),
    cifar10_train_dataset.targets,
    cifar10_test_dataset.data.reshape((-1, 32 * 32 * 3)),
    cifar10_test_dataset.targets,
    metrics=['euclidean'],
    n_test=500,
)

In [None]:
x_train, y_train = cifar10_train_dataset.data, cifar10_train_dataset.targets
x_test, y_test = cifar10_test_dataset.data, cifar10_test_dataset.targets

# Score the most promising hyper parameters on the complete data set.
# Each 32x32x3 image is flattened to one feature vector for the classifier.
flat_shape = (-1, 32 * 32 * 3)
classifier = neighbors.KNeighborsClassifier(10, algorithm='brute', metric='euclidean')
classifier = classifier.fit(x_train.reshape(flat_shape), y_train)
labels = classifier.predict(x_test.reshape(flat_shape))

correct = labels == np.array(y_test)
print(f'Accuracy: {correct.mean() * 100} %')

In [None]:
# Inspect a random sample of the CIFAR-10 misclassifications.
show_mistake_examples(
    x_test=x_test,
    y_test=y_test,
    labels=labels,
    labelnames=cifar10_classes,
)

This doesn't work nearly as well as our MNIST classifier. Some of the predictions seem ridiculous. Let's look at the nearest neighbours.

In [None]:
# Ten random CIFAR-10 test images with their five nearest training images.
show_nearest_neighbours(
    train_data=cifar10_train_dataset.data,
    train_labels=cifar10_train_dataset.targets,
    test_data=cifar10_test_dataset.data,
    test_labels=cifar10_test_dataset.targets,
    n_examples=10,
    n_neighbours=5,
    scale=2,
    labelnames=cifar10_classes,
)

We could improve accuracy by gathering more data, but that's expensive.
Also note that inference time grows with the amount of training data.

# Principal Component Analysis (PCA)

Instead of measuring the distance directly in pixel-space,
we can attempt to extract features using a classical decomposition.

In [None]:
# PCA is imported with the other sklearn imports at the top of the notebook.
x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)

d_pca = 20  # hyper parameter

# Flatten every 32x32x3 image once and reuse the result for fit and transform.
n_pixels = 32 * 32 * 3
x_train_flat = x_train.reshape((-1, n_pixels))
x_test_flat = x_test.reshape((-1, n_pixels))

# Seeded so that a randomized SVD solver, if selected, gives the same
# components on every run.
pca = PCA(n_components=d_pca, random_state=42).fit(x_train_flat)
x_train_pca = pca.transform(x_train_flat)
x_test_pca = pca.transform(x_test_flat)
print(f'\tCIFAR-10 compressed train x/y shape: {x_train_pca.shape} / {y_train.shape}')
print(f'\tCIFAR-10 compressed test x/y shape: {x_test_pca.shape} / {y_test.shape}')

In [None]:
# we can visualize the pca components
# The grid is sized from the fitted model so it keeps working when the number
# of components changes (20 components give the 4x5 grid used so far).
n_components = len(pca.components_)
n_cols = 5
n_rows = -(-n_components // n_cols)  # ceiling division
fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols, 1.25 * n_rows), squeeze=False)
for i, ax in enumerate(axs.reshape(-1)):
    ax.axis('off')
    if i >= n_components:
        continue  # leftover cell in the last row stays empty
    pci = pca.components_[i].reshape(32, 32, 3)
    # Rescale the component symmetrically around 0.5 so it fits imshow's [0, 1] range.
    ax.imshow(0.5 + pci * 0.5 / np.abs(pci).max())
    ax.set_title(f'PC{i}')
plt.tight_layout()

In [None]:
# Repeat the hyper-parameter search on the PCA features (first 500 test samples).
knn_param_search(
    train_data=x_train_pca,
    train_labels=y_train,
    test_data=x_test_pca,
    test_labels=y_test,
    n_test=500,
)

In [None]:
# Evaluate one setting (k=25, manhattan distance) on all PCA-compressed test samples.
classifier = neighbors.KNeighborsClassifier(
    25,
    algorithm='brute',
    metric='manhattan',
).fit(x_train_pca, y_train)
labels = classifier.predict(x_test_pca)

correct = labels == y_test
print(f'Accuracy: {correct.mean() * 100} %')