In [1]:
import numpy as np

In [2]:
def euclidean_distance(x, y):
    u = np.diagonal((x @ x.T)).reshape(x.shape[0], 1)   # квадраты векторов из матрицы x
    v = np.diagonal(y @ y.T).reshape(1, y.shape[0])     # квадраты векторов из матрицы y
    u = u @ np.ones(y.shape[0]).reshape(1, y.shape[0])  # приводим к нужной размерности, так как на выходе должна быть матрица попарных расстояний 
    v = np.ones(x.shape[0]).reshape(x.shape[0], 1) @ v  # приводим к нужной размерности, так как на выходе должна быть матрица попарных расстояний 
    return np.sqrt(u + v - 2*(x @ y.T))                 # вычитаем попарные скалярные произведения векторов из x и векторов из y 

In [181]:
x = np.array([[1,0,1],[0,0,1]])

In [182]:
y = np.array([[1,1,1], [1,2,3], [4,0,1]])

In [183]:
euclidean_distance(np.array([[1,0,1],[0,0,1]]), np.array([[1,1,1], [1,2,3], [4,0,1]]))

array([[1.        , 2.82842712, 3.        ],
       [1.41421356, 3.        , 4.        ]])

In [3]:
def cosine_distance(x, y): # 1- cos(x,y); cos(x,y) = (x,y)/|x||y|
    u = np.sqrt(np.diagonal((x @ x.T)).reshape(x.shape[0], 1))  # длины векторов из матрицы x
    v = np.sqrt(np.diagonal(y @ y.T).reshape(1, y.shape[0]))    # длины векторов из матрицы y
    u = u @ np.ones(y.shape[0]).reshape(1, y.shape[0])  # приводим к нужной размерности, так как на выходе должна быть матрица попарных расстояний 
    v = np.ones(x.shape[0]).reshape(x.shape[0], 1) @ v  # приводим к нужной размерности, так как на выходе должна быть матрица попарных расстояний 
    return 1 - x @ y.T / u /v                           # делим скалярные произведения векторов их на длины

In [207]:
x = np.array([[1,0,1],[0,0,1]])

In [208]:
y = np.array([[1,1,1], [1,2,3], [4,0,1]])

In [209]:
cosine_distance(np.array([[1,0,1],[0,0,1]]), np.array([[1,1,1], [1,2,3], [4,0,1]]))

array([[0.18350342, 0.24407105, 0.14250707],
       [0.42264973, 0.19821627, 0.75746437]])

In [4]:
class NearestNeighborsFinder:
    def __init__(self, n_neighbors, metric="euclidean"):
        self.n_neighbors = n_neighbors

        if metric == "euclidean":
            self._metric_func = euclidean_distance
        elif metric == "cosine":
            self._metric_func = cosine_distance
        else:
            raise ValueError("Metric is not supported", metric)
        self.metric = metric

    def fit(self, X, y=None):
        self._X = X
        return self

    def kneighbors(self, X, return_distance=False):  # X - объекты тестовой выборки
        # Сначала считаем расстояния от тестовой выборки до обучающей
        Dist = self._metric_func(X, self._X)
        # обрабатываем случай, когда число соседей = n_neighbors, сортируем и выводим всё
        if self.n_neighbors == Dist.shape[1]: 
            indices = Dist.argsort(axis=1)
            distances = np.take_along_axis(Dist, indices, axis=1)
            if return_distance:
                return distances, indices
            else:
                return indices
        else:
            # находим индексы n_neighbors лучших в каждой строке, то есть ближайших соседей 
            indices = np.argpartition(Dist, self.n_neighbors, axis=1)[:, :self.n_neighbors] 
            ranks_top = np.take_along_axis(Dist, indices, axis=1)  # выбираем этих лучших
            # np.argpartition возвращает неотсортриованный список лучших, поэтому сортируем его
            indices_top = ranks_top.argsort(axis=1)                                   
            indices = np.take_along_axis(indices, indices_top, axis=1)   # получаем индексы отсортрованных ближайших соседей
            distances = np.take_along_axis(Dist, indices, axis=1)  # получаем расстояния до ближайших соседей
            if return_distance:
                return distances, indices
            else:
                return indices

