# Step 2: Use SageMaker processing and training jobs
In this step we move data processing and model training into SageMaker containers and use the SageMaker Python SDK to interact with SageMaker.

![](img/six-steps-2.png)

In [100]:
import time

import boto3
import numpy as np  
import pandas as pd  
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sklearn.metrics import roc_auc_score

In [8]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
abalone_dataset_file_name                -> 'abalone.csv'
abalone_dataset_local_url                -> '../dataset/abalone.csv'
auto_ml_job_name                         -> 'automl-asinproc-24-14-33-56'
bucket_name                              -> 'sagemaker-us-east-1-906545278380'
bucket_prefix                            -> 'from-idea-to-prod/xgboost'
customers_count                          -> 10000
customers_feature_group_name             -> 'fscw-customers-07-20-17-46'
data_bucket                              -> 'sagemaker-us-east-1-906545278380'
data_uploaded                            -> True
domain_id                                -> 'd-r8pbvl3oamh6'
dw_flow_file_url                         -> 's3://sagemaker-us-east-1-906545278380/feature-sto
dw_output_name                           -> '928854ec-259e-4130-8e0f-b65221b27d6e.default'
endpoint_name                            -> 'reorder-classifier-2022-07-20-19-45-09-338'
execution_role      

## Process data
We use SageMaker processing by simply providing a Python data preprocessing script.
We need to upload the input data to S3 and specify an S3 location for output data. SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

![](img/sagemaker-processing.png)

Upload the input dataset to an Amazon S3 bucket:

In [9]:
sm_session = sagemaker.Session()

In [12]:
input_s3_url = sm_session.upload_data(
    path="data/bank-additional/bank-additional-full.csv",
    bucket=bucket_name,
    key_prefix=f"{bucket_prefix}/input"
)

In [13]:
!aws s3 ls {bucket_name}/{bucket_prefix} --recursive

2022-09-21 08:54:54    5834924 from-idea-to-prod/xgboost/input/bank-additional-full.csv


We create a Python script by moving the data processing code from step 1 to a .py file:

In [16]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Input and output locations inside the processing container.
    # The defaults match the ProcessingInput/ProcessingOutput paths configured for the job.
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = "y"
    
    # Load data
    df_data = pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")

    # Indicator variable to capture when pdays takes a value of 999
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

    # Indicator for individuals not actively employed
    df_data["not_working"] = np.where(
        np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
    )

    # remove unnecessary data
    df_model_data = df_data.drop(
        ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
        axis=1,
    )

    df_model_data = pd.get_dummies(df_model_data)  # Convert categorical variables to sets of indicators

    # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
    df_model_data = pd.concat(
        [
            df_model_data["y_yes"].rename(target_col),
            df_model_data.drop(["y_no", "y_yes"], axis=1),
        ],
        axis=1,
    )

    # Shuffle and split the dataset into train (70%), validation (20%), and test (10%)
    train_data, validation_data, test_data = np.split(
        df_model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_model_data)), int(0.9 * len(df_model_data))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    print("## Processing complete. Exiting.")

Overwriting preprocessing.py
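
The split in `preprocessing.py` cuts the shuffled rows at `int(0.7 * n)` and `int(0.9 * n)`. The following minimal sketch reproduces only that arithmetic in plain Python so the expected partition sizes can be checked before a job is launched. The helper name `split_sizes` is hypothetical, and the row count of 41188 is an assumption inferred by adding up the train, validation, and test sizes reported elsewhere in this notebook.

```python
def split_sizes(n_rows, cuts=(0.7, 0.9)):
    """Return (train, validation, test) row counts for the given cut points.

    Mirrors the cut indices passed to np.split in preprocessing.py:
    [int(0.7 * n), int(0.9 * n)].
    """
    first_cut = int(cuts[0] * n_rows)
    second_cut = int(cuts[1] * n_rows)
    return first_cut, second_cut - first_cut, n_rows - second_cut


# Assumed total row count (inferred from the sizes logged in this notebook).
n_rows = 41188
train_rows, validation_rows, test_rows = split_sizes(n_rows)
print(f"train={train_rows} validation={validation_rows} test={test_rows}")
```

The three counts always add up to the input row count, because the test partition takes whatever remains after the second cut.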


Set the Amazon S3 paths:

In [20]:
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"

Instantiate a [`SKLearnProcessor`](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) object before starting the SageMaker processing job. You specify the instance type to use in the job, as well as how many instances for distributed processing.

In [21]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sm_role,
    instance_type="ml.m5.large",
    instance_count=1, 
    base_job_name='from-idea-to-prod-processing'
)

Start the SageMaker processing job:

In [22]:
sklearn_processor.run(
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
    inputs=[
        ProcessingInput(
            source=input_s3_url, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_s3_url,
        ),
        ProcessingOutput(
            output_name="validation_data", 
            source="/opt/ml/processing/output/validation", 
            destination=validation_s3_url
        ),
        ProcessingOutput(
            output_name="test_data", 
            source="/opt/ml/processing/output/test", 
            destination=test_s3_url
        ),
    ]
)


Job Name:  from-idea-to-prod-processing-2022-09-21-10-03-07-464
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/input/bank-additional-full.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod-processing-2022-09-21-10-03-07-464/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train_data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/train', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'EndOfJob'}}, {'Ou
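
Each `ProcessingOutput` pairs a local directory in the container (`source`) with an S3 prefix (`destination`). As a rough illustration of where a file written by the script ends up, the sketch below maps a local file path to its expected S3 URI. The helper `uploaded_s3_uri` and the bucket and prefix names are hypothetical; the sketch assumes files keep their path relative to `source` when uploaded, which is consistent with `test_x.csv` being downloaded from the test prefix later in this notebook.

```python
import posixpath


def uploaded_s3_uri(local_file, output_source, output_destination):
    """Map a file under a ProcessingOutput source directory to its S3 URI."""
    relative = posixpath.relpath(local_file, start=output_source)
    if relative.startswith(".."):
        raise ValueError(f"{local_file} is not inside {output_source}")
    return f"{output_destination.rstrip('/')}/{relative}"


# Hypothetical bucket and prefix, used for illustration only.
example_uri = uploaded_s3_uri(
    local_file="/opt/ml/processing/output/train/train.csv",
    output_source="/opt/ml/processing/output/train",
    output_destination="s3://example-bucket/example-prefix/train",
)
print(example_uri)
```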

In [52]:
!aws s3 ls {bucket_name}/{bucket_prefix} --recursive

2022-09-21 08:54:54    5834924 from-idea-to-prod/xgboost/input/bank-additional-full.csv
2022-09-21 12:34:58      14580 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2022-09-21-12-32-02-357/output/model.tar.gz
2022-09-21 12:35:00          0 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2022-09-21-12-32-02-357/profiler-output/framework/training_job_end.ts
2022-09-21 12:34:56      68641 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2022-09-21-12-32-02-357/profiler-output/system/incremental/2022092112/1663763580.algo-1.json
2022-09-21 12:34:56     223781 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2022-09-21-12-32-02-357/profiler-output/system/incremental/2022092112/1663763640.algo-1.json
2022-09-21 12:35:00          0 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2022-09-21-12-32-02-357/profiler-output/system/training_job_end.ts
2022-09-21 12:37:19     329708 from-idea-to-prod/xgboost/output/from-idea-to-prod-training

## Model training
We follow the same approach and now run the model training as a SageMaker training job.

In [25]:
# get training container uri
training_image = sagemaker.image_uris.retrieve("xgboost", region=region, version="latest")

print(training_image)

811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest


Define the data input channels for the training job. We use _train_ and _validation_ channels via the SageMaker SDK [`TrainingInput`](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput) class:

In [33]:
s3_input_train = sagemaker.inputs.TrainingInput(train_s3_url, content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(validation_s3_url, content_type='csv')

In [44]:
instance_count = 1
instance_type = "ml.m5.xlarge"

output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

Instantiate an Estimator object and set the algorithm's hyperparameters. Refer to [XGBoost hyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html) for more information.

In [49]:
# Instantiate an XGBoost estimator object
estimator = sagemaker.estimator.Estimator(
    image_uri=training_image,  # XGBoost algorithm container
    instance_type=instance_type,  # type of training instance
    instance_count=instance_count,  # number of instances to be used
    role=sm_role,  # IAM execution role to be used
    max_run=20 * 60,  # Maximum allowed active runtime
    # use_spot_instances=True,  # Use spot instances to reduce cost
    # max_wait=30 * 60,  # Maximum clock time (including spot delays)
    output_path=output_s3_url, # S3 location for saving the training result
    sagemaker_session=sm_session, # Session object which manages interactions with SageMaker API and AWS services
    base_job_name="from-idea-to-prod-training", # Prefix for training job name
)

# define its hyperparameters
estimator.set_hyperparameters(
    num_round=150, # the number of rounds to run the training
    max_depth=5, # maximum depth of a tree
    eta=0.5, # step size shrinkage used in updates to prevent overfitting
    alpha=2.5, # L1 regularization term on weights
    objective="binary:logistic",
    eval_metric="auc", # evaluation metrics for validation data
    subsample=0.8, # subsample ratio of the training instance
    colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
    min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
    early_stopping_rounds=10, # the model trains until the validation score stops improving
    verbosity=1, # verbosity of printing messages
)
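
The `early_stopping_rounds` setting above stops training once the validation metric has gone a given number of rounds without improving. The sketch below is a simplified illustration of that rule for a metric that is maximized, such as AUC. It is not XGBoost's internal implementation; the function name `early_stop` and the score sequence are hypothetical.

```python
def early_stop(scores, patience):
    """Return (best_round, stop_round) for a metric that should be maximized.

    Training stops at the first round where the best score is `patience`
    rounds old. If that never happens, stop_round is the last round.
    """
    best_score = float("-inf")
    best_round = -1
    for round_index, score in enumerate(scores):
        if score > best_score:
            best_score, best_round = score, round_index
        elif round_index - best_round >= patience:
            return best_round, round_index
    return best_round, len(scores) - 1


# Hypothetical validation AUC per boosting round.
validation_auc = [0.70, 0.74, 0.76, 0.77, 0.77, 0.769, 0.768, 0.765, 0.80]
best_round, stop_round = early_stop(validation_auc, patience=3)
print(f"best round: {best_round}, stopped at round: {stop_round}")
```

In this made-up sequence the later score of 0.80 is never reached, which shows the trade-off: a small patience saves training time but can stop before a late improvement.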

Run the training:

In [50]:
estimator.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

2022-09-21 12:32:02 Starting - Starting the training job...
2022-09-21 12:32:19 Starting - Preparing the instances for trainingProfilerReport-1663763522: InProgress
......
2022-09-21 12:33:31 Downloading - Downloading input data......
2022-09-21 12:34:14 Training - Downloading the training image..
Arguments: train
[2022-09-21:12:34:44:INFO] Running standalone xgboost training.
[2022-09-21:12:34:44:INFO] File size need to be processed in the node: 4.35mb. Available memory size in the node: 8461.7mb
[2022-09-21:12:34:44:INFO] Determined delimiter of CSV input is ','
[12:34:44] S3DistributionType set as FullyReplicated
[12:34:44] 28831x59 matrix with 1701029 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,
[2022-09-21:12:34:44:INFO] Determined delimiter of CSV input is ','
[12:34:44] S3DistributionType set as FullyReplicated
[12:34:44] 8238x59 matrix with 486042 entries loaded fro

Output model performance:

In [66]:
# Read the final metric values reported by the most recent training job
metrics = boto3.client("sagemaker", region_name=region).describe_training_job(
    TrainingJobName=estimator.latest_training_job.name
)["FinalMetricDataList"]

train_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'train:auc'][0])
validate_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'validation:auc'][0])

print(f"Train-auc:{train_auc:.2f}, Validate-auc:{validate_auc:.2f}")

Train-auc:0.78, Validate-auc:0.77
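
The `FinalMetricDataList` returned by `describe_training_job` is a list of dictionaries with `MetricName` and `Value` keys. The list comprehensions above pick one metric at a time; the following sketch shows an alternative that turns the whole list into a dictionary so any metric can be looked up by name. The helper `final_metrics` is hypothetical, and the sample list is made-up data shaped like the API response, using the rounded values printed above.

```python
def final_metrics(metric_data_list):
    """Convert a FinalMetricDataList-style list into {metric name: value}."""
    return {m["MetricName"]: float(m["Value"]) for m in metric_data_list}


# Made-up sample shaped like the API response (values rounded as printed above).
sample_metric_data = [
    {"MetricName": "train:auc", "Value": 0.78},
    {"MetricName": "validation:auc", "Value": 0.77},
]

metrics_by_name = final_metrics(sample_metric_data)
print(f"Train-auc:{metrics_by_name['train:auc']:.2f}, "
      f"Validate-auc:{metrics_by_name['validation:auc']:.2f}")
```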


## Validate model
Now we have a model saved in the specified location on Amazon S3. We deploy the model as a real-time endpoint, which is just one function call:

In [51]:
# Real-time endpoint:
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    # wait=False,  # Remember, predictor.predict() won't work until deployment finishes!
    # We will also turn on data capture here, in case you want to experiment with monitoring later:
    data_capture_config=sagemaker.model_monitor.DataCaptureConfig(
        enable_capture=True,
        sampling_percentage=100,
        destination_s3_uri=f"s3://{bucket_name}/{bucket_prefix}/data-capture",
    ),
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.CSVDeserializer(),
)

---------------!

In [70]:
!aws s3 cp $test_s3_url/test_x.csv tmp/test_x.csv
!aws s3 cp $test_s3_url/test_y.csv tmp/test_y.csv

download: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/test/test_x.csv to tmp/test_x.csv
download: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/test/test_y.csv to tmp/test_y.csv


In [82]:
test_x = pd.read_csv("tmp/test_x.csv", names=[f'{i}' for i in range(59)])
test_y = pd.read_csv("tmp/test_y.csv", names=['y'])

In [106]:
predictions = np.array(predictor.predict(test_x.values), dtype=float).squeeze()
predictions

array([0.05137555, 0.09782112, 0.22581661, ..., 0.04346842, 0.04000453,
       0.03656681])
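
The `predict` call above relies on the CSV serializer and deserializer configured at deployment, and then converts the result with `dtype=float` and `squeeze()`. The sketch below imitates that round trip with the standard library to show why both conversions are needed: parsed CSV fields arrive as strings, nested one list per response line. It is an approximation, not the SDK's implementation. The helper names are hypothetical, and the response body is an assumed single comma-separated line built from two of the prediction values shown above.

```python
import csv
import io


def to_csv_payload(rows):
    """Serialize rows of feature values into a CSV request body."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue().rstrip("\n")


def parse_csv_response(body):
    """Parse a CSV response body into a list of rows of strings."""
    return list(csv.reader(body.splitlines()))


payload = to_csv_payload([[25, 1, 999], [28, 3, 999]])

# Assumed response format: one line with one score per input row.
response_body = "0.05137555,0.09782112"
parsed = parse_csv_response(response_body)

# Equivalent of np.array(..., dtype=float).squeeze() for this small case.
scores = [float(value) for row in parsed for value in row]
print(payload)
print(parsed)
print(scores)
```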

In [107]:
test_results = pd.concat(
    [
        pd.Series(predictions, name="y_pred", index=test_x.index),
        test_x,
    ],
    axis=1,
)
test_results.head()

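| | y_pred | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 49 | 50 | 51 | 52 | 53 | 54 | 55 | 56 | 57 | 58 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.051376 | 25 | 1 | 999 | 0 | 1 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 |
| 1 | 0.097821 | 28 | 3 | 999 | 0 | 1 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |
| 2 | 0.225817 | 38 | 1 | 999 | 0 | 1 | 0 | 1 | 0 | 0 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 |
| 3 | 0.269298 | 32 | 1 | 999 | 1 | 1 | 0 | 1 | 0 | 0 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 4 | 0.040005 | 40 | 1 | 999 | 0 | 1 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |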


In [108]:
pd.crosstab(
    index=test_y['y'].values,
    columns=np.round(predictions), 
    rownames=['actuals'], 
    colnames=['predictions']
)

| actuals \ predictions | 0.0 | 1.0 |
|---|---|---|
| 0 | 3589 | 47 |
| 1 | 387 | 96 |
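
The cross-tabulation above is a confusion matrix at a 0.5 threshold (because of `np.round`). As a worked example, the sketch below derives accuracy, precision, and recall from the four counts in that table. Only the counts come from this notebook; the variable names are illustrative.

```python
# Counts taken from the cross-tabulation above (rows: actuals, columns: predictions).
true_negatives, false_positives = 3589, 47
false_negatives, true_positives = 387, 96

total = true_negatives + false_positives + false_negatives + true_positives

accuracy = (true_negatives + true_positives) / total
precision = true_positives / (true_positives + false_positives)
recall = true_positives / (true_positives + false_negatives)

print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f}")
```

Accuracy looks high mainly because most customers are in the negative class; recall shows that at this threshold the model finds only about one in five of the actual positives.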


In [109]:
test_auc = roc_auc_score(test_y, test_results["y_pred"])
print(f"Test-auc: {test_auc:.2f}")

Test-auc: 0.77
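
ROC AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as half. The sketch below computes that quantity directly by comparing every positive/negative pair. It is a small illustrative implementation for intuition, not how `roc_auc_score` is implemented, and the labels and scores are a made-up toy example.

```python
def pairwise_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for positive_score in positives:
        for negative_score in negatives:
            if positive_score > negative_score:
                wins += 1.0
            elif positive_score == negative_score:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy data: three of the four positive/negative pairs are ordered correctly.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
toy_auc = pairwise_auc(toy_labels, toy_scores)
print(f"toy AUC: {toy_auc:.2f}")
```

Because AUC depends only on the ordering of scores, it is unaffected by the 0.5 threshold used in the cross-tabulation.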


## Optional: Hyperparameter Optimization (HPO)

In [123]:
# import required HPO objects
from sagemaker.tuner import (
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
)

# set up hyperparameter ranges
hp_ranges = {
    "min_child_weight": ContinuousParameter(1, 10),
    "max_depth": IntegerParameter(1, 10),
    "alpha": ContinuousParameter(0, 5),
    "eta": ContinuousParameter(0, 1),
}

# set up the objective metric
objective = "validation:auc"

# instantiate a HPO object
tuner = HyperparameterTuner(
    estimator=estimator,  # the SageMaker estimator object
    hyperparameter_ranges=hp_ranges,  # the range of hyperparameters
    max_jobs=20,  # total number of HPO jobs
    max_parallel_jobs=2,  # how many HPO jobs can run in parallel
    strategy="Bayesian",  # the internal optimization strategy of HPO
    objective_metric_name=objective,  # the objective metric to be used for HPO
    objective_type="Maximize",  # maximize or minimize the objective metric
    base_tuning_job_name="from-idea-to-prod-hpo",
    early_stopping_type="Auto",
)
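
Each tuning job trains the estimator with one candidate drawn from `hp_ranges`. To make concrete what a single candidate looks like, the sketch below draws values uniformly at random from the same ranges. This is plain random sampling, shown only for illustration; it is not the Bayesian strategy configured above, and the `RANGES` structure and `sample_candidate` helper are hypothetical stand-ins for the SDK parameter classes.

```python
import random

# Same bounds as hp_ranges above, written as plain tuples: (kind, low, high).
RANGES = {
    "min_child_weight": ("continuous", 1, 10),
    "max_depth": ("integer", 1, 10),
    "alpha": ("continuous", 0, 5),
    "eta": ("continuous", 0, 1),
}


def sample_candidate(ranges, rng):
    """Draw one hyperparameter combination uniformly from the given ranges."""
    candidate = {}
    for name, (kind, low, high) in ranges.items():
        if kind == "integer":
            candidate[name] = rng.randint(low, high)  # inclusive bounds
        else:
            candidate[name] = rng.uniform(low, high)
    return candidate


rng = random.Random(1729)  # fixed seed so the draw is repeatable
candidate = sample_candidate(RANGES, rng)
print(candidate)
```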

In [124]:
tuner.fit({"train": s3_input_train, "validation": s3_input_validation})

...............................................................................................................................................!


In [127]:
print(f"HPO job status: {boto3.client('sagemaker').describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']}")

HPO job status: Completed


In [128]:
hpo_predictor = tuner.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.CSVDeserializer(),
)


2022-09-21 14:14:45 Starting - Preparing the instances for training
2022-09-21 14:14:45 Downloading - Downloading input data
2022-09-21 14:14:45 Training - Training image download completed. Training in progress.
2022-09-21 14:14:45 Uploading - Uploading generated training model
2022-09-21 14:14:45 Completed - Resource reused by training job: from-idea-to-prod-hp-220921-1410-004-7b86e51c
-----!

In [129]:
hpo_predictions = np.array(hpo_predictor.predict(test_x.values), dtype=float).squeeze()
print(hpo_predictions)

[0.06523537 0.09084037 0.17831463 ... 0.03452097 0.03494474 0.03519909]


In [130]:
pd.crosstab(
    index=test_y['y'].values,
    columns=np.round(hpo_predictions), 
    rownames=['actuals'], 
    colnames=['predictions']
)

| actuals \ predictions | 0.0 | 1.0 |
|---|---|---|
| 0 | 3588 | 48 |
| 1 | 387 | 96 |


## Clean-up
To avoid charges, remove the hosted endpoint you created.

In [None]:
predictor.delete_endpoint(delete_endpoint_config=True)

In [None]:
# run if you created a tuned predictor after HPO
hpo_predictor.delete_endpoint(delete_endpoint_config=True)

## Continue with step 3
Open the step 3 [notebook](02-sagemaker-pipeline.ipynb).

## Additional resources
- [Amazon SageMaker Immersion Day](https://catalog.us-east-1.prod.workshops.aws/workshops/63069e26-921c-4ce1-9cc7-dd882ff62575/en-US)
- [Amazon SageMaker 101 Workshop](https://catalog.us-east-1.prod.workshops.aws/workshops/0c6b8a23-b837-4e0f-b2e2-4a3ffd7d645b/en-US)
- [Amazon SageMaker 101 Workshop code repository](https://github.com/aws-samples/sagemaker-101-workshop)
- [Amazon SageMaker with XGBoost and Hyperparameter Tuning for Direct Marketing predictions](https://github.com/aws-samples/sagemaker-101-workshop/blob/main/builtin_algorithm_hpo_tabular/SageMaker%20XGBoost%20HPO.ipynb)

# Shut down kernel

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>