<h4>Poll SQS queue messages with boto3:</h4>

In [3]:
from aws_assume_role import *
from sqs_queue_poll import *

STS: assume the AWS role and return an SQS client.

In [5]:
# Build the notebook-level client for the 'sqs' service.
# NOTE(review): sts_assume_role is not defined in this notebook; presumably it
# comes from the aws_assume_role star import — confirm.
# The helper functions below each call sts_assume_role('sqs') themselves and
# bind the result to a local name rather than reading this variable.
sqs_client = sts_assume_role('sqs')

Helper function details:

Create an SQS queue:

In [6]:
def create_sqs_queue(queue_name, attributes=None):
    """
    Create an SQS queue and report whether the call succeeded.

    queue_name: name passed to create_queue as QueueName.
    attributes: optional dict of queue attributes (string keys and string
        values, like the defaults below). Entries are layered over the
        defaults, so omitting it reproduces the original fixed settings.

    returns:
        (True, queue_url)  when the response reports HTTP status 200
        (False, None)      for any other status, or when anything raises
                           (the traceback is printed, nothing propagates)

    raw create_queue response, for reference:
    {
        'QueueUrl': 'https://us-east-1.queue.amazonaws.com/xxxx/sqs-queue',
        'ResponseMetadata': {
            'RequestId': 'xxxxxxxxx',
            'HTTPStatusCode': 200,
            'HTTPHeaders': {
                'x-amzn-requestid': 'xxxxxxxxx',
                'date': 'date time',
                'content-type': 'text/xml',
                'content-length': '334'
            }
        }
    }
    """
    # Imported locally so the error path below does not depend on whatever
    # the notebook's star imports happen to re-export.
    import traceback

    try:
        queue_attributes = {
            'DelaySeconds': '0',
            'VisibilityTimeout': '60',  # 60 seconds
            'ReceiveMessageWaitTimeSeconds': '20'  # 20 seconds
        }
        # Caller-supplied values win over the defaults; done inside the try
        # so a malformed `attributes` is reported like any other failure.
        if attributes:
            queue_attributes.update(attributes)

        sqs_client = sts_assume_role('sqs')
        response = sqs_client.create_queue(
            QueueName=queue_name,
            Attributes=queue_attributes
        )

        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True, response['QueueUrl']

        return False, None

    except Exception:
        print(f'ERROR: {traceback.format_exc()}')
        return False, None

Get the SQS queue URL if the queue exists:

In [None]:
def get_sqs_queue_url(queue_name):
    """
    Look up the URL of an SQS queue by its name.

    returns: the 'QueueUrl' string taken from the get_queue_url response.
        If anything raises along the way, the traceback is printed and an
        empty string is returned instead.

    raw get_queue_url response, for reference:
    {
        'QueueUrl': 'https://us-east-1.queue.amazonaws.com/xxxx/sqs-queue'
    }
    """
    # Start from the "not found" answer and overwrite it only on success.
    queue_url = ''
    try:
        lookup = sts_assume_role('sqs').get_queue_url(QueueName=queue_name)
        queue_url = lookup['QueueUrl']
    except Exception:
        print(f'ERROR: {traceback.format_exc()}')
    return queue_url

Send messages to a valid SQS queue:

In [None]:
def send_message_to_sqs_queue(queue_name, messages):
    """
    Serialise `messages` to JSON and send it as one SQS message body.

    queue_name: name of the queue; resolved to a URL via get_sqs_queue_url.
    messages: list(dict), e.g.
        [
            {'key1': 'value', ...},
            {'key2': 'value', ...},
            ...
        ]
        The whole list becomes a single MessageBody (json.dumps).

    returns:
        (True, (MD5OfMessageBody, MessageId))  when the response reports
                                               HTTP status 200
        (False, None)                          for any other status, when the
                                               queue URL comes back empty, or
                                               when anything raises (the
                                               traceback is printed)

    raw send_message response, for reference:
    {
        'MD5OfMessageBody': 'xxxxxxx',
        'MessageId': 'xxxxxxx',
        'ResponseMetadata': {
            'RequestId': 'xxxxxxx',
            'HTTPStatusCode': 200,
            'HTTPHeaders': {
                'x-amzn-requestid': 'xxxxxxx',
                'date': 'date time',
                'content-type': 'text/xml',
                'content-length': '378'
            }
        }
    }
    """
    failure = (False, None)
    try:
        client = sts_assume_role('sqs')
        target_url = get_sqs_queue_url(queue_name)
        if target_url:
            reply = client.send_message(
                QueueUrl=target_url,
                MessageBody=json.dumps(messages)
            )
        else:
            # Routed through the handler below so it is reported like any
            # other failure.
            raise Exception('ERROR: Queue not found')

        status = reply['ResponseMetadata']['HTTPStatusCode']
        if status != 200:
            return failure

        return True, (reply['MD5OfMessageBody'], reply['MessageId'])

    except Exception:
        print(f'ERROR: {traceback.format_exc()}')
        return failure

Long-poll a valid SQS queue:

In [None]:
def long_poll_sqs_queue(queue_name, max_messages=1, wait_time_seconds=20):
    """
    Receive messages from an SQS queue using a long-poll receive call.

    queue_name: name of the queue; resolved to a URL via get_sqs_queue_url.
    max_messages: passed as MaxNumberOfMessages (SQS accepts 1-10).
        Defaults to 1, the value this helper always used.
    wait_time_seconds: passed as WaitTimeSeconds (SQS accepts 0-20).
        Defaults to 20, the value this helper always used.

    returns: the list under 'Messages' in the receive_message response, or []
        when the response has no such key, when the queue URL comes back
        empty, or when anything raises (the traceback is printed).
        This function does not delete the messages it returns.

    shape of each returned message:
    {
        'MessageId': 'string',
        'ReceiptHandle': 'string',
        'MD5OfBody': 'string',
        'Body': 'string',
        'Attributes': {
            'string': 'string'
        },
        'MD5OfMessageAttributes': 'string',
        'MessageAttributes': {
            'string': {
                'StringValue': 'string',
                'BinaryValue': b'bytes',
                'StringListValues': [
                    'string',
                ],
                'BinaryListValues': [
                    b'bytes',
                ],
                'DataType': 'string'
            }
        }
    }
    """
    # Imported locally so the error path below does not depend on whatever
    # the notebook's star imports happen to re-export.
    import traceback

    try:
        sqs_client = sts_assume_role('sqs')
        queue_url = get_sqs_queue_url(queue_name)
        if not queue_url:
            # Routed through the handler below so it is reported like any
            # other failure.
            raise Exception('ERROR: Queue not found')

        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time_seconds,
        )

        # 'Messages' is read with a default because it may be missing.
        return response.get('Messages', [])

    except Exception:
        print(f'ERROR: {traceback.format_exc()}')
        return []

In [7]:
# Export README.ipynb to markdown; --execute runs the notebook before exporting.
# NOTE(review): if this cell lives in README.ipynb itself, that run would reach
# this cell again — confirm this is intended.
!jupyter nbconvert --execute --to markdown README.ipynb

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr