In [20]:
import io
import json
import os
import posixpath
import time

import boto3
import pandas as pd
from botocore.client import Config

# Module-level S3 client.
s3 = boto3.client("s3")

# botocore settings shared by the Textract, SQS and SNS clients created in
# DocumentProcessor: retries capped via max_attempts=20, region eu-west-2.
config = Config(region_name="eu-west-2", retries={"max_attempts": 20})



class ProcessType:
    """Integer tags for the two Textract operations DocumentProcessor can start."""

    # 1 -> text detection, 2 -> document analysis
    DETECTION, ANALYSIS = 1, 2


class DocumentProcessor:
    """Run an asynchronous Amazon Textract job on an S3 document and store the result in S3.

    Typical use is ``DocumentProcessor().main(...)``, which starts the job,
    polls until it reaches a terminal status, downloads every result page and
    writes them as one JSON document to ``dest_bucket``.

    The Textract/SQS/SNS clients are class attributes: they are created once,
    when the class body is executed, and shared by all instances.
    ``CreateTopicandQueue`` / ``DeleteTopicandQueue`` are optional helpers and
    are not called by ``main``.
    """

    # Seconds to wait between two job-status polls.
    POLL_INTERVAL_SECONDS = 5

    jobId = ''
    textract = boto3.client('textract', config=config)
    sqs = boto3.client('sqs', config=config)
    sns = boto3.client('sns', config=config)

    roleArn = ''
    bucket = ''
    document = ''

    sqsQueueUrl = ''
    snsTopicArn = ''
    processType = ''
    s3_dir_key = ''
    dest_bucket = ''
    key_id = ''

    def main(self, bucketName, documentName, key_id, dest_bucket, s3_dir_key, process_type="DETECTION"):
        """Process one document end to end.

        Args:
            bucketName: Bucket holding the input document.
            documentName: Object key of the input document.
            key_id: Identifier whose last ``/``-separated component becomes
                the output file name (``<name>.json``).
            dest_bucket: Bucket that receives the JSON result.
            s3_dir_key: Key prefix ("directory") for the JSON result.
            process_type: ``"DETECTION"`` or ``"ANALYSIS"``.

        Raises:
            Exception: If ``process_type`` is neither of the two values.
            RuntimeError: If the Textract job ends with status ``FAILED``.
        """
        # NOTE(review): hard-coded role ARN; it is stored but not used by any
        # method in this class.
        self.roleArn = 'arn:aws:iam::661082688832:role/service-role/AmazonSageMaker-ExecutionRole-20210921T210509'

        self.bucket = bucketName
        self.document = documentName
        self.key_id = key_id
        self.s3_dir_key = s3_dir_key
        self.dest_bucket = dest_bucket

        if process_type == "DETECTION":
            self.ProcessDocument(ProcessType.DETECTION)
        elif process_type == "ANALYSIS":
            self.ProcessDocument(ProcessType.ANALYSIS)
        else:
            raise Exception(f"process_type can be DETECTION/ANALYSIS, but \"{process_type}\" was passed.")

    def ProcessDocument(self, type):
        """Start the Textract job, wait for it to finish and store its output.

        Args:
            type: ``ProcessType.DETECTION`` or ``ProcessType.ANALYSIS``.

        Raises:
            Exception: If ``type`` is not a known process type.
            RuntimeError: If the Textract job ends with status ``FAILED``.
        """
        self.processType = type
        location = {'S3Object': {'Bucket': self.bucket, 'Name': self.document}}

        if type == ProcessType.DETECTION:
            response = self.textract.start_document_text_detection(DocumentLocation=location)
            print('Processing type: Detection')
        elif type == ProcessType.ANALYSIS:
            response = self.textract.start_document_analysis(DocumentLocation=location,
                                                             FeatureTypes=["FORMS", "LAYOUT"])
            print('Processing type: Analysis')
        else:
            # The message is built from the argument actually received; the
            # earlier version referenced an undefined name and raised NameError.
            raise Exception(f"process_type can be DETECTION/ANALYSIS, but \"{type}\" was passed.")

        job_id = response['JobId']
        self.jobId = job_id
        print('Start Job Id: ' + job_id)

        self._wait_for_job(job_id)

        print('Matching Job Found:' + job_id)
        print("storing in S3")
        results = self.GetResults(job_id)
        self.StoreInS3(results)

        print('Done!')

    def _result_getter(self):
        """Return the Textract ``get_*`` call matching ``self.processType``."""
        if self.processType == ProcessType.DETECTION:
            return self.textract.get_document_text_detection
        if self.processType == ProcessType.ANALYSIS:
            return self.textract.get_document_analysis
        raise Exception(f"process_type can be DETECTION/ANALYSIS, but \"{self.processType}\" was passed.")

    def _wait_for_job(self, job_id):
        """Poll the job until it leaves the in-progress state.

        Sleeps ``POLL_INTERVAL_SECONDS`` between polls instead of spinning, and
        stops on every terminal status so a failed job cannot loop forever.

        Raises:
            RuntimeError: If the job status is ``FAILED``.
        """
        fetch = self._result_getter()
        while True:
            # Only the status is needed here, so ask for the smallest page.
            status_response = fetch(JobId=str(job_id), MaxResults=1)
            status = status_response["JobStatus"]

            if status == "SUCCEEDED":
                return
            if status == "PARTIAL_SUCCESS":
                # Results exist but are incomplete; carry on and surface it.
                print('Warning: job ' + str(job_id) + ' finished with PARTIAL_SUCCESS')
                return
            if status == "FAILED":
                reason = status_response.get("StatusMessage", "no status message")
                raise RuntimeError(f"Textract job {job_id} failed: {reason}")

            time.sleep(self.POLL_INTERVAL_SECONDS)

    # Store the result in a S3 bucket
    def StoreInS3(self, response):
        """Write ``response`` to ``dest_bucket`` as ``<s3_dir_key>/<name>.json``.

        ``<name>`` is the last ``/``-separated component of ``self.key_id``.

        Args:
            response: The payload to store; it is converted with ``str()``.
        """
        print('registering in s3 bucket...')
        body = str(response)
        filename = str(self.key_id).split('/')[-1]
        # S3 keys always use '/', so join with posixpath rather than the
        # platform-dependent os.path.
        object_key = posixpath.join(self.s3_dir_key, filename + ".json")

        s3 = boto3.client('s3')
        s3.put_object(Body=body, Bucket=self.dest_bucket, Key=object_key)
        print('file ' + object_key + ' registered successfully!')

    def CreateTopicandQueue(self):
        """Create an SNS topic and an SQS queue and subscribe the queue to the topic.

        Both names get a millisecond timestamp suffix. The resulting topic ARN
        and queue URL are stored on ``self.snsTopicArn`` / ``self.sqsQueueUrl``.
        """
        millis = str(int(round(time.time() * 1000)))

        # Create SNS topic
        topic = self.sns.create_topic(Name="AmazonTextractTopic" + millis)
        self.snsTopicArn = topic['TopicArn']

        # Create SQS queue
        queue_name = "AmazonTextractQueue" + millis
        self.sqs.create_queue(QueueName=queue_name)
        self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=queue_name)['QueueUrl']

        attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,
                                                AttributeNames=['QueueArn'])['Attributes']
        sqsQueueArn = attribs['QueueArn']

        # Subscribe SQS queue to SNS topic
        self.sns.subscribe(
            TopicArn=self.snsTopicArn,
            Protocol='sqs',
            Endpoint=sqsQueueArn)

        # Authorize SNS to write SQS queue: any principal may send, but only
        # when the message originates from this topic (aws:SourceArn).
        policy = """{{
  "Version":"2012-10-17",
  "Statement":[
    {{
      "Sid":"MyPolicy",
      "Effect":"Allow",
      "Principal" : {{"AWS" : "*"}},
      "Action":"SQS:SendMessage",
      "Resource": "{}",
      "Condition":{{
        "ArnEquals":{{
          "aws:SourceArn": "{}"
        }}
      }}
    }}
  ]
}}""".format(sqsQueueArn, self.snsTopicArn)

        self.sqs.set_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            Attributes={
                'Policy': policy
            })

    def DeleteTopicandQueue(self):
        """Delete the queue and topic recorded on this instance."""
        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
        self.sns.delete_topic(TopicArn=self.snsTopicArn)

    def GetResults(self, jobId):
        """Download every result page of a finished job.

        Args:
            jobId: Identifier returned when the job was started.

        Returns:
            str: JSON text encoding the list of raw Textract responses, one
            list element per result page, in the order received.

        Raises:
            Exception: If ``self.processType`` is not a known process type.
        """
        fetch = self._result_getter()
        request = {'JobId': jobId, 'MaxResults': 1000}
        pages = []

        while True:
            response = fetch(**request)
            pages.append(response)

            if 'NextToken' not in response:
                break
            request['NextToken'] = response['NextToken']

        return json.dumps(pages)

    
    
