In [0]:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [0]:
#authenticate
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [0]:
import numpy as np
from sklearn.neighbors import NearestNeighbors

In [0]:
def L2(pred,true):
    loss = np.square(pred-true)
    return loss.mean()

In [0]:
def L1(pred,true):
    loss = np.abs(pred-true)
    return loss.mean()

In [0]:
def SMAPE(pred,true):
    loss = np.abs((pred-true)/(pred+true))
    return loss.mean()

In [0]:
#This function chooses the best point estimate for a numpy array, according to a particular loss.
#The loss function should take two numpy arrays as arguments, and return a scalar. One example is SMAPE, see above.

def solver(x,loss):
    mean = x.mean()
    best = loss(mean,x)
    result = mean
    for i in x:
        score = loss(i,x)
        if score < best:
            best = score
            result = i
    return result

In [0]:
class NonparametricKNN(object):
    def __init__(self,n_neighbors=5,loss='L2'):
        if loss in ['L1','L2','SMAPE']:
            loss = {'L1':L1,'L2':L2,'SMAPE':SMAPE}[loss]
        self.model = NearestNeighbors(n_neighbors,algorithm='auto',n_jobs=-1)
        self.solver = lambda x:solver(x,loss)
    def fit(self,train,target):#All inputs should be numpy arrays.
        self.model.fit(train)
        self.f=np.vectorize(lambda x:target[x])
        return self
    def predict(self,test):#Return predictions as a numpy array.
        neighbors = self.model.kneighbors(test,return_distance=False)
        neighbors = self.f(neighbors)
        result = np.apply_along_axis(self.solver,1,neighbors)
        return result

In [0]:
#train, target and test data
train = np.array([[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5]])
target = np.array([1,2,3,7,17])
test = np.array([[3,3,3],[6,6,6]])

In [0]:
#Initialize model
model = NonparametricKNN(n_neighbors=3,loss='L2')

In [15]:
#Train the model and predict using it
#Train
model.fit(train,target)
#Predict
model.predict(test)

array([4., 9.])

In [16]:
#When we use L2 loss, we get exactly the same result as the KNN regressor in sklearn.
from sklearn.neighbors import KNeighborsRegressor
model = NonparametricKNN(n_neighbors=3)
model.fit(train,target)
model.predict(test)

array([4., 9.])

In [17]:
#When we use other loss, predictions could be different.
model = NonparametricKNN(n_neighbors=3,loss='L1')
model.fit(train,target)
model.predict(test)

array([3, 7])

In [18]:
#Customized loss
def loss(pred,true):#Loss function should take two numpy arrays as inputs...
    result = np.power(pred-true,4)
    return result.mean()#and return a scalar.
model = NonparametricKNN(n_neighbors=3,loss=loss)#Pass the loss function as argument.
model.fit(train,target)
model.predict(test)

array([4., 9.])