# Supervised Learning Coursework 2 - Part I

## Import Libraries

In [None]:
# Import libraries
import numpy as np
from scipy.stats import mode 
import cupy as cp
from matplotlib import pyplot as plt
from itertools import combinations
import seaborn as sns
import pandas as pd
import time
import json
import os

## Create Results Folder

Run the code below to create the folder where the results will be saved.

In [None]:
# The folder name can be modified to whatever you wish
results_dir = "./slcw2_results/"

# makedirs with exist_ok=True makes re-running this cell a no-op, avoids the
# check-then-create race of exists() + mkdir(), and also creates any missing
# parent folders if a nested path is chosen above.
os.makedirs(results_dir, exist_ok=True)

## Create Helper Functions

These functions are essential for developing the models.

### Kernel Functions

In [None]:
# Polynomial Kernel
def polynomial_kernel_matrix(X1, X2, d=3):

    """
    Build the polynomial kernel matrix between the rows of two data matrices.

    Entry (i, j) of the result is (X1[i] . X2[j]) ** d.

    Args
    ----
    X1 - data matrix of shape (m, n): m instances with n features each
    X2 - data matrix of shape (l, n); pass X1 again to get the training Gram matrix, or the new points to predict on
    d - degree of the polynomial kernel

    Returns
    -------
    kernel matrix of dimensions (m, l)
    """

    # Pairwise inner products between every row of X1 and every row of X2
    inner_products = X1 @ X2.T

    # Element-wise power turns the linear kernel into a degree-d polynomial one
    return inner_products ** d

# Gaussian Kernel
def gaussian_kernel_matrix(X1, X2, c=1):

    """
    Compute gaussian kernel matrix, given data matrices.

    Entry (i, j) of the result is exp(-c * ||X1[i] - X2[j]||^2), where the squared
    distance is expanded as ||a||^2 - 2 a.b + ||b||^2.

    Args
    ----
    X1 - training data matrix of shape (m, n) where m is the number of training instances and n is the number of features
    X2 - matrix of shape (l, n) where l=m if we are generating matrix for training. Otherwise, l is the number of new points we are predicting on.
    c - width parameter of the Gaussian kernel (it multiplies the squared distance inside the exponent)

    Returns
    -------
    kernel matrix of dimensions (m, l)
    """

    # Cross term: inner product between every row of X1 and every row of X2
    cross = X1 @ X2.T

    # Squared row norms. Summing the squared entries row by row gives the same values as the
    # diagonals of X1 @ X1.T and X2 @ X2.T without materialising those (m, m) and (l, l) matrices.
    sq_norms_1 = (X1 * X1).sum(axis=1).reshape(-1, 1)
    sq_norms_2 = (X2 * X2).sum(axis=1).reshape(1, -1)

    norm_sq = sq_norms_1 - 2 * cross + sq_norms_2

    # Rounding can leave tiny negative values where the true distance is zero;
    # clip them so that no kernel entry exceeds 1.
    norm_sq = np.maximum(norm_sq, 0)

    return np.exp(-1 * c * norm_sq)

### Data Processing Functions

In [None]:
def preprocess(dataset, flat=True):
    """
    Split a dataset into pixel values and labels.

    The first column of `dataset` holds the labels and the remaining columns hold the pixels.
    With flat=True each example stays a flat vector; otherwise each example is reshaped
    into a 16x16 image.
    """

    labels, pixels = dataset[:, 0], dataset[:, 1:]

    # Flat vectors need no further work
    if flat:
        return pixels, labels

    # Otherwise fold every row into image dimensions
    images = pixels.reshape((pixels.shape[0], 16, 16))
    return images, labels

def create_subsets(data, labels, classes):

    """
    Extract the examples and labels that belong to a pair of classes.

    Args
    ----
    data - dataset of dimensions (m, n) where m is the number of instances and n is the number of features
    labels - true labels for data, must be of shape (m, )
    classes - the pair of classes to keep. Should be a tuple/list; only its first two entries are used.

    Returns
    -------
    subset_examples - rows of data belonging to classes[0], followed by the rows belonging to classes[1]
    subset_labels - true labels for subset_examples, in the same order.
    """

    # One row selector per requested class, first class first
    selectors = [np.where(labels == wanted) for wanted in (classes[0], classes[1])]

    # Stack the rows (and their labels) of the first class on top of those of the second
    subset_examples = np.concatenate([data[rows] for rows in selectors])
    subset_labels = np.concatenate([labels[rows] for rows in selectors])

    return subset_examples, subset_labels

def split_data(inputs, targets, test_proportion, shuffle=None):
    """
    Splits the data into training and test sets.

    Args
    ----
    inputs : NumPy array of input data. Should be of shape (# examples, # features)
    targets : NumPy array of target data, with one entry (or row) per example
    test_proportion : Value between 0 and 1 which specifies how much of the data to use for testing.
    shuffle : Optional. Set to True if you want the data shuffled and then split. The shuffle draws
              from NumPy's global random state, so call np.random.seed beforehand for reproducible results.

    Returns
    -------
    train_X : NumPy array of training examples
    train_Y : NumPy array of training targets
    test_X : NumPy array of testing examples
    test_Y : NumPy array of testing targets
    cache : tuple (train_idxs, test_idxs) holding the row indices of the original arrays that ended up in each set
    """

    # Stores the number of data points
    n_examples = inputs.shape[0]

    if shuffle:
        # Generate a shuffled version of the array indices and reorder both arrays with it
        order = np.random.permutation(n_examples)
        ordered_inputs = inputs[order, :]
        ordered_targets = targets[order]
    else:
        # Keep the data in its original order. The index array is still needed so that the
        # returned cache can be built (previously this path left it as None and crashed).
        order = np.arange(n_examples)
        ordered_inputs = inputs
        ordered_targets = targets

    # Calculate the split index based on the specified proportions
    split_index = int((1 - test_proportion) * n_examples)

    # Original row indices of the train and test examples
    train_idxs = order[:split_index]
    test_idxs = order[split_index:]
    cache = (train_idxs, test_idxs)

    # Examples up to the split index form the training set, the rest form the test set
    train_X = ordered_inputs[:split_index]
    train_Y = ordered_targets[:split_index]
    test_X = ordered_inputs[split_index:]
    test_Y = ordered_targets[split_index:]

    return train_X, train_Y, test_X, test_Y, cache

## Load full dataset

In [None]:
# Load the full dataset
full_dataset = np.genfromtxt("data/zipcombo.dat")

# Separate the pixel vectors (x) from the labels (y) using the helper function
x, y = preprocess(full_dataset)

# Inspect the full dataset
print("Number of records in full dataset = {}".format(len(full_dataset)))
print("Labels for the full dataset = {}".format(np.unique(y)))

In [None]:
# Visualize a few images (fixed seed so the same samples are drawn on every run)
np.random.seed(1)
idxs = np.random.choice(len(x), size=7)
sample_imgs = x[idxs].reshape(7, 16, 16)
sample_lbls = y[idxs]

