# Tuning pipeline

<div class="alert alert-info"> 💡 <strong> LightGBM Only </strong>

This notebook can be used for LightGBM models only.
</div>

In [3]:
!pip install -U sagemaker --quiet

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.27.24 requires botocore==1.29.24, but you have botocore 1.29.51 which is incompatible.[0m[31m
[0m

In [4]:
import os
import time
import boto3
import sagemaker.session
from datetime import datetime
from sagemaker import (
    image_uris,
    model_uris,
    script_uris,
    hyperparameters,
    get_execution_role,
)

from sagemaker.utils import name_from_base
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterBoolean,
    ParameterFloat,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.steps import (
    ProcessingStep,
    TuningStep,
    TransformStep,
    CreateModelStep,
)
from sagemaker.workflow.step_collections import RegisterModel, EstimatorTransformer
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor, ScriptProcessor
from sagemaker.predictor import csv_serializer
from sagemaker.debugger import rule_configs, Rule, DebuggerHookConfig
from sagemaker.model_monitor import (
    DataCaptureConfig,
    DatasetFormat,
    DefaultModelMonitor,
)
from sagemaker.s3 import S3Uploader, S3Downloader
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
    HyperparameterTuner,
)
from sagemaker.model import Model
from sagemaker.inputs import TrainingInput, TransformInput
from sagemaker.transformer import Transformer

## Parameters

In [5]:
# Pipeline parameters: each one can be overridden per execution; the defaults
# below apply otherwise.

# Bare bucket name, WITHOUT the "s3://" scheme. The scheme is prepended when the
# project path is assembled from this parameter, so a value that already carries
# it would resolve to an invalid "s3://s3://..." URI.
s3_bucket = ParameterString(
    name="s3_bucket", default_value="sagemaker-eu-west-1-708699854342"
)
# Key prefix, inside the bucket, under which all project artefacts live.
s3_project_path = ParameterString(
    name="s3_project_path", default_value="knnights/test/shapeshifter"
)

# Threshold compared against the evaluated mean absolute error by the
# condition step.
accuracy_mae_threshold = ParameterFloat(name="AccuracyMaeThreshold", default_value=0.5)

train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)
# Hyperparameter tuning budget: total jobs and how many may run concurrently.
max_tuning_jobs = ParameterInteger(name="MaxTuningJob", default_value=1)
max_parallel_jobs = ParameterInteger(name="MaxParallelJobs", default_value=1)

In [6]:
# Two sessions are kept side by side: `sagemaker_session` is a regular SDK
# session, `pipeline_session` is the one handed to the pipeline components.
sagemaker_session = sagemaker.session.Session()
pipeline_session = PipelineSession()

# AWS context of the notebook: current region and the execution role that is
# passed to every job and to the pipeline itself.
region = boto3.Session().region_name
role = get_execution_role()

# Alias of the `s3_bucket` pipeline parameter, used when building S3 paths.
default_bucket = s3_bucket

# Step caching configuration shared by the steps that opt in to it.
cache_config = CacheConfig(expire_after="1d", enable_caching=True)

## s3 Paths

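The paths below embed `ExecutionVariables.START_DATETIME`. See the SageMaker documentation on execution variables:
https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.execution_variables.ExecutionVariables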

In [7]:
# --- Static identifiers ------------------------------------------------------
project = "shapeshifter"
model_type = "lightgbm"

# --- Project root ------------------------------------------------------------
# All locations are pipeline `Join` expressions, so they are resolved when the
# pipeline executes. The "s3:/" element joined with "/" yields the "s3://"
# scheme, hence the bucket parameter is expected to be a bare bucket name.
project_path = Join(on="/", values=["s3:/", default_bucket, s3_project_path])

# --- Data ---------------------------------------------------------------------
data_path = Join(on="/", values=[project_path, "data"])
input_data_path = Join(on="/", values=[data_path, "input"])
# Per-execution folders: the execution start time is part of the key.
data_processed_path = Join(
    on="/", values=[data_path, "processed", ExecutionVariables.START_DATETIME]
)
output_path = Join(
    on="/", values=[data_path, "output", ExecutionVariables.START_DATETIME]
)

# --- Models -------------------------------------------------------------------
model_path = Join(on="/", values=[project_path, "models"])

# --- Inference ----------------------------------------------------------------
inference_path = Join(
    on="/", values=[project_path, "inference", ExecutionVariables.START_DATETIME]
)
inference_test_path = Join(on="/", values=[inference_path, "test_predictions"])
data_capture_path = Join(on="/", values=[project_path, "data_capture"])

## Input processor

In [8]:
# scikit-learn processor that runs the data preparation script.
sklearn_processor = SKLearnProcessor(
    sagemaker_session=pipeline_session,
    role=role,
    framework_version="1.0-1",
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)

# Base directory for the container-side input and output paths of the job.
root = "/opt/ml/processing"

# (output name, S3 sub-folder below `data_processed_path`) for every output of
# the step. The container-side source of each output is
# "<root>/processed/<output name>". Note that "test_with_header" is stored
# under "with_header".
processed_outputs = [
    ("train", "train"),
    ("validation", "validation"),
    ("test", "test"),
    ("test_with_header", "with_header"),
    ("test_no_target", "test_no_target"),
    (model_type, "lightgbm"),
    ("encoders", "encoders"),
]

step_process = ProcessingStep(
    name="ShapeshifterPrepData",
    processor=sklearn_processor,
    code="processing.py",
    cache_config=cache_config,
    inputs=[
        ProcessingInput(source=input_data_path, destination=f"{root}/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name=output_name,
            source=f"{root}/processed/{output_name}",
            destination=Join(on="/", values=[data_processed_path, s3_folder]),
        )
        for output_name, s3_folder in processed_outputs
    ],
)

## Tuning

In [9]:
# Instance type used for the training/inference image lookups and for the
# training jobs launched by the tuner.
instance_type = "ml.m5.2xlarge"
# Metric the tuner optimises; the same name is embedded in the log regex that
# extracts the metric value.
objective_metric_name = "rmse"

### Light GBM

<div class="alert alert-info"> 💡 <strong> Why can't we use the train_source_uri? </strong>

Is s3://jumpstart-cache-prod-eu-west-1/source-directory-tarballs/lightgbm/inference/regression/v1.1.2/ not directly accessible? As a workaround, the next cell copies its content into the project bucket.
</div>

In [10]:
# Copy the JumpStart LightGBM regression inference sources (v1.1.2) into the
# project bucket.
# NOTE(review): source and destination are hard-coded and do not follow the
# s3_bucket / s3_project_path pipeline parameters. The model defined further
# down uses a local "model_scripts" directory as source_dir — confirm how the
# copied tarball is meant to reach it.
! aws s3 cp s3://jumpstart-cache-prod-eu-west-1/source-directory-tarballs/lightgbm/inference/regression/v1.1.2/ s3://sagemaker-eu-west-1-708699854342/shapeshifter/lightgbm/model_scripts/ --recursive

copy: s3://jumpstart-cache-prod-eu-west-1/source-directory-tarballs/lightgbm/inference/regression/v1.1.2/sourcedir.tar.gz to s3://sagemaker-eu-west-1-708699854342/shapeshifter/lightgbm/model_scripts/sourcedir.tar.gz


In [11]:
# JumpStart model id / version used to look up the training image, the training
# script bundle and the pre-trained artefact.
train_model_id, train_model_version = f"{model_type}-regression-model", "*"

docker_image_train = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope="training",
    instance_type=instance_type,
)

train_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope="training"
)

train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope="training"
)

# Search space explored by the tuner.
# NOTE(review): LightGBM documents feature_fraction and bagging_fraction as
# strictly positive; consider a small positive lower bound instead of 0 — confirm.
hyperparameter_ranges = {
    "learning_rate": ContinuousParameter(1e-4, 1, scaling_type="Logarithmic"),
    "num_boost_round": IntegerParameter(10, 100),
    #     "early_stoppings": IntegerParameter(2, 20),
    "num_leaves": IntegerParameter(10, 30),
    "feature_fraction": ContinuousParameter(0, 1),
    "bagging_fraction": ContinuousParameter(0, 1),
    "bagging_freq": IntegerParameter(1, 10),
    "max_depth": IntegerParameter(5, 30),
    "min_data_in_leaf": IntegerParameter(5, 50),
    "tweedie_variance_power": ContinuousParameter(1, 1.99),
    "boosting": CategoricalParameter(["gbdt", "dart"]),
}

# Default hyperparameters of the JumpStart model, passed to the estimator as
# its static hyperparameters.
hp = hyperparameters.retrieve_default(
    model_id=train_model_id, model_version=train_model_version
)

lightgbm_estimator = sagemaker.estimator.Estimator(
    image_uri=docker_image_train,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    role=role,
    # Driven by the TrainInstanceCount pipeline parameter (default 1), which was
    # previously declared and registered on the pipeline but never consumed.
    instance_count=train_instance_count,
    entry_point="transfer_learning.py",
    instance_type=instance_type,
    output_path=Join(values=[model_path, model_type], on="/"),
    base_job_name=f"{project}-{model_type}",
    sagemaker_session=pipeline_session,
    hyperparameters=hp,
)

# Raw f-string: the regex keeps its literal backslash without relying on
# Python's handling of the invalid "\." escape in a normal string literal.
metric_definitions = [
    {"Name": objective_metric_name, "Regex": rf"{objective_metric_name}: ([0-9\.]+)"},
]

tuner = HyperparameterTuner(
    estimator=lightgbm_estimator,
    objective_metric_name=objective_metric_name,
    objective_type="Minimize",
    metric_definitions=metric_definitions,
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=max_tuning_jobs,
    max_parallel_jobs=max_parallel_jobs,
)

# The tuning jobs train on the model-specific output of the processing step;
# that output is declared under the name `model_type`, so look it up the same way.
step_lightgbm_tuning = TuningStep(
    name="LightGBMHPTuning",
    step_args=tuner.fit(
        inputs=TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                model_type
            ].S3Output.S3Uri
        )
    ),
)

step_lightgbm_tuning.add_depends_on([step_process])

# Inference image for the JumpStart LightGBM regression model.
docker_image_inference = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=f"{model_type}-regression-model",
    model_version="*",
    image_scope="inference",
    instance_type=instance_type,
)

# Artefact of the top-ranked training job of the tuning step (index 0 of its
# training job summaries). The URI is built from the same
# <model_path>/<model_type> location that the estimator is configured to write
# to, so it follows the s3_bucket / s3_project_path pipeline parameters instead
# of a hard-coded bucket and prefix that can diverge from the training output.
best_model_data = Join(
    on="/",
    values=[
        model_path,
        model_type,
        step_lightgbm_tuning.properties.TrainingJobSummaries[0].TrainingJobName,
        "output/model.tar.gz",
    ],
)

best_lightgbm_model = Model(
    name="shapeshifter-lightgbm",
    image_uri=docker_image_inference,
    model_data=best_model_data,
    source_dir="model_scripts",
    sagemaker_session=pipeline_session,
    entry_point="inference.py",
    role=role,
)



In [12]:
# Pipeline step that creates the SageMaker model described by
# `best_lightgbm_model`.
create_best_lightgbm_args = best_lightgbm_model.create(instance_type="ml.m5.xlarge")

step_create_best_lightgbm = ModelStep(
    step_args=create_best_lightgbm_args,
    name="CreateBestLightGBM",
)

## Create predictions path

In [13]:
from sagemaker.inputs import BatchDataCaptureConfig

# Batch transformer for the model created by the previous step; predictions are
# written as CSV lines to `inference_test_path`.
lightgbm_transformer = Transformer(
    sagemaker_session=pipeline_session,
    model_name=step_create_best_lightgbm.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    strategy="MultiRecord",
    max_payload=2,
    accept="text/csv",
    assemble_with="Line",
    output_path=inference_test_path,
)

# Input of the transform job: the "test_no_target" output of the processing step.
test_no_target_uri = step_process.properties.ProcessingOutputConfig.Outputs[
    "test_no_target"
].S3Output.S3Uri

# Transform data capture is stored under `data_capture_path`.
transform_capture_config = BatchDataCaptureConfig(
    destination_s3_uri=data_capture_path,
)

step_lightgbm_transform = TransformStep(
    name="LightGBMTransform",
    transformer=lightgbm_transformer,
    inputs=TransformInput(
        data=test_no_target_uri,
        split_type="Line",
        content_type="text/csv",
        batch_data_capture_config=transform_capture_config,
    ),
)

step_lightgbm_transform.add_depends_on([step_create_best_lightgbm])

## Evaluate models

In [14]:
# Processor that runs the evaluation script. It is bound to `pipeline_session`
# like the data-preparation processor, the estimator, the model and the
# transformer, so every pipeline component shares the same session.
sklearn_evaluation_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.4xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
)

# Exposes evaluation.json from the "evaluation" output so that later steps can
# read values from it.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_evaluate_lightgbm = ProcessingStep(
    name="EvaluateModelLightGBM",
    processor=sklearn_evaluation_processor,
    inputs=[
        # The "test" output of the processing step.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
        # Predictions written by the batch transform step.
        ProcessingInput(
            source=inference_test_path,
            destination=f"/opt/ml/processing/input/{model_type}",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=Join(values=[inference_test_path, "evaluation"], on="/"),
        ),
    ],
    code="evaluate.py",
    property_files=[evaluation_report],
)

# The predictions are referenced through a static path rather than a step
# property, so the ordering after the transform step must be declared explicitly.
step_evaluate_lightgbm.add_depends_on([step_lightgbm_transform])

## Register models

### Explainability

In [15]:
# NOTE(review): disabled draft of a SageMaker Clarify explainability run; it is
# not part of the pipeline. It references names that are not defined in the
# visible notebook (test_features, bucket, prefix, train_uri, training_data,
# clarify_processor, model_config) and would need them before being enabled.

# from sagemaker import clarify

# shap_config = clarify.SHAPConfig(
#     baseline=[test_features.iloc[0].values.tolist()],
#     num_samples=15,
#     agg_method="mean_abs",
#     save_local_shap_values=True,
# )

# explainability_output_path = "s3://{}/{}/clarify-explainability".format(bucket, prefix)
# explainability_data_config = clarify.DataConfig(
#     s3_data_input_path=train_uri,
#     s3_output_path=explainability_output_path,
#     label="Target",
#     headers=training_data.columns.to_list(),
#     dataset_type="text/csv",
# )
# clarify_processor.run_explainability(
#     data_config=explainability_data_config,
#     model_config=model_config,
#     explainability_config=shap_config,
# )

In [16]:
# Model metrics attached to the registered model package: they point at the
# evaluation.json file below the S3 destination of the evaluation step's first
# (and only) output.
evaluation_output_uri = step_evaluate_lightgbm.arguments["ProcessingOutputConfig"][
    "Outputs"
][0]["S3Output"]["S3Uri"]

model_metrics_lightgbm = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=Join(on="/", values=[evaluation_output_uri, "evaluation.json"]),
    )
)


def _register_lightgbm(step_name, approval_status):
    """Build a RegisterModel step collection for the best LightGBM model.

    Both registration variants share every setting except the step name and
    the approval status given to the model package.

    Args:
        step_name: Name of the registration step in the pipeline.
        approval_status: Approval status assigned to the model package.

    Returns:
        The configured ``RegisterModel`` step collection.
    """
    return RegisterModel(
        name=step_name,
        approval_status=approval_status,
        estimator=lightgbm_estimator,
        image_uri=docker_image_inference,
        model_data=step_create_best_lightgbm.properties.PrimaryContainer.ModelDataUrl,
        model_metrics=model_metrics_lightgbm,
        model_package_group_name=f"{project}-{model_type}",
        description="Model for Shapeshifter predictions based on LightGBM",
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        depends_on=[step_evaluate_lightgbm],
    )


# Registration that still requires a manual approval decision.
step_register_lightgbm_model = _register_lightgbm(
    "RegisterLightgbm", "PendingManualApproval"
)

# Registration that is approved straight away.
step_register_lightgbm_model_approval = _register_lightgbm(
    "RegisterLightgbmApproved", "Approved"
)

## Deploy model

In [17]:
# NOTE(review): disabled draft of a Lambda-based step that would deploy the
# created model to an endpoint. It is not referenced by the pipeline.

# from sagemaker.workflow.lambda_step import LambdaStep
# from sagemaker.lambda_helper import Lambda

# endpoint_config_name = "shapeshifter-endpoint-config"
# endpoint_name = "shapeshifter-endpoint" #Join(values=["shapeshifter-endpoint-", ExecutionVariables.START_DATETIME], on="")

# deploy_model_lambda_function_name = "sagemaker-deploy-model-lambda" #Join(values=["sagemaker-deploy-model-lambda-", ExecutionVariables.START_DATETIME], on="")

# deploy_model_lambda_function = Lambda(
#     function_name=deploy_model_lambda_function_name,
#     execution_role_arn=role,
#     script="deploy_model.py",
#     handler="deploy_model.lambda_handler",
# )

# step_deploy_model_lambda = LambdaStep(
#     name="DeployShapeshifterModelToEndpoint",
#     lambda_func=deploy_model_lambda_function,
#     inputs={
#         "model_name": step_create_best_lightgbm.properties.ModelName,
#         "endpoint_config_name": endpoint_config_name,
#         "endpoint_name": endpoint_name,
#         "endpoint_instance_type": "ml.m5.xlarge",
#     },
# )

## Config step

In [18]:
# Command-line arguments handed to create_config.py: the execution start time,
# the S3 location of the "encoders" processing output and the artefact location
# of the created model.
config_job_arguments = [
    "--training-date",
    ExecutionVariables.START_DATETIME,
    "--encoders-s3-path",
    step_process.properties.ProcessingOutputConfig.Outputs["encoders"].S3Output.S3Uri,
    "--model-location",
    step_create_best_lightgbm.properties.PrimaryContainer.ModelDataUrl,
]

# Processing step that runs create_config.py and uploads whatever ends up in
# /opt/ml/processing/config to <project_path>/config.
step_config = ProcessingStep(
    name="ShapeshifterSaveConfig",
    processor=sklearn_processor,
    code="create_config.py",
    job_arguments=config_job_arguments,
    cache_config=cache_config,
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/config",
            destination=Join(on="/", values=[project_path, "config"]),
        ),
    ],
)

## Accuracy condition step

In [19]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Mean absolute error read from the evaluation report of the evaluation step.
evaluated_mae = JsonGet(
    step_name=step_evaluate_lightgbm.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mean_absolute_error.value",
)

# The condition holds when the evaluated MAE is at or below the
# AccuracyMaeThreshold pipeline parameter.
cond_lte = ConditionLessThanOrEqualTo(left=evaluated_mae, right=accuracy_mae_threshold)

# Branching on the condition:
#   * True  -> register the model as "Approved" and save the config.
#   * False -> still register the model, but as "PendingManualApproval".
step_cond = ConditionStep(
    name="MAELowerThanThresholdCondition",
    conditions=[cond_lte],
    if_steps=[step_register_lightgbm_model_approval, step_config],
    else_steps=[step_register_lightgbm_model],
)

## Construct pipeline

In [21]:
pipeline_name = "ShapeshifterPipeline-LightGBM"

# Parameters that can be overridden when an execution is started.
pipeline_parameters = [
    s3_bucket,
    s3_project_path,
    accuracy_mae_threshold,
    train_instance_count,
    max_tuning_jobs,
    max_parallel_jobs,
]

# Top-level steps. The registration and config steps are not listed here
# because they are attached to the branches of the condition step.
pipeline_steps = [
    step_process,
    step_lightgbm_tuning,
    step_create_best_lightgbm,
    step_lightgbm_transform,
    step_evaluate_lightgbm,
    step_cond,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

# Create the pipeline, or update its definition if it already exists.
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:708699854342:pipeline/shapeshifterpipeline-lightgbm',
 'ResponseMetadata': {'RequestId': '3cbe01b8-8f66-4f81-9026-849ad3e82a7d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3cbe01b8-8f66-4f81-9026-849ad3e82a7d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '97',
   'date': 'Wed, 18 Jan 2023 15:09:16 GMT'},
  'RetryAttempts': 0}}

In [None]:
# Uncomment to start a pipeline execution; the display name combines the
# project name with the current Unix timestamp.
# execution = pipeline.start(execution_display_name=f"{project}-{round(time.time())}")