# Метод k-ближайших соседей

#### Настройка среды

- Создаем изолированное окружение: `python -m venv venv`
- Активируем: (unix) `. venv/bin/activate` или (win) `. venv/Scripts/activate`
- Устанавливаем зависимости: `pip install -r practicum_8/requirements.txt`

In [1]:
import random
import warnings
from typing import Callable, Union

import category_encoders as ce
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yaml
from numpy.typing import NDArray
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler


# Fraction of each dataset held out as the test split.
TEST_SPLIT = 0.2
# Single seed used for the global RNGs below and for the train/test splits.
SEED = 42

# Seed both the stdlib and the NumPy global generators for reproducible runs.
random.seed(SEED)
np.random.seed(SEED)

## Реализуем простейшую версию kNN для задач классификации

#### Метрики

In [None]:
# Array aliases used in the annotations of this notebook.
# NOTE: `np.float_` was removed in NumPy 2.0. `np.float64` is the very same
# type on NumPy 1.x, so these aliases work on both major versions.
NpIntArrayT = NDArray[np.int_]
NpFloatArrayT = NDArray[np.float64]
NpArrayT = NDArray[Union[np.float64, np.int_]]
# A metric maps (sample, train_samples[, weights]) to one distance per training
# row. The argument list is left open because the two-argument metrics and the
# optionally weighted one are all passed around under this alias.
MetricT = Callable[..., NpFloatArrayT]

# vector (1, m)
# matrix (n, m)


def euclidean_distance(sample: NpArrayT, train_samples: NpArrayT) -> NpFloatArrayT:
    """Return the Euclidean distance from ``sample`` to every row of ``train_samples``.

    The element-wise difference is squared, summed along axis 1 and
    square-rooted, which yields one distance per training row.
    """
    squared_diff = np.square(sample - train_samples)
    return np.sqrt(squared_diff.sum(axis=1))


def cosine_distance(sample: NpArrayT, train_samples: NpArrayT) -> NpFloatArrayT:
    """Return the cosine distance (1 - cosine similarity) to every training row.

    Args:
        sample: Query vector with one entry per feature.
        train_samples: Training matrix with one row per observation.

    Returns:
        One distance per row of ``train_samples``.

    A zero-norm vector has no direction, so its similarity to anything is
    taken to be 0 (distance 1) instead of producing NaN from a 0/0 division.
    """
    dot_prod = sample @ train_samples.T
    norm1 = np.sqrt(np.sum(sample**2))
    norm2 = np.sqrt(np.sum(train_samples**2, axis=1))
    # Replace zero norms by 1: the dot product is already 0 in that case, so
    # the quotient becomes 0 and the distance 1, without a division by zero.
    norm1 = norm1 if norm1 != 0 else 1.0
    norm2 = np.where(norm2 == 0, 1.0, norm2)
    return 1 - dot_prod / norm1 / norm2


def custom_metric(
    sample: NpArrayT, train_samples: NpArrayT, weights: Union[NpArrayT, None] = None
) -> NpFloatArrayT:
    """Return the feature-weighted Euclidean distance to every training row.

    Computes ``sqrt(sum_j w_j * (train_ij - sample_j) ** 2)`` for each row i.

    Args:
        sample: Query vector with one entry per feature.
        train_samples: Training matrix with one row per observation.
        weights: Optional non-negative importance of each feature. ``None``
            gives the plain (unweighted) Euclidean distance.

    Returns:
        One distance per row of ``train_samples``.

    Raises:
        ValueError: If ``weights`` does not have exactly one entry per feature
            or contains a negative value.
    """
    squared_diff = (train_samples - sample) ** 2
    if weights is None:
        return np.sqrt(np.sum(squared_diff, axis=1))

    feature_weights = np.asarray(weights, dtype=float)
    if feature_weights.shape != squared_diff.shape[-1:]:
        raise ValueError(
            "weights must have one entry per feature: expected shape "
            f"{squared_diff.shape[-1:]}, got {feature_weights.shape}"
        )
    # A negative weight could make the weighted sum negative and the root NaN.
    if np.any(feature_weights < 0):
        raise ValueError("weights must be non-negative")
    return np.sqrt(np.sum(feature_weights * squared_diff, axis=1))

