# Метод ближайших соседей

knn.py - файл содержит реализацию класса knn 

Импорт библиотек:

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

from sklearn.neighbors import NearestNeighbors

from data_utils import load_and_preprocess_data
from evaluation import grid_search_knn, evaluate_knn
from knn import KNN, uniform_kernel, gaussian_kernel, triangular_kernel, polynomial_kernel
from plot_utils import (
    plot_accuracy,
    plot_class_distribution,
    plot_accuracy_over_parameters,
)
from lowess_custom import calculate_lowess_weights


Загрузка данных, подготовка и настройка
## Были выбраны
### Ядра: 
- Однородное ядро
- Гауссово ядро
- Полиномиальное
- Треугольное

### Метрики:
- Евклидово расстояние (`euclidean`)
- Манхэттенское расстояние (`manhattan`)
- Расстояние Чебышева (`chebyshev`)
- Косинусное расстояние (`cosine`)
- Расстояние Минковского (`minkowski` с параметром `p`)

In [None]:
def load_data(csv_path, target_column):
    return load_and_preprocess_data(csv_path, target_column)


def set_parameters():
    neighbor_range = range(1, 21, 2)
    radius_range = [0.5, 1.0, 1.5, 2.0]
    metrics = ['euclidean', 'manhattan', 'chebyshev', 'cosine', 'minkowski']
    kernels = {
        'uniform': uniform_kernel,
        'gaussian': gaussian_kernel,
        'triangular': triangular_kernel,
        'polynomial': lambda d: polynomial_kernel(d, a=2, b=1)
    }
    p_range = [2, 3, 4]
    return neighbor_range, radius_range, metrics, kernels, p_range

Поиск лучших параметров

In [None]:
def find_best_parameters(X_train, y_train, X_val, y_val, neighbor_range, radius_range, metrics, kernels, p_range):
    best_params, best_accuracy, results = grid_search_knn(
        X_train, y_train, X_val, y_val,
        neighbor_range, radius_range, metrics, kernels, p_range
    )
    return best_params, best_accuracy, results

Оценка на тестовом множестве

In [None]:
def evaluate_test_set(X_train, y_train, X_test, y_test, best_params):
    metric = best_params['metric']
    if 'minkowski' in metric:
        p = int(metric.split('_p')[-1])
        base_metric = 'minkowski'
    else:
        p = None
        base_metric = metric

    test_accuracy = evaluate_knn(
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        n_neighbors=best_params.get('n_neighbors'),
        radius=best_params.get('radius'),
        metric=base_metric,
        kernel=best_params['kernel'],
        p=p
    )
    return test_accuracy

main.py 
Выполняет загрузку данных, настройку параметров и подбор гиперпараметров для KNN. Проводит оценку на тестовой выборке и строит графики зависимости точности.

In [None]:
def main():
    csv_path = "allrecipes_binarized_every.csv"
    target_column = 'Category'

    X_train, X_val, X_test, y_train, y_val, y_test, label_encoder, scaler = load_data(csv_path, target_column)

    print(f"Размер тренировочной выборки: {X_train.shape}")
    print(f"Размер валидационной выборки: {X_val.shape}")
    print(f"Размер тестовой выборки: {X_test.shape}")

    neighbor_range, radius_range, metrics, kernels, p_range = set_parameters()

    best_params, best_accuracy, results = find_best_parameters(
        X_train, y_train, X_val, y_val,
        neighbor_range, radius_range, metrics, kernels, p_range
    )

    print("\nЛучшие параметры:")
    print(best_params)
    print(f"Лучшая точность на валидационной выборке: {best_accuracy:.4f}")

    test_accuracy = evaluate_test_set(X_train, y_train, X_test, y_test, best_params)
    print(f"Точность на тестовом множестве с лучшими гиперпараметрами: {test_accuracy:.4f}")

    plot_accuracy_results(X_train, y_train, X_test, y_test, best_params, results)
    plot_class_distribution_func(y_train)
    plot_accuracy_over_parameters_func(results)
    
    calculate_lowess_weights(X_train, y_train, X_test, y_test, best_params)

main()

knn.py - файл реализующий KNN и ядра

In [None]:
def uniform_kernel(distances):
    return np.ones_like(distances)

def gaussian_kernel(distances, bandwidth=1.0):
    return np.exp(-0.5 * (distances / bandwidth) ** 2)

