In [199]:
import numpy as np
import scipy
import time
from sklearn.neighbors import KNeighborsClassifier

In [231]:
def time_func(func):
    def wrapper(*args):
        tic = time.time()
        output = func(*args)
        toc = time.time()
#         print(toc - tic)
        return output
    return wrapper

In [232]:
class KNearestNeighbor:
    def __init__(self):
        pass
    
    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
    
    @time_func
    def compute_distances_two_loops(self, X):
        num_test = X.shape[0]
        num_train = self.X_train.shape[0]
        dists = np.zeros((num_test, num_train))
        for i in range(num_test):
            for j in range(num_train):
                dists[i][j] = np.sqrt((X[i] - self.X_train[j]) @ (X[i] - self.X_train[j]))
        return dists
    @time_func
    def compute_distances_one_loop(self, X):
        num_test = X.shape[0]
        num_train = self.X_train.shape[0]
        dists = np.zeros((num_test, num_train))
        for i in range(num_test):
            row_dists = X[i, :] - self.X_train 
            dists[i, :] = np.sqrt(np.sum(row_dists * row_dists, axis=1))
        return dists
    
    @time_func
    def compute_distances_no_loops(self, X):
        num_test = X.shape[0]
        num_train = self.X_train.shape[0]
        dists = np.zeros((num_test, num_train))
        sub = np.expand_dims(X, axis=1) - np.expand_dims(self.X_train, axis=0)
        dists = np.sqrt(np.sum(sub * sub, axis=2))
        return dists
    
    
    def test_dists(self):
        k = np.random.randint(1, 100)
        X_test = np.random.rand(k, self.X_train.shape[1])
        dists_true = scipy.spatial.distance.cdist(X_test, self.X_train)
        assert np.allclose(dists_true, self.compute_distances_two_loops(X_test))
        assert np.allclose(dists_true, self.compute_distances_one_loop(X_test))
        assert np.allclose(dists_true, self.compute_distances_no_loops(X_test))

        
    def predict(self, X, k=5, num_loops=0):
        if num_loops == 2:
            dists = self.compute_distances_two_loops(X)
        elif num_loops == 1:
            dists = self.compute_distances_one_loop(X)
        elif num_loops == 0:
            dists = self.compute_distances_no_loops(X)
        else:
            raise ValueError(f'Invalid value {num_loops} for num_loops')
        return self.predict_labels(dists, k)
        
    def predict_labels(self, dists, k=1):
        num_test = dists.shape[0]
        y_pred = np.zeros(num_test)
        for i in range(num_test):
            top_k_args = np.argsort(dists[i])[:k]  
            closest_y = self.y_train[top_k_args]
            y_pred[i] = np.bincount(closest_y).argmax()
        return y_pred
    
    def test_predict_labels(self):
        k = np.random.randint(1,20)
        knn_true = KNeighborsClassifier(n_neighbors=k)
        
        knn_true.fit(self.X_train, self.y_train)
        test_size = np.random.randint(1,10)
        X_test = np.random.rand(test_size, self.X_train.shape[1])
        
        assert np.allclose(knn_true.predict(X_test), self.predict(X_test, k))

In [233]:
train_size = 4500
num_features = 100
X_train = np.random.rand(train_size, num_features)
y_train = np.random.randint(0, 2, size=(train_size))

In [234]:
knn = KNearestNeighbor()
knn.fit(X_train, y_train)

In [235]:
knn.test_dists()
knn.test_predict_labels()

In [206]:
import time

In [207]:
knn_test = KNearestNeighbor()

In [208]:
def time_function(f, *args):
    """
    Call a function f with args and return the time (in seconds) that it took to execute.
    """
    tic = time.time()
    output = f(*args)
    toc = time.time()
    return toc - tic, output

In [221]:
test_size = np.random.randint(100,1000)
k = 5

In [222]:
knn_test.fit(X_train, y_train)
np.random.seed(42)
X_test = np.random.rand(test_size, num_features)
# real_distances = KNeighborsClassifier(k).fit(X_train, y_train).predict(X_test)
real_distances = scipy.spatial.distance.cdist(X_test, X_train)
two_loop_time, out_2_loops = time_function(knn_test.compute_distances_two_loops, X_test)
assert np.allclose(real_distances, out_2_loops)

In [223]:
np.random.seed(42)
X_test = np.random.rand(test_size, num_features)
real_distances = scipy.spatial.distance.cdist(X_test, X_train)
one_loop_time, out_1_loops = time_function(knn_test.compute_distances_one_loop, X_test)
assert np.allclose(real_distances, out_1_loops)

In [224]:
np.random.seed(42)
X_test = np.random.rand(test_size, num_features)
real_distances = scipy.spatial.distance.cdist(X_test, X_train)
no_loop_time, out_2_loops = time_function(knn_test.compute_distances_two_loops, X_test)
assert np.allclose(real_distances, out_2_loops)

In [225]:
two_loop_time, one_loop_time, no_loop_time

(13.402306079864502, 0.4839286804199219, 13.506409406661987)

In [None]:
# np.random.seed(42)
# for k in [1, 3, 5, 7, 9]:
#     predicted_labels = knn_test.predict(X_test, k=k)
#     predicted_labels = np.array(predicted_labels, dtype=int).squeeze()
#     real_labels = np.array(y_ref_predictions[k], dtype=int).squeeze()
#     assert np.array_equal(predicted_labels, real_labels), 'Wrong answer for k={}'.format(k)