In [None]:
from pathlib import Path
import shutil
from tqdm.notebook import tqdm

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, KFold
import matplotlib.pyplot as plt

from plots import heatmap, annotate_heatmap

%matplotlib inline

In [None]:
def import_data(fn):
    """Load a numeric table with ``np.loadtxt`` and split labels from inputs.

    Parameters
    ----------
    fn : anything accepted by ``np.loadtxt`` (e.g. a file path).

    Returns
    -------
    (inputs, labels) : tuple of ndarrays
        ``inputs`` holds every column except the first; ``labels`` is the
        first column.
    """
    table = np.loadtxt(fn)
    return table[:, 1:], table[:, 0]

# Load the full data set used by the experiments below; import_data returns
# the first column as labels (Y) and the remaining columns as inputs (X).
X, Y = import_data('zipcombo.dat')
# Number of output classes; used to size the coefficient matrix in question_2.
n_classes = 10

In [None]:
class BaseKernel:
    """Abstract kernel that caches a hyperparameter-independent base matrix.

    Subclasses implement ``base_kernel`` (computed once in ``__init__``) and
    ``kernel`` (which turns a slice of the cached matrix into kernel values
    for a given hyperparameter).
    """

    def __init__(self, X1, X2=None):
        """Cache ``base_kernel(X1, X2)``; ``X2`` falls back to ``X1`` when omitted."""
        other = X1 if X2 is None else X2
        self.base_kernel_matrix = self.base_kernel(X1, other)

    def base_kernel(self, a, b):
        """Return the base matrix between ``a`` and ``b`` (subclass hook)."""
        raise NotImplementedError

    def indices(self, row_indices, col_indices):
        """Replace ``None`` selectors by the full row / column range of the cached matrix."""
        rows = np.arange(self.base_kernel_matrix.shape[0]) if row_indices is None else row_indices
        cols = np.arange(self.base_kernel_matrix.shape[1]) if col_indices is None else col_indices
        return rows, cols

    def kernel(self, hparam, row_indices=None, col_indices=None):
        """Return kernel values for ``hparam`` on the selected rows / columns (subclass hook)."""
        raise NotImplementedError

In [None]:
class PolynomialKernel(BaseKernel):
    """Polynomial kernel: cached dot products raised element-wise to a power."""

    def __init__(self, X1, X2=None):
        """Cache the dot-product matrix between ``X1`` and ``X2`` (or ``X1`` itself)."""
        super().__init__(X1, X2)

    def base_kernel(self, a, b):
        """Return the matrix of dot products between rows of ``a`` and rows of ``b``."""
        return np.dot(a, b.T)

    def kernel(self, hparam, row_indices=None, col_indices=None):
        """Return the selected block of cached dot products raised to ``hparam``.

        ``None`` for either selector means "all rows" / "all columns".
        """
        rows, cols = self.indices(row_indices, col_indices)
        gram_block = self.base_kernel_matrix[np.ix_(rows, cols)]
        return np.power(gram_block, hparam)
    
# Precompute the dot-product matrix over all of X once; question_1 and
# question_2 read this module-level object and slice it per split.
full_kernel = PolynomialKernel(X)

In [None]:
def init_coefs(n_classes, train_size):
    """Return an all-zero coefficient matrix of shape ``(n_classes, train_size)``."""
    return np.zeros((n_classes, train_size))
    
def predict(coefs, kernel_values):
    """Return ``coefs.dot(kernel_values)``: one score per row of ``coefs``
    when ``kernel_values`` is a single kernel row."""
    return coefs.dot(kernel_values)

def sign(x):
    """Element-wise sign that maps zero (and negatives) to -1 and positives to +1."""
    non_positive = x <= 0.
    return np.where(non_positive, -1., 1.)

