# Azure queue

In [37]:
import os

# Move the working directory one level up and print where we ended up.
# NOTE: the path is relative, so re-running this cell without restarting the
# kernel climbs one more level each time - run it exactly once per kernel.
# Presumably this is done so that relative paths (such as ".env") resolve
# against the repository root - confirm against the project layout.
os.chdir("..")
print(os.getcwd())

/workspaces/mastering-python


In [39]:
from dotenv import load_dotenv

# Load variables from the ".env" file (path relative to the current working
# directory) into the process environment; override=True is passed so values
# from the file are asked to take precedence over already-set variables.
# The path is a plain string - an f-string without placeholders adds nothing.
load_dotenv(".env", override=True)

True

In [40]:
from azure.storage.queue import QueueClient, QueueServiceClient

In [41]:
# Connect to the storage account and make sure the demo queue exists.
# Any failure is printed rather than raised, so the notebook keeps running.
try:
    print("Azure Queue storage - Python quickstart sample")

    # Fixed queue name: every run of this notebook reuses the same queue
    queue_name = "quickstartqueues"

    # NOTE: not used in this cell - the connection below is made purely from
    # the connection string
    account_url = "https://sthtiaididevwe01.queue.core.windows.net"

    # Create the QueueServiceClient object from the connection string stored
    # in the environment (raises KeyError if the variable is missing)
    queue_service_client = QueueServiceClient.from_connection_string(
        conn_str=os.environ["AZURE_STORAGE_ACCOUNT_CONN_STR"]
    )

    # Bind the QueueClient BEFORE trying to create the queue. create_queue
    # raises when the queue already exists; if this assignment came after it,
    # it would be skipped and queue_client would stay undefined on a fresh
    # kernel, breaking every later cell.
    queue_client: QueueClient = queue_service_client.get_queue_client(queue_name)

    # creating the queue (an "already exists" error is reported below)
    queue_service_client.create_queue(queue_name)
except Exception as ex:
    print("Exception:")
    print(ex)

Azure Queue storage - Python quickstart sample
Exception:
Queue already exists
RequestId:fed69251-d003-0075-72af-1cf61a000000
Time:2025-09-03 08:47:32+00:00
ErrorCode:StorageErrorCode.QUEUE_ALREADY_EXISTS


In [26]:
print("\nAdding messages to the queue...")

# Send the three messages in order; the comprehension keeps every send result,
# and the last one (for "Third message") is kept as saved_message so that a
# later cell can update that message.
saved_message = [
    queue_client.send_message(text)
    for text in ("First message", "Second message", "Third message")
][-1]


Adding messages to the queue...


In [None]:
print("\nPeek at the messages in the queue...")

# Peek at up to 5 messages in the queue - peeking doesn't change the message at all
peeked_messages = queue_client.peek_messages(max_messages=5)

for peeked_message in peeked_messages:
    # Display the content of each peeked message
    print("Message: " + peeked_message.content)


Peek at the messages in the queue...
Message: First message
Message: Second message
Message: Third message


In [8]:
print("\nUpdating the third message in the queue...")

# Update a message using the message saved when calling send_message earlier.
# The message object and its pop receipt are both passed, together with the
# replacement content. The call is the last expression of the cell, so the
# notebook displays its return value.
# NOTE(review): saved_message keeps the pop receipt obtained from send_message;
# the returned value has its own 'pop_receipt' - presumably that is the one to
# use for any follow-up operation on this message; confirm against the SDK docs.
queue_client.update_message(
    saved_message,
    pop_receipt=saved_message.pop_receipt,
    content="Third message has been updated",
)


Updating the third message in the queue...


{'id': 'f7abce36-942b-4d2d-9d94-1331d249baf3', 'inserted_on': datetime.datetime(2025, 9, 3, 8, 23, 53, tzinfo=datetime.timezone.utc), 'expires_on': datetime.datetime(2025, 9, 10, 8, 23, 53, tzinfo=datetime.timezone.utc), 'dequeue_count': None, 'content': 'Third message has been updated', 'pop_receipt': 'AwAAAAMAAAAAAAAA0yLfQKwc3AEAAAAA', 'next_visible_on': datetime.datetime(2025, 9, 3, 8, 25, 6, tzinfo=datetime.timezone.utc)}

