# KNN Implementation

1. Select K value.
2. Memorize the data.
    - load the data;
    - compute the distances;
    - save it;
3. Assign a class to each new element by majority vote among its K nearest neighbors.

# 0.0 Import Libraries

In [1]:
import time
import numpy as np
import pandas as pd
from numba import jit
import scipy.stats as st
import statistics as stat
import plotly.express as px
from sklearn import datasets
from sklearn.pipeline import Pipeline

from sklearn.metrics import accuracy_score
from multiprocessing import Pool, cpu_count
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split

In [2]:
# CSS rules in the selector/props form that pandas' Styler.set_table_styles
# accepts; the summary table at the end of the notebook applies them.
cell_hover = dict(
    selector="td:hover",
    props=[("background-color", "#fae7af")],
)
index_names = dict(
    selector=".index_name",
    props="font-style: italic; color: #021e37; font-weight:normal;",
)
headers = dict(
    selector="th:not(.index_name)",
    props="background-color: #021e37; color: white;",
)
# Dark-blue accent colour (same hex as the header background above).
facecolor = '#021e37'

'1.26.2'

# 1.0 Creating the Data

In [3]:
# Create a 40x3 matrix of uniform random values scaled to [0, 100).
# NOTE(review): `data` is reassigned by the cluster-building cell that follows
# (which also re-seeds NumPy), so this matrix is not used afterwards.
np.random.seed(42)
data = np.random.rand(40,3)*100


In [4]:
# Build three synthetic clusters with 11 features each. Every cluster is an
# integer block drawn from its own [low, high) range and then multiplied by a
# single random scalar. The generator is consumed left to right, so the draw
# order is: block 1, scalar 1, block 2, scalar 2, block 3, scalar 3.
np.random.seed(42)
class_1, class_2, class_3 = (
    np.random.randint(low=low, high=high, size=(n_rows, 11)) * np.random.random()
    for low, high, n_rows in ((7, 27, 100), (50, 75, 150), (90, 115, 150))
)

# Stack the clusters row-wise and label them 0, 1, 2 in the same order.
data = np.vstack((class_1, class_2, class_3))
y = np.repeat([0, 1, 2], [100, 150, 150])

## 1.1 Visualizing the data

In [5]:
# 3-D scatter of the first three feature columns (0, 1, 2), coloured by the
# class label in `y`. The remaining columns of `data` are not plotted.
px.scatter_3d(data_frame=data, x=data[:, 0], y=data[:, 1], z=data[:, 2], color=y)

# 2.0 Implementing KNeighbors Class

In [6]:

