In [1]:
import collections

import numpy as np
import pandas as pd
import scipy
import scipy.spatial

In [2]:
# Read the train split first, then the test split, from their CSV files.
df_train_org, df_test_org = (
    pd.read_csv(csv_path)
    for csv_path in ("data/ml_scratch_1_train.csv", "data/ml_scratch_1_test.csv")
)

# Last expression of the cell: preview the first rows of the training frame.
df_train_org.head()

Unnamed: 0,X,Y,month,day,FFMC,DMC,DC,ISI,temp,RH,wind,rain,area
0,7,5,3,5,86.2,26.2,94.3,5.1,8.2,51,6.7,0.0,0.0
1,7,4,10,2,90.6,35.4,669.1,6.7,18.0,33,0.9,0.0,0.0
2,7,4,10,6,90.6,43.7,686.9,6.7,14.6,33,1.3,0.0,0.0
3,8,6,3,5,91.7,33.3,77.5,9.0,8.3,97,4.0,0.2,0.0
4,8,6,3,7,89.3,51.3,102.2,9.6,11.4,99,1.8,0.0,0.0


In [None]:
class Knn():

    """A k-nearest-neighbours classifier with majority voting.

    Each test row is labelled with the most frequent label among its ``k``
    closest training rows, where closeness is the Minkowski distance of
    order ``norm`` computed on standardized features.

    Args:
        k: Number of neighbours that vote for each prediction.
        norm: Order ``p`` of the Minkowski distance (2.0 is Euclidean).
        standardization: Forwarded unchanged as ``method`` to
            ``Standardize.standardize`` in ``fit`` and ``predict``.
    """

    def __init__(self,
                 k: int = 3,
                 norm: float = 2.0,
                 standardization: str = "normalize"):

        self.k = k
        self.norm = norm
        self.standardization = standardization

    def fit(self, tr_X, tr_y):
        """Standardize and store the training features and labels.

        Args:
            tr_X: Training features, handed to ``Standardize(X=...)``.
            tr_y: Training labels, one per row of ``tr_X``.
        """
        # NOTE(review): `Standardize` is defined outside this block; it is
        # presumably stateful so that `predict` can reuse the training
        # statistics -- confirm against its definition.
        self.standardizer = Standardize(X=tr_X)
        self.tr_X = self.standardizer.standardize(method=self.standardization)
        self.tr_y = tr_y

    def predict(self, ts_X):
        """Return the predicted label for every row of ``ts_X``.

        The rows are passed through the standardizer created in ``fit``
        before distances are computed, so ``fit`` must be called first.
        """
        ts_X = self.standardizer.standardize(
            method=self.standardization, X=ts_X)
        d = self._dist_matrix(ts_X)
        neighbours = self._get_nearest_neighbour_indices(d)
        counts = self._count_votes(neighbours)
        return self._tag_winner(counts)

    def _dist_matrix(self, ts_X):
        """Return the ``(n_test, n_train)`` Minkowski distance matrix."""
        # distance_matrix already returns a 2-D ndarray, so no re-stacking is
        # needed (the previous np.vstack also failed on an empty test set).
        return scipy.spatial.distance_matrix(ts_X, self.tr_X, p=self.norm)

    def _calc_dist(self, vec):
        """Return the distance from one sample to every training row.

        This helper is not used by ``predict``. By default it computes the
        Minkowski distance of (finite) order ``self.norm``. ``cat_hamming``
        is not set by ``__init__``; if a caller assigns a truthy
        ``self.cat_hamming``, integer-typed training columns contribute 1
        per mismatch (Hamming) instead of the numeric difference.

        Args:
            vec: One sample, ordered like the columns of the training data.

        Returns:
            1-D float array with one distance per training row.
        """
        train = pd.DataFrame(self.tr_X)
        query = np.asarray(vec)
        use_hamming = getattr(self, "cat_hamming", False)

        total = np.zeros(len(train))
        for col_pos, (_, column) in enumerate(train.items()):
            values = column.to_numpy()
            if use_hamming and values.dtype.kind in "iu":
                # A mismatch adds one unit of distance; a match adds nothing.
                total += values != query[col_pos]
            else:
                total += np.abs(values - query[col_pos]) ** self.norm
        return total ** (1 / self.norm)

    def _get_nearest_neighbour_indices(self, dist_matrix):
        """Return the training-row indices of the ``k`` nearest neighbours.

        Args:
            dist_matrix: ``(n_test, n_train)`` distances.

        Returns:
            ``(n_test, k)`` integer array; each row is ordered from nearest
            to farthest so that vote ties are resolved deterministically.

        Raises:
            ValueError: If ``k`` is not between 1 and the number of training
                rows.
        """
        dists = np.asarray(dist_matrix)
        n_train = dists.shape[1]
        if not 1 <= self.k <= n_train:
            raise ValueError(
                f"k must be between 1 and the number of training rows "
                f"({n_train}), got {self.k}")

        # kth is a zero-based position: partitioning around k - 1 puts the k
        # smallest distances in the first k slots and stays in bounds when
        # k == n_train.
        nearest = np.argpartition(dists, kth=self.k - 1, axis=1)[:, :self.k]

        # argpartition leaves those k slots unordered; sort them by distance.
        nearest_dists = np.take_along_axis(dists, nearest, axis=1)
        order = np.argsort(nearest_dists, axis=1, kind="stable")
        return np.take_along_axis(nearest, order, axis=1)

    def _count_votes(self, indice_matrix):
        """Count the labels among each test row's neighbours.

        Args:
            indice_matrix: ``(n_test, k)`` positional indices into the
                training labels.

        Returns:
            1-D object array holding one ``collections.Counter`` per test row.
        """
        # Positional lookup on a plain array works for Series, lists and
        # ndarrays alike.
        labels = np.asarray(self.tr_y).ravel()
        votes = np.empty(len(indice_matrix), dtype=object)
        for row, neighbour_idx in enumerate(indice_matrix):
            votes[row] = collections.Counter(labels[neighbour_idx].tolist())
        return votes

    def _tag_winner(self, vote_mat):
        """Return the most common label of each vote counter as an array."""
        # Building the array in one go lets numpy pick a dtype wide enough for
        # every label; np.vectorize sized it from the first label only, which
        # truncated longer strings and failed on empty input.
        return np.array([votes.most_common(1)[0][0] for votes in vote_mat])

    def score(self, ts_X, ts_y):
        """Return the fraction of rows in ``ts_X`` predicted as ``ts_y``."""
        predictions = self.predict(ts_X)
        return (np.asarray(ts_y).ravel() == predictions).mean()

    def __repr__(self):
        return (f'Knn(k={self.k}, norm={self.norm}, '
                f'standardization="{self.standardization}")')
