In [143]:
import numpy as np
import matplotlib.pyplot as plt
import sklearn
import random
from Wu_reproduction.HSIC import HSIC2
from sklearn.kernel_approximation import RBFSampler
from sklearn.datasets import load_iris
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score

RANDOM_STATE = 42

In [120]:
class Kernel():
    def __init__(self):
        pass

    def __call__(self, x):
        return x

    def feature_map(self, x):
        return x



def sample_rff_weights(nin, nout, sigma):
    return np.random.normal(loc=0, scale=sigma, size=(nin, nout))

def sample_rff_b(nin):
    return np.random.uniform(0, 2 * np.pi, size=(nin,1))


class RBFKernel():
    def __init__(self, nin, nout, sigma, static=True):
        self.gamma = 1 / (2 * sigma * sigma)
        self.rff_weights = sample_rff_weights(nin, nout, sigma)
        self.b = sample_rff_b(nin)
        self.define_feature_map(static, nout)

    def define_feature_map(self, static, nout):
        if static:
            def feature_map(x):
                z = np.dot(x, self.rff_weights)
                return np.cos(z)
            self.feature_map=feature_map
        else:
            def feature_map(x, nout):
                sampler = RBFSampler(gamma=self.gamma, n_components=nout, random_state=RANDOM_STATE)
                return sampler.fit_transform(x)
            self.feature_map=feature_map

    def __call__(self, x):
        K = sklearn.metrics.pairwise.rbf_kernel(x, gamma=self.gamma)
        return K


class LinearKernel():
    def __init__(self):
        pass

    def __call__(self, z):
        if len(z.shape) == 1:
            z = z.reshape(z.shape[0], 1).copy()
        return np.dot(z, z.T)
    
    def feature_map(self, z):
        return z

In [146]:

class NIK_Layer():
    def __init__(self, nin, nout, sigma, kernel_x=RBFKernel, kernel_y=LinearKernel) -> None:
        self.weights = None
        self.kernel_x = kernel_x(nin, nout, sigma)
        self.kernel_y = kernel_y()
        self.sigma = sigma

    def forward(self, x):

        ## Depending how the weights get calculated, the output dimension of X varies. Right now, it will be the amount of labels
        ## This would not make much sense as it introduces massive bottlenecks in each layer... it is not clear how this works...

        z = np.dot(x, self.weights.T)
        return self.kernel_x.feature_map(z)

    def print_kernel(self):
        pass

    def calculate_HSIC(self, x, y):
        # for label in np.unique(y):
        #     l_indices = np.where(y==label)
        x = np.dot(x, self.weights.T)
        return HSIC2(x, y, self.sigma, self.kernel_x, self.kernel_y)

    def calculate_Ws(self, x, y):
        # Future Note: If more closed form solutions for the weights are found for other kernels; implement this function as part of the kernel instead of the layer.

        # Note: No normalization yet
        # Note: output dimensions of Ws are (number of unique labels, number of features of X)
        labels = np.unique(y)
        Ws = np.zeros((labels.shape[0], x.shape[1]))
        for i, label in enumerate(labels):
            l_indices = np.where(y==label)
            Ws[i,:] = np.sum(x[l_indices, :], axis=1)
        self.weights = Ws

    def calculate_Wopt(self):
        raise ValueError("W* is not yet implemented")