In [None]:
def train(train_kernel, train_y, coefs, n_epochs=1):
    """Run online one-vs-rest kernel perceptron updates over the training set.

    Parameters
    ----------
    train_kernel : 2-D array; row ``i`` holds the kernel values of sample ``i``
        against every training sample.
    train_y : sequence of class labels, used as row indices into ``coefs``
        (converted with ``int``).
    coefs : array of shape ``(n_classes, n_train)``; **updated in place**.
    n_epochs : number of passes over the training set.

    Returns
    -------
    (coefs, mistakes) : the (mutated) coefficient matrix and a list holding,
        per epoch, how many samples had ``argmax(predictions) != y`` before
        their update.
    """
    n_classes = coefs.shape[0]
    mistakes = [0] * n_epochs
    for epoch in range(n_epochs):
        for i, y in enumerate(train_y):
            # One-vs-rest targets: +1 for the true class, -1 for all others.
            labels = np.full(n_classes, -1.)
            labels[int(y)] = 1.
            predictions = predict(coefs, train_kernel[i])
            # A classifier is wrong when its score does not strictly agree in
            # sign with its target. Move it towards the target label; for a
            # non-zero score this equals subtracting sign(score), but it also
            # moves the right way when the score is exactly zero (where
            # sign(0) = -1 pushed negative-label classifiers upwards).
            wrong = labels * predictions <= 0.
            coefs[wrong, i] += labels[wrong]
            if np.argmax(predictions) != y:
                mistakes[epoch] += 1
    return coefs, mistakes

def test(test_kernel, test_y, coefs, return_cm=False):

    mistakes = 0
    if return_cm:
        confusion_matrix = np.zeros((10, 10))
    for i, y in enumerate(test_y):
        prediction = np.argmax(predict(coefs, test_kernel[i]))
        if prediction != y:
            mistakes += 1
        if return_cm:
            confusion_matrix[int(y), prediction] += 1

    return (mistakes, confusion_matrix) if return_cm else mistakes

### Demo

In [None]:
def execute_run(train_x, train_y, test_x, test_y, d=3, n_classes=3, n_epochs=3):
    """Train a polynomial-kernel classifier epoch by epoch, reporting progress.

    Builds the train/train and test/train polynomial kernels of degree ``d``,
    then alternates one training epoch with one evaluation on the test set,
    printing a summary line after each epoch.

    Returns
    -------
    (coefs, train_mistakes, test_mistakes) : the trained coefficients and the
        training / test mistake counts of the last epoch.
    """
    gram_train = PolynomialKernel(train_x).kernel(d)
    gram_test = PolynomialKernel(test_x, train_x).kernel(d)
    coefs = init_coefs(n_classes, train_x.shape[0])

    for i in range(1, n_epochs+1):
        # A single epoch per call so the test error can be reported in between.
        coefs, train_mistakes = train(gram_train, train_y, coefs, n_epochs=1)
        test_mistakes = test(gram_test, test_y, coefs, return_cm=False)
        print(f'Epoch {i} - {train_mistakes[-1]} mistakes out of {train_x.shape[0]} items on training set, test error is {test_mistakes/test_x.shape[0]*100:.3f}%.')

    return coefs, train_mistakes[0], test_mistakes

In [None]:
# Demo data, loaded with the same label-first column layout as the main set.
train_x, train_y = import_data('dtrain123.dat')
test_x, test_y = import_data('dtest123.dat')

# Labels are shifted down by one because train/test use them directly as row
# indices into coefs.  NOTE(review): presumably the raw labels are 1..3 —
# confirm against the data files.
coefs, *_ = execute_run(train_x, train_y-1, test_x, test_y-1, d=3, n_classes=3, n_epochs=3)

### Question 1

In [None]:
def question_1(n_runs, hparams, n_epochs):
    """Estimate train / test error for each hyperparameter over repeated random splits.

    For every value in ``hparams`` the module-level ``full_kernel`` is
    evaluated once, then ``n_runs`` independent 80/20 splits of ``X`` / ``Y``
    are trained for ``n_epochs`` epochs and tested.

    Returns
    -------
    dict with keys ``'train'`` and ``'test'``; each value is a list (one entry
    per hyperparameter) of strings formatted as ``"mean ± std"`` in percent.
    The train figure uses the mistake count of the last training epoch.
    """
    n_hparams = len(hparams)
    train_rates = np.zeros((n_runs, n_hparams))
    test_rates = np.zeros((n_runs, n_hparams))

    for col in tqdm(range(n_hparams)):
        # The kernel for this hyperparameter is shared by all runs below.
        gram = full_kernel.kernel(hparams[col])

        for row in range(n_runs):
            fit_idx, held_idx = train_test_split(np.arange(X.shape[0]), train_size=0.8, shuffle=True)
            fit_kernel = gram[np.ix_(fit_idx, fit_idx)]
            held_kernel = gram[np.ix_(held_idx, fit_idx)]
            fit_y, held_y = Y[fit_idx], Y[held_idx]

            weights = init_coefs(n_classes=10, train_size=fit_idx.size)
            weights, epoch_mistakes = train(fit_kernel, fit_y, weights, n_epochs=n_epochs)
            held_mistakes = test(held_kernel, held_y, weights, return_cm=False)

            train_rates[row, col] = epoch_mistakes[-1]/fit_y.size
            test_rates[row, col] = held_mistakes/held_y.size

    def summarise(rates):
        # Collapse the run axis into one "mean ± std" percentage string per hyperparameter.
        means, stds = np.mean(rates, axis=0), np.std(rates, axis=0)
        return [f'{100*m:.3f} ± {100*s:.3f}' for m, s in zip(means, stds)]

    return {'train': summarise(train_rates), 'test': summarise(test_rates)}

