## Lambda Step
#### This step reads the latest model name from an SSM parameter and feeds that model name into the Batch Transform step.

In [33]:
# IAM role passed as `role_arn` when the pipeline is upserted; replace the placeholder before running.
role = "<sagemaker_executuion_role>"

In [34]:
%%writefile code/lambda_helper.py

import boto3
import json

def lambda_handler(event, context):
    """Read a model name from SSM Parameter Store and return it to the caller.

    Args:
        event: Invocation payload. When it is a dict with a non-empty
            ``"parameter_name"`` entry, that SSM parameter is read; in every
            other case the parameter named ``"model_name"`` is read.
        context: Lambda context object (unused).

    Returns:
        dict: On success, ``statusCode`` 200, a JSON-encoded ``body`` and the
        parameter value under ``"model_name"``. On failure, ``statusCode`` 500
        and a JSON-encoded ``body`` describing the problem.
    """
    # Default to the parameter literally named "model_name" so an empty event
    # behaves exactly as before.
    parameter_name = "model_name"
    if isinstance(event, dict) and event.get("parameter_name"):
        parameter_name = event["parameter_name"]

    ssm = boto3.client("ssm")

    try:
        # Only the parameter's Value is needed (in this case, the model name).
        model_name = ssm.get_parameter(Name=parameter_name)["Parameter"]["Value"]
        print("model_name", model_name)
    except ssm.exceptions.ParameterNotFound:
        # The parameter does not exist, so there is no model name to hand over.
        # NOTE(review): error responses carry no "model_name" key; confirm that
        # callers expecting that key handle its absence.
        return {
            "statusCode": 500,
            "body": json.dumps("Training pipeline not run in DEV"),
        }
    except Exception as e:
        # Any other failure is logged and reported in the response body.
        print(e)
        return {
            "statusCode": 500,
            "body": json.dumps(f"lambda func fail with {e}"),
        }

    return {
        "statusCode": 200,
        "body": json.dumps("Created lambda step!"),
        "model_name": model_name,
    }



Overwriting code/lambda_helper.py


In [35]:
# Execution role for the Lambda function (passed to `Lambda` as `execution_role_arn`); replace the placeholder.
lambda_role = "<lambda_execution_role>"

# Name of the Lambda function (passed to `Lambda` as `function_name`).
function_name = "read-prod-ssm"

In [36]:
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum)
    
    
# Output exposed by the Lambda step: the handler's "model_name" entry, typed as a string.
output_param_1 = LambdaOutput(
    output_name="model_name",
    output_type=LambdaOutputTypeEnum.String,
)

# Lambda helper built from the script written by the %%writefile cell above.
func = Lambda(
    handler="lambda_helper.lambda_handler",
    script="code/lambda_helper.py",
    execution_role_arn=lambda_role,
    function_name=function_name,
)

In [37]:
# Pipeline step that invokes the Lambda with an empty input payload and
# publishes its "model_name" output for later steps.
step_lambda = LambdaStep(
    name="LambdaStep_pipeline",
    outputs=[output_param_1],
    inputs={},
    lambda_func=func,
)

## Transform Step

In [44]:
# Create the local download directory if it is missing. Done in Python rather
# than with a shell command so the cell does not depend on a POSIX shell.
import os

os.makedirs("data", exist_ok=True)

In [69]:
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Local destination for the sample dataset.
local_path = "data/abalone-dataset.csv"

# Download the Abalone CSV from the "sagemaker-sample-files" bucket.
s3 = boto3.resource("s3")
s3.Bucket("sagemaker-sample-files").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

# Upload the file under our own prefix; the resulting URI is the default for
# the "InputData" pipeline parameter.
base_uri = "s3://<bucket_name>/abalone"  ## replace bucket_name
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

In [70]:
# The seed-code bucket name is region specific, so resolve the session's region first.
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name

# Local destination for the batch dataset.
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

# Upload the file under our own prefix; the resulting URI is the default for
# the "BatchData" pipeline parameter.
base_uri = "s3://<bucket_name>/abalone"  ## replace bucket_name
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

In [71]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Pipeline parameters: each can be overridden per execution, otherwise the
# default below is used.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# SageMaker accepts only "Approved", "Rejected" or "PendingManualApproval" as
# a model approval status, so the default must be one of those values.
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)
# S3 URIs produced by the upload cells above.
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

In [53]:
from sagemaker.transformer import Transformer


# The model name is not known when the pipeline is defined: it is taken from
# the Lambda step's "model_name" output.
transformer = Transformer(
    model_name=step_lambda.properties.Outputs["model_name"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path="s3://<bucket_name>/AbaloneTransform",  ## s3 URI output path
)

In [54]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


# Batch transform step: runs `transformer` over the data referenced by the
# "BatchData" pipeline parameter.
step_transform = TransformStep(
    name="PRODAbaloneTransform",
    inputs=TransformInput(data=batch_data),
    transformer=transformer,
)

## Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipeline service resolves the data dependency DAG as steps for the execution to complete.
* Steps must be unique across the pipeline step list and all condition step if/else lists.

In [55]:
from sagemaker.workflow.pipeline import Pipeline


# Pipeline names must be unique within an (account, region) pair.
pipeline_name = "PRODAbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    # Every parameter defined for this pipeline is listed here.
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    # The transform step consumes the Lambda step's "model_name" output.
    steps=[step_lambda, step_transform],
)

In [65]:
import json

# Parse the pipeline's JSON definition into a Python object and display it for inspection.
definition = json.loads(pipeline.definition())
definition

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [66]:
# Submit the pipeline definition to SageMaker, passing `role` as the role the service uses.
pipeline.upsert(role_arn=role)

Start the pipeline and accept all the default parameters.

In [58]:
# Start an execution with the default parameter values; keep the handle for the cells below.
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [67]:
# Show the description of the execution started above.
execution.describe()

Wait for the execution to complete.

In [60]:
# Block until the execution completes.
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [68]:
# List the steps of this execution.
execution.list_steps()