In [1]:
import numpy as np
import pandas as pd
from collections import Counter
import warnings

# Optional imports for convenience (plotting and splitting)
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

In [2]:
# scikit-learn is optional here: when the import succeeds, its splitter is
# aliased so it does not clash with the notebook's own train_test_split wrapper,
# and SKLEARN_SPLIT records which implementation that wrapper should use.
try:
    from sklearn.model_selection import train_test_split as sklearn_train_test_split
    SKLEARN_SPLIT = True
except Exception:
    # Any failure raised while importing (not only ImportError) selects the
    # pure-numpy fallback.
    SKLEARN_SPLIT = False

In [3]:
def load_csv(path, label_column=None, drop_columns=None, header='infer'):
    """
    Load a CSV file and split it into a feature frame and a label series.

    Parameters
    ----------
    path : str, path-like or file-like
        Passed straight to ``pandas.read_csv``.
    label_column : str or int, optional
        Column holding the labels. A value that matches a column name is used
        as a name; otherwise an integer is treated as a position (negative
        values count from the end) in the CSV as loaded, i.e. *before*
        ``drop_columns`` is applied. If None, the last column remaining after
        dropping is used.
    drop_columns : list of (str or int), optional
        Columns to discard, given as names or positions (same resolution rule
        as ``label_column``). A single string is accepted as well.
    header : passed to ``pandas.read_csv`` (default 'infer').

    Returns
    -------
    X : pandas.DataFrame
        Feature columns (everything except the dropped and label columns).
    y : pandas.Series
        Copy of the label column.
    feature_names : list
        Column labels of ``X``.

    Raises
    ------
    IndexError
        If a positional column index is out of range.
    KeyError
        If a named column does not exist (raised by pandas).
    """
    df = pd.read_csv(path, header=header)
    original_columns = df.columns

    def _resolve(col):
        # Names take precedence so integer column labels (header=None) keep
        # working; only an integer that is not a label is read as a position.
        is_integer = isinstance(col, (int, np.integer)) and not isinstance(col, bool)
        if is_integer and col not in original_columns:
            try:
                return original_columns[col]
            except IndexError:
                raise IndexError(
                    f"column index {col} is out of range for "
                    f"{len(original_columns)} columns") from None
        return col

    # Resolve the label against the untouched frame so positions always refer
    # to the columns as they appear in the file.
    if label_column is not None:
        label_column = _resolve(label_column)

    if drop_columns:
        if isinstance(drop_columns, (str, int, np.integer)):
            drop_columns = [drop_columns]
        df = df.drop(columns=[_resolve(col) for col in drop_columns])

    if label_column is None:
        label_column = df.columns[-1]

    y = df[label_column].copy()
    X = df.drop(columns=[label_column])

    return X, y, list(X.columns)



