Task routing #35

Closed
Timaqf opened this Issue Jan 25, 2018 · 10 comments

Timaqf commented Jan 25, 2018

Does dramatiq have anything like Celery's task routing?

I didn't find any API for sending a task with a queue_name, but I can build the message myself to send a task to a specific queue.

message = Message(
    queue_name="only_i_can_receive",
    actor_name="count_words",
    args=("https://www.google.com",),
    kwargs={},
    options={},
)
broker.enqueue(message, delay=delay)

But I can't start a worker that only consumes from a specific queue, as I can with Celery:

celery -A tasks worker -Q tasks.to.nodes.1

Timaqf commented Jan 25, 2018

In Celery, I can start workers with different queue names, so I can send a task to a specific worker by using its queue name.

But I can't control Worker.consumers from the dramatiq-gevent script; I didn't see any API for this.

Timaqf commented Jan 25, 2018

I found a hacky way: I can remove the _WorkerMiddleware here:

self.broker.add_middleware(worker_middleware)

and call worker._add_consumer(queue_name) after this line:

worker = Worker(broker, worker_threads=args.threads)

Something like:

parser.add_argument(
    "--queues", "-Q", default=["default"], nargs="*", type=str,
    help="the queues the worker should consume from (default: default)"
)
for queue_name in args.queues:
    worker.logger.debug("Adding consumer for queue %r.", queue_name)
    worker._add_consumer(queue_name)

But this conflicts with the current queue_name check:

if not _queue_name_re.fullmatch(queue_name):
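
A check of this shape can be tried in isolation. This is only a sketch: the pattern below is an assumption chosen for illustration (a letter or underscore, followed by letters, digits, dots, underscores, or hyphens), and the real `_queue_name_re` in dramatiq may differ, so check the source before relying on it. `QUEUE_NAME_RE` and `is_valid_queue_name` are hypothetical names.

```python
import re

# Assumed pattern, for illustration only; not copied from dramatiq.
QUEUE_NAME_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9._-]*")


def is_valid_queue_name(queue_name: str) -> bool:
    """Return True if the whole name matches the assumed pattern."""
    return QUEUE_NAME_RE.fullmatch(queue_name) is not None


# fullmatch requires the entire string to match, not just a prefix.
for name in ("default", "tasks.to.nodes.1", "1-bad-start", "has space"):
    print(name, is_valid_queue_name(name))
```

Under this assumed pattern, any queue name passed on the command line would have to satisfy the same rule as names given in the actor decorator.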

Owner

Bogdanp commented Jan 25, 2018

Yep, something like that ought to work. This use case isn't supported right now, but it's something I can add in the future.

Can you tell me more about your exact use case here, @Timaqf? Would actor priorities not be a good fit?

Bogdanp self-assigned this Jan 25, 2018

Bogdanp added the enhancement label Jan 25, 2018

Timaqf commented Jan 25, 2018

@Bogdanp I want different workers to do different work within the same app, depending on each job's args and kwargs.

Timaqf commented Jan 25, 2018

@Bogdanp
Something like: server A only does priority HIGH jobs and server B only does priority LOW jobs, and I decide a job's priority based on its args and kwargs.
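
A minimal sketch of that routing decision, in plain Python. Everything here is hypothetical: the `urgent` keyword, the `high` and `low` queue names, and both helper functions are made up for illustration. The returned dict only mirrors the fields one would pass to dramatiq's `Message(...)` when building a message by hand.

```python
def choose_queue(args, kwargs):
    """Pick a queue name from the job's arguments (hypothetical rule)."""
    # Assumption: callers mark urgent jobs with urgent=True.
    return "high" if kwargs.get("urgent") else "low"


def build_message_fields(actor_name, args=(), kwargs=None):
    """Return the fields for a hand-built message routed by its arguments."""
    kwargs = dict(kwargs or {})
    return {
        "queue_name": choose_queue(args, kwargs),
        "actor_name": actor_name,
        "args": tuple(args),
        "kwargs": kwargs,
        "options": {},
    }


fields = build_message_fields(
    "count_words", ("https://www.google.com",), {"urgent": True}
)
print(fields["queue_name"])
```

Server A would then run a worker that consumes only from `high`, and server B one that consumes only from `low`.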

Owner

Bogdanp commented Jan 25, 2018

Gotcha. Right now you can use actor priorities for something like that but the disadvantage is priorities are per-actor and workers are homogeneous. I'll add something like your proposed enhancement soon.

Timaqf commented Jan 25, 2018

nice man!

ghost referenced this issue Mar 12, 2018

Closed

Something went wrong.. #41

Bogdanp closed this in b8b2426 Mar 16, 2018

Owner

Bogdanp commented Mar 16, 2018

Apologies for the delay, @Timaqf. This is now in master and I'll cut a release tomorrow!

Timaqf commented Mar 19, 2018

Thanks.
But I read the code in 0.20.0, and --queues seems to act as a whitelist of queues: each queue has to be declared in an actor's decorator, so it lives in the code rather than on the command line.

For example:

@dramatiq.actor(queue_name="low")
def count_words_low(*args):
    return 0

# Actor names must be unique, so the duplicate needs its own name.
@dramatiq.actor(queue_name="high")
def count_words_high(*args):
    return 0

So I can start a worker that only does high jobs with dramatiq count_words --queues high.
The problem is that I need to define the function twice if I have two queues, and the queues need to be declared in the code before the worker starts.

If worker_process is changed to declare the queues before the worker starts:

https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/__main__.py#L165

        worker = Worker(broker, queues=args.queues, worker_threads=args.threads)
        for queue_name in args.queues:
            worker.broker.declare_queue(queue_name)
        worker.start()

then I can write the code below to do the same job:

@dramatiq.actor(queue_name="count_words_queue_default")
def count_words(*args):
    return 0

and then start the worker with dramatiq count_words --queues count_words_queue_high.

Let me know what you think.

Owner

Bogdanp commented Mar 19, 2018

I think I see what you're saying. Unfortunately, the fact that each actor is bound to a single queue is a core part of the design and I don't think that should change. You could achieve what you're currently doing with modifying __main__ by writing a middleware and declaring the queues in its after_worker_boot hook.
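
A rough sketch of such a middleware might look like the following. The class name `DeclareExtraQueues` and its `queue_names` parameter are hypothetical. The `try`/`except` fallback is only there so the sketch can be exercised without dramatiq installed; in a real project you would import `Middleware` directly.

```python
try:
    from dramatiq.middleware import Middleware
except ImportError:
    # Fallback so the sketch can be read and exercised without dramatiq.
    Middleware = object


class DeclareExtraQueues(Middleware):
    """Declare additional queues once the worker has booted (hypothetical)."""

    def __init__(self, queue_names):
        self.queue_names = list(queue_names)

    def after_worker_boot(self, broker, worker):
        # Declare each extra queue on the broker after the worker starts.
        for queue_name in self.queue_names:
            broker.declare_queue(queue_name)
```

It would be registered before the workers start, for example with `broker.add_middleware(DeclareExtraQueues(["count_words_queue_high"]))`. Whether the worker then consumes from those queues still depends on what is passed to `--queues`.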
