In [1]:
from sklearn.datasets import load_wine

# %load_ext autoreload
# %autoreload 2

X, y = load_wine(return_X_y=True)

In [2]:
import numpy as np
from scipy.spatial import KDTree
import cvxpy as cp
from sklearn.base import BaseEstimator, ClassifierMixin

class DKNN(BaseEstimator, ClassifierMixin):
    def __init__(self, k, alpha=1, beta=1):
        super().__init__()
        self.k = k          # 'k' neighbors
        self.A = None       # PSD matrix objective
        self.pi = None      # technically log(pi)
        self.trees = []     # search tree for NN
        
        # importance weights for each class (k,)
        if type(alpha) in {float, int}:
            self.alpha = np.full(k, alpha)
        else:
            assert k == len(alpha)
            self.alpha = alpha
        self.beta  = beta   # regularization term

    # Mahalanobis distance
    def dist(self, x, mu, c):
        delta = x - mu
        return np.sum(np.multiply(delta @ self.A, delta), axis=-1) - self.pi[c]

    def fit(self, X, y):
        self.trees = [] # reset for each fit -- when using CV need this
        self.X = X
        self.C = np.unique(y)
        self.classes_ = self.C
        self.c_idx = []  # indices belonging to 'c' w.r.t full training X
        for ci in self.C:
            self.c_idx.append(np.where(y == ci))
        n, d = X.shape

        centroids = []

        # Find centroids of class C[i]
        for idx in self.c_idx:
            # Get k nearest neighbors of class C[i] for all training data X
            tree = KDTree(X[idx])
            _, n_idx = tree.query(X, self.k)
            self.trees.append(tree)

            # Compute centroids
            neighbors = X[idx][n_idx] # X[of class 'c'][its nearest neighbors w.r.t X[c]]
            if self.k == 1:
                centroid_c = neighbors
            else:
                centroid_c = np.mean(neighbors, axis=1)
            centroids.append(centroid_c)
        
        centroids = np.stack(centroids, axis=0)

        # Convex problem formulation
        self.pi = np.array([len(idx[0]) / n for idx in self.c_idx])
        self.A = cp.Variable((d, d))

        delta = X - centroids

        # should work
        # f_mult = np.sum(np.multiply(delta @ self.A, delta), axis=2) - self.pi[:, np.newaxis]
        # print(f_mult[0, 0])

        constraints = []
        epsilon = cp.Variable(n)
        constraints.append(epsilon >= 0)

        for i in range(n):
            for c in self.C:
                if c == y[i]:
                    continue
                constraints += [
                    delta[y[i], i] @ self.A @ delta[y[i], i].T - cp.log(self.pi[y[i]]) + 1 - epsilon[i] 
                    <= delta[c, i] @ self.A @ delta[c   , i].T - cp.log(self.pi[c])
                ]
            constraints += [
                epsilon[i] >= 0
            ]
        
        alpha_vec = np.array([self.alpha[y_i] for y_i in y])  # corresponding class importance weight
        objective = cp.Minimize(cp.sum(cp.multiply(alpha_vec, epsilon)) + self.beta * cp.norm(self.A))

        prob = cp.Problem(objective, constraints)
        prob.solve()

        self.A = self.A.value

    def predict(self, X_new):
        if X_new.ndim == 1:
            n = 1
        else:
            n = X_new.shape[0]

        dist_c = np.empty((n, len(self.trees)))
        for c, t in enumerate(self.trees):
            # each tree 't' is already a subset of X conditioned on y=c
            _, n_idx = t.query(X_new, self.k)

            # Compute centroids
            neighbors = self.X[self.c_idx[c][0][n_idx]]
            centroid = np.mean(neighbors, axis=-2)
            cur_dist = self.dist(X_new, centroid, c)
            dist_c[:, c] = cur_dist
        
        predictions = np.argmin(dist_c, axis=1)
        return predictions
    
    def score(self, X_test, y_test):
        y_pred = self.predict(X_test)
        return np.average(y_pred == y_test)
    
    def get_params(self, deep=False):
        return {
            'k': self.k,
            'alpha': self.alpha,
            'beta': self.beta,
        }
    
    def set_params(self, **params):
        for key, value in params.items():
            setattr(self, key, value)
        return self

In [3]:
# import dataset_helpers as ds
# data, labels = ds.get_UCI_dataset("wine")
# dknn_clf = DKNN(k=3, alpha=[1.0, 0.5, 1.0], beta=0.01)
# ds.accuracy_splits(data, labels, dknn_clf)

In [14]:
from sklearn.model_selection import GridSearchCV, ShuffleSplit
from sklearn.preprocessing import StandardScaler, PowerTransformer
from sklearn.neighbors import KNeighborsClassifier as KNN
from sklearn.neighbors import NearestCentroid
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

class DataTransformer:

    def __init__(self):
        self.pt = PowerTransformer(method="box-cox")
        self.lda = LDA()

    def fit(self, X, y):
        X = self.pt.fit_transform(X)
        # self.lda.fit(X, y)

        return self

    def transform(self, X):
        X = self.pt.transform(X)
        # X = self.lda.transform(X)

        return X

rs = ShuffleSplit(n_splits=10, test_size=0.1)
print("DKNN")
for k in range(3, 7):
    acc = []
    for i, (train_index, test_index) in enumerate(rs.split(X)):
        clf = DKNN(k)
        tf = DataTransformer().fit(X[train_index], y[train_index])

        clf.fit(tf.transform(X[train_index]), y[train_index])
        acc.append(clf.score(tf.transform(X[test_index]), y[test_index]))
    print(f"k = {k}, acc = {np.mean(acc)}")
print("-----------------")
print("KNN")
for k in range(3, 7):
    acc = []
    for i, (train_index, test_index) in enumerate(rs.split(X)):
        clf = KNN(k)
        tf = DataTransformer().fit(X[train_index], y[train_index])

        clf.fit(tf.transform(X[train_index]), y[train_index])
        acc.append(clf.score(tf.transform(X[test_index]), y[test_index]))
    print(f"k = {k}, acc = {np.mean(acc)}")

print("-----------------")
print("Centroid")
acc = []
for i, (train_index, test_index) in enumerate(rs.split(X)):
    clf = NearestCentroid()
    tf = DataTransformer().fit(X[train_index], y[train_index])

    clf.fit(tf.transform(X[train_index]), y[train_index])
    acc.append(clf.score(tf.transform(X[test_index]), y[test_index]))
print(f"acc = {np.mean(acc)}")

DKNN
k = 3, acc = 0.9666666666666666
k = 4, acc = 0.9555555555555555
k = 5, acc = 0.9666666666666666
k = 6, acc = 0.9777777777777779
-----------------
KNN
k = 3, acc = 0.9444444444444444
k = 4, acc = 0.9222222222222223
k = 5, acc = 0.9277777777777778
k = 6, acc = 0.961111111111111
-----------------
Centroid
acc = 0.9722222222222221