In [5]:
seed = np.random.RandomState(9872)
X = seed.permutation(25).reshape(5, -1)

In [6]:
nn = NearestNeighborsFinder(n_neighbors=1, metric='euclidean')
nn.fit(X)

<__main__.NearestNeighborsFinder at 0x19d2755e8b0>

In [7]:
distances, indices = nn.kneighbors(X, return_distance=True)  

In [5]:
class KNNClassifier:
    EPS = 1e-5

    def __init__(self, n_neighbors, algorithm='my_own', metric='euclidean', weights='uniform'):
        if algorithm == 'my_own':
            finder = NearestNeighborsFinder(n_neighbors=n_neighbors, metric=metric)
        elif algorithm in ('brute', 'ball_tree', 'kd_tree',):
            finder = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm, metric=metric)
        else:
            raise ValueError("Algorithm is not supported", metric)

        if weights not in ('uniform', 'distance'):
            raise ValueError("Weighted algorithm is not supported", weights)

        self._finder = finder
        self._weights = weights

    def fit(self, X, y=None):
        self._finder.fit(X)
        self._labels = np.asarray(y)
        return self

    def _predict_precomputed(self, indices, distances):  
        # найдём метки соседей
        kneighbors_labels = np.take_along_axis(self._labels.reshape(-1, 1), indices.T, axis=0)
        if self._weights == 'distance':
            weights = 1 / (distances + self.EPS)  # веса обратно пропорциальны расстояниям до соседей
            # сделаем из меток сосдей 3d-array, взял пример преобразования со stackoverflow
            kneighbors_labels_3D = kneighbors_labels.T[:,:, np.newaxis]
            weighrs_3d = weights[:, :, np.newaxis]  # аналогично поступим с весами
            # размножим метки соседей по матрицам: по строкам соседи, по столбцам возможные метки,
            # а третья размерность - элементы тестовой выборки
            # сделали это, чтобы можно было по отдельности посчитать взвешенную сумму для каждого возможного лейбла 
            mask = kneighbors_labels_3D == np.arange(self._labels.max() + 1)
            # заполним эту маску весами, для этого умножим её на веса
            weighted_mask = weighrs_3d * mask
            # посчитаем требуемую сумму по столбцам(3d-array, поэтому тут axis=1, то есть вдоль второй оси)
            res = np.sum(weighted_mask, axis=1)
            return np.argmax(res, axis=1) # выберем индексы меток с максимальной суммой по строчкам
        else:
            def find_label(a):
                return np.argmax(np.bincount(a))
            return np.apply_along_axis(find_label, 0, kneighbors_labels)  # выберем самые часто встречаемые метки
            
    def kneighbors(self, X, return_distance=False):
        return self._finder.kneighbors(X, return_distance=return_distance)

    def predict(self, X):
        distances, indices = self.kneighbors(X, return_distance=True)
        return self._predict_precomputed(indices, distances)

In [9]:
seed = np.random.RandomState(9872)
X_train = seed.permutation(25).reshape(5, -1)

In [10]:
nn = KNNClassifier(n_neighbors=3)
nn.fit(X_train, y = np.array([1,2,2,1,3]))

<__main__.KNNClassifier at 0x19d27573190>

In [10]:
seed = np.random.RandomState(123)
X_test = seed.permutation(20).reshape(-1, 5)

In [12]:
distances, indices = nn.kneighbors(X_test, return_distance=True)

In [13]:
nn._predict_precomputed(indices, distances)

array([1, 1, 1, 1], dtype=int64)

