# Amazon Augmented AI (Amazon A2I) integration with Amazon Textract's Analyze Document [Example]

Visit https://github.com/aws-samples/amazon-a2i-sample-jupyter-notebooks for all A2I Sample Notebooks


1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
    1. [Workteam](#Workteam)
    2. [Permissions](#Notebook-Permission)
3. [Client Setup](#Client-Setup)
4. [Sample Data](#Sample-Data)
    1. [Download sample images](#Download-sample-images)
    2. [Upload images to S3](#Upload-images-to-S3)
5. [Create Control Plane Resources](#Create-Control-Plane-Resources)
    1. [Create Human Task UI](#Create-Human-Task-UI)
    2. [Create Flow Definition](#Create-Flow-Definition)
6. [Analyze Document with Textract](#Analyze-Document-with-Textract)
6. [Human Loops](#Human-Loops)
    1. [Check Status of Human Loop](#Check-Status-of-Human-Loop)
    2. [Wait For Workers to Complete Task](#Wait-For-Workers-to-Complete-Task)
    3. [Check Status of Human Loop](#Check-Status-of-Human-Loop)
    4. [View Task Results](#View-Task-Results)


## Introduction

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers. 

Amazon A2I provides built-in human review workflows for common machine learning use cases, such as content moderation and text extraction from documents, which allows predictions from Amazon Rekognition and Amazon Textract to be reviewed easily. You can also create your own workflows for ML models built on Amazon SageMaker or any other tools. Using Amazon A2I, you can allow human reviewers to step in when a model is unable to make a high confidence prediction or to audit its predictions on an on-going basis. Learn more here: https://aws.amazon.com/augmented-ai/

In this tutorial, we will show how you can use Amazon A2I directly within your API calls to Textract's Analyze Document API. 

For more in depth instructions, visit https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-getting-started.html

To incorporate Amazon A2I into your human review workflows, you need three resources:

* A **worker task template** to create a worker UI. The worker UI displays your input data, such as documents or images, and instructions to workers. It also provides interactive tools that the worker uses to complete your tasks. For more information, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-instructions-overview.html

* A **human review workflow**, also referred to as a flow definition. You use the flow definition to configure your human workforce and provide information about how to accomplish the human review task. For built-in task types, you also use the flow definition to identify the conditions under which a review human loop is triggered. For example, with Amazon Textract can analyze text in a document using machine learning. You can use the flow definition to specify that a document will be sent to a human for content moderation review if Amazon Textracts's confidence score output is low for any or all pieces of text returned by Textract. You can create a flow definition in the Amazon Augmented AI console or with the Amazon A2I APIs. To learn more about both of these options, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html

* A **human loop** to start your human review workflow. When you use one of the built-in task types, the corresponding AWS service creates and starts a human loop on your behalf when the conditions specified in your flow definition are met or for each object if no conditions were specified. When a human loop is triggered, human review tasks are sent to the workers as specified in the flow definition.

When using a custom task type, you start a human loop using the Amazon Augmented AI Runtime API. When you call StartHumanLoop in your custom application, a task is sent to human reviewers.

### Install Latest SDK

In [1]:
# First, let's get the latest installations of our dependencies
!pip install --upgrade pip
!pip install boto3 --upgrade
!pip install -U botocore

Requirement already up-to-date: pip in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (20.1.1)
Requirement already up-to-date: boto3 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.14.1)
Requirement already up-to-date: botocore in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.17.1)


## Setup
We need to set up the following data:
* `region` - Region to call A2I
* `bucket` - A S3 bucket accessible by the given role
    * Used to store the sample images & output results
    * Must be within the same region A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role
* `workteam` - Group of people to send the work to

In [2]:
# Region
REGION = 'us-east-1'

In [3]:
import boto3
import io
import json
import uuid
import botocore
import time
import botocore
from sagemaker import get_execution_role

# Amazon SageMaker client
sagemaker = boto3.client('sagemaker', REGION)
ROLE = get_execution_role()

### Workteam or Workforce

A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers. 

When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job.

To create your Workteam, visit the instructions here: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html

After you have created your workteam, replace YOUR_WORKTEAM_ARN below

In [4]:
WORKTEAM_ARN= "arn:aws:sagemaker:us-east-1:077546553367:workteam/private-crowd/humanflow-team"

#### Setup Bucket and Paths

In [5]:
import boto3
import botocore

BUCKET = 'textract-ocr-unicorn-gym-asean-demo'
OUTPUT_PATH = f's3://{BUCKET}/a2i-results'

In [6]:
OUTPUT_PATH

's3://textract-ocr-unicorn-gym-asean-demo/a2i-results'

### Permissions

The AWS IAM Role used to execute the notebook needs to have the following permissions:

* TextractFullAccess
* SagemakerFullAccess
* S3 Read Access to the bucket listed below
* AmazonSageMakerMechanicalTurkAccess (if using MechanicalTurk as your Workforce)

In [7]:
from sagemaker import get_execution_role

# Setting Role to the default SageMaker Execution Role
ROLE = get_execution_role()
display(ROLE)

'arn:aws:iam::077546553367:role/service-role/AmazonSageMaker-ExecutionRole-20200607T203769'

Visit: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-permissions-security.html to add the necessary permissions to your role

## Client Setup

Here we are going to setup the clients. 

In [8]:
import boto3
import io
import json
import uuid
import botocore
import time
import botocore

# Amazon SageMaker client
sagemaker = boto3.client('sagemaker', REGION)

# Amazon Textract client
textract = boto3.client('textract', REGION)

# S3 client
s3 = boto3.client('s3', REGION)

# A2I Runtime client
a2i_runtime_client = boto3.client('sagemaker-a2i-runtime', REGION)

In [9]:
import pprint

# Pretty print setup
pp = pprint.PrettyPrinter(indent=2)

# Function to pretty-print AWS SDK responses
def print_response(response):
    if 'ResponseMetadata' in response:
        del response['ResponseMetadata']
    pp.pprint(response)

##  Data

#### Specify Human Loop Activation Conditions

Since we are using a built-in integration type for A2I, we can use Human Loop Activation Conditions to provide conditions that trigger a human loop.

Here we are specifying conditions for specific keys in our document. If Textract's confidence falls outside of the thresholds set here, the document will be sent to a human for review, with the specific keys that triggered the human loop prompted to the worker. 

### Analyze Document with Textract

Now that we have setup our Flow Definition, all that's left is calling Textract's Analyze Document API, and including our A2I paramters in the HumanLoopConfig.

## define flow here 

+ 1. create a template (in the formate of liquid html)

+ 2. create a flow 

In [11]:
template1  = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>


  <crowd-bounding-box
    name="annotatedResult"
    src="{{ task.input.taskObject | grant_read_access }}"
    header="Please ensure the correct value is extracted from the receipt"
    labels="['Error Fields','Missing Fields']"
  ></crowd-bounding-box>
  
  <p><label>{{ task.input.date }}</label></p>
  <crowd-input name="date" label="date" ></crowd-input>
  <p><label>{{ task.input.vendor }}</label></p>
  <crowd-input name="vendor" label="vendor" required></crowd-input>
  <p><label>{{ task.input.total }}</label></p>
  <crowd-input name="total" label="total" required></crowd-input>

  <short-instructions>
    Your custom quick instructions and examples
  </short-instructions>

  <full-instructions>
    Your custom detailed instructions and more examples
  </full-instructions>
</crowd-form>
"""

def create_task_ui(task_ui_name):
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=task_ui_name,
        UiTemplate={'Content': template})
    return response

In [12]:
template2 = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
  <img style="max-width: 35vw; max-height: 50vh" src="{{ task.input.taskObject | grant_read_access }}">
  <p><label>{{ task.input.date }}</label></p>
  <crowd-input name="date" label="date" required></crowd-input>
  <p><label>{{ task.input.vendor }}</label></p>
  <crowd-input name="vendor" label="vendor" required ></crowd-input>
  <p><label>{{ task.input.total }}</label></p>
  <crowd-input name="total" label="total" required></crowd-input>

  <short-instructions>
    Your custom quick instructions and examples
  </short-instructions>

  <full-instructions>
    Your custom detailed instructions and more examples
  </full-instructions>
</crowd-form>
"""



In [30]:
def create_task_ui(task_ui_name,template):
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=task_ui_name,
        UiTemplate={'Content': template})
    return response

taskUIName = 'a2i-unicorngym-textract-demo-vv5'
humanTaskUiResponse = create_task_ui(taskUIName,template2)

In [None]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
  <img style="max-width: 35vw; max-height: 50vh" src="{{ task.input.taskObject | grant_read_access }}">
  <p><label>{{ task.input.date }}</label></p>
  <crowd-input name="date" label="date" required></crowd-input>
  <crowd-input name="tag1" label="Word/phrase 1"></crowd-input>
  <p><label>{{ task.input.vendor }}</label></p>
  <crowd-input name="vendor" label="vendor" required></crowd-input>
  <p><label>{{ task.input.total }}</label></p>
  <crowd-input name="total" label="total" required></crowd-input>

  <short-instructions>
    Your custom quick instructions and examples
  </short-instructions>

  <full-instructions>
    Your custom detailed instructions and more examples
  </full-instructions>
</crowd-form>
"""

def create_task_ui(task_ui_name):
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=task_ui_name,
        UiTemplate={'Content': template})
    return response

In [31]:
def create_flow_definition(flow_definition_name,humanTaskUiResponse, tasktitle='A2I-UniCornGym-Demo'):
    '''
    Creates a Flow Definition resource

    Returns:
    struct: FlowDefinitionArn
    '''

    response = sagemaker.create_flow_definition(
            FlowDefinitionName= flow_definition_name,
            RoleArn= ROLE,
            HumanLoopConfig= {
                "WorkteamArn": WORKTEAM_ARN,
                "HumanTaskUiArn": humanTaskUiResponse,
                "TaskCount": 1,
                "TaskDescription": tasktitle,
                "TaskTitle": tasktitle
            },
            OutputConfig={
                "S3OutputPath" : OUTPUT_PATH
            }
        )
    
    return response['FlowDefinitionArn']

In [33]:
uniqueId = str(uuid.uuid4())
flowDefinitionName = f'yuan-textract-demo-{uniqueId}' 
flowDefinitionArn = create_flow_definition(flowDefinitionName,humanTaskUiResponse['HumanTaskUiArn'],'A2I-UNICornGym-Demo-v1')
flowDefinitionArn

'arn:aws:sagemaker:us-east-1:077546553367:flow-definition/yuan-textract-demo-daf63a51-f664-424b-872e-033c0a31943d'

In [34]:
uniqueId = str(uuid.uuid4())
human_loop_unique_id = uniqueId + '1'

humanLoopConfig = {
    'FlowDefinitionArn':flowDefinitionArn,
    'HumanLoopName':human_loop_unique_id, 
    'DataAttributes': { 'ContentClassifiers': [ 'FreeOfPersonallyIdentifiableInformation' ]}
}

In [35]:
def analyze_document_with_a2i(document_name, bucket):
    response = textract.analyze_document(
        Document={'S3Object': {'Bucket': bucket, 'Name': document_name}},
        FeatureTypes=["TABLES", "FORMS"], 
        HumanLoopConfig=humanLoopConfig
    )
    return response

In [36]:
import uuid
import boto3
import json

uniqueId = str(uuid.uuid4())
human_loop_unique_id = uniqueId + '1'

# A2I Runtime client
a2i_runtime_client = boto3.client('sagemaker-a2i-runtime', 'us-east-1')

s3_fname='s3://textract-ocr-unicorn-gym-asean-demo/data_extracted/X00016469671.jpg'

inputContent = {
    
    "taskObject": s3_fname,
    "date": "02/01/2019",
    "vendor": "OJC Marketing",
    "total" : "170"
}

a2i_response = a2i_runtime_client.start_human_loop(
    HumanLoopName=human_loop_unique_id,
    FlowDefinitionArn=flowDefinitionArn,
    HumanLoopInput={
        'InputContent': json.dumps(inputContent)
    },
    DataAttributes={
        'ContentClassifiers': [
            'FreeOfPersonallyIdentifiableInformation'
        ]
    }
    
)

print(a2i_response)

{'ResponseMetadata': {'RequestId': '2ec77cd3-c1cc-460b-bcff-2d7e211bbfea', 'HTTPStatusCode': 201, 'HTTPHeaders': {'date': 'Fri, 12 Jun 2020 07:36:35 GMT', 'content-type': 'application/json; charset=UTF-8', 'content-length': '249', 'connection': 'keep-alive', 'x-amzn-requestid': '2ec77cd3-c1cc-460b-bcff-2d7e211bbfea', 'access-control-allow-origin': '*', 'x-amz-apigw-id': 'OASEeFteoAMFRtA=', 'x-amzn-trace-id': 'Root=1-5ee33082-99f1ce60aebed82003905620'}, 'RetryAttempts': 0}, 'HumanLoopArn': 'arn:aws:sagemaker:us-east-1:077546553367:human-loop/77ce39f3-0b6f-4599-9136-eccde9005f2c1'}


## Human Loops
When a document passed to Textract matches the conditions in FlowDefinition, a HumanLoopArn will be present in the response to analyze_document. 

If a _HumanLoopArn_ is present in the _HumanLoopActivationOutput_, we know **a Human Loop has been started**!

### Check Status of Human Loop

In [37]:
all_human_loops_in_workflow = a2i_runtime_client.list_human_loops(FlowDefinitionArn=flowDefinitionArn)['HumanLoopSummaries']

for human_loop in all_human_loops_in_workflow:
    print(f'\nHuman Loop Name: {human_loop["HumanLoopName"]}')
    print(f'Human Loop Status: {human_loop["HumanLoopStatus"]} \n')
    print('\n')



Human Loop Name: 77ce39f3-0b6f-4599-9136-eccde9005f2c1
Human Loop Status: InProgress 





### Wait For Workers to Complete Task

In [38]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!
https://g27hwd6auf.labeling.us-east-1.sagemaker.aws


### Check Status of Human Loop

In [40]:
all_human_loops_in_workflow = a2i_runtime_client.list_human_loops(FlowDefinitionArn=flowDefinitionArn)['HumanLoopSummaries']

completed_loops = []
for human_loop in all_human_loops_in_workflow:
    print(f'\nHuman Loop Name: {human_loop["HumanLoopName"]}')
    print(f'Human Loop Status: {human_loop["HumanLoopStatus"]} \n')
    print('\n')
    if human_loop['HumanLoopStatus'] == 'Completed':
        completed_loops.append(human_loop['HumanLoopName'])



Human Loop Name: 77ce39f3-0b6f-4599-9136-eccde9005f2c1
Human Loop Status: Completed 





### View Task Results  

Once work is completed, Amazon A2I stores results in your S3 bucket and sends a Cloudwatch event. Your results should be available in the S3 OUTPUT_PATH when all work is completed.

In [41]:
import re
import pprint
pp = pprint.PrettyPrinter(indent=2)

def retrieve_a2i_results_from_output_s3_uri(bucket, a2i_s3_output_uri):
    '''
    Gets the json file published by A2I and returns a deserialized object
    '''
    splitted_string = re.split('s3://' +  bucket + '/', a2i_s3_output_uri)
    output_bucket_key = splitted_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read()
    return json.loads(content)
    

for human_loop_name in completed_loops:

    describe_human_loop_response = a2i_runtime_client.describe_human_loop(
        HumanLoopName=human_loop_name
    )
    
    print(f'\nHuman Loop Name: {describe_human_loop_response["HumanLoopName"]}')
    print(f'Human Loop Status: {describe_human_loop_response["HumanLoopStatus"]}')
    print(f'Human Loop Output Location: : {describe_human_loop_response["HumanLoopOutput"]["OutputS3Uri"]} \n')
    
    # Uncomment below line to print out a2i human answers
    output = retrieve_a2i_results_from_output_s3_uri(BUCKET, describe_human_loop_response['HumanLoopOutput']['OutputS3Uri'])
#     pp.pprint(output)

    


Human Loop Name: 77ce39f3-0b6f-4599-9136-eccde9005f2c1
Human Loop Status: Completed
Human Loop Output Location: : s3://textract-ocr-unicorn-gym-asean-demo/a2i-results/yuan-textract-demo-daf63a51-f664-424b-872e-033c0a31943d/2020/06/12/07/36/35/77ce39f3-0b6f-4599-9136-eccde9005f2c1/output.json 



## Clean Ups

In [None]:
for human_loop in all_human_loops_in_workflow:
    print(f'\nHuman Loop Name: {human_loop["HumanLoopName"]}')
    if human_loop["HumanLoopStatus"] == 'Stopped':
          a2i_runtime_client.stop_human_loop(HumanLoopName=human_loop["HumanLoopName"])
          a2i_runtime_client.delete_human_loop(HumanLoopName=human_loop["HumanLoopName"])

## The End!