def polynomial_kernel(distances, a=2, b=1):
    return np.power((1 - np.power(np.abs(distances), a)), b)

def triangular_kernel(distances):
    return np.maximum(1 - distances, 0)

KERNELS = {
    'uniform': uniform_kernel,
    'gaussian': gaussian_kernel,
    'triangular': triangular_kernel,
    'polynomial': polynomial_kernel
}

Класс knn и реализация его методов

In [None]:
class KNN:
    def __init__(self, n_neighbors=5, radius=None, metric='euclidean', kernel='uniform', weights=None, p=2):
        self.n_neighbors = n_neighbors
        self.radius = radius
        self.metric = metric
        self.kernel = KERNELS.get(kernel, uniform_kernel)
        self.weights = weights if weights is not None else np.ones(1)
        self.p = p
        if radius is None:
            if self.metric == 'minkowski':
                self.model = NearestNeighbors(n_neighbors=n_neighbors, metric=self.metric, p=self.p)
            else:
                self.model = NearestNeighbors(n_neighbors=n_neighbors, metric=self.metric)
        else:
            if self.metric == 'minkowski':
                self.model = NearestNeighbors(radius=radius, metric=self.metric, p=self.p)
            else:
                self.model = NearestNeighbors(radius=radius, metric=self.metric)

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
        self.model.fit(X)

    def predict(self, X):
        predictions = []
        if self.radius is None:
            distances, indices = self.model.kneighbors(X)
            for i, neighbors in enumerate(indices):
                neighbor_labels = self.y_train[neighbors]
                neighbor_distances = distances[i]
                weights = self.kernel(neighbor_distances)
                if self.weights.size > 1:
                    weights *= self.weights[neighbors]
                weighted_votes = Counter()
                for label, weight in zip(neighbor_labels, weights):
                    weighted_votes[label] += weight
                most_common = weighted_votes.most_common(1)[0][0]
                predictions.append(most_common)
        else:
            neighbors = self.model.radius_neighbors(X, return_distance=True)
            for i in range(len(X)):
                neighbor_indices = neighbors[1][i]
                neighbor_distances = neighbors[0][i]
                if len(neighbor_indices) == 0:
                    distances, indices = self.model.kneighbors(X[i].reshape(1, -1))
                    neighbor_indices = indices[0]
                    neighbor_distances = distances[0]
                neighbor_labels = self.y_train[neighbor_indices]
                weights = self.kernel(neighbor_distances)
                if self.weights.size > 1:
                    weights *= self.weights[neighbor_indices]
                weighted_votes = Counter()
                for label, weight in zip(neighbor_labels, weights):
                    weighted_votes[label] += weight
                most_common = weighted_votes.most_common(1)[0][0]
                predictions.append(most_common)
        return np.array(predictions)

## Подбор лучших гиперпараметров

In [None]:
def evaluate_knn(X_train, y_train, X_test, y_test, n_neighbors=5, radius=None, metric='euclidean',
                kernel='uniform', weights=None, p=2):
    knn = KNN(n_neighbors=n_neighbors, radius=radius, metric=metric, kernel=kernel, weights=weights, p=p)
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)
    accuracy = np.mean(y_pred == y_test)
    return accuracy

