## Approximate Nearest Neighbor from Scratch

In [None]:
Next steps to implement:
2. Add a priority queue to the predict function
3. Base it on the max distance from every node

In [2]:
import numpy as np
import matplotlib.pyplot as plt

In [64]:
class Tree:
    """Single node of a binary indexing tree.

    The same class is used for internal nodes and for leaves; which fields
    are filled in depends on the role the caller gives the node. Every
    field is optional and defaults to ``None``.
    """

    def __init__(
        self,
        children_left=None,
        children_right=None,
        size: int = None,
        is_leaf: bool = None,
        hyperplane: np.array = None,
        data_registered: np.array = None,
        label_registered: np.array = None,
    ):
        """Store the links and payload of one node.

        Args:
            children_left (Tree, optional):
                Sub-tree holding the data on the left side of the split.
                Defaults to None.
            children_right (Tree, optional):
                Sub-tree holding the data on the right side of the split.
                Defaults to None.
            size (int, optional):
                Count of data points in this tree. Defaults to None.
            is_leaf (bool, optional):
                Flag telling whether this node is a leaf. Defaults to None.
            hyperplane (np.array, optional):
                Parameters of the splitting hyperplane attached to this
                node. Defaults to None.
            data_registered (np.array, optional):
                Data points registered in this node. Defaults to None.
            label_registered (np.array, optional):
                Labels registered alongside ``data_registered``.
                Defaults to None.
        """
        # links to the two sub-trees
        self.children_left, self.children_right = children_left, children_right

        # bookkeeping about the node itself
        self.size, self.is_leaf = size, is_leaf

        # split description
        self.hyperplane = hyperplane

        # payload: points and their labels
        self.data_registered, self.label_registered = data_registered, label_registered

