# Installation

In [1]:
# !pip install boto3

# Create SQS FIFO Resource

In [2]:
# Initialize SQS Client
import boto3
sqs_client = boto3.client('sqs', region_name='us-east-1') #Replace with your AWS region

In [3]:
# Define Queue Attributes
username = 'sharon'
# queue_name = f'{username}-{hash_key}.fifo' # FIFO queue names must end with '.fifo'
queue_name = f'{username}.fifo'
print(f"Queue will be made with the name: {queue_name}")

attributes = {
    'FifoQueue': 'true',                    # Indicates that the queue is a FIFO queue
    'ContentBasedDeduplication': 'true'     # Enables content-based deduplication
}

Queue will be made with the name: sharon_test.fifo


In [4]:
# create FIFO queue
response = sqs_client.create_queue(
    QueueName=queue_name,
    Attributes=attributes
)

# get url of newly created FIFO queue
queue_url = response['QueueUrl']
print("FIFO queue created successfully with URL:", queue_url)

FIFO queue created successfully with URL: https://sqs.us-east-1.amazonaws.com/160243126421/sharon_test.fifo


# Making function to include all attributes

In [1]:
def create_fifo_queue(queue_name, fifo_queue=True, content_based_deduplication=True,
                      message_retention_period=None, visibility_timeout=None,
                      kms_master_key_id=None, **kwargs):
    """
    Create a FIFO queue in Amazon SQS with customizable attributes.

    Args:
    - queue_name: Name of the FIFO queue to be created (must end with '.fifo').
    - region_name: AWS region where the queue will be created.
    - fifo_queue: Indicates whether the queue is a FIFO queue (default: True).
    - content_based_deduplication: Enables content-based deduplication (default: True).
    - message_retention_period: Message retention period in seconds (optional).
    - visibility_timeout: Visibility timeout in seconds (optional).
    - kms_master_key_id: The ID of the KMS master key used for SSE (optional).
    - **kwargs: Additional custom attributes for the FIFO queue.

    Returns:
    - queue_url: URL of the newly created FIFO queue, or None if queue already exists.
    """

    # Check if the queue already exists
    try:
        response = sqs_client.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print("Queue '{}' already exists.".format(queue_name))
        return None

    except sqs_client.exceptions.QueueDoesNotExist:
        pass  # Queue doesn't exist, proceed with creation

    # Define default attributes for FIFO queue
    attributes = {
        'FifoQueue': str(fifo_queue).lower(),
        'ContentBasedDeduplication': str(content_based_deduplication).lower()
    }

    # Add optional attributes if provided
    if message_retention_period is not None:
        attributes['MessageRetentionPeriod'] = str(message_retention_period)
    if visibility_timeout is not None:
        attributes['VisibilityTimeout'] = str(visibility_timeout)
    if kms_master_key_id is not None:
        attributes['KmsMasterKeyId'] = str(kms_master_key_id)

    # Add any additional custom attributes
    attributes.update(kwargs)

    # Create the FIFO queue
    response = sqs_client.create_queue(
        QueueName=queue_name,
        Attributes=attributes
    )

    # Get the URL of the newly created FIFO queue
    queue_url = response['QueueUrl']

    return queue_url

In [2]:
# EXAMPLE USAGE

# initialize SQS client
import boto3
region_name = 'us-east-1' # Replace with your AWS region
sqs_client = boto3.client('sqs', region_name=region_name)

queue_name = 'sharon.fifo'
message_retention_period = 86400  # 1 day message retention period
visibility_timeout = 30  # 30 seconds visibility timeout
kms_master_key_id = 'your-kms-master-key-id'  # Replace with your KMS master key ID

# Create FIFO queue with custom attributes
queue_url = create_fifo_queue(queue_name, 
                              message_retention_period=message_retention_period,
                              visibility_timeout=visibility_timeout,
                              kms_master_key_id=kms_master_key_id)
if queue_url:
    print("FIFO queue created successfully with URL:", queue_url)
else:
    print("Failed to create FIFO queue.")


FIFO queue created successfully with URL: https://sqs.us-east-1.amazonaws.com/160243126421/sharon.fifo


In [5]:
# if I try to create fifo with same name:
queue_url = create_fifo_queue(queue_name, 
                              message_retention_period=message_retention_period,
                              visibility_timeout=visibility_timeout,
                              kms_master_key_id=kms_master_key_id)
if queue_url:
    print("FIFO queue created successfully with URL:", queue_url)
else:
    print("Failed to create FIFO queue.")

Queue 'sharon.fifo' already exists.
Failed to create FIFO queue.
