## **Lab 2: KNN**

In [129]:
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from typing import Optional, Literal, Tuple


In [130]:
torch.manual_seed(42)
device_default = torch.device("cuda:0" if torch.cuda.is_available()
                              else ("mps" if torch.backends.mps.is_available() else "cpu")
                              )

### 1. Dataset


In [131]:
insurance_df = pd.read_csv("datasets/insurance.csv").dropna()
insurance_df.head()


Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462
3,33,male,22.705,0,no,northwest,21984.47061
4,32,male,28.88,0,no,northwest,3866.8552


In [132]:
insurance_df.describe()

Unnamed: 0,age,bmi,children,charges
count,1338.0,1338.0,1338.0,1338.0
mean,39.207025,30.663397,1.094918,13270.422265
std,14.04996,6.098187,1.205493,12110.011237
min,18.0,15.96,0.0,1121.8739
25%,27.0,26.29625,0.0,4740.28715
50%,39.0,30.4,1.0,9382.033
75%,51.0,34.69375,2.0,16639.912515
max,64.0,53.13,5.0,63770.42801


In [133]:
fashion_df = pd.read_csv("datasets/fashion-mnist_test.csv").dropna()
fashion_df.head()

Unnamed: 0,label,pixel1,pixel2,pixel3,pixel4,pixel5,pixel6,pixel7,pixel8,pixel9,...,pixel775,pixel776,pixel777,pixel778,pixel779,pixel780,pixel781,pixel782,pixel783,pixel784
0,0,0,0,0,0,0,0,0,9,8,...,103,87,56,0,0,0,0,0,0,0
1,1,0,0,0,0,0,0,0,0,0,...,34,0,0,0,0,0,0,0,0,0
2,2,0,0,0,0,0,0,14,53,99,...,0,0,0,0,63,53,31,0,0,0
3,2,0,0,0,0,0,0,0,0,0,...,137,126,140,0,133,224,222,56,0,0
4,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [134]:
fashion_df.describe()

Unnamed: 0,label,pixel1,pixel2,pixel3,pixel4,pixel5,pixel6,pixel7,pixel8,pixel9,...,pixel775,pixel776,pixel777,pixel778,pixel779,pixel780,pixel781,pixel782,pixel783,pixel784
count,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,...,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0
mean,4.5,0.0004,0.0103,0.0521,0.077,0.2086,0.3492,0.8267,2.3212,5.4578,...,34.3208,23.0719,16.432,17.8706,22.86,17.7902,8.3535,2.5416,0.6295,0.0656
std,2.872425,0.024493,0.525187,2.494315,2.208882,4.669183,5.657849,8.591731,15.031508,23.359019,...,57.888679,49.049749,42.159665,44.140552,51.706601,45.128107,28.765769,16.417363,7.462533,1.93403
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,4.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,7.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,55.0,6.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
max,9.0,2.0,45.0,218.0,185.0,227.0,223.0,247.0,218.0,244.0,...,254.0,252.0,255.0,255.0,255.0,255.0,240.0,225.0,205.0,107.0


In [135]:
def regression_metrics(y_true: torch.Tensor, y_pred: torch.Tensor) -> Tuple[float, float, float]:
    y_true = y_true.to(y_pred.device)
    mse = torch.mean((y_true - y_pred) ** 2).item()
    mae = torch.mean(torch.abs(y_true - y_pred)).item()
    rmse = mse ** 0.5
    return mse, mae, rmse


### 2. Data Processing

In [136]:
insurance_df["sex"] = insurance_df["sex"].str.lower().map({"male": 0, "female": 1})
insurance_df["smoker"] = insurance_df["smoker"].str.lower().map({"no": 0, "yes": 1})
insurance_df["region"] = insurance_df["region"].str.lower().map({"southeast": 0, "southwest": 1,  "northwest": 2, "northeast": 3})

insurance_df.head()

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,1,27.9,0,1,1,16884.924
1,18,0,33.77,1,0,0,1725.5523
2,28,0,33.0,3,0,0,4449.462
3,33,0,22.705,0,0,2,21984.47061
4,32,0,28.88,0,0,2,3866.8552


### 3. Model