def process_text_analysis(bucket, document):
    """Load a stored Textract result from S3 and tabulate its WORD blocks.

    The object is expected to contain a JSON list of Textract responses, each
    with a ``"Blocks"`` list.

    Args:
        bucket: Bucket holding the JSON object.
        document: Key of the JSON object.

    Returns:
        tuple: ``(df, text_dict, res)`` where

        * ``df`` has one row per WORD block with columns ``xmin``, ``ymin``,
          ``width_cor``, ``height_cor`` (bounding box values rounded to two
          decimals), ``line_text``, ``page``, plus ``xmax = xmin + width_cor``
          and ``ymax = ymin + height_cor``;
        * ``text_dict`` maps each page value to that page's words joined by a
          single space, in block order;
        * ``res`` is the parsed JSON.
    """
    # Get the document from S3
    s3_client = boto3.client('s3')
    s3_object = s3_client.get_object(Bucket=bucket, Key=document)
    res = json.loads(s3_object['Body'].read().decode('utf-8'))

    # One tuple per WORD block; building rows directly replaces six parallel lists.
    rows = []
    for response in res:
        for block in response["Blocks"]:
            if block["BlockType"] != "WORD":
                continue
            box = block["Geometry"]["BoundingBox"]
            left, top, width, height = (
                float("{:.2f}".format(box[side]))
                for side in ("Left", "Top", "Width", "Height")
            )
            rows.append((left, top, width, height, block["Text"], block["Page"]))

    df = pd.DataFrame(rows, columns=["xmin", "ymin", "width_cor", "height_cor", "line_text", "page"])
    df["xmax"] = df["xmin"] + df["width_cor"]
    df["ymax"] = df["ymin"] + df["height_cor"]

    text_dict = {
        p: " ".join(df.loc[df.page == p, "line_text"].tolist())
        for p in df.page.unique().tolist()
    }

    return df, text_dict, res