In [None]:
# Toy data: one query vector and three training rows.
sample = np.array([1, 0, -1])
train_samples = np.array([[-1, -1, 1], [1, 0, 0], [2, -2, 0]])

print(euclidean_distance(sample=sample, train_samples=train_samples), end="\n\n")

# Note: cosine distance = 1 - cosine similarity
# Print our result next to the value derived from sklearn's cosine_similarity.
print(
    "Наш cosine distance: ", cosine_distance(sample=sample, train_samples=train_samples)
)
print(
    "sklearn cosine distance: ",
    1 - cosine_similarity(sample.reshape(1, -1), train_samples),
)

#### Поиск соседей

Необходимо рассчитать расстояния от каждого тестового наблюдения до наблюдений в обучающей выборке, а затем просто отсортировать по возрастанию.

In [None]:
# data_train: (N, m)
# data_test: (n, m)


def get_neighbors_info(
    data_train: NpArrayT, data_test: NpArrayT, metric: MetricT
) -> tuple[NpIntArrayT, NpFloatArrayT]:
    """Rank all training rows by distance for every test row.

    Returns a pair ``(neighbors, distances)``:

    * ``distances[i, j]`` is ``metric(data_test[i], data_train)[j]``, i.e. the
      columns follow the order of the training rows;
    * ``neighbors[i]`` lists the training indices sorted by ascending distance
      to test row ``i``.
    """
    per_sample = [metric(row, data_train) for row in data_test]
    distances = np.array(per_sample)

    # Sort every row at once along the last axis; the cast keeps the index
    # dtype at np.int_.
    neighbors = np.argsort(distances, axis=-1).astype(np.int_)
    return neighbors, distances

In [None]:
# Smoke test: the same query row twice against the toy training rows.
get_neighbors_info(
    data_train=train_samples,
    data_test=np.array([sample, sample]),
    metric=euclidean_distance,
)

#### Обернем в fit-predict логику sklearn'а 

In [None]:
class KNNClassifier:
    """Minimal k-nearest-neighbours classifier with an sklearn-like fit/predict API.

    Distances to all training rows are computed with ``metric`` and the class
    is chosen by a majority vote among the ``k`` closest rows. Labels are
    counted with ``np.bincount``, so they must be non-negative integers.
    """

    def __init__(
        self,
        k: int = 5,
        metric: MetricT = euclidean_distance,
    ) -> None:
        """Store the hyper-parameters.

        Args:
            k: Number of neighbours that vote; must be at least 1.
            metric: Callable ``metric(sample, train_samples)`` returning one
                distance per training row.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        if k < 1:
            raise ValueError(f"k must be a positive integer, got {k}")
        self.k = k
        self.metric = metric

        # Populated by fit(); None means the classifier is not fitted yet.
        self.data_train: Union[NpArrayT, None] = None
        self.y_train: Union[NpIntArrayT, None] = None

    def fit(self, X: pd.DataFrame, y: pd.DataFrame) -> None:
        """Memorise the training features and the flattened target column."""
        self.data_train = X.values
        self.y_train = y.values.reshape(-1)

    def predict(self, X_test: pd.DataFrame) -> NpIntArrayT:
        """Predict a label for every row of ``X_test``.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if self.data_train is None:
            raise RuntimeError("KNNClassifier is not fitted; call fit() first")
        neighbors, distances = get_neighbors_info(
            data_train=self.data_train, data_test=X_test.values, metric=self.metric
        )
        neighbors_k = neighbors[:, : self.k]
        # `distances` is ordered by training index, not by proximity, so the
        # distances of the k nearest rows must be gathered through their
        # indices rather than taken from the first k columns.
        distances_k = np.take_along_axis(distances, neighbors_k, axis=1)
        return self._predict(neighbors_k=neighbors_k, distances_k=distances_k)

    def _predict(
        self, neighbors_k: NpIntArrayT, distances_k: NpFloatArrayT
    ) -> NpIntArrayT:
        """Return the majority label among each row's neighbours.

        ``distances_k`` is accepted for distance-weighted voting but is not
        used by the uniform vote implemented here. Ties go to the smallest
        label, because ``np.argmax`` returns the first maximum.
        """
        if self.y_train is None:
            raise RuntimeError("KNNClassifier is not fitted; call fit() first")
        y_pred = [
            np.argmax(np.bincount(self.y_train[sample_neighbors]))
            for sample_neighbors in neighbors_k
        ]
        return np.array(y_pred, dtype=np.int_)