class NIK_model():

    def __init__(self, max_layer, n_features = 200, kernel_x=RBFKernel, kernel_y=LinearKernel, method="Ws") -> None:
        self.layers = []
        self.max_layer = max_layer
        self.method = method
        self.kernel_x = kernel_x
        self.kernel_y = kernel_y
        self.n_features = n_features

    def fit(self, X: np.array, Y: np.array, verbose: bool = True, sigma_range = [0.00001, 0.001, 0.005, 0.01, 0.05, 0.1]):
        self.unique_labels = np.unique(Y)
        i = 1
        while len(self.layers) < self.max_layer:

            if verbose:
                print(f"Training layer {i}")
                print("------------------")
                i += 1

            nin = X.shape[1]
            current_layer = self.get_optimal_layer(X, Y, nin, sigma_range, verbose=verbose)

            # Should be build to check if the layer improves the model. Currently not implemented so final amount of layers = self.max_layers
            stopping_criteria = False
            if stopping_criteria:
                break

            X = current_layer.forward(X)

            self.layers.append(current_layer)

            # if verbose:
            #     accuracy()
            

    def get_optimal_layer(self, x, y, nin, sigma_range, verbose):
        print(x.shape)
        print(np.mean(x))
        HSIC_optimal = 0
        best_layer = None
        nin = np.unique(y).shape[0]
        for sigma in sigma_range:
            candidate_layer = NIK_Layer(nin, self.n_features, sigma, kernel_x=self.kernel_x, kernel_y=self.kernel_y)
            if self.method == "Ws":
                candidate_layer.calculate_Ws(x, y)
            elif self.method == "W*":
                candidate_layer.calculate_Wopt()
            else:
                raise ValueError(f"Method should either be 'Ws' or 'W*'. Got {self.method} instead")

            # This implementation of the HSIC might well be very wrong... it is just conflicting in every sense with the paper and his implementation
            HSIC_candidate_layer = candidate_layer.calculate_HSIC(x, y)

            if verbose:
                print(f"Sigma: {sigma} - HSIC_candidate: {HSIC_candidate_layer} - HSIC_optimal: {HSIC_optimal}")

            if HSIC_candidate_layer > HSIC_optimal:
                HSIC_optimal = HSIC_candidate_layer
                best_layer = candidate_layer
    
        return best_layer

    def predict(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

In [147]:
def accuracy():
    pass

In [148]:
np.dot(y.reshape(150, 1), y.reshape(150,1).T)

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 4, 4, 4],
       [0, 0, 0, ..., 4, 4, 4],
       [0, 0, 0, ..., 4, 4, 4]])

In [166]:
def HSIC2(x, y, sigma, KernelX, KernelY):
    Kx = KernelX(x)
    Ky = KernelY(y)

    HKx = Kx - np.mean(Kx, axis=0) # equivalent to		HKᵪ = H.dot(Kᵪ)
    HKy = Ky - np.mean(Ky, axis=0) # equivalent to		HKᵧ = H.dot(Kᵧ)

    Hxy = np.trace(HKx.T*HKy)

    # Hx = np.linalg.norm(HKx)
    # Hy = np.linalg.norm(HKy)

    # print(f"Hxy: {Hxy}")
    # print(f"HKy: {HKy}")
    # print(f"Hx: {Hx}")
    # print(f"Hy: {Hy}")

    # hsic = Hxy / (Hx * Hy)

    return Hxy

In [184]:
c = NIK_model(10)

In [185]:
iris = load_iris()
X = iris.data
y = iris.target.reshape(150,)

In [186]:
y.shape

(150,)

In [187]:
c.fit(X, y)

Training layer 1
------------------
(150, 4)
3.4644999999999997
Sigma: 1e-05 - HSIC_candidate: 99.33333333333331 - HSIC_optimal: 0
Sigma: 0.001 - HSIC_candidate: 99.30676582293367 - HSIC_optimal: 99.33333333333331
Sigma: 0.005 - HSIC_candidate: 99.30667064001359 - HSIC_optimal: 99.33333333333331
Sigma: 0.01 - HSIC_candidate: 99.30666766005889 - HSIC_optimal: 99.33333333333331
Sigma: 0.05 - HSIC_candidate: 99.30666670640305 - HSIC_optimal: 99.33333333333331
Sigma: 0.1 - HSIC_candidate: 99.30666667660076 - HSIC_optimal: 99.33333333333331
Training layer 2
------------------
(150, 200)
0.9984815282802902
Sigma: 1e-05 - HSIC_candidate: 99.30666666666666 - HSIC_optimal: 0
Sigma: 0.001 - HSIC_candidate: 99.28045225782088 - HSIC_optimal: 99.30666666666666
Sigma: 0.005 - HSIC_candidate: 99.26091869666232 - HSIC_optimal: 99.30666666666666
Sigma: 0.01 - HSIC_candidate: 99.21621285347086 - HSIC_optimal: 99.30666666666666
Sigma: 0.05 - HSIC_candidate: 98.94643714967114 - HSIC_optimal: 99.3066666666

In [188]:
c.predict(X).shape

(150, 200)

In [191]:
clf = GaussianNB()

y_pred = clf.fit(c.predict(X), y).predict(c.predict(X))

In [192]:
accuracy_score(y_pred=y_pred, y_true=y)

0.7466666666666667