# KNN классификация на датасете Forest Cover Type

In [4]:
import numpy as np
import pandas as pd

from pathlib import Path
import os

In [36]:
!pip -q install kagglehub

You should consider upgrading via the '/Users/rkoyunusov/jupiter_rnd/venv/bin/python3 -m pip install --upgrade pip' command.[0m


In [5]:
import kagglehub

  from .autonotebook import tqdm as notebook_tqdm


#### Датасет загружен из открытого репозитория Kaggle. Проверил структуру и корректность данных

In [6]:
path = kagglehub.dataset_download("uciml/forest-cover-type-dataset")
print("Path to dataset files:", path)

data_dir = Path(path)
list(data_dir.glob("**/*"))[:20]

Path to dataset files: /Users/rkoyunusov/.cache/kagglehub/datasets/uciml/forest-cover-type-dataset/versions/1


[PosixPath('/Users/rkoyunusov/.cache/kagglehub/datasets/uciml/forest-cover-type-dataset/versions/1/covtype.csv')]

In [7]:
csv_files = list(data_dir.glob("**/*.csv"))
df = pd.read_csv(csv_files[0])
df.shape, df.head()

((581012, 55),
    Elevation  Aspect  Slope  Horizontal_Distance_To_Hydrology  \
 0       2596      51      3                               258   
 1       2590      56      2                               212   
 2       2804     139      9                               268   
 3       2785     155     18                               242   
 4       2595      45      2                               153   
 
    Vertical_Distance_To_Hydrology  Horizontal_Distance_To_Roadways  \
 0                               0                              510   
 1                              -6                              390   
 2                              65                             3180   
 3                             118                             3090   
 4                              -1                              391   
 
    Hillshade_9am  Hillshade_Noon  Hillshade_3pm  \
 0            221             232            148   
 1            220             235            151   
 2 

### Первичный анализ структуры + выделена целевая переменная и набор признаков

In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 581012 entries, 0 to 581011
Data columns (total 55 columns):
 #   Column                              Non-Null Count   Dtype
---  ------                              --------------   -----
 0   Elevation                           581012 non-null  int64
 1   Aspect                              581012 non-null  int64
 2   Slope                               581012 non-null  int64
 3   Horizontal_Distance_To_Hydrology    581012 non-null  int64
 4   Vertical_Distance_To_Hydrology      581012 non-null  int64
 5   Horizontal_Distance_To_Roadways     581012 non-null  int64
 6   Hillshade_9am                       581012 non-null  int64
 7   Hillshade_Noon                      581012 non-null  int64
 8   Hillshade_3pm                       581012 non-null  int64
 9   Horizontal_Distance_To_Fire_Points  581012 non-null  int64
 10  Wilderness_Area1                    581012 non-null  int64
 11  Wilderness_Area2                    581012 non-null 

In [8]:
target_col = "Cover_Type"

X = df.drop(columns=[target_col])
y = df[target_col]

X.shape, y.shape, y.value_counts().sort_index()

((581012, 54),
 (581012,),
 Cover_Type
 1    211840
 2    283301
 3     35754
 4      2747
 5      9493
 6     17367
 7     20510
 Name: count, dtype: int64)

In [9]:
from sklearn.model_selection import train_test_split

### Данные разделены на обучающую и тестовую выборки с сохранением распределения классов

In [10]:
MAX_ROWS = 70_000
if len(df) > MAX_ROWS:
    df_sample = df.sample(n=MAX_ROWS, random_state=42)
    X = df_sample.drop(columns=[target_col])
    y = df_sample[target_col]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

X_train.shape, X_test.shape

((56000, 54), (14000, 54))

### Для алгоритма KNN используется масштабирование признаков, так как расстояние между объектами чувствительно к масштабу данных

In [14]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier

baseline_clf = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("knn", KNeighborsClassifier(n_neighbors=5, weights="uniform", metric="minkowski"))
])

baseline_clf

In [15]:
from sklearn.model_selection import GridSearchCV

param_grid = {
    "knn__n_neighbors": [3, 5, 7, 11, 15, 21, 31],
    "knn__weights": ["uniform", "distance"],
    "knn__metric": ["euclidean", "manhattan"]
}

grid = GridSearchCV(
    estimator=baseline_clf,
    param_grid=param_grid,
    scoring="f1_macro",
    cv=3,
    n_jobs=-1,
    verbose=1
)

grid.fit(X_train, y_train)

grid.best_params_, grid.best_score_

