In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import configparser
import os
import json
import warnings
import boto3
import sagemaker
import pandas as pd
from sagemaker.estimator import Estimator
from sagemaker.inputs import CreateModelInput, TrainingInput, TransformInput
from sagemaker.lineage.visualizer import LineageTableVisualizer
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import (
    ScriptProcessor,
)
from sagemaker.transformer import Transformer
from sagemaker.tuner import (
    ContinuousParameter,
    IntegerParameter,
    HyperparameterTuner,
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
    ConditionLessThanOrEqualTo,
)
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import (
    CacheConfig,
    CreateModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
    TuningStep,
)
from sagemaker.workflow.pipeline import Pipeline

# Suppress all warnings for the rest of the session (no category or module filter is given).
# NOTE(review): this also hides deprecation warnings raised by the imported SDKs.
warnings.filterwarnings(action="ignore")

In [3]:
# Load the project settings from ../conf/config.ini (relative to the working directory).
# ConfigParser.read() silently skips files it cannot open, so fail loudly here
# instead of surfacing later as an unrelated KeyError on a missing section.
config_path = os.path.join("..", "conf", "config.ini")
config = configparser.ConfigParser()
if not config.read(config_path):
    raise FileNotFoundError(f"Could not read configuration file: {config_path}")

# [proj]: AWS/project-level settings, kept as plain strings.
region = config["proj"]["region"]
default_bucket = config["proj"]["s3_default_bucket"]
base_job_prefix = config["proj"]["s3_base_job_prefix"]
role = config["proj"]["iam_role"]
repository_name = config["proj"]["ecr_repository_name"]

# [model]: tuning budget and split sizes.
# Typed getters replace eval(): they never execute config text as Python code.
tuning_max_jobs = config.getint("model", "tuning_max_jobs")
# The split sizes stay strings on purpose: they are forwarded verbatim as
# command-line job arguments to the preprocessing scripts.
valid_size = config["model"]["valid_size"]
test_size = config["model"]["test_size"]

# [pipelines]: pipeline identity and the metric gate used by the condition step.
model_package_group_name = config["pipelines"]["model_package_group_name"]
pipeline_name = config["pipelines"]["pipeline_name"]
target_key = config["pipelines"]["target_key"]
target_value = config.getfloat("pipelines", "target_value")
minimize_target = config.getboolean("pipelines", "minimize_target")

In [4]:
# Local directory that holds the raw ieee-fraud-detection CSV files.
RAW_DATA_PATH = os.path.join("..", "..", "..", "data", "ieee-fraud-detection")

# Build both sessions on the configured region so that the pipeline, its jobs and
# the image URIs derived from `region` all target the same AWS region. An empty
# config value falls back to the region of the default boto3 configuration.
boto_session = boto3.Session(region_name=region or None)
region = boto_session.region_name
sagemaker_session = sagemaker.session.Session(boto_session=boto_session)

# An empty bucket setting means "use the SageMaker default bucket".
if not default_bucket:
    default_bucket = sagemaker_session.default_bucket()

account_id = boto_session.client("sts").get_caller_identity().get("Account")

# Expand a bare role name into a full ARN exactly once. Without the guard,
# re-running this cell would wrap an already expanded ARN in a second prefix.
if not role.startswith("arn:"):
    role = f"arn:aws:iam::{account_id}:role/service-role/{role}"

## Uploading the Datasets to the S3 Bucket

In [5]:
%%time
# Copy the four raw CSV files from RAW_DATA_PATH to S3 with the AWS CLI.
# The train_* files go under raw_data/training and the test_* files under
# raw_data/prediction, the prefixes used for the pipeline's default inputs.
# --quiet suppresses the CLI's progress output.
!aws s3 cp {RAW_DATA_PATH}/train_identity.csv s3://{default_bucket}/{base_job_prefix}/raw_data/training/train_identity.csv  --quiet
!aws s3 cp {RAW_DATA_PATH}/train_transaction.csv s3://{default_bucket}/{base_job_prefix}/raw_data/training/train_transaction.csv --quiet
!aws s3 cp {RAW_DATA_PATH}/test_identity.csv s3://{default_bucket}/{base_job_prefix}/raw_data/prediction/test_identity.csv --quiet 
!aws s3 cp {RAW_DATA_PATH}/test_transaction.csv s3://{default_bucket}/{base_job_prefix}/raw_data/prediction/test_transaction.csv --quiet

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.05 µs