In [5]:
def simple_preprocess_1(X, fillna='mean', encode_categorical=True, scale=True):
    """
    Impute, one-hot encode and standardise a pandas DataFrame.

    Parameters
    ----------
    X : pandas.DataFrame
        Input features; the caller's frame is not modified.
    fillna : str, number or None, default 'mean'
        Imputation for numeric columns:
          - 'mean' / 'median' / 'zero': fill with that statistic or with 0
          - 'ffill': forward fill, then backward fill any leading gap
          - 'bfill': backward fill, then forward fill any trailing gap
          - any other value: used directly as the fill value
        Non-numeric columns are always forward filled, then backward filled,
        then filled with the string 'missing'.
        None skips imputation entirely.
    encode_categorical : bool, default True
        One-hot encode via ``pd.get_dummies(..., drop_first=False)``.
    scale : bool, default True
        Standardise every column to zero mean and unit (sample) standard
        deviation.

    Returns
    -------
    X_proc : numpy.ndarray of float
    columns : list
        Column labels after encoding.
    scaler : dict
        ``{'mean': Series, 'std': Series}`` when ``scale`` is True, else ``{}``.
    """
    X = X.copy()

    if fillna is not None:
        for col in X.columns:
            series = X[col]
            if series.dtype.kind in 'biufc':  # numeric
                if fillna == 'mean':
                    filled = series.fillna(series.mean())
                elif fillna == 'median':
                    filled = series.fillna(series.median())
                elif fillna == 'zero':
                    filled = series.fillna(0)
                elif fillna == 'ffill':
                    # Previously this wrote the literal string 'ffill' into
                    # the column instead of propagating neighbouring values.
                    filled = series.ffill().bfill()
                elif fillna == 'bfill':
                    filled = series.bfill().ffill()
                else:
                    filled = series.fillna(fillna)
            else:
                # .ffill()/.bfill() replace the deprecated fillna(method=...).
                filled = series.ffill().bfill().fillna('missing')
            X[col] = filled

    if encode_categorical:
        X = pd.get_dummies(X, drop_first=False)

    scaler = {}
    if scale:
        means = X.mean()
        # A zero std (constant column) or NaN std (fewer than two rows) would
        # divide the column into inf/NaN; use 1 so such columns become 0.
        stds = X.std().replace(0, 1).fillna(1)
        X = (X - means) / stds
        scaler = {'mean': means, 'std': stds}

    return X.values.astype(float), list(X.columns), scaler

In [6]:
def simple_preprocess_2(X, y=None, fillna="mean", fillna_categorical="most_frequent", normalize=True):
    """
    Impute and (optionally) standardise a DataFrame, keeping it a DataFrame.

    Unlike a one-hot pipeline, categorical columns are not encoded here: they
    are imputed and cast to ``str``.

    Parameters
    ----------
    X : pd.DataFrame
        Input features; the caller's frame is not modified.
    y : pd.Series or None
        Accepted for call-site symmetry; not used.
    fillna : str or number, default="mean"
        Strategy for numeric columns:
        - "mean", "median", "zero", or a custom number used as the fill value
    fillna_categorical : str, default="most_frequent"
        Strategy for non-numeric columns:
        - "most_frequent": the column mode ('missing' if the column is all NaN)
        - "ffill": forward fill, then backward fill, then 'missing'
        - any other string: used directly as the fill value (e.g. "unknown")
    normalize : bool, default=True
        If True, numeric columns are centred and divided by
        (sample std + 1e-8).

    Returns
    -------
    X : pd.DataFrame
        Preprocessed copy of the input.
    """
    X = X.copy()

    for col in X.columns:
        if X[col].dtype.kind in 'biufc':  # numeric
            if fillna == "mean":
                X[col] = X[col].fillna(X[col].mean())
            elif fillna == "median":
                X[col] = X[col].fillna(X[col].median())
            elif fillna == "zero":
                X[col] = X[col].fillna(0)
            else:
                X[col] = X[col].fillna(fillna)

            if normalize:
                values = X[col]
                spread = values.std()
                # std is NaN with fewer than two observations; treat it as 0
                # so the epsilon below maps the column to 0 instead of NaN.
                if pd.isna(spread):
                    spread = 0.0
                X[col] = (values - values.mean()) / (spread + 1e-8)

        else:  # categorical
            if fillna_categorical == "most_frequent":
                modes = X[col].mode()
                # mode() is empty when every value is NaN
                replacement = "missing" if modes.empty else modes[0]
                X[col] = X[col].fillna(replacement)
            elif fillna_categorical == "ffill":
                # .ffill()/.bfill() replace the deprecated fillna(method=...)
                X[col] = X[col].ffill().bfill().fillna("missing")
            else:
                X[col] = X[col].fillna(fillna_categorical)

            X[col] = X[col].astype(str)  # ensure categorical as string

    return X

