In [1]:
import os
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from scipy.spatial import distance_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_regression
from sklearn.neighbors import NearestNeighbors


import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras import layers
import tensorflow.keras.backend as K

### Simple Keras Model Builder Function

In [2]:
def keras_model(shape, loss, metrics):
    """Build and compile a fully connected Keras ``Sequential`` network.

    Parameters
    ----------
    shape : iterable of indexable
        One entry per layer; element ``[0]`` is passed to ``Dense`` as the
        unit count and element ``[1]`` as the activation. Anything after the
        first two elements of an entry is ignored.
    loss : str or callable
        Forwarded unchanged to ``compile``.
    metrics : list
        Forwarded unchanged to ``compile``.

    Returns
    -------
    The compiled model; the optimizer is always ``'sgd'``.
    """
    network = models.Sequential()

    # Index positions 0 and 1 explicitly (no unpacking) so longer specs are
    # still accepted and only their first two fields are used.
    dense_stack = (layers.Dense(spec[0], activation=spec[1]) for spec in shape)
    for dense in dense_stack:
        network.add(dense)

    network.compile(loss=loss, metrics=metrics, optimizer='sgd')
    return network

### Custom Callback: Generalization Score

In [3]:
class generalization_score(tf.keras.callbacks.Callback):
    """Keras callback that logs distance-weighted generalization metrics.

    At the end of every epoch the model is evaluated on the stored train and
    test sets and three entries are written to ``logs``:

    * ``test_mae``  - mean absolute error on the test set.
    * ``p_score``   - mean of the per-test-sample weights ``p_x`` (see below).
    * ``gen_score`` - mean of ``|test error| * p_x``.

    For test sample ``j`` the weight is
    ``p_x[j] = mean_i exp(-| |e_test[j]| - |e_train[i]| | / (d[i, j] + 1))``
    where ``d[i, j]`` is the Euclidean distance between train row ``i`` and
    test row ``j`` in feature space.

    Parameters
    ----------
    X_train, X_test : array-like, one row per sample
        Feature matrices handed to ``model.predict`` and to
        ``scipy.spatial.distance_matrix``.
    y_train, y_test : array-like
        Targets. They are flattened before use, which assumes one target
        value per sample - TODO confirm for multi-output models.
    """

    def __init__(self, X_train, X_test, y_train, y_test):
        super(generalization_score, self).__init__()

        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test

        # (n_train, n_test) pairwise distances, shifted by 1 so the division in
        # on_epoch_end never hits zero. Built directly as float64 so no
        # precision is lost through an intermediate float32 constant.
        self.distance_matrix = tf.constant(
            np.add(distance_matrix(X_train, X_test), 1),
            dtype=tf.float64,
            name='distance_matrix',
        )

    @staticmethod
    def _abs_error(y_true, y_pred):
        """Return ``|y_true - y_pred|`` as a flat float64 tensor."""
        # Flatten both sides first so an (n,) target and an (n, 1) prediction
        # cannot silently broadcast to an (n, n) matrix.
        y_true = tf.reshape(tf.cast(y_true, tf.float64), [-1])
        y_pred = tf.reshape(tf.cast(y_pred, tf.float64), [-1])
        return tf.abs(y_true - y_pred)

    def on_epoch_end(self, epoch, logs=None):
        """Compute the metrics for this epoch and store them in ``logs``."""
        # A mutable default ({}) would be shared between calls.
        if logs is None:
            logs = {}

        # verbose=0 keeps Keras versions whose predict() prints a progress bar
        # by default from flooding the output once per epoch.
        self.y_train_pred = self.model.predict(self.X_train, verbose=0)
        self.y_test_pred = self.model.predict(self.X_test, verbose=0)

        train_abs_error = self._abs_error(self.y_train, self.y_train_pred)  # (n_train,)
        test_abs_error = self._abs_error(self.y_test, self.y_test_pred)     # (n_test,)

        # Entry [i, j] compares the error on test sample j with the error on
        # train sample i; same (n_train, n_test) layout as the distance matrix.
        errors_difference = tf.abs(
            test_abs_error[tf.newaxis, :] - train_abs_error[:, tf.newaxis]
        )
        errors_by_distance = tf.math.divide(errors_difference, self.distance_matrix, name='division')

        # Average over the TRAIN axis so there is one weight per test sample.
        # Reducing over axis 1 instead yields one value per train sample, and
        # multiplying that by the test errors broadcasts to an outer product
        # whose mean is just test_mae * p_score.
        p_x = tf.reduce_mean(tf.exp(tf.math.negative(errors_by_distance)), axis=0)

        adjusted_error = tf.math.multiply(test_abs_error, p_x)

        logs['gen_score'] = float(np.round(float(tf.reduce_mean(adjusted_error)), 5))
        logs['test_mae'] = float(np.round(float(tf.reduce_mean(test_abs_error)), 5))
        logs['p_score'] = float(np.round(float(tf.reduce_mean(p_x)), 5))


