# Improve accuracy of pdf batch processing with Amazon Textract and Amazon A2I

In this chapter and its accompanying notebook, you will learn through an example how to use Amazon Textract in asynchronous mode to extract content from multiple PDF files in batch, send specific content from these documents to an Amazon A2I human review loop where the values can be reviewed and corrected, and then store the results in an Amazon DynamoDB table for downstream processing.

**Important Note:** This is an accompanying notebook for Chapter 16 - Improve accuracy of pdf batch processing with Amazon Textract and Amazon A2I from the Natural Language Processing with AWS AI Services book. Please make sure to read the instructions provided in the book prior to attempting this notebook. 

### Step 0 - Create a private human review workforce

This step requires you to use the AWS Console. We highly recommend that you follow it, especially because this notebook creates its own review task with a custom template. We will create a private workteam and add only one user (you) to it.

To create a private team:

   1. Go to AWS Console > Amazon SageMaker > Labeling workforces
   1. Click "Private" and then "Create private team".
   1. Enter the desired name for your private workteam.
   1. Enter your own email address in the "Email addresses" section.
   1. Enter the name of your organization and a contact email to administer the private workteam.
   1. Click "Create Private Team".
   1. The AWS Console should now return to AWS Console > Amazon SageMaker > Labeling workforces. Your newly created team should be visible under "Private teams". Next to it you will see an ARN, a long string that looks like `arn:aws:sagemaker:region-name:123456:workteam/private-crowd/team-name`. Copy this ARN and paste it into the first code cell of Step 1 below.
   1. You should get an email from no-reply@verificationemail.com that contains your workforce username and password.
   1. In AWS Console > Amazon SageMaker > Labeling workforces, click on the URL in Labeling portal sign-in URL. Use the email/password combination from item 8 above to log in (you will be asked to create a new, non-default password).
   1. This is your private worker's interface. When we start the human review loops in Step 6 below, the tasks should appear in this window. You can invite your colleagues to participate in the labeling job by clicking the "Invite new workers" button.

Please refer to the [Amazon SageMaker documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html) if you need more details.

### Step 1 - Import libraries and initialize variables

In [None]:
# Step 1 - Cell 1
import urllib
import boto3
import os
import json
import time
import uuid
import sagemaker
import pandas as pd
from sagemaker import get_execution_role
from sagemaker.s3 import S3Uploader, S3Downloader

textract = boto3.client('textract')
s3 = boto3.resource('s3')
bucket = "<S3-bucket-name>"
prefix = 'chapter16/input'
# Enter the workteam ARN you copied in item 7 of Step 0 above
WORKTEAM_ARN = '<your-private-workteam-arn>'

In [None]:
# Step 1 - Cell 2
# Upload the SEC registration documents
s3_client = boto3.client('s3')
for secfile in os.listdir():
    if secfile.endswith('pdf'):
        response = s3_client.upload_file(secfile, bucket, prefix+'/'+secfile)
        print("Uploaded {} to S3 bucket {} in folder {}".format(secfile, bucket, prefix))

### Step 2 - Start Amazon Textract Text Detection Job

In [None]:
# Step 2 - Cell 1
input_bucket = s3.Bucket(bucket)
jobids = {}

In [None]:
# Step 2 - Cell 2
for doc in input_bucket.objects.all():
    if doc.key.startswith(prefix) and doc.key.endswith('pdf'): 
        tres = textract.start_document_text_detection(
            DocumentLocation={
                "S3Object": {
                    "Bucket": bucket,
                    "Name": doc.key
                }
            }
        )
        jobids[doc.key.split('/')[2]] = tres['JobId']

In [None]:
# Step 2 - Cell 3
for j in jobids:
    print("Textract detection Job ID for {} is {}".format(j,str(jobids[j])))
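The job IDs printed above belong to asynchronous jobs, so their results can only be read once each job's `JobStatus` is no longer `IN_PROGRESS`. The following is a minimal polling sketch rather than part of the original notebook. The helper name `wait_for_textract_job`, the delay, and the attempt limit are assumptions chosen for illustration; the client is passed in as a parameter so that the `textract` client from Step 1 can be used.

```python
import time


def wait_for_textract_job(client, job_id, delay_seconds=5, max_attempts=60):
    """Poll an asynchronous text detection job until it leaves IN_PROGRESS.

    `client` is expected to behave like boto3.client('textract').
    The helper name, delay, and attempt limit are illustrative assumptions.
    """
    for _ in range(max_attempts):
        response = client.get_document_text_detection(JobId=job_id)
        status = response["JobStatus"]
        if status != "IN_PROGRESS":
            # SUCCEEDED, FAILED or PARTIAL_SUCCESS
            return status
        time.sleep(delay_seconds)
    raise TimeoutError("Textract job {} did not finish in time".format(job_id))
```

Under these assumptions, calling `wait_for_textract_job(textract, jobids[name])` for each entry in `jobids` before moving on to Step 3 would avoid reading results from a job that is still running.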

### Step 3 - Get Amazon Textract Text Detection Results

In [None]:
# Step 3 - Cell 1

class TextExtractor():
    def extract_text(self, jobId):
        """ Extract text from document corresponding to jobId and
        generate a list of pages containing the text
        """

        textract_result = self.__get_textract_result(jobId)
        pages = {}
        self.__extract_all_pages(jobId, textract_result, pages, [])
        return pages

    def __get_textract_result(self, jobId):
        """ retrieve textract result with job Id """

        result = textract.get_document_text_detection(
            JobId=jobId
        )
        return result

    def __extract_all_pages(self, jobId, textract_result, pages, page_numbers):
        """ extract page content: build the pages array,
        recurse if response is too big (when NextToken is provided by textract)
        """
        blocks = [x for x in textract_result['Blocks'] if x['BlockType'] == "LINE"]
        for block in blocks:
            page_nr = block['Page']
            if page_nr not in page_numbers:
                page_numbers.append(page_nr)
                pages[page_nr] = {
                    "Number": page_nr,
                    "Content": {}
                }
            # Keep a separate content dict per page so that lines from other
            # pages (or from a later paginated response) do not overwrite it.
            content = pages[page_nr]['Content']
            # Each line adds two keys (Text<n>, Confidence<n>), numbered per page
            line = len(content) // 2 + 1
            content['Text'+str(line)] = block['Text']
            content['Confidence'+str(line)] = block['Confidence']
        nextToken = textract_result.get("NextToken", "")
        if nextToken != '':
            textract_result = textract.get_document_text_detection(
                JobId=jobId,
                NextToken=nextToken
            )
            self.__extract_all_pages(jobId,
                                     textract_result,
                                     pages,
                                     page_numbers)

In [None]:
# Step 3 - Cell 2
text_extractor = TextExtractor()
indoc = {}
df_indoc = pd.DataFrame(columns = ['DocName','LineNr','DetectedText','Confidence', 'CorrectedText', 'Comments'])
for x in jobids:
    pages = text_extractor.extract_text(jobids[x])
    contdict =pages[1]['Content']
    for row in range(1,(int(len(contdict)/2))+1):
        df_indoc.loc[len(df_indoc.index)] = [x, row, contdict['Text'+str(row)], round(contdict['Confidence'+str(row)],1),'','']
# Uncomment the line below if you want to review the contents of this dataframe
#df_indoc.to_csv('extract.csv')
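Each page's `Content` dictionary stores two keys per detected line, `Text<n>` and `Confidence<n>`, which is why the loop above iterates over half the dictionary length. The sketch below shows that flattening on its own, without pandas or AWS calls. The helper name `content_to_rows` and the sample values are made up for illustration.

```python
def content_to_rows(doc_name, content):
    """Turn a {'Text1': ..., 'Confidence1': ..., ...} dict into row lists."""
    rows = []
    line_count = len(content) // 2  # two keys per detected line
    for line_nr in range(1, line_count + 1):
        rows.append([
            doc_name,
            line_nr,
            content["Text" + str(line_nr)],
            round(content["Confidence" + str(line_nr)], 1),
            "",  # CorrectedText, filled in after the human review
            "",  # Comments, filled in after the human review
        ])
    return rows


# Made-up sample values, not real Textract output
sample_content = {
    "Text1": "FORM S-1", "Confidence1": 99.46,
    "Text2": "REGISTRATION STATEMENT", "Confidence2": 92.13,
}
for row in content_to_rows("sample.pdf", sample_content):
    print(row)
```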

In [None]:
# Step 3 - Cell 3
# The lines in each document that are of importance for the human loop to review
bounding_dict = {'lines': '9:11:12:13:15:16:17:18:19:20:21:22:23:24:25'}

In [None]:
# Step 3 - Cell 4
# Let us now create a new dataframe that only contains the subset of lines we need from the bounding_dict
df_newdoc = pd.DataFrame(columns = ['DocName','LineNr','DetectedText','Confidence','CorrectedText','Comments'])
for idx, row in df_indoc.iterrows():
    if str(row['LineNr']) in bounding_dict['lines'].split(':'):
        df_newdoc.loc[len(df_newdoc.index)] = [row['DocName'],row['LineNr'], row['DetectedText'], row['Confidence'], row['CorrectedText'],row['Comments']]
df_newdoc
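The filter above compares each line number, as a string, against the colon-separated list in `bounding_dict`. As a small sketch of the same selection logic, the list can be parsed once into a set of integers. The helper name `parse_line_selection` and the sample rows are assumptions for illustration only.

```python
def parse_line_selection(selection):
    """Parse a colon-separated string such as '9:11:12' into a set of ints."""
    return {int(part) for part in selection.split(":") if part.strip()}


bounding_dict = {"lines": "9:11:12:13:15:16:17:18:19:20:21:22:23:24:25"}
wanted_lines = parse_line_selection(bounding_dict["lines"])

# Hypothetical (line number, text) pairs standing in for rows of df_indoc
sample_rows = [(8, "skipped"), (9, "kept"), (10, "skipped"), (11, "kept")]
selected = [row for row in sample_rows if row[0] in wanted_lines]
print(selected)
```

With pandas, an expression along the lines of `df_indoc[df_indoc['LineNr'].isin(wanted_lines)]` should select the same rows, assuming the `LineNr` column holds integers.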

### Step 4 - Create the Amazon A2I human review Task UI
We will customize a sample tabular template from the Amazon A2I sample Task UI template page - https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [None]:
# Step 4 - Cell 1
# Initialize A2I variables
a2i_prefix = "chapter16/a2i-results"

# Define IAM role
role = get_execution_role()
print("RoleArn: {}".format(role))

timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
# Amazon SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Amazon Augment AI (A2I) client
a2i = boto3.client('sagemaker-a2i-runtime')

# Flow definition name - this value must be unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-pdf-docs-' + timestamp

# Task UI name - this value must be unique per account and region. You can also provide your own value here.
taskUIName = 'ui-pdf-docs-' + timestamp

# Flow definition outputs
OUTPUT_PATH = f's3://{bucket}/{a2i_prefix}'

In [None]:
# Step 4 - Cell 2
# We will use the tabular liquid template and customize it for our requirements

template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<style>
  table, tr, th, td {
    border: 1px solid black;
    border-collapse: collapse;
    padding: 5px;
  }
</style>

<crowd-form>
    <div>
        <h1>Instructions</h1>
        <p>Please review the SEC registration form inputs, and make corrections where appropriate. </p>
    </div>
   <div>
      <h3>Original Registration Form - Page 1</h3>
      <classification-target>
        <img style="width: 70%; max-height: 40%; margin-bottom: 10px" src="{{ task.input.image | grant_read_access }}"/>        
      </classification-target>     
   </div>
    <br>
    <h1> Please enter your modifications below </h1>
    <table>
    <tr>
        <th>Line Nr</th>
        <th style="width:500px">Detected Text</th>
        <th style="width:500px">Confidence</th>
        <th>Change Required</th>
        <th style="width:500px">Corrected Text</th>
        <th>Comments</th>
    </tr>
    {% for pair in task.input.document %}

        <tr>
          <td>{{ pair.linenr }}</td>
          <td><crowd-text-area name="predicteddoc{{ pair.linenr }}" value="{{ pair.detectedtext }}"></crowd-text-area></td>
          <td><crowd-text-area name="confidence{{ pair.linenr }}" value="{{ pair.confidence }}"></crowd-text-area></td>
          <td>
            <p>
              <input type="radio" id="agree{{ pair.linenr }}" name="rating{{ pair.linenr }}" value="agree" required>
              <label for="agree{{ pair.linenr }}">Correct</label>
            </p>
            <p>
              <input type="radio" id="disagree{{ pair.linenr }}" name="rating{{ pair.linenr }}" value="disagree" required>
              <label for="disagree{{ pair.linenr }}">Incorrect</label>
            </p>
          </td>
          <td>
            <p>
            <input style="width:500px" rows="3" type="text" name="correcteddoc{{ pair.linenr }}" value="{{pair.detectedtext}}" required/>
            </p>
           </td>
           <td>
            <p>
            <input style="width:500px" rows="3" type="text" name="comments{{ pair.linenr }}" placeholder="Explain why you changed the value"/>
            </p>
           </td>
        </tr>

      {% endfor %}
    </table>
    <br>
    <br>
</crowd-form>
"""

In [None]:
# Step 4 - Cell 3
# Define the method to initialize and create the Task UI
def create_task_ui():
    response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

# create Task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Step 5 - Create the Amazon A2I flow definition

In this section, we're going to create a flow definition. Flow definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* Where your output data will be stored.

This notebook is going to use the API, but you can optionally create this workflow definition in the console as well.
For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

In [None]:
# Step 5 - Cell 1
create_workflow_definition_response = sagemaker_client.create_flow_definition(
        FlowDefinitionName=flowDefinitionName,
        RoleArn=role,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Review the contents and correct values as indicated",
            "TaskTitle": "SEC Registration Form Review"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Step 5 - Cell 2
for x in range(60):
    describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)

### Step 6 - Activate the Amazon A2I flow definition

In [None]:
# Step 6 - Cell 1
# We will display the PDF first page for reference on what is being edited by the human loop
reg_images = {}
for image in os.listdir():
    if image.endswith('png'):
        reg_images[image.split('_')[0]] = S3Uploader.upload(image, 's3://{}/{}'.format(bucket, prefix))

In [None]:
# Step 6 - Cell 2
# Activate human loops for all three documents. These will be delivered for review sequentially in the Task UI.
# We will also send only low confidence detections to A2I so the human team can update the text to what it should actually be
humanLoopName = {}
docs = df_newdoc.DocName.unique()
# confidence threshold
confidence_threshold = 95
for doc in docs:
    doc_list = []
    humanLoopName[doc] = str(uuid.uuid4())
    for idx, line in df_newdoc.iterrows():
        # Send only those lines whose confidence score is at or below the threshold
        if line['DocName'] == doc and line['Confidence'] <= confidence_threshold:
            doc_list.append({'linenr': line['LineNr'], 'detectedtext': line['DetectedText'], 'confidence':line['Confidence']})
    ip_content = {"document": doc_list,
              'image': reg_images[doc.split('.')[0]]
             }                
    start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName[doc],
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(ip_content)
            }
        )
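The template from Step 4 iterates over `task.input.document` and reads `task.input.image`, so the keys of `InputContent` have to match those names, and each document entry needs `linenr`, `detectedtext` and `confidence`. The sketch below builds such a payload from plain dictionaries. The helper name `build_loop_input`, the sample rows, and the S3 URI are hypothetical.

```python
import json


def build_loop_input(rows, image_uri, confidence_threshold=95):
    """Build the JSON string passed as HumanLoopInput['InputContent'].

    `rows` is a list of dicts with LineNr, DetectedText and Confidence keys.
    Only rows at or below the threshold are included, as in the cell above.
    """
    document = [
        {
            "linenr": row["LineNr"],
            "detectedtext": row["DetectedText"],
            "confidence": row["Confidence"],
        }
        for row in rows
        if row["Confidence"] <= confidence_threshold
    ]
    return json.dumps({"document": document, "image": image_uri})


# Made-up sample values for illustration only
sample_rows = [
    {"LineNr": 9, "DetectedText": "Example Corp", "Confidence": 99.8},
    {"LineNr": 11, "DetectedText": "Examp1e Street", "Confidence": 88.2},
]
payload = build_loop_input(
    sample_rows, "s3://example-bucket/chapter16/input/sample_page1.png"
)
print(payload)
```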


In [None]:
# Step 6 - Cell 3
completed_human_loops = []
for doc in humanLoopName:
    resp = a2i.describe_human_loop(HumanLoopName=humanLoopName[doc])
    print(f'HumanLoop Name: {humanLoopName[doc]}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')

In [None]:
# Step 6 - Cell 4
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

In [None]:
# Step 6 - Cell 5
completed_human_loops = []
for doc in humanLoopName:
    resp = a2i.describe_human_loop(HumanLoopName=humanLoopName[doc])
    print(f'HumanLoop Name: {humanLoopName[doc]}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    if resp["HumanLoopStatus"] == "Completed":
          completed_human_loops.append(resp)

In [None]:
# Step 6 - Cell 7
import re
import pandas as pd

for resp in completed_human_loops:
    # The output URI has the form s3://<bucket>/<key>; keep only the key
    output_s3_uri = resp['HumanLoopOutput']['OutputS3Uri']
    output_bucket_key = output_s3_uri.split('s3://' + bucket + '/', 1)[1]
    response = s3_client.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    loop_name = json_output['humanLoopName']
    # Find the document whose human loop produced this output
    docname = list(humanLoopName.keys())[list(humanLoopName.values()).index(loop_name)]
    for answer in json_output['humanAnswers']:
        answer_content = answer['answerContent']
        for i, r in df_newdoc.iterrows():
            if r['DocName'] == docname:
                # Lines that were not sent for review have no answer; leave them empty
                df_newdoc.at[i,'CorrectedText'] = answer_content.get('correcteddoc'+str(r['LineNr']), '')
                df_newdoc.at[i,'Comments'] = answer_content.get('comments'+str(r['LineNr']), '')
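The keys read from `answerContent` above come from the `name` attributes in the Step 4 template (`correcteddoc<n>`, `comments<n>`, and so on, where `<n>` is the line number). The sketch below shows one way to collect those answers by line number from a single `answerContent` dictionary. The helper name `extract_corrections` and the sample answer are hypothetical, not real A2I output.

```python
import re


def extract_corrections(answer_content):
    """Map line number -> (corrected text, comment) from one answerContent dict."""
    corrections = {}
    for key, value in answer_content.items():
        match = re.fullmatch(r"correcteddoc(\d+)", key)
        if match:
            line_nr = int(match.group(1))
            comment = answer_content.get("comments" + match.group(1), "")
            corrections[line_nr] = (value, comment)
    return corrections


# Hypothetical answerContent, shaped after the field names in the Step 4 template
sample_answer = {
    "predicteddoc11": "Examp1e Street",
    "confidence11": "88.2",
    "correcteddoc11": "Example Street",
    "comments11": "Digit 1 read instead of letter l",
}
print(extract_corrections(sample_answer))
```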

In [None]:
# Step 6 - Cell 8
df_newdoc.head(30)

### Step 7 - Save changes to Amazon DynamoDB

In [None]:
# Step 7 - Cell 1
# Create the Amazon DynamoDB table - note that a new DynamoDB table is created every time you execute this cell

# Get the service resource.
dynamodb = boto3.resource('dynamodb')
tablename = "SEC-registration-"+str(uuid.uuid4())

# Create the DynamoDB table.
table = dynamodb.create_table(
    TableName=tablename,
    KeySchema=[
        {
            'AttributeName': 'row_nr',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'row_nr',
            'AttributeType': 'N'
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

# Wait until the table exists, this will take a minute or so
table.meta.client.get_waiter('table_exists').wait(TableName=tablename)

# Confirm that the table is ready to use
print("Table successfully created")

In [None]:
# Step 7 - Cell 2
# Load the Amazon DynamoDB table

for idx, row in df_newdoc.iterrows():
    table.put_item(
       Item={
        'row_nr': idx,
        'doc_name': str(row['DocName']) ,
        'line_nr': str(row['LineNr']),
        'detected_line': str(row['DetectedText']),
        'confidence': str(row['Confidence']),   
        'corrected_line': str(row['CorrectedText']),
        'change_comments': str(row['Comments'])   
        }
    )

print("Items were successfully created in DynamoDB table")
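The cell above stores every value except the key as a string. If numeric attributes are preferred for downstream queries, note that the boto3 DynamoDB resource rejects Python `float` values and expects `decimal.Decimal` instead. The following is a sketch of such a conversion; the helper name `row_to_item` and the sample row are assumptions for illustration.

```python
from decimal import Decimal


def row_to_item(row_nr, row):
    """Convert one row (a dict keyed like the df_newdoc columns) into an item."""
    return {
        "row_nr": int(row_nr),
        "doc_name": str(row["DocName"]),
        "line_nr": int(row["LineNr"]),
        "detected_line": str(row["DetectedText"]),
        # Going through str() avoids binary floating point artifacts in the Decimal
        "confidence": Decimal(str(row["Confidence"])),
        "corrected_line": str(row["CorrectedText"]),
        "change_comments": str(row["Comments"]),
    }


# Made-up sample row for illustration only
sample_row = {
    "DocName": "sample.pdf",
    "LineNr": 11,
    "DetectedText": "Examp1e Street",
    "Confidence": 88.2,
    "CorrectedText": "Example Street",
    "Comments": "Digit 1 read instead of letter l",
}
item = row_to_item(0, sample_row)
print(item)
```

Under these assumptions, `table.put_item(Item=row_to_item(idx, row))` could replace the inline dictionary in the loop above.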

### End of Notebook
Please go back to Chapter 16 - Improve accuracy of pdf batch processing with Amazon Textract and Amazon A2I from the Natural Language Processing with AWS AI Services book to proceed further. 