# Amazon Textract


## 2. Asynchronous operations

Asynchronous operations can also process documents in PDF format, which lets you process multipage documents.

For asynchronous operations, you need to supply input documents in an **Amazon S3 bucket**.

In [None]:
#Detects text in a document stored in an S3 bucket. Display polygon box around text and angled text 
import boto3
import json
import time
import sys
import textract.util as tu
from io import BytesIO

In [None]:
# Service handles used throughout the notebook: low-level Textract and S3
# clients, plus the higher-level S3 resource interface.
textract, s3_client = (boto3.client(service) for service in ('textract', 's3'))
s3 = boto3.resource('s3')

In [None]:
class ProcessType:
    """Selects which asynchronous Textract operation DocumentProcessor starts."""

    # DETECTION -> text detection only; ANALYSIS -> document analysis.
    DETECTION, ANALYSIS = 1, 2

In [None]:
# S3 data bucket: the place where the files to analyze (jpg, png, pdf) are uploaded.
bucket = ''
# IAM role Textract uses to publish the completion notification to the SNS topic.
# See https://docs.aws.amazon.com/textract/latest/dg/api-async-roles.html
roleArn = 'arn:aws:iam::XXXXXXXXXXXXX:role/XXXXXXXXXXXXXX'

# S3 object keys (inside `bucket`) of the documents to process, one job each.
test_document = [
    'data/XXXXXXXX.pdf',
    'data/XXXXXXXX.png',
    'data/XXXXXXXX.jpeg',
    'data/XXXXXXXX.jpg',
]
# Processing type passed to DocumentProcessor.ProcessDocument:
# ProcessType.DETECTION (text detection) or ProcessType.ANALYSIS (analysis).
types = ProcessType.ANALYSIS