Fitting 3 folds for each of 28 candidates, totalling 84 fits


({'knn__metric': 'manhattan',
  'knn__n_neighbors': 3,
  'knn__weights': 'distance'},
 np.float64(0.7529641274204201))

### Выполнен подбор гиперпараметров алгоритма KNN с использованием кросс-валидации. Оптимизация проводилась по метрике macro F1-score. 
Оценено качество улучшенной модели на тестовой выборке. Результаты сравниваются с бейзлайном для анализа влияния подбора гиперпараметров.

In [16]:
from sklearn.metrics import accuracy_score, f1_score, classification_report
best_clf = grid.best_estimator_
y_pred_best = best_clf.predict(X_test)

acc_best = accuracy_score(y_test, y_pred_best)
f1m_best = f1_score(y_test, y_pred_best, average="macro")

print(f"Improved Accuracy: {acc_best:.4f}")
print(f"Improved Macro-F1: {f1m_best:.4f}")
print("\nClassification:\n", classification_report(y_test, y_pred_best))

Improved Accuracy: 0.8623
Improved Macro-F1: 0.7620

Classification:
               precision    recall  f1-score   support

           1       0.87      0.86      0.86      5140
           2       0.88      0.89      0.88      6812
           3       0.84      0.83      0.84       858
           4       0.57      0.58      0.58        62
           5       0.63      0.59      0.61       227
           6       0.68      0.68      0.68       415
           7       0.90      0.87      0.89       486

    accuracy                           0.86     14000
   macro avg       0.77      0.76      0.76     14000
weighted avg       0.86      0.86      0.86     14000



## Реализация KNN

In [11]:
import numpy as np
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled  = scaler.transform(X_test)

y_train_np = np.asarray(y_train)
y_test_np  = np.asarray(y_test)

MAX_TRAIN_FOR_MYKNN = 40_000
if X_train_scaled.shape[0] > MAX_TRAIN_FOR_MYKNN:
    rng = np.random.RandomState(42)
    idx = rng.choice(X_train_scaled.shape[0], MAX_TRAIN_FOR_MYKNN, replace=False)
    X_train_scaled_small = X_train_scaled[idx]
    y_train_np_small = y_train_np[idx]
else:
    X_train_scaled_small = X_train_scaled
    y_train_np_small = y_train_np

X_train_scaled_small.shape, X_test_scaled.shape

((40000, 54), (14000, 54))

### Создаем класс knn с реализацией алгоритма поиска
KNN-классификатор на NumPy с выбором метрики расстояния и схемой взвешивания соседей

In [22]:
from collections import Counter

class MyKNNClassifier:
    def __init__(self, n_neighbors=5, weights="uniform", metric="euclidean", eps=1e-9):
        self.n_neighbors = int(n_neighbors)
        self.weights = weights
        self.metric = metric
        self.eps = eps
        self.X_train_ = None
        self.y_train_ = None

    def fit(self, X, y):
        self.X_train_ = np.asarray(X, dtype=float)
        self.y_train_ = np.asarray(y)
        return self

    def _distances(self, X):
        X = np.asarray(X, dtype=float)
        if self.metric == "euclidean":
            X2 = np.sum(X**2, axis=1, keepdims=True)
            T2 = np.sum(self.X_train_**2, axis=1, keepdims=True).T
            d2 = X2 + T2 - 2.0 * (X @ self.X_train_.T)
            d2 = np.maximum(d2, 0.0)
            return np.sqrt(d2)
        elif self.metric == "manhattan":
            return np.sum(np.abs(X[:, None, :] - self.X_train_[None, :, :]), axis=2)
        else:
            raise ValueError("metric must be 'euclidean' or 'manhattan'")

    def predict(self, X):
        if self.X_train_ is None:
            raise RuntimeError("Call fit() before predict().")

        X = np.asarray(X, dtype=float)
        dist = self._distances(X)
        k = self.n_neighbors

        nn_idx = np.argpartition(dist, kth=k-1, axis=1)[:, :k]
        nn_dist = np.take_along_axis(dist, nn_idx, axis=1)
        nn_labels = self.y_train_[nn_idx]

        if self.weights == "uniform":
            preds = []
            for row in nn_labels:
                preds.append(Counter(row).most_common(1)[0][0])
            return np.asarray(preds)

        if self.weights == "distance":
            w = 1.0 / (nn_dist + self.eps)
            preds = []
            for labels_row, w_row in zip(nn_labels, w):
                score = {}
                for lab, ww in zip(labels_row, w_row):
                    score[lab] = score.get(lab, 0.0) + float(ww)
                preds.append(max(score.items(), key=lambda x: x[1])[0])
            return np.asarray(preds)

        raise ValueError("weights must be 'uniform' or 'distance'")
        
    def predict_batch(self, X, batch_size=512):
        """
        Предикт батчами по X, чтобы не хранить всю матрицу расстояний в памяти.
        """
        if self.X_train_ is None:
            raise RuntimeError("Call fit() before predict().")
    
        X = np.asarray(X, dtype=float)
        preds = []
        n = X.shape[0]
    
        for start in range(0, n, batch_size):
            end = min(start + batch_size, n)
            Xb = X[start:end]
    
            dist = self._distances(Xb)  # (batch, n_train)
            k = self.n_neighbors
    
            nn_idx = np.argpartition(dist, kth=k-1, axis=1)[:, :k]
            nn_dist = np.take_along_axis(dist, nn_idx, axis=1)
            nn_labels = self.y_train_[nn_idx]
    
            if self.weights == "uniform":
                for row in nn_labels:
                    preds.append(Counter(row).most_common(1)[0][0])
            elif self.weights == "distance":
                w = 1.0 / (nn_dist + self.eps)
                for labels_row, w_row in zip(nn_labels, w):
                    score = {}
                    for lab, ww in zip(labels_row, w_row):
                        score[lab] = score.get(lab, 0.0) + float(ww)
                    preds.append(max(score.items(), key=lambda x: x[1])[0])
            else:
                raise ValueError("weights must be 'uniform' or 'distance'")
    
        return np.asarray(preds)

