The steps for the algorithm are

1. Calculate the distance between the new instance from the known data.
2. Count the classes of the K Nearest Neighbors.
3. Classify the instance based on the majority of classes obtained in the previous step.

In [19]:

import numpy as np
from sklearn.datasets import make_classification

In [20]:
def euclidian_distance(a, b):
    return np.sqrt(np.sum((a-b)**2, axis=1))

In [21]:
def kneighbors(X_test, return_distance=False):
       
        n_neighbors = 5
        dist = []
        neigh_ind = []
        
        point_dist = [euclidian_distance(x_test, X_train) for x_test in X_test]

        for row in point_dist:
            enum_neigh = enumerate(row)
            sorted_neigh = sorted(enum_neigh, key=lambda x: x[1])[:n_neighbors]
    
            ind_list = [tup[0] for tup in sorted_neigh]
            dist_list = [tup[1] for tup in sorted_neigh]
    
            dist.append(dist_list)
            neigh_ind.append(ind_list)
        
        if return_distance:
            return np.array(dist), np.array(neigh_ind)
        
        return np.array(neigh_ind)

In [22]:
def predict(X_test, weights='uniform'):
        
        class_num = 3
        
        if weights=='uniform':
            neighbors = kneighbors(X_test)
            y_pred = np.array([np.argmax(np.bincount(y_train[neighbor])) for neighbor in neighbors])
        
            return y_pred 
    
        if weights=='distance':
        
            dist, neigh_ind = kneighbors(X_test, return_distance=True)
        
            inv_dist = 1/dist
            
            mean_inv_dist = inv_dist / np.sum(inv_dist, axis=1)[:, np.newaxis]
            
            proba = []
            
            for i, row in enumerate(mean_inv_dist):
                
                row_pred = self.y_train[neigh_ind[i]]
                
                for k in range(class_num):
                    indices = np.where(row_pred==k)
                    prob_ind = np.sum(row[indices])
                    proba.append(np.array(prob_ind))
        
            predict_proba = np.array(proba).reshape(X_test.shape[0], class_num)
            
            y_pred = np.array([np.argmax(item) for item in predict_proba])
            
            return y_pred

In [28]:
def score(X_test, y_test):
    y_pred = predict(X_test)
        
    return float(sum(y_pred == y_test))/ float(len(y_test))

In [23]:
from sklearn.datasets import make_classification
X, y = make_classification(n_samples = 1000, n_features=2, n_redundant=0, n_informative=2,
                             n_clusters_per_class=1, n_classes=3, random_state=21)

mu = np.mean(X, 0)
sigma = np.std(X, 0)

X = (X - mu ) / sigma

In [24]:
data = np.hstack((X, y[:, np.newaxis]))
        
np.random.shuffle(data)

split_rate = 0.7

train, test = np.split(data, [int(split_rate*(data.shape[0]))])

X_train = train[:,:-1]
y_train = train[:, -1]

X_test = test[:,:-1]
y_test = test[:, -1]

y_train = y_train.astype(int)
y_test = y_test.astype(int)

In [25]:
kneighbors(X_test)

array([[280, 234, 249, 192, 260],
       [171, 518,  60,  68, 107],
       [179, 239, 329, 190, 496],
       ...,
       [171,  60, 518,  68,  67],
       [316, 427, 514, 230, 651],
       [ 57,  90, 363, 501,  49]])

In [26]:
predict(X_test)

