I created this KNN algorithm for the coursework assignment of the Machine Learning module in the MSc Data Science programme at the University of London.

Kursat Kutlu Aydemir, December, 2021, UoL.


### 1. Helper Functions

I implemented the helper code as a Python class named `KNNHelper`, which provides the distance, point-comparison and label-decision functions used by the classes defined later.

In [1]:
import math
import numpy as np
from scipy.spatial import distance as d

class KNNHelper():
    """Distance, point-comparison and label-decision helpers.

    Methods
    -------
    euclidean(point1, point2)
        Calculates euclidean distance between the given two data points.
    manhattan(point1, point2)
        Calculates manhattan distance between the given two data points.
    chebyshev(point1, point2)
        Calculates chebyshev distance between the given two data points.
    dist(point1, point2, metric='euclidean')
        Calculates the corresponding distance, using the given metric, between the given two data points.
    are_points_equal(point1, point2)
        Compares and returns if the given two points are equal or not.
    decide_label(_labels, element_list)
        Decides the final label by ranking the labels of the given elements.
    print_tabular(titles, matrix)
        Prints a matrix in tabular format.

    References
    ----------
    For distance metrics I referred to the suitable metrics listed here:
    https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.DistanceMetric.html#sklearn.neighbors.DistanceMetric

    """

    def euclidean(self, point1, point2):
        """Calculates euclidean distance between the given two data points.

        Parameters
        ----------
        point1 : numpy.ndarray
        point2 : numpy.ndarray

        Returns
        -------
        float
            Square rooted sum of the squared coordinate differences
        """

        return math.sqrt(sum(((p1 - p2) ** 2) for p1, p2 in zip(point1, point2)))


    def manhattan(self, point1, point2):
        """
        Calculates manhattan distance between the given two data points.

        Parameters
        ----------
        point1 : numpy.ndarray
        point2 : numpy.ndarray

        Returns
        -------
        float
            Sum of the absolute coordinate differences
        """

        return sum(abs(p1 - p2) for p1, p2 in zip(point1, point2))


    def chebyshev(self, point1, point2):
        """
        Calculates chebyshev distance between the given two data points.

        Parameters
        ----------
        point1 : numpy.ndarray
        point2 : numpy.ndarray

        Returns
        -------
        float
            Largest absolute coordinate difference
        """

        # np.asarray lets plain sequences (lists, tuples) be subtracted
        # element-wise, matching what euclidean() and manhattan() accept.
        return np.max(np.abs(np.asarray(point1) - np.asarray(point2)))


    def dist(self, point1, point2, metric='euclidean'):
        """
        Calculates the corresponding distance, using the given metric, between the given two data points.
          Available metrics:
          * euclidean
          * manhattan
          * chebyshev

        Parameters
        ----------
        point1 : numpy.ndarray
        point2 : numpy.ndarray
        metric : str
            Name of the distance metric to apply

        Returns
        -------
        float
            Corresponding metric distance value

        Raises
        ------
        KeyError
            If `metric` is not one of the available metric names.
        """

        # Map names to the bound methods and call only the requested one,
        # so the two unused metrics are never evaluated.
        metric_functions = {
            'euclidean': self.euclidean,
            'manhattan': self.manhattan,
            'chebyshev': self.chebyshev,
        }
        return metric_functions[metric](point1, point2)


    def are_points_equal(self, point1, point2):
        """Compares and returns if the given two points are equal or not.

        Parameters
        ----------
        point1 : numpy.ndarray
        point2 : numpy.ndarray

        Returns
        -------
        bool
            True when both points have the same length and every
            coordinate pair compares equal.
        """

        # Points of different dimensionality can never be the same point.
        if len(point1) != len(point2):
            return False
        return not any(p1 != p2 for p1, p2 in zip(point1, point2))


    def decide_label(self, _labels, element_list):
        """
        Decides the final label by ranking the labels of the given elements.

        The label with the most elements wins. When several labels share
        the highest count, the one whose elements have the smallest average
        distance is chosen.

        Parameters
        ----------
        _labels : iterable
            Candidate labels to rank
        element_list : List of tuples
        These nodes supposed to be the nearest neighbors for given point\n
        `element_list` tuple elements must have two elements.
            * First element is an object which must have an attribute of `_label`
            * Second element is the distance value of float datatype.

        Returns
        -------
        object
            Decided label

        Raises
        ------
        ValueError
            If no element of `element_list` carries one of `_labels`.
        """

        # One (label, count, average distance) entry per label that occurs.
        candidates = []
        for label in _labels:
            distances = [n[1] for n in element_list if n[0]._label == label]
            if distances:
                candidates.append((label, len(distances), sum(distances) / len(distances)))

        if not candidates:
            raise ValueError("element_list contains no element with a known label")

        # This logic also shows how I handle the tie-break:

        # 1. keep the labels having the max count
        # multiple labels can have the same max count
        max_count = max(count for _, count, _ in candidates)
        max_nodes = [candidate for candidate in candidates if candidate[1] == max_count]

        # 2. get the label having the minimum average distance from max_nodes
        # this is our final decision
        return min(max_nodes, key=lambda candidate: candidate[2])[0]


    def print_tabular(self, titles, matrix):
        """Prints a matrix in tabular format

        Parameters
        ----------
        titles : list of str
            A list of n titles, e.g. class names
        matrix : numpy.ndarray
            Matrix shape of n x n

        Returns
        -------
        None

        References:
        -----------
        Taken the code from: https://www.educba.com/python-print-table/
        """

        # one right-aligned 12-character column for the row title plus one per title
        format_row = "{:>12}" * (len(titles) + 1)
        print(format_row.format("", *titles))
        for val, row in zip(titles, matrix):
            print(format_row.format(val, *row))


