# Simple KNN

#### python by package


In [1]:
from sklearn import datasets
import pandas as pd
import numpy as np

iris = datasets.load_iris()
df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
df['target'] = iris.target
df.head()
X = df.drop('target', axis=1)
y = df.target

In [2]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12345)
print(X_train.head(2))
print(y_train.head(2))

    sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)
19                5.1               3.8                1.5               0.3
48                5.3               3.7                1.5               0.2
19    0
48    0
Name: target, dtype: int64


In [3]:
from sklearn.neighbors import KNeighborsRegressor,KNeighborsClassifier
from sklearn.metrics import mean_squared_error
from math import sqrt
knn_c = KNeighborsClassifier(n_neighbors=3)
knn_c.fit(X_train, y_train)
train_preds = knn_c.predict(X_train)
mse = mean_squared_error(y_train, train_preds)
rmse = sqrt(mse)
rmse

0.2041241452319315

In [4]:
knn_c

KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',
                     metric_params=None, n_jobs=None, n_neighbors=3, p=2,
                     weights='uniform')

In [5]:
knn_r = KNeighborsRegressor(n_neighbors=3)
knn_r.fit(X_train, y_train)
train_preds2 = knn_r.predict(X_train)
mse2 = mean_squared_error(y_train, train_preds2)
rmse2 = sqrt(mse2)
rmse2


0.13608276348795434

In [6]:
knn_r

KNeighborsRegressor(algorithm='auto', leaf_size=30, metric='minkowski',
                    metric_params=None, n_jobs=None, n_neighbors=3, p=2,
                    weights='uniform')

In [13]:
# find the best parameter
from sklearn.model_selection import GridSearchCV
parameters = {"n_neighbors": range(1, 50)}
gridsearch = GridSearchCV(KNeighborsClassifier(), parameters)
gridsearch.fit(X_train, y_train)

print("gridsearch.best_params_",gridsearch.best_params_)

test_preds_grid = gridsearch.predict(X_test)
test_mse = mean_squared_error(y_test, test_preds_grid)
test_rmse = sqrt(test_mse)
print("KNeighborsClassifier:",rmse)
print("GridSearchCV with best k:        ",test_rmse)


gridsearch.best_params_ {'n_neighbors': 5}
KNeighborsClassifier: 0.2041241452319315
GridSearchCV with best k:         0.18257418583505536


In [16]:
# Weighted Average of Neighbors Based on Distance
parameters = {
 "n_neighbors": range(1, 50),
 "weights": ["uniform", "distance"]}
gridsearch = GridSearchCV(KNeighborsRegressor(), parameters)
gridsearch.fit(X_train, y_train)

print(gridsearch.best_params_)
test_preds_grid = gridsearch.predict(X_test)
test_mse = mean_squared_error(y_test, test_preds_grid)
test_rmse2 = sqrt(test_mse)
test_rmse2
print("KNeighborsClassifier:                ",rmse)
print("GridSearchCV with best               ",test_rmse)
print("GridSearchCV with best k and weights:",test_rmse2)

{'n_neighbors': 6, 'weights': 'distance'}
KNeighborsClassifier:                 0.2041241452319315
GridSearchCV with best                0.18257418583505536
GridSearchCV with best k and weights: 0.16546358196238142


In [None]:
best_k = gridsearch.best_params_["n_neighbors"]
best_weights = gridsearch.best_params_["weights"]

#### from scratch