In [None]:
def grid_search_knn(X_train, y_train, X_val, y_val, neighbor_range, radius_range, metrics, kernels, p_range):
    best_accuracy = 0
    best_params = {}
    results = []
    print("Точность для разных комбинаций параметров:")

    for n_neighbors in neighbor_range:
        for metric in metrics:
            if metric == 'minkowski':
                for p in p_range:
                    for kernel_name in kernels.keys():
                        accuracy = evaluate_knn(
                            X_train, y_train, X_val, y_val,
                            n_neighbors=n_neighbors, radius=None,
                            metric=metric, kernel=kernel_name, p=p
                        )
                        print(
                            f"[kNN] Метрика: {metric} (p={p}), Ядро: {kernel_name}, n_neighbors: {n_neighbors}, Точность: {accuracy:.4f}")
                        results.append({
                            'type': 'n_neighbors',
                            'n_neighbors': n_neighbors,
                            'metric': f"{metric}_p{p}",
                            'kernel': kernel_name,
                            'accuracy': accuracy
                        })
                        if accuracy > best_accuracy:
                            best_accuracy = accuracy
                            best_params = {
                                'type': 'n_neighbors',
                                'n_neighbors': n_neighbors,
                                'metric': f"{metric}_p{p}",
                                'kernel': kernel_name
                            }
            else:
                for kernel_name in kernels.keys():
                    accuracy = evaluate_knn(
                        X_train, y_train, X_val, y_val,
                        n_neighbors=n_neighbors, radius=None,
                        metric=metric, kernel=kernel_name
                    )
                    print(
                        f"[kNN] Метрика: {metric}, Ядро: {kernel_name}, n_neighbors: {n_neighbors}, Точность: {accuracy:.4f}")
                    results.append({
                        'type': 'n_neighbors',
                        'n_neighbors': n_neighbors,
                        'metric': metric,
                        'kernel': kernel_name,
                        'accuracy': accuracy
                    })
                    if accuracy > best_accuracy:
                        best_accuracy = accuracy
                        best_params = {
                            'type': 'n_neighbors',
                            'n_neighbors': n_neighbors,
                            'metric': metric,
                            'kernel': kernel_name
                        }

    for radius in radius_range:
        for metric in metrics:
            if metric == 'minkowski':
                for p in p_range:
                    for kernel_name in kernels.keys():
                        accuracy = evaluate_knn(
                            X_train, y_train, X_val, y_val,
                            n_neighbors=None, radius=radius,
                            metric=metric, kernel=kernel_name, p=p
                        )
                        print(
                            f"[RadiusNN] Метрика: {metric} (p={p}), Ядро: {kernel_name}, radius: {radius}, Точность: {accuracy:.4f}")
                        results.append({
                            'type': 'radius',
                            'radius': radius,
                            'metric': f"{metric}_p{p}",
                            'kernel': kernel_name,
                            'accuracy': accuracy
                        })
                        if accuracy > best_accuracy:
                            best_accuracy = accuracy
                            best_params = {
                                'type': 'radius',
                                'radius': radius,
                                'metric': f"{metric}_p{p}",
                                'kernel': kernel_name
                            }
            else:
                for kernel_name in kernels.keys():
                    accuracy = evaluate_knn(
                        X_train, y_train, X_val, y_val,
                        n_neighbors=None, radius=radius,
                        metric=metric, kernel=kernel_name
                    )
                    print(
                        f"[RadiusNN] Метрика: {metric}, Ядро: {kernel_name}, radius: {radius}, Точность: {accuracy:.4f}")
                    results.append({
                        'type': 'radius',
                        'radius': radius,
                        'metric': metric,
                        'kernel': kernel_name,
                        'accuracy': accuracy
                    })
                    if accuracy > best_accuracy:
                        best_accuracy = accuracy
                        best_params = {
                            'type': 'radius',
                            'radius': radius,
                            'metric': metric,
                            'kernel': kernel_name
                        }
    return best_params, best_accuracy, results

Реализация метода LOWESS с итеративным пере-взвешиванием и регуляризацией. Метод выполняет локальную регрессию для сглаживания данных и выявления аномалий.

In [None]:
def lowess(X, y, f=0.25, iterations=3, lambda_reg=1e-5):
    n = len(X)
    y_pred = np.zeros(n)
    r = int(np.ceil(f * n))
    delta = np.ones(n)
    X_augmented = np.hstack((np.ones((n, 1)), X))
    y = y.flatten()

    for iteration in range(iterations):
        for i in range(n):
            distances = np.linalg.norm(X - X[i], axis=1)
            idx = np.argsort(distances)[:r]
            x_neighbors = X_augmented[idx]
            y_neighbors = y[idx]
        
            max_distance = distances[idx].max()
            if max_distance > 0:
                normalized_distances = distances[idx] / max_distance
            else:
                normalized_distances = np.zeros_like(distances[idx])
            weights = triangular_kernel(normalized_distances)
            weights *= delta[idx]
            W = np.diag(weights)
            XT_W_X = x_neighbors.T @ W @ x_neighbors
            XT_W_X += lambda_reg * np.eye(X_augmented.shape[1])
            XT_W_y = x_neighbors.T @ W @ y_neighbors
            try:
                beta = np.linalg.pinv(XT_W_X) @ XT_W_y
            except np.linalg.LinAlgError:
                beta = np.zeros(X_augmented.shape[1])
            y_pred[i] = X_augmented[i] @ beta
        residuals = y - y_pred
        s = np.median(np.abs(residuals))
        if s == 0:
            s = 1
        delta = residuals / (6.0 * s)
        delta = np.clip(delta, -1, 1)
        delta = (1 - delta ** 2) ** 2
    return y_pred

