In [None]:
!pip install kfp --upgrade
!pip install sagemaker --upgrade
!which dsl-compile

## Amazon RoboMaker Components for Kubeflow Pipelines
In this example we'll build a Kubeflow pipeline that uses RoboMaker Simulation jobs in conjunction with SageMaker RLEstimator Training jobs.
Our simple pipeline will perform:

1. Train an RL model using a RoboMaker Simulation job

The source code for this example comes from various sources:

1. The SageMaker RLEstimator train-objecttracker-ray.py was contributed by https://github.com/raghaprasad
1. The SageMaker sagemaker_rl common libs come from https://github.com/aws/amazon-sagemaker-examples/tree/master/reinforcement_learning/common
1. The output.tar file used to create the RoboMaker Simulation Application is based on the DeepRacer objecttracker example: https://aws-deepracer-tutorial-iros2020.s3.us-east-2.amazonaws.com/output.tar

In [None]:
import kfp
from kfp import components
from kfp import dsl
import time
from sagemaker.rl import RLEstimator, RLToolkit
import random
import string

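The RoboMaker and SageMaker components used in this pipeline are loaded from the Kubeflow Pipelines repository: https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker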

In [None]:
# Each factory below is built from a component.yaml fetched from the
# kubeflow/pipelines repository (master branch) when this cell runs.

# RoboMaker: create a simulation application.
robomaker_create_sim_app_op = components.load_component_from_url(
    url="https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/create_simulation_app/component.yaml"
)

# RoboMaker: run a simulation job.
robomaker_sim_job_op = components.load_component_from_url(
    url="https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/simulation_job/component.yaml"
)

# RoboMaker: delete a simulation application.
robomaker_delete_sim_app_op = components.load_component_from_url(
    url="https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/delete_simulation_app/component.yaml"
)

# SageMaker: RLEstimator training job.
sagemaker_rlestimator_op = components.load_component_from_url(
    url="https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/rlestimator/component.yaml"
)


In [None]:
import boto3

# Placeholder -- replace with the ARN of the IAM role the pipeline jobs should use.
role = "<your_role_arn>"

# boto3 S3 resource; its low-level client is used later to upload the sources.
s3 = boto3.resource("s3")

#### Upload source code to Amazon S3

In [None]:
# Placeholder -- replace with the name of an S3 bucket you can write to.
bucket_name = "<your_bucket_name>"

# (local file, destination S3 key) pairs:
#   sourcedir.tar.gz -> source bundle referenced by the RLEstimator step
#   output.tar       -> bundle referenced by the RoboMaker simulation application
for local_file, s3_key in (
    ("sourcedir.tar.gz", "sagemaker-sources/sourcedir.tar.gz"),
    ("output.tar", "robomaker-sources/output.tar"),
):
    s3.meta.client.upload_file(local_file, bucket_name, s3_key)
    # Report a well-formed S3 URI (the scheme and the "/" between bucket and
    # key were previously missing from the message).
    print(f"\nUploaded to S3 location: s3://{bucket_name}/{s3_key}")


#### Create the pipeline as a Python function


In [None]:
# Default metric definitions the SageMaker SDK provides for the Ray RL toolkit.
metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY)

# Placeholder VPC settings -- replace with your own subnet and security group IDs.
subnets = ["subnet-1", "subnet-2"]
security_groups = ["sg-1"]

@dsl.pipeline(
    name="SageMaker & RoboMaker pipeline",
    description="SageMaker & RoboMaker Reinforcement Learning job where the jobs work together to train an RL model",
)
def sagemaker_robomaker_rl_job(
    # --- shared settings -------------------------------------------------
    region="us-east-1",
    role=role,
    # --- RoboMaker simulation application --------------------------------
    # NOTE: default expressions are evaluated once, when this cell is run, so
    # the random suffix only changes when the function is re-defined.
    name="robomaker-pipeline-simulation-application"
    + "".join(random.choice(string.ascii_lowercase) for i in range(10)),
    sources=[
        {
            "s3Bucket": bucket_name,
            "s3Key": "robomaker-sources/output.tar",
            "architecture": "X86_64",
        }
    ],
    simulation_software_name="Gazebo",
    simulation_software_version="9",
    robot_software_name="ROS",
    robot_software_version="Melodic",
    rendering_engine_name="OGRE",
    rendering_engine_version="1.x",
    # --- RoboMaker simulation jobs ---------------------------------------
    output_bucket=bucket_name,
    robomaker_output_path="test-output-key",
    vpc_security_group_ids=security_groups,
    vpc_subnets=subnets,
    # --- SageMaker RLEstimator training job ------------------------------
    entry_point="src/train-objecttracker-ray.py",
    source_dir="s3://{}/{}".format(bucket_name, "sagemaker-sources/sourcedir.tar.gz"),
    toolkit="ray",
    toolkit_version="0.8.5",
    framework="tensorflow",
    assume_role=role,
    instance_type="ml.c5.2xlarge",
    instance_count=1,
    output_path="s3://{}/".format(bucket_name),
    # Same caveat as `name`: the suffix is fixed at definition time.
    job_name="rlestimator-pipeline-"
    + "".join(random.choice(string.ascii_lowercase) for i in range(10)),
    metric_definitions=metric_definitions,
    # Passed as `max_run` to both the training step and the simulation steps.
    max_run=300,
    input_bucket_name=bucket_name,
):
    """Define the RoboMaker + SageMaker reinforcement-learning pipeline.

    Steps, in creation order: create a RoboMaker simulation application,
    start a SageMaker RLEstimator training job, start three RoboMaker
    simulation jobs against that application, and delete the application once
    the simulation jobs have finished.
    """
    # Every step sets max_cache_staleness to "P0D" (a zero-length duration);
    # per the KFP caching docs this makes the step re-run instead of reusing
    # a cached result.

    robomaker_create_sim_app = robomaker_create_sim_app_op(
        region=region,
        app_name=name,
        sources=sources,
        simulation_software_name=simulation_software_name,
        simulation_software_version=simulation_software_version,
        robot_software_name=robot_software_name,
        robot_software_version=robot_software_version,
        rendering_engine_name=rendering_engine_name,
        rendering_engine_version=rendering_engine_version,
    ).set_display_name('Create RoboMaker Sim App')
    robomaker_create_sim_app.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # ARN output of the create step; consumed by training, simulation and delete.
    sim_app_arn = robomaker_create_sim_app.outputs["arn"]

    rlestimator_training_toolkit_ray = sagemaker_rlestimator_op(
        region=region,
        entry_point=entry_point,
        source_dir=source_dir,
        toolkit=toolkit,
        toolkit_version=toolkit_version,
        framework=framework,
        role=assume_role,
        instance_type=instance_type,
        instance_count=instance_count,
        model_artifact_path=output_path,
        job_name=job_name,
        metric_definitions=metric_definitions,
        max_run=max_run,
        hyperparameters={
            "rl.training.config.lambda": "0.95",
            "robomaker.config.app_arn": sim_app_arn,
            # NOTE(review): presumably must match the number of simulation
            # jobs started below (three) -- confirm against the training script.
            "robomaker.config.num_workers": "3",
            "robomaker.config.packageName": "object_tracker_simulation",
            "robomaker.config.launchFile": "local_client.launch",
            "robomaker.config.policyServerPort": "9000",
            "robomaker.config.iamRole": assume_role,
            "robomaker.config.sagemakerBucket": input_bucket_name,
        },
        vpc_security_group_ids=vpc_security_group_ids,
        vpc_subnets=vpc_subnets,
    ).set_display_name('Start RLEstimator Training')
    rlestimator_training_toolkit_ray.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # The three simulation jobs are identical apart from their display name,
    # so they are created in a loop instead of three copy-pasted stanzas.
    simulation_jobs = []
    for job_index in (1, 2, 3):
        simulation_job = robomaker_sim_job_op(
            region=region,
            role=role,
            output_bucket=output_bucket,
            output_path=robomaker_output_path,
            max_run=max_run,
            failure_behavior="Fail",
            sim_app_arn=sim_app_arn,
            sim_app_launch_config={
                "packageName": "object_tracker_simulation",
                "launchFile": "local_client.launch",
                "environmentVariables": {
                    "RLCAMP_POLICY_SERVER_PORT": "9000",
                    "RLCAMP_SAGEMAKER_BUCKET": input_bucket_name,
                    "RLCAMP_SAGEMAKER_JOB_NAME": job_name,
                    "RLCAMP_AWS_REGION": region,
                },
            },
            vpc_security_group_ids=vpc_security_group_ids,
            vpc_subnets=vpc_subnets,
            use_public_ip="False",
        ).set_display_name(f"RoboMaker Simulation {job_index}")
        simulation_job.execution_options.caching_strategy.max_cache_staleness = "P0D"
        simulation_jobs.append(simulation_job)

    # Delete the simulation application only after every simulation job (and
    # the create step itself) has completed.
    robomaker_delete_sim_app = robomaker_delete_sim_app_op(
        region=region, arn=sim_app_arn,
    ).after(
        *simulation_jobs,
        robomaker_create_sim_app,
    ).set_display_name('Delete RoboMaker Sim App')
    robomaker_delete_sim_app.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # Pipeline-wide setting: image pull policy "Always" for all steps.
    dsl.get_pipeline_conf().set_image_pull_policy(policy="Always")

#### Compile the Pipeline to a zip file


In [None]:
# Compile the pipeline function into a package file in the working directory.
kfp.compiler.Compiler().compile(
    pipeline_func=sagemaker_robomaker_rl_job,
    package_path="sagemaker_robomaker_rl_job.zip",
)

#### Push the Pipeline to KFP, create an Experiment and then run the Experiment


In [None]:
# Connect to Kubeflow Pipelines with the default client configuration.
client = kfp.Client()
aws_experiment = client.create_experiment(name="rm-kfp-experiment")

# Despite its name, `exp_name` is passed as the job (run) name; the UTC
# timestamp suffix distinguishes successive runs.
exp_name = "sagemaker_robomaker_rl_job-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
my_run = client.run_pipeline(
    experiment_id=aws_experiment.id,
    job_name=exp_name,
    pipeline_package_path="sagemaker_robomaker_rl_job.zip",
)