# One subplot per sampled image, laid out in a single row
plt.figure(figsize=(17, 6))
for position, (img, lbl) in enumerate(zip(sample_imgs, sample_lbls), start=1):
    plt.subplot(1, len(sample_imgs), position)
    plt.imshow(img, plt.cm.gray)
    plt.title("Image Label = {}".format(lbl))

## Setup Multi-Class Kernel Perceptron Model

My kernel perceptron model is represented by the Python class below. It works with either the polynomial kernel or the Gaussian kernel and supports both one-vs-all and one-vs-one multi-class classification.

In [None]:
class Kernel_Perceptron():

    """

    Multi-class kernel perceptron, trained online in one-vs-all (ova) or one-vs-one (ovo) mode.

    Args
    ----
    train_x : training data of shape (m, n) where m is the number of examples and n is number of features
    train_y : training labels of shape (m, )
    test_x : testing data of shape (l, n) where l is number of testing examples
    test_y : testing labels of shape (l, )
    kernel_func : callable taking two data matrices and returning their kernel matrix
    mode : "ova" for one-vs-all or "ovo" for one-vs-one

    Attributes
    ----------
    mode : can be ova or ovo
    kernel_func : kernel matrix function
    classes : sorted array of the unique classes in the training labels
    k : number of unique classes
    m_train : number of training examples
    m_test : number of test examples
    train_K : kernel matrix or list of kernel matrices for training data, based on whether mode=ova or ovo
    test_K : kernel matrix or list of kernel matrices for test data, based on whether mode=ova or ovo
    train_y : vector of training labels
    test_y : vector of testing labels
    alphas : matrix of shape (k, m_train) or list of per-classifier vectors, based on whether mode is ova or ovo
    classifiers : (ovo only) list of all class pairs, one binary classifier per pair
    classifier_K : (ovo only) list of kernel matrices for each subset of data for class pairs
    classifier_y : (ovo only) list of {+1, -1} training labels for each subset of data for class pairs

    Methods
    -------
    fit - Fits model using fit_ova or fit_ovo method, based on the value of mode attribute
    predict - generates predictions using predict_ovo or predict_ova, based on the value of mode attribute
    evaluate - computes misclassification error
    evaluate_binary_classifier - required for training individual classifiers as part of fit_ovo method

    """

    def __init__(self, train_x, train_y, test_x, test_y, kernel_func, mode="ova"):

        # Training mode - can be one-vs-all (ova) or one-vs-one (ovo)
        self.mode = mode

        # Type of kernel function used to create kernel matrix
        self.kernel_func = kernel_func

        # Unique classes (sorted) and how many there are
        self.classes = np.unique(train_y)
        self.k = len(self.classes)

        # Dimensions of datasets
        self.m_train = len(train_y)
        self.m_test = len(test_y)

        # Process training dataset differently based on the mode of training
        if mode == "ova":
            # Create gram matrices
            self.train_K = kernel_func(train_x, train_x)
            self.test_K = kernel_func(train_x, test_x)
            self.train_y = train_y
            self.test_y = test_y
            # One row of dual weights per class
            self.alphas = np.zeros((self.k, self.m_train))

        elif mode == "ovo":

            # All class pairs; each pair gets its own binary classifier
            self.classifiers = list(combinations(self.classes, 2))

            # Initialize params
            self.alphas = []  # list of (m_subset, ) vectors, m_subset = number of examples in each pair's subset
            self.classifier_K = []  # list of (m_subset, m_subset) matrices
            self.classifier_y = []  # list of (m_subset, ) vectors in {+1, -1}
            self.train_K = []  # list of (m_subset, m_train) matrices
            self.train_y = train_y  # original train labels
            self.test_K = []  # list of (m_subset, m_test) matrices
            self.test_y = test_y  # original test labels

            # The full train/test matrices are the same for every pair, so move them to the GPU once
            train_gpu = cp.asarray(train_x)
            test_gpu = cp.asarray(test_x)

            for classifier in self.classifiers:

                # Examples and labels that belong to this class pair
                examples_subset, labels_subset = create_subsets(train_x, train_y, classifier)
                subset_gpu = cp.asarray(examples_subset)

                # Kernel of the subset against itself (training), the full train set and the test set
                self.classifier_K.append(cp.asnumpy(kernel_func(subset_gpu, subset_gpu)))
                self.train_K.append(cp.asnumpy(kernel_func(subset_gpu, train_gpu)))
                self.test_K.append(cp.asnumpy(kernel_func(subset_gpu, test_gpu)))

                # Convert labels to {+1, -1} encoding.
                # The first class is labelled as +1 while second is -1.
                # i.e for a binary classifier of "0 vs 1", 0 = +1 and 1 = -1
                self.classifier_y.append(np.where(labels_subset == classifier[0], 1, -1))

                # Initialize weights as 0s
                self.alphas.append(np.zeros(len(examples_subset)))

        else:
            raise ValueError("mode has to be either \"ova\" for One-vs-All or \"ovo\" for One-vs-One ")


    @staticmethod
    def _ova_targets(classes, labels):

        """ Returns a (k, m) matrix of {+1, -1}: entry (c, t) is +1 iff labels[t] equals classes[c] """

        class_column = np.asarray(classes).reshape(-1, 1)
        label_row = np.asarray(labels).reshape(1, -1)
        return np.where(class_column == label_row, 1, -1)


    @staticmethod
    def _check_convergence(difference, eta, epsilon, tolerance):

        """
        Early-stopping bookkeeping shared by fit_ova and fit_ovo.

        Args
        ----
        difference : change in training error over the last epoch
        eta : number of consecutive epochs (so far) whose change was at most epsilon
        epsilon : largest change that still counts as "no progress"
        tolerance : number of consecutive no-progress epochs after which training stops

        Returns
        -------
        (eta, converged) : the updated counter and whether training should stop
        """

        change = np.abs(difference)

        if change <= epsilon:
            # Another quiet epoch: count it and stop once `tolerance` of them occurred in a row
            if eta < tolerance:
                eta += 1
            return eta, eta == tolerance

        if change > epsilon:
            # A large jump in error restarts the count
            eta = 0

        # Only reached without a reset when the change is NaN; the counter is then left untouched
        return eta, False


    def fit(self, epsilon=0.001, tolerance=5, max_epochs=20, verbose=True):

        """ Fit model based on the selected mode of training """

        if self.mode == "ova":
            self.fit_ova(epsilon, tolerance, max_epochs, verbose)
        elif self.mode == "ovo":
            self.fit_ovo(epsilon, tolerance, max_epochs, verbose)
        else:
            raise ValueError("mode has to be either \"ova\" for One-vs-All or \"ovo\" for One-vs-One ")


    def fit_ova(self, epsilon=0.001, tolerance=5, max_epochs=30, verbose=True):

        """ Fit model on the training set using one-vs-all approach """

        # Stores the number of consecutive epochs over which the train error difference <= epsilon
        eta = 0

        # Log start time
        s = time.time()

        # {+1, -1} target of every class for every training example. Built from self.classes so
        # that labels do not have to be the integers 0..k-1; computed once instead of per example.
        targets = self._ova_targets(self.classes, self.train_y)

        # Training error of the previous epoch; evaluated lazily before the first epoch only
        train_error_old = None

        # Iterate over epochs
        for epoch in range(max_epochs):

            if verbose:
                print("Epoch {}/{}".format(epoch+1, max_epochs), end=" : ")

            # The model does not change between epochs, so the error "before learning" of this
            # epoch is the error measured at the end of the previous one.
            if train_error_old is None:
                train_error_old = self.evaluate(self.train_K, self.train_y)

            # Process each example online
            for t in range(self.m_train):

                # Predict step - scores of all k classes for the t-th example, shape (k, )
                preds = self.alphas @ self.train_K[:, t]

                # Update step - only the class-vs-rest classifiers that got this example wrong are updated
                y = targets[:, t]
                wrong = y * preds <= 0
                self.alphas[wrong, t] += y[wrong]

            # Compute train error and use it to monitor convergence
            train_error = self.evaluate(self.train_K, self.train_y)
            difference = train_error - train_error_old
            train_error_old = train_error

            # Output logs
            if verbose:
                print("Training error = {:.5f}".format(train_error))

            # Early stopping: stop after `tolerance` consecutive epochs with a change of at most epsilon
            eta, converged = self._check_convergence(difference, eta, epsilon, tolerance)
            if converged:
                if verbose:
                    print("Minimal change in training accuracy. Training completed early.")
                break

        # Log end time
        e = time.time()

        if verbose:
            print("TOTAL TIME TAKEN = {:.4f} seconds".format(e - s))

    def fit_ovo(self, epsilon=0.001, tolerance=5, max_epochs=30, verbose=True):

        """ Fit individual classifiers as part of the one-vs-one approach """

        # Log start time
        s = time.time()

        for i, (class_1, class_2) in enumerate(self.classifiers):

            # Stores the number of consecutive epochs over which the train error difference <= epsilon
            eta = 0

            if verbose:
                print("\nTraining classifier {} : {} vs {}".format(i+1, class_1, class_2))

            # Data of this binary classifier. alpha_i is the very array stored in self.alphas,
            # so the in-place updates below train the stored classifier.
            K_i = self.classifier_K[i]
            y_i = self.classifier_y[i]
            alpha_i = self.alphas[i]

            # Training error of the previous epoch; evaluated lazily before the first epoch only
            train_error_old = None

            # Iterate over epochs
            for epoch in range(max_epochs):

                if verbose:
                    print("Epoch {}/{}".format(epoch+1, max_epochs), end=" : ")

                if train_error_old is None:
                    train_error_old = self.evaluate_binary_classifier(alpha_i, K_i, y_i)

                # Process each example online
                for t in range(K_i.shape[0]):

                    # Predict step
                    pred = alpha_i @ K_i[:, t]

                    # Update step - a score of exactly 0 counts as a mistake
                    y = y_i[t]
                    if y * pred <= 0:
                        alpha_i[t] += y

                # Compute train error and use it to monitor convergence
                train_error = self.evaluate_binary_classifier(alpha_i, K_i, y_i)
                difference = train_error - train_error_old
                train_error_old = train_error

                # Output logs
                if verbose:
                    print("Training error = {:.5f}".format(train_error))

                # Early stopping: stop after `tolerance` consecutive epochs with a change of at most epsilon
                eta, converged = self._check_convergence(difference, eta, epsilon, tolerance)
                if converged:
                    if verbose:
                        print("Minimal change in training accuracy. Training completed early.")
                    break

        # Log end time
        e = time.time()

        if verbose:
            print("TOTAL TIME TAKEN = {:.4f}s".format(e - s))

    def predict(self, K, labels):

        """ Generate predictions for new examples """

        if self.mode == "ova":
            return self.predict_ova(K, labels)
        if self.mode == "ovo":
            return self.predict_ovo(K, labels)

        raise ValueError("mode has to be either \"ova\" for One-vs-All or \"ovo\" for One-vs-One ")

    def predict_ova(self, K, labels):

        """
        Compute predictions using trained model - for one vs all strategy.

        K is the kernel matrix between the training points and the points to predict on.
        `labels` is unused; it is accepted so that both predict methods share a signature.
        Returns the predicted class label (an entry of self.classes) for every column of K.
        """

        # Scores of every class for every point, computed on the GPU
        scores = cp.asarray(self.alphas) @ cp.asarray(K)

        # The most confident classifier wins
        winners = np.argmax(cp.asnumpy(scores), 0)

        # Translate row indices back to actual class labels, so that labels other than 0..k-1 also work
        return self.classes[winners]

    def predict_ovo(self, K, labels):

        """
        Compute predictions using trained model - for one vs one strategy.

        K is the list of kernel matrices (one per class pair) between each pair's training subset
        and the points to predict on. `labels` is unused; it is accepted so that both predict
        methods share a signature. Returns the majority-vote class for every point.
        """

        # Stores the classes predicted by the k(k-1)/2 individual binary classifiers in each row.
        # cols correspond to number of examples being predicted
        class_matrix = np.zeros((len(self.classifiers), K[0].shape[1]))

        for i, (class_1, class_2) in enumerate(self.classifiers):

            # Generate predictions
            scores = cp.asarray(self.alphas[i]) @ cp.asarray(K[i])

            # Positive score votes for the first class of the pair, anything else for the second
            class_matrix[i, :] = np.where(cp.asnumpy(scores) > 0, class_1, class_2)

        # Each point gets the class with the most votes. ravel() gives a flat vector whatever
        # shape scipy returns for the mode, including when there is a single point.
        class_predictions = np.asarray(mode(class_matrix, axis=0)[0]).ravel()

        return class_predictions

    def evaluate(self, K, labels):

        """ Evaluate the model by computing the misclassification error on the given points """

        # Compute predictions
        preds = self.predict(K, labels)

        # Find mistakes
        mistakes = np.where(preds != labels, 1.0, 0.0)

        # Error is the mean of the mistakes
        return np.mean(mistakes)

    def evaluate_binary_classifier(self, alphas, K, labels):

        """ Evaluates an individual binary classifier - required for monitoring individual classifiers in one vs one strategy """

        # Generate predictions
        scores = cp.asarray(alphas) @ cp.asarray(K)

        # Convert to {+1, -1} encoding
        preds = np.where(cp.asnumpy(scores) > 0, 1, -1)

        # Error is the fraction of mistakes
        mistakes = np.where(preds != labels, 1.0, 0.0)

        return np.mean(mistakes)