In [None]:
class simpleKNNClassifier(ClassifierAlgorithm):
    """
    This is a subclass for ClassifierAlgorithm for implementing the  simpleKNN algorithrm
    """

    def __init__(self, X=None, y=None, split=0.9, k=3):
        """
        The constructor for simpleKNNClassifier subclass.

        Parameters:
        X: The features of dataset in dataframe format.
        y: The label of dataset in dataframe format.
        split: The split proportion of the train/test dataset.
        k: The number of nearest neighbours (Default = 3)
        """
        self.split = split
        if (X is not None) and (y is not None):
            self.indices = np.random.permutation(X.shape[0])
            self.i = int(X.shape[0] * self.split)
        self.k = k

    def knn_train(self, X_train=None, y_train=None, X_test=None, y_test=None):
        """
        knn_train:
        This method splits the data to train/test datasets random selection.
        And build attributes:
            X_train, X_test: training/test data of features
            y_train, y_test: training/test data of labels
        This method also allow the users to input additional splited features/labels.

        Parameters:
        X_train, X_test, y_train, y_test: (default = None )
        """
        list_input = [X_train, y_train, X_test, y_test]

        if all(elem is None for elem in list_input) == True:
            # get attributes by random selection.
          self.X_train = self.X.iloc[self.indices[:self.i]].reset_index(drop=True)
          self.y_train = self.y.iloc[self.indices[:self.i]].rename("actual").reset_index(drop=True)
          self.X_test = self.X.iloc[self.indices[self.i:]].reset_index(drop=True)
          self.y_test = self.y.iloc[self.indices[self.i:]].rename("actual").reset_index(drop=True)
        elif all(elem is not None for elem in list_input[0:3]) == True:
            if len(X_train) == len(y_train) and X_train.shape[1] == X_test.shape[1]:
                # get attributes by inputs
                self.X_train = X_train
                self.X_test = X_test
                self.y_train = y_train
                if y_test is None:
                    self.y_test = pd.DataFrame("unknown", index=X_test.index, columns=['actual'])['actual']
                elif (len(y_test) == len(X_test)):
                    self.y_test = y_test
                else:
                    raise TypeError(
                        "Invalid input, please input correct dataframes: {X_train, X_test, y_train, y_test}")
            else:
                raise TypeError("Invalid input, please input correct dataframes: {X_train, X_test, y_train, y_test}")
        else:
            raise TypeError("Invalid input, please input correct dataframes: {X_train, X_test, y_train, y_test}")

    def Manhattan_distance(self, a, b):
        """
        Manhattan_distance:
        This method calculates the Manhattan distance between two points

        Parameters:
        a, b: two points

        Returns:
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = 0
        # Calculate Manhattan distance
        for i in range(len(a)):
            distance += abs(a[i] - b[i])
        return distance

    def Euclidean_distance(self, a, b):
        """
        Euclidean_distance:
        This method calculates the Euclidean distance between two points

        Parameters:
        a, b: two points

        Returns:
        distances: The distance between two points
        """

        # Set initial distance to 0
        distance = 0
        # Calculate Manhattan distance
        for i in range(len(a)):
            distance += (a[i] - b[i]) ** 2
        return np.sqrt(distance)

    def Chebyshev_distance(self, a, b):
        """
        Chebyshev_distance:
        This method calculates the Chebyshev distance between two points

        Parameters:
        a, b: two points

        Returns:
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = []
        for i in range(len(a)):
            distance.append(abs(a[i] - b[i]))
        return max(distance)

    def Hamming_distance(self, a, b):
        """
        Hamming_distance:
        This method calculates the Hamming distance between two categorical points

        Parameters:
        a, b: two points

        Returns:
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = 0
        # Calculate hamming distance using parameter p
        for i in range(len(a)):
            if a[i] != b[i]:
                distance += 1
        return distance

    def knn_test(self, method="m"):
        """
        knn_test:
        This method predict the labels for x_test dataaset by selected distance calculation method

        Parameters:
        method :
            * "Manhattan" or "m": compute distance for continuous data by Manhattan_distance method
            * "Euclidean" or "e": compute distance for continuous data  by  Euclidean_distance method
            * "Chebyshev" or "c": compute distance for continuous data by  Chebyshev_distance method
            * "Hamming"   or "h": compute distance for categorical data by  Hamming_distance method

        Returns:
        distances: The distance between two points
        """

        # define the distance method
        method = method.lower()
        if method == "manhattan" or method == "m":
            dist_function = self.Manhattan_distance
        elif method == "euclidean" or method == "e":
            dist_function = self.Euclidean_distance
        elif method == "chebyshev" or method == "c":
            dist_function = self.Chebyshev_distance
        elif method == "Hamming" or method == "h":
            dist_function = self.Hamming_distance
        else:
            raise TypeError("""Invalid method. The available method contains:
                           \n   * "Manhattan" or "m": compute distance for continuous  data  by Manhattan_distance method
                           \n   * "Euclidean" or "e": compute distance for continuous  data  by  Euclidean_distance method
                           \n   * "Chebyshev" or "c": compute distance for continuous  data  by  Chebyshev_distance method
                           \n   * "Hamming"   or "h": compute distance for categorical data  by  Hamming_distance method""")

        # compute distance
        x_predict = []
        for i in self.X_test.index:
            distances = []
            for j in self.X_train.index:
                try:
                    distances.append(dist_function(self.X_test.iloc[i], self.X_train.iloc[j, :]))
                except:
                    raise TypeError("""Invalid method. The available method contains:
                           \n   * "Manhattan" or "m": compute distance for continuous data by Manhattan_distance method
                           \n   * "Euclidean" or "e": compute distance for continuous data  by  Euclidean_distance method
                           \n   * "Chebyshev" or "c": compute distance for continuous data by  Chebyshev_distance method
                           \n   * "Hamming"   or "h": compute distance for categorical data by  Hamming_distance method""")

            # get the top k :short distance
            df_dists = pd.DataFrame(data={"dist": distances, "label": self.y_train},
                                    index=self.X_train.index).sort_values(by=['dist'], axis=0)[:self.k]
            # get the votes for labels
            predict_label = df_dists.label.value_counts()[:1].index.tolist()[0]
            # find the final label
            x_predict.append(predict_label)

        # build the result table
        x_test_predict = pd.DataFrame(data={"predicted": x_predict}, index=self.X_test.index)
        final = self.X_test.merge(self.y_test, left_index=True, right_index=True)
        final = final.merge(x_test_predict, left_index=True, right_index=True)
        self.knn_result = final[["actual", "predicted"]]
        return final


In [None]:
class KDNode:
    '''
    vaule: [X,y]
    '''
    def __init__(self, value=None, parent=None, left=None, right=None, index=None):
        self.value = value
        self.parent = parent
        self.left = left
        self.right = right 

    @property
    def Nodes(self):
        """ 
        Nodes:
        This method updates the nodes

        Returns: 
        value of nodes
        """      
        if not self.parent:
            nodes = None
        else:
            if self.parent.left is self:
                nodes = self.parent.right
            else:
                nodes = self.parent.left
        return nodes
class kdTreeKNNclassifier(ClassifierAlgorithm):
    def __init__(self, X=None,y=None, split=0.9, k=3):
        """ 
        The constructor for simpleKNNClassifier subclass. 
  
        Parameters: 
        X: The features of dataset in dataframe format. 
        y: The label of dataset in dataframe format. 
        split: The split proportion of the train/test dataset.
        k: The number of nearest neighbours (Default = 3)
        """
        super().__init__(X,y, split)
        self.root = KDNode()
        self.K = k

        
    def kd_train(self,X_train=None, y_train=None, X_test=None, y_test=None):
        """ 
        kd_train:
        This method splits the data to train/test datasets random selection.
        And build attributes:
            X_train, X_test: training/test data of features
            y_train, y_test: training/test data of labels
        This method also allow the users to input additional splited features/labels. 

        Parameters: 
        X_train, X_test, y_train, y_test: (default = None )
        """
        list_input = [X_train,y_train,X_test,y_test]

        if all(elem is None for elem in list_input) == True:
            ClassifierAlgorithm.test(self)
            ClassifierAlgorithm.train(self)
            self.X_train = self.X_train.to_numpy()
            self.X_test = self.X_test.to_numpy()
            self.y_train = self.y_train.to_numpy()
            self.y_test = self.y_test.to_numpy()
        elif all(elem is not None for elem in list_input[0:3]) == True:
            if len(X_train) == len(y_train) and X_train.shape[1]==X_test.shape[1]:
                self.X_train = X_train.to_numpy()
                self.X_test = X_test.to_numpy()
                self.y_train = y_train.to_numpy()
                if y_test is  None:
                    self.y_test = pd.DataFrame("unknown", index=X_test.index, columns=['actual'])['actual']
                elif (len(y_test) == len(X_test)):
                    self.y_test = y_test.to_numpy()
                else:
                    raise TypeError("Invild input")
            else:
                raise TypeError("Invild input")
        else:
          raise TypeError("Invild input")

    def Euclidean_distance(self,a, b):
        """ 
        Euclidean_distance:
        This method calculates the Euclidean distance between two points

        Parameters: 
        a, b: two points
        
        Returns: 
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = 0
        # Calculate minkowski distance using parameter p = 2
        for i in range(len(a)):
            distance += (a[i] - b[i])**2
        return np.sqrt(distance)

    def Chebyshev_distance(self,a, b):
        """ 
        Chebyshev_distance:
        This method calculates the Chebyshev distance between two points

        Parameters: 
        a, b: two points
        
        Returns: 
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = []
        for i in range(len(a)):
            distance.append( abs(a[i] - b[i]))
        return max(distance)
    
    def Hamming_distance(self,a, b):
        """ 
        Hamming_distance:
        This method calculates the Hamming distance between two points

        Parameters: 
        a, b: two points
        
        Returns: 
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = 0
        # Calculate hamming distance using parameter p
        for i in range(len(a)):
          if a[i] != b[i]:
            distance += 1
        return distance

    def Manhattan_distance(self,a, b): 
        """ 
        Manhattan_distance:
        This method calculates the Manhattan distance between two points

        Parameters: 
        a, b: two points
        
        Returns: 
        distances: The distance between two points
        """
        # Set initial distance to 0
        distance = 0
        # Calculate minkowski distance using parameter p =1
        for i in range(len(a)):
            distance += abs(a[i] - b[i])
        return distance

    def build_tree(self, data, axis=0,parent=None):
        """ 
        build_tree:
        This method build kd tree for trainng dataset

        Parameters: 
        data: training dataset
        axis: axis
        parent: parent node
        
        Returns: 
        root: The nodes of tree
        """

        # choose median point 
        if len(data) == 0:
            root = KDNode()
            return root
        data = np.array(sorted(data, key=lambda x:x[axis]))
        #get median
        median = int(len(data)/2)

        root = KDNode(data[median],parent=parent)
        #depth+1
        new_axis = (axis+1)%(len(data[0])-1)
        # spliting into partitions
        left = data[:median,:]
        right = data[median+1:,:]

        #While (P != null) 
        if len(left) != 0:
          root.left = self.build_tree(left,axis=new_axis,parent=root)
        else:
          root.left = None
        if len(right) != 0:
          root.right = self.build_tree(right,axis=new_axis,parent=root)
        else:
          root.right = None
          
        self.root = root 

        return root
    
    def kd_tree(self):
        """ 
        kd_tree:
        This method implement the method build_tree
        """
        X = self.X_train
        y = self.y_train
        data = np.concatenate([X, y.reshape(-1,1)],axis=1)
        root = self.build_tree(data)

    def search_tree(self,point,dist_function ):
        """ 
        search_tree:
        This method  traverse through the tree and find K nearest points

        Parameters: 
        point: the point that need to be classfied
        dist_function : the function that compute the distance

        Returns: 
        dist_table: The dataframe of K nearest points
        """
        self.kd_tree()
        loc = self.root
        # init the leaf node
        axis = 0
        while loc:
            if point[axis] < loc.value[axis]:
                temp = loc
                loc = loc.left
            else:
                temp = loc
                loc = loc.right
            axis = (axis+1)%len(point)
        current = temp
     
        class_y = []
        dist = []
        #  traverse through the tree 
        while current:
            node_dist = dist_function(current.value[:-1],point)
            dist.append(node_dist)
            class_y.append(current.value[-1])
            if current.Nodes:
                node_dist = dist_function(current.Nodes.value[:-1],point)
                dist.append(node_dist)
                class_y.append(current.Nodes.value[-1])
            current = current.parent
        
        dist_table = pd.DataFrame(data={"dist":dist,"label":class_y}).sort_values(by=['dist'], axis=0)
        dist_table = dist_table[:self.K]
        return dist_table
        
    def kd_test(self,method = "e"):
        """ 
        kd_test:
        This method predict the labels for x_test dataaset by selected distance calculation method

        Parameters: 
        method :
            * "Manhattan" or "m": compute distance for continuous data by Manhattan_distance method
            * "Euclidean" or "e": compute distance for continuous data  by  Euclidean_distance method
            * "Chebyshev" or "c": compute distance for continuous data by  Chebyshev_distance method
            * "Hamming"   or "h": compute distance for categorical data by  Hamming_distance method

        Returns: 
        kd_result: the dataframe of predicted result and the actual result
        """
        predict = [0 for i in range(len(self.X_test))]

        method = method.lower()
        if method == "manhattan" or method == "m" :
          dist_function = self.Manhattan_distance
        elif method == "euclidean"or method == "e"  : 
          dist_function = self.Euclidean_distance
        elif method == "chebyshev"or method == "c"  :
          dist_function = self.Chebyshev_distance
        elif method == "Hamming"or method == "h"  :
          dist_function = self.Hamming_distance
        else:
          raise TypeError("invalid method")
        

        for i in range(len(self.X_test)):
            dist_table = self.search_tree(self.X_test[i],dist_function = dist_function)
            dist_table["label"] = dist_table["label"].astype("category")
            predict_label = dist_table.label.value_counts()[:1].index.tolist()[0]
            predict[i] = predict_label
        self.kd_result = pd.DataFrame(data={"predicted":predict,"actual":self.y_test})
        return self.kd_result
