# SageMaker Ground Truth Labeling Jobs with Step Functions

Prerequisites:

AWS CLI (version 2, Linux x86_64 installer):
```
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
```
SageMaker: `pip install sagemaker`

Step Functions: `pip install stepfunctions`

Set Region

In [None]:
! aws configure set default.region us-east-1

## Guides and Tutorials
* SageMaker Ground Truth [AWS Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/sms.html)
* Step Functions [Introduction](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* SageMaker `CreateLabelingJob` API [Reference](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateLabelingJob.html)
* Amazon States Language [Guide](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-amazon-states-language.html)
* AWS Step Functions [Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/)

## Labeling Jobs

### Main inputs needed for a Labeling Job:
* `LabelingJobName`: Job name; only alphanumeric characters and hyphens are allowed (up to 63 characters)
* `LabelAttributeName`: Name of the attribute that holds the labels in the output manifest; reusing the job name is a good default
* `ManifestS3Uri`: S3 URI of the input manifest
* `S3OutputPath`: S3 URI where the output annotations are written
* `RoleArn`: ARN of the IAM role that SageMaker assumes to run the job
* `LabelCategoryConfigS3Uri`: S3 URI of the label category configuration file
* `WorkteamArn`: ARN (identifier) of the work team that will do the annotations
* `UiTemplateS3Uri`: S3 URI of the HTML file that defines the worker UI
* `PreHumanTaskLambdaArn`: ARN of the pre-processing Lambda function, run before a task is sent to workers
* `AnnotationConsolidationLambdaArn`: ARN of the post-processing Lambda function that consolidates the workers' annotations
* `TaskTitle`: Title shown to workers on the labeling portal
* `TaskTimeLimitInSeconds`: Time a worker has to complete a task before it is put back in the queue
* `TaskAvailabilityLifetimeInSeconds`: Time after which a task is no longer available to workers

More details are in the `boto3` SDK [documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_labeling_job).
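
To see how the list maps onto the API, here is a minimal sketch that assembles a `create_labeling_job` request and checks the job name against the pattern documented for `LabelingJobName` (`^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}`). The helper names `validate_job_name` and `build_labeling_job_request` are hypothetical; the request keys follow the `CreateLabelingJob` API. It does not call AWS: passing the result to `boto3.client("sagemaker").create_labeling_job(**request)` would start a real job.

```python
import re

# Pattern documented for LabelingJobName: 1 to 63 characters,
# alphanumeric, hyphens allowed only between characters.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")


def validate_job_name(name: str) -> str:
    """Return the name unchanged, or raise ValueError if it does not match the pattern."""
    if not JOB_NAME_PATTERN.fullmatch(name):
        raise ValueError(f"invalid labeling job name: {name!r}")
    return name


def build_labeling_job_request(
    job_name: str,
    manifest_uri: str,
    output_path: str,
    role_arn: str,
    label_categories_uri: str,
    workteam_arn: str,
    ui_template_uri: str,
    pre_lambda_arn: str,
    consolidation_lambda_arn: str,
    title: str,
    description: str,
) -> dict:
    """Keyword arguments for SageMaker create_labeling_job (hypothetical helper)."""
    validate_job_name(job_name)
    return {
        "LabelingJobName": job_name,
        "LabelAttributeName": job_name,  # reuse the job name, as suggested above
        "InputConfig": {"DataSource": {"S3DataSource": {"ManifestS3Uri": manifest_uri}}},
        "OutputConfig": {"S3OutputPath": output_path},
        "RoleArn": role_arn,
        "LabelCategoryConfigS3Uri": label_categories_uri,
        "HumanTaskConfig": {
            "WorkteamArn": workteam_arn,
            "UiConfig": {"UiTemplateS3Uri": ui_template_uri},
            "PreHumanTaskLambdaArn": pre_lambda_arn,
            "TaskTitle": title,
            "TaskDescription": description,
            "NumberOfHumanWorkersPerDataObject": 1,
            "TaskTimeLimitInSeconds": 3600,
            "TaskAvailabilityLifetimeInSeconds": 3600 * 24 * 10,
            "AnnotationConsolidationConfig": {
                "AnnotationConsolidationLambdaArn": consolidation_lambda_arn
            },
        },
    }
```

Validating the name locally catches mistakes such as underscores or a trailing hyphen before any AWS call is made.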

### Setup
* Create an IAM role
* Create an S3 bucket
* Create a workforce (private work team)
* Set up the Lambda functions
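
Later in the notebook a single role (`MLOps2021Role`) is used both as the Step Functions execution role and as the labeling job's `RoleArn`, so its trust policy has to let both services assume it. A minimal sketch of such a trust policy, assuming that single-role setup; the permission policies the role also needs (S3, SageMaker, Lambda, and `iam:PassRole` on itself, since the workflow passes it to SageMaker) are not shown.

```python
import json

# Trust policy letting both SageMaker (labeling job) and Step Functions
# (workflow) assume the same role. Permission policies are attached separately.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["sagemaker.amazonaws.com", "states.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}

trust_policy_json = json.dumps(TRUST_POLICY, indent=2)
print(trust_policy_json)

# To create the role (requires IAM permissions):
#   import boto3
#   boto3.client("iam").create_role(
#       RoleName="MLOps2021Role",
#       AssumeRolePolicyDocument=trust_policy_json,
#   )
```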

### Copy the data to S3
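Each data point is stored under its own prefix, `labeling/data/<id>/`. Ideally, `<id>` is a hash of the file contents, so that identical files always get the same id.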
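
A minimal sketch of deriving `<id>` from the file contents, assuming a SHA-256 digest truncated to 16 hex characters is unique enough for the dataset. The truncation length and the helper names `content_id` and `s3_key_for` are hypothetical; the fixed file name `image.jpeg` mirrors the copy commands below.

```python
import hashlib


def content_id(data: bytes, length: int = 16) -> str:
    """Hex SHA-256 digest of the data, truncated to `length` characters."""
    return hashlib.sha256(data).hexdigest()[:length]


def s3_key_for(data: bytes, file_name: str = "image.jpeg", prefix: str = "labeling/data") -> str:
    """S3 key of the form <prefix>/<id>/<file_name>."""
    return f"{prefix}/{content_id(data)}/{file_name}"
```

For example, `s3_key_for(Path("data/image-1.jpeg").read_bytes())` (with `from pathlib import Path`) gives the key to use in place of `labeling/data/1/image.jpeg`. Re-uploading the same file then overwrites the existing object instead of creating a duplicate.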

In [None]:
import os

In [None]:
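s3_bucket = os.getenv('LABELING_S3_BUCKET')
# Fail early: otherwise every S3 path below silently becomes s3://None/...
assert s3_bucket, 'Set the LABELING_S3_BUCKET environment variable first'
print(s3_bucket)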

<font color="red">IMPORTANT: Configure CORS on the S3 bucket; otherwise the labeling UI cannot load the images and labeling will fail!</font>
```
[
    {
        "AllowedHeaders": [],
        "AllowedMethods": [
            "GET"
        ],
        "AllowedOrigins": [
            "*"
        ],
        "ExposeHeaders": []
    }
]
```
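
The JSON above is the format accepted by the S3 console's CORS editor. When applying the same rule with boto3's `put_bucket_cors`, the rules go under a `CORSRules` key. A minimal sketch, assuming the caller passes a boto3 S3 client; `apply_labeling_cors` is a hypothetical helper name.

```python
# Same rule as above: allow GET from any origin so the labeling UI can load images.
CORS_CONFIGURATION = {
    "CORSRules": [
        {
            "AllowedHeaders": [],
            "AllowedMethods": ["GET"],
            "AllowedOrigins": ["*"],
            "ExposeHeaders": [],
        }
    ]
}


def apply_labeling_cors(s3_client, bucket: str) -> None:
    """Replace the bucket's CORS configuration with CORS_CONFIGURATION."""
    s3_client.put_bucket_cors(Bucket=bucket, CORSConfiguration=CORS_CONFIGURATION)
```

Usage: `apply_labeling_cors(boto3.client("s3"), s3_bucket)`. Note that this call overwrites any CORS rules already on the bucket.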

In [None]:
!aws s3 cp data/image-1.jpeg s3://{s3_bucket}/labeling/data/1/image.jpeg

In [None]:
!aws s3 cp data/image-2.jpeg s3://{s3_bucket}/labeling/data/2/image.jpeg

### Copy the configuration files to S3

Input manifest

In [None]:
!echo {\"source-ref\": \"s3://$LABELING_S3_BUCKET/labeling/data/1/image.jpeg\", \"id\": \"1\"} > config/dataset-1.manifest
!echo {\"source-ref\": \"s3://$LABELING_S3_BUCKET/labeling/data/2/image.jpeg\", \"id\": \"2\"} >> config/dataset-1.manifest
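
The `echo` cells depend on shell quoting and on the `LABELING_S3_BUCKET` environment variable. An equivalent sketch in Python uses `json.dumps`, so each line is guaranteed to be a valid JSON object (the manifest holds one object per line). It assumes the same `labeling/data/<id>/image.jpeg` layout; `manifest_lines` and `write_manifest` are hypothetical helper names.

```python
import json
from pathlib import Path


def manifest_lines(bucket: str, ids: list[str]) -> list[str]:
    """One JSON object per data point; 'source-ref' points at the image in S3."""
    return [
        json.dumps({"source-ref": f"s3://{bucket}/labeling/data/{i}/image.jpeg", "id": i})
        for i in ids
    ]


def write_manifest(path: Path, bucket: str, ids: list[str]) -> None:
    """Write the manifest file, creating its parent directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(manifest_lines(bucket, ids)) + "\n")
```

Usage: `write_manifest(Path("config/dataset-1.manifest"), s3_bucket, ["1", "2"])` produces the same two lines as the `echo` cells.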

In [None]:
!aws s3 cp config/dataset-1.manifest s3://{s3_bucket}/labeling/manifests/

Label category file

In [None]:
!aws s3 cp config/label-category-merchantName.json s3://{s3_bucket}/labeling/config/
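
The contents of `config/label-category-merchantName.json` are not shown in this notebook. A label category configuration file is a JSON document with a `document-version` and a list of `labels`; a minimal sketch that generates one, assuming a single label named after the field (`merchantName`). The helper names are hypothetical.

```python
import json
from pathlib import Path


def label_category_config(labels: list[str]) -> dict:
    """Label category configuration for a bounding-box labeling job."""
    return {
        "document-version": "2018-11-28",
        "labels": [{"label": name} for name in labels],
    }


def write_label_categories(path: Path, labels: list[str]) -> None:
    """Write the configuration as JSON, creating the parent directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(label_category_config(labels), indent=2))
```

Usage: `write_label_categories(Path("config/label-category-merchantName.json"), ["merchantName"])`.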

UI Template

In [None]:
!aws s3 cp config/bbox-default.html s3://{s3_bucket}/labeling/config/

## Step Functions

### Define the Labeling Job (Bounding Box) Step Function

In [None]:
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow
from stepfunctions.steps.fields import Field

import boto3
import json
import datetime
import os

from IPython.display import JSON

In [None]:
account = os.getenv('AWS_ACCOUNT')
print(account)

In [None]:
workflow_execution_role = f'arn:aws:iam::{account}:role/MLOps2021Role'

In [None]:
class BBoxLabelingJobStep(Task):
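    def __init__(self, **kwargs):
        # CreateLabelingJob parameters. Keys ending in '.$' are resolved at run time
        # from the execution input ($$.Execution.Input) passed to workflow.execute().
        parameters = {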
            'LabelingJobName.$': '$$.Execution.Input.labelingJobName',
            'LabelAttributeName.$': '$$.Execution.Input.labelingJobName',
            'InputConfig':{
                'DataSource': {
                    'S3DataSource': {
                        'ManifestS3Uri.$': '$$.Execution.Input.inputManifest'
                    }
                },
                'DataAttributes': {
                    'ContentClassifiers': [
                        'FreeOfPersonallyIdentifiableInformation', 'FreeOfAdultContent',
                    ]
                }
            },
            'OutputConfig':{
                'S3OutputPath.$': '$$.Execution.Input.outputPath'
            },
            'RoleArn': workflow_execution_role,
            'LabelCategoryConfigS3Uri.$': '$$.Execution.Input.labelCategoriesUri',
            'HumanTaskConfig': {
                'WorkteamArn': f'arn:aws:sagemaker:us-east-1:{account}:workteam/private-crowd/mlops',
                'UiConfig': {
                    'UiTemplateS3Uri.$': '$$.Execution.Input.uiTemplateFile'
                },
                'PreHumanTaskLambdaArn': 'arn:aws:lambda:us-east-1:432418664414:function:PRE-BoundingBox',
                'TaskKeywords': [
                    'Bounding Box',
                ],
                'TaskTitle.$': '$$.Execution.Input.labelingJobTitle',
                'TaskDescription.$': '$$.Execution.Input.labelingJobDescription',
                'NumberOfHumanWorkersPerDataObject': 1,
                'TaskTimeLimitInSeconds': 3600,
                'TaskAvailabilityLifetimeInSeconds': 3600*24*10,
                'AnnotationConsolidationConfig': {
                    'AnnotationConsolidationLambdaArn': 'arn:aws:lambda:us-east-1:432418664414:function:ACS-BoundingBox'
                }
            }
        }
        
        # The '.sync' suffix makes Step Functions wait until the labeling job completes
        kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createLabelingJob.sync'
        kwargs[Field.Parameters.value] = parameters

        super(BBoxLabelingJobStep, self).__init__('Bounding Box Labels', **kwargs)
        

A helper that creates the labeling job step and attaches a `Catch` that routes task failures to the error step

In [None]:
def create_labeling_job_step(error_step):
    labeling_job = BBoxLabelingJobStep()
    labeling_job.add_catch(Catch(
        error_equals=["States.TaskFailed"],
        next_step=error_step
    ))
    
    return labeling_job

### Define the function to create the post-labeling Lambda step<br>
(Note: this Lambda function must already be deployed.)
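
The code of `mlopsPostLabelingJobProcess` is not part of this notebook. Because the step passes the whole execution input as its payload, the function receives `labelingJobName` and `outputPath` and can locate the job's output manifest, which Ground Truth writes to `<S3OutputPath>/<LabelingJobName>/manifests/output/output.manifest`. A minimal sketch of the parsing half, assuming the bounding-box output format (an `annotations` list under the label attribute and a `class-map` under `<attribute>-metadata`); the helper names are hypothetical, and the S3 read and the conversion target are left out.

```python
import json


def output_manifest_uri(event: dict) -> str:
    """Location of the consolidated output manifest for the job named in the event."""
    base = event["outputPath"].rstrip("/")
    return f"{base}/{event['labelingJobName']}/manifests/output/output.manifest"


def parse_bbox_line(line: str, label_attribute: str) -> list[dict]:
    """Turn one output-manifest line into a list of labeled boxes."""
    record = json.loads(line)
    labels = record[label_attribute]
    class_map = record[f"{label_attribute}-metadata"]["class-map"]
    return [
        {
            "source": record["source-ref"],
            "label": class_map[str(box["class_id"])],
            "left": box["left"],
            "top": box["top"],
            "width": box["width"],
            "height": box["height"],
        }
        for box in labels["annotations"]
    ]
```

Since `LabelAttributeName` is set to the job name, the attribute to pass is `event["labelingJobName"]`.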

In [None]:
def create_postlabeling_lambda_step(error_step):
    lambda_state = LambdaStep(
        state_id="Convert SagemakerGT labels",
        parameters={  
            "FunctionName": "mlopsPostLabelingJobProcess",
            "Payload.$": "$$.Execution.Input"
        }
    )

    lambda_state.add_retry(Retry(
        error_equals=["States.TaskFailed"],
        interval_seconds=15,
        max_attempts=2,
        backoff_rate=4.0
    ))

    lambda_state.add_catch(Catch(
        error_equals=["States.TaskFailed"],
        next_step=error_step
    ))
    
    return lambda_state
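
With `interval_seconds=15`, `backoff_rate=4.0` and `max_attempts=2`, Step Functions waits 15 seconds before the first retry and 15 * 4 = 60 seconds before the second; if that attempt also fails, the `Catch` routes to the error step. A small sketch of that arithmetic (hypothetical helper; it models the `IntervalSeconds * BackoffRate^n` schedule and ignores jitter and `MaxDelaySeconds`):

```python
def retry_delays(interval_seconds: float, backoff_rate: float, max_attempts: int) -> list[float]:
    """Wait before each retry: interval * backoff_rate ** n for n = 0 .. max_attempts - 1."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


delays = retry_delays(15, 4.0, 2)
print(delays)  # [15.0, 60.0]
```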

### Next, define the function to create the error notification Lambda step<br>
(Again, this Lambda function must already be deployed.)

In [None]:
def create_error_lambda_step():
    lambda_state = LambdaStep(
        state_id="Notify Errors",
        parameters={  
            "FunctionName": "mlopsNotifyErrors",
            "Payload": {
                "Error.$": "$.Error",
                "Source": "Labeling Workflow",
                "Cause.$": "$.Cause"
            }
        }
    )
    
    return Chain([lambda_state, Fail("Labeling Job Failed")])
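
`mlopsNotifyErrors` is likewise not shown. When a `Catch` fires, the state input is the error object (`Error` and `Cause`), which this step forwards together with a fixed `Source`. One possible handler, sketched under the assumption that notifications go to an Amazon SNS topic; `format_notification`, `handler` and the `TOPIC_ARN` environment variable are hypothetical.

```python
import json
import os


def format_notification(event: dict) -> dict:
    """Build SNS publish arguments from the Step Functions error payload."""
    subject = f"[{event.get('Source', 'Unknown')}] {event.get('Error', 'Error')}"
    message = json.dumps(
        {"source": event.get("Source"), "error": event.get("Error"), "cause": event.get("Cause")},
        indent=2,
    )
    # SNS subjects must be shorter than 100 characters and contain no line breaks.
    subject = subject.replace("\n", " ")[:99]
    return {"Subject": subject, "Message": message}


def handler(event, context):
    """Lambda entry point: publish the error to the SNS topic in TOPIC_ARN."""
    import boto3  # bundled with the AWS Lambda Python runtime

    sns = boto3.client("sns")
    sns.publish(TopicArn=os.environ["TOPIC_ARN"], **format_notification(event))
    return {"notified": True}
```

`Cause` is a string that often contains serialized JSON from the failed service, so it is forwarded verbatim rather than parsed.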

### Finally, a function to put it all together

In [None]:
def make_labeling_path():
    error_step = create_error_lambda_step()
    labeling_job = create_labeling_job_step(error_step)
    post_labeling_lambda = create_postlabeling_lambda_step(error_step)
    
    return Chain([labeling_job, post_labeling_lambda])
    
def make_bbox_workflow(workflow_name):    
    return Workflow(
        name=workflow_name,
        definition=make_labeling_path(),
        role=workflow_execution_role
    )

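The workflow definition is a JSON document in Amazon States Language (ASL); rendering it is a quick way to inspect the generated states: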

In [None]:
workflow = make_bbox_workflow('MLOpsBBoxLabeling')
JSON(json.loads(workflow.definition.to_json()))
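
Because the definition is plain ASL JSON, it can be checked before the state machine is created. A minimal sketch (hypothetical helper, not part of the Data Science SDK) that lists every `StartAt`, `Next` or `Catch[].Next` target that does not name an existing state; it only looks at top-level states and ignores `Parallel` and `Map` branches.

```python
def missing_targets(definition: dict) -> list:
    """Names referenced by StartAt/Next/Catch that are not defined in States."""
    states = definition.get("States", {})
    referenced = [definition.get("StartAt")]
    for state in states.values():
        if "Next" in state:
            referenced.append(state["Next"])
        for catcher in state.get("Catch", []):
            referenced.append(catcher.get("Next"))
    return [name for name in referenced if name not in states]
```

Usage: `missing_targets(json.loads(workflow.definition.to_json()))`; an empty list means every transition resolves to a defined state.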

### Define the inputs for the Workflow

In [None]:
def get_workflow_inputs(dataset_name, field_name, labeling_job_name):
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    
    return {
        'labelingJobName': f'{labeling_job_name}-{field_name}-{timestamp}', 
        'fieldName': field_name,
        'labelCategoriesUri': f's3://{s3_bucket}/labeling/config/label-category-{field_name}.json',
        'inputManifest': f's3://{s3_bucket}/labeling/manifests/{dataset_name}.manifest',
        'outputPath': f's3://{s3_bucket}/labeling/output',
        'uiTemplateFile': f's3://{s3_bucket}/labeling/config/bbox-default.html',
        'labelingJobTitle': f'Bounding Box: {labeling_job_name}-{field_name}',
        'labelingJobDescription': f'Draw bounding boxes around {field_name} - {labeling_job_name}'
    }

## Running the Workflow

### Create the workflow

In [None]:
workflow = make_bbox_workflow('MLOpsBBoxLabeling')
workflow.create()

### Execute the workflow

In [None]:
workflow.execute(inputs=get_workflow_inputs('dataset-1', 'merchantName', 'mlops1'))
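
The labeling job can run for days (tasks stay available for up to 10 days here), so it helps to check on an execution from code. A minimal polling sketch built on the Step Functions `DescribeExecution` API (`describe_execution` in boto3), whose response includes a `status` such as `RUNNING`, `SUCCEEDED`, `FAILED`, `TIMED_OUT` or `ABORTED`. `wait_for_execution` and its parameters are hypothetical; the describe function and `sleep` are injected so the sketch can be tried without AWS.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(
    describe: Callable[..., dict],
    execution_arn: str,
    poll_seconds: float = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the execution reaches a terminal status, then return its description."""
    while True:
        description = describe(executionArn=execution_arn)
        if description["status"] in TERMINAL_STATUSES:
            return description
        sleep(poll_seconds)
```

Usage, with the execution ARN taken from the Step Functions console or from the execution object returned above: `wait_for_execution(boto3.client("stepfunctions").describe_execution, execution_arn)`.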

Update the workflow (for example, after changing one of the step definitions)

In [None]:
workflow.update(definition=make_labeling_path())