In [776]:
import numpy as np
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.neighbors import NearestNeighbors
EPS = 1e-5


def euclidean_distance(x, y, Neibors_norm=None):
    """Pairwise Euclidean distances between the rows of ``x`` and the rows of ``y``.

    Uses the expansion ``|a - b|^2 = |a|^2 - 2 a.b + |b|^2`` so the whole
    distance matrix is produced by one matrix product.

    Parameters
    ----------
    x : ndarray of shape (n, d)
        Query points, one per row.
    y : ndarray of shape (m, d)
        Reference points, one per row.
    Neibors_norm : ndarray of shape (m,), optional
        Precomputed squared row norms of ``y`` (``np.sum(y ** 2, axis=1)``).
        When omitted they are computed here.

    Returns
    -------
    ndarray of shape (n, m)
        ``result[i, j]`` is the distance between ``x[i]`` and ``y[j]``.
    """
    if Neibors_norm is None:
        Neibors_norm = np.sum(y ** 2, axis=1)
    squared = (- 2 * np.inner(x, y) +
               np.sum(x ** 2, axis=1)[:, None] +
               Neibors_norm)
    # Floating-point cancellation can leave a tiny negative value for
    # (near-)identical points; clamp it so sqrt does not produce NaN.
    return np.sqrt(np.maximum(squared, 0))


def cosine_distance(x, y, Neibors_norm=None):
    """Pairwise cosine distances (``1 - cosine similarity``) between rows.

    Parameters
    ----------
    x : ndarray of shape (n, d)
        Query points, one per row.
    y : ndarray of shape (m, d)
        Reference points, one per row.
    Neibors_norm : ndarray of shape (m,), optional
        Precomputed Euclidean row norms of ``y``; computed here when omitted.

    Returns
    -------
    ndarray of shape (n, m)
        ``result[i, j]`` is the cosine distance between ``x[i]`` and ``y[j]``.
    """
    negated_inner = - np.inner(x, y)
    query_norms = np.linalg.norm(x, axis=1)[:, np.newaxis]
    if Neibors_norm is None:
        reference_norms = np.linalg.norm(y, axis=1)
    else:
        reference_norms = Neibors_norm
    # Divide by the query norms first, then by the reference norms, and
    # finally shift similarity in [-1, 1] to a distance in [0, 2].
    return ((negated_inner / query_norms) / reference_norms + 1)


def cos_vector(x, y):
    """Cosine distance (``1 - cosine similarity``) between two vectors."""
    dot_product = np.dot(x, y)
    norm_product = np.linalg.norm(x) * np.linalg.norm(y)
    similarity = dot_product / norm_product
    return 1 - similarity