In [6]:
class BatchedKNNClassifier(KNNClassifier):
    '''
    Нам нужен этот класс, потому что мы хотим поддержку обработки батчами
    в том числе для классов поиска соседей из sklearn
    '''

    def __init__(self, n_neighbors, algorithm='my_own', metric='euclidean', weights='uniform', batch_size=None):
        KNNClassifier.__init__(
            self,
            n_neighbors=n_neighbors,
            algorithm=algorithm,
            weights=weights,
            metric=metric,
        )
        self._batch_size = batch_size

    def kneighbors(self, X, return_distance=False):
        if self._batch_size is None or self._batch_size >= X.shape[0]:
            return super().kneighbors(X, return_distance=return_distance)
            # super() - доступ к классу-наследнику, который не разбивает на батчи
        else:
            import math as m
            number_of_steps = m.ceil(X.shape[0] / self._batch_size)
            if return_distance:
                # создаём два списка, в которые будем складывать индексы и расстояния для каждого куска
                batched_ind = list(np.zeros(number_of_steps, int))
                batched_dist = list(np.zeros(number_of_steps))
                for i in range(number_of_steps):  # выполняем поиск ближающих соседей для каждого батча
                    batched_dist[i], batched_ind[i] = super().kneighbors(X[self._batch_size*i:self._batch_size*(i+1), :],
                                                                     return_distance=return_distance)
                batched_ind = np.vstack(batched_ind)  # собираем результаты вместе
                batched_dist = np.vstack(batched_dist)
                return batched_dist, batched_ind
            else:
                return np.vstack([super().kneighbors(X[self._batch_size*i:self._batch_size*(i+1), :],
                                                                     return_distance=return_distance) for i in range(number_of_steps)])

In [246]:
from sklearn.model_selection import KFold, BaseCrossValidator
from sklearn.metrics import accuracy_score
def knn_cross_val_score(X, y, k_list, scoring, metric, weights, cv=None, **kwargs):
    y = np.asarray(y)

    if scoring == "accuracy":
        scorer = accuracy_score
    else:
        raise ValueError("Unknown scoring metric", scoring)

    if cv is None:
        cv = KFold(n_splits=5)
    elif not isinstance(cv, BaseCrossValidator):
        raise TypeError("cv should be BaseCrossValidator instance", type(cv))

    i = 0
    scores = {}
    k_max = np.max(k_list)
    models_KFold = list(np.zeros(cv.get_n_splits()))
    distances_KFold = list(np.zeros(cv.get_n_splits()))
    indices_KFold = list(np.zeros(cv.get_n_splits()))
    score_KFold = list(np.zeros(cv.get_n_splits()))

    for train_index, test_index in cv.split(X):  # разбиваем обучающую выборку на фолды
        for k in np.sort(k_list)[::-1]:
            if k == k_max:
                x_train, x_test = X[train_index], X[test_index]
                y_train, y_test = y[train_index], y[test_index]
                models_KFold[i] = BatchedKNNClassifier(n_neighbors=k_max, metric=metric, weights=weights)
                models_KFold[i].fit(x_train, y_train)
                # считаем расстояния и индексы соседей только для большего k
                distances_KFold[i], indices_KFold[i] = models_KFold[i].kneighbors(x_test, return_distance=True)
                y_pred = models_KFold[i]._predict_precomputed(indices_KFold[i], distances_KFold[i])
                scores[k] = accuracy_score(y_test, y_pred)  # считаем скоры для всех k
            else:
                # выбираем нужное количество сеседей
                y_pred = models_KFold[i]._predict_precomputed(indices_KFold[i][:, :k], distances_KFold[i][:, :k])
                scores[k] = accuracy_score(y_test, y_pred)
        score_KFold[i] = list(scores.values())
        i += 1
    score_KFold = np.asarray(score_KFold)
    scores = {}
    for k in range(len(k_list)):  # преобразуем вывод к нужному виду
        scores[str(np.sort(k_list)[::-1][k])] = score_KFold[:, k]
    return scores  # возвращаем значения accuracy для каждого k