# Augmented AI integration with Amazon Textract's Analyze Document

## Setup environment
We need to set up the following data:
* `bucket` - An S3 bucket accessible by the given role
    * Used to store the sample images & output results
    * Must be within the same region A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role
* `workteam` - Group of people to send the work to

In [None]:
import os
import uuid
import time
import boto3
import pprint
import botocore
import sagemaker
from setup_a2i import add_cors_policy
from setup_a2i import update_notebook_role
from setup_a2i import create_task_ui, create_flow_definition, describe_flow_definition
from setup_a2i import retrieve_a2i_results_from_output_s3_uri

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()


# Amazon SageMaker client
# Note: this rebinds `sagemaker`, shadowing the imported module for the rest of the notebook
sagemaker = boto3.client('sagemaker')
# Amazon Textract client
textract = boto3.client('textract')
# Amazon S3 client
s3 = boto3.client('s3')
# A2I Runtime client
a2i_runtime_client = boto3.client('sagemaker-a2i-runtime')


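# Update the notebook role with TextractFullAccess
# The role name is the last segment of the ARN; the ARN may contain a path such as "service-role/"
role_name = role.split('/')[-1]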
update_notebook_role(role_name=role_name)
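
An execution role ARN may include a path (for example `role/service-role/<name>`), so the role name is the last `/`-separated segment of the ARN rather than the second. A minimal sketch of extracting it defensively follows; `role_name_from_arn` is a hypothetical helper that is not part of `setup_a2i`, and the ARN is a made-up example.

```python
def role_name_from_arn(role_arn: str) -> str:
    """Return the role name (last path segment) of an IAM role ARN."""
    # ARN layout: arn:partition:iam::account-id:role/<optional/path/>RoleName
    resource = role_arn.split(':', 5)[-1]
    if not resource.startswith('role/'):
        raise ValueError(f'Not an IAM role ARN: {role_arn}')
    return resource.rsplit('/', 1)[-1]


# Hypothetical ARN with a "service-role/" path
example_arn = 'arn:aws:iam::111122223333:role/service-role/MyNotebookRole'
print(role_name_from_arn(example_arn))
```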

## Setup Amazon Augmented AI (A2I)

### Create a private labelling workforce
A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers. 

When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job.

After you have created your work team, replace the placeholder below with its ARN.

In [None]:
workteam_arn = "<PUT YOUR PRIVATE WORKTEAM ARN HERE>"

### Create Worker Task Template

Create a human task UI resource from a UI template written in Liquid HTML. This template is rendered for the human workers whenever a human loop is required.

We provide a simple demo template that is compatible with the input and response of Amazon Textract's Analyze Document API.

Because we are integrating A2I with Textract, we could also create the template in the console from the default templates that A2I provides (https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-instructions-overview.html). The template string used below is copied from that default template, which you can find in the SageMaker console under Worker task templates.

In [None]:
# Task UI name - this value must be unique per account and region. You can also provide your own value here.
task_ui_name = 'ui-textract-dem0'

# Create task UI
human_task_ui_response = create_task_ui(task_ui_name)
human_task_ui_arn = human_task_ui_response['HumanTaskUiArn']
print(human_task_ui_arn)
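
The `create_task_ui` helper lives in `setup_a2i` and its body is not shown here. As a rough sketch of what such a helper could do, the SageMaker `create_human_task_ui` call takes a name and the template content. The function name `create_task_ui_sketch` and its parameters are assumptions for illustration, and the template text itself is left to the caller.

```python
def create_task_ui_sketch(sagemaker_client, task_ui_name, template):
    """Register a worker task template and return the API response.

    `sagemaker_client` is expected to be a boto3 SageMaker client and
    `template` the Liquid HTML template as a string.
    """
    return sagemaker_client.create_human_task_ui(
        HumanTaskUiName=task_ui_name,
        UiTemplate={'Content': template},
    )


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# response = create_task_ui_sketch(boto3.client('sagemaker'), 'my-task-ui', template_text)
# print(response['HumanTaskUiArn'])
```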

### Creating the Review Workflow Definition

In this section, we're going to create a review workflow definition. Flow Definitions allow us to specify:

* The conditions under which your human loop will be called.
* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.
* Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well. 

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

First, the bucket you use must have CORS enabled:

In [None]:
add_cors_policy(bucket_name=bucket)
a2i_output_path = f's3://{bucket}/a2i-results'
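
For readers without access to `setup_a2i`, the following is a minimal sketch of how a CORS rule can be applied with the S3 `put_bucket_cors` call. The specific rule (allow `GET` from any origin) is an assumption about what the worker UI needs, and both function names are hypothetical; the real `add_cors_policy` helper may be configured differently.

```python
def build_cors_configuration():
    """Return a CORS configuration that allows GET requests from any origin."""
    return {
        'CORSRules': [
            {
                'AllowedMethods': ['GET'],
                'AllowedOrigins': ['*'],
            }
        ]
    }


def apply_cors_sketch(s3_client, bucket_name):
    """Apply the configuration above to `bucket_name` using a boto3 S3 client."""
    s3_client.put_bucket_cors(
        Bucket=bucket_name,
        CORSConfiguration=build_cors_configuration(),
    )


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# apply_cors_sketch(boto3.client('s3'), 'my-bucket')
```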

#### Specify Human Loop Activation Conditions
Since we are using a built-in integration type for A2I, we can use Human Loop Activation Conditions to provide conditions that trigger a human loop.

Here we specify conditions for specific keys in our document. If Textract's confidence for one of those keys falls outside the thresholds set here, the document is sent to a human for review, and the keys that triggered the human loop are presented to the worker.
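
The conditions used by this notebook are defined inside `setup_a2i` and are not shown here. As an illustration of the general shape, the sketch below builds an activation-conditions JSON string that triggers a review when a form key is missing or detected with low confidence. The key name, aliases and threshold are made-up values, and `build_activation_conditions` is a hypothetical helper.

```python
import json


def build_activation_conditions(form_key, aliases, confidence_threshold=90):
    """Return activation conditions as a JSON string.

    A human loop starts if `form_key` is missing, or if it is found with a
    confidence below `confidence_threshold` (0-100).
    """
    conditions = {
        'Conditions': [
            {
                'Or': [
                    {
                        'ConditionType': 'ImportantFormKeyConfidenceCheck',
                        'ConditionParameters': {
                            'ImportantFormKey': form_key,
                            'ImportantFormKeyAliases': aliases,
                            'KeyValueBlockConfidenceLessThan': confidence_threshold,
                            'WordBlockConfidenceLessThan': confidence_threshold,
                        },
                    },
                    {
                        'ConditionType': 'MissingImportantFormKey',
                        'ConditionParameters': {
                            'ImportantFormKey': form_key,
                            'ImportantFormKeyAliases': aliases,
                        },
                    },
                ]
            }
        ]
    }
    return json.dumps(conditions, indent=2)


# Hypothetical form key and aliases
print(build_activation_conditions('Mail address', ['Mail Address:', 'Mailing address:']))
```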

In [None]:
# Now we are ready to create our Flow Definition!
# Flow definition name - this value must be unique per account and region. You can also provide your own value here.
flow_definition_name = f'fd-textract-{uuid.uuid4()}'

flow_definition_arn = create_flow_definition(
    flow_definition_name,
    workteam_arn, 
    human_task_ui_arn, 
    role, 
    a2i_output_path
)
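
The body of `create_flow_definition` is in `setup_a2i`. To show how the pieces listed earlier (conditions, workforce, task template, task configuration and output location) map onto the SageMaker `create_flow_definition` request, here is a sketch that only builds the request dictionary. The helper name, the task title and description, and the single-worker `TaskCount` are assumptions, not the notebook's actual settings.

```python
def build_flow_definition_request(name, workteam_arn, human_task_ui_arn,
                                  role_arn, output_path, activation_conditions_json):
    """Build keyword arguments for the SageMaker create_flow_definition call."""
    return {
        'FlowDefinitionName': name,
        'RoleArn': role_arn,
        # Built-in integration with Textract's Analyze Document (forms)
        'HumanLoopRequestSource': {
            'AwsManagedHumanLoopRequestSource': 'AWS/Textract/AnalyzeDocument/Forms/V1'
        },
        # When to start a human loop
        'HumanLoopActivationConfig': {
            'HumanLoopActivationConditionsConfig': {
                'HumanLoopActivationConditions': activation_conditions_json
            }
        },
        # Who reviews, with which UI, and how many workers per task
        'HumanLoopConfig': {
            'WorkteamArn': workteam_arn,
            'HumanTaskUiArn': human_task_ui_arn,
            'TaskTitle': 'Review document key-value pairs',        # assumed title
            'TaskDescription': 'Correct the extracted form data',  # assumed text
            'TaskCount': 1,                                        # assumed value
        },
        # Where results are written
        'OutputConfig': {'S3OutputPath': output_path},
    }


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# request = build_flow_definition_request(...)
# arn = boto3.client('sagemaker').create_flow_definition(**request)['FlowDefinitionArn']
```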

In [None]:
# Poll the flow definition until its status is Active (up to about 2 minutes)
for _ in range(60):
    describe_flow_definition_response = describe_flow_definition(flow_definition_name)
    status = describe_flow_definition_response['FlowDefinitionStatus']
    print(status)
    if status == 'Active':
        print("Flow Definition is active")
        break
    time.sleep(2)
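
The loop above stops silently if the flow definition never becomes active. One possible refinement, sketched below, is a polling helper that raises on a `Failed` status or on timeout. `wait_for_flow_definition` is a hypothetical name; it takes the describe function as a parameter so that it can be exercised without AWS access.

```python
import time


def wait_for_flow_definition(describe, name, timeout_s=120, interval_s=2,
                             sleep=time.sleep):
    """Poll `describe(name)` until the flow definition is Active.

    `describe` must return a dict containing 'FlowDefinitionStatus'.
    Raises RuntimeError on 'Failed' and TimeoutError after `timeout_s` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        response = describe(name)
        status = response['FlowDefinitionStatus']
        if status == 'Active':
            return status
        if status == 'Failed':
            reason = response.get('FailureReason', 'unknown reason')
            raise RuntimeError(f'Flow definition {name} failed: {reason}')
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Flow definition {name} is still {status}')
        sleep(interval_s)


# Usage with the notebook's helper:
# wait_for_flow_definition(describe_flow_definition, flow_definition_name)
```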

## Analyze Document with Textract

### Upload data to S3
Upload the sample document to your S3 bucket. Textract and A2I read it later, when the human task is created.

In [None]:
document = 'document-demo.jpeg'
s3.upload_file(document, bucket, document)

### Launch analysis with human loop
Now that we have set up our Flow Definition, all that's left is to call Textract's Analyze Document API and include our A2I parameters in the HumanLoopConfig.

In [None]:
human_loop_unique_id = str(uuid.uuid4()) + '1'

human_loop_config = {
    'FlowDefinitionArn': flow_definition_arn,
    'HumanLoopName': human_loop_unique_id, 
    'DataAttributes': { 'ContentClassifiers': [ 'FreeOfPersonallyIdentifiableInformation' ]}
}


def analyze_document_with_a2i(document_name, bucket):
    response = textract.analyze_document(
        Document={'S3Object': {'Bucket': bucket, 'Name': document_name}},
        FeatureTypes=["TABLES", "FORMS"], 
        HumanLoopConfig=human_loop_config
    )
    return response

In [None]:
analyze_document_response = analyze_document_with_a2i(document, bucket)

### Human Loops
When a document passed to Textract meets the activation conditions in the flow definition, the response to `analyze_document` contains a _HumanLoopArn_ inside _HumanLoopActivationOutput_.

If that _HumanLoopArn_ is present, we know **a Human Loop has been started**!

In [None]:
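# Use .get so a response without HumanLoopActivationOutput does not raise a KeyError
human_loop_activation_output = analyze_document_response.get('HumanLoopActivationOutput', {})
if 'HumanLoopArn' in human_loop_activation_output:
    # A human loop has been started!
    print(f'A human loop has been started with ARN: {human_loop_activation_output["HumanLoopArn"]}')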
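
Besides the ARN, the activation output can also explain why a loop was or was not started. The sketch below pulls those fields out of an Analyze Document response. `summarize_human_loop_activation` is a hypothetical helper, and the sample response is fabricated purely to show the shape.

```python
import json


def summarize_human_loop_activation(response):
    """Extract the human loop details from an analyze_document response."""
    activation = response.get('HumanLoopActivationOutput', {})
    # The evaluation results arrive as a JSON string, so decode them if present
    raw_results = activation.get('HumanLoopActivationConditionsEvaluationResults')
    return {
        'started': 'HumanLoopArn' in activation,
        'arn': activation.get('HumanLoopArn'),
        'reasons': activation.get('HumanLoopActivationReasons', []),
        'evaluation': json.loads(raw_results) if raw_results else None,
    }


# Fabricated response fragment for illustration only
sample_response = {
    'HumanLoopActivationOutput': {
        'HumanLoopArn': 'arn:aws:sagemaker:us-east-1:111122223333:human-loop/example',
        'HumanLoopActivationReasons': ['ConditionsEvaluation'],
        'HumanLoopActivationConditionsEvaluationResults': '{"Conditions": []}',
    }
}
print(summarize_human_loop_activation(sample_response))
```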

### Check Status of Human Loop

In [None]:
all_human_loops_in_workflow = a2i_runtime_client.list_human_loops(FlowDefinitionArn=flow_definition_arn)['HumanLoopSummaries']

for human_loop in all_human_loops_in_workflow:
    print(f'\nHuman Loop Name: {human_loop["HumanLoopName"]}')
    print(f'Human Loop Status: {human_loop["HumanLoopStatus"]} \n')
    print('\n')

### Wait For Workers to Complete Task

In [None]:
workteam_name = workteam_arn[workteam_arn.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker.describe_workteam(WorkteamName=workteam_name)['Workteam']['SubDomain'])

### Check Status of Human Loop Again

In [None]:
all_human_loops_in_workflow = a2i_runtime_client.list_human_loops(FlowDefinitionArn=flow_definition_arn)['HumanLoopSummaries']

completed_loops = []
for human_loop in all_human_loops_in_workflow:
    print(f'\nHuman Loop Name: {human_loop["HumanLoopName"]}')
    print(f'Human Loop Status: {human_loop["HumanLoopStatus"]} \n')
    print('\n')
    if human_loop['HumanLoopStatus'] == 'Completed':
        completed_loops.append(human_loop['HumanLoopName'])
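
A single `list_human_loops` call returns one page of results. If a flow definition has many human loops, the response may carry a `NextToken`; the sketch below follows it until all summaries are collected. `list_all_human_loops` is a hypothetical helper name.

```python
def list_all_human_loops(a2i_client, flow_definition_arn):
    """Return every human loop summary for a flow definition, following NextToken."""
    summaries = []
    kwargs = {'FlowDefinitionArn': flow_definition_arn}
    while True:
        page = a2i_client.list_human_loops(**kwargs)
        summaries.extend(page['HumanLoopSummaries'])
        token = page.get('NextToken')
        if not token:
            return summaries
        kwargs['NextToken'] = token


# Usage with the notebook's client:
# loops = list_all_human_loops(a2i_runtime_client, flow_definition_arn)
# completed = [l['HumanLoopName'] for l in loops if l['HumanLoopStatus'] == 'Completed']
```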


### View Task Results  

Once work is completed, Amazon A2I stores the results in your S3 bucket and sends a CloudWatch event. The results are available under `a2i_output_path` in S3 when all work is completed.

In [None]:
pp = pprint.PrettyPrinter(indent=2)
    

for human_loop_name in completed_loops:

    describe_human_loop_response = a2i_runtime_client.describe_human_loop(
        HumanLoopName=human_loop_name
    )
    
    print(f'\nHuman Loop Name: {describe_human_loop_response["HumanLoopName"]}')
    print(f'Human Loop Status: {describe_human_loop_response["HumanLoopStatus"]}')
    print(f'Human Loop Output Location: {describe_human_loop_response["HumanLoopOutput"]["OutputS3Uri"]}\n')

    # Fetch the human answers from S3; uncomment the last line to print them
    output = retrieve_a2i_results_from_output_s3_uri(bucket, describe_human_loop_response['HumanLoopOutput']['OutputS3Uri'])
    # pp.pprint(output)
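
`retrieve_a2i_results_from_output_s3_uri` is defined in `setup_a2i` and its body is not shown. A minimal sketch of the same idea, assuming the output object is a JSON document, is to split the `OutputS3Uri` into bucket and key, download the object, and decode it. Both function names below are hypothetical.

```python
import json
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/parts' into ('bucket', 'key/parts')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f'Not an S3 URI: {uri}')
    return parsed.netloc, parsed.path.lstrip('/')


def load_a2i_output(s3_client, output_s3_uri):
    """Download and decode the JSON output of a completed human loop."""
    bucket_name, key = split_s3_uri(output_s3_uri)
    body = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read()
    return json.loads(body)


print(split_s3_uri('s3://my-bucket/a2i-results/loop-1/output.json'))
```

With a boto3 S3 client, `load_a2i_output(s3, describe_human_loop_response['HumanLoopOutput']['OutputS3Uri'])` would return the decoded document; the worker responses are expected under a `humanAnswers` key, though the exact layout depends on the task template.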

    

## The End!