-
-
Notifications
You must be signed in to change notification settings - Fork 922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SQS - Gossip not supported #1337
base: main
Are you sure you want to change the base?
Conversation
@@ -314,6 +322,10 @@ def _new_queue(self, queue, **kwargs): | |||
"""Ensure a queue with given name exists in SQS.""" | |||
if not isinstance(queue, str): | |||
return queue | |||
|
|||
if queue.endswith('reply-celery-pidbox'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right place to put this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
considering the following stack trace, yes:
File "kombu/transport/virtual/base.py", line 528, in queue_declare self._new_queue(queue, **kwargs) File "kombu/transport/SQS.py", line 233, in _new_queue raise UndefinedQueueException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd guess that @thedrow was questioning whether kombu should be aware of things like this queue suffix which are specific to celery. It feels like a special case. Is there any other property of the queue args or channel instance we can check to reach the same conclusion? Maybe we instead need some property on the class like does_ping_mean_anything_for_this_type_of_channel() -> bool
?
Bumping this since it's on the in progress list for 5.1.0 still.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maybe-sybr I think this is a relatively narrow use-case, where the generic use-case you are suggesting is too broad. I don't see a place where this property will be used besides with SQS broker.
About the transport layer to be aware of the context - I think it's inevitable. The alternative is making Celery aware of SQS specifics, which is the exact same problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Transports have capabilities as far as I recall.
We can add a new one instead and let celery handle raising the error.
I think that would be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thedrow Can you give me some hints around where would you like me to implement this? I don't have an idea where to start
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lines 133 to 137 in 3b6cd13
default_transport_capabilities = Implements( | |
asynchronous=False, | |
exchange_type=frozenset(['direct', 'topic', 'fanout', 'headers']), | |
heartbeats=False, | |
) |
Lines 858 to 861 in 3b6cd13
implements = virtual.Transport.implements.extend( | |
asynchronous=True, | |
exchange_type=frozenset(['direct']), | |
) |
I'd assume a new key in the default_transport_capabilities
set to True
so we don't break any existing working behaviours, having that same key set to False
in the SQS.Transport
, and then some logic in celery to avoid the ping/gossip behaviour if the transport has that implements
key set to something falsey? Perhaps something like implements['multiple_queues']
?
Alternatively, maybe the fact that SQS.Transport
only supports the 'direct'
exchange type is a valid proxy for this limitation already? We could potentially do something like:
if transport.implements.get('exchange_type', set()) - {'direct'}:
# support more than just direct queues -> attempt gossip
do_gossip_stuff()
or something like that? Maybe that check could be a fallback for if a new implements
key which gets added per the paragraph above is missing (to handle celery upgrades with kombu having been held back)?
caveat emptor: I have literally never touched this code so I'm making this all up on the fly. If @thedrow or @auvipy have any further input, I'd trust them more than me :) Hope this helps, and thanks for the work and thought on this so far.
add gossip not supported exception