In [None]:
import numpy as np
import pandas as pd
import time
import pickle
import matplotlib.pyplot as plt
%matplotlib inline
from numpy.random import standard_normal, seed, uniform, randint
import scipy.stats as stats
from scipy.stats import norm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error

import warnings
warnings.filterwarnings('ignore')


In [None]:
import numpy as np
from keras import backend as K


def monte_carlo_prediction(model, X, forward_path=100):
    """
    Take a fitted Keras model implementing dropout and run several
    stochastic forward passes through it, resulting in different
    predictions for X (Monte-Carlo dropout).

    The model is called with ``training=True`` so that Dropout stays active
    at prediction time. This replaces the former
    ``K.function([..., K.learning_phase()], ...)`` construction, which relies
    on a deprecated backend API.
    NOTE(review): any other layer that honours the ``training`` flag
    (e.g. BatchNormalization) also runs in training mode, as it did with
    ``learning_phase=True`` -- confirm this is intended.

    :param model: fitted Keras model (any callable accepting ``training=``)
    :param X: input data
    :param forward_path: number of stochastic forward passes to generate
    :return: np.ndarray whose first axis indexes the forward pass
    :raises ValueError: if ``forward_path`` is smaller than 1
    """
    if forward_path < 1:
        raise ValueError('forward_path should be a positive integer')

    # np.asarray turns the framework tensor returned by the model into a
    # plain numpy array so the passes can be stacked.
    MC_samples = [np.asarray(model(X, training=True))
                  for _ in range(forward_path)]

    return np.array(MC_samples)


def compute_uncertainty(samples_prediction):
    """
    Summarise Monte-Carlo prediction paths along the first (path) axis.

    :param samples_prediction: array of MC prediction paths stacked on axis 0
    :return: tuple (mean, std, mean - std, mean + std,
             mean - 1.96 * std, mean + 1.96 * std)
    """
    centre = np.mean(samples_prediction, axis=0)
    spread = np.std(samples_prediction, axis=0)
    wide_band = 1.96 * spread
    return (centre, spread,
            centre - spread, centre + spread,
            centre - wide_band, centre + wide_band)


import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.optimizers import Adam, Adadelta
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K



def build_sequential_mlp_reg(n_feature=64, n_output=1, activation='relu', layers=2,
                             hidden_size=40, hidden_size_2=None,
                             kernel_initializer='glorot_uniform', dropout=0.,
                             dropout_2=0., use_bias=True, kernel_reg_l1=0, kernel_reg_l2=0,
                             learning_rate=0.001, beta_1=0.9, beta_2=0.999,
                             batch_normalisation=False, last_activation='linear', loss="mse"):
    """
    Function to build a sequential MLP regressor using Keras Sequential with 2 or 3 Hidden layers

    Parameters
    ----------
    n_feature : int
        input size
    n_output : int
        output size
    activation : str
        activation of every hidden Dense layer
    layers : int, 2 or 3
        number of hidden layers. With 2, two Dense layers of ``hidden_size``
        units are stacked; with 3, a further Dense layer of ``hidden_size_2``
        units is appended (only if ``hidden_size_2`` is not None)
    hidden_size : int
        size of the first two hidden layers
    hidden_size_2 : int or None
        size of the third hidden layer; only used when ``layers == 3``
    kernel_initializer : str
        kernel_initializer of the Dense layers
    dropout : 0<= float < 1
        dropout rate applied after each of the first two hidden layers
    dropout_2 : 0<= float < 1
        dropout rate applied after the third hidden layer
    use_bias : bool
        whether the Dense layers use a bias
    kernel_reg_l1 : float
        Kernel regularizer L1 (first two hidden layers)
    kernel_reg_l2 : float
        Kernel regularizer L2 (first two hidden layers)
    learning_rate : float, default=0.001
        Adam learning rate
    beta_1 : float, default=0.9
        Adam beta_1
    beta_2 : float, default=0.999
        Adam beta_2
    batch_normalisation : bool, default False
        Whether to insert batch normalization before the layers with
        non-linear activation and before the output layer
    last_activation : str
        last Dense activation
    loss : str or Keras.losses object
        The loss function

    Returns
    -------
    compiled model : tf.Keras.Model

    Raises
    ------
    ValueError
        if ``layers`` is neither 2 nor 3
    """
    if layers not in (2, 3):
        raise ValueError('layers should be 2 or 3')

    # batch normalisation between hidden layers is skipped for linear activations
    hidden_batch_norm = batch_normalisation and activation not in ['linear']

    x = Sequential()
    # First hidden layer: now honours `activation` and `use_bias` instead of
    # hard-coding 'relu' and the Keras default bias setting.
    x.add(Dense(hidden_size,
                kernel_initializer=kernel_initializer,
                activation=activation,
                use_bias=use_bias,
                input_dim=n_feature,
                kernel_regularizer=l1_l2(l1=kernel_reg_l1,
                                         l2=kernel_reg_l2)))
    x.add(Dropout(dropout))
    if hidden_batch_norm:
        x.add(BatchNormalization())
    x.add(Dense(hidden_size,
                kernel_initializer=kernel_initializer,
                activation=activation,
                use_bias=use_bias,
                kernel_regularizer=l1_l2(l1=kernel_reg_l1,
                                         l2=kernel_reg_l2)
                ))
    x.add(Dropout(dropout))

    if layers == 3 and hidden_size_2 is not None:
        if hidden_batch_norm:
            x.add(BatchNormalization())
        # NOTE(review): this layer carries no kernel regularizer, unlike the
        # two layers above -- confirm whether that is intentional.
        x.add(Dense(hidden_size_2,
                    kernel_initializer=kernel_initializer,
                    activation=activation,
                    use_bias=use_bias))
        x.add(Dropout(dropout_2))

    if batch_normalisation:
        x.add(BatchNormalization())

    x.add(Dense(units=n_output,
                kernel_initializer=kernel_initializer,
                activation=last_activation,
                use_bias=use_bias))

    # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
    optimizer = Adam(learning_rate=learning_rate,
                     beta_1=beta_1,
                     beta_2=beta_2)
    x.compile(loss=loss,
              optimizer=optimizer,
              metrics=['mse'])
    return x


