# Import

In [1]:
import numpy as np
import pandas as pd
import heapq
from sklearn import datasets
from sklearn.model_selection import train_test_split

# Implement

In [2]:
def minkowskiDistance(x, y, p):
    '''
    Minkowski distance of order p between two equal-length vectors.

    Computes (sum_i |x[i] - y[i]| ** p) ** (1 / p). The absolute value is
    essential: without it, odd orders (including p=1) let negative
    differences cancel positive ones or drive the sum negative.

    Parameters
    ----------
    x, y : sequences of numbers (list, tuple, 1-D numpy array, ...)
        The two points. They must have the same length.
    p : int or float
        Order of the norm; must be positive (1 = Manhattan, 2 = Euclidean).

    Return
    --------
    d: float
        The distance; 0.0 for two empty vectors.

    Raises
    ------
    ValueError
        If p is not positive or the vectors differ in length.
    '''
    if p <= 0:
        raise ValueError('p must be positive')
    if len(x) != len(y):
        raise ValueError('x and y must have the same length')
    total = sum(abs(a - b) ** p for a, b in zip(x, y))
    return total ** (1 / p)

In [3]:
class KNN_Classification:
    '''
    k-nearest-neighbours classifier based on the Minkowski distance.

    fit() only stores the training data; all the work happens in predict(),
    which labels each query row by majority vote among its n_neighbors
    closest training rows.
    '''

    def __init__(self, n_neighbors=5, p=2):
        ''' 
        Class constructor
        
        Parameters
        ----------
        n_neighbors: int
            Number of neighbors to use. Must be at least 1.
        
        p: int or float
            Order of the Minkowski norm (1 = Manhattan, 2 = Euclidean).
            Must be positive.

        Raises
        ------
        ValueError
            If n_neighbors < 1 or p <= 0.
        '''
        if n_neighbors < 1:
            raise ValueError('n_neighbors must be at least 1')
        if p <= 0:
            raise ValueError('p must be positive')
        self.n_neighbors = n_neighbors
        self.p = p
        
    def fit(self, X, y):
        '''
        Store the training dataset (X, y).
        
        Parameters
        ----------
        X : array-like, shape (m, n)
            The matrix of inputs
        y : array-like, shape (m, 1) or (m,)
            The vector of class labels, one per row of X.

        Raises
        ------
        ValueError
            If X and y do not contain the same number of samples.
        '''
        X = np.asarray(X)
        y = np.asarray(y)
        if len(X) != len(y):
            raise ValueError('X and y must contain the same number of samples')
        self.X = X
        self.y = y

    def _distances(self, train, query):
        # Minkowski distance from one query row to every training row at once.
        # abs() keeps the result a true distance for odd p as well.
        return np.sum(np.abs(train - query) ** self.p, axis=1) ** (1 / self.p)
        
    def predict(self, X):
        '''
        Predict class labels with the k-nearest-neighbours rule.
        
        Parameters
        ----------
        X : array-like, shape (m, n)
            The matrix of inputs to classify.
        
        Return
        ----------
        numpy array, shape (m, 1)
            Predicted label for each row of X, using the dtype of the
            training labels.

        Notes
        -----
        Equidistant training rows are ranked by their index, and a tied vote
        goes to the class that appears first among the neighbours ordered by
        increasing distance. If fewer than n_neighbors training rows exist,
        all of them vote.
        '''
        queries = np.asarray(X, dtype=float)
        train = np.asarray(self.X, dtype=float)
        labels = np.asarray(self.y).ravel()

        predictions = []
        for query in queries:
            distances = self._distances(train, query)
            # A stable sort keeps equal distances in training-row order.
            nearest = np.argsort(distances, kind='stable')[:self.n_neighbors]
            # dict preserves insertion order, so max() resolves vote ties in
            # favour of the class seen first (i.e. the closer neighbour).
            votes = {}
            for j in nearest.tolist():
                label = labels.item(j)
                votes[label] = votes.get(label, 0) + 1
            predictions.append(max(votes, key=votes.get))
        # Build the result once instead of re-allocating with np.append per row.
        return np.array(predictions, dtype=labels.dtype).reshape(-1, 1)

In [4]:
def standardScaler(X):
    '''
    Standardise each feature (column) of X to zero mean and unit variance.

    The mean and standard deviation are taken along axis 0, i.e. separately
    for every column, so features measured on different scales contribute
    equally to distance computations. A feature with zero standard deviation
    is only centred (divided by 1) instead of producing NaN/inf.

    Parameters
    ----------
    X : array-like, shape (m, n) or (m,)
        The matrix of inputs.

    Return
    ----------
    numpy array of floats with the same shape as X.
    '''
    X = np.asarray(X, dtype=float)
    mean = np.mean(X, axis=0)
    std = np.std(X, axis=0)
    # Avoid division by zero for constant features.
    std = np.where(std == 0, 1.0, std)
    return (X - mean) / std

# Test

In [5]:
# Load the iris dataset; the targets are reshaped into an (m, 1) column vector.
iris = datasets.load_iris()
X, y = iris.data, iris.target.reshape(-1, 1)

# Hold out 30% of the samples for testing; the fixed seed makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.3
)

In [6]:
# Estimate the scaling statistics on the training split only and reuse them for
# the test split, so both splits live in the same feature space and nothing
# about the test data leaks into preprocessing. standardScaler(X) always uses
# the statistics of the array it is given, so it cannot do this; the scaling is
# therefore written out explicitly here.
feature_mean = X_train.mean(axis=0)
feature_std = X_train.std(axis=0)

# New names keep the raw splits intact, so re-running this cell is idempotent.
X_train_scaled = (X_train - feature_mean) / feature_std
X_test_scaled = (X_test - feature_mean) / feature_std

model = KNN_Classification(n_neighbors=3)
model.fit(X_train_scaled, y_train)

y_pred = model.predict(X_test_scaled)

In [7]:
# Accuracy: fraction of test samples whose predicted label matches the true one.
accuracy = np.mean(y_pred == y_test)
print('Score:', accuracy)

Score: 1.0
