In [17]:
import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
import sys
import os

In [20]:
%pip install -q xgboost==0.90  # match the training container (framework_version='0.90-1') so pickled boosters load locally

[33mYou are using pip version 10.0.1, however version 20.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [21]:
import data_manipulation as dm
import xgboost as xgb

  config.update(yaml.load(text) or {})


## Setting up session

In [19]:
sm_boto3 = boto3.client('sagemaker')
sess = sagemaker.Session()
region = sess.boto_session.region_name
role = sagemaker.get_execution_role()

# Raw CSVs live at the bucket root; this notebook's artifacts go under `prefix`.
bucket = 'lvp-ion-switching'
prefix = 'xgb-lvp-ion-switching'

print(f'Using bucket {bucket}')

Using bucket lvp-ion-switching


## Loading Data

In [4]:
train_name, test_name = "train_clean.csv", "test_clean.csv"
train_path, test_path = (f's3://{bucket}/{name}' for name in (train_name, test_name))

# Only the training file is read; the test-set load below is currently disabled.
all_data = pd.read_csv(train_path)
# test_data = pd.read_csv(test_path)

In [5]:
# dm.plot_signal_and_channels(all_data)

In [6]:
# dm.plot_signal(test_data)

## Feature engineering
Here we add shifted versions of the signal as features, with shifts ranging from -9 to +9 (shift 0 is the signal itself). As demonstrated in the RandomForest notebook, this yields much better predictions.

In [7]:
# Add previous_signal_k / next_signal_k columns for k = 1..max_shift-1 (see the column
# listing further down). The return value is discarded, so this presumably mutates the
# frames in the list in place — TODO confirm in data_manipulation.
dm.load_shifted_values([all_data], max_shift=10)

In [8]:
# Preview the first rows, including the newly added shifted-signal columns.
all_data.head()

Unnamed: 0,time,signal,open_channels,previous_signal_1,next_signal_1,previous_signal_2,next_signal_2,previous_signal_3,next_signal_3,previous_signal_4,...,previous_signal_5,next_signal_5,previous_signal_6,next_signal_6,previous_signal_7,next_signal_7,previous_signal_8,next_signal_8,previous_signal_9,next_signal_9
0,0.0001,-2.76,0,-2.76,-2.8557,-2.76,-2.4074,-2.76,-3.1404,-2.76,...,-2.76,-2.6418,-2.76,-2.6993,-2.76,-2.5935,-2.76,-2.6682,-2.76,-2.7586
1,0.0002,-2.8557,0,-2.76,-2.4074,-2.76,-3.1404,-2.76,-3.1525,-2.76,...,-2.76,-2.6993,-2.76,-2.5935,-2.76,-2.6682,-2.76,-2.7586,-2.76,-3.1136
2,0.0003,-2.4074,0,-2.8557,-3.1404,-2.76,-3.1525,-2.76,-2.6418,-2.76,...,-2.76,-2.5935,-2.76,-2.6682,-2.76,-2.7586,-2.76,-3.1136,-2.76,-2.6221
3,0.0004,-3.1404,0,-2.4074,-3.1525,-2.8557,-2.6418,-2.76,-2.6993,-2.76,...,-2.76,-2.6682,-2.76,-2.7586,-2.76,-3.1136,-2.76,-2.6221,-2.76,-2.7316
4,0.0005,-3.1525,0,-3.1404,-2.6418,-2.4074,-2.6993,-2.8557,-2.5935,-2.76,...,-2.76,-2.7586,-2.76,-3.1136,-2.76,-2.6221,-2.76,-2.7316,-2.76,-2.9028


In [9]:
# test_data.head()

### Index feature for training data
This new feature gives the index of the 50k-sample chunk that each sample belongs to. Since the original data was generated in chunks that were then concatenated, this feature helps prediction, as demonstrated in the RandomForest notebook.

In [10]:
def get_index_mod_int(col, d):
    """Return, for each row of ``col``, the number of the ``d``-row chunk it falls in.

    Despite the name, this is a floor division of the row *position*, not a modulo:
    positions 0..d-1 map to 0, positions d..2d-1 map to 1, and so on.

    :param col: Series whose length and index define the rows; its values are not used.
    :param d: Chunk size in rows; must be positive.
    :return: Integer Series of chunk numbers, indexed like ``col`` so it can be assigned
        back to the originating DataFrame even when that frame has a non-default index.
    :raises ValueError: If ``d`` is not positive.
    """
    if d <= 0:
        raise ValueError(f"chunk size d must be positive, got {d}")
    # Reuse col.index so column assignment aligns row-for-row instead of by a fresh RangeIndex.
    return pd.Series(np.arange(len(col)) // d, index=col.index)

CHUNK_SIZE = 50000  # samples per chunk in the original data generation (see markdown above)

index_feat = get_index_mod_int(all_data['time'], CHUNK_SIZE)
# `time` is superseded by the chunk number, so it is dropped from the features.
all_data = all_data.assign(index_feat=index_feat).drop(columns=['time'])

### Index feature for test data
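For the corresponding feature in the test data, we determine, for each 50k-sample chunk of the test data, which 50k-sample chunk of the training data is closest, using the Euclidean distance (the distance induced by the canonical dot product). **TODO:** this step is not implemented in the cells below, and the test data is not loaded yet.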

In [11]:
# Check the columns after adding `index_feat` and dropping `time`.
all_data.columns

Index(['signal', 'open_channels', 'previous_signal_1', 'next_signal_1',
       'previous_signal_2', 'next_signal_2', 'previous_signal_3',
       'next_signal_3', 'previous_signal_4', 'next_signal_4',
       'previous_signal_5', 'next_signal_5', 'previous_signal_6',
       'next_signal_6', 'previous_signal_7', 'next_signal_7',
       'previous_signal_8', 'next_signal_8', 'previous_signal_9',
       'next_signal_9', 'index_feat'],
      dtype='object')

In [12]:
# Approximate in-memory size of the frame, in bytes.
all_data_nbytes = sys.getsizeof(all_data)
print(all_data_nbytes)

840000104


## Format data
To use SageMaker's built-in algorithms and hyperparameter tuning, the data must:
* have no missing values
* be numeric (categorical variables must be encoded as numbers)
* be scaled (not always necessary — e.g. not for gradient boosting algorithms, as here)
* have the target variable as the first column
* be stored as CSV files without a header

In [13]:
# Feature columns: every column except the target, in their current order.
previous_columns = list(filter(lambda name: name != 'open_channels', all_data.columns))

In [14]:
# The target must be the first column for SageMaker's CSV input format.
all_data = all_data.loc[:, ['open_channels', *previous_columns]]
all_data.head()

Unnamed: 0,open_channels,signal,previous_signal_1,next_signal_1,previous_signal_2,next_signal_2,previous_signal_3,next_signal_3,previous_signal_4,next_signal_4,...,next_signal_5,previous_signal_6,next_signal_6,previous_signal_7,next_signal_7,previous_signal_8,next_signal_8,previous_signal_9,next_signal_9,index_feat
0,0,-2.76,-2.76,-2.8557,-2.76,-2.4074,-2.76,-3.1404,-2.76,-3.1525,...,-2.6418,-2.76,-2.6993,-2.76,-2.5935,-2.76,-2.6682,-2.76,-2.7586,0
1,0,-2.8557,-2.76,-2.4074,-2.76,-3.1404,-2.76,-3.1525,-2.76,-2.6418,...,-2.6993,-2.76,-2.5935,-2.76,-2.6682,-2.76,-2.7586,-2.76,-3.1136,0
2,0,-2.4074,-2.8557,-3.1404,-2.76,-3.1525,-2.76,-2.6418,-2.76,-2.6993,...,-2.5935,-2.76,-2.6682,-2.76,-2.7586,-2.76,-3.1136,-2.76,-2.6221,0
3,0,-3.1404,-2.4074,-3.1525,-2.8557,-2.6418,-2.76,-2.6993,-2.76,-2.5935,...,-2.6682,-2.76,-2.7586,-2.76,-3.1136,-2.76,-2.6221,-2.76,-2.7316,0
4,0,-3.1525,-3.1404,-2.6418,-2.4074,-2.6993,-2.8557,-2.5935,-2.76,-2.6682,...,-2.7586,-2.76,-3.1136,-2.76,-2.6221,-2.76,-2.7316,-2.76,-2.9028,0


In [15]:
# Fixed seed so the train/validation split (and therefore the trained model) is reproducible.
SPLIT_SEED = 42
TRAIN_FRACTION = 0.7

# NOTE(review): a row-level random split puts neighbouring rows — whose shifted-signal
# features overlap (next_signal_1 of row i equals signal of row i+1) — into both sets, so
# validation scores may be optimistic; consider splitting by `index_feat` chunk instead.
random_permutation = np.random.RandomState(SPLIT_SEED).permutation(len(all_data))
split_idx = int(TRAIN_FRACTION * len(all_data))
train_data = all_data.iloc[random_permutation[:split_idx], :]
val_data = all_data.iloc[random_permutation[split_idx:], :]

In [18]:
# SageMaker CSV input: no header, no index column, target in the first column.
train_data.to_csv('train.csv', index=False, header=False)
val_data.to_csv('validation.csv', index=False, header=False)

s3_bucket = boto3.Session().resource('s3').Bucket(bucket)
for channel in ('train', 'validation'):
    # S3 keys always use '/', so build them explicitly rather than with os.path.join,
    # which uses the local OS separator.
    s3_bucket.Object(f'{prefix}/{channel}/{channel}.csv').upload_file(f'{channel}.csv')

# Trailing '/' on both prefixes so each channel matches only its own folder.
s3_input_train = sagemaker.s3_input(s3_data=f's3://{bucket}/{prefix}/train/', content_type='csv')
s3_input_validation = sagemaker.s3_input(s3_data=f's3://{bucket}/{prefix}/validation/', content_type='csv')

## Set up hyperparameter tuning

In [24]:
%%writefile xgb_model.py

import argparse
import json
import logging
import os
import pandas as pd
import pickle as pkl

from sagemaker_containers import entry_point
from sagemaker_xgboost_container.data_utils import get_dmatrix
from sagemaker_xgboost_container import distributed

import xgboost as xgb

def _xgb_train(params, dtrain, evals, num_boost_round, model_dir, is_master):
    """Train an XGBoost booster and, on the master host, pickle it into ``model_dir``.

    Called directly for single-node training, or through ``distributed.rabit_run``
    (which initializes rabit) for multi-node training.

    :param params: Booster parameters passed to ``xgb.train``.
    :param dtrain: Training DMatrix.
    :param evals: List of ``(DMatrix, name)`` pairs evaluated during training.
    :param num_boost_round: Number of boosting rounds.
    :param model_dir: Directory where the model is written as ``xgboost-model``.
    :param is_master: True if current node is master host in distributed training, or is
        running single node training job. Note that rabit_run will include this argument.
    """
    booster = xgb.train(params=params, dtrain=dtrain, evals=evals, num_boost_round=num_boost_round)

    if is_master:
        # Only the master host writes the model artifact.
        model_location = os.path.join(model_dir, 'xgboost-model')
        # Context manager guarantees the file is flushed and closed before the process exits.
        with open(model_location, 'wb') as model_stream:
            pkl.dump(booster, model_stream)
        logging.info("Stored trained model at {}".format(model_location))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters arrive as `--name value` command-line arguments.
    # gamma and min_child_weight are floats in XGBoost, so fractional values must parse.
    parser.add_argument('--max_depth', type=int)
    parser.add_argument('--eta', type=float)
    parser.add_argument('--gamma', type=float)
    parser.add_argument('--min_child_weight', type=float)
    parser.add_argument('--subsample', type=float)
    # NOTE(review): XGBoost's own parameter is named `verbosity`; `verbose` may be ignored — confirm.
    parser.add_argument('--verbose', type=int)
    parser.add_argument('--objective', type=str)
    # Required by multi-class objectives such as multi:softmax.
    parser.add_argument('--num_class', type=int)
    parser.add_argument('--num_round', type=int)

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output_data_dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model_dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    parser.add_argument('--validation', type=str, default=os.environ['SM_CHANNEL_VALIDATION'])
    parser.add_argument('--sm_hosts', type=str, default=os.environ['SM_HOSTS'])
    parser.add_argument('--sm_current_host', type=str, default=os.environ['SM_CURRENT_HOST'])

    args, _ = parser.parse_known_args()

    # Host information: --sm_hosts is a JSON list (defaults to the SM_HOSTS env var).
    sm_hosts = json.loads(args.sm_hosts)
    sm_current_host = args.sm_current_host

    dtrain = get_dmatrix(args.train, 'csv')
    dval = get_dmatrix(args.validation, 'csv')
    watchlist = [(dtrain, 'train'), (dval, 'validation')] if dval is not None else [(dtrain, 'train')]

    supplied_hp = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'gamma': args.gamma,
        'min_child_weight': args.min_child_weight,
        'subsample': args.subsample,
        'verbose': args.verbose,
        'objective': args.objective,
        'num_class': args.num_class}
    # Omit hyperparameters that were not supplied so XGBoost uses its defaults instead of None.
    train_hp = {name: value for name, value in supplied_hp.items() if value is not None}

    xgb_train_args = dict(
        params=train_hp,
        dtrain=dtrain,
        evals=watchlist,
        num_boost_round=args.num_round,
        model_dir=args.model_dir)

    if len(sm_hosts) > 1:
        # Wait until all hosts are able to find each other
        entry_point._wait_hostname_resolution()

        # Execute training function after initializing rabit.
        distributed.rabit_run(
            exec_fun=_xgb_train,
            args=xgb_train_args,
            include_in_training=(dtrain is not None),
            hosts=sm_hosts,
            current_host=sm_current_host,
            update_rabit_args=True
        )
    else:
        # If single node training, call training method directly.
        if dtrain:
            xgb_train_args['is_master'] = True
            _xgb_train(**xgb_train_args)
        else:
            raise ValueError("Training channel must have data to train model.")


def model_fn(model_dir):
    """Deserialize and return the fitted model stored in ``model_dir``.

    The file name must match the one written by ``_xgb_train`` (``xgboost-model``).

    :param model_dir: Directory containing the pickled booster.
    :return: The unpickled model object.
    """
    model_file = 'xgboost-model'
    # SECURITY: pickle.load can execute arbitrary code; only load artifacts produced by
    # this training script from a trusted location.
    with open(os.path.join(model_dir, model_file), 'rb') as model_stream:
        booster = pkl.load(model_stream)
    return booster

Overwriting xgb_model.py


In [27]:
# multi:softmax needs labels in [0, num_class). Deriving num_class from the largest label
# (instead of counting distinct values) stays valid even if some channel count never occurs.
nb_classes = int(all_data['open_channels'].max()) + 1
print(nb_classes)

11


In [23]:
# Forwarded to xgb_model.py, which parses each entry with argparse, hence string values.
hyperparams = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "verbose": "1",
    "objective": "multi:softmax",
    # Required by multi:softmax; computed from the labels above.
    "num_class": str(nb_classes),
    "num_round": "100",
}

instance_type = "ml.m5.xlarge"
output_path = 's3://{}/{}/{}/output'.format(bucket, prefix, 'liv-ion-xgb')
content_type = "csv"

In [28]:
# Open Source distributed script mode
from sagemaker.session import s3_input, Session
from sagemaker.xgboost.estimator import XGBoost

boto_session = boto3.Session(region_name=region)
session = Session(boto_session=boto_session)
script_path = 'xgb_model.py'

xgb_script_mode_estimator = XGBoost(
    entry_point=script_path,
    framework_version='0.90-1',  # Note: framework_version is mandatory
    hyperparameters=hyperparams,
    role=role,
    # NOTE(review): channels are fully replicated by default, so both hosts may receive the
    # whole training file; confirm this is the intended setup for rabit training.
    train_instance_count=2,
    train_instance_type=instance_type,
    output_path=output_path,
    # Use the region-pinned session built above; otherwise the estimator creates its own
    # default session and `session` goes unused.
    sagemaker_session=session)

train_input = s3_input("s3://{}/{}/{}/".format(bucket, prefix, 'train'), content_type=content_type)
validation_input = s3_input("s3://{}/{}/{}/".format(bucket, prefix, 'validation'), content_type=content_type)

In [None]:
# Launch the training job; the dict keys become the script's SM_CHANNEL_TRAIN / SM_CHANNEL_VALIDATION inputs.
xgb_script_mode_estimator.fit({'train': train_input, 'validation': validation_input})

2020-05-10 21:35:03 Starting - Starting the training job...
2020-05-10 21:35:05 Starting - Launching requested ML instances...
2020-05-10 21:36:03 Starting - Preparing the instances for training.