class MLPRegressor(BaseEstimator, RegressorMixin):

    """Class implementing a MLP regressor (which support quantile)
     implementing variational approx. (Monte-Carlo dropout).

    Args
    ----------
    epochs : int
        number of epochs
    batch_size : int
        number of element in each batch
    scaler : object with fit_transform / transform
        scaler used to normalize or preprocess input.
        NOTE: the default ``StandardScaler()`` instance is created once, when
        the class is defined, and is therefore shared by every regressor
        built without an explicit ``scaler``; pass your own instance when
        several regressors coexist.
    output_scaler : object with fit_transform / transform / inverse_transform, or None
        optional scaler applied to the target; predictions are mapped back
        with its ``inverse_transform``
    loss : str or Keras loss
        loss passed to build_sequential_mlp_reg
    verbose : int
        verbosity of self.model.fit() method
    reinitialize : bool, default True
        reset model before fitting, needed for parameters search or cv
    **nn_params : dict
        additional argument passed to build_sequential_mlp_reg method
        except {n_features, n_output}

    Attributes
    ----------
    model : Keras model or None
        the network, built lazily in ``fit``
    history : dict or None
        ``History.history`` of the last call to ``fit``
    """

    # Constructor arguments stored as plain attributes; every other keyword
    # handed to set_params is treated as a network-building parameter.
    _INIT_PARAMS = ('epochs', 'batch_size', 'scaler', 'output_scaler',
                    'loss', 'verbose', 'reinitialize')

    def __init__(self, epochs=1, batch_size=32, scaler=StandardScaler(),
                 output_scaler=None, loss='mean_squared_error', verbose=0,
                 reinitialize=True, **nn_params):

        self._estimator_type = 'regressor'
        self.scaler = scaler
        self.output_scaler = output_scaler
        self.loss = loss
        self.epochs = epochs
        self.batch_size = batch_size
        self.reinitialize = reinitialize
        self.verbose = verbose
        self.nn_params = nn_params
        self.model = None
        self.history = None

    def _create_model(self, n_feature, n_output):
        """Clear the Keras session and build a fresh compiled network."""
        tf.keras.backend.clear_session()
        return build_sequential_mlp_reg(n_feature=n_feature,
                                        n_output=n_output,
                                        loss=self.loss,
                                        **self.nn_params)

    def fit(self, X, y, sample_weight=None, **fit_params):
        """
        Fit the model.

        Args
        ----------
        X: array-like
            Feature space
        y: array-like
            target space
        sample_weight: array-like, default None
            sample weights
        fit_params: dict
            arguments passed to self.model.fit(). If ``validation_data`` is
            given, its features and targets are transformed with the fitted
            scalers before being forwarded.

        Returns
        -------
        self
        """
        n_feature = X.shape[1] if len(X.shape) > 1 else 1
        n_output = y.shape[1] if len(y.shape) > 1 else 1
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values

        X = self.scaler.fit_transform(X)
        if self.output_scaler is not None:
            if len(y.shape) == 1:
                y = y.reshape(-1, 1)
            y = self.output_scaler.fit_transform(y)

        if 'validation_data' in fit_params:
            validation = fit_params['validation_data']
            X_val = self.scaler.transform(validation[0])
            y_val = validation[1]
            # pandas objects have no .reshape; work on the underlying array
            if isinstance(y_val, (pd.Series, pd.DataFrame)):
                y_val = y_val.values
            if self.output_scaler is not None:
                if len(y_val.shape) == 1:
                    y_val = y_val.reshape(-1, 1)
                y_val = self.output_scaler.transform(y_val)
            # keep any extra element (e.g. validation sample weights) untouched
            fit_params['validation_data'] = (X_val, y_val) + tuple(validation[2:])

        if self.reinitialize or self.model is None:
            self.model = self._create_model(n_feature=n_feature, n_output=n_output)
        history = self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size,
                                 verbose=self.verbose, sample_weight=sample_weight, **fit_params)
        self.history = history.history
        return self

    def summary(self):
        """Print the Keras model summary; does nothing if no model has been built."""
        if self.model is not None:
            self.model.summary()

    def predict(self, X, nb_path=None, conf=False, **predict_params):
        """
        Make predictions with one or several paths in the network

        Args
        ----------
        X : array-like
            Feature space
        nb_path : int or None
            number of forward path in network used for prediction. When None,
            a single deterministic ``self.model.predict`` call is made.
        conf : bool, default False
            only used when ``nb_path`` is given: whether to return the tuple
            (MC mean, MC standard deviation, mean - 1.96 * std, mean + 1.96 * std)
            instead of the MC mean alone
        predict_params : dict
            arguments passed to self.model.predict()

        Raises
        ------
        NotFittedError
            if the model has not been built yet
        """
        if self.model is None:
            raise NotFittedError('Model not fitted')

        X = self.scaler.transform(X)

        if nb_path is not None:
            mc_samples = monte_carlo_prediction(self.model, X,
                                                forward_path=nb_path)
            if self.output_scaler is not None:
                # The scaler expects 2-D (n_rows, n_output) input: collapse the
                # (path, sample) axes, invert the scaling, then restore the shape.
                stacked_shape = mc_samples.shape
                flat = mc_samples.reshape(-1, stacked_shape[-1])
                mc_samples = self.output_scaler.inverse_transform(flat).reshape(stacked_shape)
            mean, std, _, _, conf_inf, conf_sup = compute_uncertainty(mc_samples)
            if conf:
                return mean, std, conf_inf, conf_sup
            else:
                return mean
        else:
            pred = self.model.predict(X, **predict_params)
            if self.output_scaler is not None:
                pred = self.output_scaler.inverse_transform(pred)
            return pred

    def set_params(self, **params):
        """
        Setting model parameters.

        Keys naming a constructor argument (epochs, batch_size, scaler,
        output_scaler, loss, verbose, reinitialize) are set as attributes;
        all remaining keys update ``nn_params``. The current network is
        discarded so that the next ``fit`` rebuilds it.

        Args
        ----------
        params: dict
            params to be set to model

        Returns
        -------
        self
        """
        for name in list(params):
            if name in self._INIT_PARAMS:
                # Routing these into nn_params would later clash with the
                # explicit keywords used in _create_model / fit.
                setattr(self, name, params.pop(name))
        self.nn_params.update(params)
        self.model = None
        return self

    def save(self, path):
        """serialize model

        Raises
        ------
        NotFittedError
            if no model has been built yet
        """
        if self.model is None:
            raise NotFittedError('Model not fitted')
        self.model.save(path)


