In [1]:
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
def load_data():
    from sklearn.datasets import load_iris
    return load_iris().data, load_iris().target

In [3]:
X, y = load_data()
print(X.shape, y.shape)

(150, 4) (150,)


In [4]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

## Реализовать будем ручной Kernel KNN с Гауссовским ядром

Формула класса нашего объекта определиться как:

### 1. Для общего случая
Пусть дана обучающая выборка $X = (x_i, y_i)^N_{i=1}$, где $x_i \in X \in R$, а $y_i \in Y = \{1, ..., c\}$. Пусть также задана некоторая симметричная по своим аргументам функция расстояния $\rho: X \times X \rightarrow [0, +\infty)$. Тогда нам необходимо среди ближайших k соседей (расстояние до которых меньше чем до других) выбрать наиболее встречающийся класс для нашего объекта $u$. Найденных $k$ соседей обозначим как $X_k(u) = x^{(1)}_u, ..., x^{(k)}_u$.

Тогда опишем задачу как:
$$ \forall x_{in} \in X_k(u); \space\space \forall x_{out} \in X \setminus X_k (u): \rho(u, x_{in}) \le \rho(u, x_{out})$$

Метку класса объекта $x_u^{(i)}$ обозначим как $y_u^{(i)}$. Тогда, будем присваивать нашему классифицируемому объекту метку класса, наиболее встречающегося между соседями:

$$a(u) = {\arg\max}_{y \in Y} \sum_i^k \mathbb{I}\left[ y_u^{(i)} = y\right]$$

### 2. Взвешенный KNN
Полученная функция не учитывает расстояния объектов от нашего класс. объекта $u$, т. е. все объекты, даже самые дальние соседи вносят равнозначный вклад в предсказываемый класс. Чтобы это изменить используют взвешанный KNN. Общая формула которого принимает вид:

$$a(u) = {\arg\max}_{y \in Y} \sum_i^k w_i\mathbb{I}\left[ y_u^{(i)} = y\right]$$

где $w_i$ - вес $i$ соседа. 

Способов выбора весов множество, но все они делятся на 2 группы:

1. Вес выбирается в зависимости от порядкогового номера в списке ближайших соседей отсортированном по увеличению функции расстояния от $u$
2. Вес - функция от расстояния

#### 2.1  
1. Линейный вес: $w_i = \frac{k+1-i}{k}$
2. Экспоненциальный вес: $w_i = q^i \space (0 < q < 1)$

#### 2.2 Ядерная функция.

Функция должна обладать следующими свойствами:

1. Положительна на области определения
2. Монотонно возрастающая

Общая формула ядерных функций:
$$K: \mathbb{R} \rightarrow \mathbb{R}$$
$$a(u)= \arg\max_{y_i \in \mathbb {Y}} K \left( \frac{\rho(u, x_u^{(i)})}{h}\right) \mathbb{I}\left[ y_u^{(i)} = y\right] $$
Существует множество функций но самые часто-используемые ядра:

1. Гауссовская ядерная функция:
$$K(x) = \frac{1}{\sqrt{2\pi}}\exp{\left(-2x^2\right)}$$
2. Прямоугольное ядро:
$$K(x) = \frac{1}{2} \mathbb{I} \left[ \left| x \right| \le 1 \right]$$
Реализуем модель KNN с гауссовским ядром при помощи отдельного класса

#### Пошаговая реализация

In [6]:
# (n, m) -> (n, 1, m)
# -
# (c, n)

# (n, 1, m)
# -
# (m, c) -> (1, c, m)

# (n, 1, m)
# -
# (1, c, m)

In [8]:
X_train[:, np.newaxis, :] - X_test