class KNNClassifier:
    """k-nearest-neighbours classifier with two neighbour-search backends.

    ``strategy='my_own'`` computes full distance matrices with the
    module-level ``euclidean_distance`` / ``cosine_distance`` helpers; any
    other strategy is forwarded as ``algorithm`` to scikit-learn's
    ``NearestNeighbors``.

    Parameters
    ----------
    k : int
        Number of neighbours used for every query.
    strategy : str
        ``'my_own'`` or a scikit-learn ``algorithm`` name.
    metric : str
        ``'euclidean'``; every other value is treated as cosine.
    weights : bool
        When truthy, neighbours vote with weight ``1 / (distance + EPS)``;
        otherwise every neighbour has one vote.
    test_block_size : int
        Number of query rows processed per batch in ``find_kneighbors``.
    """

    def __init__(
            self,
            k,
            strategy='brute',
            metric='euclidean',
            weights=False,
            test_block_size=20):
        self.k = k
        self.strategy = strategy
        self.metric = metric
        self.weights = weights
        self.test_block_size = test_block_size
        if strategy != 'my_own':
            sklearn_metric = 'euclidean' if metric == 'euclidean' else 'cosine'
            # n_neighbors is passed by keyword: recent scikit-learn releases
            # no longer accept it positionally.
            self.NearestNeighborsClass = NearestNeighbors(
                n_neighbors=k, algorithm=strategy, metric=sklearn_metric)

    def fit(self, X, y):
        """Store the training set (and cached norms) for later queries.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
        y : array-like of shape (n_samples,)
            Class labels; ``predict`` compares them against
            ``0 .. max(y)``.
        """
        # Labels are indexed with integer arrays later, so make sure they
        # are an ndarray even if a list was passed.
        self.Neibors_label = np.asarray(y)
        if self.strategy == 'my_own':
            self.Neibors_coord = X
            if self.metric == 'euclidean':
                # Squared norms, as expected by euclidean_distance.
                self.Neibors_norm = np.sum(X ** 2, axis=1)
            else:
                # Plain norms, as expected by cosine_distance.
                self.Neibors_norm = np.linalg.norm(X, axis=1)
        else:
            self.NearestNeighborsClass.fit(X, y)

    def _block_distances(self, X_block):
        """Distance matrix between one query batch and the training set."""
        if self.metric == 'euclidean':
            return euclidean_distance(
                X_block, self.Neibors_coord, self.Neibors_norm)
        # Any non-euclidean metric is cosine, matching __init__ and fit.
        return cosine_distance(
            X_block, self.Neibors_coord, self.Neibors_norm)

    def find_kneighbors(self, X, return_distance):
        """Find the ``k`` nearest training points for every row of ``X``.

        Queries are processed in batches of ``test_block_size`` rows so that
        only one batch-sized distance matrix exists at a time.

        Parameters
        ----------
        X : ndarray of shape (n_queries, n_features)
        return_distance : bool
            Whether to return the distances as well.

        Returns
        -------
        positions : ndarray of int, shape (n_queries, k)
            Indices into the training set, nearest first.
        (distance, positions) : tuple
            Returned instead when ``return_distance`` is truthy; ``distance``
            has the same shape and holds the matching distances.
        """
        n_queries = X.shape[0]
        positions = np.zeros((n_queries, self.k), dtype=int)
        distance = np.zeros((n_queries, self.k)) if return_distance else None
        for start in range(0, n_queries, self.test_block_size):
            block = slice(start, start + self.test_block_size)
            if self.strategy == 'my_own':
                dist_matrix = self._block_distances(X[block])
                nearest = np.argsort(dist_matrix)[:, :self.k]
                positions[block] = nearest
                if return_distance:
                    # Pick, row by row, the distances of the chosen columns.
                    distance[block] = np.take_along_axis(
                        dist_matrix, nearest, axis=1)
            elif return_distance:
                distance[block], positions[block] = \
                    self.NearestNeighborsClass.kneighbors(X[block])
            else:
                positions[block] = self.NearestNeighborsClass.kneighbors(
                    X[block], return_distance=False)
        if return_distance:
            return (distance, positions)
        return positions

    def _vote(self, neighbor_labels, vote_weights=None):
        """Return, per query row, the class with the largest (weighted) vote."""
        classes = np.max(self.Neibors_label) + 1
        # one_hot[i, j, c] == 1 when neighbour j of query i has label c.
        one_hot = (neighbor_labels[:, :, np.newaxis] ==
                   np.arange(classes)[np.newaxis, np.newaxis, :]).astype(int)
        if vote_weights is not None:
            one_hot = one_hot * vote_weights[:, :, np.newaxis]
        return np.argmax(np.sum(one_hot, axis=1), axis=1)

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        Returns
        -------
        ndarray of int, shape (n_queries,)
            Index of the winning class; ties go to the smallest class index
            (``np.argmax`` returns the first maximum).
        """
        if self.weights:
            nearest_dist, nearest_neighbors = self.find_kneighbors(
                X, return_distance=True)
            # EPS keeps the weight finite for zero distances.
            vote_weights = 1. / (nearest_dist + EPS)
        else:
            nearest_neighbors = self.find_kneighbors(X, return_distance=False)
            vote_weights = None
        return self._vote(self.Neibors_label[nearest_neighbors], vote_weights)



def knn_cross_val_score(X, y, k_list, score, cv=None, **kwargs):
    """Cross-validated accuracy of ``KNNClassifier`` for several values of k.

    For every fold the neighbours are searched once, with the largest k in
    ``k_list``; because neighbours come back nearest-first, the result for a
    smaller k is obtained by looking at the first k columns only.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
    y : ndarray of shape (n_samples,)
        Class labels; they are compared against ``0 .. max(y)``.
    k_list : sequence of int
        Values of k to evaluate (any order).
    score : str
        Only ``'accuracy'`` is supported.
    cv : iterable of (train_indices, test_indices), optional
        Folds to use; defaults to ``StratifiedKFold().split(X, y)``.
    **kwargs
        Forwarded to ``KNNClassifier``. A truthy ``weights`` switches to
        votes weighted by ``1 / (distance + EPS)``; when it is omitted the
        votes are unweighted, like the classifier's own default.

    Returns
    -------
    dict or None
        Maps every k to the list of per-fold accuracies, or ``None`` when
        ``score`` is not ``'accuracy'``.
    """
    if score != 'accuracy':
        # Unsupported metric: bail out before doing any fitting.
        return None
    if cv is None:
        cv = list(StratifiedKFold().split(X, y))
    classes = np.max(y) + 1
    class_ids = np.arange(classes)[np.newaxis, np.newaxis, :]
    # .get: 'weights' is optional for KNNClassifier, so it is optional here.
    use_weights = bool(kwargs.get('weights', False))
    dict_of_answers = {k: [] for k in k_list}
    # max() rather than k_list[-1] so an unsorted k_list still works.
    knn = KNNClassifier(max(k_list), **kwargs)
    for fold in cv:
        train_idx, test_idx = fold[0], fold[1]
        knn.fit(X[train_idx], y[train_idx])
        if use_weights:
            nearest_dist, nearest_neighbors = knn.find_kneighbors(
                X[test_idx], return_distance=True)
            inverse_dist = 1. / (nearest_dist + EPS)
        else:
            nearest_neighbors = knn.find_kneighbors(
                X[test_idx], return_distance=False)
            inverse_dist = None
        neighbor_labels = y[train_idx][nearest_neighbors]
        true_labels = y[test_idx]
        for k in k_list:
            # votes[i, j, c] is the vote of neighbour j of query i for class c.
            votes = (neighbor_labels[:, :k, np.newaxis] ==
                     class_ids).astype(int)
            if inverse_dist is not None:
                votes = votes * inverse_dist[:, :k, np.newaxis]
            answers = np.argmax(np.sum(votes, axis=1), axis=1)
            scor = np.sum(answers == true_labels) / len(answers)
            dict_of_answers[k].append(scor)
    return dict_of_answers


In [707]:
# 110 random 2-D points, uniform in [0, 1). No seed is set, so the values
# (and every score printed below) change on each run.
X = np.random.rand(110,2)

In [708]:
# Random binary labels (0 or 1), one per row of X; also unseeded.
y = np.random.randint(0, 2, 110)

In [709]:
def gen_folds(X, y, n):
    """Return the ``n`` KFold splits of ``X`` as a list of (train, test) index pairs."""
    splitter = KFold(n)
    return [fold for fold in splitter.split(X, y)]

In [710]:
# Three fixed folds, reused below so both implementations see the same splits.
A = gen_folds(X, y, 3)
# Values of k to evaluate; the last (largest) one drives the neighbour search.
k_list = [1, 2, 3, 4]

In [747]:
# Per-fold accuracies of the own brute-force backend for every k in k_list;
# compare with the scikit-learn reference printed by the cross_val_score loop.
dict_1 = knn_cross_val_score(X, y, k_list, 'accuracy', A, strategy='my_own', metric='euclidean', weights=False, test_block_size=10)

In [748]:
print(dict_1)

{1: [0.43243243243243246, 0.5405405405405406, 0.5277777777777778], 2: [0.5405405405405406, 0.5135135135135135, 0.6111111111111112], 3: [0.6216216216216216, 0.4864864864864865, 0.5833333333333334], 4: [0.5945945945945946, 0.4864864864864865, 0.6111111111111112]}


In [749]:
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

In [750]:
def dist_aplly(a):
    """Turn neighbour distances into vote weights ``1 / (distance + EPS)``.

    Intended as a ``weights`` callable for scikit-learn's
    ``KNeighborsClassifier``: it receives an array of distances and returns
    an array of the same shape. EPS keeps the weight finite when a distance
    is zero.
    """
    # Plain broadcasting gives the same values as applying the formula slice
    # by slice, without the Python-level loop, and also accepts scalars.
    return 1. / (np.asanyarray(a) + EPS)

In [751]:
# Reference: scikit-learn's unweighted brute-force k-NN on the same folds A,
# one line of per-fold accuracies for each k.
for i in k_list:
    print(cross_val_score(KNeighborsClassifier(n_neighbors=i, algorithm='brute', metric='euclidean'), X, y,cv=A))

[0.43243243 0.54054054 0.52777778]
[0.54054054 0.51351351 0.61111111]
[0.62162162 0.48648649 0.58333333]
[0.59459459 0.48648649 0.61111111]


In [789]:
# scikit-learn classifier using dist_aplly as its weight function, i.e. the
# same 1 / (distance + EPS) weighting that KNNClassifier applies.
cl = KNeighborsClassifier(n_neighbors=2, algorithm='brute', metric='euclidean',weights=dist_aplly)

In [790]:
cl.fit(X[A[0][0]], y[A[0][0]])

KNeighborsClassifier(algorithm='brute', leaf_size=30, metric='euclidean',
                     metric_params=None, n_jobs=None, n_neighbors=2, p=2,
                     weights=<function dist_aplly at 0x7fdec77a4620>)

In [791]:
cl.predict(X[A[0][1]])

array([1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1,
       1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1])

In [792]:
# kneighbors returns (distances, indices); [0] keeps the distances from the
# first fold's test points to their 2 nearest training points.
A1 = cl.kneighbors(X[A[0][1]])[0]

In [793]:
# Own implementation with the same settings as `cl`: k=2, euclidean metric,
# distance-weighted votes (test_block_size keeps its default of 20).
knn = KNNClassifier(2, 'my_own', 'euclidean', weights=True)

In [794]:
knn.fit(X[A[0][0]], y[A[0][0]])

In [795]:
# With return_distance=True the result is (distance, positions); [0] keeps
# the distances, the counterpart of A1 from scikit-learn.
A2 = knn.find_kneighbors(X[A[0][1]], True)[0]

In [796]:
knn.predict(X[A[0][1]])

array([1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1,
       1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1])

In [199]:
a = np.array([1, 2, 3, 4])

In [200]:
np.apply_along_axis(lambda x: 1. / (x + EPS), axis=0, arr=a)

array([0.99999   , 0.4999975 , 0.33333222, 0.24999938])

In [491]:
rows = np.zeros((10, 10), dtype=int)

In [492]:
# Broadcast the column vector 0..9 across every column, so rows[i, j] == i.
rows[:, :] = np.array(np.arange(10)).reshape(10, 1)

In [493]:
rows

array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [3, 3, 3, 3, 3, 3, 3, 3, 3, 3],
       [4, 4, 4, 4, 4, 4, 4, 4, 4, 4],
       [5, 5, 5, 5, 5, 5, 5, 5, 5, 5],
       [6, 6, 6, 6, 6, 6, 6, 6, 6, 6],
       [7, 7, 7, 7, 7, 7, 7, 7, 7, 7],
       [8, 8, 8, 8, 8, 8, 8, 8, 8, 8],
       [9, 9, 9, 9, 9, 9, 9, 9, 9, 9]])

In [623]:
C = np.array([
    [0, 2, 1],
    [1, 3, 1],
    [0, 0, 0],
    [2, 2, 1]])

In [624]:
D = np.array([
    [0.2, 0.3, 0.4],
    [0.1, 0.35, 0.18],
    [0.5, 0.6, 0.9],
    [2.1, 3.2, 3.1]
])

In [625]:
# Weighted-vote check: C holds neighbour labels and D the matching weights.
# For every row, sum the weights per class 0..3 and take the heaviest class.
np.argmax(np.sum((C[:,:,np.newaxis] == np.arange(4)[np.newaxis, np.newaxis, :]).astype(int) * D[:,:,np.newaxis], axis=1), axis=1)

array([1, 3, 0, 2])

In [572]:
# NOTE(review): this rebinds `y`, which another cell defines as the
# 110-element label vector for X. Running the cells that use X and y after
# this one would pick up this 8-element array instead.
y = np.array([1, 0, 1, 1, 0, 2, 2, 1])

In [573]:
neibors = np.array([[3, 2],
                    [4, 6],
                    [7, 0],
                    [1, 5]])

In [574]:
y[neibors]

array([[1, 1],
       [0, 2],
       [1, 1],
       [0, 2]])