def build_sequential_cnn_reg(input_shape,
                             kernel_size=(10, 10),
                             filters=32,
                             activation='relu',
                             n_output=1,
                             hidden_size=500,
                             use_bias=False,
                             bias_initializer='random_uniform',
                             loss='mean_squared_error',
                             optimizer=None,
                             ):
    """
    Build and compile a small CNN regressor: Conv2D -> Flatten -> Dense -> Dense.

    Parameters
    ----------
    input_shape : tuple
        shape of one input sample, forwarded to the Conv2D layer
    kernel_size : tuple
        convolution kernel size
    filters : int
        number of convolution filters
    activation : str
        activation of the convolution and of the hidden Dense layer
    n_output : int
        number of output units
    hidden_size : int
        number of units of the hidden Dense layer
    use_bias : bool
        whether the output Dense layer uses a bias (the hidden Dense layer
        always does)
    bias_initializer : str
        bias initializer of both Dense layers
    loss : str or Keras loss
        loss passed to ``model.compile``. Defaults to the Keras loss name
        'mean_squared_error'; the previous default was the bare name
        ``mean_squared_error``, which in this file is the scikit-learn
        metric function and not a Keras loss.
    optimizer : Keras optimizer or None
        optimizer passed to ``model.compile``. When None, a fresh
        ``Adadelta(learning_rate=0.01, rho=0.95, decay=0.0)`` is created for
        each model, instead of one instance built at definition time and
        shared by every model.

    Returns
    -------
    compiled model : Keras Sequential
    """
    if optimizer is None:
        optimizer = tf.keras.optimizers.legacy.Adadelta(learning_rate=0.01,
                                                        rho=0.95,
                                                        decay=0.0)

    model = Sequential()
    model.add(Conv2D(filters,
                     kernel_size=kernel_size,
                     activation=activation,
                     input_shape=input_shape))
    model.add(Flatten())
    model.add(Dense(hidden_size, use_bias=True,
                    bias_initializer=bias_initializer,
                    activation=activation))
    model.add(Dense(n_output,
                    use_bias=use_bias,
                    bias_initializer=bias_initializer))
    model.compile(loss=loss,
                  optimizer=optimizer)
    return model


class CNNRegressor(BaseEstimator, RegressorMixin):

    """Class implementing a regressor using CNN structure.

    Args
    ----------
    epochs: int
        number of epochs
    batch_size: int
        number of element in each batch
    input_shape: tuple
        input shape
    loss: str or callable
        loss for the sequential model
    n_output: int
        number of output neurons
    reinitialize: bool, default True
        reset model before fitting, needed for parameters search or cv
    verbose: int
        verbosity of self.model.fit() method
    nn_params: any parameter of above sequential building func

    Attributes
    ----------
    model: Keras.Sequential
        a sequential Keras model, built in the constructor and rebuilt by
        ``set_params`` (and by ``fit`` when ``reinitialize`` is True)
    history:
        object returned by the last ``self.model.fit`` call, or None

    """

    # Constructor arguments stored as plain attributes; every other keyword
    # handed to set_params is treated as a network-building parameter.
    _INIT_PARAMS = ('epochs', 'batch_size', 'input_shape', 'loss',
                    'n_output', 'reinitialize', 'verbose')

    def __init__(self, epochs, batch_size, input_shape=10,
                 loss='mean_squared_error', n_output=1,
                 reinitialize=True, verbose=0, **nn_params):
        self._estimator_type = 'regressor'
        self.input_shape = input_shape
        self.n_output = n_output
        self.epochs = epochs
        self.batch_size = batch_size
        self.reinitialize = reinitialize
        self.verbose = verbose
        self.loss = loss
        self.nn_params = nn_params
        self.model = self._create_model()
        self.history = None

    def _create_model(self):
        """Build a fresh compiled CNN from the current attributes."""
        return build_sequential_cnn_reg(input_shape=self.input_shape,
                                        n_output=self.n_output,
                                        loss=self.loss,
                                        **self.nn_params)

    def fit(self, X, y, **fit_params):
        """
        Fit the model.

        Args
        ----------
        X: array-like
            Feature space
        y: array-like
            target space
        fit_params: dict
            arguments passed to self.model.fit()

        Returns
        -------
        self
        """
        if self.reinitialize or self.model is None:
            self.model = self._create_model()
        history = self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size,
                                 verbose=self.verbose, **fit_params)
        self.history = history
        return self

    def predict(self, X, **kwargs):
        """
        Predict with the underlying Keras model.

        Args
        ----------
        X: array-like
            Feature space
        kwargs: dict
            arguments passed to self.model.predict()

        Raises
        ------
        NotFittedError
            if ``self.model`` is None
        """
        if self.model is None:
            raise NotFittedError('Model not fitted')
        return self.model.predict(X, **kwargs)

    def set_params(self, **params):
        """
        Set parameters and rebuild the network.

        Keys naming a constructor argument (epochs, batch_size, input_shape,
        loss, n_output, reinitialize, verbose) are set as attributes; all
        remaining keys update ``nn_params``.

        Returns
        -------
        self
        """
        for name in list(params):
            if name in self._INIT_PARAMS:
                # Routing these into nn_params would clash with the explicit
                # keywords used in _create_model.
                setattr(self, name, params.pop(name))
        self.nn_params.update(params)
        self.model = self._create_model()
        return self