In [55]:
class ApproximateNearestNeighbor:
    """Approximate nearest-neighbour lookup with a forest of hyperplane trees.

    Each tree recursively splits the indexed points with the hyperplane that
    bisects the segment between two randomly picked points. A node holding
    no more than ``min_sample_per_branch`` points becomes a leaf and
    registers its points and labels. A query walks every tree down to one
    leaf, the leaves are merged, and the merged candidates are ranked by
    ``distance_type``.
    """

    def __init__(self,
                 min_sample_per_branch: int = 10,
                 distance_type: str = "eucledian",
                 n_trees: int = 10):
        """Configure the index (no data is indexed yet).

        Args:
            min_sample_per_branch (int, optional):
                A node is split only while it holds more points than this.
                Defaults to 10.
            distance_type (str, optional):
                Ranking criterion: "eucledian" (also accepted with the
                spelling "euclidean"), "manhattan" or "cosine-similarity".
                Any other value ranks by descending dot product.
                Defaults to "eucledian".
            n_trees (int, optional):
                Number of trees grown by ``create_index``. Defaults to 10.
        """
        self.min_sample_per_branch = min_sample_per_branch
        self.distance_type = distance_type
        self.n_trees = n_trees

        # filled in by create_index
        self.forest = []

    def create_index(self,
                     X: np.ndarray,
                     y: np.ndarray) -> None:
        """Index the data by growing ``n_trees`` trees into ``self.forest``.

        Args:
            X (np.ndarray) (n,k):
                Data points to index.
            y (np.ndarray) (n,):
                Label of every data point.

        Raises:
            ValueError: If ``X`` and ``y`` do not have the same length.
        """
        # np.array copies, so the caller's data is never shared with the index
        X = np.array(X)
        y = np.array(y)

        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )

        # Grow indexing trees
        self.forest = self.create_forest(X, y, self.n_trees)

    def create_forest(self,
                      X: np.ndarray,
                      y: np.ndarray,
                      n_tree: int = 10) -> list:
        """Grow ``n_tree`` independent trees over the same data.

        Args:
            X (np.ndarray) (n,k): Data points to index.
            y (np.ndarray) (n,): Label of every data point.
            n_tree (int, optional): Number of trees. Defaults to 10.

        Returns:
            list: The root node of every grown tree.
        """
        return [self.grow_index(X, y) for _ in range(n_tree)]

    def grow_index(self,
                   X: np.ndarray,
                   y: np.ndarray,
                   depth: int = 0):
        """Recursively grow one tree and return its root node.

        Args:
            X (np.ndarray) (n,k): Data points reaching this node.
            y (np.ndarray) (n,): Labels of those data points.
            depth (int, optional):
                Depth of this node; only passed along the recursion.
                Defaults to 0.

        Returns:
            Tree: An internal node carrying a hyperplane and two children,
            or a leaf carrying the registered data and labels.
        """
        node_size = len(X)

        # a split needs two distinct points, whatever the configured minimum
        if node_size > max(self.min_sample_per_branch, 1):
            X_left, X_right, y_left, y_right, hyperplane = self.hyperplane_split(X, y)

            # If the two picked points are identical every point lands on
            # one side; recursing would never shrink the node, so it is
            # registered as a leaf instead.
            if len(X_left) > 0 and len(X_right) > 0:
                node = Tree(size=node_size, is_leaf=False)
                node.hyperplane = hyperplane
                node.children_left = self.grow_index(X_left, y_left, depth + 1)
                node.children_right = self.grow_index(X_right, y_right, depth + 1)
                return node

        return Tree(size=node_size,
                    is_leaf=True,
                    data_registered=X,
                    label_registered=y,
                    hyperplane=None)

    def hyperplane_split(self,
                         X: np.ndarray,
                         y: np.ndarray):
        """Split the data with the bisecting hyperplane of two random points.

        Args:
            X (np.ndarray) (n,k): Data points to split, n >= 2.
            y (np.ndarray) (n,): Labels of those data points.

        Returns:
            tuple: ``(X_left, X_right, y_left, y_right, hyperplane)`` where
            ``hyperplane`` has k+1 entries: the normal vector followed by
            the offset ``c``. A point ``p`` goes right when
            ``normal . p + c > 0`` and left otherwise.

        Raises:
            ValueError: If fewer than 2 data points are given.
        """
        n_points = len(X)
        if n_points < 2:
            raise ValueError("hyperplane_split needs at least 2 data points")

        # two DISTINCT indices; every point (including the last) is eligible
        first, second = np.random.choice(n_points, size=2, replace=False)

        # the hyperplane passes through the midpoint of the two points and
        # is perpendicular to the segment joining them
        vector = X[first] - X[second]
        random_middle = (X[first] + X[second]) / 2
        c = -np.dot(vector, random_middle)
        hyperplane = np.append(vector, c)

        # Same rule as find_item_leaf: strictly positive goes right, the
        # rest (including points lying on the hyperplane) goes left, so no
        # point is ever dropped.
        on_right = (np.dot(X, hyperplane[:-1]) + hyperplane[-1]) > 0
        on_left = ~on_right

        return X[on_left], X[on_right], y[on_left], y[on_right], hyperplane

    def find_items_forest(self,
                          X_in: np.ndarray,
                          n_items: int = 10):
        """Return the labels of the nearest candidates found in the forest.

        Args:
            X_in (np.ndarray) (k,) or (1,k):
                A single query point.
            n_items (int, optional):
                Maximum number of labels returned. Defaults to 10.

        Returns:
            np.ndarray: Labels of the candidates, nearest first. Fewer than
            ``n_items`` are returned when the visited leaves hold fewer
            distinct items. Labels come back as floats because they are
            stacked next to the features in one float array.

        Raises:
            RuntimeError: If the forest is empty (``create_index`` was not
                called, or it was called with ``n_trees`` of 0).
            ValueError: If ``X_in`` is not a single data point.
        """
        if not self.forest:
            raise RuntimeError(
                "the index is empty, call create_index before querying"
            )

        query = self._as_query_vector(X_in)
        n_features = query.shape[0]

        # one leaf per tree, each as rows of [features..., label]; the
        # leading empty float array keeps the result float
        leaf_items = [self.find_item_leaf(query, tree=tree) for tree in self.forest]
        item_forest = np.concatenate(
            [np.empty((0, n_features + 1), dtype=float), *leaf_items], axis=0
        )

        # the same item is usually found through several trees
        item_forest = np.unique(item_forest, axis=0)

        # separate the item features and the item labels
        X_forest = item_forest[:, :n_features]
        y_forest = item_forest[:, -1]

        # slicing already copes with fewer than n_items candidates
        ranked_index = self.rank_neighbors(query, X_forest)
        return y_forest[ranked_index[:n_items]]

    def find_item_leaf(self,
                       X_in: np.ndarray,
                       tree):
        """Walk one tree down to the leaf the query falls into.

        Args:
            X_in (np.ndarray) (k,) or (1,k): A single query point.
            tree (Tree): Root node of the tree to search.

        Returns:
            np.ndarray (m,k+1): The m items registered in the reached leaf,
            one row per item: the k features followed by the label.

        Raises:
            ValueError: If ``X_in`` is not a single data point.
        """
        query = self._as_query_vector(X_in)

        node = tree
        while not node.is_leaf:
            side = np.dot(query, node.hyperplane[:-1]) + node.hyperplane[-1]
            node = node.children_right if side > 0 else node.children_left

        return np.column_stack((node.data_registered, node.label_registered))

    def rank_neighbors(self,
                       X_in: np.ndarray,
                       X_env: np.ndarray) -> np.ndarray:
        """Method to compute distance between data input
           and registered data in a leaf, then rank the
           environment

        Args:
            X_in (np.ndarray) (k,) or (1,k):
                Data input into the environment with 1
                data point and k number of feature
            X_env (np.ndarray) (n,k):
                Data registered in the environment with
                n data point registered in the environment
                and k number of feature

        Returns:
            np.ndarray (n,):
                array of index of nearest item calculated
                between the input and environment data
                with size n data index

        Raises:
            ValueError: If ``X_in`` is not a single data point.
        """
        query = self._as_query_vector(X_in)
        X_env = np.asarray(X_env)

        # "eucledian" is the historical spelling used by this class
        if self.distance_type in ("eucledian", "euclidean"):
            distance = np.linalg.norm(X_env - query, axis=1)
            return np.argsort(distance)

        if self.distance_type == "manhattan":
            distance = np.sum(np.abs(X_env - query), axis=1)
            return np.argsort(distance)

        # the remaining criteria are similarities: larger means nearer
        similarity = X_env @ query

        if self.distance_type == "cosine-similarity":
            norms = np.linalg.norm(X_env, axis=1) * np.linalg.norm(query)
            # a zero-norm vector has no direction: rank it last instead of
            # dividing by zero
            similarity = np.divide(similarity,
                                   norms,
                                   out=np.full(norms.shape, -np.inf),
                                   where=norms > 0)

        return np.argsort(-similarity)

    @staticmethod
    def _as_query_vector(X_in) -> np.ndarray:
        """Return a single query point as a 1-D array of k features."""
        query = np.asarray(X_in)

        if query.ndim == 2 and query.shape[0] == 1:
            query = query[0]

        if query.ndim != 1:
            raise ValueError(
                "X_in must be a single data point of shape (k,) or (1, k), "
                f"got shape {query.shape}"
            )

        return query

