CurrentMessage doesn't work with AsyncIO #586

@vhsw

What OS are you using?

Ubuntu 23.10

What version of Dramatiq are you using?

1.15.0

What did you do?

I'm trying to get the current message from an async actor

@dramatiq.actor
async def async_actor():
    print(f"async_actor: {CurrentMessage.get_current_message()=}")

And I get None:

async_actor: CurrentMessage.get_current_message()=None

This is probably because CurrentMessage uses threading.local() and async actors run in a separate thread.
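The suspected cause can be reproduced without Dramatiq. The sketch below is not Dramatiq's code: it assumes the coroutine is handed to an event loop running in another thread via `asyncio.run_coroutine_threadsafe`, and the names `local_state`, `ctx_state`, `read_state` and `run_on_loop_thread` are made up for illustration. A value stored in `threading.local()` by the calling thread is invisible inside the coroutine, while a `ContextVar` set by the caller is carried over because asyncio copies the caller's context when it schedules the coroutine.

```python
import asyncio
import threading
from contextvars import ContextVar

# Hypothetical stand-ins for the two storage strategies being compared.
local_state = threading.local()
ctx_state: ContextVar = ContextVar("ctx_state", default=None)


async def read_state():
    # Runs on the event loop thread, not on the thread that set the values.
    return getattr(local_state, "value", None), ctx_state.get()


def run_on_loop_thread():
    # Start an event loop in a separate thread (assumed setup, see lead-in).
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # Set both values in the calling thread, as a middleware hook would.
        local_state.value = "message"
        ctx_state.set("message")
        # The caller's context is copied when the coroutine is scheduled.
        future = asyncio.run_coroutine_threadsafe(read_state(), loop)
        return future.result(timeout=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()


result = run_on_loop_thread()
print(result)  # (None, 'message')
```

The thread-local lookup comes back as None while the context variable keeps its value, which matches the behaviour reported for async actors.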

What did you expect would happen?

I expect CurrentMessage.get_current_message() to return the current message in async def actors, just like it does in regular def actors.

What happened?

Here is the full example:

from contextvars import ContextVar

from dramatiq import MessageProxy, Middleware


# An example implementation of CurrentMessage, based on ContextVar, that seems to work
class CurrentMessageUsingContextVar(Middleware):
    message: ContextVar[MessageProxy | None] = ContextVar("message")

    @classmethod
    def get_current_message(cls):
        return cls.message.get(None)

    def before_process_message(self, broker, message):
        assert self.message.get(None) is None
        self.message.set(message)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        self.message.set(None)


import dramatiq
from dramatiq.brokers.stub import StubBroker
from dramatiq.middleware import (
    AsyncIO,
    CurrentMessage,
)

middleware = [
    AsyncIO(),
    CurrentMessage(),
    CurrentMessageUsingContextVar(),
]


broker = StubBroker(middleware=middleware)
dramatiq.set_broker(broker)


@dramatiq.actor
def sync_actor():
    print(f"sync_actor: {CurrentMessage.get_current_message()=}")
    print(f"sync_actor: {CurrentMessageUsingContextVar.get_current_message()=}")


sync_actor.send()
# Output:
# sync_actor: CurrentMessage.get_current_message()=<dramatiq.brokers.stub._StubMessageProxy object at 0x7f1b6c135490>
# sync_actor: CurrentMessageUsingContextVar.get_current_message()=<dramatiq.brokers.stub._StubMessageProxy object at 0x7f1b6c135490>


@dramatiq.actor
async def async_actor():
    print(f"async_actor: {CurrentMessage.get_current_message()=}")
    print(f"async_actor: {CurrentMessageUsingContextVar.get_current_message()=}")


async_actor.send()
# Output:
# async_actor: CurrentMessage.get_current_message()=None
# async_actor: CurrentMessageUsingContextVar.get_current_message()=<dramatiq.brokers.stub._StubMessageProxy object at 0x7f1b6c60c610>