import logging
from typing import List
import inspect
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from hyperopt import Trials, STATUS_OK
from hyperopt import fmin
from hyperopt import tpe
from hyperopt.fmin import generate_trials_to_calculate
from sklearn.base import MetaEstimatorMixin, BaseEstimator
from sklearn.exceptions import NotFittedError
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted


from typing import Callable


logging.basicConfig()
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


# Names of metric functions for which a higher value is better.
# build_loss_fun compares ``metric_fun.__name__`` against this list and, on a
# match, wraps the metric so that its negated value is returned.
__score__metrics = ["f1_score", "accuracy_score", "auc", "recall_score",
                    "precision_score", "fbeta_score", "r2_score",
                    "explained_variance_score"]

# Names of metric functions that are already losses (lower is better).
# build_loss_fun returns a metric whose ``__name__`` is listed here unchanged.
# A metric whose name is in neither list is treated by build_loss_fun as a
# custom function returning (eval_name, eval_result, is_higher_better).
__loss__metrics = ["log_loss", "hinge_loss", "brier_score_loss", "zero_one_loss", "neg_log_loss",
                   "mean_squared_error", "root_mean_square_error", "mean_absolute_error",
                   "mean_poisson_deviance", "mean_gamma_deviance"]


def set_params(model, **params):
    """
    Apply ``params`` to ``model``.

    Delegates to ``model.set_params`` when the model exposes it; otherwise
    each key is assigned as a plain attribute of the model.
    """
    if not hasattr(model, 'set_params'):
        for name in params:
            setattr(model, name, params[name])
        return
    model.set_params(**params)


def _apply_metric(metric, y_true, y_pred, sample_weight=None, **kw):

    if 'sample_weight' in inspect.signature(
            metric).parameters.keys():

        return metric(y_true, y_pred, sample_weight=sample_weight, **kw)
    else:
        return metric(y_true, y_pred, **kw)


def build_sample_weight(weight_feature, scaling='sum_to_one'):
    """
    Turn a raw weighting feature into non-negative sample weights.

    Parameters
    ----------
    weight_feature : array-like or None
        raw values; their absolute value is used
    scaling : {'sum_to_one', 'minmax', None}
        - 'sum_to_one': absolute values divided by their sum, then multiplied
          by the number of samples (so the weights average to one)
        - 'minmax': absolute values rescaled with MinMaxScaler
        - None: ``weight_feature`` is returned untouched

    Returns
    -------
    weights : array-like or None
        ``weight_feature`` itself when it, or ``scaling``, is None

    Raises
    ------
    ValueError
        if ``scaling`` is not one of the supported values (previously this
        case silently returned None)
    """
    if scaling is None or weight_feature is None:
        return weight_feature

    if scaling == 'minmax':
        # np.asarray so that pandas objects, which lack .reshape, are accepted
        magnitudes = np.absolute(np.asarray(weight_feature))
        weights = MinMaxScaler().fit_transform(magnitudes.reshape(-1, 1))
        return np.ravel(weights)

    if scaling == 'sum_to_one':
        magnitudes = np.absolute(weight_feature)
        return (magnitudes / magnitudes.sum()) * len(weight_feature)

    raise ValueError(f"unknown scaling {scaling!r}, expected 'sum_to_one', "
                     f"'minmax' or None")


def build_loss_fun(metric_fun: Callable):
    """
     Transform a score or loss function into a loss metric
     able to be passed in cv_methods for hyperparameters
     optimization purpose

    Parameters
    ----------
    metric_fun: Callable
        either a metric of module sklearn.metric (recognised through its
        ``__name__``), or a custom function with the signature
        func(y_true, y_pred, sample_weight=None) -> (eval_name, eval_result, is_higher_better)

    Returns
    -------
    loss_fun: callable
        a metric method of signature
        loss_fun(y_test, y_pred, sample_weight=None)
        for which lower is better. A metric already listed as a loss is
        returned unchanged.
    """
    if metric_fun.__name__ in __loss__metrics:
        return metric_fun

    elif metric_fun.__name__ in __score__metrics:
        # Some listed scores take no sample_weight argument; only forward the
        # weights when the metric can receive them.
        declared = inspect.signature(metric_fun).parameters
        takes_weight = 'sample_weight' in declared or any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in declared.values())

        def loss_metric(y_true, y_pred, sample_weight=None):
            if takes_weight:
                return - metric_fun(y_true, y_pred, sample_weight=sample_weight)
            return - metric_fun(y_true, y_pred)
        loss_metric.__name__ = metric_fun.__name__
        return loss_metric

    else:
        def loss_metric(y_test, y_pred, sample_weight=None):
            # custom metric: (name, value, is_higher_better)
            name, val, is_h_b = metric_fun(y_test, y_pred,
                                           sample_weight=sample_weight)
            return (-1 if is_h_b else 1) * val
        loss_metric.__name__ = metric_fun.__name__
        return loss_metric


