
SageMaker Workflow Epic #4994

@rlhagerm

Description


SageMaker documentation references:

SageMaker Developer Guide
SageMaker API Reference
SageMaker examples and example notebooks

Community.aws post (for reference)

https://community.aws/posts/create-and-run-sagemaker-pipelines-using-aws-sdks

Reference implementation of scenario and actions in .NET

https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/dotnetv3/SageMaker

Service actions can either be pulled out as individual functions or can be incorporated into the scenario, but each service action must be included as an excerpt in the SOS output.

SageMaker Actions

CreatePipeline
UpdatePipeline
StartPipelineExecution
DescribePipelineExecution
DeletePipeline

Hello Service

The Hello Service example should demonstrate how to set up the client and make an example call using the SDK.

General info for Hello Service example snippets:
This section of the workflow should be a streamlined, simple example with enough detail to be as close to “copy/paste” runnable as possible. This example may include namespaces and other setup in order to focus on getting the user up and running with the new service.

Sample output:

Hello Amazon SageMaker! Let's list some of your notebook instances:

        Instance: test-notebook
        Arn: arn:aws:sagemaker:us-west-2:123456789:notebook-instance/test-notebook
        Creation Date: 6/7/2023

Hello SageMaker

Initialize the client and call ListNotebookInstances to list up to 5 of the account's notebook instances. If no instances are found, you can direct the user to instructions on how to add one.
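As an illustration of the shape this example takes, here is a minimal sketch in Python with boto3 (each SDK language will have its own equivalent); the Region, helper names, and output formatting are assumptions, not part of any reference implementation.

```python
def format_instances(response: dict) -> str:
    """Format a ListNotebookInstances response for display."""
    instances = response.get("NotebookInstances", [])
    if not instances:
        return ("No notebook instances found. See the SageMaker Developer Guide "
                "for instructions on creating one.")
    lines = ["Hello Amazon SageMaker! Let's list some of your notebook instances:"]
    for inst in instances:
        lines.append(f"\tInstance: {inst['NotebookInstanceName']}")
        lines.append(f"\tArn: {inst['NotebookInstanceArn']}")
        lines.append(f"\tCreation Date: {inst['CreationTime']:%m/%d/%Y}")
    return "\n".join(lines)

def main() -> None:
    # Requires boto3 and AWS credentials; not invoked automatically here.
    import boto3
    client = boto3.client("sagemaker", region_name="us-west-2")
    # List up to 5 of the account's notebook instances.
    print(format_instances(client.list_notebook_instances(MaxResults=5)))
```

The formatting helper is kept separate from the client call so the display logic can be exercised without credentials.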

Scenario

Overview

Amazon SageMaker is a managed machine learning service. Developers can build and train machine learning models and deploy them into a production-ready hosted environment. This example focuses on the pipeline capabilities rather than model training and building, since pipelines are more likely to be useful to an SDK developer.

The example uses a Geospatial job because it processes quickly and makes it simple to verify that the pipeline executed correctly. The user is expected to replace this job with processing steps of their own, while still using the SDK pipeline operations for creating or updating a pipeline, handling callback and execution steps in an AWS Lambda function, and using pipeline parameters to set up input and output.

The Geospatial job itself is a Vector Enrichment Job (VEJ) that reverse geocodes a set of coordinates. Other job types are much slower to complete, and this job type has easy-to-read output. Note that you must use the us-west-2 Region for this job type. This particular job type is powered by Amazon Location Service, although you will not need to call that service directly. You can read more about geospatial capabilities here.
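For illustration, starting the reverse-geocode VEJ might look like the following boto3 sketch. The request shape follows the `sagemaker-geospatial` StartVectorEnrichmentJob API; the job name, bucket URI, and column names are assumptions for any given implementation.

```python
def build_vej_request(role_arn: str, input_s3_uri: str) -> dict:
    """Build a StartVectorEnrichmentJob request for a reverse-geocode job."""
    return {
        "Name": "sdk-example-reverse-geocode",  # assumed job name
        "ExecutionRoleArn": role_arn,
        "InputConfig": {
            # The input CSV must include Latitude and Longitude columns.
            "DataSourceConfig": {"S3Data": {"S3Uri": input_s3_uri}},
            "DocumentType": "CSV",
        },
        "JobConfig": {
            "ReverseGeocodingConfig": {
                "XAttributeName": "Longitude",
                "YAttributeName": "Latitude",
            }
        },
    }

def start_job(role_arn: str, input_s3_uri: str) -> str:
    # Requires credentials; the geospatial client is only available in us-west-2.
    import boto3
    client = boto3.client("sagemaker-geospatial", region_name="us-west-2")
    response = client.start_vector_enrichment_job(
        **build_vej_request(role_arn, input_s3_uri))
    return response["Arn"]
```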

The AWS Lambda function handles the callback and the parameter-based queue messages from the pipeline. This example includes writing this Lambda function and deploying it as part of the pipeline, as well as connecting it to the SQS queue that is used by the pipeline.

There are multiple ways to handle pipeline operations, but in the interest of consistency, the C# implementation is based on this pipeline example reference. This logic checks for the existence of parameters in the message to determine which type of processing to start. Other languages do not need to mimic the exact logic shown here; that functionality is left up to the language developer.
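The dispatch logic described above can be sketched as a small pure function; the payload key names (`Records`, `vej_export_config`, `vej_name`) mirror the pipeline parameters used in this example but are assumptions for any given implementation.

```python
import json

# Job-status values mapped to the SageMaker callback action the handler takes.
STATUS_ACTIONS = {
    "COMPLETED": "SendPipelineExecutionStepSuccess",
    "FAILED": "SendPipelineExecutionStepFailure",
    "IN_PROGRESS": "log_still_running",
}

def dispatch(event: dict) -> str:
    """Decide which action the Lambda should take, based on the input shape."""
    if event.get("Records"):
        # Queue records present: check the status of the geospatial job.
        return "check_job_status"
    if "vej_export_config" in event:
        # Export configuration present: export the completed job's results.
        return "export_vector_enrichment_job"
    if "vej_name" in event:
        # Job name present: start a new vector enrichment job.
        return "start_vector_enrichment_job"
    raise ValueError(f"Unrecognized event: {json.dumps(event)[:200]}")
```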

The pipeline in this example is defined through a JSON file. Each language may wish to name the steps and parameters in a way that makes sense for their implementation, but you can use the file here as a guide.

README

This is a workflow scenario. As such, the READMEs should be standardized.
This is the .NET reference README

Implementation

A scenario runs at a command prompt and prints the result of each service action to the user. Because of the choices in this workflow scenario, it must be run interactively.

Create and execute a SageMaker pipeline using a geospatial job with an AWS SDK.

Note: geospatial jobs are supported only in the us-west-2 Region. All operations should use this Region unless otherwise specified.

  1. Set up any resources needed for the example if they don’t already exist.
    1. Create a Lambda role with the following attached policies: iamClient CreateRole, AttachRolePolicy
      1. arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
      2. arn:aws:iam::aws:policy/service-role/AmazonSageMakerGeospatialFullAccess
      3. arn:aws:iam::aws:policy/AWSLambdaSQSQueueExecutionRole
      4. arn:aws:iam::aws:policy/service-role/AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy
      5. arn:aws:iam::aws:policy/AmazonSQSFullAccess
      6. AssumeRolePolicy:
        { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "s3.amazonaws.com", "lambda.amazonaws.com", "sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
    2. Create a SageMaker role with the following attached policies: iamClient CreateRole, AttachRolePolicy
      1. arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
      2. arn:aws:iam::aws:policy/AmazonSageMakerGeospatialFullAccess
      3. AssumeRolePolicy:
        { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "s3.amazonaws.com", "lambda.amazonaws.com", "sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
    3. Create an SQS queue for the pipeline. SqsClient CreateQueue, GetQueueUrl
      1. You will need the queue URL for the pipeline execution.
    4. Create a bucket and upload a .csv file that includes Latitude and Longitude columns (see .NET code for example) for reverse geocoding. s3 client PutBucket and PutObject
      1. You can add an /input directory for this file. The pipeline will create a /output directory for the output file.
  2. Add a Lambda function, with code included and written in your language, that handles callback functionality, and connect it to the queue. If the Lambda already exists, you can prompt the user to update it. The suggested timeout for the Lambda is 30 seconds. lambdaClient CreateFunction, UpdateFunctionCode, ListEventSourceMappings, CreateEventSourceMapping
    1. The lambda performs the following tasks, based on the input:
      1. If queue records are present, processes the records to check the job status of the geospatial job.
        1. COMPLETED: call SendPipelineExecutionStepSuccess
        2. FAILED: call SendPipelineExecutionStepFailure
        3. IN_PROGRESS: log that the job is still running
      2. If export configuration is present, call ExportVectorEnrichmentJob
      3. If job name is present, call StartVectorEnrichmentJob
    2. The queue must be added to the event source mappings for the Lambda, and the event source mapping must be enabled.
  3. Create a pipeline using the SDK with the following characteristics. If the pipeline already exists, use an Update call to update it. You can use the JSON referenced here as a guide for the pipeline definition. sagemakerClient UpdatePipeline, CreatePipeline
    1. Pipeline parameters for the job, input, and export steps.
    2. A Lambda processing step: a Lambda that kicks off a vector enrichment job that takes in a set of coordinates for reverse geocode.
    3. A callback step to check the progress of the processing job.
    4. An export step for the results of the VEJ.
    5. A callback step to finish the pipeline.
  4. Execute the pipeline using the SDK with some input and poll for the execution status. sagemakerClient StartPipelineExecution, DescribePipelineExecution
  5. When the execution is complete, fetch the latest output file and display some of the output data to the user. s3client ListObjects, GetObject
  6. Provide instructions for optionally viewing the pipeline and executions in SageMaker studio.
  7. Clean up the pipeline and resources – the user decides whether to clean these up.
    1. Clean up the pipeline. DeletePipeline
    2. Clean up the queue. DeleteQueue
    3. Clean up the bucket. DeleteObjects, DeleteBucket
    4. Clean up the Lambda. DeleteFunction
    5. Clean up the roles. DeleteRole
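The create-or-update, execute, and poll steps above can be sketched in boto3 as follows; the parameter-building helper and the exact polling delay are assumptions, and the client is passed in so the helpers stay credential-free.

```python
import time

def pipeline_parameters(params: dict) -> list:
    """Convert a dict into the PipelineParameters list StartPipelineExecution expects."""
    return [{"Name": name, "Value": value} for name, value in params.items()]

def create_or_update_pipeline(client, name: str, definition_json: str,
                              role_arn: str) -> None:
    # Try to update first; fall back to create if the pipeline does not exist.
    try:
        client.update_pipeline(PipelineName=name,
                               PipelineDefinition=definition_json)
    except client.exceptions.ResourceNotFound:
        client.create_pipeline(PipelineName=name,
                               PipelineDefinition=definition_json,
                               RoleArn=role_arn)

def wait_for_execution(client, execution_arn: str, delay: int = 30) -> str:
    # Poll DescribePipelineExecution until the run leaves the Executing state.
    while True:
        status = client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn)["PipelineExecutionStatus"]
        print(f"\tExecution status is {status}.")
        if status != "Executing":
            return status
        time.sleep(delay)
```

A typical call sequence would be `create_or_update_pipeline(...)`, then `client.start_pipeline_execution(PipelineName=..., PipelineParameters=pipeline_parameters({...}))`, then `wait_for_execution(client, arn)`.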

Other Notes:
You may want to view your pipeline in SageMaker Studio, which requires a Domain.
This provides better debugging for the pipeline execution steps.

Metadata

sagemaker_Hello
sagemaker_CreatePipeline
sagemaker_ExecutePipeline
sagemaker_DeletePipeline
sagemaker_DescribePipelineExecution
sagemaker_Scenario_Pipelines

SDKs

Acceptance criteria

  • Metadata updates.
  • Runnable scenario and hello service code.
  • Service action code (may be same as scenario code).
  • Integration or unit tests.
  • Scenario and Single Action examples tagged for SOS.
  • The workflow must be in its own folder so that the README appears when the GitHub folder is opened in a webpage. Recommendation is to place this example in a SageMaker folder at the same level as other service examples.

Sample output


--------------------------------------------------------------------------------
Welcome to the Amazon SageMaker pipeline example scenario.

This example workflow will guide you through setting up and executing an
Amazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an
Amazon SQS queue, and runs a vector enrichment reverse geocode job to
reverse geocode addresses in an input file and store the results in an export file.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
First, we will set up the roles, functions, and queue needed by the SageMaker pipeline.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Checking for role named SageMakerExampleLambdaRole.
--------------------------------------------------------------------------------
Checking for role named SageMakerExampleRole.
--------------------------------------------------------------------------------
Setting up the Lambda function for the pipeline.
        The Lambda function SageMakerExampleFunction already exists, do you want to update it?
n
        Lambda ready with ARN arn:aws:lambda:us-west-2:1234567890:function:SageMakerExampleFunction.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Setting up queue sagemaker-sdk-example-queue-test.
--------------------------------------------------------------------------------
Setting up bucket sagemaker-sdk-test-bucket-test.
        Bucket sagemaker-sdk-test-bucket-test ready.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Now we can create and execute our pipeline.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Setting up the pipeline.
        Pipeline set up with ARN arn:aws:sagemaker:us-west-2:1234567890:pipeline/sagemaker-sdk-example-pipeline.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Starting pipeline execution.
        Execution started with ARN arn:aws:sagemaker:us-west-2:1234567890:pipeline/sagemaker-sdk-example-pipeline/execution/f8xmafpxx3ke.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Waiting for pipeline execution to finish.
        Execution status is Executing.
        Execution status is Executing.
        Execution status is Executing.
        Execution status is Succeeded.
        Execution finished with status Succeeded.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Getting output results sagemaker-sdk-test-bucket-test.
        Output file: outputfiles/qyycwuuxwc9w/results_0.csv
        Output file contents:

        -149.8935557,"61.21759217",601,USA,"601 W 5th Ave, Anchorage, AK, 99501, USA",Anchorage,,99501 6301,Alaska,Valid Data
        -149.9054948,"61.19533942",2794,USA,"2780-2798 Spenard Rd, Anchorage, AK, 99503, USA",Anchorage,North Star,99503,Alaska,Valid Data
        -149.7522,"61.2297",,USA,"Enlisted Hero Dr, Jber, AK, 99506, USA",Jber,,99506,Alaska,Valid Data
        -149.8643361,"61.19525062",991,USA,"959-1069 E Northern Lights Blvd, Anchorage, AK, 99508, USA",Anchorage,Rogers Park,99508,Alaska,Valid Data
        -149.8379726,"61.13751355",2372,USA,"2276-2398 Abbott Rd, Anchorage, AK, 99507, USA",Anchorage,,99507,Alaska,Valid Data
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
The pipeline has completed. To view the pipeline and executions in SageMaker Studio, follow these instructions:
https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Finally, let's clean up our resources.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Clean up resources.
        Delete pipeline sagemaker-sdk-example-pipeline? (y/n)
y
        Delete queue https://sqs.us-west-2.amazonaws.com/565846806325/sagemaker-sdk-example-queue-rlhagerm? (y/n)
y
        Delete Amazon S3 bucket sagemaker-sdk-test-bucket-rlhagerm2? (y/n)
y
        Delete role SageMakerExampleLambdaRole? (y/n)
y
        Delete role SageMakerExampleRole? (y/n)
y
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
SageMaker pipeline scenario is complete.
--------------------------------------------------------------------------------


Pipeline JSON:


{
  "Version": "2020-12-01",
  "Metadata": {},
  "Parameters": [
    {
      "Name": "parameter_execution_role",
      "Type": "String",
      "DefaultValue": ""
    },
    {
      "Name": "parameter_region",
      "Type": "String",
      "DefaultValue": "us-west-2"
    },
    {
      "Name": "parameter_queue_url",
      "Type": "String",
      "DefaultValue": ""
    },
    {
      "Name": "parameter_vej_input_config",
      "Type": "String",
      "DefaultValue": ""
    },
    {
      "Name": "parameter_vej_export_config",
      "Type": "String",
      "DefaultValue": ""
    },
    {
      "Name": "parameter_step_1_vej_config",
      "Type": "String",
      "DefaultValue": ""
    }
  ],
  "PipelineExperimentConfig": {
    "ExperimentName": {
      "Get": "Execution.PipelineName"
    },
    "TrialName": {
      "Get": "Execution.PipelineExecutionId"
    }
  },
  "Steps": [
    {
      "Name": "vej-processing-step",
      "Type": "Lambda",
      "Arguments": {
        "role": {
          "Get": "Parameters.parameter_execution_role"
        },
        "region": {
          "Get": "Parameters.parameter_region"
        },
        "vej_input_config": {
          "Get": "Parameters.parameter_vej_input_config"
        },
        "vej_config": {
          "Get": "Parameters.parameter_step_1_vej_config"
        },
        "vej_name": "vej-pipeline-step-1"
      },
      "FunctionArn": "*FUNCTION_ARN*",
      "OutputParameters": [
        {
          "OutputName": "statusCode",
          "OutputType": "String"
        },
        {
          "OutputName": "vej_arn",
          "OutputType": "String"
        }
      ]
    },
    {
      "Name": "vej-callback-step",
      "Type": "Callback",
      "Arguments": {
        "role": {
          "Get": "Parameters.parameter_execution_role"
        },
        "region": {
          "Get": "Parameters.parameter_region"
        },
        "vej_arn": {
          "Get": "Steps.vej-processing-step.OutputParameters['vej_arn']"
        }
      },
      "DependsOn": [
        "vej-processing-step"
      ],
      "SqsQueueUrl": {
        "Get": "Parameters.parameter_queue_url"
      },
      "OutputParameters": [
        {
          "OutputName": "vej_status",
          "OutputType": "String"
        }
      ]
    },
    {
      "Name": "export-vej-step",
      "Type": "Lambda",
      "Arguments": {
        "vej_arn": {
          "Get": "Steps.vej-processing-step.OutputParameters['vej_arn']"
        },
        "role": {
          "Get": "Parameters.parameter_execution_role"
        },
        "region": {
          "Get": "Parameters.parameter_region"
        },
        "vej_export_config": {
          "Get": "Parameters.parameter_vej_export_config"
        }
      },
      "DependsOn": [
        "vej-callback-step"
      ],
      "FunctionArn": "*FUNCTION_ARN*",
      "OutputParameters": [
        {
          "OutputName": "statusCode",
          "OutputType": "String"
        },
        {
          "OutputName": "vej_arn",
          "OutputType": "String"
        }
      ]
    },
    {
      "Name": "export-vej-callback",
      "Type": "Callback",
      "Arguments": {
        "role": {
          "Get": "Parameters.parameter_execution_role"
        },
        "region": {
          "Get": "Parameters.parameter_region"
        },
        "vej_arn": {
          "Get": "Steps.export-vej-step.OutputParameters['vej_arn']"
        }
      },
      "DependsOn": [
        "export-vej-step"
      ],
      "SqsQueueUrl": {
        "Get": "Parameters.parameter_queue_url"
      },
      "OutputParameters": [
        {
          "OutputName": "statusJob",
          "OutputType": "String"
        }
      ]
    }
  ]
}
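The *FUNCTION_ARN* placeholders in the definition above must be replaced with the deployed Lambda's ARN before the definition is passed to CreatePipeline. A small Python sketch of that preparation step (the helper name is an assumption):

```python
import json

def prepare_definition(template_text: str, function_arn: str) -> str:
    """Fill in the Lambda ARN placeholder and validate the pipeline definition."""
    definition = template_text.replace("*FUNCTION_ARN*", function_arn)
    json.loads(definition)  # fail fast if the template is not valid JSON
    return definition
```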
