
Possible prefetch issue affecting performance #183

Closed
xelhark opened this Issue Mar 7, 2019 · 5 comments

xelhark (Contributor) commented Mar 7, 2019

What OS are you using?

A Docker container based on the official Python image (FROM python:3.6); also tested on macOS 10.14.

What version of Dramatiq are you using?

1.5.0

What did you do?

I launched 15 tasks with 15 workers

What did you expect would happen?

Each worker should execute exactly one task

What happened?

Some workers executed multiple tasks while others executed none, which slowed down the overall run.

Detailed description:

I noticed some performance issues in my app and managed to reproduce them with the code below.

Basically, I have an actor that spawns multiple tasks; each task takes some time to complete and then prints something.

I'd expect the tasks to be processed concurrently, given enough workers, but apparently that isn't the case. Some workers execute multiple tasks while others execute none, which makes me think that there might be a prefetch issue:

Some workers might take multiple tasks and execute them one by one.

I created an example app that I used to reproduce the issue:

import logging
import time

import dramatiq

from my_brokers import pipe_group  # pipe_group and RedisPipeBroker are shown further down

logger = logging.getLogger("IT Worker")


@dramatiq.actor
def dummy_job_secondary():
    time.sleep(1)
    logger.info("[DUMMY] Executed")


@dramatiq.actor
def dummy_job():
    tasks = [dummy_job_secondary.message() for _ in range(15)]
    pipe_group(tasks).run()

I started 15 workers with the following script:

#!/usr/bin/env bash

for (( VAR = 0; VAR < 15; ++VAR )); do
    dramatiq config.dramatiq --threads 1 --processes 1 --log-file=dramatiq_log${VAR}.txt &
done

Some log files contain no executions:

[2019-03-07 15:41:19,625] [PID 57866] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.5.0' is booting up.
[2019-03-07 15:41:21,114] [PID 57916] [MainThread] [raven.base.Client] [INFO] Raven is not configured (logging is disabled). Please see the documentation for more information.
[2019-03-07 15:41:21,174] [PID 57916] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2019-03-07 15:46:12,901] [PID 57916] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
[2019-03-07 15:46:13,364] [PID 57916] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-03-07 15:46:14,644] [PID 57916] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.   

while others contain multiple executions:

[2019-03-07 15:41:19,621] [PID 57856] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.5.0' is booting up.
[2019-03-07 15:41:21,119] [PID 57913] [MainThread] [raven.base.Client] [INFO] Raven is not configured (logging is disabled). Please see the documentation for more information.
[2019-03-07 15:41:21,180] [PID 57913] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2019-03-07 15:41:33,768] [PID 57913] [Thread-5] [IT Worker] [INFO] [DUMMY] Executed
[2019-03-07 15:41:34,786] [PID 57913] [Thread-5] [IT Worker] [INFO] [DUMMY] Executed
[2019-03-07 15:46:12,901] [PID 57913] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
[2019-03-07 15:46:13,377] [PID 57913] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-03-07 15:46:14,207] [PID 57913] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. 

As you can see, the executions happen one after the other, a full second apart (the time it takes for the first task to complete).

This is quite detrimental to my application's performance; having all of the tasks execute at the same time would be ideal.

NOTE:

I'm using a custom broker that lets me send multiple tasks with a Redis pipeline. Here's the broker's code, but I don't think it has any effect on the prefetch issue:

class RedisPipeBroker(RedisBroker):
    def __init__(self, *args, **kwargs):
        super(RedisPipeBroker, self).__init__(*args, **kwargs)
        self.pipe = None

    def init_pipe(self):
        self.pipe = self.client.pipeline()

    def execute(self):
        self.pipe.execute()

    def _dispatch(self, command):
        # Micro-optimization: by hoisting these up here we avoid
        # allocating the list on every call.
        dispatch = self.scripts["dispatch"]
        keys = [self.namespace]

        def do_dispatch(queue_name, *args):
            timestamp = current_millis()
            args = [
                command,
                timestamp,
                queue_name,
                self.broker_id,
                self.heartbeat_timeout,
                self.dead_message_ttl,
                self._should_do_maintenance(command),
                *args,
            ]
            client = None
            if command == 'enqueue':
                client = self.pipe
            return dispatch(args=args, keys=keys, client=client)

        return do_dispatch


class pipe_group(group):
    def run(self, *args, **kwargs):
        assert isinstance(self.broker, RedisPipeBroker)
        self.broker.init_pipe()
        super(pipe_group, self).run(*args, **kwargs)
        self.broker.execute()

Config

I used the following configuration:

import dramatiq
from my_brokers import RedisPipeBroker

broker = RedisPipeBroker(url="redis://localhost...")
dramatiq.set_broker(broker)

# Import tasks
from dummy_tasks import *

Bogdanp (Owner) commented Mar 7, 2019

This is expected, since we prefetch 2 messages for every worker thread (see this code). As noted in the comment, starvation (what you're seeing here) is possible. If you want to work around this, either change that line in your own distribution of Dramatiq, or make a PR that makes the value configurable via an environment variable (like this) and I'll merge it in.
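
For reference, here is a rough, self-contained sketch of the kind of environment-variable override being described. The variable name dramatiq_queue_prefetch and the helper function are illustrative assumptions, not Dramatiq's actual API:

import os

def get_queue_prefetch(worker_threads, multiplier=2):
    # Hypothetical helper: read the prefetch count from an environment
    # variable so it can be tuned without patching Dramatiq itself.
    value = os.getenv("dramatiq_queue_prefetch")
    if value is not None:
        return max(1, int(value))
    # Fall back to the current behaviour: two messages per worker thread.
    return worker_threads * multiplier

Setting dramatiq_queue_prefetch=1 would then let each single-threaded worker fetch one message at a time, avoiding the starvation described above.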

xelhark (Contributor, Author) commented Mar 7, 2019

Alternatively, would it be possible to use a custom Worker class?

That code is executed in the Worker's __init__, so this would be trivial and would also allow more user customization.
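
As a rough illustration of the idea (a sketch only: it assumes the prefetch count ends up on an attribute set in Worker.__init__, here called queue_prefetch, which depends on the Dramatiq version and is not an officially supported hook):

from dramatiq import Worker

class SingleFetchWorker(Worker):
    # Hypothetical subclass: override the prefetch count chosen in
    # Worker.__init__ so each consumer fetches one message at a time.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue_prefetch = 1  # assumed attribute name, for illustration

Whether the CLI can be told to use such a class in place of the default Worker is exactly the customization being asked about here.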

xelhark (Contributor, Author) commented Mar 11, 2019

@Bogdanp any news?

I also added a system to allow the user to specify his own Worker Class, but I'll send you a different PR for that if you prefer.

Honestly, I would much rather install a packaged version than run my own fork. If you give me a couple of guidelines, I can add docs for these features too, or adjust the PR to your preferences; just let me know.

Bogdanp (Owner) commented Mar 12, 2019

This'll get merged this week.

> I also added a system to allow the user to specify his own Worker Class, but I'll send you a different PR for that if you prefer.

I'll take a look if you open another PR, but I don't think that's something I would be willing to merge into Dramatiq. Being configurable at that level is not one of the project's goals.

Bogdanp added the enhancement label Mar 13, 2019

Bogdanp (Owner) commented Mar 13, 2019

Fixed in 8a103e0.

Bogdanp closed this Mar 13, 2019