def kfold_cv(model, X, y,
             metric: Callable[[np.ndarray, np.ndarray], float],
             cv_gen=KFold(),
             sample_weight: np.ndarray = None,
             fit_params: dict = None,
             n_jobs: int = 1) -> List:
    '''
    Perform k-fold cross-validation

    Parameters
    ----------
    model:
        ML model object implementing fit and predict. The same object is
        fitted on every fold.

    X : array-like or pd.DataFrame
        X values

    y : array-like or pd.Series or pd.DataFrame
        y values

    metric: callable
        method for cv scoring, etheir sklearn metric or other
        custom scoring fun matching the signature:
        func(y_true, y_pred)->float or func(y_true, y_pred, sample_weight)->float.
        A metric named 'log_loss' is scored on ``model.predict_proba``.

    cv_gen: sklearn BaseCrossValidator object
        cv split generator. It receives X and y as passed by the caller
        (pandas objects are not converted before splitting).

    sample_weight: np.array
        sample weight vector; defaults to a vector of ones. The weights are
        rescaled per fold with build_sample_weight and passed both to
        ``model.fit`` and, when supported, to the metric.

    fit_params: dict
        dictionary of parameters passed to model.fit (left unmodified)

    n_jobs :  int
        number of worker for parallelisation

    Returns
    -------
    scores : list
        list of all scores obtained during cross-validation
    '''

    # The splitter sees the caller's objects (index-aware splitters need the
    # pandas index); fitting and scoring work on positional numpy arrays.
    X_split, y_split = X, y
    if isinstance(X, pd.DataFrame):
        X = X.values
    if isinstance(y, (pd.Series, pd.DataFrame)):
        y = y.values

    fit_params = dict(fit_params or {})
    if sample_weight is None:
        sample_weight = np.ones((X.shape[0],))
    else:
        # positional indexing below must not be hijacked by a pandas index
        sample_weight = np.asarray(sample_weight)

    def _fit(train, test):
        '''
        Process train and test data,
         fit model and compute prediction and score

        Parameters
        ----------
        train: np.array
            train index
        test: np.array
            test index

        Returns
        -------
        score: float
            the value of the score computed on the test set according to
            the metric
        '''
        # per-fold copy: the shared dict must not be mutated by the workers
        fold_fit_params = dict(fit_params)
        fold_fit_params['sample_weight'] = build_sample_weight(sample_weight[train])
        sample_weight_score = build_sample_weight(sample_weight[test])

        model.fit(X[train, :], y[train], **fold_fit_params)

        if metric.__name__ == 'log_loss':
            y_pred = model.predict_proba(X[test, :])
        else:
            y_pred = model.predict(X[test, :])

        score = _apply_metric(metric, y[test], y_pred,
                              sample_weight=sample_weight_score)
        return score

    parallel = Parallel(n_jobs=n_jobs, max_nbytes=None)
    scores = parallel(
        delayed(_fit)(train, test)
        for train, test in cv_gen.split(X_split, y_split)
    )

    return scores