In [None]:
# Experiment settings reused by later cells: number of random splits per
# hyperparameter and number of training epochs per run.
n_runs, n_epochs = 20, 3
# Polynomial degrees to evaluate.
ds = range(1, 8)

error_rates = question_1(n_runs=n_runs, hparams=ds, n_epochs=n_epochs)
# One row per degree; the cells are the preformatted "mean ± std" strings returned by question_1.
df = pd.DataFrame(data={'Train error (%)': error_rates['train'], 'Test error (%)': error_rates['test']}, index=ds)

df

### Questions 2 and 3

In [None]:
def question_2(n_runs, n_splits, hparams, n_epochs, return_cm=True):
    """Select the hyperparameter by k-fold cross-validation, then score it on a held-out test set.

    For each of ``n_runs`` runs: draw a random 80/20 split of ``X`` / ``Y``,
    cross-validate every value in ``hparams`` on the 80% part only, retrain on
    the whole 80% part with the best value, and evaluate on the 20% part.
    Reads the module-level ``X``, ``Y``, ``n_classes`` and ``full_kernel``.

    Parameters
    ----------
    n_runs : number of independent train/test splits.
    n_splits : number of cross-validation folds.
    hparams : indexable sequence of candidate hyperparameters.
    n_epochs : training epochs for every fit.
    return_cm : when true, also return the per-run test confusion matrices.

    Returns
    -------
    results : dict with arrays ``'hparam_star'`` (selected value per run) and
        ``'test_error'`` (test error rate per run).
    confusion_matrices : only when ``return_cm`` is true; the per-run
        confusion matrices stacked along a new first axis.
    """
    kfold = KFold(n_splits=n_splits, shuffle=True)

    confusion_matrices = []
    results = {'hparam_star': np.zeros((n_runs,)), 'test_error': np.zeros((n_runs,))}

    for run in tqdm(range(n_runs)):

        train_indices, test_indices = train_test_split(np.arange(X.shape[0]), train_size=0.8, shuffle=True)

        val_errors = np.zeros((len(hparams), kfold.get_n_splits()))
        for i, hparam in enumerate(hparams):
            full_kernel_d = full_kernel.kernel(hparam)
            for fold, (fold_train_pos, fold_val_pos) in enumerate(kfold.split(train_indices)):
                # KFold.split yields positions *within* train_indices. Map them
                # back to dataset indices, otherwise the folds would address
                # the first rows of the data set and could contain test samples.
                train_fold = train_indices[fold_train_pos]
                val_fold = train_indices[fold_val_pos]
                train_kernel = full_kernel_d[np.ix_(train_fold, train_fold)]
                val_kernel = full_kernel_d[np.ix_(val_fold, train_fold)]
                train_y, val_y = Y[train_fold], Y[val_fold]
                coefs = init_coefs(n_classes, train_fold.size)
                coefs, _ = train(train_kernel, train_y, coefs, n_epochs=n_epochs)
                val_mistakes = test(val_kernel, val_y, coefs, return_cm=False)
                val_errors[i, fold] = val_mistakes/val_y.size
        # Average over folds and keep the hyperparameter with the lowest mean error.
        val_errors = val_errors.mean(axis=1)
        hparam_star = hparams[np.argmin(val_errors)]
        results['hparam_star'][run] = hparam_star

        # Retrain on the full training split with the selected hyperparameter.
        full_kernel_d = full_kernel.kernel(hparam_star)
        train_kernel = full_kernel_d[np.ix_(train_indices, train_indices)]
        test_kernel = full_kernel_d[np.ix_(test_indices, train_indices)]
        train_y, test_y = Y[train_indices], Y[test_indices]
        coefs = init_coefs(n_classes=n_classes, train_size=train_indices.size)
        coefs, _ = train(train_kernel, train_y, coefs, n_epochs=n_epochs)

        # test() returns a bare count unless a confusion matrix was requested,
        # so only unpack a tuple in that case.
        outcome = test(test_kernel, test_y, coefs, return_cm=return_cm)
        if return_cm:
            test_mistakes, run_cm = outcome
            confusion_matrices.append(run_cm)
        else:
            test_mistakes = outcome
        results['test_error'][run] = test_mistakes/test_y.size

    return (results, np.stack(confusion_matrices)) if return_cm else results

