# Machine learning models based on XGBoost framework

## Libraries

In [None]:
import os
import numpy as np
from sklearn.metrics import accuracy_score
import pandas as pd
from sagemaker.session import Session
from sagemaker.session import get_execution_role
from sagemaker.tuner import HyperparameterTuner, CategoricalParameter, IntegerParameter, ContinuousParameter
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.serializers import CSVSerializer

## Definition of training job functions

In [None]:
def perform_tuning(session, role, train_data, test_data, base_dict, sweep_dict, n_jobs=10, parallel_jobs=3):
    """
    Run a SageMaker hyperparameter tuning job for the built-in XGBoost (1.2-1) container.

    Every candidate is trained on ``train_data`` and scored on ``test_data``, which is
    passed to the tuner as the ``validation`` channel; the tuner minimises ``validation:rmse``.

    Args:
    - session (SageMaker session): SageMaker session; its region selects the container image.
    - role (SageMaker role): Execution role used by the training jobs.
    - train_data (str): S3 path to the training data (CSV).
    - test_data (str): S3 path to the test data (CSV), used as the tuning validation channel.
    - base_dict (dict): Fixed hyperparameters applied to every training job.
    - sweep_dict (dict): Hyperparameter ranges to search (SageMaker parameter objects).
    - n_jobs (int): Maximum number of training jobs launched by the tuner.
    - parallel_jobs (int): Maximum number of training jobs run at the same time.

    Returns:
    - tuner (HyperparameterTuner): The fitted tuner.
    - hyperparameters (dict): Hyperparameters of the best training job.
    """
    # point to s3 storage
    s3_input_train = TrainingInput(s3_data=train_data, content_type='csv')
    s3_input_test = TrainingInput(s3_data=test_data, content_type='csv')

    container = image_uris.retrieve('xgboost', session.boto_region_name, '1.2-1')

    regressor = Estimator(
        container,
        role,
        instance_count=1,
        instance_type='ml.c5.2xlarge',
        sagemaker_session=session
    )

    regressor.set_hyperparameters(**base_dict)

    tuner = HyperparameterTuner(
        regressor,
        objective_metric_name='validation:rmse',  # rmse as tuning objective because the model outputs a float score
        objective_type='Minimize',
        hyperparameter_ranges=sweep_dict,
        max_jobs=n_jobs,
        max_parallel_jobs=parallel_jobs
    )

    # tuning is scored on the 'validation' channel; the test data is used for this
    tuner.fit({'train': s3_input_train, 'validation': s3_input_test})

    # Attach the best training job to an estimator to read back its hyperparameters.
    # Use the directly imported Estimator class: the bare package name ``sagemaker``
    # is not bound in this notebook, so ``sagemaker.estimator.Estimator`` raises NameError.
    best_estimator = Estimator.attach(tuner.best_training_job(), sagemaker_session=session)

    return tuner, best_estimator.hyperparameters()

In [None]:
def deploy_best_model(tuner):
    """
    Create a real-time endpoint that serves the best model found by ``tuner``.

    Args:
    - tuner (SageMaker hyperparameter tuned estimator): Fitted tuner whose best model is deployed.

    Returns:
    - predictor (SageMaker predictor): Predictor bound to the new endpoint
      (a single ml.t2.large instance).
    """
    deployment_settings = {
        'initial_instance_count': 1,
        'instance_type': 'ml.t2.large',
    }
    return tuner.deploy(**deployment_settings)