def bayesian_tuning(X, y, model,
                    param_grid,
                    metric_fun,
                    cv_gen=KFold(),
                    folds_weights=None,
                    fit_params=None,
                    static_params=None,
                    trials=None,
                    optimizer=tpe.suggest,
                    nb_evals=50,
                    refit=False,
                    random_state=None,
                    n_jobs=1,
                    **kwargs):
    """
    Perform a Bayesian-style optimization of a given ML model
    hyperparameters based on iteratives cross validations and scoring,
    then store trials in an dict. X, y inputs type have to be adapted to cv_gen inputs
    (array or pd.DataFrame for sklearn CV generator or pd.DataFrame with Datetime index for
    Purged CV generators). The method use the library Hyperopt : https://github.com/hyperopt/hyperopt

    Parameters
    ----------
    X: array-like or pd.DataFrame
        X data. It should be a pandas object with DatetimeIndex if cv_gen is a PurgedFoldBase object

    y: array-like or pd.DataFrame or pd.Series
        y data. It should be a pandas object with DatetimeIndex if cv_gen is a PurgedFoldBase object

    model:
        ML model object implementing fit and predict. It is modified in
        place: on return it carries the best hyperparameters found.

    param_grid: dict
        Hyperopt type grid search dictionary (see Hyperopt doc :
        https://github.com/hyperopt/hyperopt/wiki/FMin)

    metric_fun: Callable
        either a metric of module sklearn.metric
        or function with the signature
        func(y_true, y_pred, sample_weight=None) -> (eval_name, eval_result, is_higher_better)

    cv_gen: PurgedFoldBase or sklearn BaseCrossValidator object instance
        cross-validation generator for model hyperparameters evaluation
        at each hyperopt fmin iteration. If instance of PurgedFoldBase,
        time-indexed pandas DataFrame and Series object should be provided
        as X and y

    folds_weights : list or array-like
        optional, weights vector to apply to test fold scores. Should have the same lenght as cv_gen.n_splits

    fit_params: dict
        dictionary of parameters passed to model.fit

    static_params: dict or None
        model hyperparameter that are passed in tuning loop; they override
        sampled hyperparameters of the same name

    trials: instance of Trials object or None
        Hyperopt storage object used for hp calibration. When None, a new
        ``Trials()`` is created for this call (a default instance built at
        definition time would be shared, and polluted, across calls).

    optimizer:
        optimizer algo used by hyperopt

    nb_evals: int
        number of iteration of optimization process

    refit: bool
        weather to train model on all data with best parameters
        once hyperparam optimization finished

    random_state: int, numpy random generator or None
        random state of hyperopt fmin func. An int is converted with
        ``np.random.default_rng``.
        NOTE(review): recent hyperopt releases expect a numpy ``Generator``
        for ``rstate`` -- confirm against the installed version.

    n_jobs: int
        number of worker for cross-validation parallel computing (multi-threading backend)

    kwargs: dict
        additional optional arguments passed to hyperopt.fmin method

    Returns
    -------
    trials_list:
        list of dict containing optimization info at each iteration,
        sorted by increasing loss (best trial first)
    """
    loss_fun = build_loss_fun(metric_fun)

    if trials is None:
        trials = Trials()

    rstate = random_state
    if isinstance(random_state, (int, np.integer)):
        rstate = np.random.default_rng(random_state)

    def weighted_mean(data, weights):
        """function for weights averaging on cv test fold """
        data = data.dropna(axis=1)
        wm = np.average(data.values, axis=0, weights=weights)
        return {column: wm[i] for i, column in enumerate(data.columns)}

    def objective(hyperparameters):
        """Objective function for hyperopt optimization. Returns
           the cross validation score from a set of hyperparameters."""

        global ITERATION
        ITERATION += 1

        # deal with nested param space
        for param_name in list(hyperparameters):
            if isinstance(hyperparameters[param_name], dict):
                nested = hyperparameters[param_name]
                # Retrieve each sub-parameter and put it at top level key
                for sub_param in nested.keys():
                    if sub_param != param_name:
                        hyperparameters[sub_param] = nested.get(sub_param)
                # put param with nested space at top level key
                hyperparameters[param_name] = nested[param_name]

        static = static_params or {}
        all_params = {**hyperparameters, **static}
        set_params(model, **all_params)

        result_score = kfold_cv(
            model=model,
            X=X,
            y=y,
            metric=loss_fun,
            fit_params=fit_params,
            cv_gen=cv_gen,
            n_jobs=n_jobs,
        )

        result_score = pd.DataFrame({'loss': result_score})

        # compute weighted mean on test folds, default weights set to one
        if folds_weights is not None:
            weights = folds_weights
        else:
            weights = np.ones(len(result_score))
        agg_score = weighted_mean(result_score, weights)
        agg_score['hyperparameters'] = all_params
        agg_score['status'] = STATUS_OK
        agg_score['iteration'] = ITERATION

        return agg_score

    # module-level evaluation counter, reset at the start of every tuning run
    global ITERATION
    ITERATION = 0
    # Run optimization; the outcome is read back from `trials`
    fmin(fn=objective, space=param_grid,
         algo=optimizer, trials=trials,
         max_evals=nb_evals, show_progressbar=True,
         rstate=rstate, **kwargs)

    trials_list = sorted(trials.results, key=lambda x: x['loss'])

    set_params(model, **trials_list[0]['hyperparameters'])
    if refit:
        log.info(f'model trained with following hyperparameters'
                 f"\n{trials_list[0]['hyperparameters']}")
        model.fit(X, y, **(fit_params or {}))

    return trials_list


