Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support to pass the queue url directly #47

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sqs_workers/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from sqs_workers.exceptions import SQSError
from sqs_workers.processors import DEFAULT_CONTEXT_VAR, Processor
from sqs_workers.shutdown_policies import NEVER_SHUTDOWN
from sqs_workers.utils import is_queue_url

DEFAULT_MESSAGE_GROUP_ID = "default"
SEND_BATCH_SIZE = 10
Expand Down Expand Up @@ -54,6 +55,17 @@ class GenericQueue(object):
batching_policy: BatchingConfiguration = attr.ib(default=NoBatching())
_queue = attr.ib(repr=False, default=None)

def __attrs_post_init__(self):
"""
Change the attributes name and _queue if the name is an SQS queue url

This will avoid the use of sqs_resource.get_queue_by_name in get_queue method
"""
if is_queue_url(self.name):
queue_url = self.name
self.name = queue_url.split("/")[-1]
self._queue = self.env.sqs_resource.Queue(queue_url)

@classmethod
def maker(cls, **kwargs):
return partial(cls, **kwargs)
Expand Down
9 changes: 9 additions & 0 deletions sqs_workers/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import importlib
import logging
import re
from inspect import Signature
from typing import Any

Expand Down Expand Up @@ -129,3 +130,11 @@ def ensure_string(obj: Any, encoding="utf-8", errors="strict") -> str:
return obj.decode(encoding, errors)
else:
return str(obj)


def is_queue_url(queue_name: str):
"""
Return true if the queue_name variable is an SQS queue url
"""
sqs_queue_regex = r"(http|https)[:][\/]{2}[a-zA-Z0-9-_:.]+[\/][0-9]{12}[\/]{1}[a-zA-Z0-9-_]{0,80}"
return True if re.search(sqs_queue_regex, queue_name) else False
7 changes: 7 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def queue_name2(sqs_session, sqs, random_string):
delete_queue(sqs, random_string + "_2")


@pytest.fixture
def queue_url(sqs_session, sqs, random_string):
queue_url = create_standard_queue(sqs, random_string)
yield queue_url
delete_queue(sqs, random_string)


@pytest.fixture
def queue_name_with_redrive(sqs_session, sqs, random_string):
# dead letter queue_name
Expand Down
12 changes: 12 additions & 0 deletions tests/test_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ def sqs_codec(sqs, codec):
return codec


def test_create_queue_with_queue_url(sqs_session, sqs, queue_url):
if isinstance(sqs_session, MemorySession):
pytest.skip("MemorySession doesn't produce a SQS compatible queue_url")

expected_name = queue_url.split("/")[-1]
expected_private_queue = sqs.sqs_resource.Queue(queue_url)

queue = sqs.queue(queue_url)
assert queue.name == expected_name
assert queue._queue == expected_private_queue


def test_add_job_with_codec(sqs, queue_name, codec):
codec_name, codec_cls = codec

Expand Down
16 changes: 16 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
bind_arguments,
instantiate_from_dict,
instantiate_from_string,
is_queue_url,
string_to_object,
validate_arguments,
)
Expand Down Expand Up @@ -103,3 +104,18 @@ def foo(a, /, b, c, *, d):
"d": 4,
},
)


@pytest.mark.parametrize(
"queue_name,expected",
[
("https://sqs.us-east-1.amazonaws.com/177715257436", False),
("https://sqs.us-east-1.amazonaws.com/1/MyQueue", False),
("https://sqs.us-east-1.amazonaws.com/MyQueue", False),
("http://localhost:9324/000000000000/sqs_workers_tests_20231213_gkzk7rh0ca", True),
("https://localhost:9324/000000000000/sqs_workers_tests_20231213_gkzk7rh0ca", True),
("https://sqs.us-east-1.amazonaws.com/177715257436/MyQueue", True),
],
)
def test_is_queue_url(queue_name, expected):
assert expected == is_queue_url(queue_name)