In [None]:
from functools import partial

import h5py
import numpy as np

In [5]:
from src.load_script import load_contest_train_dataset
from sklearn.model_selection import train_test_split

# Switch between rebuilding the train/test split from the raw HDF5 contest
# file (True) and reloading a split previously saved as .npy files (False).
LOAD_FROM_H5 = True

if LOAD_FROM_H5:
    # NOTE(review): the meaning of the second argument (10) is defined by
    # load_contest_train_dataset, which is not visible here — confirm.
    X, y, samples = load_contest_train_dataset('datasets/contest_TRAIN.h5', 10)
    wavelengths = X.columns

    # Fixed seed keeps the split reproducible; the split is stratified on
    # `samples` rather than on the target `y`.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.33, random_state=42, stratify=samples
    )
    # Drop the references to the unsplit data; only the split is used below.
    del X, y, samples
else:
    # Pass paths straight to np.load so numpy opens *and closes* each file;
    # wrapping them in a bare open() left every handle unclosed.
    X_train = np.load('datasets/x_train.npy')
    y_train = np.load('datasets/y_train.npy')
    X_test = np.load('datasets/x_test.npy')
    y_test = np.load('datasets/y_test.npy')
    wavelengths = np.load('datasets/wavelengths.npy')

In [None]:
from sklearn.preprocessing import RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import Normalizer

def euclid(a, b):
    """Return the Euclidean (L2) distance between vectors ``a`` and ``b``."""
    difference = a - b
    return np.linalg.norm(difference)

def cosine(a, b):
    """Return the cosine distance ``1 - cos(angle between a and b)``.

    The dot product is divided by the norm of ``a`` and then by the norm of
    ``b``; no guard is applied for zero-length vectors.
    """
    similarity = np.dot(a, b) / np.linalg.norm(a)
    similarity = similarity / np.linalg.norm(b)
    return 1 - similarity

def direct_mutual_information(x, y, bins=100):
    """Return the negated mutual information of ``x`` and ``y``.

    A 2-D histogram of the two inputs (``bins`` bins, as interpreted by
    ``np.histogram2d``) is used as the contingency table handed to
    ``mutual_info_score``; the score is negated before being returned.
    """
    contingency_table = np.histogram2d(x, y, bins)[0]
    score = mutual_info_score(None, None, contingency=contingency_table)
    return -score

def mutual_information(a, b, bins=100):
    """Return the negated mutual information (in nats) between ``a`` and ``b``.

    The joint distribution is estimated from a 2-D histogram of the two
    inputs, and the result is negated so that larger mutual information
    yields a smaller value.

    Parameters
    ----------
    a, b : array-like
        1-D sequences of equal length, as accepted by ``np.histogram2d``.
    bins : int or sequence, default 100
        Histogram resolution, forwarded to ``np.histogram2d``.

    Returns
    -------
    float
        ``-sum(p(x, y) * log(p(x, y) / (p(x) * p(y))))`` over non-empty cells.
    """
    # Forward `bins`; it was previously ignored, so numpy's default of
    # 10 bins was always used regardless of the argument.
    hgram, _, _ = np.histogram2d(a, b, bins=bins)
    pxy = hgram / float(np.sum(hgram))          # joint probabilities
    px = np.sum(pxy, axis=1)                    # marginal over the second input
    py = np.sum(pxy, axis=0)                    # marginal over the first input
    px_py = px[:, None] * py[None, :]           # product of marginals per cell
    nzs = pxy > 0                               # skip empty cells to avoid log(0)
    return - np.sum(pxy[nzs] * np.log(pxy[nzs] / px_py[nzs]))


# Fit one grid-searched KNN pipeline per base metric and keep the best
# estimator of each search in `models`.
# NOTE(review): the negated mutual information is <= 0 and is not a true
# distance — confirm the neighbor-search algorithm sklearn selects for a
# callable metric tolerates that.
models = []
for metric in [mutual_information]:
    pipe = Pipeline([
        #('scaling', RobustScaler(unit_variance=True)),
        #('pca', PCA(whiten=False)),
        ('normalize', Normalizer(norm='max')),
        # This initial metric is replaced by each grid candidate below.
        ('clf', KNeighborsClassifier(metric=metric, n_neighbors=5)),
    ])
    params = {
        # partial() binds `bins` when each candidate is created. A plain
        # lambda in the comprehension looks `bins` up at call time, so all
        # three candidates would have run with the last value (1000).
        'clf__metric'  : [partial(mutual_information, bins=bins) for bins in [10, 100, 1000]],
    }
    gs = GridSearchCV(pipe, params, verbose=3, cv=2).fit(X_train, y_train)
    models.append(gs.best_estimator_)

In [None]:
from sklearn.metrics import classification_report
def _metric_label(metric):
    """Return a readable label for a KNN ``metric`` (string, function or partial-like)."""
    if isinstance(metric, str):
        return metric
    wrapped = getattr(metric, 'func', None)
    if wrapped is not None:
        # functools.partial-style objects expose the wrapped callable as
        # `.func` and the bound keyword arguments as `.keywords`.
        bound = getattr(metric, 'keywords', None) or {}
        bound_text = ', '.join(f'{key}={value!r}' for key, value in bound.items())
        return f"{getattr(wrapped, '__name__', repr(wrapped))}({bound_text})"
    return getattr(metric, '__name__', repr(metric))


# Take each label from the fitted pipeline's own 'clf' step. Zipping `models`
# with a hard-coded name list mislabels the reports whenever the list and
# `models` differ in length or order.
for model in models:
    print(_metric_label(model.named_steps['clf'].metric))
    print(classification_report(y_test, model.predict(X_test)))
    print('=' * 80)