### Custom Loss Function: Nearest-Neighbor Centroid Based

In [4]:
def centroid(points):
    """Return the arithmetic mean of a collection of equally shaped points.

    Parameters
    ----------
    points : sequence of array-like
        The points to average; every element must have the same shape.

    Returns
    -------
    numpy.ndarray
        Float array with the shape of a single point.

    Raises
    ------
    ValueError
        If ``points`` is empty.
    """
    if len(points) == 0:
        raise ValueError("centroid() requires at least one point")

    # Average over the stacking axis. Accumulating into an np.empty buffer
    # would start the sum from uninitialised memory and give arbitrary results.
    return np.mean(np.asarray(points, dtype=float), axis=0)


from scipy.spatial.distance import euclidean


def nn_centroids(x_y):
    """Compute, for every row of ``x_y``, the centroid of its nearest neighbours.

    The neighbour count is ``x_y.shape[-1] + 1`` and the neighbours are looked
    up among the rows of ``x_y`` itself.

    Parameters
    ----------
    x_y : numpy.ndarray, 2-D
        One sample per row.

    Returns
    -------
    tuple
        ``(centroids, distances, features)`` where ``centroids`` is an array
        with one centroid per row of ``x_y``, ``distances`` is a list holding
        the Euclidean distance from each row to its centroid, and ``features``
        is ``x_y`` without its last column.
    """
    n_neighbors = np.add(x_y.shape[-1], 1)
    knn = NearestNeighbors(n_neighbors=n_neighbors, algorithm='auto').fit(x_y)

    # Only the neighbour indices are needed; the neighbour distances are unused.
    _, neighbor_indices = knn.kneighbors(x_y)

    centroid_list = [
        centroid([x_y[neighbor] for neighbor in row_neighbors])
        for row_neighbors in neighbor_indices
    ]
    distances_y_centroid = [
        euclidean(x_y[row], row_centroid)
        for row, row_centroid in enumerate(centroid_list)
    ]

    return np.array(centroid_list), distances_y_centroid, x_y[:, :-1]