In [None]:
def perform_prediction(status_type, predictor, validation_data, rows=100):
    """
    Score a labelled dataframe against a deployed endpoint in batches.

    Args:
    - status_type (str): Label such as 'test' or 'validation'; only used in the progress output.
    - predictor (SageMaker predictor endpoint): Endpoint returning the scores as text
      (comma- and/or newline-separated).
    - validation_data (pandas dataframe): First column is the target, the remaining
      columns are the features.
    - rows (int): Maximum number of rows sent to the endpoint per request.

    Return:
    - round_predictions (numpy array): Endpoint scores rounded to the nearest integer (class labels).
    - validation_y (numpy array): Target column extracted from ``validation_data``.
    """
    print('perform {} prediction'.format(status_type), end='')

    predictor.serializer = CSVSerializer()

    validation_y = validation_data.iloc[:, 0].to_numpy()
    validation_X = validation_data.iloc[:, 1:].to_numpy()

    # floor(n / rows) + 1 sections keeps every batch at or below ``rows`` rows and is
    # always >= 1, as np.array_split requires.
    n_batches = int(validation_X.shape[0] / float(rows) + 1)

    scores = []
    for iteration, batch in enumerate(np.array_split(validation_X, n_batches), start=1):
        if iteration % 10 == 0:
            print('.', end='')
        if batch.shape[0] == 0:
            # array_split yields empty pieces when there are more sections than rows
            # (empty input, or rows=1); don't send an empty request to the endpoint.
            continue
        response = predictor.predict(batch).decode('utf-8')
        # Accept comma- and newline-separated scores; accumulate in a list rather than
        # re-concatenating one ever-growing string per batch.
        for token in response.replace('\n', ',').split(','):
            token = token.strip()
            if token:
                scores.append(float(token))

    round_predictions = np.asarray(scores, dtype=float).round()
    print('done')

    return round_predictions, validation_y

In [None]:
def save_model_results(validation_accuracy, test_accuracy, data, path):
    """
    Write the test and validation accuracy of one dataset to ``{path}/{data}.csv``.

    The file contains a single row with the columns ``data``, ``validation_accuracy``
    and ``test_accuracy``; an existing file of the same name is overwritten.

    Args:
    - validation_accuracy (float): Accuracy on the validation data.
    - test_accuracy (float): Accuracy on the test data.
    - data (str): Dataset identifier; used as the file name (without .csv) and
      stored in the ``data`` column.
    - path (str): Directory the file is written to; created, including missing
      parent directories, if it does not exist.

    Returns:
    - None: Writes file directly on local filesystem.
    """
    if path:
        # makedirs also creates missing parents and tolerates an existing directory,
        # unlike an exists()/mkdir() pair.
        os.makedirs(path, exist_ok=True)
    data_path = os.path.join(path, '{}.csv'.format(data))

    accuracy_df = pd.DataFrame([{
        'data': data,
        'validation_accuracy': validation_accuracy,
        'test_accuracy': test_accuracy,
    }])
    accuracy_df.to_csv(data_path, index=False)
    print('accuracy save done')

In [None]:
def save_best_model_parameters(model_data_dict, data, path):
    """
    Save the hyperparameters of the best model to ``{path}/{data}.csv``.

    The file contains a single row: a ``data`` column followed by one column per
    entry of ``model_data_dict`` (an entry named ``data`` would override it).

    Args:
    - model_data_dict (dict): Dictionary of model hyperparameters.
    - data (str): Dataset identifier; used as the file name (without .csv).
    - path (str): Directory the file is written to; created, including missing
      parent directories, if it does not exist.
    """
    if path:
        # makedirs also creates missing parents and tolerates an existing directory,
        # unlike an exists()/mkdir() pair.
        os.makedirs(path, exist_ok=True)
    data_path = os.path.join(path, '{}.csv'.format(data))

    save_dict = {'data': data}
    save_dict.update(model_data_dict)

    model_data_df = pd.DataFrame([save_dict])
    model_data_df.to_csv(data_path, index=False)
    print('best model parameter save done')