In [61]:
# Toy dataset: 100 points with 5 features drawn uniformly from [0, 1),
# each paired with a random integer label in [100, 1000).
X = np.random.rand(100, 5)
y = np.random.randint(100, 1000, size=100)

# show the first point together with its label
print(X[0], y[0])

[0.19798236 0.35213506 0.34625828 0.71753865 0.53268257] 971


In [62]:
# Build the index over the toy dataset; nodes holding more than
# 10 points are split further.
model = ApproximateNearestNeighbor(
    min_sample_per_branch=10,
)
model.create_index(X=X, y=y)

In [63]:
# Query with a single point given as a (1, 5) array; the values are the
# X[0] shown in the recorded output of the data-generation cell.
model.find_items_forest(
    X_in=np.array([[0.19798236, 0.35213506, 0.34625828, 0.71753865, 0.53268257]]),
)

array([971., 809., 915., 935., 974., 225., 566., 965., 477., 618.])

In [27]:
# The model keeps its trees in the `forest` list (it never sets a `tree`
# attribute), so inspect the first tree. The chain assumes every node on
# this path except the last one is an internal node; a leaf met earlier has
# no children and the lookup fails. A leaf at the end shows `None`.
model.forest[0].children_left.children_left.children_left.children_right.hyperplane

None
