Skip to content

Commit

Permalink
support FIFO and standard queues with simple q
Browse files Browse the repository at this point in the history
  • Loading branch information
gridcell committed May 13, 2024
1 parent 904e9c2 commit 7570b49
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
13 changes: 10 additions & 3 deletions src/simpleq/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from datetime import datetime
from pickle import dumps, loads

from django.conf import settings

USE_FIFO = True if getattr(settings, "USE_FIFO") == "True" else False


class Job:
"""An abstraction for a single unit of work (a job!)."""
Expand Down Expand Up @@ -44,10 +48,8 @@ def composite_id(self):

@property
def message(self):
return dict(
msg = dict(
MessageAttributes={"id": {"StringValue": self.id, "DataType": "String"}},
MessageDeduplicationId=self.composite_id,
MessageGroupId=self.group,
MessageBody=codecs.encode(
dumps(
{
Expand All @@ -59,6 +61,11 @@ def message(self):
"base64",
).decode(),
)
if USE_FIFO:
msg["MessageDeduplicationId"] = self.composite_id
msg["MessageGroupId"] = self.group

return msg

@classmethod
def from_message(cls, message):
Expand Down
15 changes: 9 additions & 6 deletions src/simpleq/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ def queue(self):
try:
self._queue = self.sqs_resource.get_queue_by_name(QueueName=queue_name)
except self.sqs_resource.meta.client.exceptions.QueueDoesNotExist:
# TODO: leave this for now until we sort out local
queue_attributes = {
"VisibilityTimeout": str(self.SQS_MESSAGE_VISIBILITY),
"FifoQueue": "false",
}
if self.USE_FIFO:
queue_attributes["FifoQueue"] = "true"
queue_attributes["ContentBasedDeduplication"] = "false"

self._queue = self.sqs_resource.create_queue(
QueueName=queue_name,
Attributes={
"VisibilityTimeout": str(self.SQS_MESSAGE_VISIBILITY),
"FifoQueue": "true" if self.USE_FIFO else "false",
"ContentBasedDeduplication": "false",
},
Attributes=queue_attributes,
)

return self._queue
Expand Down

0 comments on commit 7570b49

Please sign in to comment.