## Perform a single run on the full dataset (just to check that the model runs smoothly)

In [None]:
# Split data into train and test set (80% / 20%).
# Seeding NumPy's global random state first makes the shuffled split reproducible.
np.random.seed(3412)
train_x, train_y, test_x, test_y, _ = split_data(inputs=x, targets=y, test_proportion=0.2, shuffle=True)

In [None]:
# Setup perceptron model: one-vs-all mode, Gaussian kernel with its default width
model = Kernel_Perceptron(train_x, train_y, test_x, test_y, gaussian_kernel_matrix, mode="ova")
model.fit(epsilon=0.001, tolerance=5, max_epochs=20)

## Creating function for performing multiple runs

Given the model runs pretty smoothly, we can now create the functionality to perform multiple runs.

In [None]:
def perform_multiple_runs(runs,
                          param_values,
                          kernel_func,
                          mode="ova",
                          save_results=False,
                          path_to_results="",
                          seed=None,
                          verbose=True):

    """
    Trains a fresh model on several random train/test splits for every parameter value.

    The data comes from the notebook-level arrays x and y; every run draws a new
    shuffled 80/20 split of them.

    Args
    ----
    runs : number of runs to perform per parameter value
    param_values : list of kernel parameter values to train on
    kernel_func : kernel matrix computing function; called as kernel_func(X1, X2, parameter value)
    mode : set to "ova" or "ovo"
    save_results : Set true to save the results as a CSV file
    path_to_results : file name of the CSV, appended to the notebook-level results_dir
    seed : seed for NumPy's global random state, to ensure reproducibility of results
    verbose : prints the current parameter value and run number while training

    Returns
    -------
    results_df : pandas dataframe with one column per parameter value and one row each for the
                 mean and standard deviation of the train and test errors over the runs.
    """

    row_names = ["Mean Train Error", "STD Train Error", "Mean Test Error", "STD Test Error"]
    column_names = ["parameter value = {}".format(p) for p in param_values]

    # One column of summary statistics per parameter value
    summary = np.zeros((4, len(param_values)))

    # Set a random seed for reproducibility of results
    np.random.seed(seed)

    for column, param in enumerate(param_values):

        if verbose:
            print("\nparamter value = {}".format(param))

        # Kernel function with the hyperparameter fixed to the current value
        def kernel(X1, X2):
            return kernel_func(X1, X2, param)

        # Errors of the individual runs for this parameter value
        train_errors, test_errors = [], []

        for run_number in range(1, runs + 1):

            if verbose:
                print(">> Run {}".format(run_number))

            # Generate random train-test split
            train_x, train_y, test_x, test_y, _ = split_data(inputs=x, targets=y, test_proportion=0.20, shuffle=True)

            # A fresh model is trained from scratch on every run
            model = Kernel_Perceptron(train_x, train_y, test_x, test_y, kernel, mode=mode)
            model.fit(epsilon=0.001, tolerance=5, max_epochs=20, verbose=False)

            # Record train and test error of the trained model
            train_errors.append(np.mean(model.evaluate(model.train_K, model.train_y)))
            test_errors.append(np.mean(model.evaluate(model.test_K, model.test_y)))

        train_errors = np.array(train_errors)
        test_errors = np.array(test_errors)

        # Mean and spread over the runs, in the same order as row_names
        summary[:, column] = (np.mean(train_errors),
                              np.std(train_errors),
                              np.mean(test_errors),
                              np.std(test_errors))

    # Convert matrix to a pandas dataframe for easier visualization
    results_df = pd.DataFrame(summary, columns=column_names, index=row_names)

    if save_results:
        results_df.to_csv(results_dir+path_to_results, header=True, index=True)

    return results_df