In [21]:
import boto3

def lambda_handler(event, context):
    """Convert the first Textract JSON result under the fixed prefix to an Excel file.

    The bucket name is taken from the first record of the S3 event. The first
    object under ``XAAS_Practice/LambdaInputData/json_document`` whose key ends
    in ``.json`` (case-insensitive) is parsed with ``process_text_analysis`` and
    its word table is written next to it with an ``.xlsx`` extension.

    NOTE(review): the object key carried by the event is only logged; it does
    not influence which JSON file is converted. Confirm this is intended.

    Args:
        event: S3 event notification payload.
        context: Lambda context object (unused).

    Raises:
        LookupError: If no ``.json`` object exists under the prefix.
    """
    s3_record = event['Records'][0]['s3']
    bucket_name = s3_record['bucket']['name']
    key = s3_record['object']['key']

    print("key{---}",key)
    print("bucket_name:----",bucket_name)

    prefix = 'XAAS_Practice/LambdaInputData/json_document'
    json_suffix = ".json"

    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')

    # Lazily scan the listing and stop at the first match. 'Contents' is
    # absent from a page when nothing matches the prefix, hence .get().
    json_keys = (
        obj['Key']
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for obj in page.get('Contents', [])
        if obj['Key'].lower().endswith(json_suffix)
    )
    key_path = next(json_keys, None)
    if key_path is None:
        raise LookupError(f"no .json object found in bucket \"{bucket_name}\" under prefix \"{prefix}\"")

    df, text_dict, res = process_text_analysis(bucket_name, key_path)

    # Swap the extension by position so it also works for upper-case ".JSON";
    # a case-sensitive replace would leave the key unchanged and the workbook
    # would overwrite the JSON source.
    excel_path = key_path[:-len(json_suffix)] + ".xlsx"
    print("excel_path",excel_path)

    with io.BytesIO() as output:
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer)
        data = output.getvalue()

    s3_client.put_object(Bucket=bucket_name, Key=excel_path, Body=data)
    
    
    
    
    

In [22]:
# Sample S3 "ObjectCreated:Put" notification used to exercise lambda_handler locally.
# NOTE(review): this payload carries a principal id with an e-mail address and a
# source IP address; consider redacting before sharing the notebook.
event = {
    'Records': [
        {
            'eventVersion': '2.1',
            'eventSource': 'aws:s3',
            'awsRegion': 'eu-west-2',
            'eventTime': '2024-04-01T07:26:35.788Z',
            'eventName': 'ObjectCreated:Put',
            'userIdentity': {
                'principalId': 'AWS:AROAZT24ZJVAAOLMJR6XS:Deelip166585@exlservice.com',
            },
            'requestParameters': {
                'sourceIPAddress': '163.116.195.118',
            },
            'responseElements': {
                'x-amz-request-id': 'Y6PSHYDBJ4P34JF5',
                'x-amz-id-2': 'TsgdR+zyq39sa+Lf0mzrgsivLF837zJJFECXpFBdN0gvfdFgBqO+jK7Z9lqHZ+RjDigMsPigtfxtSarUkqVYye5V1FLGKfmWNSbfbsIyuCU=',
            },
            's3': {
                's3SchemaVersion': '1.0',
                'configurationId': 'd05b9c0b-b683-4ffd-9bfa-a80fa81d58b5',
                'bucket': {
                    'name': 'deeliptutorial',
                    'ownerIdentity': {'principalId': 'A3R036IGOPVHI4'},
                    'arn': 'arn:aws:s3:::deeliptutorial',
                },
                'object': {
                    'key': 'XAAS_Practice/LambdaInputData/B0180ME1402570AA_06.pdf',
                    'size': 1181554,
                    'eTag': '994943182e73cd53ae9209d77a5bbef8',
                    'sequencer': '00660A61ABAABAFD86',
                },
            },
        },
    ],
}
# Stand-in for the Lambda context object.
context = []

In [23]:
# Invoke the handler locally with the sample event and placeholder context
# defined in the previous cell.
lambda_handler(event, context)

key{---} XAAS_Practice/LambdaInputData/B0180ME1402570AA_06.pdf
bucket_name:---- deeliptutorial
excel_path XAAS_Practice/LambdaInputData/json_document/B0180ME1402570AA_06.xlsx