In [None]:
# Number of cross-validation folds used to select the hyperparameter in each run.
n_splits = 5
# Reuses n_runs, ds and n_epochs defined for Question 1.
results, confusion_matrix = question_2(n_runs=n_runs, n_splits=n_splits, hparams=ds, n_epochs=n_epochs, return_cm=True)

In [None]:
print(f"d* = {np.mean(results['hparam_star']):.3f} ± {np.std(results['hparam_star']):.3f}")
print(f"Test error (%) = {100*np.mean(results['test_error']):.3f} ± {100*np.std(results['test_error']):.3f}")

# Normalise every run's confusion matrix row-wise (rows are true labels), then
# aggregate across runs.
confusion_matrix = confusion_matrix / confusion_matrix.sum(axis=2, keepdims=True)
cm_mean = np.mean(confusion_matrix, axis=0)
cm_std = np.std(confusion_matrix, axis=0)
labels = np.array([f'{100*mean:.1f} ±\n{100*std:.1f}' for mean, std in zip(cm_mean.flatten(), cm_std.flatten())]).reshape(cm_mean.shape)

# Blank out the diagonal so only the confusions are shown.  np.fill_diagonal
# modifies its argument in place and returns None, so the result must not be
# assigned back to the array name.
np.fill_diagonal(cm_mean, 0.)
np.fill_diagonal(labels, '')

# plot the log of the values?
fig, axs = plt.subplots(figsize=(8, 8))
im, cbar = heatmap(100*cm_mean, np.arange(10), np.arange(10), ax=axs, cmap='Blues')
texts = annotate_heatmap(im, labels=labels)
fig.tight_layout()
plt.show()

### Question 4

In [None]:
def hardest_samples(kernel, ys, coefs, n=5):
    """Return the positions of the ``n`` samples with the lowest true-class score.

    Parameters
    ----------
    kernel : 2-D array; row ``i`` holds the kernel values of sample ``i``
        against every training sample.
    ys : sequence of true class labels (used as row indices into the scores).
    coefs : array of shape ``(n_classes, n_train)``.
    n : how many samples to return.

    Returns
    -------
    ndarray of positions into ``ys``.  When ``n`` is smaller than the number of
    samples the ``n`` positions come back in no particular order; otherwise
    all positions are returned, sorted from lowest to highest score.
    """
    # Always accumulate in float: inheriting the dtype of integer labels would
    # silently truncate the scores.
    scores = np.zeros(len(ys), dtype=float)
    for i, y in enumerate(ys):
        scores[i] = predict(coefs, kernel[i])[int(y)]

    # np.argpartition requires kth < size, so fall back to a full sort when
    # at least as many samples are requested as exist.
    if n >= scores.size:
        return np.argsort(scores)
    return np.argpartition(scores, n)[:n]

In [None]:
def plot(img, label, fn=None, show=False):
    """Render one sample as a 16x16 image titled with its label.

    Parameters
    ----------
    img : array that can be reshaped to ``(16, 16)``.
    label : value shown in the figure title.
    fn : optional path; when given the figure is saved there.
    show : when true the figure is displayed, otherwise it is closed.
    """
    figure, axis = plt.subplots(figsize=(4, 4))
    axis.imshow(img.reshape(16, 16), cmap='viridis', interpolation='none')
    axis.set_title(f'Label = {label}', fontdict={'font': 'serif', 'size': 16})
    # Hide both axes: tick marks carry no meaning for an image.
    for hidden_axis in (axis.get_xaxis(), axis.get_yaxis()):
        hidden_axis.set_visible(False)
    figure.tight_layout()

    if fn is not None:
        figure.savefig(fn)

    if not show:
        # Close figures that are only saved so they do not accumulate.
        plt.close(figure)
        return
    plt.show()