In [None]:
def validate_data(model, data_prefix, dataset_list, base_dict, sweep_dict, n_jobs=10, parallel_jobs=3):
    """
    Tune, deploy and evaluate one XGBoost model per dataset.

    For every dataset the hyperparameter search is scored on the test data; the best
    model is deployed, its accuracy on the test and validation data is computed and
    stored in ``validation-{model}/{dataset}.csv``, and its hyperparameters in
    ``tuned-model-{model}/{dataset}.csv``. The endpoint is deleted afterwards, also
    when evaluation fails.

    Args:
    - model (str): Model identifier.
    - data_prefix (str): Path on s3 (inside the default bucket) where training, test
      and validation data is found.
    - dataset_list (list of str): Datasets to be tested as list.
    - base_dict (dict): Default hyperparameters for the XGBoost model.
    - sweep_dict (dict): Hyperparameter dictionary for search.
    - n_jobs (int): Number of hyperparameter combinations.
    - parallel_jobs (int): Number of searches executed at the same time.

    Returns:
    - None: Accuracy result is stored directly in a file.
    """
    print('define some SageMaker base parameters...', end='')

    sagemaker_session = Session()
    role = get_execution_role()
    bucket = sagemaker_session.default_bucket()

    print('done')

    # output folders depend only on the model identifier
    validation_prefix = 'validation-{}'.format(model)  # where accuracy results are stored
    tuned_model_prefix = 'tuned-model-{}'.format(model)  # where best model data is stored

    for dataset in dataset_list:

        model_name = '{}-{}'.format(model, dataset)
        print('evaluate model {}...'.format(model_name))

        # define input data: s3://{bucket}/{data_prefix}/{train|test|validation}-{dataset}
        train_path, test_path, validation_path = [
            's3://{}/{}/{}-{}'.format(bucket, data_prefix, data_type, dataset)
            for data_type in ('train', 'test', 'validation')
        ]

        # start hyperparameter tuning job
        print('start tuning', end='')
        tuner, model_hyperparameters = perform_tuning(
            session=sagemaker_session,
            role=role,
            train_data=train_path,
            test_data=test_path,
            base_dict=base_dict,
            sweep_dict=sweep_dict,
            n_jobs=n_jobs,
            parallel_jobs=parallel_jobs
        )

        # deploy endpoint
        print('deploy best model', end='')
        predictor = deploy_best_model(tuner)
        print('')

        # Everything below may fail (S3 reads, endpoint calls, local writes); the
        # finally clause makes sure the endpoint is not left running in that case.
        try:
            validation_data = pd.read_csv('{}/{}'.format(validation_path, 'validation.csv'))
            test_data = pd.read_csv('{}/{}'.format(test_path, 'test.csv'))

            # send the data to the endpoint in batches of 100 rows
            test_pred_y, test_y = perform_prediction('test', predictor, test_data, 100)
            validation_pred_y, validation_y = perform_prediction('validation', predictor, validation_data, 100)

            # get accuracy metrics
            test_accuracy = accuracy_score(test_y, test_pred_y)
            print('test model...accuracy: {} %'.format(round(test_accuracy * 100, 1)))
            validation_accuracy = accuracy_score(validation_y, validation_pred_y)
            print('validate model...accuracy: {} %'.format(round(validation_accuracy * 100, 1)))

            # save results per dataset so earlier results survive a later crash
            save_model_results(validation_accuracy, test_accuracy, dataset, validation_prefix)
            save_best_model_parameters(model_hyperparameters, dataset, tuned_model_prefix)
        finally:
            # remove resources
            predictor.delete_endpoint()

## Parameters

In [None]:
# datasets to validate: every combination of term weighting ('tf' / 'tf-idf'),
# feature count (250 / 125) and variant (1 / 2), in the order they are evaluated
data = [
    '{}-44898-{}-{}'.format(weighting, n_features, variant)
    for variant in (1, 2)
    for n_features in (250, 125)
    for weighting in ('tf', 'tf-idf')
]

In [None]:
# hyperparameter search budget: 8 tuning jobs in total, up to 8 of them at once
jobs, parallel_jobs = 8, 8

In [None]:
# fixed estimator hyperparameters applied to every tuning job
base = dict(
    objective='binary:logistic',
    num_round=100,
    # NOTE(review): per the XGBoost parameter docs, rate_drop only affects the 'dart'
    # booster and tweedie_variance_power only the 'reg:tweedie' objective; with
    # binary:logistic and the default booster they are presumably ignored — confirm.
    rate_drop=0.3,
    tweedie_variance_power=1.4,
)

In [None]:
# search space explored by the hyperparameter tuner
ranges = dict(
    eta=ContinuousParameter(0, 1),
    min_child_weight=ContinuousParameter(1, 10),
    alpha=ContinuousParameter(0, 2),
    max_depth=IntegerParameter(1, 10),
)

## XGBoost model

In [None]:
validate_data('xgb', 'data', data, base, ranges, jobs, parallel_jobs)