## Defining Parameters to Parametrize Pipeline Execution

In [6]:
# Default S3 prefixes of the raw inputs; the matching pipeline parameters below
# use them as default values.
training_data_uri = f"s3://{default_bucket}/{base_job_prefix}/raw_data/training"
prediction_data_uri = f"s3://{default_bucket}/{base_job_prefix}/raw_data/prediction"

# Compute resources for the processing jobs.
processing_instance_count = ParameterInteger(
    default_value=1,
    name="ProcessingInstanceCount",
)
processing_instance_type = ParameterString(
    default_value="ml.m5.2xlarge",
    name="ProcessingInstanceType",
)

# Compute resources for the training jobs.
training_instance_count = ParameterInteger(
    default_value=1,
    name="TrainingInstanceCount",
)
training_instance_type = ParameterString(
    default_value="ml.m5.2xlarge",
    name="TrainingInstanceType",
)

# Approval status given to the model package when it is registered.
model_approval_status = ParameterString(
    default_value="PendingManualApproval",
    name="ModelApprovalStatus",
)

# Input data locations.
training_data = ParameterString(
    default_value=training_data_uri,
    name="TrainingData",
)
prediction_data = ParameterString(
    default_value=prediction_data_uri,
    name="PredictionData",
)

# Step caching configuration shared by the steps; "PT1H" is the expiry setting.
cache_config = CacheConfig(expire_after="PT1H", enable_caching=True)

## Defining a Processing Step for Data Splitting and Preprocessing
* This step runs on a custom image that provides `scikit-learn` version 0.24 and the `category_encoders` library. To build the image from the provided `Dockerfile` and push it to Amazon ECR, run the shell script `run.sh`. With that image in place, a custom image-based **ScriptProcessor** with the required libraries is used instead of **SKLearnProcessor** with framework version 0.23.

In [7]:
# URI of the custom processing image in this account's ECR registry (no tag is specified).
processing_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}"

# Processor that runs a Python script inside the custom image.
preprocessor = ScriptProcessor(
    image_uri=processing_image_uri,
    command=["python3"],
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}-data-prep",
)

