In [17]:
import numpy as np
import math

In [19]:
def euclidean_distance(x, y):
    return np.sqrt(np.sum((x[:, np.newaxis] - y) ** 2, axis=2))

def cosine_distance(x, y):
    #print(x, y)
    norma = np.sqrt(np.sum(np.square(x)[:,np.newaxis], axis=1).reshape(-1, 1) * np.sum(np.square(y), axis=1)) 
    if len(np.where(norma == 0)[0]) == 0:
        return np.dot(x, np.transpose(y)) / norma
    return np.array([0])

In [147]:

def cosine_distance(x, y):
    return 1 - np.dot(x, y.T) / np.sqrt(np.sum(x ** 2, axis=1)[:, None]) / np.sqrt(np.sum(y ** 2, axis=1)[None, :])


In [422]:
def weight(array):
    eps = 0.00001
    return 1 / (array + eps)


class KNNClassifier:
    def __init__(self, k, strategy, metric, weights, test_block_size):
        self.k = k
        self.strategy = strategy
        self.metric = metric
        self.weights = weights
        self.test_block_size = test_block_size
        if strategy != 'my_own':
            if strategy != 'brute' and strategy != 'kd_tree' and strategy != 'ball_tree':
                raise KeyError
            # change parameters
            self.skln_knn = NearestNeighbors(
                n_neighbors=k, algorithm = strategy, metric = metric)
        
    def fit(self, x, y):
        if self.strategy != 'my_own':
            self.skln_knn.fit(x, y)
        else:
            self.train_set = x
        self.train_target = y.astype(int)
    

    def find_kneighbors(self, x, return_distance):
        if self.strategy == 'my_own':
            count_of_blocks = math.ceil(x.shape[0] / self.test_block_size)
            if self.metric == 'euclidean':
                ans_matrix = euclidean_distance(self.train_set, x)
            if self.metric == 'cosine':
                ans_matrix = cosine_distance(self.train_set, x)
            ans_matrix = ans_matrix.T
            neighbour = np.sort(ans_matrix, axis=1)[:, :self.k]
            index = np.argsort(ans_matrix, axis=1)[:,:self.k]
            if return_distance:
                return (neighbour, index)
            return index
        else:
            return self.skln_knn.kneighbors(x, n_neighbors=self.k, return_distance=return_distance)
        
    def predict_without_blocks(self, x):
        answer = np.zeros(x.shape[0]).astype(int)
        
        if self.weights:
            
            dist, ind = self.find_kneighbors(x, True)
            votes = weight(dist)
            
            for i in range(len(x)):
                ind_array = np.zeros(10)
                for numb in range(ind.shape[1]):
                    ind_array[self.train_target[ind[i, numb]]] += votes[i, numb]
                answer[i] = np.argmax(ind_array)
                
        else:
            
            ind = self.find_kneighbors(x, False)
            
            for i in range(len(x)):
                ind_array = np.zeros(10)
                for numb in range(ind.shape[1]):
                    ind_array[self.train_target[ind[i, numb]]] += 1
                answer[i] = np.argmax(ind_array)
                
        return answer
            
    def predict(self, x):
        answer = np.zeros(x.shape[0])
        blocks = math.ceil(x.shape[0] // self.test_block_size)
        if x.shape[0] <= self.test_block_size:
                blocked_data = x
        else:
            if x.shape[0] % self.test_block_size == 0:
                    blocked_data = np.split(x, blocks)
            else:
                blocks += 1
                print(blocks)
                array = np.split(x[:(blocks - 1) * self.test_block_size], blocks - 1)
                array.append(x[(blocks - 1) * self.test_block_size:])
                blocked_data = array
        blocked_data = np.array(blocked_data)
        print(blocked_data[2].shape)
        if self.weights:
            for j, block in enumerate(blocked_data):
                dist, ind = self.find_kneighbors(block, True)
                votes = weight(dist)
                for i, mark in enumerate(self.train_target[ind]): # в i-м блоке - i * self.test_block_size ... (i + 1) * -||-
                    ind_array = np.zeros(10)
                    for q in range(self.k):
                        ind_array[mark[q].astype(int)] += votes[i][q]
                    answer[i + j * self.test_block_size] = ind_array.argmax()                   
        
        else:
            for j, block in enumerate(blocked_data):
                ind = self.find_kneighbors(block, False)
                for i, mark in enumerate(self.train_target[ind]):
                    answer[i + j * self.test_block_size] = np.bincount(mark.astype(int)).argmax()
        return answer


In [423]:
from distances import euclidean_distance
from distances import cosine_distance


def weight_function(distance):
    epsilon = 0.00001
    return float(1 / (distance + epsilon))


def array_weight_function(distances):
    epsilon = 0.00001

    return 1 / (distances + epsilon)


class KNNC:

    def __init__(self,
                 k,
                 strategy,
                 metric,
                 weights,
                 test_block_size=0):

        #print(k, strategy, metric, weights, test_block_size)

        self.strategy = strategy


        # my implementation
        self.k = k
        self.weights = weights
        self.data = None
        self.target = None
        self.clusters = None
        self.clusters_amount = None
        self.test_block_size = test_block_size

        if metric == "euclidean":
            self.metric = euclidean_distance
        elif metric == "cosine":
            self.metric = cosine_distance
        else:
            raise TypeError

        if strategy != "my_own":
            if strategy != "kd_tree" and strategy != "ball_tree" and strategy != "brute":
                raise TypeError

            self.uses = NearestNeighbors(n_neighbors=k,
                                         metric=metric,
                                         algorithm=strategy)

    def fit(self, X, y):
        # my implementation
        if X.__len__() != y.__len__():
            raise TypeError

        self.data = X
        self.target = y
        self.clusters = np.sort(np.unique(y))
        self.clusters_amount = np.unique(y).__len__()
        if self.strategy != "my_own":
            self.uses.fit(X, y)

    def find_kneighbors(self, X, return_distance=True):
        if self.strategy == "my_own":

            ranges = self.metric(self.data, X).T
            nearest = np.argsort(ranges, axis=1)[:, :self.k]

            if return_distance:
                distances = np.empty((X.shape[0], self.k))

                for enum, item in enumerate(nearest):
                    distances[enum] = ranges[enum, nearest[enum]]

                return distances, nearest
            return nearest
        else:
            return self.uses.kneighbors(X, n_neighbors=self.k, return_distance=return_distance)

    def predict(self, X):

        if X.shape[1] != self.data.shape[1]:
            raise TypeError

        test_target = np.empty(X.__len__()).astype(int)

        if self.weights:

            distances, nearest = self.find_kneighbors(X, True)

            for enum in range(X.__len__()):
                cluster_nb = np.zeros(self.clusters_amount)
                for numb, item in enumerate(nearest[enum]):
                    cluster_nb[np.where(self.clusters ==
                                        self.target[item])[0]] += weight_function(distances[enum, numb])

                test_target[enum] = self.clusters[np.argmax(cluster_nb)]
        else:
            nearest = self.find_kneighbors(X, False)

            for enum in range(X.__len__()):
                cluster_nb = np.zeros(self.clusters_amount).astype(int)
                for numb, item in enumerate(nearest[enum]):
                    cluster_nb[np.where(self.clusters ==
                                        self.target[item])[0]] += 1

                test_target[enum] = self.clusters[np.argmax(cluster_nb)]

        return test_target




In [424]:
from sklearn.datasets import fetch_mldata
mnist = fetch_mldata('MNIST original')
data = mnist.data / 255.0
target = mnist.target.astype(int)



In [425]:
train_data = data[:100]
train_target = target[:100]
test_data = data[100:110, :]
test_target = target[100:110]

In [449]:
knn = KNNClassifier(1, "my_own", "euclidean", False, 2)

In [450]:
knn.fit(train_data, train_target)

In [451]:
dist_first, index_first = knn.find_kneighbors(test_data, return_distance=True)

In [452]:
knn_my = KNNC(1, "my_own", "euclidean", False, 4)

In [453]:
knn_my.fit(train_data, train_target)

In [454]:
dist_second, index_second = knn_my.find_kneighbors(test_data, return_distance=True)

In [455]:
assert np.allclose(dist_first, dist_second)

In [456]:
assert np.allclose(index_first, index_second)

In [457]:
res = knn.predict_without_blocks(test_data)
res_my = knn_my.predict(test_data)

In [458]:
res

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [459]:
res_my

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])