def centroid_loss(x_y):
    """Build a Keras loss that weights absolute error by a centroid-distance ratio.

    ``nn_centroids(x_y)`` is evaluated once, up front. For sample ``i`` the
    returned loss then computes

        |y_true[i] - y_pred[i]| * dist([x[i], y_pred[i]], c[i]) / dist(x_y[i], c[i])

    with ``c[i]`` the neighbour centroid of row ``i``, and returns the mean
    over all samples.

    Parameters
    ----------
    x_y : numpy.ndarray, 2-D
        One sample per row with the target in the LAST column (the features
        are recovered as ``x_y[:, :-1]``).

    Returns
    -------
    callable
        ``loss(y_true, y_pred)`` returning a scalar tensor.

    Notes
    -----
    The closure holds one row per sample of ``x_y``, so every batch passed to
    the loss must contain exactly those samples in the same order (full batch,
    no shuffling); otherwise rows are paired with the wrong centroid or the
    shapes do not match.
    """
    centroids, distances_y_centroid, original = nn_centroids(x_y)

    centroids = np.asarray(centroids, dtype=np.float64)
    original = np.asarray(original, dtype=np.float64)
    # Clamp the denominator so a sample that coincides with its own centroid
    # cannot turn the whole loss into inf/NaN. Distances above epsilon are
    # unchanged.
    distances_y_centroid = np.maximum(
        np.asarray(distances_y_centroid, dtype=np.float64), K.epsilon()
    )

    def loss(y_true, y_pred):
        dtype = y_pred.dtype

        # Force column vectors so (n,) vs (n, 1) inputs cannot broadcast.
        y_pred = tf.reshape(y_pred, [-1, 1])
        y_true = tf.reshape(tf.cast(y_true, dtype), [-1, 1])

        # The centroids are laid out as [features..., target], so the
        # prediction has to be appended as the LAST column. Putting it first
        # would compare the prediction against a feature coordinate.
        x_concat_y_pred = tf.concat([tf.cast(original, dtype), y_pred], 1)

        distances_pred_centroid = tf.math.reduce_euclidean_norm(
            tf.math.subtract(x_concat_y_pred, tf.cast(centroids, dtype)), 1
        )
        distance_ratio = tf.math.divide(
            distances_pred_centroid, tf.cast(distances_y_centroid, dtype)
        )

        # Flatten the error to (n,) before weighting: (n, 1) * (n,) would
        # broadcast to an (n, n) outer product and the mean would degenerate
        # to mean(error) * mean(ratio).
        abs_error = tf.reshape(K.abs(y_true - y_pred), [-1])
        score = tf.math.multiply(abs_error, distance_ratio)

        return K.mean(score)

    return loss

### Benchmarking 

In [None]:
# Fixed seed so the synthetic dataset is the same after a kernel restart.
X, y = make_regression(1000, 20, random_state=0)

# np.save does not create missing directories.
os.makedirs("results", exist_ok=True)


for seed in range(20,40):

    print(seed)

    # Setup:

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.33,
                                                        random_state=seed)

    # Fit the scalers on the training split only and reuse them for the test
    # split, so both splits share one scale and no test statistics leak in.
    x_scaler = StandardScaler().fit(X_train)
    y_scaler = StandardScaler().fit(y_train.reshape(-1, 1))

    X_train = x_scaler.transform(X_train)
    y_train = y_scaler.transform(y_train.reshape(-1, 1))

    X_test = x_scaler.transform(X_test)
    y_test = y_scaler.transform(y_test.reshape(-1, 1))

    # Four hidden layers of 4 ReLU units plus a linear output unit.
    # ([4, 'relu'] * 4 repeats the ELEMENTS of one spec, which yields a single
    # hidden layer.)
    model_shape = [[4, 'relu'] for _ in range(4)] + [[1, 'linear']]

    metrics = ['mae']
    generalization_callback = generalization_score(X_train, X_test, y_train, y_test)

    batch_size = int(len(X_train)) # Required for the custom loss

    epochs = 2000

    # Custom Benchmark:

    # Same TensorFlow seed before each model so both runs start from
    # identically drawn initial weights.
    tf.random.set_seed(seed)

    built_loss = centroid_loss(np.c_[X_train, y_train])

    compiled_model = keras_model(model_shape, built_loss, metrics)

    # shuffle=False: the custom loss pairs batch row i with training row i, and
    # fit() would otherwise permute the rows every epoch.
    history = compiled_model.fit(X_train, y_train,
                                 epochs=epochs, batch_size=batch_size, verbose=0,
                                 shuffle=False,
                                 callbacks=[generalization_callback])

    np.save(f"results/{seed}_custom.npy", history.history, allow_pickle=True)

    # Control Benchmark:

    tf.random.set_seed(seed)

    built_loss = 'mae'

    compiled_model = keras_model(model_shape, built_loss, metrics)

    # Same fit settings as the custom run so the two are directly comparable.
    history = compiled_model.fit(X_train, y_train,
                                 epochs=epochs, batch_size=batch_size, verbose=0,
                                 shuffle=False,
                                 callbacks=[generalization_callback])

    np.save(f"results/{seed}_control.npy", history.history, allow_pickle=True)

print('Done!')

20
21
22
23
24
25
26
27
28
29
30
31
32
33
