In [1]:
iris_source = '../data/iris.data'

In [2]:
mnist_source = '../data/mnist_test.csv'

In [3]:
import pandas as pd
df = pd.read_csv(iris_source, 
                  header=None)

In [4]:
from sklearn.model_selection import train_test_split
import numpy as np

In [5]:
df_without_labels = df.drop(columns=4)

In [6]:
X_train, X_test, y_train, y_test = train_test_split(df_without_labels, 
                                                    df.iloc[:,4].tolist(), test_size=0.2)

In [7]:
X_test, X_train = X_test.to_numpy(), X_train.to_numpy()

In [8]:
y_train, y_test = np.array(y_train), np.array(y_test)

In [9]:
(X_train.shape, X_test.shape)

((120, 4), (30, 4))

In [10]:
# 4: KNN implementation
import numpy as np
from sklearn import preprocessing
from collections import Counter


class KNearestNeighbors:
    def __init__(self, k, distance_metric="euclidean", weights="uniform"):
        self.k = k
        self.distance_metric = distance_metric
        self.weights = weights
        
    def fit(self, X, y):
        """
        Store the 'prior knowledge' of you model that will be used
        to predict new labels.
        :param X : input data points, ndarray, shape = (R,C).
        :param y : input labels, ndarray, shape = (R,).
        """
        self.X_train = X
        
        le = preprocessing.LabelEncoder()
        le.fit(y)
        self.y_train = le.transform(y)

        
    def predict(self, X):
        """Run the KNN classification on X.
        :param X: input data points, ndarray, shape = (N,C).
        :return: labels : ndarray, shape = (N,).
        """
        dist_matrix = compute_distance(self, X)
        knn = dist_matrix.argsort(axis=0)[:self.k, :].T
        if self.weights=='uniform':
            y_pred = np.array([ majority_voting(self.y_train[knn][i]) for i in range(len(self.y_train[knn])) ])


        elif self.weights=='distance':
            w = 1 / (np.dist_matrix[knn]
            #labels = sorted(set(self.y_train))
            #lab_counters = [0] * len(labels)
            #for lab, w in votes:
            #    lab_counters[lab] += w
            y_pred = np.array([ majority_voting(self.y_train[knn][i]*w[:,i]) for i in range(len(slef.y_train[knn])) ])
                
        return y_pred
    
        
    def compute_distance(self, p, q):
        if self.distance_metric=='euclidean':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_diff = X_train_reshaped - X_test
            # shape after diff = (120, 30, 4), 
            #remember we did reshaping and normalization.
            # 4 are the features for iris.data 
            dist_matrix = np.sqrt(np.sum(X_diff**2,axis=2))
            # shape after this op. = (120, 30), axis=2 are the features
        
        if self.distance_metric=='cosine':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_train_norm = ((self.X_train**2).sum(axis=1)**.5).reshape(-1,1)
            X_test_norm = ((X_test**2).sum(axis=1)**.5)
            dot_prods = self.X_train_reshaped @ X_test.T
            dist_matrix = 1 - np.abs(dot_prods / X_train_norm.reshape(-1,1) 
                                     / X_test_norm)
                              
        if self.distance_metric=='manhattan':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_diff = self.X_train_reshaped - X_test
            dist_matrix = np.abs(X_diff).sum(axis=2)

        return dist_matrix
    
    
    def compute_accuracy(self, y_pred, y_true):
        return (y_true==y_pred).sum() / len(y_true)
    
    def majority_voting(votes):
        count = Counter(votes)
        return count.most_common(1)[0][0] 
        # most_common(n) returns a list with the n most recurring votes (n=1 -> top vote)

SyntaxError: invalid syntax (<ipython-input-10-c966e7a1fdbf>, line 44)

In [None]:
model = KNearestNeighbors(k=8, distance_metric='euclidean')

In [None]:
model.fit(X_train, y_train)

In [None]:
y_pred = model.predict(X_test)

In [None]:
dist_m[knn].shape

In [None]:
np.take_along_axis(dist_m, knn.T, 0).shape

In [None]:
print(model.compute_accuracy(y_pred=y_pred, y_test=y_test))

In [None]:
# 8 :  New Version: The intuition behind weighted kNN, 
# is to give more weight to the points which are nearby and 
#less weight to the points which are farther away.

import numpy as np
from sklearn import preprocessing
from collections import Counter


class KNearestNeighbors:
    def __init__(self, k, distance_metric="euclidean", weights="uniform"):
        self.k = k
        self.distance_metric = distance_metric
        self.weights = weights
        
    def fit(self, X, y):
        """
        Store the 'prior knowledge' of you model that will be used
        to predict new labels.
        :param X : input data points, ndarray, shape = (R,C).
        :param y : input labels, ndarray, shape = (R,).
        """
        self.X_train = X
        
        le = preprocessing.LabelEncoder()
        le.fit(y)
        self.y_train = le.transform(y)

        
    def predict(self, X):
        """Run the KNN classification on X.
        :param X: input data points, ndarray, shape = (N,C).
        :return: labels : ndarray, shape = (N,).
        """
        knn = dist_matrix.argsort(axis=0)[:self.k, :].T
        if self.weights=='uniform':
            y_pred = np.array([ majority_voting(self.y_train[knn][i]) for i in range(len(self.y_train[knn])) ])


        elif self.weights=='distance':
            votes = [(self.y_train[d[0]], 1/d[1]) for d in dist_list]
            labels = sorted(set(self.y_train))
            lab_counters = [0] * len(labels)
            for lab, w in votes:
                lab_counters[lab] += w
            lab_counters = np.array(lab_counters)

        argmax = np.argmax(lab_counters) # If deuce, we take the first
        y_pred.append(argmax)
                
        return y_pred
    
        
    def compute_distance(self, p, q):
        if self.distance_metric=='euclidean':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_diff = X_train_reshaped - X_test
            # shape after diff = (120, 30, 4), 
            #remember we did reshaping and normalization.
            # 4 are the features for iris.data 
            dist_matrix = np.sqrt(np.sum(X_diff**2,axis=2))
            # shape after this op. = (120, 30), axis=2 are the features
        
        if self.distance_metric=='cosine':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_train_norm = ((self.X_train**2).sum(axis=1)**.5).reshape(-1,1)
            X_test_norm = ((X_test**2).sum(axis=1)**.5)
            dot_prods = self.X_train_reshaped @ X_test.T
            dist_matrix = 1 - np.abs(dot_prods / X_train_norm.reshape(-1,1) 
                                     / X_test_norm)
                              
        if self.distance_metric=='manhattan':
            X_train_reshaped = np.expand_dims(self.X_train, 1)
            X_diff = self.X_train_reshaped - X_test
            dist_matrix = np.abs(X_diff).sum(axis=2)
        return dist_matrix
    
    
    def compute_accuracy(self, y_pred, y_true):
        return (y_true==y_pred).sum() / len(y_true)
    
    def _majority_voting(votes):
        count = Counter(votes)
        return count.most_common(1)[0][0] 
        # most_common(n) returns a list with the n most recurring votes (n=1 -> top vote)


In [None]:
model_updated = KNearestNeighbors(k=8, distance_metric='euclidean', weights='distance')

In [None]:
model_updated.fit(X_train, y_train)

In [None]:
y_pred = model_updated.predict(X_test)

In [None]:
le = preprocessing.LabelEncoder()
le.fit(y_test)
y_test = le.transform(y_test)

print(model.compute_accuracy(y_pred=y_pred, y_test=y_test))

In [None]:
# 9: KNN on MNIST
import pandas as pd
mnist_df = pd.read_csv(mnist_source, 
                 header=None)

In [None]:
# Loading our dataset:
labels = mnist_df.iloc[:,0]
lab_set = set(labels.tolist())
counters = [0] * len(lab_set)
dataset = []
while any(c < 100 for c in counters):
    for i, row in mnist_df.iterrows():
        row = row.tolist()
        if counters[row[0]] < 100:
            dataset.append(row)
            counters[row[0]] += 1

In [None]:
our_mnist = pd.DataFrame(dataset)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(our_mnist.iloc[:, 1:], 
                                                    our_mnist.iloc[:,0], test_size=0.2)

In [None]:
X_test, X_train = X_test.to_numpy(), X_train.to_numpy()

In [None]:
y_train, y_test = np.array(y_train), np.array(y_test)

In [None]:
X_train.shape

In [None]:
mnist_KNN = KNearestNeighbors(k=5, distance_metric='euclidean')

In [None]:
mnist_KNN.fit(X=X_train, y=y_train)

In [None]:
y_pred = mnist_KNN.predict(X_test)

In [None]:
mnist_KNN.compute_accuracy(y_pred=y_pred, y_test=y_test.tolist())