# Environment setup

In [17]:
import re
import csv
import sagemaker
import boto3
import time
import os
import glob
import json
import pandas as pd
import botocore
from botocore.config import Config
from datetime import datetime

from tqdm import tqdm
from tabulate import tabulate
from IPython.display import HTML, display

### Identify resources created by the CloudFormation stack

In [None]:
# Look up the resources created by the 'invoice-processing' CloudFormation stack.
cf_client = boto3.client('cloudformation')
cf_client_response = cf_client.describe_stacks(StackName='invoice-processing')

# Index every stack output by its key so each resource is a single dict lookup
# (a stack without outputs has no 'Outputs' key at all, hence the .get default).
stack_outputs = {
    output["OutputKey"]: output["OutputValue"]
    for output in cf_client_response["Stacks"][0].get("Outputs", [])
}

s3_bucket_name = stack_outputs.get("S3BucketName")
comprehend_role_arn = stack_outputs.get("ComprehendRoleArn")
lambda_arn = stack_outputs.get("LambdaFunctionArn")

print('Bucket name: {}'.format(s3_bucket_name))
print('Comprehend role arn: {}'.format(comprehend_role_arn))
print('Lambda function arn: {}'.format(lambda_arn))

# Every later cell needs all three resources; fail here rather than with a confusing error later.
missing_outputs = [
    key for key in ("S3BucketName", "ComprehendRoleArn", "LambdaFunctionArn")
    if key not in stack_outputs
]
if missing_outputs:
    raise RuntimeError('Stack invoice-processing is missing outputs: {}'.format(', '.join(missing_outputs)))

# Create custom entity recognizer model

### Copy hotel invoices for training the entity recognizer model to S3

In [None]:
s3_client = boto3.client('s3')

# Mirror ./dataset/<split>/<name>.pdf into the bucket as dataset/<split>/<name>.pdf
local_invoice_paths = glob.glob("./dataset/*/*.pdf" )
for local_invoice in tqdm(local_invoice_paths, desc="Copy hotel invoices to S3"):
    path_segments = local_invoice.split('/')
    object_key = "/".join(path_segments[1:])  # drop the leading '.' segment
    s3_client.upload_file(local_invoice, s3_bucket_name, object_key)

### Extract text from hotel invoices (used for model training) by using Amazon Textract. Copy extracted text to S3.

Extracting text from the PDF hotel invoices takes 10-15 minutes.

In [None]:
%%time

# Set retry mode
config = Config(
            retries = {
                      'max_attempts': 5,
                      'mode': 'standard'
            }
)

textract = boto3.client('textract', config=config)

# Create output directory for extracted hotel invoices
output_path = './extracted_text/train'

if not os.path.exists(output_path):
    
  os.makedirs(output_path)
  print ("{} directory created".format(output_path))
    

for file in tqdm([os.path.basename(x) for x in glob.glob("./dataset/train/*.pdf")], desc="Processing hotel invoices"):
    
    print ("File {} will be copied to S3".format(file))
    
    # Start hotel invoice text detection
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': 'dataset/train/{}'.format(file)
            }
        }
    )
    
    extracted_file_name = file.split('/')[-1].replace('pdf','txt')
    extracted_file_name_path = "./{}/{}".format(output_path, extracted_file_name)

    # Save extracted text file locally
    with open(extracted_file_name_path, "w") as extracted_file: 
        for item in response["Blocks"]:
            if item["BlockType"] == "LINE":
                extracted_file.write(item["Text"]+' ')
                    
    
    # Copy extracted text file to S3
    s3_client.upload_file(extracted_file_name_path, s3_bucket_name, 'extracted_text/train/{}'.format(extracted_file_name))
        
    print ("File {} extracted and copied to S3".format(file))
                    

### Create Annotations

In [None]:
# Read guest_entity_list.csv. Each row is: <entity text>,<document id>,<entity type>, where
# <document id> is the extracted file name up to its first '.', and <entity type> is one of
# INVOICE_DATE, GUEST_NAME, INVOICE_NUMBER.
entities = {}
with open("dataset/guest_entity_list.csv", "r", newline="", encoding="utf-8") as f:
    for row in csv.reader(f):  # csv.reader copes with quoted fields that contain commas
        if len(row) < 3:
            # Skip blank or malformed lines instead of failing with IndexError
            continue
        entity_text, document_id, entity_type = row[0], row[1].strip(), row[2].strip()
        document_entities = entities.setdefault(
            document_id, {'INVOICE_DATE': '', 'GUEST_NAME': '', 'INVOICE_NUMBER': ''}
        )
        document_entities[entity_type] = entity_text