### Создаем baseline и собственные метрики

In [23]:
my_knn = MyKNNClassifier(n_neighbors=5, weights="uniform", metric="euclidean")
my_knn.fit(X_train_scaled_small, y_train_np_small)

y_pred_my = my_knn.predict(X_test_scaled)

acc_my = accuracy_score(y_test_np, y_pred_my)
f1_my  = f1_score(y_test_np, y_pred_my, average="macro")

print(f"MyKNN (baseline) Accuracy: {acc_my:.4f}")
print(f"MyKNN (baseline) Macro-F1: {f1_my:.4f}")

MyKNN (baseline) Accuracy: 0.8294
MyKNN (baseline) Macro-F1: 0.7082


### Сравниваем с реализацией sklearn
Сравниваем собственнуб реализацию и `sklearn` при одинаковых данных и параметрах

In [24]:
from sklearn.neighbors import KNeighborsClassifier

sk_knn = KNeighborsClassifier(n_neighbors=5, weights="uniform", metric="euclidean")
sk_knn.fit(X_train_scaled_small, y_train_np_small)
y_pred_sk = sk_knn.predict(X_test_scaled)

acc_sk = accuracy_score(y_test_np, y_pred_sk)
f1_sk  = f1_score(y_test_np, y_pred_sk, average="macro")

print(f"sklearn KNN Accuracy: {acc_sk:.4f}")
print(f"sklearn KNN Macro-F1: {f1_sk:.4f}")
print("delta Macro-F1 (my - sklearn):", f1_my - f1_sk)

sklearn KNN Accuracy: 0.8287
sklearn KNN Macro-F1: 0.7076
delta Macro-F1 (my - sklearn): 0.0006148707389077579


### Улучшаем бейзлайн
В реализацию перенес лучшие гиперпараметры, найденные на кросс-валидации

In [25]:
best_params = grid.best_params_
best_params

{'knn__metric': 'manhattan', 'knn__n_neighbors': 3, 'knn__weights': 'distance'}

In [26]:
best_params = grid.best_params_

my_knn_best = MyKNNClassifier(
    n_neighbors=best_params["knn__n_neighbors"],
    weights=best_params["knn__weights"],
    metric=best_params["knn__metric"],
)

my_knn_best.fit(X_train_scaled_small, y_train_np_small)

y_pred_my_best = my_knn_best.predict_batch(X_test_scaled, batch_size=512)

acc_my_best = accuracy_score(y_test_np, y_pred_my_best)
f1_my_best  = f1_score(y_test_np, y_pred_my_best, average="macro")

print(f"MyKNN (improved) Accuracy: {acc_my_best:.4f}")
print(f"MyKNN (improved) Macro-F1: {f1_my_best:.4f}")

MyKNN (improved) Accuracy: 0.8480
MyKNN (improved) Macro-F1: 0.7479