## Performing for 20 runs on dataset

In [None]:
# Range of d_values: polynomial kernel degrees 1 to 7
d_values = np.arange(1, 8)

# Run for 20 runs per degree in one-vs-all mode; the summary table is also
# written to q1_results.csv inside the results folder.
results = perform_multiple_runs(runs=20,
                                mode="ova",
                                param_values=d_values,
                                kernel_func=polynomial_kernel_matrix,
                                save_results=True,
                                path_to_results="q1_results.csv",
                                seed=10)

In [None]:
# Check results by reloading the CSV written by the experiment above.
# NOTE(review): the CSV was saved with its row labels as the first column and index_col is not
# passed here, so those labels come back as an ordinary column — confirm that is intended.
results = pd.read_csv(results_dir+"q1_results.csv")
results

## Creating function for performing k-Fold Cross validation

In [None]:
def create_confusion_matrix(K, alphas, labels, num_classes=10):

    """
    Creates the confusion matrix of the mistakes made by a one-vs-all model on a set of points.

    Only misclassified points are counted, so the diagonal of the result is always zero.
    The predicted class is the row index of the largest score, and both that index and the
    true label are used directly as matrix indices, so the labels are expected to be the
    integers 0..num_classes-1.

    Args
    ----
    K : Kernel matrix between the training data and the test points. Should be of shape (m, l) where m is the number of training points and l is the number of test points
    alphas : dual weights of the model, one row per class. Should be of shape (number of classes, m)
    labels : true labels of shape (l, )
    num_classes : number of rows/columns of the confusion matrix (defaults to the 10 digit classes)

    Returns
    -------
    matrix - array of shape (num_classes, num_classes); entry (p, t) counts the points with true label t that were predicted as p
    """

    # Initialize CM
    matrix = np.zeros((num_classes, num_classes))

    # Scores of every class for every test point, computed on the GPU
    scores = cp.asarray(alphas) @ cp.asarray(K)

    # Set index of most confident entry as our predicted class
    preds = np.argmax(cp.asnumpy(scores), 0)

    # True labels of the points covered by K
    true_labels = np.asarray(labels)[:K.shape[1]]

    # Count every mistake in its (predicted, true) cell. np.add.at accumulates repeated
    # index pairs, which a plain fancy-indexed += would not.
    wrong = preds != true_labels
    np.add.at(matrix, (preds[wrong].astype(int), true_labels[wrong].astype(int)), 1)

    return matrix

In [None]:
def perform_kfoldCV(k, x, y, mode, hparam, kernel_func, shuffle=True, verbose=True):

    """
    Performs k-fold cross validation for a given kernel parameter.

    Args
    ----
    k - number of folds of CV to perform
    x - training data, of shape (number of examples, number of features)
    y - training labels
    mode - ova or ovo
    hparam - parameter to use for kernel
    kernel_func - kernel matrix computing function; called as kernel_func(X1, X2, hparam)
    shuffle - shuffles data (using NumPy's global random state) before splitting it into folds
    verbose - prints output logs while running

    Returns
    -------
    cv_error_over_folds - the validation error averaged over the k folds.
    """

    # Number of examples (x has to be two-dimensional)
    n_examples, _ = x.shape

    # Kernel function with the hyperparameter fixed
    def kernel(X1, X2):
        return kernel_func(X1, X2, hparam)

    # Optionally shuffle the dataset before cutting it into folds
    if shuffle:
        order = np.random.permutation(n_examples)
        x_ordered, y_ordered = x[order, :], y[order]
    else:
        x_ordered, y_ordered = x, y

    # Split data into k groups
    x_folds = np.array_split(x_ordered, k)
    y_folds = np.array_split(y_ordered, k)

    # Validation error of every fold
    fold_errors = []

    for fold, (val_inputs, val_labels) in enumerate(zip(x_folds, y_folds)):

        if verbose:
            print(">>>> Cross-validation Fold {}".format(fold+1), end="...")

        # Every group except the current one is used for training
        train_inputs = np.vstack(x_folds[:fold] + x_folds[fold+1:])
        train_labels = np.concatenate(y_folds[:fold] + y_folds[fold+1:])

        # Train a fresh model; the validation fold takes the role of the model's "test" set
        model = Kernel_Perceptron(train_inputs, train_labels, val_inputs, val_labels, kernel, mode=mode)
        model.fit(epsilon=0.001, tolerance=5, max_epochs=20, verbose=False)

        # Error on the held-out fold
        fold_errors.append(np.mean(model.evaluate(model.test_K, model.test_y)))

        if verbose:
            print("Done!")

    # Average the errors over the folds
    return sum(fold_errors) / k