In [137]:
class KNN:
    def __init__(
        self,
        k: int = 5,
        task: Literal["classification", "regression"] = "classification",
        weights: Literal["uniform", "distance"] = "uniform",
        standardize: bool = True,
        num_classes: Optional[int] = None,
        device: Optional[torch.device] = None,
        metric: Literal["euclidean", "manhattan", "cosine"] = "euclidean",
        eps: float = 1e-12,
    ) -> None:
        # Validate parameters
        assert k >= 1, "k must be >= 1"
        assert task in ("classification", "regression")
        assert weights in ("uniform", "distance")
        assert metric in ("euclidean", "manhattan", "cosine")

        self.k = int(k)
        self.task = task
        self.weights = weights
        self.standardize = standardize
        self.num_classes = num_classes
        self.device = device if device is not None else device_default
        self.metric = metric
        self.eps = float(eps)

        # Internal attributes
        self.X_train: Optional[torch.Tensor] = None
        self.y_train: Optional[torch.Tensor] = None
        self.mean: Optional[torch.Tensor] = None
        self.std: Optional[torch.Tensor] = None
        self.classes_: Optional[torch.Tensor] = None

    def _standardize_fit(self, X: torch.Tensor) -> torch.Tensor:
        if not self.standardize:
            return X
        self.mean = X.mean(dim=0, keepdim=True)
        self.std = X.std(dim=0, unbiased=False, keepdim=True).clamp_min(1e-12)
        return (X - self.mean) / self.std

    def _standardize_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Standardize new data using training mean/std."""
        if not self.standardize or self.mean is None or self.std is None:
            return X
        return (X - self.mean) / self.std

    def _compute_distances(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Compute pairwise distances between two sets of vectors."""
        if self.metric == "euclidean":
            return torch.cdist(A, B, p=2)
        elif self.metric == "manhattan":
            return torch.cdist(A, B, p=1)
        else:  # cosine distance
            A_norm = A / (A.norm(dim=1, keepdim=True).clamp_min(self.eps))
            B_norm = B / (B.norm(dim=1, keepdim=True).clamp_min(self.eps))
            similarity = A_norm @ B_norm.T
            distance = 1.0 - similarity
            return distance

    def fit(self, X: torch.Tensor, y: torch.Tensor) -> "KNN":
        """Fit model with training data."""
        X = X.to(self.device, dtype=torch.float32)

        if self.task == "classification":
            y = y.to(self.device)
            classes, y_encoded = torch.unique(y, sorted=True, return_inverse=True)
            self.classes_ = classes.to(self.device)
            self.y_train = y_encoded.to(self.device, dtype=torch.long).contiguous()
            if self.num_classes is None:
                self.num_classes = int(self.classes_.numel())
        else:  # regression
            self.y_train = y.to(self.device, dtype=torch.float32).contiguous()

        X_standardized = self._standardize_fit(X)
        self.X_train = X_standardized.contiguous()
        return self

    @torch.no_grad()
    def predict(self, X: torch.Tensor) -> torch.Tensor:
        """Predict labels or values for new samples."""
        assert self.X_train is not None and self.y_train is not None, "Call fit first"
        X = X.to(self.device, dtype=torch.float32)
        X_standardized = self._standardize_transform(X)

        # Compute distances and select k nearest neighbors
        distances = self._compute_distances(X_standardized, self.X_train)
        neighbor_distances, neighbor_indices = torch.topk(
            distances, k=self.k, dim=1, largest=False
        )

        if self.task == "classification":
            neighbor_labels = self.y_train[neighbor_indices].long()

            if self.weights == "uniform":
                weights = torch.ones_like(neighbor_distances, dtype=torch.float32)
            else:  # distance-based weights
                weights = 1.0 / (neighbor_distances + self.eps)

            # Handle exact matches (distance = 0)
            zero_mask = (neighbor_distances <= self.eps)
            rows_with_zero = zero_mask.any(dim=1)
            if rows_with_zero.any():
                weights[rows_with_zero] = torch.where(
                    zero_mask[rows_with_zero],
                    torch.ones_like(weights[rows_with_zero]),
                    torch.zeros_like(weights[rows_with_zero])
                )

            # Aggregate scores for each class
            scores = torch.zeros(
                (X_standardized.size(0), self.num_classes),
                device=self.device,
                dtype=torch.float32,
            )
            scores.scatter_add_(1, neighbor_labels, weights)

            pred_encoded = scores.argmax(dim=1).to(torch.long)
            return self.classes_[pred_encoded] if self.classes_ is not None else pred_encoded

        else:  # regression
            neighbor_values = self.y_train[neighbor_indices]
            if self.weights == "uniform":
                weights = torch.ones_like(neighbor_distances, dtype=torch.float32)
            else:
                weights = 1.0 / (neighbor_distances + self.eps)
            predictions = (weights * neighbor_values).sum(dim=1) / weights.sum(dim=1)
            return predictions

### 4. Model Training

In [138]:
features = insurance_df.drop(columns=["charges"]).values
targets = insurance_df["charges"].values

X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.2, random_state=42)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

knn_model = KNN(k=5, task="regression", weights="distance", standardize=True, device=device_default, metric="euclidean")

knn_model.fit(X_train_tensor, y_train_tensor)
y_pred_tensor = knn_model.predict(X_test_tensor)

mse, mae, rmse = regression_metrics(y_test_tensor, y_pred_tensor)
print(f"MSE: {mse:.2f}, MAE: {mae:.2f}, RMSE: {rmse:.2f}")

MSE: 26512832.00, MAE: 2998.35, RMSE: 5149.06


In [139]:
pca = PCA(n_components=10)
fashion_test_df = fashion_df
X_fashion_train = torch.from_numpy(np.array(pca.fit_transform(fashion_df.drop('label', axis=1)), dtype='float32')).to(device_default)
X_fashion_test = torch.from_numpy(np.array(pca.transform(fashion_test_df.drop('label', axis=1)), dtype='float32')).to(device_default)
y_fashion_train = torch.from_numpy(fashion_df.to_numpy(dtype=np.float32)).to(device_default)
y_fashion_test = torch.from_numpy(fashion_test_df.to_numpy(dtype=np.float32)).to(device_default)

knn_clf = KNN(k=10, task="classification", weights="distance", standardize=False, device=device_default)

knn_clf.fit(X_fashion_train, y_fashion_train)

y_pred_clf = knn_clf.predict(X_fashion_test)

torch.sum(y_pred_clf == y_fashion_test)/len(y_pred_clf)

y_pred_clf

RuntimeError: Index tensor must have the same number of dimensions as self tensor