### 2. KDNode

In [3]:
class KDNode():
    """KD-Tree Node

    Attributes
    ----------
    _knnHelper : `KNNHelper`
    _level : int
        Depth of the node in the kd-tree
    _left_child : `KDNode`
    _right_child : `KDNode`
    _location : numpy.ndarray
        Actual data point of the node
    _label : int
        Class (label)
    _distance : str
        Distance metric


    Methods
    -------
    __init__(location, label, level, distance)
        `KDNode` Constructor
    print_node()
        Prints the left subtree, the node itself and then the right subtree.

    preorder_traversal(node)
        Traverses the binary tree in preorder direction.

    search_closest(k, test_point, depth, knearestl)
        Collects nearest-neighbor candidates for the given `test_point` by descending into the tree.

    """

    def __init__(self, location, label, level, distance):
        """`KDNode` Constructor

        Parameters
        ----------
        location : numpy.ndarray
            Actual data point of the node
        label : int
            Class (target) of the node
        level : int
            Depth of the node, starting level is 0
        distance : str
            Distance metric name. Available metrics are:
            * euclidean
            * manhattan
            * chebyshev

        Returns
        -------
        self : KDNode
        """

        self._knnHelper = KNNHelper()
        self._level = level            # depth of the node (starting from 0 for root node)
        self._left_child = None
        self._right_child = None
        self._location = location      # actual data point from the dataset
        self._label = label            # target class value of the data point
        self._distance = distance      # distance metric name


    def print_node(self):
        """Prints the left subtree, then the node itself, then the right subtree.

        Returns
        -------
        None
        """

        if self._left_child:
            self._left_child.print_node()
        print("-" * self._level, self._level, self._location, ", label: ", self._label)
        if self._right_child:
            self._right_child.print_node()


    def preorder_traversal(self, node):
        """Traverses the binary tree in preorder direction starting from the given node.

        Parameters
        ----------
        node : KDNode
            Starting node of the traversal

        Returns
        -------
        list of numpy.ndarray
            List of locations of the traversed nodes
        """

        node_data = []

        if node:
            node_data.append(node._location)
            if node._left_child:
                node_data.extend(self.preorder_traversal(node._left_child))
            if node._right_child:
                node_data.extend(self.preorder_traversal(node._right_child))
        return node_data


    def search_closest(self, k, test_point, depth, knearestl):
        """Collects nearest-neighbor candidates for `test_point` from this subtree.

        Every visited node is appended to `knearestl[tuple(test_point)]`
        as a `(KDNode, distance)` tuple, unless a node with an equal
        location is already listed. The branch on the test point's side of
        the splitting coordinate is searched first. The opposite branch is
        searched as well when fewer than `k` candidates were collected or
        when the splitting plane is no farther away than the current k-th
        smallest candidate distance, because only then it can still hold a
        closer point.

        Parameters
        ----------
        k : int
            Nearest neighbor count to search for
        test_point : numpy.ndarray
        depth : int
            Depth of this node in the recursion; selects the feature to compare
        knearestl : dict of tuples
            Dict holding the (KDNode, distance to test_point) tuple list per test_point.

        Returns
        -------
        [float, KDNode]
            Smallest distance found in this subtree and the node having it.
        """

        # axis is depth % (feature count) which gives us the feature index to compare
        # current node value with the point we are searching for
        axis = depth % len(test_point)

        # this is a unique data point key to hold the found nearest neighbors for each datapoint
        test_point_key = tuple(test_point)

        current_distance = self._knnHelper.dist(self._location, test_point, self._distance)
        current_best = self

        neighbors = knearestl.setdefault(test_point_key, [])

        # a location is listed only once per test point
        already_listed = any(
            self._knnHelper.are_points_equal(item[0]._location, self._location)
            for item in neighbors
        )
        if not already_listed:
            neighbors.append((self, current_distance))

        # near branch: the side of the splitting coordinate the test point lies on
        if test_point[axis] < self._location[axis]:
            near_child, far_child = self._left_child, self._right_child
        else:
            near_child, far_child = self._right_child, self._left_child

        if near_child is not None:
            child_distance, child = near_child.search_closest(k, test_point, depth + 1, knearestl)
            if child_distance < current_distance:
                current_distance, current_best = child_distance, child

        if far_child is not None:
            if len(neighbors) < k:
                # not enough candidates yet, any point is welcome
                explore_far = True
            elif k < 1:
                explore_far = False
            else:
                # Each supported metric is at least the absolute difference
                # along a single coordinate, so no point behind the plane
                # can be closer than plane_gap.
                plane_gap = abs(test_point[axis] - self._location[axis])
                kth_distance = sorted(item[1] for item in neighbors)[k - 1]
                explore_far = plane_gap <= kth_distance

            if explore_far:
                child_distance, child = far_child.search_closest(k, test_point, depth + 1, knearestl)
                if child_distance < current_distance:
                    current_distance, current_best = child_distance, child

        return [current_distance, current_best]