In [None]:
def perform_multiple_runs_with_kFold(param_values,
                                     kernel_func,
                                     mode="ova",
                                     k=5,
                                     runs=20,
                                     seed=None,
                                     verbose=True,
                                     save_results=True,
                                     path_to_results="",
                                     create_cm=True,
                                     path_to_cm="",
                                     save_mistake_counts=False,
                                     path_to_mistake_counts=""):
    
    """
    Repeats a "select the parameter by k-fold CV, then test" experiment for the kernel perceptron.
    
    On every run the module-level dataset (x, y) is split 80/20 into a train and a test set,
    each candidate parameter is scored with k-fold cross validation on the train set, and the
    parameter with the lowest mean CV error is used to fit a model on the whole train set,
    which is then evaluated on the test set. The output files are rewritten under the
    module-level results_dir at the end of every run.
    
    Args
    ----
    param_values : candidate kernel parameter values to choose between
    kernel_func : kernel matrix computing function, called as kernel_func(X1, X2, parameter)
    mode : set to "ova" or "ovo"; forwarded to the model and to perform_kfoldCV
    k : number of folds of CV
    runs : number of independent runs (random train-test splits) to perform
    seed : passed to np.random.seed for reproducibility of results
    verbose : prints progress logs for the runs and the cross validation
    save_results : set true to write the results table to results_dir+path_to_results as csv
    path_to_results : file name (relative to results_dir) for the results table
    create_cm : set true to build and save a confusion matrix on the test set for each run
    path_to_cm : file name (relative to results_dir) for the stacked confusion matrices
    save_mistake_counts : set true to count, per example of x, how often the final model misclassifies it
    path_to_mistake_counts : file name (relative to results_dir) for the mistake counts
    
    Returns
    -------
    results_df : pandas dataframe with one row per run ("best parameter", "Test Error")
    confusion_matrices : array of shape (runs, 10, 10), or None when create_cm is False
    mistake_counts : vector with one count per row of x, or None when save_mistake_counts is False
    """

    # Fix the random state so that the splits are reproducible
    np.random.seed(seed)

    # One row per run: [selected parameter, test error]
    results = np.zeros((runs, 2))

    # Optional outputs are only allocated when requested
    confusion_matrices = np.zeros((runs, 10, 10)) if create_cm else None
    mistake_counts = np.zeros(x.shape[0]) if save_mistake_counts else None

    # Row labels of the results table
    run_names = ["Run {}".format(r+1) for r in range(runs)]

    for run_idx in range(runs):

        if verbose:
            print("\nRun {}".format(run_idx+1))

        # Draw a fresh random train-test split; cache holds the positions of the
        # train/test examples in the original dataset
        train_x, train_y, test_x, test_y, cache = split_data(inputs=x, targets=y, test_proportion=0.20, shuffle=True)
        train_idxs, test_idxs = cache

        #----------------------------SCORE EVERY PARAMETER WITH k-FOLD CV-------------------------------#

        cv_scores = np.zeros(len(param_values))

        for pos, candidate in enumerate(param_values):

            if verbose:
                print("\n>> parameter = {}".format(candidate))

            cv_scores[pos] = perform_kfoldCV(k, train_x, train_y, mode, candidate, kernel_func, verbose=verbose)

        #-----------------PICK THE BEST PARAMETER AND REFIT ON THE WHOLE TRAIN SET----------------------#

        # Lowest mean CV error wins
        param_star = param_values[np.argmin(cv_scores)]
    
        print("\nbest parameter value = {}".format(param_star))

        # Kernel with the selected parameter plugged in
        def tuned_kernel(X1, X2):
            return kernel_func(X1, X2, param_star)

        model = Kernel_Perceptron(train_x, train_y, test_x, test_y, tuned_kernel, mode=mode)
        model.fit(epsilon=0.001, tolerance=5, max_epochs=20, verbose=True)

        # Per-example mistakes on the train and test sets
        train_error = model.evaluate(model.train_K, model.train_y)
        test_error = model.evaluate(model.test_K, model.test_y)

        if save_mistake_counts:
            # Map misclassified examples back to their positions in the original dataset
            wrong_in_train = train_idxs[np.where(train_error == 1.0)]
            wrong_in_test = test_idxs[np.where(test_error == 1.0)]

            mistake_counts[wrong_in_train] += 1
            mistake_counts[wrong_in_test] += 1

        # Record the outcome of this run
        results[run_idx] = [param_star, np.mean(test_error)]
    
        if create_cm:
            # Confusion matrix of the final model on the test set
            confusion_matrices[run_idx] = create_confusion_matrix(model.test_K, model.alphas, model.test_y)

        # Rebuild the table and rewrite the output files after every run
        results_df = pd.DataFrame(results, columns=["best parameter", "Test Error"], index=run_names)

        if save_results:
            results_df.to_csv(results_dir+path_to_results, header=True, index=True)
        if create_cm:
            np.save(results_dir+path_to_cm, confusion_matrices)
        if save_mistake_counts:
            np.save(results_dir+path_to_mistake_counts, mistake_counts)

    return results_df, confusion_matrices, mistake_counts

## Performing 5-fold Cross validation for 20 runs

In [None]:
# Candidate polynomial degrees d = 1, ..., 7
d_values = np.arange(1, 8)

# Run the one-vs-all polynomial kernel perceptron for 20 runs, selecting d by 5-fold
# cross validation on each run. Confusion matrices and per-example mistake counts are
# also collected and saved.
# NOTE(review): this writes "q2_crossval_v2.csv", "q3_confusions_v2" and "mistakes"
# (np.save appends ".npy"), whereas the cells further down load "q2_crossval.csv",
# "q3_confusions.npy" and "q4_mistakes.npy" - confirm which files are intended.
results_df, confusion_matrices, mistake_counts = perform_multiple_runs_with_kFold(param_values=d_values,
                                                                                  kernel_func=polynomial_kernel_matrix,
                                                                                  mode="ova",
                                                                                  k=5,
                                                                                  runs=20,
                                                                                  seed=231,
                                                                                  path_to_results="q2_crossval_v2.csv",
                                                                                  create_cm=True,
                                                                                  path_to_cm="q3_confusions_v2",
                                                                                  save_mistake_counts=True,
                                                                                  path_to_mistake_counts="mistakes")

In [None]:
# Load the saved cross-validation results from the results folder and display them
# NOTE(review): reads "q2_crossval.csv" while the experiment cell writes "q2_crossval_v2.csv" - confirm the intended file.
results_df = pd.read_csv(results_dir+"q2_crossval.csv")
results_df