os.makedirs('annotation', exist_ok=True)

# newline='' is required so the csv module controls line endings on every platform
with open("./annotation/annotations.csv", "w", encoding="utf-8", newline="") as csv_file:

    csv_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
    csv_writer.writerow(["File", "Line", "Begin Offset", "End Offset", "Type"])

    for file_path in sorted(glob.glob("./extracted_text/train/*")):
        file_name = os.path.basename(file_path)
        document_entities = entities[file_name.split('.')[0]]

        with open(file_path, "r", encoding="utf-8") as fr:
            for line_number, line in enumerate(fr):
                for entity_type, entity_text in document_entities.items():
                    if not entity_text:
                        # No value listed for this type; an empty pattern would "match" at offset 0-0
                        continue
                    # Entity text is literal (names/dates may contain '.', '(', '+' ...), so escape it,
                    # and label every occurrence rather than only the first one.
                    for match in re.finditer(re.escape(entity_text), line):
                        csv_writer.writerow([file_name, line_number, match.start(), match.end(), entity_type])


s3_client.upload_file("./annotation/annotations.csv", s3_bucket_name, 'annotation/annotations.csv')
print ('Annotation file created.')

The annotation file uses a guest entity list, prepared beforehand, to label where the guest name, invoice number and invoice date appear in the extracted text.

Here is what it looks like:

In [None]:
# Preview the first ten annotation rows (file, line, begin/end offset, entity type)
annotations_csv = 'annotation/annotations.csv'
annotation = pd.read_csv(annotations_csv)
annotation.head(10)

### Create the entity recognizer model

We can then use the annotation file along with the training dataset to train a custom entity recognition model in Comprehend.

Training the entity recognition model takes 20-30 minutes.

In [None]:
%%time

comprehend_client = boto3.client('comprehend')

recognizer_name = 'invoice-recognizer-{}'.format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

# Submit entity recognition 
create_entity_recognizer_response = comprehend_client.create_entity_recognizer(
    RecognizerName = recognizer_name,
    DataAccessRoleArn=comprehend_role_arn,
    InputDataConfig={
        'EntityTypes': [
            {
                'Type': 'GUEST_NAME'
            },
            {
                'Type': 'INVOICE_NUMBER'
            },
            {
                'Type': 'INVOICE_DATE'
            }
        ],
        'Documents': {
            'S3Uri': 's3://{}/extracted_text/train/'.format(s3_bucket_name),
            'InputFormat': 'ONE_DOC_PER_LINE'
        },
        'Annotations': {
            'S3Uri': 's3://{}/annotation/annotations.csv'.format(s3_bucket_name)
        }
    },
    LanguageCode='en'
)

# Wait till model traning completed
status = describe_entity_recognizer_response = None
while status != 'TRAINED':
    
    describe_entity_recognizer_response = comprehend_client.describe_entity_recognizer(
        EntityRecognizerArn=create_entity_recognizer_response['EntityRecognizerArn']
    )
    
    status = describe_entity_recognizer_response['EntityRecognizerProperties']['Status']
    print('Training Job Status:\t', status)
    if status == 'IN_ERROR':
        print ('ERROR: ', describe_entity_recognizer_response['EntityRecognizerProperties']['Message'])
        break
    
    time.sleep(30)
    
model_arn = describe_entity_recognizer_response['EntityRecognizerProperties']['EntityRecognizerArn']
print ('Entity recognizer model arn: {}'.format(model_arn))
    

Print model evaluation statistics.

In [None]:
RecognizerMetadata = describe_entity_recognizer_response['EntityRecognizerProperties']['RecognizerMetadata']
EvaluationMetrics = RecognizerMetadata['EvaluationMetrics']

# How many documents Comprehend used for training vs. held-out evaluation
document_count_rows = [
    ["Number Of Trained Documents", "Number Of Test Documents"],
    [RecognizerMetadata['NumberOfTrainedDocuments'], RecognizerMetadata['NumberOfTestDocuments']],
]
display(HTML(tabulate(document_count_rows, tablefmt='html')))