Функция для применения LOWESS к тренировочным данным, вычисления весов, оценки модели до и после взвешивания, а также построения графиков распределения весов.

In [None]:
def calculate_lowess_weights(X_train, y_train, X_test, y_test, best_params):
    try:
        y_train_pred = lowess(X_train, y_train, iterations=3)
        weights = np.abs(y_train - y_train_pred)
        weights = weights / np.max(weights)

        if 'minkowski' in best_params['metric']:
            p = int(best_params['metric'].split('_p')[-1])
            initial_accuracy = evaluate_knn(
                X_train=X_train,
                y_train=y_train,
                X_test=X_test,
                y_test=y_test,
                n_neighbors=best_params.get('n_neighbors'),
                radius=best_params.get('radius'),
                metric='minkowski',
                kernel=best_params['kernel'],
                p=p
            )
        else:
            initial_accuracy = evaluate_knn(
                X_train=X_train,
                y_train=y_train,
                X_test=X_test,
                y_test=y_test,
                n_neighbors=best_params.get('n_neighbors'),
                radius=best_params.get('radius'),
                metric=best_params['metric'],
                kernel=best_params['kernel']
            )
        print(f"Точность до взвешивания: {initial_accuracy:.4f}")

        if 'minkowski' in best_params['metric']:
            p = int(best_params['metric'].split('_p')[-1])
            weighted_accuracy = evaluate_knn(
                X_train=X_train,
                y_train=y_train,
                X_test=X_test,
                y_test=y_test,
                n_neighbors=best_params.get('n_neighbors'),
                radius=best_params.get('radius'),
                metric='minkowski',
                kernel=best_params['kernel'],
                weights=weights,
                p=p
            )
        else:
            weighted_accuracy = evaluate_knn(
                X_train=X_train,
                y_train=y_train,
                X_test=X_test,
                y_test=y_test,
                n_neighbors=best_params.get('n_neighbors'),
                radius=best_params.get('radius'),
                metric=best_params['metric'],
                kernel=best_params['kernel'],
                weights=weights
            )
        print(f"Точность после взвешивания: {weighted_accuracy:.4f}")

    except Exception as e:
        print(f"Ошибка в LOWESS: {e}")

## Код для построения графиков, улушаюших понимание

Строит график точности на тренировочных и тестовых данных в зависимости от заданного параметра (например, числа соседей).

In [1]:
def plot_accuracy(train_accuracies, test_accuracies, parameter, param_values, title, xlabel, save_path=None):
    plt.figure(figsize=(10, 6))
    plt.plot(param_values, train_accuracies, marker='o', label='Точность на тренировочном множестве')
    plt.plot(param_values, test_accuracies, marker='s', label='Точность на тестовом множестве')
    plt.xlabel(xlabel)
    plt.ylabel('Точность')
    plt.title(title)
    plt.legend()
    plt.grid(True)
    if save_path:
        plt.savefig(save_path)
        plt.close()
    else:
        plt.show()

 Визуализирует распределение классов в выборке, что помогает выявить дисбаланс данных.

In [2]:
def plot_class_distribution(y, title='Распределение классов', save_path=None):
    unique, counts = np.unique(y, return_counts=True)
    data = pd.DataFrame({'Class': unique, 'Count': counts})
    plt.figure(figsize=(8, 6))
    sns.barplot(x='Class', y='Count', data=data, hue='Class', palette='viridis', dodge=False)
    plt.xlabel('Класс')
    plt.ylabel('Количество объектов')
    plt.title(title)
    plt.xticks(unique)
    if save_path:
        plt.savefig(save_path)
        plt.close()
    else:
        plt.show()

Создаёт график точности для разных комбинаций параметров (например, числа соседей и метрики) и ядер, чтобы найти оптимальные гиперпараметры.

In [None]:
def plot_accuracy_over_parameters(results, parameter_type, save_path=None):
    df_results = pd.DataFrame(results)
    if parameter_type == 'n_neighbors':
        x_col = 'n_neighbors'
        xlabel = 'Число соседей k'
    else:
        x_col = 'radius'
        xlabel = 'Радиус окна'

    plt.figure(figsize=(12, 8))
    sns.lineplot(data=df_results, x=x_col, y='accuracy', hue='metric', style='kernel', markers=True, dashes=False)
    plt.xlabel(xlabel)
    plt.ylabel('Точность')
    plt.title(f'Точность на разных метриках и ядрах в зависимости от {xlabel.lower()}')
    plt.legend(title='Метрика и Ядро', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path)
        plt.close()
    else:
        plt.show()