In [None]:
# Summarise the runs: mean and standard deviation (pandas' std, which is the sample
# standard deviation by default) of the selected degree d* and of the test error
print("Mean d* \u00B1 STD = {} \u00B1 {}".format(results_df["best parameter"].mean(), results_df["best parameter"].std()))
print("Mean Test Error \u00B1 STD = {} \u00B1 {}".format(results_df["Test Error"].mean(), results_df["Test Error"].std()))

## Confusion Matrices

In [None]:
# Load the stacked per-run confusion matrices and reduce them over the runs (axis 0)
# NOTE(review): reads "q3_confusions.npy" while the experiment cell saves to "q3_confusions_v2" - confirm the intended file.
confusion_matrices = np.load(results_dir+"q3_confusions.npy")
means = np.mean(confusion_matrices, axis=0)
stds = np.std(confusion_matrices, axis=0)

# Text shown in each heatmap cell (object array so that it can hold strings)
annotations = np.empty(means.shape, dtype=object)

# Off-diagonal cells show "mean ± std"; diagonal cells are always annotated with "0"
for i in range(annotations.shape[0]):
    for j in range(annotations.shape[1]):
        if i != j:
            annotations[i, j] = "{:.4f} \u00B1 {:.2f}".format(means[i, j], stds[i, j])
        else:
            annotations[i, j] = "0"

# Colour the cells by the mean value; fmt="" because the annotations are pre-formatted strings
# NOTE(review): the axis labels assume rows are predicted classes and columns are true
# classes - confirm against create_confusion_matrix.
plt.figure(figsize=(20, 8))
sns.heatmap(means, annot=annotations, fmt="", annot_kws={"fontweight":"bold"}, linewidths=1)
plt.ylabel("Predicted Class")
plt.xlabel("True Class")
plt.savefig(results_dir+"q3_confusion_matrix.jpg")
plt.show()

# Disabled: one heatmap per run, titled with the d* selected on that run
# plt.figure(figsize=(12, 52))
# for i in range(len(confusion_matrices)):
#     plt.subplot(10, 2, i+1)
#     ax = sns.heatmap(confusion_matrices[i], annot=True)
#     plt.ylabel("Predicted Class")
#     plt.xlabel("True Class")
#     plt.title("Run {}, d* = {}".format(i+1, results_df["best parameter"][i]))

## Find Hardest to Predict images

In [None]:
# Load the per-example mistake counts
# NOTE(review): reads "q4_mistakes.npy" while the experiment cell saves to "mistakes" - confirm the intended file.
mistakes = np.load(results_dir+"q4_mistakes.npy")

# argsort is ascending, so the last five positions are the five examples with the
# highest mistake counts (ordered from fewest to most mistakes)
idxs = np.argsort(mistakes)[-5:]

# Obtain the examples and their corresponding labels; the reshape requires each row
# of x to hold 256 values, which are laid out as a 16x16 image
five_hardest_images = x[idxs].reshape(5, 16, 16)
labels = y[idxs]

# Plot the five images side by side in grayscale, titled with their labels
plt.figure(figsize=(18, 6))
for i in range(5):
    plt.subplot(1, 5, i+1)
    plt.imshow(five_hardest_images[i], plt.cm.gray)
    plt.title("Image Label = {}".format(labels[i]))
# Saving the figure is disabled
# plt.savefig(results_dir+"hard2predict.png")

## Repeat Experiments with Gaussian Kernel

### Perform 20 runs with Gaussian Kernel

In [None]:
# Candidate Gaussian kernel widths c = 2^-10, 2^-9, ..., 2^-4
c_values = [2**x for x in range(-10, -3)]

# Run the one-vs-all Gaussian kernel perceptron for 20 runs per value of c
# NOTE(review): this call passes mode and kernel_func, so it presumably relies on a
# kernel-perceptron version of perform_multiple_runs defined earlier in the notebook;
# the KNN version defined further down does not accept these arguments - make sure
# the right definition is active when running this cell.
results = perform_multiple_runs(runs=20,
                                mode="ova",
                                param_values=c_values,
                                kernel_func=gaussian_kernel_matrix,
                                save_results=True,
                                path_to_results="q5_results.csv",
                                seed=1796)

In [None]:
# Load the saved Gaussian kernel results from the results folder and display them
results = pd.read_csv(results_dir+"q5_results.csv")
results

### Perform 20 runs with 5-fold cross-validation

In [None]:
# Candidate Gaussian kernel widths c = 2^-10, 2^-9, ..., 2^-4
c_values = [2**x for x in range(-10, -3)]

# Run the one-vs-all Gaussian kernel perceptron for 20 runs, selecting c by 5-fold
# cross validation on each run. No confusion matrices are requested, so the second
# and third return values are discarded.
# NOTE(review): this call passes kernel_func and mode, which the KNN version of
# perform_multiple_runs_with_kFold defined further down does not accept - make sure
# the kernel-perceptron definition is active when running this cell.
results_df, _, _ = perform_multiple_runs_with_kFold(param_values=c_values,
                                                    kernel_func=gaussian_kernel_matrix,
                                                    mode="ova",
                                                    k=5,
                                                    runs=20,
                                                    seed=231,
                                                    path_to_results="q5_crossval.csv",
                                                    create_cm=False)

In [None]:
# Load the saved Gaussian kernel cross-validation results and display them
results_df = pd.read_csv(results_dir+"q5_crossval.csv")
results_df

In [None]:
# Summarise the runs: mean and standard deviation of the selected Gaussian kernel
# width c* and of the test error. The parameter tuned in this experiment is the
# width c, so the label reads c* rather than d*.
print("\nMean c* \u00B1 STD = {} \u00B1 {}".format(results_df["best parameter"].mean(), results_df["best parameter"].std()))
print("Mean Test Error \u00B1 STD = {} \u00B1 {}".format(results_df["Test Error"].mean(), results_df["Test Error"].std()))

## Alternative Method for Multi-class Classification: One vs One

### Perform 20 runs

In [None]:
# Candidate polynomial degrees d = 1, ..., 7
d_values = np.arange(1, 8)

# Run the one-vs-one polynomial kernel perceptron for 20 runs per value of d
# NOTE(review): this call passes mode and kernel_func, so it presumably relies on a
# kernel-perceptron version of perform_multiple_runs defined earlier in the notebook;
# the KNN version defined further down does not accept these arguments - make sure
# the right definition is active when running this cell.
results = perform_multiple_runs(runs=20,
                                mode="ovo",
                                param_values=d_values,
                                kernel_func=polynomial_kernel_matrix,
                                save_results=True,
                                path_to_results="q6_results.csv",
                                seed=130399)

In [None]:
# Load the saved one-vs-one results from the results folder and display them
results = pd.read_csv(results_dir+"q6_results.csv")
results

### Perform 20 runs of 5-fold cross-validation

In [None]:
# Candidate polynomial degrees d = 1, ..., 7
d_values = np.arange(1, 8)