# Overall precision / recall / F1 reported for the trained recognizer
metric_rows = [
    ["Precision", "Recall", "F1 Score"],
    [EvaluationMetrics['Precision'], EvaluationMetrics['Recall'], EvaluationMetrics['F1Score']],
]
table_context = tabulate(metric_rows, tablefmt='html')
display(HTML(table_context))


# Test trained hotel invoice recognition model

## Test trained hotel invoice recognition model in batch mode

### Extract text from hotel invoices (used for model testing) by using Amazon Textract. Copy extracted text to S3.

In [None]:
%%time

# Set retry mode
config = Config(
            retries = {
                      'max_attempts': 5,
                      'mode': 'standard'
            }
)

textract = boto3.client('textract', config=config)

output_path = './extracted_text/test'

if not os.path.exists(output_path):
    
  os.makedirs(output_path)
  print ("{} directory created".format(output_path))
    
for file in tqdm([os.path.basename(x) for x in glob.glob("./dataset/test/*.pdf")], desc="Processing hotel invoices"):
    
    # Start hotel invoice text detection
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': 'dataset/test/{}'.format(file)
            }
        }
    )
        
    extracted_file_name = file.split('/')[-1].replace('pdf','txt')
    extracted_file_name_path = "./{}/{}".format(output_path, extracted_file_name)

    # Save extracted text file locally
    with open(extracted_file_name_path, "w") as extracted_file: 
        for item in response["Blocks"]:
            if item["BlockType"] == "LINE":
                extracted_file.write(item["Text"]+' ')
                    
                    
    # Copy extracted text file to S3
    s3_client.upload_file(extracted_file_name_path, s3_bucket_name, 'extracted_text/test/{}'.format(extracted_file_name))
    print ("File {} extracted and copied to S3".format(file))

### Start hotel invoice recognition job

Hotel invoice recognition job takes 20-30 minutes.

In [None]:
%%time

jobname = 'invoice-job-{}'.format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

start_entities_detection_job_response = comprehend_client.start_entities_detection_job(
    JobName = jobname,
    InputDataConfig={
         'S3Uri': 's3://{}/extracted_text/test'.format(s3_bucket_name),
         'InputFormat': 'ONE_DOC_PER_FILE',
    },
    OutputDataConfig={
        'S3Uri': 's3://{}/batch-detection/output'.format(s3_bucket_name)
    },
    DataAccessRoleArn=comprehend_role_arn,
    EntityRecognizerArn=model_arn,
    LanguageCode='en'
    
)

# Wait till batch job is completed
status = describe_entity_recognizer_response = None
while status != 'COMPLETED':
    
    describe_entity_recognizer_response = comprehend_client.describe_entities_detection_job(
        JobId = start_entities_detection_job_response['JobId']
    )
    
    status = describe_entity_recognizer_response['EntitiesDetectionJobProperties']['JobStatus']
    print('Detection Job Status:\t', status)
    if status == 'FAILED':
        print ('ERROR: ', describe_entity_recognizer_response['EntitiesDetectionJobProperties']['Message'])
        break
    
    time.sleep(10)
    
# Retrieve the s3 location of the output
detection_job_output_path = describe_entity_recognizer_response['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri']

### Retrieve and check the batch job result

Download the output from the batch job and look at the results.

In [None]:
!aws s3 cp $detection_job_output_path output.tar.gz
!tar -xf output.tar.gz

with open('output', 'r') as text:
    textfile = text.read()
    print(textfile)

## Test trained hotel invoice recognition model in real time mode

### Create Comprehend endpoint

Creating the Comprehend endpoint takes 10-15 minutes.

In [None]:
%%time

create_endpoint_response = comprehend_client.create_endpoint(
    EndpointName='invoice-detect-endpoint',
    ModelArn=model_arn,
    DesiredInferenceUnits=1,
    Tags=[
        {
            'Key': 'Project',
            'Value': 'Hotel invoice recognition'
        },
    ]
)

# Wait endpoint creation completed
status = describe_endpoint_response = None
endpoint_arn = create_endpoint_response['EndpointArn']
while status != 'IN_SERVICE':
    
    describe_endpoint_response = comprehend_client.describe_endpoint(
        EndpointArn=endpoint_arn
    )
    
    status = describe_endpoint_response['EndpointProperties']['Status']
    print('Endpoint creation Status:\t', status)
    if status == 'FAILED':
        print ('ERROR: ', describe_endpoint_response['EndpointProperties']['Status'])
        break
    
    time.sleep(30)
    


### Add Lambda function trigger

