<a href="https://colab.research.google.com/github/chenyaofo/statistical-learning-method-numpy-impl/blob/main/algorithms/KNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy
import requests
from io import BytesIO
from sklearn.datasets import load_svmlight_file
from sklearn.model_selection import train_test_split

In [2]:
# Configuration cell: change `download_link` and `n_features` to switch to another dataset.
# NOTE: this implementation is not suitable for large datasets, because it predicts all
# val samples in a single batch, which requires a large memory footprint.

# Any dataset from libsvm can be used (https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/).
# Here we choose the dataset "australian" (https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#australian).
# Set download_link from the website (note that we download the scaled version here).
download_link = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/australian_scale"
# Set the number of features according to the info on the website.
n_features = 14
# Set the test (val) set proportion to 50%.
test_size = 0.5

In [3]:
def download_process_data(download_link, n_features, test_size, random_state=None):
    """Download a libsvm-format dataset and return it as dense numpy arrays.

    Args:
        download_link: URL of the dataset file in svmlight / libsvm format.
        n_features: number of features to pass to ``load_svmlight_file``.
        test_size: forwarded to ``train_test_split``; if ``None`` the data
            is returned without splitting.
        random_state: optional seed forwarded to ``train_test_split`` so the
            split can be reproduced; ``None`` keeps the unseeded behaviour.

    Returns:
        ``(X_train, X_val, y_train, y_val)`` when ``test_size`` is not None,
        otherwise ``(X, y)``. ``X`` has one extra trailing column of ones
        appended to the loaded features, and ``y`` is reshaped to a column
        vector of shape (n_samples, 1).

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # download the dataset in an online way
    r = requests.get(download_link)
    # fail loudly on an HTTP error instead of trying to parse an error page
    r.raise_for_status()

    # load the dataset by sklearn api (returns a sparse matrix, so densify it)
    X, y = load_svmlight_file(BytesIO(r.content), n_features=n_features)
    X = X.toarray()

    # append a constant column of ones to the feature matrix
    n_samples = X.shape[0]
    X = numpy.column_stack((X, numpy.ones((n_samples, 1))))
    y = y.reshape((-1, 1))

    if test_size is None:
        return X, y

    # split the dataset into train and val sets by sklearn api, honouring the
    # caller's test_size (it used to be hard-coded to 0.5 here)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_val, y_train, y_val

In [4]:
# Shapes produced by download_process_data (it appends a constant column to X):
#   X_train: (#train_set, n_features + 1)    X_val: (#val_set, n_features + 1)
#   y_train: (#train_set, 1)                 y_val: (#val_set, 1)
X_train, X_val, y_train, y_val = download_process_data(download_link, n_features, test_size)

# labels are loaded as floats, so cast them to int32
y_train = y_train.astype(numpy.int32)
y_val = y_val.astype(numpy.int32)

# 'bincount' only accepts non-negative inputs, so map label -1 to 0
y_train[y_train == -1] = 0
y_val[y_val == -1] = 0

# the largest label over train and val sets; used as a parameter for 'bincount'
maximum_label = numpy.concatenate((y_train, y_val), axis=0).max()

In [5]:
# here we define euclidean metric, other distance metric is also ok
def compute_euclidean_distance(X_train:numpy.array, X_val:numpy.array):
    """Pairwise Euclidean distances between val samples and train samples.

    Both inputs hold one sample per row. The result has one row per val
    sample and one column per train sample, i.e. shape (#val_set, #train_set).
    Note that the full (#val_set, #train_set, n_features) difference tensor
    is materialised, so memory grows with the product of both set sizes.
    """
    # insert singleton axes so that the subtraction broadcasts over all pairs
    train_rows = numpy.expand_dims(X_train, axis=0)
    val_rows = numpy.expand_dims(X_val, axis=1)
    delta = val_rows - train_rows
    squared_dist = (delta ** 2).sum(axis=-1)
    return numpy.sqrt(squared_dist)

In [6]:
# here we define a function to find topk nearest samples (called closest set) of a given val/test sample x
# note that we impl it in a batch way, which is more efficient
def find_closest_set(distances:numpy.ndarray, X_train:numpy.ndarray, y_train:numpy.ndarray, topk:int):
    """Select, for every row of ``distances``, the ``topk`` closest train samples.

    Args:
        distances: 2-D array with one row per val sample and one column per
            train sample.
        X_train: train features, indexed along axis 0 by train sample.
        y_train: train labels, indexed along axis 0 by train sample.
        topk: number of neighbours to keep; must satisfy
            ``1 <= topk <= distances.shape[-1]``.

    Returns:
        Tuple ``(x_closest_set, y_closest_set)``: the rows of ``X_train`` and
        ``y_train`` gathered for each val sample. The order of the ``topk``
        neighbours within a row is unspecified.

    Raises:
        ValueError: if ``topk`` is outside the valid range.
    """
    n_train = distances.shape[-1]
    if not 1 <= topk <= n_train:
        raise ValueError(
            f"topk must be in [1, {n_train}] (number of train samples), got {topk}"
        )
    # kth=topk-1 puts the topk smallest distances in the first topk slots;
    # using kth=topk would be out of bounds when topk equals the train set size
    ind = numpy.argpartition(distances, kth=topk - 1, axis=-1)[:, :topk]
    return numpy.take(X_train, ind, axis=0), numpy.take(y_train, ind, axis=0)

In [7]:
# here we define a function to predict the label of a given val/test sample x based on its closest set
def predict_in_closest_set(y_closest_set:numpy.array, maximum_label:int):
    """Majority vote over the labels of each closest set.

    Labels are counted along axis 1 of ``y_closest_set`` (the neighbour
    axis), using ``maximum_label + 1`` as the minimum number of bins, and
    the label with the highest count is returned for every sample. On a
    tie, ``argmax`` picks the smallest label.
    """
    n_bins = maximum_label + 1

    def _count_labels(neighbour_labels):
        # histogram of the labels seen among one sample's neighbours
        return numpy.bincount(neighbour_labels, minlength=n_bins)

    votes = numpy.apply_along_axis(_count_labels, 1, y_closest_set)
    return numpy.argmax(votes, axis=1)

In [8]:
def k_nearest_algo(X_train:numpy.array, y_train:numpy.array,
                   X_val:numpy.array, topk:int, maximum_label:int):
    """Predict a label for every val sample with the k-nearest-neighbour rule.

    The pipeline is: pairwise distances -> topk closest train samples per
    val sample -> majority vote among their labels. When ``topk`` is 1 the
    single neighbour's label is returned directly without voting.
    """
    # step 1: distance between every val sample and every train sample
    pairwise = compute_euclidean_distance(X_train, X_val) # (#val_set, #train_set)

    # step 2: gather the topk nearest train samples of each val sample;
    # only their labels are needed for the prediction
    _, neighbour_labels = find_closest_set(pairwise, X_train, y_train, topk)

    # step 3: vote, unless there is a single neighbour and nothing to vote on
    if topk != 1:
        return predict_in_closest_set(neighbour_labels, maximum_label)
    return neighbour_labels.squeeze(axis=1)

In [9]:
%time
# here we set some hyper-parameters
topk = 5 # we set K to 5 in the algorithm
# run k nearest algo
y_predict = k_nearest_algo(X_train, y_train, X_val, topk, maximum_label)

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 7.63 µs


In [10]:
def compute_accuracy(y_gt:numpy.ndarray, y_predict:numpy.ndarray):
    """Fraction of predictions that equal the ground-truth labels.

    Both inputs are flattened before comparison, so a column vector of shape
    (n, 1) and a flat vector of shape (n,) are compared element by element
    instead of being broadcast against each other into an (n, n) matrix.

    Args:
        y_gt: ground-truth labels.
        y_predict: predicted labels, with the same number of elements.

    Returns:
        number of correct samples / number of total samples.

    Raises:
        ValueError: if the two inputs hold a different number of labels.
    """
    y_gt = numpy.asarray(y_gt).ravel()
    y_predict = numpy.asarray(y_predict).ravel()
    if y_gt.size != y_predict.size:
        raise ValueError(
            f"y_gt has {y_gt.size} labels but y_predict has {y_predict.size}"
        )
    # number of correct samples / number of total samples
    return (y_gt == y_predict).sum() / y_gt.size

In [11]:
# Compute the accuracy of the predictions on the val set and report it as a
# percentage with two decimal places.
print(f"The accuracy on val set is {compute_accuracy(y_val, y_predict)*100.0:.2f}%")

The accuracy on val set is 84.93%
