# Intelligent Document Processing for Document Classification with Human Review

## Introduction
Intelligent Document Processing (IDP) is the automation of manual document processing tasks. IDP usually relies on machine learning to extract text from images or other legacy documents and to run business processing tasks on the extracted text, such as classifying documents by their content.

AWS definition of IDP - Intelligent document processing (IDP) is automating the process of manual data entry from paper-based documents or document images to integrate with other digital business processes.

Augmented Intelligence - This improves the accuracy of machine learning tasks by having humans verify classification outputs based on rules, which minimizes misclassification in edge cases.
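
The review rule can be as simple as a confidence threshold. The following is a minimal sketch, assuming each prediction is a dict with hypothetical `document`, `label`, and `confidence` keys; these names and the 0.95 default threshold are illustrative and not part of any AWS API.

```python
from typing import Dict, List, Tuple

Prediction = Dict[str, object]


def route_predictions(
    predictions: List[Prediction], threshold: float = 0.95
) -> Tuple[List[Prediction], List[Prediction]]:
    """Split predictions into (auto_accepted, needs_review) by confidence."""
    auto_accepted: List[Prediction] = []
    needs_review: List[Prediction] = []
    for pred in predictions:
        # Anything strictly below the threshold goes to a human reviewer.
        if float(pred["confidence"]) < threshold:
            needs_review.append(pred)
        else:
            auto_accepted.append(pred)
    return auto_accepted, needs_review


# Illustrative predictions (made-up values).
sample = [
    {"document": "a.png", "label": "invoices", "confidence": 0.99},
    {"document": "b.png", "label": "receipts", "confidence": 0.62},
]
accepted, review = route_predictions(sample)
print([p["document"] for p in review])
```

A stricter threshold sends more documents to reviewers, so the value is a trade-off between review cost and the risk of accepting a wrong label.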


# Document Classification
In this lab we will walk through document classification using an Amazon Comprehend custom classifier. We will use Amazon Textract to extract the text from documents, label the documents, and use the labeled text to train our Amazon Comprehend custom classifier. We then use an Amazon Comprehend analysis job to perform batch classification of documents, and send documents whose classification confidence falls below a set threshold to human review.

![IDP Classify](./images/IDP-ARC-Diag-2.png)

- [Step 1: Setup notebook and upload sample documents to Amazon S3](#step1)
- [Step 2: Extract text from sample documents using Amazon Textract](#step2)
- [Step 3: Prepare a CSV training dataset for Amazon Comprehend custom classifier training](#step3)
- [Step 4: Create Amazon Comprehend Classification training job](#step4)
- [Step 5: Classify documents with Amazon Comprehend custom classifier](#step5)
- [Step 6: Amazon Augmented AI](#step6)



---

In [None]:
## Install Latest SDK

In [1]:
# First, let's get the latest installations of our dependencies
!pip install --upgrade pip
!pip install boto3 --upgrade
!pip install -U botocore

Collecting pip
  Downloading pip-24.0-py3-none-any.whl.metadata (3.6 kB)
Downloading pip-24.0-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.3.2
    Uninstalling pip-23.3.2:
      Successfully uninstalled pip-23.3.2
Successfully installed pip-24.0
Collecting boto3
  Downloading boto3-1.34.56-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.35.0,>=1.34.56 (from boto3)
  Downloading botocore-1.34.56-py3-none-any.whl.metadata (5.7 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Using cached s3transfer-0.10.0-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.34.56-py3-none-any.whl (139 kB)
Downloading botocore-1.34.56-py3-no

## Setup
We need to set up the following data:
* `region` - Region to call A2I
* `bucket` - An S3 bucket accessible by the given role
    * Used to store the sample images & output results
    * Must be within the same region A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role
* `workteam` - Group of people to send the work to

### Role and Permissions

The AWS IAM Role used to execute the notebook needs to have the following permissions:

* ComprehendFullAccess
* AmazonSageMakerFullAccess
* IAMReadOnlyAccess
* AmazonS3FullAccess
* Inline policy (Comprehend-passrole)

Under `Permissions`, open the `Add permissions` drop-down and select `Create inline policy`.
Select `JSON` on the `Specify permissions` page and paste the following policy:
```
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "iam:PassRole",
			"Resource": "arn:aws:iam::*:role/*",
			"Condition": {
				"StringEquals": {
					"iam:PassedToService": [
						"comprehend.amazonaws.com"
					]
				}
			}
		}
	]
}
```
Then click the `Next` button.
On the `Review and Save Changes` page, click `Save Changes`.
To learn more about the IAM PassRole permission, see this blog post: https://aws.amazon.com/blogs/security/how-to-use-the-passrole-permission-with-iam-roles/
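
If you prefer to script this step rather than use the console, the same policy can be built in Python. This is a sketch only: the helper names are hypothetical, the policy name `Comprehend-passrole` is taken from the list above, and `attach_inline_policy` must be run by a principal with IAM write permissions (the notebook role listed above only has `IAMReadOnlyAccess`).

```python
import json


def comprehend_passrole_policy(role_arn_pattern: str = "arn:aws:iam::*:role/*") -> str:
    """Return the inline policy shown above as a JSON string."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": role_arn_pattern,
                "Condition": {
                    "StringEquals": {
                        "iam:PassedToService": ["comprehend.amazonaws.com"]
                    }
                },
            }
        ],
    }
    return json.dumps(policy, indent=2)


def attach_inline_policy(role_name: str, policy_name: str = "Comprehend-passrole") -> None:
    """Attach the policy to a role (hypothetical helper; needs IAM write access)."""
    import boto3  # imported lazily so the builder above works without AWS access

    boto3.client("iam").put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=comprehend_passrole_policy(),
    )


policy_json = comprehend_passrole_policy()
print(policy_json)
```

Passing a narrower `role_arn_pattern`, such as the ARN of the single role Comprehend should assume, limits which roles can be passed.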

# Step 1: Setup notebook and upload sample documents to Amazon S3 <a id="step1"></a>

In this step, we will import some necessary libraries that will be used throughout this notebook. We will then upload all the documents from the `/classification-training` folder to SageMaker's default bucket.

In [3]:
!python -m pip install -q amazon-textract-response-parser --upgrade
!python -m pip install -q amazon-textract-caller --upgrade
!python -m pip install -q amazon-textract-prettyprinter --upgrade

In [4]:
from sagemaker import get_execution_role

# Setting Role to the default SageMaker Execution Role
ROLE = get_execution_role()

from textractcaller.t_call import call_textract, Textract_Features
from textractprettyprinter.t_pretty_print import Textract_Pretty_Print, get_string
from trp import Document

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [None]:
import boto3
import botocore
import sagemaker
import os
import io
import datetime
import pandas as pd
from PIL import Image as PILImage  # aliased: IPython.display.Image is imported below under the same name
from pathlib import Path
import multiprocessing as mp
from IPython.display import Image, display, HTML, JSON

# variables
data_bucket = sagemaker.Session().default_bucket()
region = boto3.session.Session().region_name

os.environ["BUCKET"] = data_bucket
os.environ["REGION"] = region
role = sagemaker.get_execution_role()

print(f"SageMaker role is: {role}\nDefault SageMaker Bucket: s3://{data_bucket}")

s3=boto3.client('s3')
textract = boto3.client('textract', region_name=region)
comprehend=boto3.client('comprehend', region_name=region)


### Download and Unzip the sample data `classification-training.zip`

In [6]:
!curl https://idp-assets-wwso.s3.us-east-2.amazonaws.com/workshop-data/classification-training.zip --output classification-training.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 21.0M  100 21.0M    0     0  46.5M      0 --:--:-- --:--:-- --:--:-- 46.4M


In [7]:
import shutil

try:
    shutil.unpack_archive("./classification-training.zip", extract_dir="classification-training")
    print("Document archive extracted successfully...")
    for path, subdirs, files in os.walk('./classification-training'):
        for name in files:
            if name.startswith('.'):
                hidden = os.path.join(path, name)
                print(f'Removing hidden files/directories: {hidden}')
                os.remove(hidden)
        # Iterate over a copy so removed directories can be pruned from the walk
        for dirs in list(subdirs):
            if dirs.startswith('.'):
                hidden = os.path.join(path, dirs)
                print(f'Removing hidden files/directories: {hidden}')
                shutil.rmtree(hidden, ignore_errors=True)
                subdirs.remove(dirs)
except Exception as e:
    print("Please upload the document zip file classification-training.zip")
    raise e

Document archive extracted successfully...
Removing hidden files/directories: ./classification-training/receipts/.ipynb_checkpoints
Removing hidden files/directories: ./classification-training/invoices/.DS_Store
Removing hidden files/directories: ./classification-training/invoices/.ipynb_checkpoints
Removing hidden files/directories: ./classification-training/bank-statements/.DS_Store
Removing hidden files/directories: ./classification-training/bank-statements/.ipynb_checkpoints


### Upload sample data to S3 bucket

The sample documents are in `/classification-training` directory. For this workshop, we will be using sample bank statements, invoices, and receipts.

In [8]:
# Upload images to S3 bucket:
!aws s3 cp classification-training s3://{data_bucket}/idp/textract --recursive --only-show-errors

### Validate the documents in S3

We will create a small utility function to verify that our documents have been uploaded to the S3 bucket. This function also collects the document paths (S3 keys) into an array that we will use later to extract text with Amazon Textract.

In [9]:
def get_s3_bucket_items(bucket, prefix, start_after):
    list_items=[]
    
    paginator = s3.get_paginator('list_objects_v2')
    operation_parameters = {'Bucket': bucket,
                            'Prefix': prefix,
                            'StartAfter':start_after}
    page_iterator = paginator.paginate(**operation_parameters)
    for page in page_iterator:
        for item in page.get('Contents', []):  # 'Contents' is absent when a page has no objects
            list_items.append(item['Key'])
    names=list(set([os.path.dirname(x)+'/' for x in list_items]))
    images=[x for x in list_items if x not in names and '.ipynb_checkpoints' not in x ]
    names=[x.replace(prefix,'').strip('/') for x in names if  '.ipynb_checkpoints' not in x]
    return list_items, names, images
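
The label of each document is simply the folder it sits in under the prefix. The sketch below isolates that derivation so it can be checked without calling S3; `label_for_key`, `split_keys`, and the sample keys are hypothetical and not part of the notebook.

```python
from typing import List, Optional, Tuple


def label_for_key(key: str, prefix: str) -> Optional[str]:
    """Return the first folder under `prefix`, or None if the key has none."""
    relative = key[len(prefix):].lstrip("/")
    parts = relative.split("/")
    return parts[0] if len(parts) > 1 else None


def split_keys(keys: List[str], prefix: str) -> Tuple[List[str], List[str]]:
    """Return (sorted labels, document keys) from a flat list of S3 keys."""
    documents = [
        k for k in keys
        if k.startswith(prefix)
        and not k.endswith("/")             # folder placeholder objects
        and ".ipynb_checkpoints" not in k   # notebook checkpoint residue
    ]
    labels = sorted({label_for_key(k, prefix) for k in documents} - {None})
    return labels, documents


# Illustrative keys (made-up file names).
sample_keys = [
    "idp/textract/bank-statements/",
    "idp/textract/invoices/invoice_0.png",
    "idp/textract/receipts/receipt_0.png",
    "idp/textract/receipts/.ipynb_checkpoints/receipt_0-checkpoint.png",
]
labels, documents = split_keys(sample_keys, "idp/textract")
print(labels)
print(documents)
```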

List some of the documents uploaded to S3:

In [10]:
docs=[]

train_objects, names, train_images=get_s3_bucket_items(data_bucket, 'idp/textract', 'idp/textract/') 
docs.append(train_images)

if type(docs[0]) is list:
    docs=[item for sublist in docs for item in sublist]
    
names, docs[-10:], docs[:10]

---
# Step 2: Extract text from sample documents using Amazon Textract and label<a id="step2"></a>

In this section we use Amazon Textract's `DetectDocumentText` API to extract the raw text for all the documents in S3. We will also label the data according to the document type. This labeled data will be used to train a custom Amazon Comprehend classifier. We define a utility function, `textract_extract_text`, that extracts the text from a document, determines which category (that is, which directory in S3) it belongs to, and returns the labeled row as an array `[<label>, <document_text>]`.

In order to extract text from a document using textract we use the `DetectDocumentText` API. You can use the Boto3 version of the API as `textract.detect_document_text`, however in this notebook we will use the `call_textract` tool that we installed earlier in the Notebook ([refer to `amazon-textract-caller`](https://pypi.org/project/amazon-textract-caller/) for more info).
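
For reference, here is a sketch of what reading the lines looks like if you use the Boto3 response directly instead of the pretty printer. It assumes the standard response shape, a `Blocks` list whose `LINE` blocks carry a `Text` field; the helper name and the hand-written response are illustrative only.

```python
from typing import Any, Dict


def lines_from_textract(response: Dict[str, Any]) -> str:
    """Join the text of all LINE blocks in a DetectDocumentText response."""
    return "\n".join(
        block["Text"]
        for block in response.get("Blocks", [])
        if block.get("BlockType") == "LINE"
    )


# Hand-written stand-in for a real response (illustrative values only).
fake_response = {
    "Blocks": [
        {"BlockType": "PAGE"},
        {"BlockType": "LINE", "Text": "INVOICE"},
        {"BlockType": "WORD", "Text": "INVOICE"},
        {"BlockType": "LINE", "Text": "Total 42.00"},
    ]
}
print(lines_from_textract(fake_response))
```

With a real document, the response would come from `textract.detect_document_text(Document={"S3Object": {"Bucket": bucket, "Name": key}})`.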

In [11]:
def textract_extract_text(document, bucket=data_bucket):        
    try:
        print(f'Processing document: {document}')
        lines = ""
        row = []
        
        # using amazon-textract-caller
        response = call_textract(input_document=f's3://{bucket}/{document}') 
        # using pretty printer to get all the lines
        lines = get_string(textract_json=response, output_type=[Textract_Pretty_Print.LINES])
        
        label = [name for name in names if(name in document)]  
        row.append(label[0])
        row.append(lines)        
        return row
    except Exception as e:
        print (e)

In [12]:
pool = mp.Pool(mp.cpu_count())
pool_results = [pool.apply_async(textract_extract_text, (document,data_bucket)) for document in docs]
labeled_collection = [res.get() for res in pool_results]
pool.close()

Processing document: idp/textract/bank-statements/bank_stmt_0.png
Processing document: idp/textract/bank-statements/bank_stmt_1.png
Processing document: idp/textract/bank-statements/bank_stmt_10.png
Processing document: idp/textract/bank-statements/bank_stmt_11.png
Processing document: idp/textract/bank-statements/bank_stmt_12.png
Processing document: idp/textract/bank-statements/bank_stmt_13.png
Processing document: idp/textract/bank-statements/bank_stmt_14.png
Processing document: idp/textract/bank-statements/bank_stmt_15.png
Processing document: idp/textract/bank-statements/bank_stmt_16.png
Processing document: idp/textract/bank-statements/bank_stmt_17.png
Processing document: idp/textract/bank-statements/bank_stmt_18.png
Processing document: idp/textract/bank-statements/bank_stmt_19.png
Processing document: idp/textract/bank-statements/bank_stmt_2.png
Processing document: idp/textract/bank-statements/bank_stmt_20.png
Processing document: idp/textract/bank-statements/bank_stmt_21.pn

---
# Step 3: Prepare a CSV training dataset for Amazon Comprehend custom classifier training<a id="step3"></a>

Now that we have extracted and labeled the text of our documents, we will create the training data for an [Amazon Comprehend custom classification model](https://docs.aws.amazon.com/comprehend/latest/dg/how-document-classification.html). Let's take a look at the labeled data. We have 100 samples of each document type, so we should have about 300 rows of labeled data.

In [16]:
comprehend_df = pd.DataFrame(labeled_collection, columns=['label','document'])
comprehend_df

Unnamed: 0,label,document
0,bank-statements,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
1,bank-statements,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
2,bank-statements,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
3,bank-statements,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
4,bank-statements,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
...,...,...
295,receipts,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."
296,receipts,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."
297,receipts,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."
298,receipts,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."


Upload the training dataset to S3:

In [18]:
# Upload Comprehend training data to S3
key='idp/comprehend/comprehend_train_data.csv'

comprehend_df.to_csv("comprehend_train_data.csv", index=False, header=False)
s3.upload_file(Filename='comprehend_train_data.csv', 
               Bucket=data_bucket, 
               Key=key)
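
The training file is written without a header row, with the label in the first column and the document text in the second. The sketch below shows that layout with the standard `csv` module and counts rows per label as a sanity check. The helper names and sample rows are hypothetical, and collapsing newlines inside each document is an assumption made here to keep one document per physical line; the cell above does not do this.

```python
import csv
import io
from collections import Counter
from typing import Iterable, Tuple


def to_comprehend_csv(rows: Iterable[Tuple[str, str]]) -> str:
    """Serialize (label, text) pairs as header-less two-column CSV."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for label, text in rows:
        # Assumption: flatten whitespace so each document is a single line.
        writer.writerow([label, " ".join(text.split())])
    return buffer.getvalue()


def label_counts(csv_text: str) -> Counter:
    """Count rows per label to spot missing or unbalanced classes."""
    return Counter(row[0] for row in csv.reader(io.StringIO(csv_text)) if row)


# Illustrative rows (made-up text).
rows = [
    ("invoices", "INVOICE\nTotal 42.00"),
    ("receipts", "THE STORE, CA"),
]
csv_text = to_comprehend_csv(rows)
print(csv_text)
print(label_counts(csv_text))
```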


---
# Step 4: Create Amazon Comprehend Classification training job <a id="step4"></a>

Once we have a labeled dataset ready, we create and train an [Amazon Comprehend custom classification model](https://docs.aws.amazon.com/comprehend/latest/dg/how-document-classification.html) with it.

In [None]:
# Create a document classifier
account_id = boto3.client('sts').get_caller_identity().get('Account')
id = str(datetime.datetime.now().strftime("%s"))

document_classifier_name = 'Sample-Doc-Classifier-IDP'
document_classifier_version = 'Sample-Doc-Classifier-IDP-v1'
document_classifier_arn = ''
response = None

try:
    create_response = comprehend.create_document_classifier(
        InputDataConfig={
            'DataFormat': 'COMPREHEND_CSV',
            'S3Uri': f's3://{data_bucket}/{key}'
        },
        DataAccessRoleArn=role,
        DocumentClassifierName=document_classifier_name,
        VersionName=document_classifier_version,
        LanguageCode='en',
        Mode='MULTI_CLASS'
    )
    
    document_classifier_arn = create_response['DocumentClassifierArn']
    
    print(f"Comprehend Custom Classifier created with ARN: {document_classifier_arn}")
except botocore.exceptions.ClientError as error:  # only ClientError carries a .response dict
    if error.response['Error']['Code'] == 'ResourceInUseException':
        print(f'A classifier with the name "{document_classifier_name}" already exists.')
        document_classifier_arn = f'arn:aws:comprehend:{region}:{account_id}:document-classifier/{document_classifier_name}/version/{document_classifier_version}'
        print(f'The classifier ARN is: "{document_classifier_arn}"')
    else:
        print(error)

This job can take ~30 minutes to complete. Once the training job is completed move on to next step.

In [14]:
%store document_classifier_arn


Stored 'document_classifier_arn' (str)


Checking status of classification training job

In [19]:
%%time
# Poll once a minute until training finishes (capped at 3 hours)
from IPython.display import clear_output
import time
from datetime import datetime

jobArn = document_classifier_arn  # also set when the classifier already existed

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    describe_custom_classifier = comprehend.describe_document_classifier(
        DocumentClassifierArn = jobArn
    )
    status = describe_custom_classifier["DocumentClassifierProperties"]["Status"]
    clear_output(wait=True)
    print(f"{current_time} : Custom document classifier: {status}")
    
    if status == "TRAINED" or status == "IN_ERROR":
        break
        
    time.sleep(60)
    

14:08:21 : Custom document classifier: TRAINED
CPU times: user 310 ms, sys: 33 ms, total: 343 ms
Wall time: 15min 2s
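
The same wait-until-done pattern recurs for the classification job later on, so it can be factored into a small helper. This is a sketch with hypothetical names; the status source is injected as a callable so that the example runs without AWS access, using a simulated status sequence.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    terminal: Iterable[str],
    interval_s: float = 60.0,
    timeout_s: float = 3 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status until it returns a terminal status or the timeout passes."""
    terminal_states = set(terminal)
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status!r} after {timeout_s} seconds")
        sleep(interval_s)


# Simulated status sequence standing in for describe_document_classifier.
statuses = iter(["SUBMITTED", "TRAINING", "TRAINED"])
final = wait_for_status(
    lambda: next(statuses),
    terminal={"TRAINED", "IN_ERROR"},
    sleep=lambda _seconds: None,  # skip real waiting in this demo
)
print(final)
```

In the notebook, `get_status` would wrap the `describe_document_classifier` call and return the `Status` field.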


# Step 5: Classify documents with Amazon Comprehend custom classifier <a id="step5"></a>

In this step we will use the Amazon Comprehend custom classification model to classify sample documents. We will use the `start_document_classification_job` API to launch an asynchronous job that classifies the documents. This API supports documents in their native format (PDF/PNG/JPG/TIF) and can use Amazon Textract behind the scenes to read the text from the documents and then determine the document class. Let's start by uploading our sample documents to the S3 bucket.

In [None]:
!mkdir -p samples
!aws s3 cp s3://idp-sample-docs/comprehend/mixedbag ./samples/mixedbag --recursive
!aws s3 cp s3://idp-sample-docs/comprehend/textract ./samples/textract --recursive

In [28]:
!aws s3 cp ./samples s3://{data_bucket}/idp/comprehend --recursive --only-show-errors

In [None]:
import uuid

jobname = f'doc-classification-job-{uuid.uuid1()}'
print(f'Starting Comprehend Classification job {jobname} with model {document_classifier_arn}')

response = comprehend.start_document_classification_job(
    JobName=jobname,
    DocumentClassifierArn=document_classifier_arn,
    InputDataConfig={
        'S3Uri': f's3://{data_bucket}/idp/comprehend/mixedbag/',
        'InputFormat': 'ONE_DOC_PER_FILE',
        'DocumentReaderConfig': {
            'DocumentReadAction': 'TEXTRACT_DETECT_DOCUMENT_TEXT',
            'DocumentReadMode': 'FORCE_DOCUMENT_READ_ACTION'
        }
    },
    OutputDataConfig={
        'S3Uri': f's3://{data_bucket}/idp/comprehend/doc-class-output/'
    },
    DataAccessRoleArn=role
)

response 

## Check status of the classification job

The code block below will check the status of the classification job. If the job completes then it will download the output predictions. The output is a zip file which will contain the inference result for each of the documents being classified. The zip will also contain the output of the Textract operation performed by Amazon Comprehend.

****

In [None]:
%%time
# Poll every 10 seconds until the classification job finishes (capped at 3 hours)
import time
import json
from datetime import datetime
import tarfile
import os

classify_response=response
max_time = time.time() + 3*60*60 # 3 hours
documents=[]

while time.time() < max_time:
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    describe_job = comprehend.describe_document_classification_job(
        JobId=classify_response['JobId']
    )
    status = describe_job["DocumentClassificationJobProperties"]["JobStatus"]

    print(f"{current_time} : Custom document classifier Job: {status}")
    
    if status == "COMPLETED" or status == "FAILED":
        if status == "COMPLETED":
            classify_output_file = describe_job["DocumentClassificationJobProperties"]["OutputDataConfig"]["S3Uri"]
            print(f'Output generated - {classify_output_file}')
            !mkdir -p classification-output
            !aws s3 cp {classify_output_file} ./classification-output
            
            opfile = os.path.basename(classify_output_file)
            # Extract the downloaded archive
            with tarfile.open(f'./classification-output/{opfile}') as archive:
                archive.extractall('./classification-output')
            
            for file in os.listdir('./classification-output'):
                if file.endswith('.out'):
                    with open(f'./classification-output/{file}', 'r') as f:
                        documents.append(dict(file=file, classification_output=json.load(f)['Classes']))        
        else:
            print("Classification job failed")
            print(describe_job)
        break
        
    time.sleep(10)

In [32]:
classification = []
for doc in documents:
    document = []    
    classes_df = pd.DataFrame(doc['classification_output'])
    result = classes_df.loc[classes_df['Score'].idxmax()]  # idxmax returns an index label, so use .loc
    document.extend([doc['file'].replace(".out",""), result.Name, result.Score])    
    classification.append(document)
    
doc_class_df = pd.DataFrame(classification, columns = ['Document', 'DocType', 'Confidence'])
doc_class_df                                                      

Unnamed: 0,Document,DocType,Confidence
0,document_6.png,invoices,1.0
1,document_10.png,receipts,1.0
2,document_7.png,invoices,0.9999
3,document_1.png,bank-statements,1.0
4,document_4.png,bank-statements,1.0
5,document_0.png,bank-statements,1.0
6,document_8.png,receipts,1.0
7,document_5.png,invoices,0.9999
8,document_2.png,bank-statements,1.0
9,document_3.png,bank-statements,1.0
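
Picking the top class does not require a DataFrame. The sketch below does the same with `max` over the `Classes` list that the cell above reads from each `.out` file; the helper name and the sample record are illustrative.

```python
import json
from typing import Any, Dict, Tuple


def top_class(record: Dict[str, Any]) -> Tuple[str, float]:
    """Return (name, score) of the highest-scoring class in one result record."""
    best = max(record["Classes"], key=lambda c: c["Score"])
    return best["Name"], best["Score"]


# Illustrative record shaped like the 'Classes' field used above (made-up scores).
raw = (
    '{"Classes": ['
    '{"Name": "receipts", "Score": 0.0001}, '
    '{"Name": "invoices", "Score": 0.9999}]}'
)
print(top_class(json.loads(raw)))
```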


In [33]:
root='idp/comprehend/classified-docs'

def upload_classified_docs(filename,prefix):
    document = os.path.basename(filename)
    key = f'{root}/{prefix}/{document}'
    print(f'Uploading: {filename}...')
    res = s3.upload_file(Filename=f"./samples/mixedbag/{filename}", 
                   Bucket=data_bucket, 
                   Key=key)
    return f'{root}/{prefix}/{document}'

doc_class_df['s3path'] = doc_class_df.apply(lambda row : upload_classified_docs(row['Document'],row['DocType']), axis = 1)

#verify uploads
[objects['Key'] for objects in s3.list_objects(Bucket=data_bucket, Prefix=f"{root}/")['Contents']]

Uploading: document_6.png...
Uploading: document_10.png...
Uploading: document_7.png...
Uploading: document_1.png...
Uploading: document_4.png...
Uploading: document_0.png...
Uploading: document_8.png...
Uploading: document_5.png...
Uploading: document_2.png...
Uploading: document_3.png...
Uploading: document_9.png...


['idp/comprehend/classified-docs/bank-statements/document_0.png',
 'idp/comprehend/classified-docs/bank-statements/document_1.png',
 'idp/comprehend/classified-docs/bank-statements/document_2.png',
 'idp/comprehend/classified-docs/bank-statements/document_3.png',
 'idp/comprehend/classified-docs/bank-statements/document_4.png',
 'idp/comprehend/classified-docs/invoices/document_5.png',
 'idp/comprehend/classified-docs/invoices/document_6.png',
 'idp/comprehend/classified-docs/invoices/document_7.png',
 'idp/comprehend/classified-docs/receipts/document_10.png',
 'idp/comprehend/classified-docs/receipts/document_8.png',
 'idp/comprehend/classified-docs/receipts/document_9.png']

Next, add the text that Amazon Textract extracted from each document to the DataFrame.

In [34]:
from textractprettyprinter.t_pretty_print import Textract_Pretty_Print, get_string
import json

def get_text(doc):
    with open(f'classification-output/amazon-textract-output/{doc}/1', 'r') as myfile:
        data=myfile.read()
    obj = json.loads(data)
    text = get_string(textract_json=obj, output_type=[Textract_Pretty_Print.LINES])
    return text

doc_class_df['DocText'] = doc_class_df.apply(lambda row : get_text(row['Document']), axis = 1)
doc_class_df

Unnamed: 0,Document,DocType,Confidence,s3path,DocText
0,document_6.png,invoices,1.0,idp/comprehend/classified-docs/invoices/docume...,INVOICE\nAnyCompany Manufacturing\nDATE\nDec 2...
1,document_10.png,receipts,1.0,idp/comprehend/classified-docs/receipts/docume...,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."
2,document_7.png,invoices,0.9999,idp/comprehend/classified-docs/invoices/docume...,"INVOICE\nAnyCompany Hardware\nDATE\nDec 09, 20..."
3,document_1.png,bank-statements,1.0,idp/comprehend/classified-docs/bank-statements...,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
4,document_4.png,bank-statements,1.0,idp/comprehend/classified-docs/bank-statements...,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
5,document_0.png,bank-statements,1.0,idp/comprehend/classified-docs/bank-statements...,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
6,document_8.png,receipts,1.0,idp/comprehend/classified-docs/receipts/docume...,"THE AIML StORE\n1234 SOMEWHERE RD\nPOWAY, CALI..."
7,document_5.png,invoices,0.9999,idp/comprehend/classified-docs/invoices/docume...,INVOICE\nAnyCompany Hardwares LLC\nDATE\nMay 2...
8,document_2.png,bank-statements,1.0,idp/comprehend/classified-docs/bank-statements...,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...
9,document_3.png,bank-statements,1.0,idp/comprehend/classified-docs/bank-statements...,Page 1 of 5 03/02/2022\nDC 1090001004290\nAnyC...


In [None]:
doc_class_df.to_csv('extracted_doc.csv')
#Upload dataframe as csv to S3
s3.upload_file(Filename='extracted_doc.csv', 
               Bucket=data_bucket, 
               Key=f'idp/comprehend/extracted/extracted_doc.csv')

# Step 6: Amazon Augmented AI <a id="step6"></a> 

### Create Human Task UI

Create a human task UI resource from a UI template written in Liquid HTML. This template is rendered for the human workers whenever a human loop is required.

Below we've provided a simple demo template that shows the extracted document text and asks the reviewer to choose the document type.

For over 70 pre-built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [38]:
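# Note: this rebinds the name `sagemaker` from the SDK module imported earlier to a boto3 client
sagemaker = boto3.client('sagemaker', region_name=region)
a2i = boto3.client('sagemaker-a2i-runtime', region_name=region)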

OUTPUT_PATH = f's3://{data_bucket}/idp/doc-class-output/comprehend-custom-workflow'

In [52]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier
      name="sentiment"
      categories="['invoice', 'receipt', 'bank-statement']"
      initial-value="{{ task.input.initialValue }}"
      header="What type of document is this text from?"
    >
      <classification-target>
        {{ task.input.taskObject }}
      </classification-target>
      
      <full-instructions header="Document Classification Instructions">
        <p><strong>Invoice</strong> If the extracted text is from an invoice</p>
        <p><strong>Receipt</strong> If the extracted text is from a receipt</p>
        <p><strong>Bank Statement</strong> If the extracted text is from a bank statement</p>
      </full-instructions>

      <short-instructions>
       Choose the document type that best matches the text.
      </short-instructions>
    </crowd-classifier>
</crowd-form>
"""

def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Task UI name - this value is unique per account and region. You can also provide your own value here.
taskUIName = 'ui-comprehend-' + str(uuid.uuid4()) 

# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Creating the Flow Definition

For simplicity, we assume you have already created a workteam. You can create one in the AWS console by following this [guide](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-create-private-console.html). Alternatively, you can create an Amazon Cognito identity provider and use the API to create the workteam.
After creating the workteam, copy the workteam ARN from `Labeling workforces` under `Ground Truth` in the SageMaker navigation pane.

In [42]:
WORKTEAM_ARN= "<YOUR_WORKTEAM>" 
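
Because a leftover placeholder only fails later, when the flow definition is created, a quick guard can catch it early. This is a hypothetical helper that checks the general shape of the value only; it does not confirm that the workteam exists, and the example ARN is made up.

```python
def looks_like_workteam_arn(value: str) -> bool:
    """Cheap sanity check on the shape of a workteam ARN."""
    return value.startswith("arn:") and ":workteam/" in value


print(looks_like_workteam_arn("<YOUR_WORKTEAM>"))
print(looks_like_workteam_arn(
    "arn:aws:sagemaker:us-east-1:111122223333:workteam/private-crowd/example-team"
))
```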

In [54]:
# Flow definition name - this value is unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-comprehend-demo-' + str(uuid.uuid4()) 

create_workflow_definition_response = sagemaker.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= ROLE,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Identify the document type of the provided text",
            "TaskTitle": "Classify Document Type"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [55]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)

Active
Flow Definition is active


In [56]:
human_loops_started = []
SENTIMENT_SCORE_THRESHOLD = 1
for _, blurb in doc_class_df.iterrows():
    # Use the classification confidence from the Comprehend batch job ("SentimentScore" in the messages below is a leftover label)
    response = blurb["Confidence"]
    
    print(f'Processing blurb: \"{blurb["Document"]}\"')
    
    # Our condition for when we want to engage a human for review
    if (response < SENTIMENT_SCORE_THRESHOLD):
    
        humanLoopName = str(uuid.uuid4())
        inputContent = {
            "initialValue": blurb["DocType"][:-1],
            "taskObject": blurb["DocText"]
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(inputContent)
            }
        )
        human_loops_started.append(humanLoopName)
        print(f'SentimentScore of {response}, {blurb["DocType"]} is less than the threshold of {SENTIMENT_SCORE_THRESHOLD}')
        print(f'Starting human loop with name: {humanLoopName}  \n')
    else:
        print(f'SentimentScore of {response}, {blurb["DocType"]} is above threshold of {SENTIMENT_SCORE_THRESHOLD}')
        print('No human loop created. \n')

Processing blurb: "document_6.png"
SentimentScore of 1.0, invoices is above threshold of 1
No human loop created. 

Processing blurb: "document_10.png"
SentimentScore of 1.0, receipts is above threshold of 1
No human loop created. 

Processing blurb: "document_7.png"
SentimentScore of 0.9999, invoices is less than the threshold of 1
Starting human loop with name: 14a2982a-3c74-41c5-a7ae-b7bbce078172  

Processing blurb: "document_1.png"
SentimentScore of 1.0, bank-statements is above threshold of 1
No human loop created. 

Processing blurb: "document_4.png"
SentimentScore of 1.0, bank-statements is above threshold of 1
No human loop created. 

Processing blurb: "document_0.png"
SentimentScore of 1.0, bank-statements is above threshold of 1
No human loop created. 

Processing blurb: "document_8.png"
SentimentScore of 1.0, receipts is above threshold of 1
No human loop created. 

Processing blurb: "document_5.png"
SentimentScore of 0.9999, invoices is less than the threshold of 1
Startin

Each human loop completes once a reviewer on the workteam has submitted a decision for the document.

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')  # .get avoids a KeyError if the field is absent
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
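
Once a loop has completed, the reviewer's answer can be read from the output file that `HumanLoopOutput` points to. The sketch below assumes the output JSON has a `humanAnswers` list whose entries hold an `answerContent` dict keyed by the classifier's `name` attribute (`sentiment` in the template above). Treat that layout as an assumption and inspect a real output file before relying on it. The helper names and sample values are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")


def reviewed_label(output: Dict[str, Any], field: str = "sentiment") -> Optional[str]:
    """Return the first reviewer's chosen label, or None if there is no answer."""
    answers = output.get("humanAnswers", [])
    if not answers:
        return None
    return answers[0].get("answerContent", {}).get(field, {}).get("label")


# Hand-written stand-in for a downloaded output file (assumed layout).
fake_output = {
    "humanAnswers": [{"answerContent": {"sentiment": {"label": "invoice"}}}],
}
print(split_s3_uri("s3://example-bucket/idp/output.json"))
print(reviewed_label(fake_output))
```

The bucket and key returned by `split_s3_uri` can be passed to `s3.get_object` to download the file before parsing it.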