This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Send and receive Pydantic models with AWS SQS

andrewthetechie/pydantic-sqs

pydantic-sqs

Convert your pydantic models to and from AWS SQS messages.

Main Dependencies

Getting Started

from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os


class ThisModel(SQSModel):
    foo: str = Field(..., description="Foo")


class ThatModel(SQSModel):
    bar: str = Field(..., description="bar")


async def main():
    # Read the queue settings from the environment.
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    # Pass endpoint_url only when one is set, such as a local SQS-compatible endpoint.
    endpoint_url = os.environ.get("SQS_ENDPOINT_URL")
    if endpoint_url is not None:
        queue_kwargs["endpoint_url"] = endpoint_url

    queue = SQSQueue(**queue_kwargs)

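    # Register each model with the queue before sending or receiving it.
    queue.register_model(ThisModel)
    queue.register_model(ThatModel)

    # Send one instance of each model to the queue.
    this_thing = ThisModel(foo="1234")
    that_thing = ThatModel(bar="5678")
    await this_thing.to_sqs()
    await that_thing.to_sqs()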

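    # Long-poll for up to 10 messages. SQS caps the wait time at 20 seconds.
    new_things = await queue.from_sqs(max_messages=10, wait_time_seconds=20)
    pprint(new_things)
    # Delete each received message so it is not delivered again.
    for thing in new_things:
        await thing.delete_from_queue()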

    print("deleted all the messages we got from the queue")
    pprint(new_things)


if __name__ == "__main__":
    asyncio.run(main())
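
The example above registers each model with the queue before receiving. One plausible reason is that a receiver needs some way to tell which class a raw message body belongs to. The following is a minimal, dependency-free sketch of that idea, not pydantic-sqs's actual implementation or wire format: `ModelRegistry`, the `"model"`/`"data"` envelope, and the dataclass stand-ins `ThisMessage` and `ThatMessage` are all assumptions made for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Type


@dataclass
class ThisMessage:  # hypothetical stand-in for ThisModel
    foo: str


@dataclass
class ThatMessage:  # hypothetical stand-in for ThatModel
    bar: str


class ModelRegistry:
    """Hypothetical registry mapping a type tag to a class for decoding."""

    def __init__(self) -> None:
        self._models: Dict[str, Type] = {}

    def register(self, model_cls: Type) -> None:
        # Use the class name as the type tag stored in each message.
        self._models[model_cls.__name__] = model_cls

    def encode(self, obj) -> str:
        name = type(obj).__name__
        if name not in self._models:
            raise KeyError(f"{name} is not registered")
        # Wrap the payload in an envelope that records which model it is.
        return json.dumps({"model": name, "data": asdict(obj)})

    def decode(self, body: str):
        envelope = json.loads(body)
        # Look up the class by its tag and rebuild the instance.
        model_cls = self._models[envelope["model"]]
        return model_cls(**envelope["data"])


registry = ModelRegistry()
registry.register(ThisMessage)
registry.register(ThatMessage)

# Two different models share one stream of message bodies.
bodies = [
    registry.encode(ThisMessage(foo="1234")),
    registry.encode(ThatMessage(bar="5678")),
]
decoded = [registry.decode(body) for body in bodies]
print(decoded)
```

Because each body carries its own type tag, a single queue can hold messages for several models and the receiver can still rebuild the right class for each one.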

Examples

Examples are in the examples/ directory of this repo.

Installation

Install the package with pip:

pip install pydantic-sqs
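
To confirm that the install succeeded, one option is to look up the installed distribution's version with the standard library. This is a small sketch; `installed_version` is a hypothetical helper and not part of pydantic-sqs.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "pydantic-sqs") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    print(found if found is not None else "pydantic-sqs is not installed")
```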

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide.

License

Licensed under the MIT License.