The Lambda function will be triggered by new S3 object events (in our case, uploaded hotel invoices).

In [None]:
# Invoke the Lambda function whenever a .pdf object is created under realtime-detection/invoice
invoice_upload_filter = {
    'Key': {
        'FilterRules': [
            {'Name': 'prefix', 'Value': 'realtime-detection/invoice'},
            {'Name': 'suffix', 'Value': '.pdf'},
        ]
    }
}
lambda_trigger_config = {
    'LambdaFunctionArn': lambda_arn,
    'Events': ['s3:ObjectCreated:*'],
    'Filter': invoice_upload_filter,
}

bucket_notification = boto3.resource('s3').BucketNotification(s3_bucket_name)
# NOTE(review): per the S3 API this call is expected to replace any existing notification
# configuration on the bucket — confirm no other triggers need to be preserved.
response = bucket_notification.put(
    NotificationConfiguration={'LambdaFunctionConfigurations': [lambda_trigger_config]}
)

print ('Lambda function trigger created.')

### Start hotel invoice recognition

Copy the hotel invoices for testing into the S3 bucket. Each copied S3 object triggers the Lambda function, which starts hotel invoice recognition.

By default, an AWS account has a soft limit of one StartDocumentTextDetection job per second. If more than one StartDocumentTextDetection job is started per second, a throttling error is raised.
To mitigate this, we wait 5 seconds between uploads. In a real production environment, request an Amazon Textract
quota increase instead.

In [None]:
test_invoice_paths = glob.glob("./dataset/test/*.pdf" )
for invoice_path in tqdm(test_invoice_paths, desc="Copy hotel invoices to S3"):
    invoice_key = 'realtime-detection/invoice/{}'.format(os.path.basename(invoice_path))
    s3_client.upload_file(invoice_path, s3_bucket_name, invoice_key)
    # Each upload triggers the Lambda; pause to stay under the Textract request rate
    time.sleep(5)

Check recognition results in the invoice-recognize-output DynamoDB table.

Test whether the trained Comprehend model recognizes entities in invoices with a changed structure.

In [None]:
# Same real-time flow as above, using invoices whose layout differs from the training set
changed_layout_invoices = glob.glob("./dataset/test_changed/*.pdf" )
for changed_invoice in tqdm(changed_layout_invoices, desc="Copy hotel invoices to S3"):
    upload_key = 'realtime-detection/invoice/{}'.format(os.path.basename(changed_invoice))
    s3_client.upload_file(changed_invoice, s3_bucket_name, upload_key)
    time.sleep(5)  # throttle uploads (one Lambda-triggered Textract job each)

# Cleanup resources

Delete Comprehend endpoint.

In [None]:
%%time

delete_endpoint_response = comprehend_client.delete_endpoint(
    EndpointArn=endpoint_arn
)

# Wait endpoint deletetion
status = describe_endpoint_response = None
endpoint_arn = create_endpoint_response['EndpointArn']
while True:
    
    try:
    
        describe_endpoint_response = comprehend_client.describe_endpoint(
            EndpointArn=endpoint_arn
        )
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ResourceNotFoundException': 
            print('Endpoint deletion Status:\t', 'DELETED')
            break
        else:
            raise
        
    status = describe_endpoint_response['EndpointProperties']['Status']
    print('Endpoint deletion Status:\t', status)
    if status == 'FAILED':
        print ('ERROR: ', describe_endpoint_response['EndpointProperties']['Status'])
        break
    
    time.sleep(30)

Delete trained model.

In [None]:
delete_entity_recognizer_response = comprehend_client.delete_entity_recognizer(
    EntityRecognizerArn=model_arn
)

# Wait for trained model deletion: poll until describe_entity_recognizer reports ResourceNotFoundException
status = describe_entity_recognizer_response = None
while True:

    try:
        describe_entity_recognizer_response = comprehend_client.describe_entity_recognizer(
            EntityRecognizerArn=model_arn
        )
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ResourceNotFoundException':
            print('Deletion Trained Model Status:\t', 'DELETED')
            break
        raise

    recognizer_properties = describe_entity_recognizer_response['EntityRecognizerProperties']
    status = recognizer_properties['Status']
    print('Deletion Trained Model Status:\t', status)
    if status == 'IN_ERROR':
        # .get: 'Message' may be absent from the response
        print ('ERROR: ', recognizer_properties.get('Message'))
        break

    time.sleep(30)