# KNN implementation

The goal of this project is to develop an algorithm that implements the KNN (K-Nearest Neighbors) machine learning model through a class using only the numpy library (methods and functions take pandas dataframe format as input and use the same formatting as the scikit-learn library).

The dataframe to be used as an example is the Titanic Kaggle competition (https://www.kaggle.com/competitions/titanic/data). Considerations and feature normalization have been carried out, and the methods used are shown below.

In [1]:
import pandas as pd

def normalize(s):
    #StandardScaler
    mean = s.mean()
    std = s.std()
    s = (s-mean)/std

    #MinMaxScaler
    max = s.max()
    min = s.min()
    s = (s-min)/(max-min)
    return s

def get_df(path):
    df = pd.read_csv(path)
    df = df.drop(['PassengerId', 'Name', 'Ticket', 'Cabin', 'Embarked'], axis=1)
    df['Sex'] = df['Sex'].map({'male': 0, 'female': 1})
    df = df.dropna()
    df['Age'] = normalize(df['Age'])
    df['SibSp'] = normalize(df['SibSp'])
    df['Parch'] = normalize(df['Parch'])        
    df['Fare'] = normalize(df['Fare'])
    df = pd.get_dummies(df, columns=['Pclass'])
    df = df.reset_index(drop=True)
    return df

df = get_df('/content/train.csv')
df

Unnamed: 0,Survived,Sex,Age,SibSp,Parch,Fare,Pclass_1,Pclass_2,Pclass_3
0,0,0,0.271174,0.2,0.000000,0.014151,0,0,1
1,1,1,0.472229,0.2,0.000000,0.139136,1,0,0
2,1,1,0.321438,0.0,0.000000,0.015469,0,0,1
3,1,1,0.434531,0.2,0.000000,0.103644,1,0,0
4,0,0,0.434531,0.0,0.000000,0.015713,0,0,1
...,...,...,...,...,...,...,...,...,...
709,0,1,0.484795,0.0,0.833333,0.056848,0,0,1
710,0,0,0.334004,0.0,0.000000,0.025374,0,1,0
711,1,1,0.233476,0.0,0.000000,0.058556,1,0,0
712,1,0,0.321438,0.0,0.000000,0.058556,1,0,0


In [2]:
import numpy as np

#Data split for training and testing.
df_train = df.drop(df.tail(100).index)
X_train = df_train.drop('Survived', axis=1)
y_train = df_train['Survived']

df_test = df.tail(100)
X_test = df_test.drop('Survived', axis=1)
y_test = df_test['Survived']

class KNNClassifier:
    def __init__(self, X_train, y_train, k=5, p=1):
        self.X_train = X_train.values
        self.y_train = y_train.values[:, np.newaxis]
        self.k = k  #Number of neighbors to consider for classification
        self.p = p  #Factor for calculating the distance between the training feature and the test feature.

    def predict(self, X_test):
        X_train = self.X_train
        y_train = self.y_train
        X_test = X_test.values

        #Iteration to measure the distance of all test rows to all training rows.
        y_pred = []
        for i in range(len(X_test)):
            #For each set of distances associated with each test row, a prediction is made through a weighted vote by the inverse of the distance using the get_voting() method.
            distance_list = []
            for j in range(len(X_train)):
                distance_list.append(self.get_distance(row_1=X_test[i], row_2=X_train[j]))
                
            distance_array = np.array(distance_list)[:, np.newaxis]
            y_pred.append(self.get_voting(distance_array))

        self.distance_array = distance_array
        self.y_pred = np.array(y_pred)

    def get_distance(self, row_1, row_2):
        p = self.p        
        sum = 0
        for i in range(len(row_1)):
            sum += abs(row_1[i] - row_2[i])**p
        return sum**(1/p)

    def get_voting(self, distance_array):
        y_train = self.y_train
        k = self.k

        M = np.hstack((distance_array, y_train))
        M = M[M[:,0].argsort()]

        #Adding more rows to the voting matrix in case of a tie in the distance between positions k-1 and k.
        while M[k, 0] == M[k-1, 0]:
            k += 1
        V = M[:k]

        W = []
        #The following iteration aims to calculate the inverse of the distances and apply these results in the voting process.
        for y in np.unique(V[:,1]):
            query = V[V[:,1] == y]
            dist = query[:, 0]
            inv_dist = self.get_inv_dist(dist)
            votes = np.sum(inv_dist)
            W.append([y, votes])
        W = np.array(W)

        #Finally, the voting matrix is sorted and the winner is chosen.
        W = W[W[:,1].argsort()]
        winner = W[-1, 0]
        return winner

    def get_inv_dist(self, array):
        inv_list = []
        for i in array:
            if i == 0:
                inv_list.append(float('inf'))
            else:
                inv_list.append(1/i)
        return np.array(inv_list)

    def get_score(self, y_test):
        #This method is identical to the "accuracy" score in the scikit-learn library.
        y_test = y_test.values
        if hasattr(self, 'y_pred') == False:
            print("'No prediction has been made yet.")
            return
        
        y_pred = self.y_pred
        right = 0
        for i in range(len(y_test)):
            if y_test[i] == y_pred[i]:
                right += 1
        
        return right/len(y_test)

clf = KNNClassifier(X_train, y_train)
clf.predict(X_test)
clf.get_score(y_test)

0.84

##Cross-validation

The next cell seeks to develop a cross-validation process using the object created above.

The following cells aim to perform the same cross-validation process using the algorithm imported from the scikit-learn library.

In [3]:
from math import ceil

class CrossValidation:
    def __init__(self, df, n=5):
        df = df.sample(frac=1)
        rows_per_split = ceil(len(df)/n)

        split_list = [df.iloc[i:i + rows_per_split, :] for i in range(0, len(df), rows_per_split)]
        self.split_list = split_list

        score_list = []
        for i in range(n):
            df_train = pd.concat(split_list[:i] + split_list[i+1:])
            df_test = split_list[i]

            X_train, y_train = self.get_X_y(df_train, 'Survived')
            X_test, y_test = self.get_X_y(df_test, 'Survived')

            clf = KNNClassifier(X_train, y_train)
            clf.predict(X_test)
            score_list.append(clf.get_score(y_test))

        self.score_list = score_list

    def get_X_y(self, df, target):
        X = df.drop(target, axis=1)
        y = df[target]
        return X, y

CrossValidation(df).score_list

[0.8041958041958042,
 0.7272727272727273,
 0.7762237762237763,
 0.8181818181818182,
 0.7605633802816901]

In [5]:
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.neighbors import KNeighborsClassifier
random_state = 0

def apply_cv(df, model, random_state=0):
    X = df.drop('Survived', axis=1)
    y = df['Survived']

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
    scores = cross_val_score(model, X, y, scoring='accuracy', cv=cv, n_jobs=-1)
    return scores

df = get_df('/content/train.csv')
apply_cv(df, KNeighborsClassifier(n_neighbors=5, weights='distance', algorithm='brute'))

array([0.76923077, 0.8041958 , 0.81118881, 0.74125874, 0.77464789])

In [6]:
apply_cv(df, KNeighborsClassifier(n_neighbors=5, weights='uniform', algorithm='brute'))

array([0.78321678, 0.81118881, 0.81118881, 0.76923077, 0.76760563])