class BayesianSearchCV(MetaEstimatorMixin, BaseEstimator):
    """
    Scikit-learn style wrapper around bayesian_tuning.

    Parameters
    ----------
    estimator :
        model implementing fit and predict; it is tuned in place
    param_distributions : dict
        hyperopt search space
    scoring : Callable
        metric passed to bayesian_tuning as ``metric_fun``
    cv : cross-validation generator
        passed to bayesian_tuning as ``cv_gen``
    static_params : dict or None
        hyperparameters kept fixed during the search
    points_to_evaluate : list of dict or None
        initial points handed to hyperopt's generate_trials_to_calculate
    optimizer :
        hyperopt suggestion algorithm
    n_iter : int
        number of hyperopt evaluations
    refit : bool
        whether to refit the estimator on the whole data with the best
        hyperparameters
    random_state : int or None
        random state forwarded to bayesian_tuning
    n_jobs : int
        number of workers used for cross-validation
    **kwargs :
        additional arguments forwarded to hyperopt.fmin

    Attributes (set by ``fit``)
    ----------
    trials_ : list of dict
        trial results sorted by increasing loss
    best_estimator_ :
        the (same) estimator object, carrying the best hyperparameters
    best_score_ : float
        loss of the best trial (lower is better)
    best_params_ : dict
        hyperparameters of the best trial
    """

    def __init__(self, estimator, param_distributions, scoring, cv=KFold(),
                 static_params=None, points_to_evaluate=None, optimizer=tpe.suggest,
                 n_iter=50, refit=True, random_state=None, n_jobs=1, **kwargs):

        self.estimator = estimator
        self.param_distributions = param_distributions
        self.scoring = scoring
        self.cv = cv
        self.static_params = static_params
        self.points_to_evaluate = points_to_evaluate
        self.optimizer = optimizer
        self.n_iter = n_iter
        self.refit = refit
        self.random_state = random_state
        self.n_jobs = n_jobs
        self.fmin_params = kwargs

    def _check_is_fitted(self, method_name):
        """Raise NotFittedError unless a refitted best estimator is available."""
        if not self.refit:
            raise NotFittedError(f'This {type(self).__name__} instance was initialized '
                                 f'with refit=False. {method_name} is '
                                 'available only after refitting on the best '
                                 'parameters. You can refit an estimator '
                                 'manually using the ``best_params_`` '
                                 'attribute')
        else:
            check_is_fitted(self, attributes='best_estimator_')

    def fit(self, X, y, **fit_params):
        """
        Run the hyperparameter search (and the final refit when enabled).

        Parameters
        ----------
        X : array-like or pd.DataFrame of shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and n_features is the number of features.
            It should be pandas object with DatetimeIndex if cv is a PurgedFoldBase object

        y : array-like or pd.Series of shape (n_samples, n_output) or (n_samples,)
            Target relative to X for classification or regression;
            It should be pandas object with DatetimeIndex if cv is a PurgedFoldBase object

        **fit_params : dict of str -> object
            Parameters passed to the fit method of the estimator

        Returns
        -------
        self
        """
        if self.points_to_evaluate is not None:
            trials = generate_trials_to_calculate(self.points_to_evaluate)
        else:
            trials = Trials()
        # `combine_estimators` is never set by __init__; read it defensively
        # so that fit does not fail with AttributeError.
        refit = False if getattr(self, 'combine_estimators', False) else self.refit
        bests = bayesian_tuning(X, y, model=self.estimator,
                                param_grid=self.param_distributions,
                                metric_fun=self.scoring,
                                cv_gen=self.cv,
                                fit_params=fit_params,
                                static_params=self.static_params,
                                trials=trials,
                                optimizer=self.optimizer,
                                nb_evals=self.n_iter,
                                refit=refit,
                                random_state=self.random_state,
                                n_jobs=self.n_jobs,
                                **self.fmin_params)
        self.trials_ = bests
        self.best_estimator_ = self.estimator
        self.best_score_ = bests[0]['loss']
        self.best_params_ = bests[0]['hyperparameters']
        return self

    def predict(self, X):
        """Predict with the refitted best estimator."""
        self._check_is_fitted('predict')
        return self.best_estimator_.predict(X)

    def predict_proba(self, X):
        """Predict probabilities with the refitted best estimator."""
        self._check_is_fitted('predict_proba')
        return self.best_estimator_.predict_proba(X)



In [None]:

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from hyperopt import hp, tpe, Trials
import QuantLib as ql


In [None]:
def eurocall_fourier(m, T, r, q, v0, theta, kappa, sigma, rho):
    """
    Price a European call under the Heston model with QuantLib's
    AnalyticHestonEngine.

    The spot is normalised to 1, so the strike equals the moneyness ``m``.

    Parameters
    ----------
    m : float
        moneyness; the strike is ``S0 * m`` with ``S0 = 1``
    T : float
        time to maturity in years, converted to ``int(T * 365)`` calendar days
    r : float
        flat risk-free rate
    q : float
        flat dividend yield
    v0, theta, kappa, sigma, rho : float
        Heston parameters; they are handed to ``ql.HestonProcess`` in the
        order (v0, kappa, theta, sigma, rho)

    Returns
    -------
    float
        the option NPV
    """
    spot = 1    # Current price of the underlying asset (scale)
    strike = spot * m
    option_type = ql.Option.Call

    # Set up the option
    today = ql.Date.todaysDate()
    expiry_date = today + ql.Period(f"{int(T*365)}d")
    option = ql.VanillaOption(ql.PlainVanillaPayoff(option_type, strike),
                              ql.EuropeanExercise(expiry_date))

    # Both flat curves are anchored explicitly at `today`. With the previous
    # FlatForward(0, ql.TARGET(), ...) form the reference date is the
    # calendar-adjusted evaluation date, so the effective time to expiry (and
    # the price) depended on whether the code ran on a business day.
    day_count = ql.Actual365Fixed()
    risk_free_curve = ql.YieldTermStructureHandle(ql.FlatForward(today, r, day_count))
    dividend_curve = ql.YieldTermStructureHandle(ql.FlatForward(today, q, day_count))

    # Set up the Heston model
    heston_process = ql.HestonProcess(
        risk_free_curve,
        dividend_curve,
        ql.QuoteHandle(ql.SimpleQuote(spot)),
        v0, kappa, theta, sigma, rho
    )
    heston_model = ql.HestonModel(heston_process)

    # Calculate the option price using the Heston model with the Fourier transform
    heston_engine = ql.AnalyticHestonEngine(heston_model, 64)
    option.setPricingEngine(heston_engine)

    return option.NPV()


0.12374587649461455

m: Moneyness
T: Time to maturity of the option
r: Risk-free interest rate
q: Dividend yield of the underlying asset
v0: Initial variance of the Heston model
theta: Long-run variance (mean-reversion level) of the Heston model
kappa: Mean-reversion speed of the variance in the Heston model
sigma: Volatility of the variance process (vol of vol) of the Heston model
rho: Correlation between the asset price and volatility processes

In [None]:
# Fixed market parameters
r = 0.02  # risk-free rate
q = 0.01  # dividend yield

n_sample = 10000

# Seed NumPy's global RNG (used by `uniform` and `np.random.choice` below) so
# that the simulated data set is reproducible across kernel restarts.
np.random.seed(42)

x_data = np.zeros((n_sample, 10, 10)) # grid of 10x10 different prices
y_data = np.zeros((n_sample, 5))