In [None]:
# Use the rounded mean of the hyperparameters selected by question_2.
d = np.round(np.mean(results['hparam_star']))
full_kernel_d = full_kernel.kernel(d)

# Fresh random 80/20 split of sample indices; the train/train and test/train
# kernel blocks are sliced out of the full matrix.
train_indices, test_indices = train_test_split(np.arange(X.shape[0]), train_size=0.8, shuffle=True)
train_kernel, test_kernel = full_kernel_d[np.ix_(train_indices, train_indices)], full_kernel_d[np.ix_(test_indices, train_indices)]
train_y, test_y = Y[train_indices], Y[test_indices]

# Train from zero coefficients, then ask for the 5 test samples whose score
# for their own class is lowest.
coefs = init_coefs(n_classes=10, train_size=train_indices.size)
coefs, train_mistakes = train(train_kernel, train_y, coefs, n_epochs=3)
indices = hardest_samples(test_kernel, test_y, coefs, n=5)
# Map positions within the test split back to indices into X / Y.
hardest_test_samples = test_indices[indices]

In [None]:
# 1-based position (within hardest_test_samples) of the sample shown inline below.
display = 1

# Start from an empty output directory: an existing 'assets' folder is deleted
# together with its contents before being recreated.
dirpath = Path('assets')
if dirpath.exists() and dirpath.is_dir():
    shutil.rmtree(dirpath)
dirpath.mkdir(parents=True, exist_ok=True)

# Save one image per hard sample without displaying it.
for index in hardest_test_samples:
    img, label = X[index], int(Y[index])
    plot(img, label, fn=f'{dirpath}/sample-{index}_label-{label}.png', show=False)

# Show the selected sample inline (not saved).
index = hardest_test_samples[display-1]
img, label = X[index], int(Y[index])
plot(img, label, fn=None, show=True)

### Question 5

In [None]:
class GaussianKernel(BaseKernel):
    """Gaussian kernel ``exp(-c * ||a - b||^2)`` on a cached squared-distance matrix."""

    def __init__(self, X1, X2=None):
        """Cache the squared distances between ``X1`` and ``X2`` (or ``X1`` itself)."""
        super().__init__(X1, X2)

    def base_kernel(self, a, b):
        """Return the squared Euclidean distances between rows of ``a`` and rows of ``b``.

        The matrix is built one column per row of ``b`` (with a progress bar),
        so only one ``(len(a), 1, n_features)`` difference array is alive at a
        time; the result has shape ``(len(a), len(b))``.
        """
        squared_distances = np.square(np.hstack([
            np.linalg.norm(np.expand_dims(a, 1) - np.expand_dims(b[[i], :], 0), axis=2)
            for i in tqdm(range(b.shape[0]))
        ]))
        return squared_distances

    def kernel(self, hparam, row_indices=None, col_indices=None):
        """Return ``exp(-hparam * d2)`` on the selected block of cached squared distances.

        ``None`` for either selector means "all rows" / "all columns".
        """
        # The selector helper inherited from BaseKernel is called `indices`.
        row_indices, col_indices = self.indices(row_indices, col_indices)
        kernel_matrix = np.exp(-hparam * self.base_kernel_matrix[np.ix_(row_indices, col_indices)])
        return kernel_matrix
    
# Rebinds the module-level full_kernel that question_1 and question_2 read, so
# calls made after this cell use the Gaussian kernel instead of the polynomial one.
full_kernel = GaussianKernel(X)

In [None]:
cs = np.linspace(0.01, 0.5, 7, endpoint=True)
error_rates = question_1(n_runs=n_runs, hparams=ds, n_epochs=n_epochs)

In [None]:
df = pd.DataFrame(data=error_rates, index=ds)
df

In [None]:
# question_2's flag is named `return_cm` (there is no `cm` parameter).
results = question_2(n_runs=n_runs, n_splits=n_splits, hparams=cs, n_epochs=n_epochs, return_cm=False)

In [None]:
print(f"c* = {np.mean(results['hparam_star']):.3f} ± {np.std(results['hparam_star']):.3f}")
# test_error holds fractions; scale by 100 to match the "(%)" label, as in the polynomial report.
print(f"Test error (%) = {100*np.mean(results['test_error']):.3f} ± {100*np.std(results['test_error']):.3f}")