class KNeighbors:

    """
    Brute-force K-nearest neighbors classifier.

    Parameters:
    - n_neighbors (int): Number of neighbors to consider.
    - metric (str): Distance metric. Only 'euclidean' is supported.

    Methods:
    - fit(X: np.array, y: np.array): Save the training data.
    - euclidean_distance(data: np.array): Compute Euclidean distances between data and training set.
    - predict(X_test: np.array, batch_size: int = 100): Make predictions using k-nearest neighbors algorithm.

    Example:
    ```python
    knn = KNeighbors(n_neighbors=5, metric='euclidean')
    knn.fit(X_train, y_train)
    predictions = knn.predict(X_test, batch_size=100)
    ```

    """

    def __init__(self, n_neighbors: int, metric: str):
        """
        Initialize the K-nearest neighbors classifier.

        The arguments are only stored here; they are validated when
        `predict` is called.

        Parameters:
        - n_neighbors (int): Number of neighbors to consider.
        - metric (str): Distance metric (e.g., 'euclidean').
        """

        self.n_neighbors = n_neighbors
        self.metric = metric


    def fit(self, X: np.array, y: np.array):

        """
        Save the training data.

        Inputs are converted with `np.asarray` so that lists (or other
        array-likes) work with the array indexing used in `predict`.

        Parameters:
        - X (np.array): Training data, shape (n_train, n_features).
        - y (np.array): Labels, shape (n_train,).
        """

        self.X = np.asarray(X)
        self.y = np.asarray(y)


    def euclidean_distance(self, data: np.array):

        """
        Compute Euclidean distances between data and training set.

        Parameters:
        - data (np.array): Data points to compute distances for,
          shape (n_points, n_features).

        Returns:
        np.array: Distances of shape (n_points, n_train).
        """

        data = np.asarray(data)

        # Broadcasting builds an (n_points, n_train, n_features) intermediate,
        # which is why callers should pass the data in batches.
        distances = np.sqrt(np.sum((data[:, np.newaxis] - self.X[np.newaxis])**2, axis=2))

        return distances


    def predict(self, X_test: np.array, batch_size: int = 100):

        """
        Make predictions using the k-nearest neighbors algorithm.

        Each batch is processed end to end (distances, neighbor lookup, vote),
        so only one (batch_size, n_train) distance matrix exists at a time.

        Voting: the most common label among the k nearest neighbors wins.
        For an even `n_neighbors` a tie is broken with `np.random.choice`;
        for an odd `n_neighbors` the tied label that occurs first among the
        neighbors (ordered by increasing distance) is used.

        Parameters:
        - X_test (np.array): Dataset used to predict the labels,
          shape (n_test, n_features).
        - batch_size (int): Number of test rows processed at once.

        Returns:
        np.array: Predicted labels, shape (n_test,).

        Raises:
        ValueError: If the metric is not 'euclidean', if `n_neighbors` is
        zero or negative, or if `batch_size` is not positive.
        """

        self.X_test = X_test

        # Validate before doing any expensive distance computation.
        if self.metric != 'euclidean':
            raise ValueError('Invalid metric. Only euclidean distance is supported.')

        if self.n_neighbors == 0:
            raise ValueError('n_neighbors cannot be zero.')

        if self.n_neighbors < 0:
            raise ValueError('n_neighbors must be a positive integer.')

        if batch_size < 1:
            raise ValueError('batch_size must be a positive integer.')

        queries = np.asarray(X_test)
        random_tie_break = self.n_neighbors % 2 == 0

        predictions = []

        for start in range(0, len(queries), batch_size):

            distances = self.euclidean_distance(queries[start:start + batch_size])

            # Indices of the k closest training rows, nearest first.
            nearest = np.argsort(distances, axis=1)[:, :self.n_neighbors]

            for voters in self.y[nearest]:

                modes = stat.multimode(voters)

                if random_tie_break:
                    predictions.append(np.random.choice(modes))
                else:
                    predictions.append(modes[0])

        if not predictions:
            # Empty input: return an empty label array instead of failing.
            return np.array([], dtype=self.y.dtype)

        return np.array(predictions)


In [7]:
# Sanity check on the synthetic clusters: 5 neighbours, Euclidean distance.
knn = KNeighbors(n_neighbors=5, metric='euclidean')
knn.fit(data, y)

In [8]:
# Predict on the same samples the model was fitted on (no held-out split here).
y_h = knn.predict(data, batch_size=100)

In [9]:
# Training-set accuracy: the predictions above were made on the fitted data itself.
accuracy_score(y, y_h)

1.0

# 3.0 Test on Iris Dataset

In [10]:
# Load the Iris features and labels as arrays.
# Note: this rebinds `y`, which previously held the synthetic cluster labels.
X, y = datasets.load_iris(return_X_y=True)
X.shape

(150, 4)

In [11]:
# Hold out 15% of the samples for testing; stratify on the labels so both
# splits keep the class proportions, and fix the seed for reproducibility.
X_train,  X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, stratify=y, random_state=42)

Check the sizes of the train and test splits.

In [12]:
# Print the shape of each split: train features, train labels, test features, test labels.
for split in (X_train, y_train, X_test, y_test):
    print(split.shape)

(127, 4)
(127,)
(23, 4)
(23,)


In [13]:
%%time
# From-scratch KNN on the unscaled Iris split.
knn_test = KNeighbors(n_neighbors=5, metric='euclidean')
knn_test.fit(X_train, y_train)

# Predict on both splits so train and test accuracy can be compared.
y_hat = knn_test.predict(X_test)
y_hat_train = knn_test.predict(X_train)

print(f'Train Accuracy: {accuracy_score(y_true=y_train, y_pred=y_hat_train)}')
print(f'Test Accuracy: {accuracy_score(y_true=y_test, y_pred=y_hat)}')

Train Accuracy: 0.9606299212598425
Test Accuracy: 1.0
CPU times: total: 0 ns
Wall time: 4.51 ms


**Scaled Data**

In [14]:
# Learn the scaling statistics from the training split only, then apply the
# same fitted scaler to both splits.
std = StandardScaler().fit(X_train)

X_train_scaled, X_test_scaled = (std.transform(part) for part in (X_train, X_test))

