# Step 2: Add SageMaker processing and training jobs
In this step you move data processing and model training into [SageMaker Docker containers](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html) and use the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/index.html) to interact with SageMaker.

![](img/six-steps-2.png)

SageMaker uses Docker containers to let developers process data, train models, and deploy them. Containers allow developers and data scientists to package software into standardized units that run consistently on any platform that supports Docker. A container keeps code, runtime, system tools, system libraries, and settings in one place and isolates them from the execution environment, which gives a consistent runtime experience regardless of where the container runs.

SageMaker also provides pre-built containers with popular data processing frameworks and ML algorithms. All SageMaker built-in algorithms are delivered as Docker containers.

<div class="alert alert-info"> Make sure you are using the <code>Data Science 3.0</code> image in Studio for this notebook.</div>

In [87]:
import time
import boto3
import botocore
import numpy as np  
import pandas as pd  
import sagemaker
import os
from time import gmtime, strftime, sleep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.huggingface import HuggingFaceProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
# from sklearn.metrics import roc_auc_score
from sagemaker.experiments.run import Run, load_run

sagemaker.__version__

'2.215.0'

In [3]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
baseline_s3_url               -> 's3://sagemaker-ap-northeast-1-250506505253/blip-v
bucket_name                   -> 'sagemaker-ap-northeast-1-250506505253'
bucket_prefix                 -> 'blip-vqa'
domain_id                     -> 'd-rvigbtfoquob'
experiment_name               -> 'mlops-blip-vqa-experiment-09-01-27-16'
initialized                   -> True
region                        -> 'ap-northeast-1'
sm_role                       -> 'arn:aws:iam::250506505253:role/service-role/Amazo
test_s3_url                   -> 's3://sagemaker-ap-northeast-1-250506505253/blip-v
train_s3_url                  -> 's3://sagemaker-ap-northeast-1-250506505253/blip-v
user_profile_name             -> 'default-20240507t111746'
validation_s3_url             -> 's3://sagemaker-ap-northeast-1-250506505253/blip-v


In [4]:
session = sagemaker.Session()
sm = session.sagemaker_client

## Load or create an experiment
Load the existing experiment we created in the previous notebook. You're going to track new runs in the same experiment.
You can also create a new experiment to track runs in this notebook.

In [5]:
# Uncomment code block (Cmd + /) if you would like to create a new experiment
# experiment_name = f"from-idea-to-prod-experiment-{strftime('%d-%H-%M-%S', gmtime())}"

## Process data in SageMaker processing job
Use [SageMaker Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) by simply providing a Python data preprocessing script and choosing a [SageMaker SDK processor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html) class.
You must upload the input data to S3 and specify an S3 location for output data. SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete. The processing container image can either be an Amazon SageMaker built-in image or a custom image that you provide. The underlying infrastructure for a Processing job is fully managed by Amazon SageMaker. Cluster resources are provisioned for the duration of your job, and cleaned up when a job completes.

![](img/sagemaker-processing.png)

Your input data must be stored in an Amazon S3 bucket. Alternatively, you can use [Amazon Athena](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.dataset_definition.inputs.AthenaDatasetDefinition) or [Amazon Redshift](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.dataset_definition.inputs.RedshiftDatasetDefinition) as input sources.

Upload the input dataset to an Amazon S3 bucket:

In [93]:
input_s3_url = f's3://{bucket_name}/{bucket_prefix}/input/IconDomainVQAData.zip'

%store input_s3_url

Stored 'input_s3_url' (str)


In [7]:
!aws s3 ls {bucket_name}/{bucket_prefix} --recursive

2024-05-09 01:29:29  262963308 blip-vqa/input/IconDomainVQAData.zip


Create a Python script by moving the data processing code from the step 1 notebook to a .py file:

In [113]:
%%writefile preprocessing.py

from datasets import Dataset
import pandas as pd
import numpy as np
import argparse
import os
import json
import zipfile


def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Input and output locations inside the processing container
    # SageMaker mounts the inputs and collects the outputs under /opt/ml/processing
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='IconDomainVQAData.zip')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    # target_col = "y"
    
    # process data
    zip_file_path = os.path.join(args.filepath, args.filename)
    
    with zipfile.ZipFile(zip_file_path, "r") as z:
        print("Unzipping VQA data...")
        z.extractall("data")
        
    train_meta_path = os.path.join('data', 'IconDomainVQAData/train.jsonl')
    train_data_path = os.path.join('data', 'IconDomainVQAData/train_fill_in_blank/train_fill_in_blank/')
    test_data_path = os.path.join('data', 'IconDomainVQAData/test_data/test_data')

    train_percent = 90
    with open(train_meta_path) as fp:
        data_list = [json.loads(line) for line in fp]

    training_dataset = Dataset.from_list(data_list[:len(data_list)*train_percent//100])
    validation_dataset = Dataset.from_list(data_list[len(data_list)*train_percent//100:])
    print("Training sets: {} - Validating set: {}".format(len(training_dataset), len(validation_dataset)))
    
    # Save datasets locally
    training_dataset.save_to_disk(os.path.join(args.outputpath, 'train'))
    # The directory name must match the source path of the validation ProcessingOutput
    validation_dataset.save_to_disk(os.path.join(args.outputpath, 'validation'))
    
    print("## Processing complete. Exiting.")

Overwriting preprocessing.py


The processing script unzips the input archive, reads `train.jsonl`, splits the records into a 90% training set and a 10% validation set, and saves both as Hugging Face datasets under the output path.
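The split in the script relies on integer arithmetic: the first `len(data_list) * train_percent // 100` records go to training and the rest to validation. The following is a minimal sketch of the same slicing on plain Python lists. The helper name `split_records` and the placeholder records are assumptions for illustration and are not part of the notebook.

```python
def split_records(records, train_percent):
    """Split a list into a leading training part and a trailing validation part.

    Mirrors the slicing used in preprocessing.py; `split_records` itself is
    a hypothetical helper introduced only for illustration.
    """
    if not 0 <= train_percent <= 100:
        raise ValueError("train_percent must be between 0 and 100")
    # Integer division keeps the boundary an exact list index.
    boundary = len(records) * train_percent // 100
    return records[:boundary], records[boundary:]


# Placeholder records standing in for the parsed lines of train.jsonl.
records = [{"pid": i} for i in range(14551)]
train, validation = split_records(records, 90)
print(len(train), len(validation))  # 13095 1456
```

Because the split is positional rather than random, the validation set always consists of the last records in the file.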

Set the Amazon S3 paths:

In [114]:
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
train_data_s3_url = f"s3://{bucket_name}/{bucket_prefix}/data/train"
test_data_s3_url = f"s3://{bucket_name}/{bucket_prefix}/data/test"

In [115]:
%store train_s3_url
%store validation_s3_url
%store train_data_s3_url
%store test_data_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'train_data_s3_url' (str)
Stored 'test_data_s3_url' (str)


Set the framework version, the instance type, and the number of compute instances:

In [116]:
skprocessor_framework_version = "0.23-1"
processing_instance_type = "ml.m5.large"
processing_instance_count = 1

### Create an experiment run
Create a new run in your experiment to track parameters, configuration, inputs, and outputs of the processing job.

In [117]:
run_suffix = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
run_name = f"container-processing-{run_suffix}"

with Run(
    experiment_name=experiment_name,
    run_name=run_name,
    run_display_name="container-processing",
    sagemaker_session=session
) as run:
    run.log_parameters(
        {
            "train": 0.9,
            "validate": 0.1,
        }
    )
   
    experiment_config = run.experiment_config
    # time.sleep(8) # wait until resource tags are propagated to the run
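The run name suffix comes from `strftime`, so the fields in the format string decide how often a name can repeat. The sketch below uses only the standard library to compare a format without day and hour against one that carries every field. The helper `make_run_name` is hypothetical and not part of the SageMaker SDK.

```python
from time import gmtime, strftime


def make_run_name(prefix, fmt="%Y-%m-%d-%H-%M-%S", when=None):
    """Build a run name from a prefix and a UTC timestamp.

    `make_run_name` is an illustrative helper, not part of the SageMaker SDK.
    """
    when = gmtime() if when is None else when
    return f"{prefix}-{strftime(fmt, when)}"


# Two moments exactly one hour apart (seconds since the epoch).
first, second = gmtime(0), gmtime(3600)

# Without day and hour, both moments produce the same name.
print(make_run_name("container-processing", "%Y-%m-%M-%S", first))
print(make_run_name("container-processing", "%Y-%m-%M-%S", second))

# With every field present, the names differ.
print(make_run_name("container-processing", when=first))
print(make_run_name("container-processing", when=second))
```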

### Create a processor and set inputs and outputs
Instantiate a [`SKLearnProcessor`](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) object before starting the SageMaker processing job. You specify the instance type to use in the job, as well as how many instances for distributed processing.

Note how SageMaker maps your data to the local paths on the processing container's EBS volume:

![](img/data-processing.png)

In [120]:
sklearn_processor = SKLearnProcessor(
    framework_version=skprocessor_framework_version,
    role=sm_role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count, 
    base_job_name='blip-vqa',
    sagemaker_session=session,
)

processing_inputs = [
        ProcessingInput(
            source=input_s3_url, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ]

processing_outputs = [
        ProcessingOutput(
            output_name="train_meta_data", 
            source="/opt/ml/processing/output/train",
            destination=train_s3_url,
        ),
        ProcessingOutput(
            output_name="validation_data", 
            source="/opt/ml/processing/output/validation", 
            destination=validation_s3_url
        ),
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train_data", 
            destination=train_data_s3_url
        ),
        ProcessingOutput(
            output_name="test_data", 
            source="/opt/ml/processing/output/test_data", 
            destination=test_data_s3_url
        ),
    ]

TypeError: SKLearnProcessor.__init__() got an unexpected keyword argument 'dependencies'
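Each `ProcessingOutput` uploads only what the script writes under its `source` directory, so the directory names in the script and in the output definitions have to agree. The following is a minimal, standard-library sketch of a pre-flight check. The helper `unmatched_outputs` and the plain dictionaries are assumptions for illustration and do not use the SageMaker SDK.

```python
import posixpath


def unmatched_outputs(output_sources, written_dirs, root="/opt/ml/processing/output"):
    """Return output names whose source directory the script never writes.

    output_sources: mapping of output name -> container source path.
    written_dirs:   directory names the script creates under `root`.
    Hypothetical helper for illustration only.
    """
    written = {posixpath.join(root, name) for name in written_dirs}
    return sorted(
        name
        for name, source in output_sources.items()
        if posixpath.normpath(source) not in written
    )


# Source paths as they would be declared in the processing outputs.
output_sources = {
    "train_meta_data": "/opt/ml/processing/output/train",
    "validation_data": "/opt/ml/processing/output/validation",
}

# A script that saves to 'train' and 'val' leaves the validation output empty.
print(unmatched_outputs(output_sources, ["train", "val"]))
# A script that saves to 'train' and 'validation' matches both outputs.
print(unmatched_outputs(output_sources, ["train", "validation"]))
```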

#### Start the SageMaker processing job



In [119]:
try:
    sklearn_processor.run(
        inputs=processing_inputs,
        outputs=processing_outputs,
        code='preprocessing.py',
        wait=True,
        experiment_config=experiment_config,
        # arguments = ['arg1', 'arg2'],
    )
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print(f"Ignore AccessDeniedException: {e.response['Error']['Message']} because of the slow resource tag auto propagation")
    else:
        raise e

INFO:sagemaker:Creating processing-job with name blip-vqa-2024-05-09-06-05-02-912


..............................
Traceback (most recent call last):
  File "/opt/ml/processing/input/code/preprocessing.py", line 2, in <module>
    from datasets import Dataset
ModuleNotFoundError: No module named 'datasets'


UnexpectedStatusException: Error for Processing job blip-vqa-2024-05-09-06-05-02-912: Failed. Reason: AlgorithmError: See job logs for more information
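The job above failed because the `datasets` package is not available in the container image. One possible workaround is to install missing packages at the top of the processing script before importing them. The sketch below assumes the container has `pip` and network access to a package index, and the helper names are hypothetical.

```python
import importlib.util
import subprocess
import sys


def missing_modules(module_names):
    """Return the top-level module names that cannot be imported here."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


def ensure_installed(requirements):
    """Install pip packages whose import module is missing.

    requirements: mapping of import name -> pip package name.
    Returns the list of packages that were installed.
    Assumes pip and network access are available (hypothetical helper).
    """
    to_install = [requirements[name] for name in missing_modules(requirements)]
    if to_install:
        # Use the running interpreter so packages land in the right environment.
        subprocess.check_call([sys.executable, "-m", "pip", "install", *to_install])
    return to_install


# Standard-library modules are always present, so nothing is installed here.
print(ensure_installed({"json": "json", "zipfile": "zipfile"}))
```

In `preprocessing.py` this could be called as `ensure_installed({"datasets": "datasets"})` before the `from datasets import Dataset` line.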



In [None]:
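# If you set wait to False in the previous code cell, wait until the job reaches a terminal state
while True:
    status = sm.describe_processing_job(
        ProcessingJobName=sklearn_processor._current_job_name
    )["ProcessingJobStatus"]
    # Stop polling on failure too, otherwise the loop never ends
    if status in ("Completed", "Failed", "Stopped"):
        break
    print(f"Waiting for {sklearn_processor._current_job_name} to complete (status: {status})")
    time.sleep(10)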

To wait for job completion you can also use `boto3` [waiters](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#waiters). For example:

```python
waiter = session.sagemaker_client.get_waiter('processing_job_completed_or_stopped')
waiter.wait(ProcessingJobName=sklearn_processor._current_job_name)
```
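Both approaches boil down to polling a status until it reaches a terminal value. The following is a generic sketch of that loop with a timeout, using only the standard library. `wait_for_terminal_status` and the simulated status source are illustrative and not part of `boto3` or the SageMaker SDK.

```python
import time


def wait_for_terminal_status(get_status, terminal=("Completed", "Failed", "Stopped"),
                             interval=10.0, timeout=3600.0):
    """Poll get_status() until it returns a terminal value or the timeout expires.

    get_status: zero-argument callable returning the current status string.
    Hypothetical helper for illustration only.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Status is still {status!r} after {timeout} seconds")
        time.sleep(interval)


# Simulated status sequence standing in for describe_processing_job responses.
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_terminal_status(lambda: next(fake_statuses), interval=0.01)
print(final_status)  # Completed
```

For a processing job, `get_status` could wrap the `describe_processing_job` call and return its `ProcessingJobStatus` field.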

In [17]:
# list the uploaded files
!aws s3 ls {bucket_name}/{bucket_prefix} --recursive

2024-05-09 01:29:29  262963308 blip-vqa/input/IconDomainVQAData.zip


## Run your local code remotely as a SageMaker job
You can use the [SageMaker Python SDK `@remote` decorator](https://sagemaker.readthedocs.io/en/stable/remote_function/sagemaker.remote_function.html) to run your local notebook code as a SageMaker training job, a feature called "Remote Function". This is an even easier way to run your Python code at scale using SageMaker distributed processing and training.

Refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html) in the Amazon SageMaker developer guide.

In the following section you run the data processing code as a SageMaker job using the `@remote` decorator.

### Step 1: Develop and test your code locally
First, you implement and test your code locally in the notebook to verify the correctness of code and environment.

In [37]:
import json
from pathlib import Path 

from datasets import load_dataset, Dataset

In [50]:
input_path = "./data" 
output_path = "./data/processed_training_data"
ori_train_path = Path(input_path) / 'IconDomainVQAData/train.jsonl'
train_percent = 90

# define a local function
def preprocess(
    data_list,
    train_percent
):
    training_dataset = Dataset.from_list(data_list[:len(data_list)*train_percent//100])
    validation_dataset = Dataset.from_list(data_list[len(data_list)*train_percent//100:])
    print("Training sets: {} - Validating set: {}".format(len(training_dataset), len(validation_dataset)))
    return training_dataset, validation_dataset

In [51]:
with open(str(ori_train_path)) as fp:
    data_list = [json.loads(line) for line in fp]

In [52]:
# Call the function locally
training_dataset, validation_dataset = preprocess(data_list, train_percent)

Training sets: 13095 - Validating set: 1456


In [55]:
# see the processed data
training_dataset

Dataset({
    features: ['question', 'answer', 'ques_type', 'grade', 'label', 'pid'],
    num_rows: 13095
})

### Step 2: Execute the function remotely using RemoteExecutor
You can use the [`RemoteExecutor`](https://sagemaker.readthedocs.io/en/stable/remote_function/sagemaker.remote_function.html#remoteexecutor) class of the SageMaker Python SDK to run the local function remotely as a SageMaker job. You can run multiple jobs in parallel; the `max_parallel_jobs` parameter sets the maximum number of jobs that run at the same time.
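`RemoteExecutor.submit` returns a future whose `result()` blocks until the job finishes, much like the executors in Python's `concurrent.futures`. The sketch below is a purely local analogy that uses `ThreadPoolExecutor` with `max_workers` in place of `max_parallel_jobs`. It does not start any SageMaker job, and `split_sizes` is a hypothetical stand-in for the notebook's function.

```python
from concurrent.futures import ThreadPoolExecutor


def split_sizes(record_count, train_percent):
    """Return (training size, validation size) for a positional split."""
    boundary = record_count * train_percent // 100
    return boundary, record_count - boundary


# At most two tasks run at the same time; the others wait in the queue.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        percent: executor.submit(split_sizes, 1000, percent)
        for percent in (70, 80, 90)
    }
    # result() blocks until the corresponding task has finished.
    results = {percent: future.result() for percent, future in futures.items()}

print(results)  # {70: (700, 300), 80: (800, 200), 90: (900, 100)}
```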

In [23]:
from sagemaker.remote_function import remote, RemoteExecutor

In [24]:
s3_root_uri = f"s3://{bucket_name}/{bucket_prefix}"

In [25]:
s3_root_uri

's3://sagemaker-ap-northeast-1-250506505253/blip-vqa'

In [56]:
with RemoteExecutor(
    s3_root_uri=s3_root_uri,
    instance_type=processing_instance_type,
    dependencies='./requirements.txt' # third-party libraries required to run your function
) as e:
    future = e.submit(preprocess, data_list, train_percent)                 

2024-05-09 02:44:47,105 sagemaker.remote_function INFO     Serializing function code to s3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-44-47-105/function
2024-05-09 02:44:47,206 sagemaker.remote_function INFO     Serializing function arguments to s3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-44-47-105/arguments
2024-05-09 02:44:47,623 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmpmavr0erh/temp_workspace/sagemaker_remote_function_workspace/requirements.txt'
2024-05-09 02:44:47,626 sagemaker.remote_function INFO     Successfully created workdir archive at '/tmp/tmpmavr0erh/workspace.zip'
2024-05-09 02:44:47,667 sagemaker.remote_function INFO     Successfully uploaded workdir to 's3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-44-47-105/sm_rf_user_ws/workspace.zip'
2024-05-09 02:44:47,669 sagemaker.remote_function INFO     Creating job: preprocess-202

In [57]:
training_dataset, validation_dataset = future.result()

2024-05-09 02:48:32 Starting - Preparing the instances for training
2024-05-09 02:48:32 Downloading - Downloading the training image
2024-05-09 02:48:32 Training - Training image download completed. Training in progress.
2024-05-09 02:48:32 Uploading - Uploading generated training model
2024-05-09 02:48:32 Completed - Training job completed
INFO: CONDA_PKGS_DIRS is set to '/opt/ml/sagemaker/warmpoolcache/sm_remotefunction_user_dependencies_cache/conda/pkgs'
INFO: PIP_CACHE_DIR is set to '/opt/ml/sagemaker/warmpoolcache/sm_remotefunction_user_dependencies_cache/pip'
INFO: Bootstraping runtime environment.
/opt/conda/bin/mamba
2024-05-09 02:47:53,521 sagemaker.remote_function INFO     Successfully unpacked workspace archive at '/'.
2024-05-09 02:47:53,521 sagemaker.remote_function INFO     '/sagemaker_remote_function_workspace/pre_exec.sh' does not exist. Assuming no pre-execution commands to run
/opt/conda/bin/mamba
2024

In [58]:
# see the processed data
training_dataset

Dataset({
    features: ['question', 'answer', 'ques_type', 'grade', 'label', 'pid'],
    num_rows: 13095
})

### Step 3: Run code with @remote decorator
Once the local and remote test runs succeed, you can apply `@remote` to the function.
You can also set default settings for remote functions via a [configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html). The configuration file is used when invoking a function with the `@remote` decorator or the `RemoteExecutor` API.

In [59]:
@remote(
    s3_root_uri=s3_root_uri,
    instance_type=processing_instance_type,
    dependencies='./requirements.txt' # third-party libraries required to run your function
)
def preprocess(
    data_list,
    train_percent
):
    training_dataset = Dataset.from_list(data_list[:len(data_list)*train_percent//100])
    validation_dataset = Dataset.from_list(data_list[len(data_list)*train_percent//100:])
    print("Training sets: {} - Validating set: {}".format(len(training_dataset), len(validation_dataset)))
    return training_dataset, validation_dataset

In [60]:
run_suffix = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
run_name = f"remote-function-processing-{run_suffix}"

# Create an experiment run and run the function remotely as a SageMaker job
with Run(
    experiment_name=experiment_name,
    run_name=run_name,
    run_display_name="remote-function-processing",
    sagemaker_session=session
) as run:
    training_dataset, validation_dataset = preprocess(data_list, train_percent)

2024-05-09 02:48:50,332 sagemaker.remote_function INFO     Serializing function code to s3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-48-50-332/function
2024-05-09 02:48:50,509 sagemaker.remote_function INFO     Serializing function arguments to s3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-48-50-332/arguments
2024-05-09 02:48:50,932 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmp7ycraym7/temp_workspace/sagemaker_remote_function_workspace/requirements.txt'
2024-05-09 02:48:50,937 sagemaker.remote_function INFO     Successfully created workdir archive at '/tmp/tmp7ycraym7/workspace.zip'
2024-05-09 02:48:50,980 sagemaker.remote_function INFO     Successfully uploaded workdir to 's3://sagemaker-ap-northeast-1-250506505253/blip-vqa/preprocess-2024-05-09-02-48-50-332/sm_rf_user_ws/workspace.zip'
2024-05-09 02:48:50,983 sagemaker.remote_function INFO     Creating job: preprocess-202

2024-05-09 02:48:51 Starting - Starting the training job...
2024-05-09 02:49:06 Starting - Preparing the instances for training...
2024-05-09 02:49:38 Downloading - Downloading input data...
2024-05-09 02:50:08 Downloading - Downloading the training image...........
INFO: CONDA_PKGS_DIRS is set to '/opt/ml/sagemaker/warmpoolcache/sm_remotefunction_user_dependencies_cache/conda/pkgs'
INFO: PIP_CACHE_DIR is set to '/opt/ml/sagemaker/warmpoolcache/sm_remotefunction_user_dependencies_cache/pip'
INFO: Bootstraping runtime environment.
/opt/conda/bin/mamba
2024-05-09 02:51:58,488 sagemaker.remote_function INFO     Successfully unpacked workspace archive at '/'.
2024-05-09 02:51:58,488 sagemaker.remote_function INFO     '/sagemaker_remote_function_workspace/pre_exec.sh' does not exist. Assuming no pre-execution commands to run
/opt/conda/bin/mamba
2024-05-09 02:51:58,490 sagemaker.remote_function INFO     Activating conda env 

In [61]:
# see the processed data
training_dataset

Dataset({
    features: ['question', 'answer', 'ques_type', 'grade', 'label', 'pid'],
    num_rows: 13095
})

For more examples of remote functions see SageMaker [example notebooks](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-examples.html).

## Model training in SageMaker training job
Follow the same approach and now run the model training as a [SageMaker training job](https://sagemaker.readthedocs.io/en/stable/overview.html#using-estimators).

In [62]:
# get training container uri
training_image = sagemaker.image_uris.retrieve("pytorch", region=region, version="2.2.1")
print(training_image)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-xgboost:1.5-1


In [72]:
training_image = sagemaker.image_uris.retrieve("huggingface-llm", region=region)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py310
INFO:sagemaker.image_uris:Defaulting to only supported image scope: gpu.


Define the data input channels for the training job. Set _train_ and _validation_ channels via the SageMaker SDK [`TrainingInput`](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput) class:

In [None]:
s3_input_train = sagemaker.inputs.TrainingInput(train_s3_url, content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(validation_s3_url, content_type='csv')

In [68]:
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

# Define where the training job stores the model artifact
output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

%store output_s3_url

Stored 'output_s3_url' (str)


Instantiate an [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) object and set the algorithm's hyperparameters. Refer to [XGBoost hyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html) for more information.

In [None]:
# Instantiate an XGBoost estimator object
estimator = sagemaker.estimator.Estimator(
    image_uri=training_image,  # XGBoost algorithm container
    instance_type=train_instance_type,  # type of training instance
    instance_count=train_instance_count,  # number of instances to be used
    role=sm_role,  # IAM execution role to be used
    max_run=20 * 60,  # Maximum allowed active runtime
    # use_spot_instances=True,  # Use spot instances to reduce cost
    # max_wait=30 * 60,  # Maximum clock time (including spot delays)
    output_path=output_s3_url, # S3 location for saving the training result
    sagemaker_session=session, # Session object which manages interactions with SageMaker API and AWS services
    base_job_name="from-idea-to-prod-training", # Prefix for training job name
)

# define its hyperparameters
estimator.set_hyperparameters(
    num_round=150, # the number of rounds to run the training
    max_depth=3, # maximum depth of a tree
    eta=0.5, # step size shrinkage used in updates to prevent overfitting
    alpha=2.5, # L1 regularization term on weights
    objective="binary:logistic",
    eval_metric="auc", # evaluation metrics for validation data
    subsample=0.8, # subsample ratio of the training instance
    colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
    min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
    early_stopping_rounds=10, # the model trains until the validation score stops improving
    verbosity=1, # verbosity of printing messages
)

Run the training:

In [None]:
training_inputs = {'train': s3_input_train, 'validation': s3_input_validation}



In [None]:
try:
    run_suffix = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
    run_name = f"container-training-{run_suffix}"

    with Run(experiment_name=experiment_name,
             run_name=run_name,
             run_display_name="container-training",
             sagemaker_session=session
            ) as run:
        
        estimator.fit(
            training_inputs,
            wait=True,
            logs=False,
        ) 
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print(f"Ignore AccessDeniedException: {e.response['Error']['Message']} because of the slow resource tag auto propagation")
    else:
        raise e



### Reduce training job startup time with warm pools
💡 Instead of provisioning a new ephemeral compute cluster for every training job, you can keep the training instances warm for a specified period after each job. Refer to [Reduce ML Model Training Job startup time by up to 8x using SageMaker Training Managed Warm Pools](https://aws.amazon.com/about-aws/whats-new/2022/09/reduce-ml-model-training-job-startup-time-8x-sagemaker-training-managed-warm-pools/) for more details. If you opt to use warm pools, you are billed for the instances and EBS volumes for the duration of the keep-alive period.
Refer to [Train Using SageMaker Managed Warm Pools](https://docs.aws.amazon.com/sagemaker/latest/dg/train-warm-pools.html) in the Amazon SageMaker Developer Guide for details on the training API.

<div style="border: 4px solid coral; text-align: center; margin: auto;">
    <p style=" text-align: center; margin: auto;">To use the warm pool feature, the warm pool quota for the required instance type must be greater than 0.
    <br>
    <br>
    Do not run this section in an AWS-provisioned workshop account because the warm pool quota there is set to 0.
    The following section checks the quota value for the training instance type.
    </p>
</div>

In [66]:
def check_quota(quota_code, min_v):
    r = quotas_client.get_service_quota(
        ServiceCode="sagemaker",
        QuotaCode=quota_code,
    )
    
    q = r["Quota"]["Value"]
    n = r["Quota"]["QuotaName"]

    if q < min_v:
        print (
            f"WARNING: Your quota {q} for {n} is less than required value of {min_v}"
        )
    else:
        print(
            f"SUCCESS: Your quota {q} for {n} is equal or more than required value of {min_v}"
        )

In [69]:
quotas_client = boto3.client("service-quotas")
                      
quotas = {
    "ml.m5.large": ["L-2DD73636", 1],
    "ml.m5.xlarge": ["L-0BEF44E8", 1],
    "ml.m5.2xlarge": ["L-1686EE8B", 1],
}
     

check_quota(quotas[train_instance_type][0], quotas[train_instance_type][1])

AccessDeniedException: An error occurred (AccessDeniedException) when calling the GetServiceQuota operation: User: arn:aws:sts::250506505253:assumed-role/AmazonSageMaker-ExecutionRole-20240507T111746/SageMaker is not authorized to perform: servicequotas:GetServiceQuota on resource: arn:aws:servicequotas:ap-northeast-1:250506505253:sagemaker/L-0BEF44E8 because no identity-based policy allows the servicequotas:GetServiceQuota action

#### Optional: Train with SageMaker warm pools
Let's use this feature and run XGBoost training using warm pools.
Note the attributes that a training job must match to reuse the provisioned infrastructure from a previous job: [Matching criteria](https://docs.aws.amazon.com/sagemaker/latest/dg/train-warm-pools.html#train-warm-pools-matching-criteria)

To create a warm pool, set the `KeepAlivePeriodInSeconds` parameter (`keep_alive_period_in_seconds` in the `Estimator` constructor) to a value greater than 0.

In [None]:
# Instantiate an XGBoost estimator object
warm_pool_estimator = sagemaker.estimator.Estimator(
    image_uri=training_image,  # XGBoost algorithm container
    instance_type=train_instance_type,  # type of training instance
    instance_count=train_instance_count,  # number of instances to be used
    role=sm_role,  # IAM execution role to be used
    max_run=20 * 60,  # Maximum allowed active runtime
    # use_spot_instances=True,  # Use spot instances to reduce cost
    # max_wait=30 * 60,  # Maximum clock time (including spot delays)
    output_path=output_s3_url, # S3 location for saving the training result
    sagemaker_session=session, # Session object which manages interactions with SageMaker API and AWS services
    base_job_name="from-idea-to-prod-training", # Prefix for training job name
    keep_alive_period_in_seconds=1800, # use the warm pool feature
)

In [None]:
training_inputs = {'train': s3_input_train, 'validation': s3_input_validation}

Run a training job by calling `estimator.fit()` several consecutive times with different hyperparameters. The initial training job "cold-starts" as SageMaker provisions the required compute infrastructure for it. When this job completes, the infrastructure is kept alive for the period set in `KeepAlivePeriodInSeconds`. The warm pool stays `Available` until it either identifies a matching training job for reuse or it exceeds the specified `KeepAlivePeriodInSeconds` and is terminated.

In [None]:
# run the training job five times
for i, d in enumerate([2, 3, 5, 10, 20]):
    print(f"Fit estimator with max_depth={d}")

    warm_pool_estimator.set_hyperparameters(
        num_round=50, # the number of rounds to run the training
        max_depth=d, # maximum depth of a tree
        objective="binary:logistic",
        eval_metric="auc", # evaluation metrics for validation data
        early_stopping_rounds=10, # the model trains until the validation score stops improving
    )
    warm_pool_estimator.fit(
        training_inputs,
        wait=True,
        logs=False,
    )

You can validate that a warm pool was used for this training job by going to the [SageMaker training job console](https://console.aws.amazon.com/sagemaker/home?#/jobs) and inspecting the training job list:

![](img/warm-pools-training-jobs.png)

The first training job should take several minutes, but all subsequent jobs reuse the same compute instance and complete in several seconds. You can also see the warm pool status and time left.

### Output model performance

In [None]:
if estimator._current_job_name:
    training_job_name = estimator._current_job_name

In [None]:
metrics = None
while not metrics:
    metrics = sm.describe_training_job(
        TrainingJobName=training_job_name
        ).get("FinalMetricDataList")

    if not metrics:
        print(f"Training job {training_job_name} hasn't finished yet!")
        time.sleep(10)
    
train_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'train:auc'][0])
validate_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'validation:auc'][0])

print(f"Train-auc:{train_auc:.2f}, Validate-auc:{validate_auc:.2f}")
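`FinalMetricDataList` is a list of dictionaries with `MetricName` and `Value` keys, so turning it into a name-to-value mapping avoids repeating the list comprehension for every metric. The following is a minimal sketch. The helper `metrics_to_dict` is hypothetical, and the sample values are made up for illustration and are not results from this notebook.

```python
def metrics_to_dict(final_metric_data_list):
    """Map metric names to float values from a FinalMetricDataList-style list."""
    return {m["MetricName"]: float(m["Value"]) for m in final_metric_data_list}


# Made-up sample shaped like the describe_training_job response field.
sample_metrics = [
    {"MetricName": "train:auc", "Value": 0.75},
    {"MetricName": "validation:auc", "Value": 0.5},
]

metrics_by_name = metrics_to_dict(sample_metrics)
# .get() returns None for a metric the job did not emit instead of raising.
print(metrics_by_name.get("train:auc"), metrics_by_name.get("validation:auc"))
print(metrics_by_name.get("validation:error"))
```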

In [None]:
%store training_job_name

In [None]:
# Print the S3 path to the model artifact:
estimator.model_data

## Validate model
The training job saved a model in the specified location on Amazon S3.

You can deploy the model as a [real-time endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html), which is just one [function call](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator.deploy), or create a [batch transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) to predict a label for a large dataset.

### Real-time inference
To test [real-time inference](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html) you create a real-time endpoint using the trained estimator.

In [None]:
# Real-time endpoint
endpoint_name = f"from-idea-to-prod-endpoint-{strftime('%d-%H-%M-%S', gmtime())}"

try:
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type="ml.m5.large",
        wait=False,  # Remember, predictor.predict() won't work until deployment finishes!
        # Turn on data capture here, in case you want to experiment with monitoring:
        data_capture_config=sagemaker.model_monitor.DataCaptureConfig(
            enable_capture=True,
            sampling_percentage=100,
            destination_s3_uri=f"s3://{bucket_name}/{bucket_prefix}/data-capture",
        ),
        endpoint_name=endpoint_name,
        serializer=sagemaker.serializers.CSVSerializer(),
        deserializer=sagemaker.deserializers.CSVDeserializer(),
    )
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print(f"Ignore AccessDeniedException: {e.response['Error']['Message']} because of the slow resource tag auto propagation")
        predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name,
                                                  sagemaker_session=session,
                                                  serializer=sagemaker.serializers.CSVSerializer(),
                                                  deserializer=sagemaker.deserializers.CSVDeserializer(),
                                                 )
    else:
        raise e



In [None]:
# Wait until the endpoint has the status InService
waiter = session.sagemaker_client.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=endpoint_name)
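
The waiter polls the endpoint status until it reaches `InService`. The sketch below shows the same polling pattern in plain Python so you can reuse it for other resources. It is not the boto3 implementation: `wait_for_status`, its parameters, and the fake status sequence are assumptions made for illustration.

```python
import time


def wait_for_status(get_status, target, failure_states=("Failed",),
                    delay=10, max_attempts=60):
    """Call get_status() until it returns target; return the number of calls made."""
    for attempt in range(1, max_attempts + 1):
        status = get_status()
        if status == target:
            return attempt
        if status in failure_states:
            raise RuntimeError(f"Resource entered state {status}")
        time.sleep(delay)
    raise TimeoutError(f"Status {target} not reached after {max_attempts} attempts")


# Fake status sequence standing in for repeated describe calls
fake_statuses = iter(["Creating", "Creating", "InService"])
attempts_needed = wait_for_status(lambda: next(fake_statuses), "InService", delay=0)
print(attempts_needed)
```

With a real client, `get_status` could be a lambda that calls `describe_endpoint` and returns its `EndpointStatus` value.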



#### Predict

In [None]:
!aws s3 cp $test_s3_url/test_x.csv tmp/test_x.csv
!aws s3 cp $test_s3_url/test_y.csv tmp/test_y.csv

In [None]:
test_x = pd.read_csv("tmp/test_x.csv", header=None)
test_y = pd.read_csv("tmp/test_y.csv", names=['y'])

In [None]:
predictions = np.array(predictor.predict(test_x.values), dtype=float).squeeze()
predictions

#### Evaluate predictions

In [None]:
test_results = pd.concat(
    [
        pd.Series(predictions, name="y_pred", index=test_x.index),
        test_x,
    ],
    axis=1,
)
test_results.head()

In [None]:
pd.crosstab(
    index=test_y['y'].values,
    columns=np.round(predictions), 
    rownames=['actuals'], 
    colnames=['predictions']
)
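
Rounding the predicted probabilities, as the crosstab does, amounts to a fixed 0.5 cut-off. The following sketch counts the same four cells for an arbitrary threshold in plain Python. `confusion_counts` and the sample data are hypothetical. The strict `>` comparison mirrors `np.round`, which rounds exactly 0.5 down to 0.

```python
def confusion_counts(y_true, y_prob, threshold=0.5):
    """Count true/false positives and negatives for a probability threshold."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for actual, prob in zip(y_true, y_prob):
        predicted = 1 if prob > threshold else 0
        if predicted == 1:
            counts["tp" if actual == 1 else "fp"] += 1
        else:
            counts["tn" if actual == 0 else "fn"] += 1
    return counts


# Hypothetical labels and scores
sample_labels = [0, 0, 1, 1, 1]
sample_probs = [0.2, 0.6, 0.4, 0.7, 0.9]

print(confusion_counts(sample_labels, sample_probs, threshold=0.5))
print(confusion_counts(sample_labels, sample_probs, threshold=0.3))
```

Lowering the threshold trades false negatives for false positives, which matters when the positive class is rare.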

In [None]:
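# roc_auc_score is not imported at the top of the notebook, so import it here
from sklearn.metrics import roc_auc_score

test_auc = roc_auc_score(test_y, test_results["y_pred"])
print(f"Test-auc: {test_auc:.2f}")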
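
The AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes it by brute force over all pairs to make that interpretation concrete. `pairwise_auc` is a hypothetical helper for small inputs, not a replacement for `roc_auc_score`.

```python
def pairwise_auc(y_true, y_score):
    """Compute ROC AUC by comparing every positive score with every negative score."""
    positives = [s for y, s in zip(y_true, y_score) if y == 1]
    negatives = [s for y, s in zip(y_true, y_score) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # tie counts as half
    return wins / (len(positives) * len(negatives))


example_auc = pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(example_auc)
```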

### Batch transform
If you want to run a prediction on a large dataset or don't need a real-time endpoint, you can use SageMaker [batch-transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html).

In [None]:
transform_s3_url = f"s3://{bucket_name}/{bucket_prefix}/transform"

<div class="alert alert-info"> 💡 To create a transformer, use either <b>option 1</b> or <b>option 2</b>
</div>

#### Option 1: create a batch transformer from the trained estimator
You can use [`EstimatorBase.transformer()`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.transformer) to create a transformer for an estimator:

In [None]:
model_name = f"from-idea-to-prod-transform-{strftime('%d-%H-%M-%S', gmtime())}"

transformer = estimator.transformer(
    instance_count=1,
    instance_type=train_instance_type,
    accept="text/csv",
    role=sm_role,
    output_path=transform_s3_url,
    model_name=model_name,
)

<div style="border: 4px solid coral; text-align: center; margin: auto;">
    <p style=" text-align: center; margin: auto;">Skip Option 2 and go to the section <b>Run transform job</b>.
    </p>
</div>

#### Option 2: load a model from a training job
Alternatively, you can load a model from a model artifact produced by a training job. You create a transformer with that model.

In [None]:
model = session.create_model_from_job(
    training_job_name=training_job_name, 
    name=model_name,
)

In [None]:
transformer = sagemaker.transformer.Transformer(
    model_name=model,
    instance_count=1,
    instance_type=train_instance_type,
    accept="text/csv",
    assemble_with="Line",
    output_path=transform_s3_url,
    base_transform_job_name="from-idea-to-prod-transform",
    sagemaker_session=session,
)

#### Run transform job



In [None]:
transform_job_name = f"from-idea-to-prod-transform-{strftime('%d-%H-%M-%S', gmtime())}"

try:
    run_suffix = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
    run_name = f"batch-transform-{run_suffix}"

    with Run(experiment_name=experiment_name,
             run_name=run_name,
             run_display_name="batch-transform",
             sagemaker_session=session
            ) as run:
        transformer.transform(    
            data=f"{test_s3_url}/test_x.csv",
            content_type="text/csv",
            split_type="Line", 
            job_name=transform_job_name,
            wait=True,
            # experiment_config=experiment_config,
        )
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print(f"Ignore AccessDeniedException: {e.response['Error']['Message']} because of the slow resource tag auto propagation")
    else:
        raise e



In [None]:
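while True:
    status = sm.describe_transform_job(
        TransformJobName=transformer._current_job_name
    )["TransformJobStatus"]
    if status == "Completed":
        break
    if status in ("Failed", "Stopped"):  # stop polling on terminal failure states
        raise RuntimeError(f"Transform job ended with status {status}")
    print(f"Waiting for {transformer._current_job_name} to complete")
    time.sleep(10)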

In [None]:
transformer.output_path

#### Evaluate predictions

In [None]:
!aws s3 ls {transformer.output_path}/

In [None]:
!aws s3 cp {transformer.output_path}/test_x.csv.out tmp/predictions.csv
!aws s3 cp $test_s3_url/test_y.csv tmp/test_y.csv

In [None]:
predictions = pd.read_csv("tmp/predictions.csv", names=["y_prob"])
test_y = pd.read_csv("tmp/test_y.csv", names=['y'])
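
Joining predictions to labels by position is only valid if the transform output has one value per input row, in input order. The sketch below parses such an output and checks the row counts before pairing. It assumes one probability per line, as the `read_csv` call above implies. `parse_transform_output`, `pair_with_labels`, and the sample text are hypothetical.

```python
import csv
import io


def parse_transform_output(text):
    """Parse a line-delimited transform output into a list of floats."""
    reader = csv.reader(io.StringIO(text))
    return [float(row[0]) for row in reader if row]


def pair_with_labels(labels, probabilities):
    """Pair labels and probabilities by position after checking the lengths match."""
    if len(labels) != len(probabilities):
        raise ValueError(
            f"Row count mismatch: {len(labels)} labels, {len(probabilities)} predictions"
        )
    return list(zip(labels, probabilities))


# Hypothetical content of a downloaded output file
sample_output = "0.12\n0.87\n0.45\n"
sample_probabilities = parse_transform_output(sample_output)
print(pair_with_labels([0, 1, 0], sample_probabilities))
```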

#### Crosstab

In [None]:
pd.crosstab(
    index=test_y['y'].values,
    columns=np.array(np.round(predictions), dtype=float).squeeze(), 
    rownames=['actuals'], 
    colnames=['predictions']
)

In [None]:
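# roc_auc_score is not imported at the top of the notebook, so import it here
from sklearn.metrics import roc_auc_score

test_auc = roc_auc_score(test_y, predictions)
print(f"Test-auc: {test_auc:.2f}")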

#### ROC curve

In [None]:
from sklearn import metrics
from sklearn.metrics import RocCurveDisplay
import matplotlib.pyplot as plt


fpr, tpr, thresholds = metrics.roc_curve(test_y, predictions)
roc_auc = metrics.auc(fpr, tpr)
display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc,estimator_name='Holdout/Test Data - ROC curve')
display.plot()
plt.show()

#### Confusion matrix

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

cm = confusion_matrix(test_y, np.round(predictions))
f = sns.heatmap(cm, annot=True, fmt='d')
plt.show()

#### Precision-recall curve

In [None]:
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import PrecisionRecallDisplay

prec, recall, _ = precision_recall_curve(test_y, predictions)
average_precision= metrics.average_precision_score(test_y, predictions)
pr_display = PrecisionRecallDisplay(precision=prec, recall=recall, average_precision=average_precision, estimator_name='Holdout/Test Data - AUPRC curve')
pr_display.plot()
plt.show()

### Save charts to the experiment run
You can use the [`experiments.load_run()`](https://sagemaker.readthedocs.io/en/stable/experiments/sagemaker.experiments.html#sagemaker.experiments.load_run) method to load an existing run.

In [None]:
title_suffix = strftime('%Y-%m-%d-%H-%M-%S', gmtime())

with load_run(experiment_name=experiment_name, run_name=run_name) as run:
    print(run.experiment_config)
    run.log_confusion_matrix(y_true=test_y['y'].values,
                             y_pred=np.array(np.round(predictions), dtype=float).squeeze(), 
                             title=f"confusion-matrix-{title_suffix}")
    run.log_roc_curve(y_true=test_y['y'].values, 
                      y_score=predictions['y_prob'].values, 
                      title=f"roc-curve2-{title_suffix}")
    run.log_precision_recall(y_true=test_y['y'].values,
                             predicted_probabilities=predictions['y_prob'].values,
                             positive_label=1,
                             title=f"precision-recall-{title_suffix}")

## Explore experiments and runs with Studio UX
You can see all logged metrics, parameters, and artifacts in the Studio UX under **SageMaker Home** > **Experiments**.

For example, choose the name of the experiment you used in this notebook:

![](img/experiment-and-runs.png)

You see the runs that you created in this notebook:

![](img/runs-02.png)

Select the `batch-transform-<timestamp>` run and choose **Charts** in the **Overview** section in the left pane. You see the three charts added to the run:

![](img/run-charts.png)


---

## Optional: Hyperparameter optimization (HPO)
It takes about 20 minutes to run this section. The section is optional and you don't need to run it to continue with other notebooks. You can navigate directly to step 3 [notebook](03-sagemaker-pipeline.ipynb). If you would like to perform a model A/B test in **Additional topics** sections, you can execute this part to produce an alternative model.

[Amazon SageMaker automatic model tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html), also called hyperparameter optimization (HPO), finds the best-performing model against a defined objective metric by running many training jobs on the dataset using the algorithm and ranges of hyperparameters that you specify. SageMaker HPO supports random search, Bayesian optimization, and [Hyperband](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html) as tuning strategies.
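
To make the idea concrete, the sketch below runs a random search, the simplest of these strategies, in plain Python. It is not how SageMaker implements tuning: `search_space`, `sample_config`, `random_search`, and in particular `toy_objective`, which stands in for a real training job that reports `validation:auc`, are all hypothetical.

```python
import random

# Hypothetical ranges: (kind, low, high)
search_space = {
    "max_depth": ("int", 1, 10),
    "eta": ("float", 0.0, 1.0),
    "gamma": ("float", 0.0, 10.0),
}


def sample_config(space, rng):
    """Draw one hyperparameter configuration uniformly from the ranges."""
    config = {}
    for name, (kind, low, high) in space.items():
        config[name] = rng.randint(low, high) if kind == "int" else rng.uniform(low, high)
    return config


def toy_objective(config):
    """Made-up score to maximize; a real tuner would run a training job here."""
    return (1.0 - abs(config["eta"] - 0.3)
            - 0.02 * abs(config["max_depth"] - 5)
            - 0.01 * config["gamma"])


def random_search(space, objective, max_jobs, seed=0):
    """Evaluate max_jobs random configurations and return the best (score, config)."""
    rng = random.Random(seed)
    scored = []
    for _ in range(max_jobs):
        config = sample_config(space, rng)
        scored.append((objective(config), config))
    return max(scored, key=lambda pair: pair[0])


best_score, best_config = random_search(search_space, toy_objective, max_jobs=30)
print(best_score, best_config)
```

Bayesian optimization differs in that it uses the scores of completed jobs to choose the next configurations instead of sampling them independently.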

In [None]:
# import required HPO objects
from sagemaker.tuner import (
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
)

In [None]:
# set up hyperparameter ranges
hp_ranges = {
    "min_child_weight": ContinuousParameter(1, 10),
    "max_depth": IntegerParameter(1, 10),
    "alpha": ContinuousParameter(0, 5),
    "eta": ContinuousParameter(0, 1),
    "colsample_bytree": ContinuousParameter(0, 1),
    "gamma": ContinuousParameter(0, 10)
    
}

# set up the objective metric
objective = "validation:auc"

# instantiate an HPO object
tuner = HyperparameterTuner(
    estimator=estimator,  # the SageMaker estimator object
    hyperparameter_ranges=hp_ranges,  # the range of hyperparameters
    max_jobs=30,  # total number of HPO jobs
    max_parallel_jobs=3,  # how many HPO jobs can run in parallel
    strategy="Bayesian",  # the internal optimization strategy of HPO
    objective_metric_name=objective,  # the objective metric to be used for HPO
    objective_type="Maximize",  # maximize or minimize the objective metric
    base_tuning_job_name="from-idea-to-prod-hpo",
    early_stopping_type="Auto",
)

Now run the HPO job. It takes about 10 minutes to complete. 

<div class="alert alert-info"> 💡 Note that the HPO job creates its own experiment to track each training job with a specific set of hyperparameters as a separate run.
</div>



In [None]:
tuner.fit(
    {"train": s3_input_train, "validation": s3_input_validation},
)



In [None]:
print(f"HPO job status: {sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']}")



In [None]:
hpo_predictor = tuner.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.CSVDeserializer(),
)



In [None]:
hpo_predictions = np.array(hpo_predictor.predict(test_x.values), dtype=float).squeeze()
print(hpo_predictions)

In [None]:
pd.crosstab(
    index=test_y['y'].values,
    columns=np.round(hpo_predictions), 
    rownames=['actuals'], 
    colnames=['predictions']
)

There is only a small improvement in the model metrics. This can indicate that the XGBoost model is already at its limit. You might want to explore other model types to improve the prediction accuracy for this use case.

---

## Clean-up
To avoid charges, remove the hosted endpoint you created.

In [None]:
predictor.delete_endpoint(delete_endpoint_config=True)

In [None]:
# run if you created a tuned predictor after HPO
try:
    hpo_predictor.delete_endpoint(delete_endpoint_config=True)
except NameError:
    print("You have no HPO predictor endpoint")
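
If the kernel was restarted and the `predictor` objects are gone, you can still clean up by endpoint name. The sketch below assumes a boto3 SageMaker client and uses its `list_endpoints`, `describe_endpoint`, `delete_endpoint`, and `delete_endpoint_config` calls. `delete_endpoints_with_prefix` is a hypothetical helper that reads only the first page of results and does not delete the underlying models.

```python
def delete_endpoints_with_prefix(sm_client, name_prefix):
    """Delete endpoints whose names start with name_prefix, plus their configs."""
    deleted = []
    response = sm_client.list_endpoints(NameContains=name_prefix)
    for endpoint in response.get("Endpoints", []):
        name = endpoint["EndpointName"]
        if not name.startswith(name_prefix):
            # NameContains matches anywhere in the name, so filter on the prefix
            continue
        # Look up the config name before the endpoint is deleted
        config_name = sm_client.describe_endpoint(EndpointName=name)["EndpointConfigName"]
        sm_client.delete_endpoint(EndpointName=name)
        sm_client.delete_endpoint_config(EndpointConfigName=config_name)
        deleted.append(name)
    return deleted


# Example call, assuming the `sm` client used earlier in this notebook:
# delete_endpoints_with_prefix(sm, "from-idea-to-prod-endpoint")
```

The endpoint created by `tuner.deploy()` gets a different name, so it would need its own prefix.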

## Continue with step 3
Open the step 3 [notebook](03-sagemaker-pipeline.ipynb).

## Further development ideas for your real-world projects
- Track, organize, and compare all your model training runs using [SageMaker Experiments](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments.html)
- Use [Amazon SageMaker Data Wrangler](https://aws.amazon.com/sagemaker/data-wrangler/) for creating a no-code or low-code visual data processing and feature engineering flow. Refer to this hands-on tutorial: [Prepare Training Data for Machine Learning with Minimal Code](https://aws.amazon.com/getting-started/hands-on/machine-learning-tutorial-prepare-data-with-minimal-code/)
- Try no-code [SageMaker Canvas](https://docs.aws.amazon.com/sagemaker/latest/dg/canvas.html) on your data to perform analysis and use automated ML to build models and generate predictions

## Additional resources
- [Using Docker containers with SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html)
- [How to create and use a custom SageMaker container: SageMaker hands-on workshop](https://sagemaker-workshop.com/custom/containers.html)
- [Amazon SageMaker Immersion Day](https://catalog.us-east-1.prod.workshops.aws/workshops/63069e26-921c-4ce1-9cc7-dd882ff62575/en-US)
- [Targeting Direct Marketing with Amazon SageMaker XGBoost](https://github.com/aws-samples/amazon-sagemaker-immersion-day/blob/master/processing_xgboost.ipynb)
- [Train a Machine Learning Model](https://aws.amazon.com/getting-started/hands-on/machine-learning-tutorial-train-a-model/)
- [Deploy a Machine Learning Model to a Real-Time Inference Endpoint](https://aws.amazon.com/getting-started/hands-on/machine-learning-tutorial-deploy-model-to-real-time-inference-endpoint/)
- [Amazon SageMaker 101 Workshop](https://catalog.us-east-1.prod.workshops.aws/workshops/0c6b8a23-b837-4e0f-b2e2-4a3ffd7d645b/en-US)
- [Amazon SageMaker 101 Workshop code repository](https://github.com/aws-samples/sagemaker-101-workshop)
- [Amazon SageMaker with XGBoost and Hyperparameter Tuning for Direct Marketing predictions](https://github.com/aws-samples/sagemaker-101-workshop/blob/main/builtin_algorithm_hpo_tabular/SageMaker%20XGBoost%20HPO.ipynb)

# Shutdown kernel

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>