array([[[ 1.4, -0.6,  2.9,  1.2],
        [ 1.8,  0. ,  2.9,  1.3],
        [ 1.6, -0.2,  2.7,  0.9],
        ...,
        [-0.1,  0.1, -1.1, -0.7],
        [ 0.6,  0.5, -1.2,  0. ],
        [ 0.3,  0.4, -0.9, -0.5]],

       [[-0.5, -0.6,  0.1,  0. ],
        [-0.1,  0. ,  0.1,  0.1],
        [-0.3, -0.2, -0.1, -0.3],
        ...,
        [-2. ,  0.1, -3.9, -1.9],
        [-1.3,  0.5, -4. , -1.2],
        [-1.6,  0.4, -3.7, -1.7]],

       [[ 0.4,  0.1,  0.2,  0.1],
        [ 0.8,  0.7,  0.2,  0.2],
        [ 0.6,  0.5,  0. , -0.2],
        ...,
        [-1.1,  0.8, -3.8, -1.8],
        [-0.4,  1.2, -3.9, -1.1],
        [-0.7,  1.1, -3.6, -1.6]],

       ...,

       [[-0.2, -0.2, -0.1,  0.1],
        [ 0.2,  0.4, -0.1,  0.2],
        [ 0. ,  0.2, -0.3, -0.2],
        ...,
        [-1.7,  0.5, -4.1, -1.8],
        [-1. ,  0.9, -4.2, -1.1],
        [-1.3,  0.8, -3.9, -1.6]],

       [[-0.7, -0.1, -0.5,  0. ],
        [-0.3,  0.5, -0.5,  0.1],
        [-0.5,  0.3, -0.7, -0.3],
        .

In [9]:
distances = np.sqrt(((X_train[:, np.newaxis, :] - X_test)**2).sum(axis=2)).T
distances.shape

(30, 120)

In [10]:
#(30, 120)
#(120)

In [11]:
sorted_idx = np.argsort(distances, axis=1)
sorted_idx

array([[ 58,   9,   5, ...,  79,   4,  71],
       [ 41,   1,  95, ...,  79,   4,  71],
       [ 72, 100,  17, ...,  79,   4,  71],
       ...,
       [ 37, 110,  67, ..., 119, 109, 118],
       [ 98,  62,  87, ...,  63, 109, 118],
       [107,  44,  87, ..., 119, 109, 118]], dtype=int64)

In [12]:
knn = sorted_idx[:, :5]
knn

array([[ 58,   9,   5,  48,  64],
       [ 41,   1,  95,  55,  49],
       [ 72, 100,  17, 114,  56],
       [ 11,  92,  80,  94,  14],
       [ 69,  93, 113,  21, 102],
       [109,  83,  24,   3,  20],
       [112, 105,  46, 116,  73],
       [ 12, 106,  72,   1, 114],
       [102,  69,  21,  93, 113],
       [ 78,  36,  11,  77,  34],
       [102,  38,  67, 107, 115],
       [ 81,  18,  62,  77,  52],
       [ 73, 105, 112,  26,   7],
       [ 74,   0,  61, 104,  99],
       [ 19,  45,  65, 116, 112],
       [ 97,  46, 105,  80,  73],
       [ 58,  86,   5,  16,  64],
       [ 62,  84,  81,  77,  18],
       [109,  24,  20,   3,  83],
       [ 96,  29,  84,  77,  14],
       [ 41,   1,  55,  88,  95],
       [ 69, 110, 113,  93,  21],
       [ 58,  56,  68,   9,  25],
       [ 24,   3,  12, 106,  20],
       [ 81,  77,  14,  62,  18],
       [ 79,  71,  10,  23,  47],
       [ 96,  29,  84,  77,  14],
       [ 37, 110,  67,  21,  87],
       [ 98,  62,  87,  40,  52],
       [107,  

In [13]:
knn_targets = y_train[knn]
knn_targets

array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [1, 1, 1, 1, 2],
       [2, 2, 2, 2, 2],
       [0, 0, 0, 0, 0],
       [1, 1, 1, 1, 1],
       [0, 0, 0, 0, 0],
       [2, 2, 2, 2, 2],
       [1, 1, 1, 2, 1],
       [2, 2, 2, 2, 2],
       [2, 1, 2, 2, 2],
       [1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1],
       [0, 0, 0, 0, 0],
       [2, 2, 2, 2, 1],
       [0, 0, 0, 0, 0],
       [2, 2, 2, 2, 2],
       [0, 0, 0, 0, 0],
       [2, 2, 2, 2, 2],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [2, 2, 2, 2, 1],
       [2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2],
       [2, 2, 2, 2, 2]])

In [15]:
gaussian_distances = np.exp(-(distances[:, :5]/2)**2/2)/np.sqrt(2 * np.pi)
gaussian_distances

array([[0.08714442, 0.36919197, 0.38812084, 0.36781009, 0.00400509],
       [0.0752877 , 0.39744905, 0.34294386, 0.39398654, 0.002763  ],
       [0.10472279, 0.387636  , 0.36781009, 0.37853811, 0.00508511],
       [0.37524035, 0.08074667, 0.09862421, 0.0607986 , 0.14657932],
       [0.24287982, 0.01375382, 0.02001169, 0.00944105, 0.28148151],
       [0.04024826, 0.37383583, 0.27418875, 0.38666812, 0.00099259],
       [0.33405871, 0.13329527, 0.13838875, 0.10498492, 0.08176234],
       [0.09606901, 0.39006631, 0.35118626, 0.37901158, 0.00465325],
       [0.28502211, 0.02200604, 0.03217909, 0.01560462, 0.26179631],
       [0.3828207 , 0.07054965, 0.09715588, 0.05338697, 0.17548731],
       [0.32176348, 0.03011631, 0.04305881, 0.02167841, 0.24136655],
       [0.3463905 , 0.04729085, 0.05747308, 0.0342973 , 0.17417608],
       [0.33826065, 0.14475849, 0.15799553, 0.11573658, 0.08237786],
       [0.39398654, 0.06187194, 0.08979836, 0.04612323, 0.1889187 ],
       [0.31186386, 0.15010225, 0.

Дальше нужно свести матрицу дистанций к матрице размера (с, z). z - кол-во классов в общучающем датасете. Где каждой ячейкой строки будет являться значение равное сумме весов умноженной на соответсиве классов (Формула взвешанного KNN)

In [None]:
# np.unique to knn_targets broadcasting
# (c, k)
# (z) -> (z, 1, 1)
# (z, c, k)

In [None]:
# (z, c, k)
# (c, k) -> (1, c, k)
# (z, c, k)

In [75]:
test = np.unique(y_train)[:, np.newaxis, np.newaxis] == knn_targets

In [113]:
w_matrix = (test * gaussian_distances).sum(axis=2)
w_matrix.T

array([[1.21627242, 0.        , 0.        ],
       [1.21243014, 0.        , 0.        ],
       [1.24379209, 0.        , 0.        ],
       [0.        , 0.61540983, 0.14657932],
       [0.        , 0.        , 0.56756788],
       [1.07593355, 0.        , 0.        ],
       [0.        , 0.79248999, 0.        ],
       [1.22098641, 0.        , 0.        ],
       [0.        , 0.        , 0.61660815],
       [0.        , 0.72601355, 0.05338697],
       [0.        , 0.        , 0.65798355],
       [0.        , 0.04729085, 0.61233697],
       [0.        , 0.83912911, 0.        ],
       [0.        , 0.78069877, 0.        ],
       [0.        , 0.79214748, 0.        ],
       [0.        , 0.75883144, 0.        ],
       [1.16601061, 0.        , 0.        ],
       [0.        , 0.16588864, 0.50453928],
       [1.13616195, 0.        , 0.        ],
       [0.        , 0.        , 0.62771323],
       [1.21724369, 0.        , 0.        ],
       [0.        , 0.        , 0.63767939],
       [1.

In [115]:
c_matrix = np.argmax(w_matrix, axis=0)
c_matrix

array([0, 0, 0, 1, 2, 0, 1, 0, 2, 1, 2, 2, 1, 1, 1, 1, 0, 2, 0, 2, 0, 2,
       0, 0, 2, 2, 2, 2, 2, 2], dtype=int64)

In [118]:
proba_matrix = (w_matrix / w_matrix.sum(axis=0))
proba_matrix.T

array([[1.        , 0.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.80763594, 0.19236406],
       [0.        , 0.        , 1.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [0.        , 0.93150252, 0.06849748],
       [0.        , 0.        , 1.        ],
       [0.        , 0.07169323, 0.92830677],
       [0.        , 1.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.24743695, 0.75256305],
       [1.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [1.

#### Готовый класс

In [18]:
class KernelKnnClassifier():
    def __init__(self, n_neibghours: int = 5, *, h: float, metric: str = 'euclidean', kernel: str = 'gaussian'):
        self.n_neibghours = n_neibghours
        self.metric = metric
        self.kernel = kernel
        self.h = h
        self.X = None
        self.y = None
        is_fitted = False
        
    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        self.X = X
        self.n = X.shape[0]
        self.m = X.shape[1]
        self.y = y
        self.c_val = np.unique(y)
        self.is_fitted = True
        
    def predict(self, X: np.ndarray, *, return_proba: bool = False) -> np.ndarray:
        assert self.is_fitted == True, 'You should fit the model before'
        assert X.ndim == 2, 'Input matrix should be 2 dimensional'
        assert X.shape[1] == self.X.shape[1], 'Number of features must be equal with training dataset'
        
        #Find nearest neibghours
        if self.metric == 'euclidean':
            distances = self.__calc_eucledean(self.X, X) ## (c, n)
        else:
            raise ValueError("Wrong metric argument")

        sorted_idx = np.argsort(distances, axis=1)
        knn_target = self.y[sorted_idx[:, :self.n_neibghours]]

        #Apply weights to distances
        if self.kernel == 'rectangular':
            weights = np.full_like(distances[:, :self.n_neibghours], 0.5)
        elif self.kernel == 'gaussian':
            weights = self.__calc_gaussian(distances[:, :self.n_neibghours]) #c, k
        else:
            raise ValueError("Wrong kernel argument")

        weight_matrix = ((self.c_val[:, np.newaxis, np.newaxis] == knn_target) * weights).sum(axis=2) #z, c
        proba_matrix = weight_matrix / weight_matrix.sum(axis=0) #z, c

        #Return predict
        if return_proba == False:
            return np.argmax(weight_matrix, axis=0)
        else:
            return proba_matrix.T
               
    def __calc_eucledean(self, X: np.ndarray, u: np.ndarray) -> np.ndarray:
        return np.sqrt(((X[:, np.newaxis, :] - u)**2).sum(axis=2)).T

    def __calc_gaussian(self, X: np.ndarray) -> np.ndarray:
        return np.exp(-2*(X/self.h)**2)/np.sqrt(2 * np.pi)
        

In [19]:
classifier = KernelKnnClassifier(5, h=2, kernel='gaussian')
classifier.fit(X_train, y_train)
classifier.predict(X_test, return_proba=True)

array([[1.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [1.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [1.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 2.26138861e-01, 7.73861139e-01],
       [0.00000000e+00, 0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.84591734e-04, 9.99815408e-01],
       [0.00000000e+00, 0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 1.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 1.00000000e+00, 0.00000000e+00],
       [1.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 9.99326589e-01, 6.73410608e-04],
       [1.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 0.00000

In [20]:
from sklearn.neighbors import KNeighborsClassifier

sk_classifier = KNeighborsClassifier(5, weights='distance', algorithm='brute')
sk_classifier.fit(X_train, y_train)
sk_classifier.predict_proba(X_test)

array([[1.        , 0.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.29791809, 0.70208191],
       [0.        , 0.        , 1.        ],
       [0.        , 0.12858452, 0.87141548],
       [0.        , 0.        , 1.        ],
       [0.        , 1.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [0.        , 1.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [0.        , 0.87128259, 0.12871741],
       [1.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        ],
       [0.        , 0.        , 1.        ],
       [1.        , 0.        , 0.        ],
       [0.        , 1.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [1.        , 0.        , 0.        ],
       [1.

In [21]:
self_predict.shape

(1,)

In [23]:
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score

self_f1 = []
sk_f1 = []
k_number = 10

kf = KFold(X.shape[0], shuffle=True)
for i, (train_index, test_index) in enumerate(kf.split(X)):
    self_classifier = KernelKnnClassifier(k_number, h=1, kernel='rectangular')
    self_classifier.fit(X[train_index], y[train_index])
    self_predict = self_classifier.predict(X[test_index], return_proba=False)
    self_f1.append(f1_score(self_predict, y[test_index], average='weighted'))

    sk_classifier = KNeighborsClassifier(k_number, weights='distance', algorithm='brute')
    sk_classifier.fit(X_train, y_train)
    sk_predict = sk_classifier.predict(X[test_index])
    sk_f1.append(f1_score(sk_predict, y[test_index], average='weighted'))

print('Self weighted KNN predictor f1 score', np.array(self_f1).mean())
print('Sklearn weighted KNN predictor f1 score', np.array(sk_f1).mean())

Self weighted KNN predictor f1 score 0.9666666666666667
Sklearn weighted KNN predictor f1 score 0.9866666666666667