# Pools of 15 moneyness and 15 maturity values; every sample draws 10 of each
# (without replacement) to build its 10x10 price grid.
moneyness = uniform(low=0.5, high=2.5, size=15)
Time_to_maturity = uniform(low=0.1, high=2, size=15)

# Heston parameters, one independent draw per sample
v0 = uniform(low=0.01, high=1, size=n_sample)
theta = uniform(low=0.01, high=3, size=n_sample)
kappa = uniform(low=0.01, high=1, size=n_sample)
sigma = uniform(low=0.01, high=0.8, size=n_sample)
rho = uniform(low=-0.9, high=0.9, size=n_sample)

for i in range(n_sample):
    M_0 = np.random.choice(moneyness, replace=False, size=10)
    T_0 = np.random.choice(Time_to_maturity, replace=False, size=10)
    # targets: the Heston parameters that generated the grid
    y_data[i, :] = [v0[i], theta[i], kappa[i], sigma[i], rho[i]]
    # features: rows index maturities, columns index moneyness
    x_data[i] = [
        [eurocall_fourier(m_j, t_k, r, q, v0[i], theta[i], kappa[i], sigma[i], rho[i])
         for m_j in M_0]
        for t_k in T_0
    ]

In [None]:
# Cap prices above 1 at 1 and floor negative prices at 0; every other entry
# is left untouched.
x_data = np.where(x_data > 1, 1, np.where(x_data < 0, 0, x_data))

In [None]:
# Flatten every price grid into a single feature row, then hold out 33% of
# the samples for testing (fixed random_state for a repeatable split).
n_features = x_data.shape[1] * x_data.shape[2]
X_train, X_test, y_train, y_test = train_test_split(
    x_data.reshape(-1, n_features),
    y_data,
    test_size=0.33,
    random_state=42,
)

# Settings for the notebook's MLPRegressor wrapper: min-max scalers for the
# inputs and the outputs, two hidden-size settings, MSE loss, ReLU activation.
mlp_config = dict(
    scaler=MinMaxScaler(),
    output_scaler=MinMaxScaler(),
    hidden_size=100,
    hidden_size_2=50,
    learning_rate=0.0005,
    epochs=300,
    batch_size=32,
    loss="mse",
    activation='relu',
    verbose=1,
)
model = MLPRegressor(**mlp_config)

# validation_split=0.2 is forwarded to the wrapper's fit method.
model.fit(X_train, y_train, validation_split=0.2)



Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

In [None]:
# Predict on both splits, then report the mean squared error of each so the
# train and test figures can be compared.
y_hat = model.predict(X_train)
y_pred = model.predict(X_test)

train_mse = mean_squared_error(y_train, y_hat)
test_mse = mean_squared_error(y_test, y_pred)

print('mse on train set', train_mse)
print('mse on test set', test_mse)

mse on train set 0.09626139780581448
mse on test set 0.2849882416937203


In [None]:
# Display the test-set predictions; columns follow the y_data layout
# (v0, theta, kappa, sigma, rho).
y_pred

array([[ 0.42412105,  2.6620317 ,  0.7429268 ,  0.15472631, -0.46056557],
       [ 0.48275423,  1.3806742 ,  0.7307117 ,  0.5094106 ,  0.11143295],
       [ 0.4475245 ,  0.4511895 ,  0.5068165 ,  0.25270113, -0.17692786],
       ...,
       [ 0.4809808 ,  1.7094277 ,  0.29912663,  0.43198356, -0.07428303],
       [ 0.65499425,  0.59877646,  0.6359887 ,  0.44646978, -0.03354494],
       [ 0.45111153,  1.5202634 ,  0.48274264,  0.55979276, -0.5607443 ]],
      dtype=float32)

In [None]:
# Display the true test-set parameters for comparison with y_pred; columns
# follow the y_data layout (v0, theta, kappa, sigma, rho).
y_test


array([[ 0.56111064,  1.67587484,  0.88691505,  0.26233099,  0.06892018],
       [ 0.37888708,  1.81841021,  0.90799943,  0.58449473, -0.84879616],
       [ 0.93605776,  0.24958187,  0.88071558,  0.03750053, -0.05454831],
       ...,
       [ 0.3882014 ,  1.15728119,  0.41203469,  0.20221386,  0.81311685],
       [ 0.70065931,  1.40305171,  0.02048335,  0.1526678 , -0.58428492],
       [ 0.10707544,  2.57951236,  0.31308098,  0.13629726,  0.21058986]])

In [None]:
# Per-parameter error summary on the test set: mean absolute error (MAE) and
# mean absolute percentage error (MAPE), one column per Heston parameter.
param_names = ['v_0', 'theta', 'kappa', 'sigma', 'rho']

mae = mean_absolute_error(y_test, y_pred, multioutput='raw_values')

# Divide by |y_test|, not y_test: with a signed denominator the errors on
# negative targets (rho spans [-0.9, 0.9]) enter with a negative sign and
# cancel against the positive ones, which produced a negative "mape".
# NOTE: rho can be arbitrarily close to 0, so its MAPE can still be dominated
# by a few near-zero targets.
mape = np.mean(np.abs(y_test - y_pred) / np.abs(y_test) * 100, axis=0)

error = pd.DataFrame(
    np.vstack([mae, mape]),
    index=['mae', 'mape'],
    columns=param_names,
)
print('abs error for each parameters:')
error

abs error for each parameters:


Unnamed: 0,v_0,theta,kappa,sigma,rho
mae,0.216746,0.607306,0.304138,0.260386,0.557391
mape,126.21541,124.503846,149.610163,181.377593,-219.512284
