### XGBoost model:


In [1]:
"""
Created on Dec 2019

@author: Dina Berenbaum

XGBOOST model
"""
import pickle
import time
from collections import namedtuple

import numpy as np
import xgboost as xgb
from ConfigSpace.hyperparameters import UniformFloatHyperparameter, UniformIntegerHyperparameter
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from smac.configspace import ConfigurationSpace
from smac.facade.smac_ac_facade import SMAC4AC
from smac.scenario.scenario import Scenario


class XgboostModel:
    """
    Wrapper around an XGBoost binary classifier (objective ``binary:logistic``).

    Bundles training, prediction, an optional SMAC-driven hyperparameter
    search scored by k-fold cross-validation, and a helper that computes
    precision / recall / F1 / accuracy on a set of predictions.
    """

    # Baseline hyperparameters, used when ``optimized_params`` is False.
    DEFAULT_PARAMS = {"objective": "binary:logistic", 'colsample_bytree': 0.3, 'learning_rate': 0.1, 'max_depth': 5, 'alpha': 10,
                      'n_estimators': 10}
    # Hyperparameters used when ``optimized_params`` is True (the default).
    DEFAULT_OPTIMIZED_PARAMS = {"objective": "binary:logistic", 'colsample_bytree': 0.59, 'learning_rate': 0.1, 'max_depth': 8, 'alpha': 1,
                                'n_estimators': 88}
    DEFAULT_NFOLDS = 3            # number of folds passed to xgb.cv
    DEFAULT_BOOST_ROUNDS = 50     # num_boost_round passed to xgb.cv
    DEFAULT_EARLY_STOPPING = 10   # early_stopping_rounds passed to xgb.cv
    DEFAULT_MATRIX = 'error'      # evaluation metric passed to xgb.cv
    DEFAULT_SEED = 123            # seed for xgb.cv and for the optional train/test split
    DEFAULT_TEST_SIZE = 0.2       # hold-out fraction used when train(do_split=True)

    def __init__(self, X_train, Y_train, X_mytest, Y_mytest, optimized_params=True):
        """
        Store the data sets and pick the initial hyperparameters.

        :param X_train: training features
        :param Y_train: training labels
        :param X_mytest: test features, used for evaluation when ``train(do_split=False)``
        :param Y_mytest: test labels, used for evaluation when ``train(do_split=False)``
        :param optimized_params: start from DEFAULT_OPTIMIZED_PARAMS if True, else DEFAULT_PARAMS
        """
        self._Xdata = X_train
        self._ydata = Y_train
        self._Xtest = X_mytest
        self._ytest = Y_mytest
        # DMatrix of the training data; consumed by the cross-validation in _kfold_train.
        self._dmatrix = xgb.DMatrix(data=self._Xdata, label=self._ydata)
        if optimized_params:
            defaults = XgboostModel.DEFAULT_OPTIMIZED_PARAMS
        else:
            defaults = XgboostModel.DEFAULT_PARAMS
        # Copy, so that per-instance changes can never leak into the shared class-level dict.
        self._params = dict(defaults)
        self.xg_class = None  # fitted XGBClassifier, set by train()
        self.stats = None     # result of test_statistics(), set by train()

    def show_params(self):
        """Print every current hyperparameter as ``name value``, one per line."""
        # .items() is required: iterating the dict itself yields only the keys.
        for key, value in self._params.items():
            print(key, value)

    def train(self, model_path, do_hps=False, do_split=False, do_save=False, do_print=False, seed=42):
        """
        Fit the classifier, evaluate it and optionally pickle it.

        :param model_path: path prefix for the saved model; a timestamp and ".pickle.dat" are appended
        :param do_hps: run hyperparameter_search() first and train with its result
        :param do_split: evaluate on a hold-out split of the training data (DEFAULT_TEST_SIZE,
                         DEFAULT_SEED) instead of the test set given to the constructor
        :param do_save: pickle the fitted classifier to disk
        :param do_print: print progress messages and the test statistics
        :param seed: random_state of the XGBClassifier
        :return: None; the fitted model is stored in ``xg_class`` and the metrics in ``stats``
        """
        timestr = time.strftime("%Y%m%d_%H%M%S")
        if do_hps:
            self.hyperparameter_search()

        if do_print:
            print('Training...')

        if do_split:
            X_train, X_test, y_train, y_test = train_test_split(self._Xdata, self._ydata, test_size=XgboostModel.DEFAULT_TEST_SIZE,
                                                                random_state=XgboostModel.DEFAULT_SEED)
        else:
            X_train = self._Xdata
            X_test = self._Xtest
            y_train = self._ydata
            y_test = self._ytest

        self.xg_class = xgb.XGBClassifier(objective=self._params['objective'], colsample_bytree=self._params['colsample_bytree'],
                                          learning_rate=self._params['learning_rate'], max_depth=self._params['max_depth'],
                                          alpha=self._params['alpha'], n_estimators=self._params['n_estimators'],
                                          random_state=seed)
        self.xg_class.fit(X_train, y_train)

        if do_print:
            print('Preforming prediction of the test data')
        prediction = self.predict(X_test)
        self.stats = self.test_statistics(y_test, prediction, print_it=do_print)
        if do_save:
            if do_print:
                print(f'Saving to {model_path + timestr}')
            # Context manager guarantees the file is flushed and closed, even if pickling fails.
            with open(model_path + timestr + ".pickle.dat", "wb") as model_file:
                pickle.dump(self.xg_class, model_file)

    def predict(self, xdata):
        """
        Predict labels with the fitted classifier.

        :param xdata: features to predict on
        :return: the output of ``XGBClassifier.predict``
        """
        return self.xg_class.predict(xdata)

    def hyperparameter_search(self):
        """
        Search the hyperparameter space with SMAC and store the best configuration.

        Every candidate is scored by _kfold_train (cross-validated error). The
        incumbent replaces ``self._params`` and is used by the next train() call.
        """
        # Configuration space: only hyperparameters that also exist in DEFAULT_PARAMS.
        cs = ConfigurationSpace()
        param_list = self._create_param_grid()
        default_params = XgboostModel.DEFAULT_PARAMS.keys()
        for param in param_list:
            if param.name in default_params:
                cs.add_hyperparameter(param)

        # Scenario object:
        scenario = Scenario({"run_obj": "quality",  # we optimize quality (alternatively runtime)
                             "runcount-limit": 200,  # maximum function evaluations
                             "cs": cs,  # configuration space
                             "deterministic": "true"})

        print("Optimizing! Depending on your machine, this might take a few minutes.")
        smac = SMAC4AC(scenario=scenario, rng=np.random.RandomState(42),
                       tae_runner=self._kfold_train)
        incumbent = smac.optimize()
        inc_value = self._kfold_train(incumbent)
        self._params = incumbent.get_dictionary()
        # The objective is not part of the search space, so it is restored here.
        self._params.update({"objective": "binary:logistic"})
        print('The best configurations were updated into params and will be used in the training. Optimized value: %.2f' % inc_value)

    def _kfold_train(self, cfg):
        """
        Score one configuration with k-fold cross-validation.

        :param cfg: configuration (mapping of hyperparameter name to value)
        :return: mean test value of the DEFAULT_MATRIX metric at the last boosting round
        """
        # Drop entries with falsy values before handing the parameters to xgboost.
        params = {k: cfg[k] for k in cfg if cfg[k]}
        # The search space carries no objective; without this the cross-validation
        # would score a different objective than the classifier trained afterwards.
        params.setdefault("objective", XgboostModel.DEFAULT_PARAMS["objective"])
        # NOTE(review): ``n_estimators`` is a scikit-learn wrapper argument; xgb.cv appears to
        # take its round count from ``num_boost_round`` only - confirm whether the searched
        # value should drive num_boost_round here.
        cv_results = xgb.cv(dtrain=self._dmatrix, params=params, nfold=XgboostModel.DEFAULT_NFOLDS,
                            num_boost_round=XgboostModel.DEFAULT_BOOST_ROUNDS, early_stopping_rounds=XgboostModel.DEFAULT_EARLY_STOPPING,
                            metrics=XgboostModel.DEFAULT_MATRIX, as_pandas=True, seed=XgboostModel.DEFAULT_SEED)
        # Column name follows the configured metric ("test-error-mean" for 'error').
        return cv_results[f"test-{XgboostModel.DEFAULT_MATRIX}-mean"].tail(1).values[0]

    @staticmethod
    def _create_param_grid():
        """
        Manually determined hyperparameter ranges for the search.

        :return: list of ConfigSpace hyperparameter objects
        """
        param_list = [UniformFloatHyperparameter("colsample_bytree", 0.1, 0.6),
                      UniformFloatHyperparameter("learning_rate", 0.05, 0.5),
                      UniformIntegerHyperparameter("max_depth", 2, 10),
                      UniformIntegerHyperparameter("alpha", 1, 50),
                      # A number of trees must be an integer, not a float.
                      UniformIntegerHyperparameter("n_estimators", 1, 100)]
        return param_list

    @staticmethod
    def test_statistics(original_labels, predictions, print_it=False):
        """
        Calculate and output test statistics of precision, recall, f1-score and accuracy

        :param original_labels: ground-truth labels
        :param predictions: predicted labels
        :param print_it: print the statistics when True
        :return: named tuple with fields precision_score, recall_score, f1_score and
                 accuaracy_score (field name kept misspelled for backward compatibility)
        """
        Results = namedtuple('results', 'precision_score, recall_score, f1_score, accuaracy_score')

        # Build an instance; assigning onto the namedtuple class itself would share
        # (and overwrite) the values between all calls.
        results = Results(precision_score=precision_score(original_labels, predictions),
                          recall_score=recall_score(original_labels, predictions),
                          f1_score=f1_score(original_labels, predictions),
                          accuaracy_score=accuracy_score(original_labels, predictions))
        if print_it:
            print(f"Precision: {results.precision_score}, Accuaracy: {results.accuaracy_score}, Recall: {results.recall_score}, "
                  f"F1_score: {results.f1_score}")
        return results


  self.re = re.compile(self.reString)