#### Задание на дом

- Реализовать метрику, учитывающую значимость столбцов (опциональный параметр `weights` в метрике)
- Реализовать предсказание таргета с учетом расстояний до ближайших соседей (на манер `weights=distance` в KNeighborsClassifier)

In [3]:
import numpy as np
from collections import Counter

class SimpleKNN:
    """Brute-force k-nearest-neighbours classifier.

    Parameters
    ----------
    k : int
        Number of nearest training samples that take part in the vote.
    metric : str
        ``'euclidean'`` or ``'cosine'``.
    weights : sequence of float, optional
        Per-feature importance. Used by the Euclidean metric only; the cosine
        metric ignores it.
    use_distance_weights : bool
        If True, every neighbour votes with weight ``1 / distance`` instead of
        weight 1.
    """

    def __init__(self, k=5, metric='euclidean', weights=None, use_distance_weights=False):
        self.k = k
        self.metric = metric
        self.weights = weights
        self.use_distance_weights = use_distance_weights

    def fit(self, X, y):
        """Store the training data as a float matrix and a flat label vector."""
        self.X_train = np.array(X, dtype=float)
        # Flatten so that a column vector or single-column frame of labels
        # yields scalar (hashable) labels when indexed.
        self.y_train = np.array(y).ravel()

    def predict(self, X_test):
        """Return the predicted label for every row of ``X_test``.

        Raises
        ------
        ValueError
            If ``self.metric`` is neither ``'euclidean'`` nor ``'cosine'``.
        """
        if self.metric == 'euclidean':
            distance_fn = self._euclidean_distance
        elif self.metric == 'cosine':
            distance_fn = self._cosine_distance
        else:
            # Fail loudly instead of silently falling back to cosine on a typo.
            raise ValueError(
                f"Unknown metric {self.metric!r}; expected 'euclidean' or 'cosine'"
            )

        # Convert once so that nested lists and DataFrames are iterated row by
        # row as numeric vectors.
        X_test = np.asarray(X_test, dtype=float)

        predictions = []
        for x in X_test:
            distances = distance_fn(x)

            nearest_indices = np.argsort(distances)[:self.k]
            nearest_labels = self.y_train[nearest_indices]

            if self.use_distance_weights:
                pred = self._weighted_vote(nearest_labels, distances[nearest_indices])
            else:
                pred = self._majority_vote(nearest_labels)

            predictions.append(pred)

        return np.array(predictions)

    def _euclidean_distance(self, x):
        """Euclidean distance from ``x`` to every training row, optionally feature-weighted."""
        squared_diff = (self.X_train - x) ** 2
        if self.weights is not None:
            squared_diff = np.asarray(self.weights, dtype=float) * squared_diff
        return np.sqrt(np.sum(squared_diff, axis=1))

    def _cosine_distance(self, x):
        """Cosine distance (1 - cosine similarity) from ``x`` to every training row."""
        x = np.asarray(x, dtype=float)
        norm_x = np.sqrt(np.sum(x**2))
        norm_train = np.sqrt(np.sum(self.X_train**2, axis=1))
        dot = self.X_train @ x
        denom = norm_train * norm_x
        # A zero-norm vector has no direction: treat its similarity as 0
        # (distance 1) instead of dividing 0 by 0 and getting NaN.
        denom = np.where(denom == 0, 1.0, denom)
        return 1 - dot / denom

    def _majority_vote(self, labels):
        """Most frequent label; on a tie the label seen first (nearest) wins."""
        return Counter(labels).most_common(1)[0][0]

    def _weighted_vote(self, labels, distances):
        """Label with the largest total ``1 / distance`` weight."""
        # The small epsilon keeps the weight finite for an exact match (distance 0).
        weights = 1 / (distances + 1e-10)
        vote_counts = {}
        for label, weight in zip(labels, weights):
            vote_counts[label] = vote_counts.get(label, 0) + weight
        return max(vote_counts, key=vote_counts.get)