The following diagram shows the process for detecting text in a document image stored in an Amazon S3 bucket. In the diagram, an Amazon SQS queue receives the job's completion status from the Amazon SNS topic.
![](https://docs.aws.amazon.com/textract/latest/dg/images/asynchronous.png)


### 2-1) Asynchronous Document Processor

In [None]:
class DocumentProcessor:
    """Run one asynchronous Amazon Textract job on a document stored in S3.

    Typical use:
      1. ``CreateTopicandQueue()`` creates an SNS topic and an SQS queue
         subscribed to it.
      2. ``ProcessDocument(types)`` starts the Textract job with that topic as
         its notification channel, polls the queue until the completion
         message for the job arrives, and then fetches the results.
      3. ``DeleteTopicandQueue()`` removes the topic and the queue again.
    """

    # Legacy attribute, never read or written by this class; the running job
    # id lives in the instance attribute ``jobid``. Kept for compatibility.
    jobId = ''
    # AWS clients shared by all instances, created when the class is defined.
    textract = boto3.client('textract')
    sqs = boto3.client('sqs')
    sns = boto3.client('sns')

    roleArn = ''  # IAM role Textract uses to publish to the SNS topic
    bucket = ''  # S3 bucket holding the input document
    document = ''  # S3 object key of the input document

    sqsQueueUrl = ''  # set by CreateTopicandQueue
    snsTopicArn = ''  # set by CreateTopicandQueue
    processType = ''  # ProcessType value of the last ProcessDocument call

    def __init__(self, role, bucket, document):
        """Store the IAM role ARN, the S3 bucket and the S3 object key."""
        self.roleArn = role
        self.bucket = bucket
        self.document = document
        self.jobid = ''

    def ProcessDocument(self, types):
        """Start the Textract job, wait for it to finish and fetch its results.

        Requires ``CreateTopicandQueue`` to have been called first, because
        the job notifies ``self.snsTopicArn`` and completion is read from
        ``self.sqsQueueUrl``. Blocks until the matching notification arrives.

        Args:
            types: ``ProcessType.DETECTION`` (text detection) or
                ``ProcessType.ANALYSIS`` (analysis with TABLES and FORMS).

        Returns:
            The Textract job id, or ``None`` if *types* is not a valid
            processing type.
        """
        self.processType = types
        response = self._start_job()
        if response is None:
            print("Invalid processing type. Choose Detection or Analysis.")
            return

        print('Start Job Id: ' + response['JobId'])
        self.jobid = response['JobId']
        self._wait_for_job(types)

        print('Done!')
        return self.jobid

    def _start_job(self):
        """Start the job chosen by ``self.processType``; return the API response, or None if the type is unknown."""
        location = {'S3Object': {'Bucket': self.bucket, 'Name': self.document}}
        channel = {'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}

        if self.processType == ProcessType.DETECTION:
            response = self.textract.start_document_text_detection(
                DocumentLocation=location,
                NotificationChannel=channel)
            print('Processing type: Detection')
            return response

        if self.processType == ProcessType.ANALYSIS:
            response = self.textract.start_document_analysis(
                DocumentLocation=location,
                FeatureTypes=["TABLES", "FORMS"],
                NotificationChannel=channel)
            print('Processing type: Analysis')
            return response

        return None

    def _wait_for_job(self, types):
        """Poll the SQS queue until the notification for ``self.jobid`` arrives, then fetch the results."""
        job_found = False
        dot_line = 0
        while not job_found:
            sqs_response = self.sqs.receive_message(
                QueueUrl=self.sqsQueueUrl,
                MessageAttributeNames=['ALL'],
                MaxNumberOfMessages=10)

            messages = sqs_response.get('Messages')
            if not messages:
                # Nothing yet: print a progress dot (wrapping after 40) and retry.
                if dot_line < 40:
                    print('.', end='')
                    dot_line += 1
                else:
                    print()
                    dot_line = 0
                sys.stdout.flush()
                time.sleep(5)
                continue

            for message in messages:
                # The queue is subscribed to the SNS topic: the SQS body is a
                # JSON document whose 'Message' field is itself JSON.
                notification = json.loads(message['Body'])
                text_message = json.loads(notification['Message'])
                print(text_message['JobId'])
                print(text_message['Status'])
                if str(text_message['JobId']) == self.jobid:
                    print('Matching Job Found:' + text_message['JobId'])
                    job_found = True
                    tu.GetResults(text_message['JobId'], types)
                else:
                    print("Job didn't match:" +
                          str(text_message['JobId']) + ' : ' + str(self.jobid))
                # Delete every handled message exactly once (the original
                # deleted the matching one twice). Consider sending unknown
                # messages to a dead-letter queue instead.
                self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
                                        ReceiptHandle=message['ReceiptHandle'])

    def CreateTopicandQueue(self):
        """Create an SNS topic and an SQS queue subscribed to it.

        Sets ``self.snsTopicArn`` and ``self.sqsQueueUrl``. Both names get a
        millisecond timestamp suffix so repeated runs do not collide.
        """
        millis = str(int(round(time.time() * 1000)))

        # NOTE(review): the "AmazonTextract" name prefix presumably matches
        # the SNS permissions granted to the role in ``roleArn`` (see the
        # Textract async-roles docs) — confirm before renaming.
        sns_topic_name = "AmazonTextractTopic" + millis
        self.snsTopicArn = self.sns.create_topic(Name=sns_topic_name)['TopicArn']

        sqs_queue_name = "AmazonTextractQueue" + millis
        # create_queue already returns the queue URL; no get_queue_url needed.
        self.sqsQueueUrl = self.sqs.create_queue(QueueName=sqs_queue_name)['QueueUrl']
        sqs_queue_arn = self.sqs.get_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            AttributeNames=['QueueArn'])['Attributes']['QueueArn']

        # Subscribe the SQS queue to the SNS topic.
        self.sns.subscribe(
            TopicArn=self.snsTopicArn,
            Protocol='sqs',
            Endpoint=sqs_queue_arn)

        # Queue policy: allow SQS:SendMessage only when the source ARN is this
        # topic. Built as a dict and serialized so the JSON is always valid.
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "MyPolicy",
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": "SQS:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {
                        "ArnEquals": {"aws:SourceArn": self.snsTopicArn}
                    },
                }
            ],
        }
        self.sqs.set_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            Attributes={'Policy': json.dumps(policy)})

    def DeleteTopicandQueue(self):
        """Delete the SQS queue and SNS topic created by CreateTopicandQueue.

        Resources that were never created (empty URL/ARN, e.g. because setup
        failed partway) are skipped, so this is safe to call from a
        ``finally`` clause.

        Returns:
            ``self.jobid`` (empty string if no job was started).
        """
        if self.sqsQueueUrl:
            self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
        if self.snsTopicArn:
            self.sns.delete_topic(TopicArn=self.snsTopicArn)
        return self.jobid



### 2-2) Performing ProcessDocument

In [None]:
%%time
# Run one asynchronous Textract job per entry in test_document and collect
# the processors and job ids (index-aligned with test_document).
analyzers = []
jobids = []
for document in test_document:
    print(document)
    # Build the processor before the try block so the finally clause always
    # cleans up *this* document's resources, never a stale analyzer left over
    # from a previous iteration (or an undefined name on the first one).
    analyzer = DocumentProcessor(roleArn, bucket, document)
    try:
        analyzer.CreateTopicandQueue()
        analyzer.ProcessDocument(types)
    except Exception as e:
        # Best effort: report the failure and continue with the next document.
        print("Exception : {}".format(e))
    finally:
        # Always remove the per-document SNS topic and SQS queue. A cleanup
        # failure is reported instead of aborting the remaining documents.
        try:
            jobid = analyzer.DeleteTopicandQueue()
        except Exception as e:
            print("Cleanup exception : {}".format(e))
            jobid = analyzer.jobid
        print(jobid)
        analyzers.append(analyzer)
        jobids.append(jobid)

In [None]:
# Persist these variables in IPython's storage so they can be restored later
# (for example in a follow-up notebook) with `%store -r`.
%store test_document
%store jobids
%store bucket