In [7]:
def train_test_split(X, y, test_size=0.2, random_state=None):
    """
    Shuffle and split X, y into train and test parts.

    Delegates to scikit-learn's ``train_test_split`` when it was importable
    (``SKLEARN_SPLIT`` is True); otherwise uses a small numpy implementation.

    Parameters
    ----------
    X, y : array-like
        Converted with ``np.asarray``; must have the same length.
    test_size : float or int, default 0.2
        Float: fraction of samples placed in the test set (rounded up, as
        scikit-learn does). Int: absolute number of test samples.
    random_state : int or None
        Seed forwarded to scikit-learn or to ``np.random.default_rng``.
        The two implementations do not produce the same permutation.

    Returns
    -------
    X_train, X_test, y_train, y_test : numpy arrays

    Raises
    ------
    ValueError
        (fallback path) if X and y differ in length or the requested test
        size does not fit the data.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if SKLEARN_SPLIT:
        X_tr, X_te, y_tr, y_te = sklearn_train_test_split(
            X, y, test_size=test_size, random_state=random_state)
        return X_tr, X_te, y_tr, y_te

    # fallback
    n_samples = len(X)
    if len(y) != n_samples:
        raise ValueError(
            f"X and y have inconsistent lengths: {n_samples} vs {len(y)}")

    if isinstance(test_size, (int, np.integer)) and not isinstance(test_size, bool):
        n_test = int(test_size)
    else:
        # Count test rows directly instead of truncating n * (1 - test_size):
        # 1 - 0.9 is 0.0999..., which used to truncate to an empty training
        # set for n=10. The tiny tolerance absorbs the opposite float error
        # (0.7 * 10 == 7.000000000000001).
        n_test = int(np.ceil(test_size * n_samples - 1e-9))
    if not 0 <= n_test <= n_samples:
        raise ValueError(
            f"test_size={test_size} is incompatible with {n_samples} samples")

    rng = np.random.default_rng(random_state)
    idx = np.arange(n_samples)
    rng.shuffle(idx)
    split_at = n_samples - n_test
    tr_idx = idx[:split_at]
    te_idx = idx[split_at:]
    return X[tr_idx], X[te_idx], y[tr_idx], y[te_idx]

In [8]:
class Custom_KNN:
    """
    K-Nearest Neighbors implementation (from scratch, brute force).

    - Supports classification and regression; the task is auto-detected in
      ``fit``: numeric targets containing at least one non-integer value mean
      regression, everything else (integers, strings, ...) classification.
    - distance metrics: 'euclidean' (default) or 'manhattan'
    - weights: 'uniform' or 'distance' (votes/values weighted by 1/distance)

    Attributes set by ``fit``: ``X_train``, ``y_train``, ``n_samples``,
    ``task`` ('classification' or 'regression') and ``is_fitted``.
    ``k`` is lowered in place (with a warning) when it exceeds the number of
    training samples.
    """

    def __init__(self, k=3, metric='euclidean', weights='uniform'):
        """Validate and store the hyper-parameters.

        Raises ValueError for k < 1 or an unknown metric / weights value.
        """
        self.k = int(k)
        if self.k < 1:
            raise ValueError("k must be >= 1")
        if metric not in ('euclidean', 'manhattan'):
            raise ValueError("metric must be 'euclidean' or 'manhattan'")
        if weights not in ('uniform', 'distance'):
            raise ValueError("weights must be 'uniform' or 'distance'")

        self.metric = metric
        self.weights = weights
        self.is_fitted = False

    def fit(self, X, y):
        """Store the training data and detect the task type.

        X: array-like (n_samples, n_features), converted to float.
        y: 1D array-like; other shapes such as (n, 1) are flattened.

        Returns self. Raises ValueError if X is not 2D, is empty, or if X and
        y disagree on the number of samples.
        """
        X_train = np.asarray(X, dtype=float)
        y_train = np.asarray(y)

        if X_train.ndim != 2:
            raise ValueError("X must be 2D array")
        if y_train.ndim != 1:
            # allow shape (n,1) too
            y_train = y_train.ravel()

        n_samples = X_train.shape[0]
        if n_samples == 0:
            raise ValueError("X must contain at least one sample")
        if y_train.shape[0] != n_samples:
            raise ValueError(
                f"X and y have inconsistent lengths: {n_samples} vs {y_train.shape[0]}")

        # Only commit state once the inputs are known to be usable.
        self.X_train = X_train
        self.y_train = y_train
        self.n_samples = n_samples

        # adjust k if too large
        if self.k > self.n_samples:
            warnings.warn(
                f"k ({self.k}) is greater than number of training samples ({self.n_samples}); reducing k to {self.n_samples}")
            self.k = self.n_samples

        # detect task type
        # If y is numeric and not integer-like, treat regression; else classification.
        if np.issubdtype(self.y_train.dtype, np.number) and not np.all(np.equal(np.mod(self.y_train, 1), 0)):
            self.task = 'regression'
        else:
            self.task = 'classification'

        self.is_fitted = True
        return self

    def _pairwise_distances(self, X):
        """
        Compute pairwise distances between each row in X and each row in self.X_train.
        A 1D X is treated as a single sample.
        Returns shape (n_test, n_train).

        Raises ValueError when X does not have the training feature count;
        without this check a mismatching width could broadcast silently and
        yield meaningless distances.
        """
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            X = X.reshape(1, -1)
        n_features = self.X_train.shape[1]
        if X.ndim != 2 or X.shape[1] != n_features:
            raise ValueError(
                f"X must have shape (n_samples, {n_features}); got {X.shape}")

        # Broadcast to (n_test, n_train, n_features), then reduce over features.
        diff = X[:, None, :] - self.X_train[None, :, :]
        if self.metric == 'euclidean':
            return np.sqrt(np.sum(diff**2, axis=2))
        # manhattan
        return np.sum(np.abs(diff), axis=2)

    def _k_nearest(self, X):
        """Return (indices, distances) of the k nearest training rows, each (n_test, k)."""
        distances = self._pairwise_distances(X)
        k_idx = np.argsort(distances, axis=1)[:, :self.k]
        k_dist = np.take_along_axis(distances, k_idx, axis=1)
        return k_idx, k_dist

    @staticmethod
    def _weighted_totals(labels, dists):
        """Sum inverse-distance weights per label for one query row."""
        # 1e-12 keeps an exact match (distance 0) from dividing by zero.
        weights = 1.0 / (dists + 1e-12)
        totals = {}
        for lab, w in zip(labels, weights):
            totals[lab] = totals.get(lab, 0.0) + w
        return totals

    def predict(self, X):
        """
        Predict labels (classification) or values (regression).
        Returns numpy array with one entry per row of X.

        Raises RuntimeError if called before fit, ValueError on a feature
        count mismatch.
        """
        if not self.is_fitted:
            raise RuntimeError("KNN not fitted. Call fit(X,y) first.")
        k_idx, k_dist = self._k_nearest(X)

        if self.task == 'regression':
            # simple mean or distance-weighted mean
            if self.weights == 'uniform':
                return np.mean(self.y_train[k_idx], axis=1)
            # weight by 1/d (avoid div by zero)
            w = 1.0 / (k_dist + 1e-12)
            return np.sum(w * self.y_train[k_idx], axis=1) / np.sum(w, axis=1)

        # classification
        preds = []
        for row, neigh_idx in enumerate(k_idx):
            labels = self.y_train[neigh_idx]
            if self.weights == 'uniform':
                ranked = Counter(labels).most_common()
                top_count = ranked[0][1]
                tied = [lab for lab, cnt in ranked if cnt == top_count]
                if len(tied) == 1:
                    preds.append(ranked[0][0])
                else:
                    # tie-breaker: the tied label whose neighbours are, on
                    # average, closest to the query
                    avg_dists = {t: np.mean(k_dist[row][labels == t]) for t in tied}
                    preds.append(min(avg_dists.items(), key=lambda x: x[1])[0])
            else:
                # distance-weighted voting: label with the highest total weight
                totals = self._weighted_totals(labels, k_dist[row])
                preds.append(max(totals.items(), key=lambda x: x[1])[0])
        return np.array(preds)

    def predict_proba(self, X):
        """
        For classification only: return class probability estimates over neighbors.

        Returns a list with one dict per row of X, mapping every class seen in
        training to its share of the (uniform or inverse-distance weighted)
        neighbour votes.

        Raises RuntimeError if not fitted or if the task is regression.
        """
        if not self.is_fitted:
            raise RuntimeError("KNN not fitted.")
        if self.task != 'classification':
            raise RuntimeError(
                "predict_proba only available for classification tasks.")
        k_idx, k_dist = self._k_nearest(X)
        classes = np.unique(self.y_train)  # loop-invariant; computed once

        proba_list = []
        for row, neigh_idx in enumerate(k_idx):
            labels = self.y_train[neigh_idx]
            if self.weights == 'uniform':
                votes = Counter(labels)
            else:
                votes = self._weighted_totals(labels, k_dist[row])
            total = sum(votes.values())
            proba_list.append({lab: votes.get(lab, 0) / total for lab in classes})
        return proba_list

    def score(self, X, y):
        """
        Returns accuracy for classification, R^2 for regression (simple).

        y is flattened the same way fit flattens its targets, so a column
        vector (n, 1) is compared element-wise rather than broadcast against
        the predictions. Raises ValueError if y and X disagree in length.
        """
        y = np.asarray(y).ravel()
        preds = self.predict(X)
        if y.shape[0] != preds.shape[0]:
            raise ValueError(
                f"X and y have inconsistent lengths: {preds.shape[0]} vs {y.shape[0]}")
        if self.task == 'classification':
            return np.mean(preds == y)
        # compute R^2 (epsilon guards a constant y)
        ss_res = np.sum((y - preds) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        return 1 - ss_res / (ss_tot + 1e-12)

In [9]:
# Demo 1: classify iris with the from-scratch KNN.
# This cell imports scikit-learn unconditionally (for the bundled dataset),
# even though the split helper treats scikit-learn as optional.
from sklearn import datasets
iris = datasets.load_iris()
X = iris.data[:, 2:4]  # use petal length & width for 2D visualization
y = iris.target

# Hold out 20% of the rows; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1234)

# Integer targets, so Custom_KNN.fit selects the classification task and
# score() reports accuracy.
clf = Custom_KNN(k=5, metric='euclidean', weights='uniform')
clf.fit(X_train, y_train)
preds = clf.predict(X_test)
acc = clf.score(X_test, y_test)
print("Predictions:", preds)
print("Accuracy:", acc)


Predictions: [1 1 2 0 1 0 0 0 1 2 1 0 2 1 0 1 2 0 2 1 1 1 1 1 2 0 2 1 2 0]
Accuracy: 1.0


In [14]:
# Demo 2: KNN on a CSV dataset.
# Note: this cell rebinds X and clf, replacing the objects created in the
# iris cell above.

# 1) Load
# The path is relative to the notebook's working directory; "target" is the
# label column, every other column becomes a feature.
X_df, y_ser, feature_names = load_csv("./heart_statlog_cleveland_hungary_final.csv", label_column="target")

# 2) Preprocess (fills missing, one-hot encodes categoricals, scales)
# NOTE(review): the scaling mean/std are computed over all rows here, i.e.
# before the train/test split below, so test rows influence the scaler.
X, cols, scaler = simple_preprocess_1(
    X_df, fillna='mean', encode_categorical=True, scale=True)

# 3) split
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y_ser.values, test_size=0.25, random_state=42)

# 4) fit & evaluate
# score() is accuracy when fit() detects classification (integer-valued or
# non-numeric targets) and R^2 otherwise.
clf = Custom_KNN(k=9,metric="manhattan", weights='distance')
clf.fit(X_tr, y_tr)
print("Score:", clf.score(X_te, y_te))


Score: 0.9362416107382551