# Runs preprocess.py on the raw training data and exposes three named
# outputs ("train", "valid", "test") to downstream steps.
step_preprocess = ProcessingStep(
    name="PreprocessData",
    processor=preprocessor,
    code=os.path.join(
        "..", "modelbuild", "pipelines", "ieee_fraud_detection", "preprocess.py"
    ),
    inputs=[
        ProcessingInput(
            source=training_data,
            destination="/opt/ml/processing/raw_data/training",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="valid", source="/opt/ml/processing/valid"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    # Arguments are passed to the script as flag/value pairs.
    job_arguments=[
        "--base_dir", "/opt/ml/processing",
        "--valid_size", valid_size,
        "--test_size", test_size,
        "--is_prediction", "False",
    ],
    cache_config=cache_config,
)

## Defining a Tuning Step to Tune Hyperparameters of an *XGBoost* Estimator

In [8]:
# Look up the XGBoost 1.3-1 container image URI for the configured region.
training_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.3-1",
    py_version="py3",
    region=region,
    instance_type="ml.m5.2xlarge",
)

# Bucket/prefix (without the s3:// scheme) under which model artifacts are written.
model_output_uri = f"{default_bucket}/{base_job_prefix}/models"

# Estimator used by the tuning, training, re-training and registration steps.
estimator = Estimator(
    role=role,
    image_uri=training_image_uri,
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    output_path=f"s3://{model_output_uri}",
)

# Fixed hyperparameters set on the estimator.
hyperparameters = {
    "booster": "gbtree",
    "verbosity": 0,
    "objective": "binary:logistic",
    "seed": 42,
    "scale_pos_weight": 1.0,
    "eval_metric": "auc",
    "num_round": 1000,
    "early_stopping_rounds": 10,
}
estimator.set_hyperparameters(**hyperparameters)

In [9]:
def _train_valid_channels():
    """Build the "train"/"validation" input channels from the preprocessing outputs.

    Returns a new dict with new TrainingInput objects on every call, so each
    step gets its own copies. Both channels are declared as CSV.
    """
    outputs = step_preprocess.properties.ProcessingOutputConfig.Outputs
    return {
        "train": TrainingInput(
            s3_data=outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=outputs["valid"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }


# Search space explored by the tuner.
hyperparameter_ranges = {
    "max_depth": IntegerParameter(1, 30, scaling_type="Auto"),
    "eta": ContinuousParameter(0.01, 1.0, scaling_type="Auto"),
    "gamma": ContinuousParameter(0.0, 1.0, scaling_type="Auto"),
    "min_child_weight": ContinuousParameter(1e-6, 1.0, scaling_type="Auto"),
    "subsample": ContinuousParameter(0.1, 1.0, scaling_type="Auto"),
    "colsample_bytree": ContinuousParameter(0.1, 1.0, scaling_type="Auto"),
}

# Tuner that maximizes validation AUC, running up to three jobs in parallel.
tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name="validation:auc",
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type="Maximize",
    max_jobs=tuning_max_jobs,
    max_parallel_jobs=3,
    early_stopping_type="Auto",
    base_tuning_job_name=f"{base_job_prefix}-param-tuning",
)

# Hyperparameter-tuning step over the preprocessed train/validation splits.
step_tune = TuningStep(
    name="TuneHyperparameters",
    tuner=tuner,
    inputs=_train_valid_channels(),
    cache_config=cache_config,
)

# Training step that fits the estimator on the same splits.
step_train = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs=_train_valid_channels(),
    cache_config=cache_config,
)

## Defining a Model Evaluation Step to Evaluate the Fitted Estimator

In [10]:
# The evaluation script runs inside the same container image that is used for training.
evaluator = ScriptProcessor(
    image_uri=training_image_uri,
    command=["python3"],
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}-model-eval",
)

# Property file bound to the step's "evaluation" output, pointing at eval_metrics.json.
evaluation = PropertyFile(
    name="ModelEvaluation",
    output_name="evaluation",
    path="eval_metrics.json",
)

# Model artifact produced by the training step.
# Alternative when the tuning step is used instead:
# model_data = step_tune.get_top_model_s3_uri(top_k=0, s3_bucket=model_output_uri)
model_data = step_train.properties.ModelArtifacts.S3ModelArtifacts

