# AWS Step Functions Data Science SDK for BYO Container

Here we demonstrate how to use the AWS Step Functions Data Science SDK to run an end-to-end data science workflow in which you bring your own container and code, and deploy a model on Amazon SageMaker.

The steps are as follows:

1/ Create a Lambda function that starts an AWS CodeBuild job to build your Docker container. (The steps for this are included separately.)

2/ Invoke the Lambda function as the first step of a Step Functions workflow.

3/ Once the Docker container is built, launch a SageMaker training job using the Step Functions Data Science SDK.

4/ Use the Step Functions Data Science SDK to deploy the trained model.

## TODO: IAM Roles and Permissions

Before running the code, ensure that your Amazon SageMaker notebook IAM role can call AWS Step Functions, and that Step Functions has an execution role that allows it to call SageMaker.

To do this, follow the steps in the Setup section of the following notebook, up to the section "Configure Execution Roles": https://github.com/awslabs/amazon-sagemaker-examples/blob/master/step-functions-data-science-sdk/machine_learning_workflow_abalone/machine_learning_workflow_abalone.ipynb

If you have already completed this, ignore this section and move on to the next one.

## Setup

### Add a policy to your SageMaker role in IAM

**If you are running this notebook on an Amazon SageMaker notebook instance**, the IAM role assumed by your notebook instance needs permission to create and run workflows in AWS Step Functions. To provide this permission to the role, do the following.

1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/).
2. Select **Notebook instances** and choose the name of your notebook instance.
3. Under **Permissions and encryption**, select the role ARN to view the role in the IAM console.
4. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.
5. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.
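
If you prefer to script this step, the following is a minimal sketch that attaches the same managed policy through the IAM `attach_role_policy` API. It assumes the role name is the last path segment of the ARN that `get_execution_role()` returns; the helper names `role_name_from_arn` and `attach_step_functions_access` are hypothetical, and the IAM client is passed in so you can supply `boto3.client('iam')`.

```python
# AWS-managed policies live under arn:aws:iam::aws:policy/
STEP_FUNCTIONS_FULL_ACCESS_ARN = "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"


def role_name_from_arn(role_arn):
    """Return the role name from an IAM role ARN.

    Assumes the form arn:aws:iam::<account>:role/<optional/path/>name,
    where the name is the segment after the last slash.
    """
    return role_arn.split("/")[-1]


def attach_step_functions_access(iam_client, role_arn):
    """Attach AWSStepFunctionsFullAccess to the role identified by role_arn.

    iam_client is expected to be a boto3 IAM client, e.g. boto3.client('iam').
    Returns the role name the policy was attached to.
    """
    role_name = role_name_from_arn(role_arn)
    iam_client.attach_role_policy(
        RoleName=role_name,
        PolicyArn=STEP_FUNCTIONS_FULL_ACCESS_ARN,
    )
    return role_name
```

For example: `attach_step_functions_access(boto3.client('iam'), sagemaker.get_execution_role())`. The identity making this call needs the `iam:AttachRolePolicy` permission, which a notebook role may not have; the console steps avoid that requirement.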

If you are running this notebook in a local environment, the SDK uses your AWS CLI configuration (credentials and Region). For more information, see [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).

Next, create an execution role in IAM for Step Functions. 

### Create an execution role for Step Functions

You need an execution role so that you can create and execute workflows in Step Functions.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role**, select **Step Functions**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**.
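
The same role can be created programmatically. This sketch (hypothetical helper name `create_step_functions_role`) calls IAM `create_role` with a trust policy that lets the Step Functions service principal `states.amazonaws.com` assume the role. It grants no permissions by itself, so the role still needs the policy that follows.

```python
import json

# Trust policy that lets the Step Functions service assume the role
STEP_FUNCTIONS_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_step_functions_role(iam_client, role_name="StepFunctionsWorkflowExecutionRole"):
    """Create an IAM role that Step Functions can assume and return its ARN.

    iam_client is expected to be a boto3 IAM client, e.g. boto3.client('iam').
    """
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(STEP_FUNCTIONS_TRUST_POLICY),
        Description="Execution role for Step Functions workflows",
    )
    return response["Role"]["Arn"]
```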


Attach a policy to the role you created. The following steps attach an inline policy that allows Step Functions to call SageMaker, Lambda, and the other services it can orchestrate. As a best practice, grant access only to the resources you need.

1. Under the **Permissions** tab, click **Add inline policy**
2. Enter the following in the **JSON** tab

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTransformJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:StopTransformJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DeleteEndpoint",
                "sagemaker:UpdateEndpoint",
                "sagemaker:ListTags",
                "lambda:InvokeFunction",
                "sqs:SendMessage",
                "sns:Publish",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:DescribeTasks",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "batch:SubmitJob",
                "batch:DescribeJobs",
                "batch:TerminateJob",
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:BatchStopJobRun"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        }
    ]
}
```

3. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`
4. Choose **Create policy**. You will be redirected to the details page for the role.
5. Copy the **Role ARN** at the top of the **Summary**
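
The inline-policy steps can also be scripted. A minimal sketch, assuming the role already exists and you saved the JSON policy shown above to a local file (the file name `policy.json` and the helper name `put_execution_policy` are hypothetical). It calls IAM `put_role_policy`, then `get_role` to return the role ARN you need for `workflow_execution_role`.

```python
import json


def put_execution_policy(iam_client, role_name, policy_document,
                         policy_name="StepFunctionsWorkflowExecutionPolicy"):
    """Attach an inline policy to role_name and return the role's ARN.

    policy_document is a dict, e.g. loaded with json.load from policy.json.
    iam_client is expected to be a boto3 IAM client, e.g. boto3.client('iam').
    """
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document),
    )
    # get_role returns the role description, including its ARN
    return iam_client.get_role(RoleName=role_name)["Role"]["Arn"]
```

For example, load the file with `json.load` and pass the resulting dict, `boto3.client('iam')`, and the role name `StepFunctionsWorkflowExecutionRole`.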

## Import necessary libraries

In [None]:
# Import required libraries and set up the SageMaker session, S3 locations, and account details
import boto3
import os
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
import time
import uuid

role = get_execution_role()  # IAM role of this SageMaker notebook instance
sess = sagemaker.Session()
bucket = sess.default_bucket()  # or replace with a bucket of your choosing
WORK_DIRECTORY = 'PennFudanPed'  # local folder created when the dataset is unzipped
key = 'BYO-Mask-RCNN'
prefix = '{}/{}'.format(key, WORK_DIRECTORY)  # S3 key prefix for the training data
account = sess.boto_session.client('sts').get_caller_identity()['Account']
region = sess.boto_session.region_name

In [None]:
# TODO: Replace the IAM role below with the ARN of the StepFunctionsWorkflowExecutionRole you created during setup.
# It generally looks like this:
workflow_execution_role = "arn:aws:iam::{}:role/StepFunctionsWorkflowExecutionRole".format(account)

In [None]:
# Install the Step Functions Data Science SDK
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import stepfunctions
import logging
from stepfunctions.steps import (LambdaStep, Retry, Catch, Fail, Chain, TrainingStep, ModelStep, EndpointConfigStep, EndpointStep)
from stepfunctions.workflow import Workflow
from stepfunctions.template.pipeline import TrainingPipeline
from stepfunctions.inputs import ExecutionInput

stepfunctions.set_stream_logger(level=logging.INFO)

## Download training dataset

In [None]:
!wget https://www.cis.upenn.edu/~jshi/ped_html/PennFudanPed.zip

In [None]:
!unzip PennFudanPed.zip
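
If `wget` or `unzip` is not available in your environment, the Python standard library can do both steps. A minimal sketch (hypothetical helper `download_and_extract`) that skips the download when the archive already exists and returns the archive's top-level folders:

```python
import os
import urllib.request
import zipfile


def download_and_extract(url, zip_path, dest_dir="."):
    """Download url to zip_path (unless it already exists) and extract it into dest_dir.

    Returns the sorted list of top-level entries in the archive.
    """
    if not os.path.exists(zip_path):
        urllib.request.urlretrieve(url, zip_path)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        top_level = {name.split("/")[0] for name in archive.namelist()}
    return sorted(top_level)
```

For example, `download_and_extract("https://www.cis.upenn.edu/~jshi/ped_html/PennFudanPed.zip", "PennFudanPed.zip")` should return a list that includes `PennFudanPed`.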

## Upload the training dataset to Amazon S3

In [None]:
data_location = sess.upload_data(WORK_DIRECTORY, bucket=bucket, key_prefix=prefix)
print(data_location)
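
`upload_data` returns an S3 URI of the form `s3://<bucket>/BYO-Mask-RCNN/PennFudanPed`. The training step later passes `os.path.dirname(data_location)`, the parent prefix `s3://<bucket>/BYO-Mask-RCNN`, so the files should appear in the container under `/opt/ml/input/data/training/PennFudanPed/`. Because S3 prefixes match on the key string, that parent prefix also covers the estimator's `output_path` (`s3://<bucket>/BYO-Mask-RCNN/output`), so artifacts from earlier runs may be downloaded into the training channel as well. A small sketch of the prefix computation (hypothetical helper name; `posixpath` keeps `/` separators on every OS):

```python
import posixpath


def training_channel_uri(data_location):
    """Return the parent S3 prefix of data_location, as passed to the TrainingStep."""
    return posixpath.dirname(data_location.rstrip("/"))
```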

## Define the Estimator and Step Functions Workflow

In [None]:


# Note that this image URI works only if you have made the changes to the environment variables in the Lambda
# definition as described in the workshop README. If you have not, please make those changes first.
image = '{}.dkr.ecr.{}.amazonaws.com/sm-container-maskrcnn:torch'.format(account, region)

maskrcnn = sagemaker.estimator.Estimator(image,
                       role, 1, 'ml.p2.xlarge',  # instance count and type; feel free to change them. A cost estimate is provided in the README.
                       output_path="s3://{}/{}/output".format(bucket, key),
                       sagemaker_session=sess)

maskrcnn.set_hyperparameters(num_epochs=1,
                             num_classes=2)

# Uncomment to train directly with the SageMaker SDK instead of through Step Functions
#maskrcnn.fit(os.path.dirname(data_location))
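
A bring-your-own container receives these hyperparameters as a JSON file, `/opt/ml/input/config/hyperparameters.json`, in which every value is a string (so `num_epochs=1` arrives as `"1"`). The container code is not shown in this notebook, so the following is a hypothetical sketch of how a training script might read them; the fallback defaults are assumptions:

```python
import json

# Location where SageMaker places hyperparameters inside a training container
HYPERPARAMETERS_PATH = "/opt/ml/input/config/hyperparameters.json"


def load_hyperparameters(path=HYPERPARAMETERS_PATH):
    """Read the hyperparameters this notebook sets and convert them to int.

    SageMaker serializes every hyperparameter value as a string.
    The defaults (1 epoch, 2 classes) are assumptions mirroring this notebook.
    """
    with open(path) as f:
        raw = json.load(f)
    return {
        "num_epochs": int(raw.get("num_epochs", 1)),
        "num_classes": int(raw.get("num_classes", 2)),
    }
```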

### Create the Step Functions Pipeline

**IMPORTANT:** Replace the Lambda function name below with the name of the Lambda function listed in the Outputs of your CloudFormation stack.

In [None]:
lambda_state = LambdaStep(
    state_id="Calls CodeBuild to Build Container",
    parameters={  
        "FunctionName": "lambda-build-docker-maskrcnn", #TODO: REPLACE with the name of the Lambda function you created
        "Payload": {  
           "input": " "
        }
    }
)

lambda_state.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))

lambda_state.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("LambdaTaskFailed")
))
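
With the retry rule above, Step Functions retries a failed Lambda task at most twice. The wait before retry *n* is `IntervalSeconds * BackoffRate^(n-1)`, so 15 seconds and then 60 seconds; if the last retry also fails, the `Catch` routes the execution to the `LambdaTaskFailed` Fail state. A small helper (hypothetical name) that computes the schedule:

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry under a Step Functions Retry rule.

    Retry n (1-based) waits interval_seconds * backoff_rate ** (n - 1).
    """
    return [interval_seconds * backoff_rate ** (n - 1) for n in range(1, max_attempts + 1)]
```

For the values used above, `retry_delays(15, 2, 4.0)` gives `[15.0, 60.0]`.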

In [None]:
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str,
    'EndpointName': str
})

In [None]:
train_step = TrainingStep(
    'Train Step',
    estimator=maskrcnn,
    data=os.path.dirname(data_location),  # parent S3 prefix of the uploaded dataset
    job_name=execution_input['JobName']
)

In [None]:
model_step = ModelStep(
    'Save model',
    model=train_step.get_expected_model(),
    model_name=execution_input['ModelName'] 
)

In [None]:
endpoint_config_step = EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

In [None]:
endpoint_step = EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)

In [None]:
workflow_definition = Chain([
    lambda_state,
    train_step,
    model_step,
    endpoint_config_step,
    endpoint_step
])

# Next, we define the workflow
workflow = Workflow(
    name="MyWorkflow-BYOC-MaskRCNN-{}".format(uuid.uuid1().hex),
    definition=workflow_definition,
    role=workflow_execution_role
)
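
`Chain` links the steps in order. In the definition printed by `workflow.definition.to_json()`, each state names its successor in a `Next` field and the last state is marked `"End": true`, following the Amazon States Language. The sketch below models only that linking (hypothetical helper `chain_to_states`); real states also carry `Type`, `Resource`, `Parameters`, and the Lambda state's `Retry`/`Catch`, which points to the separate `LambdaTaskFailed` Fail state. It is not how the SDK builds the definition.

```python
def chain_to_states(state_names):
    """Build a simplified States Language skeleton for a linear chain of states."""
    names = list(state_names)
    if not names:
        raise ValueError("a chain needs at least one state")
    states = {}
    # Pair each state with its successor; the last state has no successor
    for current, following in zip(names, names[1:] + [None]):
        if following is None:
            states[current] = {"End": True}
        else:
            states[current] = {"Next": following}
    return {"StartAt": names[0], "States": states}
```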

In [None]:
print(workflow.definition.to_json(pretty=True))

In [None]:
workflow.render_graph()

In [None]:
workflow.create()

In [None]:
execution = workflow.execute(
    inputs={
        'JobName': 'BYOC-Mask-RCNN-{}'.format(uuid.uuid1().hex),  # each SageMaker training job requires a unique name
        'ModelName': 'BYOC-Mask-RCNN-{}'.format(uuid.uuid1().hex),  # each model requires a unique name
        'EndpointName': 'BYOC-Mask-RCNN-{}'.format(uuid.uuid1().hex)  # each endpoint requires a unique name
    }
)
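
Each of the three names passed to `workflow.execute` must be unique and must satisfy SageMaker's naming rule for training jobs, models, and endpoints: at most 63 characters, letters and digits optionally separated by hyphens (no underscores). A `uuid1().hex` suffix is 32 characters, so `BYOC-Mask-RCNN-<hex>` comes to 47. The hypothetical helper below generates and checks such a name before the workflow runs:

```python
import re
import uuid

# Naming rule for SageMaker training jobs, models, and endpoints:
# at most 63 characters, alphanumerics optionally separated by hyphens.
MAX_NAME_LENGTH = 63
NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")


def unique_sagemaker_name(prefix):
    """Append a uuid1 hex suffix to prefix and validate the result."""
    name = "{}-{}".format(prefix, uuid.uuid1().hex)
    if len(name) > MAX_NAME_LENGTH or not NAME_PATTERN.fullmatch(name):
        raise ValueError("invalid SageMaker resource name: {}".format(name))
    return name
```

For example, `unique_sagemaker_name('BYOC-Mask-RCNN')` produces a name in the same format as the inputs above.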

## Watch the progress of your workflow here

In [None]:
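from IPython.display import display, clear_output
# Redraw the progress graph every minute until the execution is no longer RUNNING
while True:
    clear_output(wait=True)
    display(execution.render_progress())
    if execution.describe()['status'] != 'RUNNING':
        break
    time.sleep(60)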

## Inferences

Once your model is deployed, you can run inferences against the endpoint with the SageMaker `RealTimePredictor` API (renamed `Predictor` in version 2 of the SageMaker Python SDK). See the SageMaker Python SDK documentation for details.

To avoid unnecessary costs, make sure you delete the endpoint once you are done.
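
Outside the SageMaker Python SDK, the same request can be made with the low-level `invoke_endpoint` call of the boto3 `sagemaker-runtime` client. A minimal sketch, assuming the container accepts and returns JSON (as the cells below imply by sending `json.dumps(...)` and parsing the result with `json.loads`). The `application/json` content type is an assumption, since the predictor below is created without one, and the helper name `invoke_json` is hypothetical.

```python
import json


def invoke_json(runtime_client, endpoint_name, payload, content_type="application/json"):
    """Send payload as JSON to a SageMaker endpoint and decode the JSON response.

    runtime_client is expected to be boto3.client('sagemaker-runtime').
    """
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=json.dumps(payload),
    )
    # "Body" is a streaming object; read() returns the raw response bytes
    return json.loads(response["Body"].read())
```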

**Important**: Replace the endpoint name below with the name of your endpoint. To find it, navigate to the SageMaker console --> Endpoints and look for the name starting with `BYOC-Mask-RCNN-`. **Make sure that your endpoint is in service before you proceed!**

In [None]:
# Find the endpoint name in the SageMaker console under Endpoints
endpoint_name = 'BYOC-Mask-RCNN-98f41636a76511eaba5ac98107dec9fb'  # TODO: REPLACE this with your endpoint name
from sagemaker.predictor import RealTimePredictor

predictor = RealTimePredictor(endpoint=endpoint_name)

In [None]:
# Let's take an input image and run inference on it.
from PIL import Image
import numpy as np
import pickle
f = f'{WORK_DIRECTORY}/PNGImages/FudanPed00001.png'
Image.open(f)

In [None]:
import json
img = np.array(Image.open(f))
imginput = json.dumps(img.tolist())
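
Serializing the image as a nested JSON list is simple but verbose: each channel value becomes a decimal number plus a separator, so the request is several times larger than the raw pixel data. SageMaker real-time endpoints cap the request payload at 6 MB, so it can be worth checking the size before calling `predict`. A minimal sketch (hypothetical helper `encode_image`; the limit is treated here as 6 * 1024 * 1024 bytes):

```python
import json

import numpy as np

# Request payload limit for SageMaker real-time endpoints, treated as 6 MiB here
MAX_PAYLOAD_BYTES = 6 * 1024 * 1024


def encode_image(img):
    """Serialize an H x W x C uint8 image as a JSON nested list and check its size."""
    body = json.dumps(np.asarray(img).tolist())
    size = len(body.encode("utf-8"))
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError("payload is {} bytes, above the 6 MB limit".format(size))
    return body
```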

In [None]:
result = predictor.predict(imginput)
prediction = json.loads(result)

In [None]:
Image.fromarray(np.uint8(np.asarray(prediction[0][0])*255))

In [None]:
Image.fromarray(np.uint8(np.asarray(prediction[1][0])*255))
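
The two cells above scale each predicted mask by 255 before display, which implies the container returns soft masks with values in [0, 1] (an inference; the output format is not documented here). Casting values outside that range straight to `np.uint8` can wrap around or give platform-dependent results, so a slightly safer conversion clips first. Hypothetical helper, whose result can be passed to `Image.fromarray`:

```python
import numpy as np


def mask_to_image_array(mask):
    """Convert a soft mask with values in [0, 1] to a uint8 array in [0, 255].

    Values are clipped first so small numerical overshoots cannot wrap
    around when cast to uint8.
    """
    scaled = np.clip(np.asarray(mask, dtype=np.float32), 0.0, 1.0) * 255
    return scaled.astype(np.uint8)
```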

In [None]:
sess.delete_endpoint(predictor.endpoint)
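
`sess.delete_endpoint` removes only the endpoint. The endpoint configuration and the model created by the workflow remain; they do not themselves incur instance charges, but they accumulate across runs. A minimal cleanup sketch with the boto3 SageMaker client (hypothetical helper `delete_deployment`). In this workflow the endpoint configuration is named after `ModelName`, so pass the same value for both:

```python
def delete_deployment(sm_client, endpoint_name, endpoint_config_name, model_name):
    """Delete an endpoint, its endpoint configuration, and the model behind it.

    sm_client is expected to be boto3.client('sagemaker').
    """
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sm_client.delete_model(ModelName=model_name)
```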