## Scikit-Learns Estimators

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, MaxAbsScaler, MinMaxScaler, RobustScaler

class Prepocess(BaseEstimator, TransformerMixin):
    '''
     The preprocessing of the input is divided in 2 steps:

     1. Max pooling layer with kernel 15 and strides 15: reduce the dimensionality of a factor 15 keeping the max values,
        it preserve the interesting part of the signal
     2. Median filter with kernel 9 to get rid of evenutally present white noise
    '''
    def __init__(self, max_pool_size=15, median_size=9):
        self.max_pool_size = max_pool_size
        self.median_size = median_size
        return None

    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        from skimage.measure import block_reduce
        from scipy.signal import medfilt
        X_max = block_reduce(X, block_size=(1, self.max_pool_size), func=np.max)
        X_med = np.apply_along_axis(medfilt,
                                    axis=1,
                                    arr= X_max,
                                    kernel_size=self.median_size)

        return X_med




class RowScaler(BaseEstimator, TransformerMixin):
    def __init__(self, scaling_method='Standard'):
        self.scaling_options = ['Standard', 'MinMax', 'MaxAbs', 'Robust']
        assert (scaling_method in self.scaling_options), 'scaling_method:' + scaling_method + ' not in ' + str(self.scaling_options)
        
        self.scaling_method = scaling_method
        return None

    def fit(self, X, y=None):
        if self.scaling_method == 'Robust':
            self.scaler = RobustScaler()
        elif self.scaling_method == 'MinMax':
            self.scaler = MinMaxScaler()
        elif self.scaling_method == 'Standard':
            self.scaler = StandardScaler()
        elif self.scaling_method == 'MaxAbs':
            self.scaler = MaxAbsScaler()
        return self
    
    def transform(self, X, y=None):
        return self.scaler.fit_transform(X.transpose()).transpose()

## Results Visualization

In [None]:
def printGridSearchResults(grid_search, scoring):

    print("Best parameters set found on development set:")
    print(grid_search.best_params_)
    print()
    print("Grid scores on development set:")
    print()
    means = grid_search.cv_results_['mean_test_{}'.format(scoring)]
    stds = grid_search.cv_results_['std_test_{}'.format(scoring)]
    for mean, std, params in zip(means, stds, grid_search.cv_results_['params']):
        print("{:0.3f} (+/-{:0.03f}) for {}".format(mean, std * 2, params))

In [None]:
def plot_score (model_dir, model_version, scoring='accuracy', n_results=-1, name=''):
    if model_version.split('.')[-1] == 'pkl':
        model = joblib.load(model_path)
        results = pd.DataFrame(model.cv_results_)
    elif model_version.split('.')[-1] == 'csv':
        results = pd.read_csv(model_path)

    results = results.sort_values(by='rank_test_{}'.format(scoring))
    results = results.reset_index(drop=True)

    test_mean = results['mean_test_{}'.format(scoring)][:n_results]
    test_std = results['std_test_{}'.format(scoring)][:n_results]

    train_mean = results['mean_train_{}'.format(scoring)][:n_results]
    train_std = results['std_train_{}'.format(scoring)][:n_results]

    plt.figure(figsize=(10,6))
    plt.errorbar(range(len(test_mean)), test_mean, yerr=test_std,
                linestyle='None', marker='o', color='b', label='test')

    plt.errorbar(range(len(train_mean)), train_mean, yerr=train_std,
                linestyle='None', marker='o', color='r', label='train')

    plt.tick_params(axis='both', which='major', labelsize=18)
    plt.xlabel('Parameters set (index)', fontsize=18)
    plt.ylabel(name + ' ' + scoring, fontsize=22)
    plt.ylim(0.8,1)

    plt.legend(fontsize=20, frameon=False, loc='lower left')
    plt.show()

## Models Saving

In [None]:
import os

def save_model(model, model_dir, model_version):
    model_path = os.path.join(model_dir, model_version)
    if os.path.exists(model_path):
        print(model_path + ' already exist, not overwritten.')
    else:
        !mkdir -p {model_dir}
        joblib.dump(model, model_path)
        print(model_path + ' succesfully saved.')


def save_keras_model(model, model_dir, model_version):
    model_path = os.path.join(model_dir, model_version)
    if os.path.exists(model_path):
        print(model_path + ' already exist, not overwritten.')
    else:
        !mkdir -p {model_dir}
        model.save(model_path)
        print(model_path + ' succesfully saved.')

## TF Functions (for keras models)

In [None]:
def rowScale(X):
    mean = tf.reshape(tf.reduce_mean(X, axis=1), [-1, 1])
    std =  tf.reshape(tf.math.reduce_std(X, axis=1), [-1, 1])
    return (X - mean)/std

In [None]:
# class OneCycleScheduler(keras.callbacks.Callback):
#     def __init__(self, iterations, max_rate, start_rate=None,
#                  last_iterations=None, last_rate=None):
#         self.iterations = iterations
#         self.max_rate = max_rate
#         self.start_rate = start_rate or max_rate / 10
#         self.last_iterations = last_iterations or iterations // 10 + 1
#         self.half_iteration = (iterations - self.last_iterations) // 2
#         self.last_rate = last_rate or self.start_rate / 1000
#         self.iteration = 0
#     def _interpolate(self, iter1, iter2, rate1, rate2):
#         return ((rate2 - rate1) * (self.iteration - iter1)
#                 / (iter2 - iter1) + rate1)
#     def on_batch_begin(self, batch, logs):
#         if self.iteration < self.half_iteration:
#             rate = self._interpolate(0, self.half_iteration, self.start_rate, self.max_rate)
#         elif self.iteration < 2 * self.half_iteration:
#             rate = self._interpolate(self.half_iteration, 2 * self.half_iteration,
#                                      self.max_rate, self.start_rate)
#         else:
#             rate = self._interpolate(2 * self.half_iteration, self.iterations,
#                                      self.start_rate, self.last_rate)
#             rate = max(rate, self.last_rate)
#         self.iteration += 1
#         K.set_value(self.model.optimizer.lr, rate)