In [23]:
import numpy as np
import distances
import sklearn.neighbors
import random

class KNNClassifier:
    
    def __init__(self, k, strategy, metric, weights=False, test_block_size=100):        
        
        if metric in ['euclidean', 'cosine']:
            self.metric = metric
        else:
            raise TypeError
        
        self.k = k
        self.weights = weights
        self.test_block_size = test_block_size
        
        if strategy in ['my_own', 'brute', 'kd_tree', 'ball_tree']:
            self.strategy = strategy
            if strategy != 'my_own':
                self.other = sklearn.neighbors.NearestNeighbors(algorithm=self.strategy, metric=self.metric)
        else:
            raise TypeError
            
        
###################### обучение ######################

    def fit(self, X, y): # X_train, y_train
        
        self.X_train = X
        self.y_train = y
            
        if self.strategy != 'my_own':
            self.other.fit(X, y)
    

###################### поиск k соседей ######################

    def find_kneighbors(self, X, return_distance):

        if self.metric == 'euclidean':
            dist_func = distances.euclidean_distance
        elif self.metric == 'cosine':
            dist_func = distances.cosine_distance

        max_size_of_X = 100

        if X.shape[0] > max_size_of_X:

            ret_dist, ret_ind = [], []

            for X_for_predict in np.array_split(X, X.shape[0] //
                                                (max_size_of_X - 1) + 1):
                dist = []
                if return_distance:
                    dist, ind = self.find_kneighbors(
                            X_for_predict, return_distance=return_distance)
                else:
                    ind = self.find_kneighbors(
                            X_for_predict, return_distance=return_distance)

                ret_dist += list(dist)
                ret_ind += list(ind)

            if return_distance:
                return (np.array(ret_dist), np.array(ret_ind))
            else:
                return (np.array(ret_ind))

        else:

            if self.strategy == 'my_own':
                ret_dist = np.zeros((X.shape[0], self.k))
                ret_ind = np.zeros((X.shape[0], self.k))

                dist = dist_func(X, self.X_train)

                for i in range(X.shape[0]):
                    one_dist = dist[i]
                    ind = list(range(self.X_train.shape[0]))
                    res = list(zip(ind, one_dist))
                    res = sorted(res, key=lambda d: d[1])
                    res = res[:self.k]
                    ind, one_dist = zip(*res)

                    ind = np.array(ind)

                    one_dist = np.array(one_dist)

                    ret_dist[i] = np.array(one_dist)
                    ret_ind[i] = np.array(ind)

                if return_distance:
                    return (np.array(ret_dist), np.array(ret_ind).astype('int'))
                else:
                    return (np.array(ret_ind).astype('int'))

            else:
                return self.other.kneighbors(
                        X, n_neighbors=self.k, return_distance=return_distance)

###################### прогноз ######################

    def predict(self, X): # X_test

        if self.weights:
            dist, ind = self.find_kneighbors(X, return_distance=True)
            
            el_weights = 1 / (dist + 10 ** -5)

            res = []
            for i in range(ind.shape[0]):
                indeces = ind[i]
                w = el_weights[i]
                ind_list = []
                for index in indeces:
                    index = int(index)
                    ind_list.append(self.y_train[index])
                counts = np.bincount(ind_list, weights=w)
                y = np.argmax(counts)
                res.append(y)
            
        else:
            ind = self.find_kneighbors(X, return_distance=False)

            res = []
            for indeces in ind:
                ind_list = []
                for index in indeces:
                    index = int(index)
                    ind_list.append(self.y_train[index])
                counts = np.bincount(ind_list)
                y = np.argmax(counts)
                res.append(y)
                
        return np.array(res)
        

In [24]:
# Alternative toy dataset (disabled):
# X_tr = np.array([[0, 0, 0, 1], [5, 5, 3, 5], [5, 5, 5, 5], [0, 0, 0, 2]])
# y_tr = np.array([0, 5, 5, 0])
# X_tst = np.array([[0, 0, 0, 3], [5, 5, 2, 5], [0, 3, 0, 0]])
# y_tst = np.array([0, 5, 0])


# Classify the test rows with the hand-written ('my_own') neighbour search.
# X_tr, y_tr and X_tst are the arrays assigned in the data cell (In [6]) that
# appears further down in this notebook, so that cell has to be run first.
cls = KNNClassifier(k=3, strategy='my_own', metric='euclidean', weights=False)
cls.fit(X_tr, y_tr)
cls.predict(X_tst) #== y_test

array([1, 2, 3], dtype=int64)

In [27]:
# Show only the indices of the k nearest training rows for every test row,
# using whichever classifier was last bound to `cls`.
cls.find_kneighbors(X_tst, return_distance=False)

array([[ 0,  5,  9],
       [ 2,  7, 11],
       [ 1,  6, 10]], dtype=int64)

In [26]:
# Same experiment with strategy='brute', which delegates the neighbour search
# to sklearn.neighbors.NearestNeighbors instead of the hand-written code.
cls = KNNClassifier(k=3, strategy='brute', metric='euclidean', weights=False)
cls.fit(X_tr, y_tr)
cls.predict(X_tst)

array([1, 2, 3], dtype=int64)

In [78]:
# Same experiment with strategy='kd_tree'; `weights` is left at its default
# (False), so every neighbour has an equal vote.
cls = KNNClassifier(k=3, strategy='kd_tree', metric='euclidean')
cls.fit(X_tr, y_tr)
cls.predict(X_tst)

4 [0, 1, 3]


array([4, 4, 4], dtype=int64)

In [100]:
# cls.fit(X_tr, y_train)

In [6]:
# %%time
# Toy dataset shared by the experiment cells of this notebook.
# The training set is one 13-row pattern (with its 13 labels) stacked twice,
# giving 26 rows of 4 features each.
X_tr = np.tile(
    np.array([
        [1, 7, 3, 8],
        [9, 5, 0, 1],
        [2, 5, 1, 0],
        [3, 5, 2, 9],
        [2, 1, 1, 2],
        [1, 7, 3, 8],
        [9, 5, 0, 1],
        [2, 5, 1, 0],
        [3, 5, 2, 9],
        [1, 7, 3, 8],
        [9, 5, 0, 1],
        [2, 5, 1, 0],
        [3, 5, 2, 9],
    ]),
    (2, 1),
)
y_tr = np.tile(np.array([1, 4, 4, 2, 3, 3, 5, 2, 4, 1, 3, 2, 4]), 2)

# Three query rows to classify.
X_tst = np.array([
    [1, 4, 3, 5],
    [2, 5, 2, 2],
    [6, 3, 4, 1],
])

# Smaller alternative training set (disabled):
# X_tr = np.array([[1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [0, 0, 0, 1]])
# y_tr = np.array([1, 4, 4, 2])
# cls.predict(X_tr)


In [4]:
# Draw `test_block_size` distinct indices at random (random.sample draws
# without replacement). No seed is set, so the result differs between runs.
test_block_size = 3
# NOTE(review): the indices are drawn from range(X_tr.shape[1]), i.e. the
# second axis of X_tr. If the intent is to pick rows of X_tr, shape[0] would
# be needed instead - confirm.
test_block = random.sample(range(X_tr.shape[1]), test_block_size)
#     print(list(range(X_tr[1] - 1)), test_block)