if __name__ == "__main__":
    # Tiny two-class toy problem: four training points, two query points.
    X_train = [[1, 2], [1, 4], [4, 1], [4, 3]]
    y_train = [0, 0, 1, 1]
    X_test = [[2, 3], [3, 1]]

    # Three configurations: plain vote, feature-weighted metric and
    # distance-weighted vote.
    knn = SimpleKNN(k=2)
    knn_weights = SimpleKNN(k=2, weights=[0.8, 0.2])
    knn_dist = SimpleKNN(k=2, use_distance_weights=True)

    knn.fit(X_train, y_train)
    knn_weights.fit(X_train, y_train)
    knn_dist.fit(X_train, y_train)

    print("Обычный KNN:", knn.predict(X_test))
    print("KNN с весами признаков:", knn_weights.predict(X_test))
    print("KNN с весами расстояний:", knn_dist.predict(X_test))

Обычный KNN: [0 1]
KNN с весами признаков: [0 1]
KNN с весами расстояний: [0 1]


## Эксперименты

Протестируем реализованный нами `KNNClassifier` и его стандартную реализацию из scikit-learn `sklearn.neighbors.KNeighborsClassifier` на датасетах [WDBC](https://www.kaggle.com/datasets/mohaiminul101/wisconsin-diagnostic-breast-cancer-wdbc) и [Mushrooms](https://www.kaggle.com/datasets/uciml/mushroom-classification).

In [None]:
# Load the YAML config one directory up; later cells read the dataset paths
# from it. The encoding is pinned so that parsing does not depend on the
# platform's locale default.
with open("../config.yaml", "r", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

#### Подготовка WDBC

In [None]:
# Load WDBC from the path given in the config.
df_wdbc = pd.read_csv(cfg["classification"]["wdbc"])
target_col_wdbc = "diagnosis"

# Drop the "id" and "Unnamed: 32" columns and encode the target: B -> 0, M -> 1.
df_wdbc = df_wdbc.drop(["id", "Unnamed: 32"], axis=1)
df_wdbc[target_col_wdbc] = df_wdbc[target_col_wdbc].replace({"B": 0, "M": 1})
# Every remaining column except the target is a feature.
feature_cols_wdbc = list(df_wdbc.columns)
feature_cols_wdbc.remove(target_col_wdbc)

# Double brackets keep the target as a one-column DataFrame.
y_wdbc = df_wdbc[[target_col_wdbc]]
X_wdbc = df_wdbc[feature_cols_wdbc]

X_wdbc_train, X_wdbc_test, y_wdbc_train, y_wdbc_test = train_test_split(
    X_wdbc, y_wdbc, test_size=TEST_SPLIT, random_state=SEED
)

# Normalization!
# The scaler is fitted on the training split only and then applied to the test split.
standard_scaler = StandardScaler()
X_wdbc_train[feature_cols_wdbc] = standard_scaler.fit_transform(X_wdbc_train)
X_wdbc_test[feature_cols_wdbc] = standard_scaler.transform(X_wdbc_test)

#### Подготовка Mushrooms

In [None]:
# Load Mushrooms from the path given in the config.
df_mushrooms = pd.read_csv(cfg["classification"]["mushrooms"])
target_col_mushrooms = "class"

# Every column except the target is a feature.
feature_cols_mushrooms = list(df_mushrooms.columns)
feature_cols_mushrooms.remove(target_col_mushrooms)

# Double brackets keep the target as a one-column DataFrame.
y_mushrooms = df_mushrooms[[target_col_mushrooms]]
X_mushrooms = df_mushrooms[feature_cols_mushrooms]

(
    X_mushrooms_train,
    X_mushrooms_test,
    y_mushrooms_train,
    y_mushrooms_test,
) = train_test_split(X_mushrooms, y_mushrooms, test_size=TEST_SPLIT, random_state=SEED)

# Encode the features with category_encoders' CountEncoder, fitted on the
# training split only and then applied to the test split.
count_encoder = ce.CountEncoder()
X_mushrooms_train = count_encoder.fit_transform(X_mushrooms_train)
X_mushrooms_test = count_encoder.transform(X_mushrooms_test)

# Normalization!
# The scaler is likewise fitted on the training split only.
standard_scaler = StandardScaler()
X_mushrooms_train[feature_cols_mushrooms] = standard_scaler.fit_transform(
    X_mushrooms_train
)
X_mushrooms_test[feature_cols_mushrooms] = standard_scaler.transform(X_mushrooms_test)

label_encoder = LabelEncoder()
# NOTE(review): presumably this silences the warnings raised by passing a
# column vector to the encoder and by assigning into the split frames - confirm.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    y_mushrooms_train[target_col_mushrooms] = label_encoder.fit_transform(
        y_mushrooms_train.values
    )
    # Reuse the mapping learned on the training targets. Re-fitting on the test
    # targets could assign different integers to the same class whenever the
    # set of classes in the two splits differs.
    y_mushrooms_test[target_col_mushrooms] = label_encoder.transform(
        y_mushrooms_test.values
    )

#### Сравним нашу реализацию с реализацией sklearn

In [None]:
# Number of neighbours used by both classifiers.
K_NEIGHBORS = 5

# Same k and Euclidean metric for both models; sklearn is set to brute-force
# search with uniform votes.
knn_wdbc_ours = KNNClassifier(k=K_NEIGHBORS, metric=euclidean_distance)
knn_wdbc_sklearn = KNeighborsClassifier(
    n_neighbors=K_NEIGHBORS, metric="euclidean", algorithm="brute", weights="uniform"
)

# sklearn receives the target flattened to 1-D; our fit() flattens it itself.
knn_wdbc_ours.fit(X=X_wdbc_train, y=y_wdbc_train)
knn_wdbc_sklearn.fit(X=X_wdbc_train, y=y_wdbc_train.values.reshape(-1))

y_wdbc_pred_ours = knn_wdbc_ours.predict(X_test=X_wdbc_test)
y_wdbc_pred_sklearn = knn_wdbc_sklearn.predict(X=X_wdbc_test)

# Report test accuracy of both implementations.
print("Наш kNN: ", accuracy_score(y_true=y_wdbc_test, y_pred=y_wdbc_pred_ours))
print("kNN sklearn: ", accuracy_score(y_true=y_wdbc_test, y_pred=y_wdbc_pred_sklearn))

In [None]:
# Number of neighbours used by both classifiers.
K_NEIGHBORS = 5

# Same k and cosine metric for both models; sklearn is set to brute-force
# search with uniform votes.
knn_mushrooms_ours = KNNClassifier(k=K_NEIGHBORS, metric=cosine_distance)
knn_mushrooms_sklearn = KNeighborsClassifier(
    n_neighbors=K_NEIGHBORS, metric="cosine", algorithm="brute", weights="uniform"
)

# sklearn receives the target flattened to 1-D; our fit() flattens it itself.
knn_mushrooms_ours.fit(X=X_mushrooms_train, y=y_mushrooms_train)
knn_mushrooms_sklearn.fit(X=X_mushrooms_train, y=y_mushrooms_train.values.reshape(-1))

y_mushrooms_pred_ours = knn_mushrooms_ours.predict(X_test=X_mushrooms_test)
y_mushrooms_pred_sklearn = knn_mushrooms_sklearn.predict(X=X_mushrooms_test)

# Report test accuracy of both implementations.
print(
    "Наш kNN: ", accuracy_score(y_true=y_mushrooms_test, y_pred=y_mushrooms_pred_ours)
)
print(
    "kNN sklearn: ",
    accuracy_score(y_true=y_mushrooms_test, y_pred=y_mushrooms_pred_sklearn),
)