# Run the one-vs-one polynomial kernel perceptron for 20 runs, selecting d by 5-fold
# cross validation on each run.
# The kernel-perceptron perform_multiple_runs_with_kFold returns three values
# (results_df, confusion_matrices, mistake_counts), so all three must be unpacked;
# unpacking only two raises a ValueError after the whole experiment has run. The
# last two are None here (create_cm=False, save_mistake_counts left at its default
# of False), so they are discarded.
results_df, _, _ = perform_multiple_runs_with_kFold(param_values=d_values,
                                                    kernel_func=polynomial_kernel_matrix,
                                                    mode="ovo",
                                                    k=5,
                                                    runs=20,
                                                    seed=231,
                                                    path_to_results="q6_crossval.csv",
                                                    create_cm=False)

In [None]:
# Load the saved one-vs-one cross-validation results and display them
results_df = pd.read_csv(results_dir+"q6_crossval.csv")
results_df

In [None]:
# Summarise the runs: mean and standard deviation (pandas' std, which is the sample
# standard deviation by default) of the selected degree d* and of the test error
print("\nMean d* \u00B1 STD = {} \u00B1 {}".format(results_df["best parameter"].mean(), results_df["best parameter"].std()))
print("Mean Test Error \u00B1 STD = {} \u00B1 {}".format(results_df["Test Error"].mean(), results_df["Test Error"].std()))

## Implementing 2 other Classification Algorithms

### K-Nearest Neighbors

In [None]:
class KNN():
    
    """ 
    Manual implementation of the k-nearest-neighbours classifier.
    
    There is no training step: the training data is stored as-is and every prediction
    compares the query point against all stored examples.
    
    Args
    ----
    data - training data of shape (m, n) where m is the number of training examples and n is number of features
    labels - vector of training labels of shape (m, )
    k - number of neighbors to predict with.
    
    Methods
    -------
    calc_distance - computes euclidean distances between the rows of a matrix and a point
    get_nearest_neighbors - finds the indices in data of the k nearest neighbors
    predict - predicts the class of a test point based on k nearest neighbors
    
    """
    
    def __init__(self, data, labels, k):
        self.k = k
        self.data = data
        self.labels = labels
        
    def calc_distance(self, point1, point2):
        """ 
        Calculates the euclidean distance between every row of point1 and point2.
        
        point1 must be 2-dimensional (the norm is taken along axis 1); point2 is
        broadcast against it. The computation runs through cupy and the result is
        copied back to the host as a numpy vector with one distance per row of point1.
        """
        # linalg.norm already returns the euclidean (L2) norm, so no further square
        # root is applied - doing so would return sqrt(distance) instead
        difference = cp.asarray(point1) - cp.asarray(point2)
        return cp.linalg.norm(difference, axis=1).get()
    
    def get_nearest_neighbors(self, test_point):
        """ Returns the indices (into the training data) of the k nearest neighbors """
        distances = self.calc_distance(self.data, test_point)
        # argsort is ascending, so the first k positions are the closest examples
        nearest_neighbors = np.argsort(distances)[:self.k]
        return nearest_neighbors
        
    def predict(self, test_point):
        """ 
        Predicts the class of a test point as the most common label among its 
        k nearest neighbors. Returns a scalar label.
        """
        nearest_neighbors = self.get_nearest_neighbors(test_point)
        nearest_labels = self.labels[nearest_neighbors]
        # Depending on the SciPy version, mode() yields either a length-1 array or a
        # scalar; normalise to a scalar so callers can store it in an array element
        pred = np.asarray(mode(nearest_labels)[0]).ravel()[0]
        return pred

### Basic Results

In [None]:
def perform_multiple_runs(runs,
                          param_values,
                          save_results=False,
                          path_to_results="",
                          seed=None,
                          verbose=True):
    
    """
    Measures the train and test error of KNN for each parameter value over several runs.
    
    For every value in param_values, `runs` random 80/20 train-test splits of the
    module-level dataset (x, y) are drawn; a KNN model with that value as its number of
    neighbors predicts every train and test example, and the resulting error rates are
    summarised by their mean and standard deviation over the runs.
    
    Args
    ----
    runs : number of runs to perform per parameter value
    param_values : list of parameter values (numbers of neighbors) to evaluate
    save_results : Set true to save the results as csv
    path_to_results : file name (relative to the module-level results_dir) for the csv
    seed : passed to np.random.seed for reproducibility of results
    verbose : prints progress logs (the time taken per run is printed regardless)
    
    Returns
    -------
    results_df : pandas dataframe with one column per parameter value and the rows
                 "Mean Train Error", "STD Train Error", "Mean Test Error", "STD Test Error".
    """

    def predict_all(knn, samples):
        # The model classifies one point at a time, so fill the predictions row by row
        predictions = np.zeros(samples.shape[0])
        for row in range(len(samples)):
            predictions[row] = knn.predict(samples[row])
        return predictions

    row_names = ["Mean Train Error", "STD Train Error", "Mean Test Error", "STD Test Error"]

    # One row per statistic, one column per parameter value
    results = np.zeros((4, len(param_values)))

    # Fix the random state so that the splits are reproducible
    np.random.seed(seed)

    for col, p in enumerate(param_values):

        if verbose:
            print("paramter value = {}".format(p))

        # Error rates of every run for this parameter value
        train_errors, test_errors = [], []

        for run_idx in range(runs):
            
            if verbose:
                print(">> Run {}".format(run_idx+1), end="...")

            # Draw a fresh random train-test split
            train_x, train_y, test_x, test_y, _ = split_data(inputs=x, targets=y, test_proportion=0.20, shuffle=True)
            
            # Time the model creation and all predictions of this run
            started = time.time()

            model = KNN(train_x, train_y, p)
            train_preds = predict_all(model, train_x)
            test_preds = predict_all(model, test_x)

            finished = time.time()
            print("time taken: {:.5f} seconds".format(finished-started))
            
            # Error rate = fraction of predictions that differ from the labels
            train_errors.append(np.mean(train_preds != train_y))
            test_errors.append(np.mean(test_preds != test_y))

        # Summarise the runs for this parameter value
        results[:, col] = [np.mean(train_errors), np.std(train_errors),
                           np.mean(test_errors), np.std(test_errors)]

    # Wrap the matrix in a pandas dataframe for easier visualization
    column_names = ["parameter value = {}".format(p) for p in param_values]
    results_df = pd.DataFrame(results, columns=column_names, index=row_names)

    if save_results:
        results_df.to_csv(results_dir+path_to_results, header=True, index=True)

    return results_df

In [None]:
# Candidate numbers of neighbors k = 1, 3, 9, 27, 81
k_values = 3**np.arange(5)

# Run KNN for 20 runs per value of k and save the mean/std train and test errors
results = perform_multiple_runs(runs=20,
                                param_values=k_values,
                                save_results=True,
                                path_to_results="q7_knn_basic_results.csv",
                                seed=130399,
                                verbose=True)

In [None]:
# Load the saved KNN results from the results folder and display them
results = pd.read_csv(results_dir+"q7_knn_basic_results.csv")
results