Строит график точности на тренировочной и тестовой выборках для разных значений числа соседей или радиуса, используя лучшие гиперпараметры.

In [None]:
def plot_accuracy_results(X_train, y_train, X_test, y_test, best_params, results):
    if best_params['type'] == 'n_neighbors':
        neighbor_range_plot = range(1, 31, 2)
        train_accuracies, test_accuracies = [], []
        metric = best_params['metric']
        if 'minkowski' in metric:
            p = int(metric.split('_p')[-1])
            base_metric = 'minkowski'
        else:
            p = None
            base_metric = metric
        for k in neighbor_range_plot:
            acc_train = evaluate_knn(
                X_train=X_train, y_train=y_train, X_test=X_train, y_test=y_train,
                n_neighbors=k, radius=None, metric=base_metric, kernel=best_params['kernel'], p=p
            )
            acc_test = evaluate_knn(
                X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
                n_neighbors=k, radius=None, metric=base_metric, kernel=best_params['kernel'], p=p
            )
            train_accuracies.append(acc_train)
            test_accuracies.append(acc_test)
        plot_accuracy(train_accuracies, test_accuracies, parameter='k', param_values=neighbor_range_plot,
                      title='Зависимость точности от числа соседей', xlabel='Число соседей k')
    else:
        radius_range_plot = np.linspace(0.1, 2.0, 20)
        train_accuracies, test_accuracies = [], []
        metric = best_params['metric']
        if 'minkowski' in metric:
            p = int(metric.split('_p')[-1])
            base_metric = 'minkowski'
        else:
            p = None
            base_metric = metric
        for r in radius_range_plot:
            acc_train = evaluate_knn(
                X_train=X_train, y_train=y_train, X_test=X_train, y_test=y_train,
                n_neighbors=None, radius=r, metric=base_metric, kernel=best_params['kernel'], p=p
            )
            acc_test = evaluate_knn(
                X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
                n_neighbors=None, radius=r, metric=base_metric, kernel=best_params['kernel'], p=p
            )
            train_accuracies.append(acc_train)
            test_accuracies.append(acc_test)
        plot_accuracy(train_accuracies, test_accuracies, parameter='r', param_values=radius_range_plot,
                      title='Зависимость точности от радиуса окна', xlabel='Радиус окна')


Визуализирует распределение весов объектов, рассчитанных через LOWESS, что позволяет оценить вес каждого объекта в модели.

In [None]:
def plot_weights_distribution(weights, title='Распределение весов после LOWESS', save_path=None):
    plt.figure(figsize=(8, 6))
    sns.histplot(weights, bins=30, kde=True, color='green')
    plt.xlabel('Вес')
    plt.ylabel('Количество объектов')
    plt.title(title)
    if save_path:
        plt.savefig(save_path)
        plt.close()
    else:
        plt.show()

Построение графика распределения классов

In [None]:
def plot_class_distribution_func(y_train):
    plot_class_distribution(
        y=y_train,
        title='Распределение классов в тренировочной выборке'
    )

Построение графиков зависимости точности от метрик и ядер

In [None]:
def plot_accuracy_over_parameters_func(results):
    df_results_neighbors = [res for res in results if res['type'] == 'n_neighbors']
    plot_accuracy_over_parameters(results=df_results_neighbors, parameter_type='n_neighbors')

    df_results_radius = [res for res in results if res['type'] == 'radius']
    plot_accuracy_over_parameters(results=df_results_radius, parameter_type='radius')


## data_utils.py

In [None]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split


def load_and_preprocess_data(csv_path, target_column, test_size=0.2, val_size=0.25, random_state=42):
    df = pd.read_csv(csv_path)

    X = df.drop(columns=[target_column]).values
    y = df[target_column].values

    le = LabelEncoder()
    y = le.fit_transform(y)
    y = y.astype(float)

    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    X_train_full, X_test, y_train_full, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    X_train, X_val, y_train, y_val = train_test_split(X_train_full, y_train_full, test_size=val_size, random_state=random_state)

    return X_train, X_val, X_test, y_train, y_val, y_test, le, scaler