
Skipped messages can get inconsistent results stored #440

@FinnLidbetter

Description

If a message is dropped by the AgeLimit middleware, the result returned for that message is inconsistent: it depends on how soon after the drop the result is requested.

Not sure what the resolution should be here. I think it should either consistently store a None result, or consistently store an exception that is not Exception("unknown").

What OS are you using?

Ubuntu 20.04

What version of Dramatiq are you using?

1.11.0

What did you do?

Use a broker with the AgeLimit and Results middleware. Have a message hit its age limit and get its result in a blocking get_result call. Get the result for the same message again a second or two after the blocking get_result returns a result.

What did you expect would happen?

The result should not change.

What happened?

On the first get_result, the result is None.
On the second get_result, retrieving the result raises a ResultFailure whose original exception is Exception("unknown").

This appears to be because:

  1. When the age limit expires, the AgeLimit middleware sets the failed flag on the MessageProxy before the message is processed:
    def before_process_message(self, broker, message):
        actor = broker.get_actor(message.actor_name)
        max_age = message.options.get("max_age") or actor.options.get("max_age", self.max_age)
        if not max_age:
            return
        if current_millis() - message.message_timestamp >= max_age:
            self.logger.warning("Message %r has exceeded its age limit.", message.message_id)
            message.fail()
            return
  2. In worker.py this causes an emit_after of "process_message" with a result of None:
    self.broker.emit_after("process_message", message, result=res)
  3. The Results middleware's after_process_message then stores this None result, with no exception:
    self.backend.store_result(message, result, result_ttl)
  4. After this happens, worker.py still has some post-processing of the message to do. Since the message was marked as failed, post_process_message does an emit_after of "nack":
    self.broker.emit_after("nack", message)
  5. Since there was no exception, the Results middleware then replaces the None result stored at the message key with Exception("unknown"):
    def after_nack(self, broker, message):
        store_results, result_ttl = self._lookup_options(broker, message)
        if store_results and message.failed:
            exception = message._exception or Exception("unknown")
            self.backend.store_exception(message, exception, result_ttl)
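
The five steps above can be replayed without a broker or a result store. The following is a minimal sketch in plain Python, not Dramatiq code: `FakeBackend`, `FakeMessage`, and `replay` are hypothetical stand-ins that only mimic the order of the two writes described above.

```python
class FakeBackend:
    """Hypothetical stand-in for a result backend: one slot per message id."""

    def __init__(self):
        self.store = {}

    def store_result(self, message_id, result):
        self.store[message_id] = ("result", result)

    def store_exception(self, message_id, exception):
        self.store[message_id] = ("exception", repr(exception))


class FakeMessage:
    """Hypothetical stand-in for MessageProxy, reduced to the fields used here."""

    def __init__(self, message_id):
        self.message_id = message_id
        self.failed = False
        self._exception = None

    def fail(self):
        self.failed = True


def replay(backend, message):
    """Replay steps 1-5 and record what a reader would see after each write."""
    snapshots = []

    # Step 1: the age limit check marks the message as failed.
    message.fail()

    # Steps 2-3: "process_message" is emitted with result=None and no
    # exception, so a None result is stored.
    backend.store_result(message.message_id, None)
    snapshots.append(backend.store[message.message_id])

    # Steps 4-5: "nack" is emitted; the message is failed and carries no
    # exception, so the fallback exception overwrites the stored result.
    if message.failed:
        exception = message._exception or Exception("unknown")
        backend.store_exception(message.message_id, exception)
    snapshots.append(backend.store[message.message_id])

    return snapshots


snapshots = replay(FakeBackend(), FakeMessage("m1"))
print(snapshots)
```

A get_result call that lands between the two writes sees the first snapshot (a None result); any later call sees the second (the stored exception), which matches the behaviour reported above.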

Sample Code

import dramatiq
import time

from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

# max_age is in milliseconds, so this message should already be too old when a worker picks it up.
@dramatiq.actor(store_results=True, max_age=1)
def sample_actor():
    return 42

if __name__ == "__main__":
    message = sample_actor.send()
    result_1 = message.get_result(block=True, timeout=2000)
    print(result_1)
    time.sleep(2)
    # A ResultFailure carrying Exception("unknown") will be raised here.
    result_2 = message.get_result()
    print(result_2)
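
One of the two resolutions suggested in the description (consistently storing an exception that is not Exception("unknown")) might look roughly like the sketch below. It is plain Python and not Dramatiq's implementation: `SketchResults`, `FakeMessage`, and `MessageDropped` are hypothetical names, and the hook signatures are simplified. The idea is to skip the result write when the message is already marked as failed, and to store a descriptive exception on nack.

```python
class MessageDropped(Exception):
    """Hypothetical marker exception for a message that failed without raising."""


class FakeMessage:
    """Hypothetical stand-in for MessageProxy, reduced to the fields used here."""

    def __init__(self, message_id):
        self.message_id = message_id
        self.failed = False
        self._exception = None


class SketchResults:
    """Hypothetical stand-in for a results middleware with simplified hooks."""

    def __init__(self):
        self.store = {}

    def after_process_message(self, message, result=None, exception=None):
        # Only store a result for messages that were not marked as failed,
        # so a dropped message never exposes a transient None result.
        if exception is None and not message.failed:
            self.store[message.message_id] = ("result", result)

    def after_nack(self, message):
        # Store a descriptive exception instead of Exception("unknown").
        if message.failed:
            exception = message._exception or MessageDropped(message.message_id)
            self.store[message.message_id] = ("exception", type(exception).__name__)


middleware = SketchResults()
dropped = FakeMessage("m1")
dropped.failed = True  # what the age limit check would have done

middleware.after_process_message(dropped, result=None)
first_read = middleware.store.get("m1")   # nothing stored yet

middleware.after_nack(dropped)
second_read = middleware.store.get("m1")  # the only value ever stored

print(first_read, second_read)
```

With this ordering, a blocking reader would keep waiting until the nack hook runs and would then see the same stored exception on every later read. Whether an exception or a None result is the better outcome for dropped messages is left open, as in the description.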