# Evaluation step: takes the model artifact and the "test" split as inputs
# and publishes the "evaluation" output.
step_evaluate = ProcessingStep(
    name="EvaluateModel",
    processor=evaluator,
    code=os.path.join(
        "..", "modelbuild", "pipelines", "ieee_fraud_detection", "evaluate.py"
    ),
    inputs=[
        ProcessingInput(
            source=model_data,
            destination="/opt/ml/processing/models",
        ),
        ProcessingInput(
            source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/eval"),
    ],
    job_arguments=["--base_dir", "/opt/ml/processing"],
    property_files=[evaluation],
    cache_config=cache_config,
)

## Defining a Processing Step for Re-preprocessing

In [11]:
# Second preprocessing pass: takes both the raw training data and the raw
# prediction data as inputs and publishes the "re_train", "re_valid" and
# "re_test" outputs.
step_re_preprocess = ProcessingStep(
    name="Re-preprocessData",
    processor=preprocessor,
    code=os.path.join(
        "..", "modelbuild", "pipelines", "ieee_fraud_detection", "re_preprocess.py"
    ),
    inputs=[
        ProcessingInput(
            source=training_data,
            destination="/opt/ml/processing/raw_data/training",
        ),
        ProcessingInput(
            source=prediction_data,
            destination="/opt/ml/processing/raw_data/prediction",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="re_train", source="/opt/ml/processing/re_train"),
        ProcessingOutput(output_name="re_valid", source="/opt/ml/processing/re_valid"),
        ProcessingOutput(output_name="re_test", source="/opt/ml/processing/re_test"),
    ],
    job_arguments=[
        "--base_dir", "/opt/ml/processing",
        "--test_size", test_size,
    ],
    cache_config=cache_config,
)

## Defining a Training Step to Re-fit an *XGBoost* Estimator

In [12]:
# CSV input channels built from the outputs of the re-preprocessing step.
re_train_channel = TrainingInput(
    s3_data=step_re_preprocess.properties.ProcessingOutputConfig.Outputs[
        "re_train"
    ].S3Output.S3Uri,
    content_type="text/csv",
)
re_valid_channel = TrainingInput(
    s3_data=step_re_preprocess.properties.ProcessingOutputConfig.Outputs[
        "re_valid"
    ].S3Output.S3Uri,
    content_type="text/csv",
)

# Fit the same estimator again, this time on the re-preprocessed splits.
step_re_train = TrainingStep(
    name="Re-trainModel",
    estimator=estimator,
    inputs={"train": re_train_channel, "validation": re_valid_channel},
    cache_config=cache_config,
)

## Defining a Register Model Step to Register a Model Package

In [13]:
# S3 prefix of the evaluation step's first (and only) output, taken from the
# step's request arguments.
evaluation_output_uri = step_evaluate.arguments["ProcessingOutputConfig"]["Outputs"][
    0
]["S3Output"]["S3Uri"]

# Metrics attached to the registered model package.
# The URI is assembled on a single line on purpose: a backslash line continuation
# inside the string literal would embed the next line's leading spaces in the
# object key ("<prefix>/        eval_metrics.json"), pointing at a non-existent file.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=f"{evaluation_output_uri}/eval_metrics.json",
    ),
)

# From here on, `model_data` refers to the artifact of the re-training step.
model_data = step_re_train.properties.ModelArtifacts.S3ModelArtifacts

