
Add batch message-getting to the Kombu SQS transport#281

Merged
ask merged 19 commits into celery:master from Nextdoor:master
Dec 13, 2013

Conversation

@diranged
Contributor

Should solve issue #280. The patch includes unit tests and has been tested manually by me against both the normal master/worker and solo process models. By default it speeds up Celery SQS message fetching by 10x.

Allow the user to specify the number of messages an individual Channel
will fetch at a given time from SQS. Default this to Amazon's max of
10 -- if there are fewer messages in the queue, Amazon will just return
what's available.

These messages are fetched and stored in a local _queue_message_cache
deque object. When subsequent _get() calls are made, this queue is
depleted until it's empty before another request is made to SQS for
new messages.
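The buffering described in this commit message can be sketched roughly as follows; `BufferedGetter`, `fetch`, and `max_fetch` are illustrative names for this sketch, not the actual kombu API:

```python
from collections import deque
from queue import Empty


class BufferedGetter:
    """Illustrative stand-in for the SQS Channel's message buffering."""

    def __init__(self, fetch, max_fetch=10):
        self._fetch = fetch                  # callable(n) -> list of messages
        self._queue_message_cache = deque()  # local cache of fetched messages
        self.max_fetch = max_fetch           # Amazon's maximum is 10 per request

    def _get(self):
        # Only go back to SQS once the local cache has been depleted.
        if not self._queue_message_cache:
            self._queue_message_cache.extend(self._fetch(self.max_fetch))
        try:
            return self._queue_message_cache.popleft()
        except IndexError:
            raise Empty()
```

Each `_get()` call then costs an SQS round trip only once per batch of up to ten messages, rather than once per message.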
@ask
Contributor

ask commented Nov 25, 2013

Thanks! Great work (and even with unit tests).

One problem right now is that it doesn't adhere to the QoS limits. I think it should instead
override Channel.drain_events directly; that way you can also avoid the extra "buffer by queue" mapping.
The mapping may be a problem since it can leave "orphan messages" if you stop consuming from a queue --
there is no cleanup step -- so removing the mapping and using a single buffer would make the
implementation simpler.

So instead of a _get method you should define drain_events directly:

from kombu.five import Empty

def drain_events(self, timeout=None):
    queues = self._active_queues  # <-- list of queues we currently consume from
    buffer = self._buffer         # <-- the buffer deque

    # ... read messages into buffer ...
    buffer.append((message, name_of_origin_queue))

    try:
        return buffer.popleft()
    except IndexError:
        raise Empty()

Here name_of_origin_queue is the name of the queue the message was delivered to.

As for QoS, I suspect that you cannot reliably get the number of messages you are allowed
to receive, since the object does not use locks (way too slow), but an estimate is better than
nothing even if it may consume one too many (or too few) messages by chance.

The following method would need to be added to kombu.transport.virtual.QoS:

    def can_consume_max_estimate(self):
        pcount = self.prefetch_count
        if pcount:
            return pcount - (len(self._delivered) - len(self._dirty))

Then you can use this to estimate the maximum number of messages drain_events may retrieve.

@ask
Contributor

ask commented Nov 25, 2013

Note: if the above method returns None it means 'no limit':

fetch_max = self.messages_to_fetch
limit = self._qos.can_consume_max_estimate()
limit = fetch_max if limit is None else min(fetch_max, limit)
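The clamping in the snippet above boils down to a small helper; `effective_limit` is an illustrative name, not part of kombu:

```python
def effective_limit(fetch_max, qos_estimate):
    """Clamp the SQS batch size by the QoS estimate.

    qos_estimate is the value returned by can_consume_max_estimate();
    None means 'no limit', so fall back to fetch_max (Amazon's max of 10).
    """
    return fetch_max if qos_estimate is None else min(fetch_max, qos_estimate)
```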

@diranged
Contributor Author

@ask, I've been digging into this change for a day now, and it's a lot more complex than I expected. I'm particularly unclear about the use of basic_get() (which is called by Queue.get()) and the drain_events() methods.

From what I can tell, if I don't do the "buffering" logic inside of _get(), then I need to override both basic_get() and drain_events(). Overriding both of those means I need to create common "get_from_cache" and "put_to_cache" methods for handling the local cache buffer. Even then, none of that solves the stranded-message issue in the event that 5-10 messages are pulled down from SQS but the worker only gets a chance to work on one of them.

I can see changing the "messages_to_fetch" code so that it uses QoS.can_consume_max_estimate() pretty easily, which allows the prefetch configuration to define how many messages to ask Amazon for at once... but I still think the simplest system is to handle all of the "get and buffer message" logic inside of the _get() method.

Perhaps there are two possible solutions here:

  1. Never get more than one message at a time for a no-ack queue?
  2. When shutting Kombu down gracefully, handle re-injecting the messages that were left in the buffer back into SQS?
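The second option above could be sketched as a shutdown step that drains the local buffer and hands each unprocessed message back; `restore_buffered` and `restore` are hypothetical names (for SQS, "restoring" would in practice mean leaving the message unacked so its visibility timeout expires, or resetting that timeout to 0):

```python
from collections import deque


def restore_buffered(buffer, restore):
    """Drain the local buffer of (message, queue_name) pairs on shutdown,
    passing each unprocessed message to a restore callable that re-injects
    it into its origin queue."""
    while buffer:
        message, queue = buffer.popleft()
        restore(message, queue)
```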

This method will be used by channels with bulk 'get'
capabilities to improve performance of get calls by
retrieving more than one message at a time.
In preparation for building a _get_bulk() method to
handle getting many messages from SQS at once, this commit
moves most of the logic from _get() into separate methods
that can be more easily unit tested and re-used.
This patch implements a new private method on the SQS Channel
object named _get_bulk(). This method calls out to the QoS object
to get an estimate of the number of messages it's allowed to retrieve
from SQS at any given moment, then pulls them down and returns
them as a list.

An updated drain_events() method then stores these messages in
a local cache object, and pops them off one by one as the drain_events()
method is called by the Transport.drain_channel() method.
@diranged
Contributor Author

diranged commented Dec 2, 2013

Here you go @ask. Per our discussion earlier today, this patch now only modifies the drain_events() method and leaves basic_get() and _get() alone. The drain_events() method itself now handles fetching and buffering the message objects.

The unit tests are basic, but execute. I've also executed the code inside our own Celery environment and it seems to work as expected.

Boto does not yet support Python 3+, so these
unit tests cannot run in Python 3.3
Python 3+ cannot even build/install Boto, so we
need to have a separate test-ci3.txt file to define
Python 3+ test requirements.
@diranged
Contributor Author

diranged commented Dec 3, 2013

Here are my final touches for this pull request @ask. The unit tests now all execute in TravisCI where they can (Python 2.x), and are skipped on Python 3.x.

I will, in a separate pull request, clean up the SQS Mock code so that it uses dicts instead of files for storing messages.

Contributor


Need to rewrite this so that it doesn't use the filesystem. Should be easy by just using self.messages = []
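A minimal in-memory replacement along the lines suggested in this review comment could look like the following; `SQSQueueMock` and its method names are illustrative, not the actual test-suite API:

```python
class SQSQueueMock:
    """In-memory stand-in for the file-backed SQS queue mock."""

    def __init__(self, name):
        self.name = name
        self.messages = []  # plain list instead of files on disk

    def write(self, message):
        self.messages.append(message)
        return True

    def get_messages(self, num_messages=1):
        # Pop up to num_messages off the front, like SQS ReceiveMessage.
        taken, self.messages = (self.messages[:num_messages],
                                self.messages[num_messages:])
        return taken

    def count(self):
        return len(self.messages)
```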

ask added a commit that referenced this pull request Dec 13, 2013
@ask ask merged commit 1b4672a into celery:master Dec 13, 2013
@ask
Contributor

ask commented Dec 13, 2013

Merged but with changes so please review and test: 49bf684

