
Gracefully handle long-running consumer tasks #130

Closed
jeremycline opened this issue Feb 14, 2019 · 6 comments
@jeremycline (Member) commented Feb 14, 2019

At the moment, our consumer API does not handle callbacks that take significantly longer than the AMQP heartbeat interval.

To reproduce this, use a vanilla RabbitMQ broker on Fedora with fedora-messaging (the heartbeat interval is 60s), and run this callback with fedora-messaging consume:

import time

def bad_consumer(message):
    """A consumer that misbehaves."""
    time.sleep(300)
    print(message)

This will exit with:

[pika.adapters.base_connection ERROR] Socket Error: 104
[fedora_messaging._session INFO] Channel <Channel number=1 CLOSED conn=<SelectConnection CLOSED socket=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>> closed (-1): ConnectionResetError(104, 'Connection reset by peer')
[fedora_messaging._session WARNING] Connection to localhost closed unexpectedly (-1): ConnectionResetError(104, 'Connection reset by peer')

The problem is this:

  1. Long-running consumer callback blocks the pika event loop, so no heartbeating can occur.

  2. Connection times out and is killed by the broker.

  3. The message(s) the consumer was processing are re-queued by the broker because they were never acknowledged (fedora-messaging acknowledges a message only after the callback returns).

  4. The consumer does not gracefully restart the connection, but that's a minor problem because...

  5. When the consumer reconnects, it gets the message it was processing again.
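The starvation in step 1 is easy to reproduce outside AMQP entirely. Here is a minimal sketch using plain asyncio (all names are made up for illustration): a blocking call on the event-loop thread prevents a periodic "heartbeat" task from ever running.

```python
import asyncio
import time


async def heartbeat(beats):
    """Pretend to send AMQP heartbeats; record each one sent."""
    while True:
        await asyncio.sleep(0.05)
        beats.append(time.monotonic())


async def main():
    beats = []
    hb = asyncio.ensure_future(heartbeat(beats))
    await asyncio.sleep(0)  # let the heartbeat task start
    # A misbehaving callback: it blocks the event-loop thread, so the
    # heartbeat coroutine never gets a chance to run.
    time.sleep(0.3)
    hb.cancel()
    return beats


beats = asyncio.run(main())
print(beats)  # no heartbeats were sent while the callback ran
```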

Now, it's less than ideal to have consumer callbacks block for ages, but I suspect there are many such consumers in Fedora (the masher I think? @bowlofeggs would know) so I think we need to handle this better.

The basic problem is we need to heartbeat at the same time the message callback is running. That means either a) consumer callbacks need to do asynchronous IO (and have scheduling points if they're doing long CPU-intensive tasks) or b) the consumer callback runs in a separate thread.

The most appealing way forward for me is to back the consumer API with the Twisted client we already have, but call the consumer callback with the reactor.deferToThread API to run the message in a thread. This lets the Twisted event loop heartbeat while it waits on the thread to finish, and the callback can use synchronous APIs all it wants.
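As a sketch of that pattern, here is the same idea in the standard library's asyncio rather than Twisted (reactor.deferToThread is played here by run_in_executor; all names are illustrative, not fedora-messaging's actual code):

```python
import asyncio
import time


def blocking_callback(message):
    """A stand-in for a slow, synchronous consumer callback."""
    time.sleep(0.4)
    return "processed " + message


async def heartbeat(beats):
    """Pretend to send three AMQP heartbeats, recording each one."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        beats.append(time.monotonic())


async def main():
    loop = asyncio.get_running_loop()
    beats = []
    hb = asyncio.ensure_future(heartbeat(beats))
    # The callback runs in a worker thread, leaving the event loop
    # free to keep heartbeating while the callback blocks.
    result = await loop.run_in_executor(None, blocking_callback, "msg-1")
    hb.cancel()
    return result, beats


result, beats = asyncio.run(main())
```

Unlike the blocking sketch above, all three heartbeats go out while the callback sleeps in its worker thread.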

We need to be careful here, since consumer callbacks have not historically needed to be thread-safe, so handling one message at a time seems safest. This means using the Twisted equivalent of https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock.

Another consideration is to make sure we still handle OS signals gracefully.

There is one last tricky issue here, which is that Twisted's reactor (AFAIK) can only be started once, which means we're going to have problems if callbacks raise a HaltException and then the caller tries to call fedora_messaging.api.consume later. Perhaps https://github.com/itamarst/crochet is the best way to handle that.

@bowlofeggs (Contributor)
Yeah Bodhi's masher (being renamed to composer) does run long tasks triggered by messages, and it also uses threads while doing so.

However, we are planning to switch it and many of our other tasks to use celery instead of fedmsg/fedora_messaging, mostly because they are tasks and that's what celery is designed to do.

Bodhi will still have a few other fedora_messaging consumers. For example, we have one to mark builds as signed when robosignatory signs them, and we might add another one to process messages from Greenwave.

A workaround for this problem for projects that have something like Celery might be to just have the message consumer fire off a Celery task to do whatever work is necessary to respond to a message, when that work might take a while.
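A sketch of that hand-off pattern, with a stdlib queue and thread standing in for Celery (in real code the callback body would be roughly a Celery task's .delay(message) call, and the worker would be a separate Celery process):

```python
import queue
import threading

work_queue = queue.Queue()
done = []


def worker():
    """Stand-in for a Celery worker: does the slow work elsewhere."""
    while True:
        message = work_queue.get()
        if message is None:  # sentinel: shut down the worker
            break
        done.append("composed " + message)
        work_queue.task_done()


def callback(message):
    # The consumer callback only enqueues the work and returns, so
    # the message is acked well inside the heartbeat interval.
    work_queue.put(message)


t = threading.Thread(target=worker)
t.start()
callback("update-abc123")
work_queue.join()       # wait for the background work to finish
work_queue.put(None)    # stop the worker
t.join()
```

One tradeoff: once the callback returns, the broker considers the message handled, so a failure in the background task will not cause the AMQP message to be redelivered.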

@jeremycline (Member, Author)

Another idea that occurred to me while running is we could document this behavior in the fedora_messaging.api.consume call and add a second API, fedora_messaging.api.twisted_consume or something that uses Twisted to consume. This API could make the assumption that the Twisted reactor is already running or will be started externally, and have the "deferToThread" behavior. Then we could move the CLI to use this version.

There are two downsides off the top of my head: this would add a hard dependency on Twisted (which is currently optional), and it would complicate things if we at some point want to move to plain asyncio and drop the Twisted dependency. However, I think Twisted plans to integrate nicely with asyncio so that may be something we don't need to worry about. A bit more research is probably warranted.

@abompard (Member)

Yeah, I think that using Twisted for the main API could be a problem, since significant changes to Pika's Twisted adapter landed recently and probably won't reach EPEL anytime soon.
I vote for the deferToThread method (or the asyncio equivalent). Maybe we can solve the locking problem by configuring the AMQP connection to only send the next message after the previous one was acknowledged (prefetch_count=1)? Or document that callbacks need to be thread-safe unless the user sets prefetch_count to 1 in the [qos] section of the config file.
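For concreteness, the suggestion amounts to something like this in the TOML configuration file (a sketch based on the [qos] section mentioned above):

```toml
# Limit the broker to one unacknowledged message per consumer, so the
# callback never sees overlapping deliveries.
[qos]
prefetch_count = 1
```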

@jeremycline (Member, Author)

Well, we're already building pika for the infrastructure EPEL7. We could, in theory, ship pika-1.0.0b2 in the infrastructure repositories for Fedora and EPEL, although I'd rather not for Fedora. You're obviously more familiar with the API changes, but the main one was the lack of publish confirms, right? If we just use it for the consume API, that shouldn't pose a problem.

Relying on the pre-fetch count might work... although I just noticed it is a RabbitMQ extension. I guess we're already using the publish confirms so that's not an issue, but probably worth documenting. I have no doubt Twisted has a locking primitive, though if prefetch is 1 I can't think why that wouldn't be equivalent.

I'll see about making a proof-of-concept which should hopefully answer a lot of my questions.

@jeremycline (Member, Author)

So I've been playing around with this for a bit, and here's what I've got:

  • A way to ensure messages are processed serially without needing pre-fetch=1 (this turned out to be easy; the way the read loop is currently written already forces it)

  • It works with the Twisted adapter in pre-1.0.0 Pika. Since the consumer isn't publishing, the lack of publish confirm support isn't a problem.

  • It should work with Twisted 12.whatever is in EL7. Haven't tested that yet.

The problem is I really don't like the API. I've done it a few different ways and I'm still not happy, so I'm going to work on something else for a day or two and see if coming back with fresh eyes inspires me. It definitely works, but it's hacky at the moment.

In case you're curious, the API I tried to get working neatly is:

def consume_twisted_async(callback, bindings=None, queues=None):
    """
    Start a consumer using the provided callback and run it using the Twisted
    event loop (reactor).

    .. warning:: Callbacks run in a Twisted-managed thread pool to avoid them
        blocking the event loop. If you wish to use Twisted APIs in your callback
        you must use the blockingCallFromThread or callFromThread APIs.

    This API expects the caller to start the reactor.

    Args:
        callback (callable): A callable object that accepts one positional argument,
            a :class:`Message` or a class object that implements the ``__call__``
            method. The class will be instantiated before use.
        bindings (dict or list of dict): Bindings to declare before consuming. This
            should be the same format as the :ref:`conf-bindings` configuration.
        queues (dict): The queue or queues to declare and consume from. This should be
            in the same format as the :ref:`conf-queues` configuration dictionary where
            each key is a queue name and each value is a dictionary of settings for that
            queue.

    Returns:
        Deferred:
            A deferred object that fires with the results of the consumer. The
            underlying AMQP consumer may be canceled or restarted due to
            network conditions without this deferred firing.

            The deferred will errback in the event of an unhandled error in the
            callback, or if the callback raises HaltConsumer with an exit code
            other than 0.

            The deferred will fire its callbacks if the message callback raises
            HaltConsumer with an exit code of 0.

            To stop the consumer, simply cancel this deferred object. This will
            allow any messages that are currently being processed to complete,
            then gracefully shut down the consumer.
    """

It's the same call as the normal consume API, but it returns a Deferred/Future that fires when the consumer either crashes or is asked to stop by raising a HaltConsumer exception. Things got complicated managing several underlying deferreds (this API can result in multiple AMQP consumers if there are multiple queues) and deciding what to do if one queue's consumer fails, etc. I'm also not certain I can get cancellation to work the way I want, which is to wait for the current message to finish processing, ack it, and then quit.

@abompard (Member)

The PR has been merged, closing this issue.
