In [20]:
import os
import boto3
import logging
from dotenv import load_dotenv

# load aws credentials
load_dotenv()

True

# Setup Logger

In [21]:
def setup_logger(logger_name, log_file_path, log_level=logging.DEBUG):
    # Create a logger with the specified name
    logger = logging.getLogger(logger_name)
    logger.setLevel(log_level) 

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(formatter)

    # Create a stream handler (console) and set the formatter
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger


logger_name = 'ocr_logger'
log_file_path = 'ocr_logger.log'
logger = setup_logger(logger_name, log_file_path)

# Create SQS Queue and SNS Topic

In [45]:
class SQSAndSNSController:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self):
        """
        :param sns_client: A Boto3 Amazon SNS resource.
        """
        aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
        aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        self.sns_client = boto3.client('sns', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
        self.sqs_client = boto3.client('sqs', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)


    def delete_topic(self, topic_arn: str):
        """
        Deletes a topic. All subscriptions to the topic are also deleted.
        """
        try:
            _ = self.sns_client.delete_topic(TopicArn=topic_arn)
            logger.info(f"Deleted topic {topic_arn}")
        except Exception as e:
            logger.exception(f"Couldn't delete topic {topic_arn}, {e}")
            raise


    def create_topic(self, name):
        """
        Creates a notification topic.

        :param name: The name of the topic to create.
        :return: The newly created topic.
        """
        try:
            topic = self.sns_client.create_topic(Name=name)
            topic_arn = topic["TopicArn"]
            logger.info(f"Created topic {name}  with ARN {topic_arn}.")
            return topic_arn

        except Exception as e:
            logger.exception(f"Couldn't create topic {name}, {e}")
            raise


    def delete_queue(self, queue_url: str):
        """
        Removes an SQS queue. When run against an AWS account, it can take up to
        60 seconds before the queue is actually deleted.

        :param queue: The queue to delete.
        :return: None
        """
        try:
            _ = self.sqs_client.delete_queue(QueueUrl=queue_url)
            logger.info(f"Deleted queue with URL={queue_url}")
        except Exception as e:
            logger.exception(f"Couldn't delete queue with URL {queue_url}!, {e}")
            
            raise 


    def create_queue(self, name: str, attributes:dict ={
            "DelaySeconds": "3",
            "VisibilityTimeout": "60" 
        }):
        """
        Creates an Amazon SQS queue.

        :param name: The name of the queue. This is part of the URL assigned to the queue.
        :param attributes: The attributes of the queue, such as maximum message size or
                        whether it's a FIFO queue.
        :return: A Queue object that contains metadata about the queue and that can be used
                to perform queue operations like sending and receiving messages.
        """
        if not attributes:
            attributes = {}

        try:
            queue = self.sqs_client.create_queue(QueueName=name, Attributes=attributes)
            queue_url = queue['QueueUrl']
            logger.info(f"Created queue {name} with URL {queue_url}")
            return queue_url

        except Exception as e:
            logger.exception(f"Couldn't create queue named {name}, {e}")
            raise 

In [47]:
name = "newspaper-ocr"
sns_and_sqs_controller = SQSAndSNSController()
topic_arn = sns_and_sqs_controller.create_topic(name)
queue_url = sns_and_sqs_controller.create_queue(name)

2023-12-24 06:53:13,605 - ocr_logger - INFO - Created topic newspaper-ocr  with ARN arn:aws:sns:us-east-1:222311789433:newspaper-ocr.
2023-12-24 06:53:13,605 - ocr_logger - INFO - Created topic newspaper-ocr  with ARN arn:aws:sns:us-east-1:222311789433:newspaper-ocr.
2023-12-24 06:53:13,839 - ocr_logger - INFO - Created queue newspaper-ocr with URL https://sqs.us-east-1.amazonaws.com/222311789433/newspaper-ocr
2023-12-24 06:53:13,839 - ocr_logger - INFO - Created queue newspaper-ocr with URL https://sqs.us-east-1.amazonaws.com/222311789433/newspaper-ocr


In [48]:
sns_and_sqs_controller.delete_topic(topic_arn)
sns_and_sqs_controller.delete_queue(queue_url)

2023-12-24 06:53:13,969 - ocr_logger - INFO - Deleted topic arn:aws:sns:us-east-1:222311789433:newspaper-ocr
2023-12-24 06:53:13,969 - ocr_logger - INFO - Deleted topic arn:aws:sns:us-east-1:222311789433:newspaper-ocr
2023-12-24 06:53:14,068 - ocr_logger - INFO - Deleted queue with URL=https://sqs.us-east-1.amazonaws.com/222311789433/newspaper-ocr
2023-12-24 06:53:14,068 - ocr_logger - INFO - Deleted queue with URL=https://sqs.us-east-1.amazonaws.com/222311789433/newspaper-ocr


In [1]:
class TextractWrapper:
    """Encapsulates Textract functions."""

    def __init__(self, textract_client, s3_resource, sqs_resource):
        """
        :param textract_client: A Boto3 Textract client.
        :param s3_resource: A Boto3 Amazon S3 resource.
        :param sqs_resource: A Boto3 Amazon SQS resource.
        """
        self.textract_client = textract_client
        self.s3_resource = s3_resource
        self.sqs_resource = sqs_resource


    def start_analysis_job(
        self,
        bucket_name,
        document_file_name,
        feature_types,
        sns_topic_arn,
        sns_role_arn,
    ):
        """
        Starts an asynchronous job to detect text and additional elements, such as
        forms or tables, in an image stored in an Amazon S3 bucket. Textract publishes
        a notification to the specified Amazon SNS topic when the job completes.
        The image must be in PNG, JPG, or PDF format.

        :param bucket_name: The name of the Amazon S3 bucket that contains the image.
        :param document_file_name: The name of the document image stored in Amazon S3.
        :param feature_types: The types of additional document features to detect.
        :param sns_topic_arn: The Amazon Resource Name (ARN) of an Amazon SNS topic
                              where job completion notification is published.
        :param sns_role_arn: The ARN of an AWS Identity and Access Management (IAM)
                             role that can be assumed by Textract and grants permission
                             to publish to the Amazon SNS topic.
        :return: The ID of the job.
        """
        try:
            response = self.textract_client.start_document_analysis(
                DocumentLocation={
                    "S3Object": {"Bucket": bucket_name, "Name": document_file_name}
                },
                NotificationChannel={
                    "SNSTopicArn": sns_topic_arn,
                    "RoleArn": sns_role_arn,
                },
                FeatureTypes=feature_types,
            )
            job_id = response["JobId"]
            logger.info(
                "Started text analysis job %s on %s.", job_id, document_file_name
            )
        except ClientError:
            logger.exception("Couldn't analyze text in %s.", document_file_name)
            raise
        else:
            return job_id