array([0, 1, 2, 1, 0, 1, 2, 1, 0, 1, 1, 0, 2, 0, 2, 1, 1, 1, 2, 2, 0, 1,
       1, 1, 1, 2, 2, 2, 1, 2, 0, 1, 1, 0, 0, 1, 0, 0, 2, 1, 2, 0, 1, 2,
       2, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 2, 0, 2, 1, 1,
       0, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 2, 0, 1, 2, 2, 1, 0, 1, 1, 1, 1,
       0, 2, 0, 2, 2, 1, 0, 0, 0, 2, 0, 1, 2, 2, 0, 1, 2, 2, 1, 0, 0, 2,
       2, 0, 2, 0, 1, 2, 0, 2, 2, 1, 0, 2, 2, 0, 0, 1, 2, 1, 1, 2, 1, 0,
       0, 0, 1, 0, 1, 2, 0, 1, 0, 2, 2, 0, 1, 2, 0, 0, 1, 1, 1, 2, 0, 0,
       0, 0, 0, 1, 2, 2, 0, 0, 1, 1, 1, 1, 2, 1, 1, 2, 0, 0, 0, 2, 0, 2,
       1, 0, 1, 0, 0, 2, 2, 1, 2, 0, 1, 1, 1, 2, 0, 2, 1, 2, 1, 0, 2, 1,
       2, 1, 0, 2, 0, 2, 2, 2, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 2, 0,
       2, 2, 1, 1, 2, 1, 2, 0, 0, 0, 0, 0, 0, 0, 2, 2, 0, 2, 1, 2, 2, 0,
       2, 0, 1, 1, 1, 0, 0, 1, 2, 1, 1, 2, 1, 0, 0, 1, 2, 0, 1, 1, 2, 1,
       1, 1, 0, 2, 1, 1, 2, 1, 2, 2, 0, 2, 2, 2, 0, 1, 0, 0, 2, 2, 2, 0,
       0, 2, 1, 2, 1, 2, 0, 0, 0, 0, 0, 1, 0, 0])

In [29]:
score(X_test, y_test)

0.9833333333333333

In [44]:
import numpy as np


class KNearestNeighbors():
    def __init__(self, X_train, y_train, n_neighbors=5, weights='uniform'):

        self.X_train = X_train
        self.y_train = y_train

        self.n_neighbors = n_neighbors
        self.weights = weights

        self.n_classes = 3

    def euclidian_distance(self, a, b):
        return np.sqrt(np.sum((a - b)**2, axis=1))

    def kneighbors(self, X_test, return_distance=False):

        dist = []
        neigh_ind = []

        point_dist = [self.euclidian_distance(x_test, self.X_train) for x_test in X_test]

        for row in point_dist:
            enum_neigh = enumerate(row)
            sorted_neigh = sorted(enum_neigh,
                                  key=lambda x: x[1])[:self.n_neighbors]

            ind_list = [tup[0] for tup in sorted_neigh]
            dist_list = [tup[1] for tup in sorted_neigh]

            dist.append(dist_list)
            neigh_ind.append(ind_list)

        if return_distance:
            return np.array(dist), np.array(neigh_ind)

        return np.array(neigh_ind)

    def predict(self, X_test):

        if self.weights == 'uniform':
            neighbors = self.kneighbors(X_test)
            y_pred = np.array([
                np.argmax(np.bincount(self.y_train[neighbor]))
                for neighbor in neighbors
            ])

            return y_pred

        if self.weights == 'distance':

            dist, neigh_ind = self.kneighbors(X_test, return_distance=True)

            inv_dist = 1 / dist

            mean_inv_dist = inv_dist / np.sum(inv_dist, axis=1)[:, np.newaxis]

            proba = []

            for i, row in enumerate(mean_inv_dist):

                row_pred = self.y_train[neigh_ind[i]]

                for k in range(self.n_classes):
                    indices = np.where(row_pred == k)
                    prob_ind = np.sum(row[indices])
                    proba.append(np.array(prob_ind))

            predict_proba = np.array(proba).reshape(X_test.shape[0],
                                                    self.n_classes)

            y_pred = np.array([np.argmax(item) for item in predict_proba])

            return y_pred

    def score(self, X_test, y_test):
        y_pred = self.predict(X_test)
        
        return float(sum(y_pred == y_test)) / float(len(y_test))


from sklearn.datasets import load_iris

from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
import pandas as pd


dataset = load_iris()

X = dataset.data
y = dataset.target

mu = np.mean(X, 0)
sigma = np.std(X, 0)
X = (X - mu ) / sigma

X_train, X_test, y_train, y_test = train_test_split(\
                X, y, test_size=0.3, random_state=45)

our_classifier = KNearestNeighbors(X_train, y_train, n_neighbors=3)
sklearn_classifier = KNeighborsClassifier(n_neighbors=3).fit(X_train, y_train)

our_accuracy = our_classifier.score(X_test, y_test)
sklearn_accuracy = sklearn_classifier.score(X_test, y_test)

pd.DataFrame([[our_accuracy, sklearn_accuracy]],
             ['Accuracy'],    
             ['Our Implementation', 'Sklearn\'s Implementation'])
       
  

Unnamed: 0,Our Implementation,Sklearn's Implementation
Accuracy,0.955556,0.955556