### 3. KDTree

In [None]:
class KDTree():
    """KD-Tree implementation in using binary-search tree algorithm

    Attributes
    ----------
    _root : KDNode
    _X : numpy.ndarray
    _y : numpy.ndarray
    _k : int
    _labels : set
    _distance : str
    _knearest : dict
    _knnHelper : KNNHelper


    Methods
    -------
    _build_tree()
        Fits and builds the KDTree. This can also be called a fit function of the KD-Tree algorithm.
    _create_node(tree_node, point, label, depth)
        Creates a `KDNode` object and attaches it to the tree.
    predict(X_test)
        Predicts the classes for the given test input.
    _print_tree()
        Prints the KD-Tree.

    """

    def __init__(self, X, y, k=1, distance='euclidean'):
        """`KDTree` constructor. Creates a K-Dimensional tree

        The tree itself is not built here; see `_build_tree`.

        Parameters
        ----------
        X : numpy.ndarray
            n samples, K features
        y : numpy.ndarray
            n outputs
        k : int
            Nearest neighbor count to find. Values above half of the
            sample count are reduced to that half.
        distance : str
            Distance metric

        Returns
        -------
        self : KDTree
            KDTree estimator
        """

        self._root = None
        self._X = X
        self._y = y
        self._k = k
        self._labels = set(y)
        self._distance = distance
        self._knearest = {}
        self._knnHelper = KNNHelper()

        # round(0.5) is 0, so keep the upper bound at 1 or more; otherwise a
        # single-sample training set would end up with k == 0.
        max_k = max(1, round(len(self._X) / 2))
        if self._k > max_k:
            self._k = max_k


    def _build_tree(self):
        """Fits and builds the KDTree. This can also be called a fit function of the KD-Tree algorithm.

        Returns
        -------
        None
        """

        # start from an empty tree so that calling this again does not
        # insert every training point a second time
        self._root = None
        for point, label in zip(self._X, self._y):
            self._root = self._create_node(self._root, point, label, 0)


    def _create_node(self, tree_node, point, label, depth):
        """Creates a `KDNode` object.

        Parameters
        ----------
        tree_node : KDNode
            If tree_node is None creates and returns a new KDNode, else creates a left or right node and attaches to tree_node
        point : numpy.ndarray
            The data point of the node being created.
        label : int
            The output label of the node being created.
        depth : int
            Level of the node in the tree. Root node is at the level 0

        Returns
        -------
        KDNode
            The new node when `tree_node` is None, otherwise `tree_node`.
        """

        if tree_node is None:
            return KDNode(point, label, depth, self._distance)

        # feature count
        K = len(point)

        # current feature to compare for creating a child node to the right or left
        d = depth % K

        # Existing nodes keep the level they were created with; only the
        # child link is updated.
        if point[d] < tree_node._location[d]:
            tree_node._left_child = self._create_node(tree_node._left_child, point, label, depth + 1)
        else:
            tree_node._right_child = self._create_node(tree_node._right_child, point, label, depth + 1)

        return tree_node


    def predict(self, X_test):
        """Predicts the classes for the given test input

        Builds the tree first when it has not been built yet.

        Parameters
        ----------
        X_test : numpy.ndarray
            Test data points of shape n rows with K features

        Returns
        -------
        list of int
            List of n classes
        """

        if self._root is None:
            self._build_tree()

        y_pred = []
        for point in X_test:
            self._knearest = {}
            # search closest elements for each test point and fill knearest lists the tree object
            self._root.search_closest(self._k, point, 0, self._knearest)

            # sort the collected neighbors by their distance
            for neighbors in self._knearest.values():
                neighbors.sort(key=lambda s: s[1])

            # append predicted class list after a final decision
            # over the nearest k points
            nearest = self._knearest[tuple(point)][:self._k]
            y_pred.append(self._knnHelper.decide_label(self._labels, nearest))

        return y_pred


    def _print_tree(self):
        """Prints the KD-Tree

        Returns
        -------
        None
        """

        self._root.print_node()