# Register the re-trained model in the model package group with the
# configured approval status.
step_register = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    model_data=model_data,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.2xlarge"],
    transform_instances=["ml.m5.2xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

## Defining a Create Model Step to Create a SageMaker Model for Batch Transformation

In [14]:
# Model definition that pairs the training container image with `model_data`,
# which must already be defined when this cell runs.
model = Model(
    role=role,
    image_uri=training_image_uri,
    model_data=model_data,
    sagemaker_session=sagemaker_session,
)

# NOTE(review): an accelerator type is requested here — confirm it is needed
# for this container image.
create_model_input = CreateModelInput(
    accelerator_type="ml.eia2.medium",
    instance_type="ml.m5.2xlarge",
)

# Step that creates the model; its ModelName property is exposed to later steps.
step_deploy = CreateModelStep(
    name="DeployModel",
    model=model,
    inputs=create_model_input,
)

## Defining a Transform Step to Perform Batch Transformation

In [15]:
# Batch transformer bound to the model created by the create-model step;
# results are written under the "pred" prefix of the project bucket.
transformer = Transformer(
    model_name=step_deploy.properties.ModelName,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/{base_job_prefix}/pred",
)

# The "re_test" output of the re-preprocessing step is the transform input,
# declared as CSV and split by line.
prediction_input = TransformInput(
    data=step_re_preprocess.properties.ProcessingOutputConfig.Outputs[
        "re_test"
    ].S3Output.S3Uri,
    content_type="text/csv",
    split_type="Line",
)

step_predict = TransformStep(
    name="PredictData",
    transformer=transformer,
    inputs=prediction_input,
    cache_config=cache_config,
)

## Defining a Condition Step to Check a Target Metric and Conditionally Perform Subsequent Steps

In [16]:
# Choose the comparison that matches the optimization direction:
# "<=" when the target metric is minimized, ">=" otherwise.
if minimize_target:
    step = ConditionLessThanOrEqualTo
else:
    step = ConditionGreaterThanOrEqualTo

# Value of the target metric, read from the evaluation property file.
evaluated_metric = JsonGet(
    step_name="EvaluateModel",
    property_file=evaluation,
    json_path=f"binary_classification_metrics.{target_key}.value",
)
condition = step(left=evaluated_metric, right=target_value)

# Steps that run only when the condition holds; nothing runs otherwise.
steps_when_passed = [
    step_re_preprocess,
    step_re_train,
    step_register,
    step_deploy,
    step_predict,
]

step_check = ConditionStep(
    name="CheckCondition",
    conditions=[condition],
    if_steps=steps_when_passed,
    else_steps=[],
)

## Defining a Pipeline of Parameters and Steps

In [17]:
# Parameters that can be overridden for each pipeline execution.
pipeline_parameters = [
    processing_instance_count,
    processing_instance_type,
    training_instance_count,
    training_instance_type,
    training_data,
    prediction_data,
    model_approval_status,
]

# Top-level steps; the remaining steps are attached to the condition step.
# To tune hyperparameters instead of training once, use:
# [step_preprocess, step_tune, step_evaluate, step_check]
pipeline_steps = [step_preprocess, step_train, step_evaluate, step_check]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

In [18]:
# Render the pipeline definition and parse its JSON text into Python objects
# so that it can be inspected.
definition = json.loads(pipeline.definition())
# pprint.pprint(definition)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


## Submitting the Pipeline to SageMaker and Performing Pipeline Execution

In [19]:
# Submit the pipeline definition under the given role; the response is
# assigned to `_` so that it is not echoed as cell output.
_ = pipeline.upsert(role_arn=role)
# Start an execution (no parameter overrides are passed) and fetch its description.
execution = pipeline.start()
description = execution.describe()
# pprint.pprint(description)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


In [20]:
# Block until the execution finishes.
# NOTE(review): delay=60 with max_attempts=720 presumably bounds the wait at about
# 12 hours (assuming the delay is in seconds) — confirm against the SDK documentation.
execution.wait(delay=60, max_attempts=720)

In [21]:
# execution.list_steps()

In [22]:
# For each execution step, in the reverse of the order returned by list_steps(),
# show a flattened summary followed by its lineage table.
viz = LineageTableVisualizer(sagemaker_session)
for execution_step in reversed(execution.list_steps()):
    step_summary = pd.json_normalize(execution_step)
    display(step_summary)
    lineage_table = viz.show(pipeline_execution_step=execution_step)
    display(lineage_table)
    # Blank line between steps.
    print("")

Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.ProcessingJob.Arn
0,PreprocessData,2022-01-31 21:31:32.004000+09:00,2022-01-31 21:38:45.237000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:proce...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...e10aec10456ef75/input/code/preprocess.py,Input,DataSet,ContributedTo,artifact
1,s3://...1/ieee-fraud-detection/raw_data/training,Input,DataSet,ContributedTo,artifact
2,99860...mazonaws.com/sagemaker-category-encoders,Input,Image,ContributedTo,artifact
3,s3://...8952e1aa2819ce10aec10456ef75/output/test,Output,DataSet,Produced,artifact
4,s3://...952e1aa2819ce10aec10456ef75/output/valid,Output,DataSet,Produced,artifact
5,s3://...952e1aa2819ce10aec10456ef75/output/train,Output,DataSet,Produced,artifact





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.TrainingJob.Arn
0,TrainModel,2022-01-31 21:38:46.036000+09:00,2022-01-31 21:52:04.475000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:train...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...952e1aa2819ce10aec10456ef75/output/valid,Input,DataSet,ContributedTo,artifact
1,s3://...952e1aa2819ce10aec10456ef75/output/train,Input,DataSet,ContributedTo,artifact
2,68331...-1.amazonaws.com/sagemaker-xgboost:1.3-1,Input,Image,ContributedTo,artifact
3,s3://...rainModel-5AJlwyFasQ/output/model.tar.gz,Output,Model,Produced,artifact





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.ProcessingJob.Arn
0,EvaluateModel,2022-01-31 21:52:05.011000+09:00,2022-01-31 21:57:03.793000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:proce...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...63f3cbc84d684f161/input/code/evaluate.py,Input,DataSet,ContributedTo,artifact
1,s3://...8952e1aa2819ce10aec10456ef75/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...rainModel-5AJlwyFasQ/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...-1.amazonaws.com/sagemaker-xgboost:1.3-1,Input,Image,ContributedTo,artifact
4,s3://...4f92063f3cbc84d684f161/output/evaluation,Output,DataSet,Produced,artifact





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.Condition.Outcome
0,CheckCondition,2022-01-31 21:57:04.594000+09:00,2022-01-31 21:57:05.243000+09:00,Succeeded,True


None




Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.ProcessingJob.Arn
0,Re-preprocessData,2022-01-31 21:57:06.055000+09:00,2022-01-31 22:05:54.387000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:proce...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...3cb1c3b7c556/input/code/re_preprocess.py,Input,DataSet,ContributedTo,artifact
1,s3://...ieee-fraud-detection/raw_data/prediction,Input,DataSet,ContributedTo,artifact
2,s3://...1/ieee-fraud-detection/raw_data/training,Input,DataSet,ContributedTo,artifact
3,99860...mazonaws.com/sagemaker-category-encoders,Input,Image,ContributedTo,artifact
4,s3://...1f1f115a9417f3cb1c3b7c556/output/re_test,Output,DataSet,Produced,artifact
5,s3://...f1f115a9417f3cb1c3b7c556/output/re_valid,Output,DataSet,Produced,artifact
6,s3://...f1f115a9417f3cb1c3b7c556/output/re_train,Output,DataSet,Produced,artifact





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.TrainingJob.Arn
0,Re-trainModel,2022-01-31 22:05:55.607000+09:00,2022-01-31 22:20:06.505000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:train...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...f1f115a9417f3cb1c3b7c556/output/re_valid,Input,DataSet,ContributedTo,artifact
1,s3://...f1f115a9417f3cb1c3b7c556/output/re_train,Input,DataSet,ContributedTo,artifact
2,68331...-1.amazonaws.com/sagemaker-xgboost:1.3-1,Input,Image,ContributedTo,artifact
3,s3://...rainModel-ShChKxaUtQ/output/model.tar.gz,Output,Model,Produced,artifact





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.Model.Arn
0,DeployModel,2022-01-31 22:20:07.432000+09:00,2022-01-31 22:20:08.591000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:model...


None




Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.RegisterModel.Arn
0,RegisterModel,2022-01-31 22:20:07.432000+09:00,2022-01-31 22:20:08.546000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:model...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...rainModel-ShChKxaUtQ/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...-1.amazonaws.com/sagemaker-xgboost:1.3-1,Input,Image,ContributedTo,artifact
2,ieee-fraud-detection-21-PendingManualApproval-...,Input,Approval,ContributedTo,action
3,ieee-fraud-detection-1620038248-aws-model-pack...,Output,ModelGroup,AssociatedWith,context





Unnamed: 0,StepName,StartTime,EndTime,StepStatus,Metadata.TransformJob.Arn
0,PredictData,2022-01-31 22:20:09.106000+09:00,2022-01-31 22:26:27.424000+09:00,Succeeded,arn:aws:sagemaker:us-east-1:...:trans...


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...rainModel-ShChKxaUtQ/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...-1.amazonaws.com/sagemaker-xgboost:1.3-1,Input,Image,ContributedTo,artifact
2,s3://...1f1f115a9417f3cb1c3b7c556/output/re_test,Input,DataSet,ContributedTo,artifact
3,s3://...1-.../ieee-fraud-detection/pred,Output,DataSet,Produced,artifact





In [23]:
# pipeline.delete()