In [15]:
%%time
# From-scratch KNN on the standardized Iris split.
knn_test = KNeighbors(n_neighbors=5, metric='euclidean')
knn_test.fit(X_train_scaled, y_train)
y_hat_sc = knn_test.predict(X_test_scaled)
y_hat_train_sc = knn_test.predict(X_train_scaled)


# Keep the scores in variables: the test accuracy is reused in the summary table.
accuracy_iris_scratch = accuracy_score(y_true=y_test, y_pred=y_hat_sc)
accuracy_iris_scratch_train = accuracy_score(y_true=y_train, y_pred=y_hat_train_sc)

print(f'Train Accuracy: {accuracy_iris_scratch_train}')
print(f'Test Accuracy: {accuracy_iris_scratch }')

Train Accuracy: 0.968503937007874
Test Accuracy: 0.9565217391304348
CPU times: total: 0 ns
Wall time: 3 ms


## 3.1 Test on Iris Dataset with Sklearn

In [16]:
%%time
# Reference run: scikit-learn's classifier with the same k and metric on the
# unscaled Iris split.
knn_sk = KNeighborsClassifier(n_neighbors=5, metric='euclidean')
knn_sk.fit(X_train, y_train)

y_hat_sk = knn_sk.predict(X_test)
y_hat_train_sk = knn_sk.predict(X_train)

CPU times: total: 0 ns
Wall time: 10 ms


In [17]:
%%time
# Accuracy of the scikit-learn predictions computed in the previous cell.
print(f'Train Accuracy: {accuracy_score(y_true=y_train, y_pred=y_hat_train_sk)}')
print(f'Test Accuracy: {accuracy_score(y_true=y_test, y_pred=y_hat_sk)}')

Train Accuracy: 0.9606299212598425
Test Accuracy: 1.0
CPU times: total: 0 ns
Wall time: 2 ms


**Scaled data**

In [18]:
%%time
# scikit-learn reference on the standardized Iris split.
knn_sk = KNeighborsClassifier(n_neighbors=5, metric='euclidean')
knn_sk.fit(X_train_scaled, y_train)
y_hat_sc_sk = knn_sk.predict(X_test_scaled)
y_hat_train_sc_sk = knn_sk.predict(X_train_scaled)

# Keep the scores in variables: the test accuracy is reused in the summary table.
accuracy_iris_sklearn = accuracy_score(y_true=y_test, y_pred=y_hat_sc_sk)
accuracy_iris_sklearn_train = accuracy_score(y_true=y_train, y_pred=y_hat_train_sc_sk)
print(f'Train Accuracy: {accuracy_iris_sklearn_train}')
print(f'Test Accuracy: {accuracy_iris_sklearn}')

Train Accuracy: 0.968503937007874
Test Accuracy: 0.9565217391304348
CPU times: total: 0 ns
Wall time: 12.2 ms


# 4.0 Test on Cardiovascular Disease Dataset

In [19]:
# Load the pre-split cardio data.
train = pd.read_csv('data/train.csv')
test = pd.read_csv('data/cardio_test.csv')

# Recode gender as a 0/1 flag: the value 2 becomes 1, everything else becomes 0.
train['gender'] = train['gender'].eq(2).astype('int64')
test['gender'] = test['gender'].eq(2).astype('int64')

# Features are every column except the identifier and the target.
# Note: this rebinds X_train / X_test / y_train / y_test from the Iris section.
X_train = train.drop(columns=['id', 'cardio'])
X_test = test.drop(columns=['id', 'cardio'])

y_train, y_test = train['cardio'].values, test['cardio'].values

In [20]:
# Preview the first rows of the training frame (gender already recoded to 0/1).
train.head()

Unnamed: 0,id,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,88308,17537,1,178,110.0,140,80,1,1,0,0,1,0
1,25750,21783,0,165,60.0,110,70,1,1,0,0,1,1
2,17970,20545,0,172,70.0,120,80,1,1,0,0,1,0
3,48020,20186,0,159,73.0,120,80,1,1,0,0,1,1
4,55576,19950,0,162,66.0,110,60,1,1,0,0,1,0


In [21]:
# (rows, columns) of the training frame, including the `id` and `cardio` columns.
train.shape

(59500, 13)

In [22]:
# (rows, columns) of the test frame, including the `id` and `cardio` columns.
test.shape

(10500, 13)

**Rescaling the dataset**