### 4. BruteForce

In [4]:
class LabeledPoint():
    """Pairs a data point with its class label.

    Attributes
    ----------
    _p : numpy.ndarray
        Data point of K features
    _label : int
        Class of the data point

    Methods
    -------
    __init__(p, label)
        LabeledPoint constructor.
    """

    def __init__(self, p, label):
        """Stores the given data point together with its class.

        Parameters
        ----------
        p : numpy.ndarray
            Data point of K features
        label : int
            Class of the given point p

        Returns
        -------
        self : object
        """

        self._p, self._label = p, label

class BruteForce():
    """BruteForce algorithm for my KNN classifier.

    Attributes
    ----------
    _X : numpy.ndarray
        Training data points of shape n rows with K features
    _y : numpy.ndarray
        Training data classes
    _k : int
        Nearest neighbor count to find
    _labels : set of int
        Available unique classes
    _distance : str
        Distance metric parameter
    _knearest : dict
        Dict holding the (LabeledPoint, distance to test_point) tuple list per test_point.
    _knnHelper : KNNHelper

    Methods
    -------
    __init__()
        BruteForce constructor.
    search_closest(test_point, knearestl)
        Searches k nearest neighbors
    predict(X_test)
        Predicts output classes for given test data
    """

    def __init__(self, X, y, k=1, distance='euclidean'):
        """BruteForce constructor.

        Parameters
        ----------
        X : numpy.ndarray
            Training data points of shape n rows with K features
        y : numpy.ndarray
            Training data classes
        k : int
            Nearest neighbor count to find
        distance : str
            Distance metric parameter

        Returns
        -------
        self : object
        """

        self._X = X
        self._y = y
        self._k = k
        self._labels = set(y)
        self._distance = distance
        self._knearest = {}
        self._knnHelper = KNNHelper()

    def search_closest(self, test_point, knearestl):
        """For given test (prediction) point searches through all the training data and calculates the distances for finding k nearest neighbors.

        Each distinct training location is added once to
        `knearestl[tuple(test_point)]` as a `(LabeledPoint, distance)`
        tuple; the list is then sorted by distance in ascending order.

        Parameters
        ----------
        test_point : numpy.ndarray
            Data point to predict its label
        knearestl : dict
            Dict holding the (LabeledPoint, distance to test_point) tuple list per test_point.

        Returns
        -------
        None
        """

        # dict key for filling the nearest-neighbor list for each test point
        test_point_key = tuple(test_point)
        neighbors = knearestl.setdefault(test_point_key, [])

        # Locations already listed, kept as hashable tuples so that each
        # duplicate check is a set lookup instead of a scan of the list.
        seen_locations = {tuple(item[0]._p) for item in neighbors}

        for train_point, train_label in zip(self._X, self._y):
            location_key = tuple(train_point)
            if location_key in seen_locations:
                continue
            seen_locations.add(location_key)

            p_distance = self._knnHelper.dist(train_point, test_point, self._distance)
            neighbors.append((LabeledPoint(train_point, train_label), p_distance))

        # sort the neighbor list by their distances
        neighbors.sort(key=lambda s: s[1])


    def predict(self, X_test):
        """Predicts the classes for the given test input

        Parameters
        ----------
        X_test : numpy.ndarray
            Test data points to predict their classes.

        Returns
        -------
        list of int
            Predicted classes.
        """

        y_pred = []
        for point in X_test:
            self._knearest = {}
            self.search_closest(point, self._knearest)

            # pick the k nearest neighbors and make a final decision on label
            nearest = self._knearest[tuple(point)][:self._k]
            y_pred.append(self._knnHelper.decide_label(self._labels, nearest))

        return y_pred
