Add long-polling support to SQS transport #198

Closed
wants to merge 1 commit

Conversation

8 participants
Contributor

jamesls commented Jan 11, 2013

We've just released a new version of boto, which adds long
polling support for SQS.
#173 already has a discussion about adding long polling support to the SQS transport, which is what I've implemented here.

The polling interval was set to 0 and a new
transport option (wait_time_seconds) was added. This
parameter specifies how long to wait for a message from
SQS, and defaults to 20 seconds, which is the maximum
value currently allowed by SQS.
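
(For illustration only: a minimal sketch of how the new option could be passed through kombu's transport_options. The 'sqs://' broker URL, queue name, and timeout are placeholders, not part of the patch.)

    from kombu import Connection

    # Sketch: pass the option added by this PR through transport_options.
    # AWS credentials are assumed to come from the environment.
    with Connection('sqs://', transport_options={'wait_time_seconds': 20}) as conn:
        queue = conn.SimpleQueue('example-tasks')
        message = queue.get(block=True, timeout=30)  # each SQS request long polls for up to 20s
        message.ack()
        queue.close()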

For testing, I verified that the tests in funtests/tests/test_SQS.py
still pass. I also verified that I see the appropriate API calls
being made to SQS as well as 20 seconds between the get_messages
calls.

@jamesls jamesls Add long-polling support to SQS transport (88d6ff6)

@ask ask closed this in fdf962b Jan 11, 2013

Owner

ask commented Jan 11, 2013

Thanks, that's awesome!

Owner

ask commented Feb 9, 2013

@jamesls, do you think #202 could be a regression, or is it maybe a configuration problem?

Contributor

jamesls commented Feb 12, 2013

Looking into it now; I suspect this is a regression.

Contributor

jamesls commented Feb 13, 2013

I was able to reproduce this.

It looks like the reason this is happening is that the pidbox queue is blocking on a long poll call. I'm not sure what the best way to handle this would be. Looking at transport/redis.py, it seems to check during _queue_bind whether an exchange is of type fanout and handle that case differently. I tried updating SQS.py with a similar approach and verified that I no longer block for 20 seconds between fetching messages.

But now I'm wondering if the suggestion in #173 to just update the get_messages call with the new parameter is the right way to do this. Is it OK for Channel._get to block for up to 20 seconds each time it's called? If so, I have a patch that should work.
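
(For illustration, roughly what that approach might look like. The names follow kombu's virtual Channel API and the pattern in transport/redis.py, but this is a sketch rather than the actual patch; _wait_time_for is a hypothetical helper and wait_time_seconds stands in for the option added by this PR.)

    from kombu.transport import virtual

    class Channel(virtual.Channel):
        # Sketch: mirror the fanout handling in transport/redis.py so that
        # broadcast (pidbox) queues are never long polled.
        wait_time_seconds = 20

        def __init__(self, *args, **kwargs):
            super(Channel, self).__init__(*args, **kwargs)
            self._fanout_queues = set()

        def _queue_bind(self, exchange, routing_key, pattern, queue):
            # SQS has no native broadcast, so remember which queues belong
            # to fanout exchanges and treat them specially when fetching.
            if self.typeof(exchange).type == 'fanout':
                self._fanout_queues.add(queue)
            super(Channel, self)._queue_bind(exchange, routing_key, pattern, queue)

        def _wait_time_for(self, queue):
            # Hypothetical helper: poll fanout queues immediately,
            # long poll everything else.
            return 0 if queue in self._fanout_queues else self.wait_time_seconds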

Is there any progress with this issue? I'd love to take advantage of long-polling but sadly I'm being hit by #202 as well.

Just want to bump this as SQS users will be stuck on Kombu v2.5.4 (Celery v3.0.13) until it's resolved!

Owner

ask commented Mar 10, 2013

Sadly, I don't have access to SQS anymore.

I don't want to use my credit card there anymore because they are impossible to get in contact with if something goes wrong.

@ask ask reopened this Mar 10, 2013

Contributor

jamesls commented Mar 10, 2013

I can put together a patch, but can someone comment on whether my suggested approach above looks correct? At this point, what I'd propose is:

  • Switch the default options back to the previous settings (long polling off by default; you can opt in with config params).
  • Update _queue_bind to handle fanout exchange types differently (roughly based on what transport/redis.py does).
  • Update the docs with the new config params.

Also, is it ok for Channel._get to block for up to _wait_time_seconds? Otherwise, the only alternative would be to long poll in separate threads and poll the threads.

@jamesls Sadly I'm not familiar enough with Kombu's inner workings to comment. Perhaps @ask could check if those suggestions are okay?

Owner

ask commented Mar 12, 2013

It would be better if Channel._get did not block, since there may be multiple channels open, which means there could be a delay of delay * channels before getting to an active queue.

The Celery worker uses two channels, one for tasks and one for remote control commands;
it does this so that remote control commands get priority even if there are many tasks to be consumed.
Seeing as SQS doesn't support broadcast anyway, it could skip the operation if the exchange type is a fanout.

Owner

ask commented Mar 19, 2013

Btw, it may also be possible to move the polling to the Transport level rather than doing it at the channel level.

Every transport has a .cycle attribute which is used to poll the channels when Connection.drain_events is called.
SQS could use a custom cycle that consolidates the channels so that only a single request is issued to AWS, asking for new messages for all queues in all the channels.

Something like:

    GET /queues/foo GET /queues/bar

instead of:

    GET /queues/foo
    GET /queues/bar

I'm not sure how that would work with the SQS API; it would depend on there being a 'get from multiple queues' command in SQS.
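
(For illustration: one way a client-side 'get from multiple queues' could be approximated, assuming boto's Queue.get_messages accepts the wait_time_seconds parameter from the boto release mentioned at the top of this thread. Region and queue names are placeholders.)

    import boto.sqs

    # Sketch of a client-side multi-queue poll: SQS has no native
    # 'receive from several queues' call, so round-robin over the queues
    # with a short per-queue wait so one empty queue can't stall the rest.
    conn = boto.sqs.connect_to_region('us-east-1')
    queues = [conn.get_queue(name) for name in ('queue-a', 'queue-b')]

    def drain_once(per_queue_wait=1):
        """One pass over all queues; returns whatever messages were received."""
        received = []
        for queue in queues:
            received.extend(
                queue.get_messages(num_messages=1, wait_time_seconds=per_queue_wait))
        return received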

Contributor

jamesls commented Mar 19, 2013

Thanks for the info, I'm working to put something together.

I should mention that as a workaround to get the old behavior you can set the config options to:

    wait_time_seconds = 0
    polling_seconds = 1

Actually, setting wait_time_seconds to 0 doesn't work. When it is 0, Channel.wait_time_seconds() treats it as false and returns the default of 20 instead. Here is what I have set:

    BROKER_TRANSPORT_OPTIONS = {
        'wait_time_seconds': 1,
        'polling_seconds': 1,
    }
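
(The falsy-zero behaviour described above is what an 'or'-style fallback produces; a minimal illustration and the usual fix, not the actual kombu code:)

    # Illustration of the falsy-zero problem described above (not the
    # actual kombu code). With an 'or' fallback, an explicit 0 falls
    # through to the default of 20:
    def wait_time_seconds_buggy(options):
        return options.get('wait_time_seconds') or 20

    # The usual fix is to fall back only when the key is really missing:
    def wait_time_seconds_fixed(options):
        value = options.get('wait_time_seconds')
        return 20 if value is None else value

    assert wait_time_seconds_buggy({'wait_time_seconds': 0}) == 20  # surprising
    assert wait_time_seconds_fixed({'wait_time_seconds': 0}) == 0   # intended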

Owner

ask commented Mar 21, 2013

Ok, I will make 1 the default for now until we have a fix ready!

0 may be better; I wasn't able to test without modifying kombu internals. Set to 1, each worker will only process 1 message a second, which may not be good enough for some people.

Contributor

jamesls commented Mar 21, 2013

I submitted a pull request (#215) that puts the defaults back, so that long polling is off by default.

It still has the non-ideal behavior of waiting up to delay * channels if you do opt in to long polling, but it works fine for a single queue.

I am taking @ask's advice and moving the polling to the transport level, but there's no "get from multiple queues" command in AWS, so I've written a client-side implementation of that. It's still missing a few pieces, but for the time being #215 should mitigate the problem.

@jamesls So, from what I can understand from the thread and the code that was finally merged into master (#215), long polling works without major issues (with the fanout fix) when I enable it, and is disabled by default (default_wait_time_seconds = 0); correct?

@ask, @jamesls: is there any more testing to be performed around this? I'm working on celery+sqs and could do some testing today/tomorrow.

@ask: since it is disabled by default, how do I enable long polling from celery's configuration? I see that BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} exists... maybe there is something like BROKER_TRANSPORT_OPTIONS = {'default_wait_time_seconds': 20} ?

Contributor

diranged commented Nov 20, 2013

@ask, it's a bit unclear right now what the status of long polling support is. Can you comment here?

Contributor

diranged commented Nov 22, 2013

Based on my testing, long polling works fine in master now when you set wait_time_seconds to '20'. I'll end up enabling this feature in another code commit soon.

bshi commented Nov 22, 2013

@diranged did you get a chance to test the multiple-queue scenario?

Contributor

diranged commented Nov 22, 2013

I noticed some strange behavior when Celery is set up to use multiple queues. It seems that this transport walks through the queues in a loop and behaves a little oddly. That is on my list of things to debug next week. When using a single queue (which is actually how our celery workers are configured), it seems to work perfectly, FWIW.

Contributor

diranged commented Nov 22, 2013

It looks to me like having long polling on (say, 20 seconds) while watching multiple queues is broken. If you are watching queue1 and queue2...

    queue1: 0 messages
    queue2: 20 messages
    ... celery picks off message from queue2 ...
    ... sleep 20 seconds ...
    queue1: 0 messages
    queue2: 19 messages
    ... celery picks off message from queue2 ...
    ... sleep 20 seconds ...

When there is only one queue configured (queue2), it seems to run through the tasks as quickly as possible. There seems to be some bug in the logic around waiting on SQS when you have multiple queues. I thought that each channel creates its own thread for handling backend SQS communication, so I'm not sure why one would impact the other.

@ask, could you dig into this a little bit?

iandees commented Mar 29, 2014

Is this still an issue? I'm looking at using Celery with multiple queues and want to keep an eye out for it if so.

@ask ask added the SQS label May 21, 2014

Owner

ask commented Jul 9, 2016

SQS in master uses long polling and has been rewritten to use async I/O.

@ask ask closed this Jul 9, 2016
