In [1]:
import numpy as np
from math import sqrt
from collections import Counter

In [17]:
from functools import reduce
class KNNClassifier:
    def __init__(self, k):
        assert k >= 1, "k must be valid"
        self.k = k
        self._X_train = None
        self._Y_train = None
    
    def fit(self, X_train, Y_train):
        assert X_train.shape[0] == Y_train.shape[0], \
            "The size of X_train and Y_train must be equal."
        assert self.k <= X_train.shape[0], \
            "The size of X_train must be at least k."
        
        self._X_train = X_train
        self._Y_train = Y_train
        return self
    
    def predict(self, X_predict):
        assert self._X_train is not None and self._Y_train is not None, \
            "must fit before predict"
        assert X_predict.shape[1] == self._X_train.shape[1], \
            "the feature number of X_predict must be equal to X_train"
            
        y_predict = [self._predict(x) for x in X_predict]
        return np.array(y_predict)
    
    def _predict(self, x):
        #给定单个待预测数据x，返回x的预测结果
        assert x.shape[0] == self._X_train.shape[1], \
            "the feature number of x must be equal to X_train"
            
        distances = [sqrt(np.sum((x_train - x) ** 2)) for x_train in self._X_train]
            
        nearest = np.argsort(distances)
            
        topK_y = [self._Y_train[i] for i in nearest[:self.k]]
        votes = Counter(topK_y)
            
        return votes.most_common(1)[0][0]
      
    def score(self, X_test, y_test):
        #直接通过测试数据返回准确度，不再返回测试结果占用内存
        hits = 0
        is_hit = lambda x, y: 1 if self._predict(x) == y else 0
        accumulator = lambda acc, val: acc+val
        return reduce(accumulator, map(is_hit, X_test, y_test)) / len(y_test)
    
    def __repr__(self):
        return "KNN(k=%d)" % self.k