In [None]:
# Build a LaTeX table of the KNN results.
# The csv was written with its row labels, so the first column of the loaded frame
# holds those labels; drop it and keep only the numeric results.
table = results.values[:, 1:]
# Rows 0/1 are the mean/std of the train error, rows 2/3 the mean/std of the test
# error; pair them up into "mean ± std" strings, one per value of k
train_err = ["{:.6f} \u00B1 {:.6f}".format(i, j) for i, j in zip(table[0], table[1])]
test_err = ["{:.6f} \u00B1 {:.6f}".format(i, j) for i, j in zip(table[2], table[3])]
# The K column is hard-coded to the same values as the k_values used for the runs
table = {"K":3**np.arange(5), "Mean Train Error":train_err, "Mean Test Error":test_err}
df = pd.DataFrame.from_dict(table)
latex = df.to_latex(index=False)
print(latex)

### KNN: Cross validation

In [None]:
def perform_kfoldCV(k, x, y, hparam, shuffle=True, verbose=True):
    
    """ 
    Estimates the error of KNN for one parameter value with k-fold cross validation.
    
    The data is (optionally shuffled and) split into k groups; each group is used once
    as the validation set while a KNN model built on the remaining groups predicts it.
    
    Args
    ----
    k - number of folds of CV to perform
    x - training data (2-dimensional, one example per row)
    y - training labels
    hparam - number of neighbors passed to the KNN model
    shuffle - shuffles data before performing k-fold CV
    verbose - prints output logs while running
    
    Returns
    -------
    the validation error rate averaged over the k folds.
    """

    # Number of examples (x is required to be 2-dimensional)
    m, _ = x.shape

    # Randomly reorder the examples, keeping inputs and labels aligned
    if shuffle:
        order = np.random.permutation(m)
        x, y = x[order, :], y[order]

    # Cut inputs and labels into k groups at the same positions
    x_folds = np.array_split(x, k)
    y_folds = np.array_split(y, k)

    # Running sum of the validation error rates of the folds
    total_error = 0

    for fold in range(len(x_folds)):
        
        if verbose:
            print(">>>> Cross-validation Fold {}".format(fold+1), end="...")

        # The current group is held out for validation
        val_inputs, val_labels = x_folds[fold], y_folds[fold]

        # Every other group goes into the training set
        train_inputs = np.vstack(x_folds[:fold] + x_folds[fold+1:])
        train_labels = np.concatenate(y_folds[:fold] + y_folds[fold+1:])

        #-------------------------BUILD MODEL AND PREDICT--------------------------#

        model = KNN(train_inputs, train_labels, hparam)
        
        # The model classifies one point at a time
        val_preds = np.zeros(val_inputs.shape[0])
        for row, sample in enumerate(val_inputs):
            val_preds[row] = model.predict(sample)

        # Error rate = fraction of validation predictions that differ from the labels
        total_error += np.mean(val_preds != val_labels)

        if verbose:
            print("Done!")

    # Average over the folds
    return total_error / k

In [None]:
def perform_multiple_runs_with_kFold(runs,
                                     param_values,
                                     k=5,
                                     seed=None,
                                     verbose=True,
                                     save_results=True,
                                     path_to_results=""):
    
    """
    Repeats a "select the parameter by k-fold CV, then test" experiment for KNN.
    
    On every run the module-level dataset (x, y) is split 80/20 into a train and a test
    set, each candidate parameter is scored with k-fold cross validation on the train
    set, and a KNN model using the parameter with the lowest mean CV error predicts the
    test set. The results file is rewritten at the end of every run.
    
    Args
    ----
    runs : number of runs to perform
    param_values : list of candidate parameter values (numbers of neighbors)
    k : number of folds of CV
    seed : passed to np.random.seed for reproducibility of results
    verbose : prints progress logs while running
    save_results : Set true to save the results as csv
    path_to_results : file name (relative to the module-level results_dir) for the csv
    
    Returns
    -------
    results_df : pandas dataframe with one row per run ("best parameter", "Test Error").
    """

    # Fix the random state so that the splits are reproducible
    np.random.seed(seed)

    # One row per run: [selected parameter, test error]
    results = np.zeros((runs, 2))

    # Row labels of the results table
    run_names = ["Run {}".format(r+1) for r in range(runs)]

    for run_idx in range(runs):

        if verbose:
            print("Run {}".format(run_idx+1))

        # Draw a fresh random train-test split
        train_x, train_y, test_x, test_y, _ = split_data(inputs=x, targets=y, test_proportion=0.20, shuffle=True)

        #--------------------SCORE EVERY PARAMETER WITH k-FOLD CV-----------------------#

        cv_scores = np.zeros(len(param_values))

        for pos, candidate in enumerate(param_values):

            if verbose:
                print(">> parameter = {}".format(candidate))

            cv_scores[pos] = perform_kfoldCV(k, train_x, train_y, candidate, verbose=verbose)

        #-----------PICK THE BEST PARAMETER AND EVALUATE ON THE TEST SET----------------#

        # Lowest mean CV error wins
        param_star = param_values[np.argmin(cv_scores)]

        print("\nbest parameter value = {}".format(param_star), end="...")

        if verbose:
            print("Evaluating", end="...")

        # Model built on the whole train set with the selected parameter
        model = KNN(train_x, train_y, param_star)
        
        # The model classifies one point at a time
        test_preds = np.zeros(test_x.shape[0])
        for row, sample in enumerate(test_x):
            test_preds[row] = model.predict(sample)

        # Record the selected parameter and the test error rate
        # (fraction of test predictions that differ from the labels)
        results[run_idx] = [param_star, np.mean(test_preds != test_y)]
        
        if verbose:
            print("Done!")

        # Rebuild the table and rewrite the results file after every run
        results_df = pd.DataFrame(results, columns=["best parameter", "Test Error"], index=run_names)

        if save_results:
            results_df.to_csv(results_dir+path_to_results, header=True, index=True)

    return results_df

In [None]:
# Candidate numbers of neighbors k = 1, 3, 9, 27, 81
k_values = 3**np.arange(5)

# Run KNN for 20 runs, selecting the number of neighbors by 5-fold cross validation
# on each run (the keyword k below is the number of CV folds, not the number of neighbors)
results_df= perform_multiple_runs_with_kFold(runs=20,
                                             param_values=k_values,
                                             k=5,
                                             seed=231,
                                             path_to_results="q7_knn_crossval.csv")

In [None]:
# Load the saved KNN cross-validation results from the results folder and display them
results_df = pd.read_csv(results_dir+"q7_knn_crossval.csv")
results_df

In [None]:
# Summarise the runs: mean and standard deviation of the selected number of
# neighbors k* and of the test error. The parameter tuned in this experiment is the
# number of neighbors k, so the label reads k* rather than d*.
print("\nMean k* \u00B1 STD = {} \u00B1 {}".format(results_df["best parameter"].mean(), results_df["best parameter"].std()))
print("Mean Test Error \u00B1 STD = {} \u00B1 {}".format(results_df["Test Error"].mean(), results_df["Test Error"].std()))

## SVM: Refer to `svm_manual_implementation.ipynb`