In [28]:
# Fetch the queue's properties and report how many messages it holds
# (the service exposes this as an approximate count)
properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print(f"Message count: {count}")

Message count: 3


In [None]:
print("\nReceiving messages from the queue...")

# Request messages from the queue. The returned object is lazy: the messages
# are only actually received once it is iterated (e.g. with .by_page()).
# From that moment each received message stays invisible to other consumers
# for visibility_timeout seconds - 5 here; the default is 30 seconds.
messages = queue_client.receive_messages(max_messages=5, visibility_timeout=5)


Receiving messages from the queue...


In [29]:
# Walk through every page and every message without doing anything with them.
# NOTE(review): presumably this only demonstrates that iterating is what
# triggers the actual receive (and starts the visibility timeout); the next
# cell iterates messages.by_page() again to process them - confirm this
# extra pass is intended.
for msg_batch in messages.by_page():
    for msg in msg_batch:
        pass

In [30]:
print("\nPress Enter key to 'process' messages and delete them from the queue...")
# Blocks until the user presses Enter, so this cell cannot run unattended
input()

# messages is a lazy iterator, which could not be materialized - since the
# queue uses pagination internally we need to iterate through it page by page
for msg_batch in messages.by_page():
    for msg in msg_batch:
        # "Process" the message
        print(msg.content)
        # Let the service know we're finished with
        # the message and it can be safely deleted.
        queue_client.delete_message(msg)


Press Enter key to 'process' messages and delete them from the queue...
First message
Second message
Third message


## Structured queue messages

In [None]:
import json
from dataclasses import asdict, dataclass
from datetime import datetime


@dataclass
class TaskMessage:
    """A task description that can be converted to and from a JSON string."""

    task_id: str
    task_type: str
    priority: int
    payload: dict

    def to_json(self) -> str:
        """Return all fields of this task encoded as a JSON object string."""
        field_values = asdict(self)
        return json.dumps(field_values)

    @classmethod
    def from_json(cls, json_str: str) -> "TaskMessage":
        """Build an instance from a JSON object string.

        The decoded keys are passed as keyword arguments to the constructor,
        so missing or unexpected keys raise TypeError.
        """
        field_values = json.loads(json_str)
        return cls(**field_values)


# Send structured messages
# Build the sample tasks (fields spelled out by name), then enqueue each one
# as its JSON representation
tasks = [
    TaskMessage(
        task_id="task-001",
        task_type="email_notification",
        priority=1,
        payload={"recipient": "user@example.com"},
    ),
    TaskMessage(
        task_id="task-002",
        task_type="data_processing",
        priority=3,
        payload={"file_path": "/data/batch.csv"},
    ),
    TaskMessage(
        task_id="task-003",
        task_type="report_generation",
        priority=2,
        payload={"report_type": "monthly"},
    ),
]

for task in tasks:
    queue_client.send_message(task.to_json())

print("✅ Structured messages sent!")

✅ Structured messages sent!


In [36]:
# Process structured messages
# Request up to 10 messages (this rebinds the `messages` name used by the
# earlier receive cells)
messages = queue_client.receive_messages(max_messages=10)

# Iterate page by page, decoding each message body back into a TaskMessage
for msg_batch in messages.by_page():
    for msg in msg_batch:
        try:
            task = TaskMessage.from_json(msg.content)
            print(f"📋 {task.task_id}: {task.task_type} (priority {task.priority})")
            # Only reached when decoding succeeded: remove the handled message
            queue_client.delete_message(msg)
        except Exception as e:
            # Covers decoding failures as well as delete failures; the error is
            # printed and the message is NOT deleted.
            # NOTE(review): presumably such a message becomes visible again
            # later and will be retried - confirm this is the intended handling.
            print(f"❌ Error: {e}")

📋 task-001: email_notification (priority 1)
📋 task-002: data_processing (priority 3)
📋 task-003: report_generation (priority 2)