In [23]:
# Columns that get standardized versus columns that pass through unchanged.
std_cols = ['age', 'height', 'weight', 'ap_hi', 'ap_lo']
pass_cols = ['cholesterol', 'gender', 'gluc', 'smoke', 'alco', 'active']
std = StandardScaler()

# Output column order follows the transformer order: std_cols first, then pass_cols.
linear_preprocessor_all = ColumnTransformer(
    transformers=[
        ('stdscaler', std, std_cols),
        ('pass', 'passthrough', pass_cols),
    ]
)

# Fit on the training features only, then transform both splits with the fitted preprocessor.
linear_preprocessor_all.fit(X_train)

X_train_processed, X_test_processed = map(
    linear_preprocessor_all.transform, (X_train, X_test)
)

In [24]:
# Preview the processed training matrix with its column names restored
# (scaled columns first, then the passthrough columns).
pd.DataFrame(X_train_processed, columns=std_cols + pass_cols).head()

Unnamed: 0,age,height,weight,ap_hi,ap_lo,cholesterol,gender,gluc,smoke,alco,active
0,-0.782621,1.669282,2.492975,0.072597,-0.08722,1.0,1.0,1.0,0.0,0.0,1.0
1,0.938316,0.081183,-0.987306,-0.123018,-0.13798,1.0,0.0,1.0,0.0,0.0,1.0
2,0.436544,0.936313,-0.29125,-0.057813,-0.08722,1.0,0.0,1.0,0.0,0.0,1.0
3,0.291039,-0.651786,-0.082433,-0.057813,-0.08722,1.0,0.0,1.0,0.0,0.0,1.0
4,0.195386,-0.285301,-0.569673,-0.123018,-0.188741,1.0,0.0,1.0,0.0,0.0,1.0


In [25]:
%%time
# Fitting only stores the training arrays.
knn_test = KNeighbors(n_neighbors=5, metric='euclidean')
knn_test.fit(X_train_processed, y_train)
# Prediction runs in the next cell so that it is timed separately.

CPU times: total: 0 ns
Wall time: 0 ns


In [26]:
%%time
# Predict the cardio test set in batches of 300 rows; every batch is compared
# against the full training set.
y_hat = knn_test.predict(X_test_processed, batch_size=300)

CPU times: total: 49.7 s
Wall time: 1min 20s


In [27]:
# Test accuracy of the from-scratch model on the cardio data.
# NOTE(review): the variable name is misspelled ("acuracy"), but the summary
# cell reads it under this exact name, so rename both places together.
acuracy_cardio_scratch = accuracy_score(y_true=y_test, y_pred=y_hat)
print(f'Test Accuracy: {acuracy_cardio_scratch}')

Test Accuracy: 0.6522857142857142


## 4.1 KNN from Sklearn

In [28]:
%%time
# scikit-learn reference on the same processed cardio data, same k and metric.
knn_clf = KNeighborsClassifier(n_neighbors=5, metric='euclidean')

knn_clf.fit(X_train_processed, y_train)
# Note: this rebinds `y_hat_sk`, which previously held the Iris predictions.
y_hat_sk = knn_clf.predict(X_test_processed)

CPU times: total: 1.41 s
Wall time: 2.19 s


In [29]:
# Test accuracy of the scikit-learn model on the cardio data (reused in the summary table).
accuracy_cardio_sklearn = accuracy_score(y_true=y_test, y_pred=y_hat_sk)
print(f'Test Accuracy: {accuracy_cardio_sklearn}')

Test Accuracy: 0.6521904761904762


## Summary

In [30]:
# One row per (implementation, dataset) pair; the Iris rows use the scores
# obtained on the standardized data.
summary = pd.DataFrame(
    {
        "Implementation": ['From Scratch', 'Sklearn', 'From Scratch', 'Sklearn'],
        'Dataset': ['IRIS', 'IRIS', 'CARDIO DISEASE', 'CARDIO DISEASE'],
        'Accuracy': [
            accuracy_iris_scratch,
            accuracy_iris_sklearn,
            acuracy_cardio_scratch,
            accuracy_cardio_sklearn,
        ],
    }
)

In [31]:

# Render the summary with the hover, index-name and header CSS rules defined
# near the top of the notebook.
summary.style.set_table_styles([cell_hover, index_names, headers])

Unnamed: 0,Implementation,Dataset,Accuracy
0,From Scratch,IRIS,0.956522
1,Sklearn,IRIS,0.956522
2,From Scratch,CARDIO DISEASE,0.652286
3,Sklearn,CARDIO DISEASE,0.65219
