
Forking doesn't work sometimes #297

Closed
Krukov opened this issue Mar 30, 2020 · 9 comments

@Krukov

Krukov commented Mar 30, 2020

Issues

Thank you for Dramatiq.

We use Dramatiq and Prometheus extensively in production. After updating Dramatiq, we ran into a problem: sometimes (roughly 50/50) Dramatiq doesn't start listening on the metrics port (the fork started by the Prometheus middleware).
My suggestion is that we should wait until worker init has finished (we have additional code in one of our middlewares, and executing it can take 0-1 seconds) and only then run the forks. What do you think? (1ff83f5#diff-597db348179f5c72c5ad215480791da6R443)

Here are the logs from a run in which the fork process didn't start:


30/03/2020 15:59:11[2020-03-30 12:59:11,395] [PID 1] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.8.1' is booting up.
30/03/2020 15:59:11[2020-03-30 12:59:11,544 INFO] src.queue.tasks | Worker init
....
30/03/2020 15:59:11[2020-03-30 12:59:11,796 DEBUG] dramatiq.middleware.prometheus.PrometheusMiddleware | Setting up metrics...
30/03/2020 15:59:11[2020-03-30 12:59:11,797 DEBUG] dramatiq.WorkerProcess(0) | Loading modules...
30/03/2020 15:59:11[2020-03-30 12:59:11,797 DEBUG] dramatiq.WorkerProcess(0) | Sending forks to main process...
30/03/2020 15:59:11[2020-03-30 12:59:11,797 DEBUG] dramatiq.WorkerProcess(0) | Starting worker threads...
30/03/2020 15:59:11[2020-03-30 12:59:11,808 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'enable_trade'.
30/03/2020 15:59:11[2020-03-30 12:59:11,809 DEBUG] dramatiq.worker.ConsumerThread(enable_trade) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,809 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'gather'.
30/03/2020 15:59:11[2020-03-30 12:59:11,811 DEBUG] dramatiq.worker.ConsumerThread(gather) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,811 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'iterations'.
30/03/2020 15:59:11[2020-03-30 12:59:11,813 DEBUG] dramatiq.worker.ConsumerThread(iterations) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,813 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'default'.
30/03/2020 15:59:11[2020-03-30 12:59:11,816 DEBUG] dramatiq.worker.ConsumerThread(default) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,817 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'gather.DQ'.
30/03/2020 15:59:11[2020-03-30 12:59:11,818 DEBUG] dramatiq.worker.ConsumerThread(gather.DQ) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,818 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'default.DQ'.
30/03/2020 15:59:11[2020-03-30 12:59:11,820 DEBUG] dramatiq.worker.ConsumerThread(default.DQ) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,820 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'iterations.DQ'.
30/03/2020 15:59:11[2020-03-30 12:59:11,825 DEBUG] dramatiq.worker.ConsumerThread(iterations.DQ) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,825 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'enable_trade.DQ'.
30/03/2020 15:59:11[2020-03-30 12:59:11,827 DEBUG] dramatiq.worker.ConsumerThread(enable_trade.DQ) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,828 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,829 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,830 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,831 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,831 INFO] dramatiq.WorkerProcess(0) | Worker process is ready for action.
30/03/2020 15:59:11[2020-03-30 12:59:11,857 INFO] src.queue.tasks | Worker init
...
30/03/2020 15:59:11[2020-03-30 12:59:11,982 DEBUG] dramatiq.middleware.prometheus.PrometheusMiddleware | Setting up metrics...
30/03/2020 15:59:11[2020-03-30 12:59:11,986 DEBUG] dramatiq.WorkerProcess(1) | Loading modules...
30/03/2020 15:59:11[2020-03-30 12:59:11,987 DEBUG] dramatiq.WorkerProcess(1) | Starting worker threads...
30/03/2020 15:59:11[2020-03-30 12:59:11,987 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'enable_trade'.
30/03/2020 15:59:11[2020-03-30 12:59:11,991 DEBUG] dramatiq.worker.ConsumerThread(enable_trade) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,991 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'gather'.
30/03/2020 15:59:11[2020-03-30 12:59:11,996 DEBUG] dramatiq.worker.ConsumerThread(gather) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,996 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'iterations'.
30/03/2020 15:59:11[2020-03-30 12:59:11,998 DEBUG] dramatiq.worker.ConsumerThread(iterations) | Running consumer thread...
30/03/2020 15:59:11[2020-03-30 12:59:11,998 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for queue 'default'.
30/03/2020 15:59:12[2020-03-30 12:59:11,999 DEBUG] dramatiq.worker.ConsumerThread(default) | Running consumer thread...
30/03/2020 15:59:12[2020-03-30 12:59:11,999 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'gather.DQ'.
30/03/2020 15:59:12[2020-03-30 12:59:12,001 DEBUG] dramatiq.worker.ConsumerThread(gather.DQ) | Running consumer thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,001 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'default.DQ'.
30/03/2020 15:59:12[2020-03-30 12:59:12,002 DEBUG] dramatiq.worker.ConsumerThread(default.DQ) | Running consumer thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,002 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'iterations.DQ'.
30/03/2020 15:59:12[2020-03-30 12:59:12,012 DEBUG] dramatiq.worker.ConsumerThread(iterations.DQ) | Running consumer thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,012 DEBUG] dramatiq.worker._WorkerMiddleware | Adding consumer for delay queue 'enable_trade.DQ'.
30/03/2020 15:59:12[2020-03-30 12:59:12,013 DEBUG] dramatiq.worker.ConsumerThread(enable_trade.DQ) | Running consumer thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,014 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,015 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,015 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,015 DEBUG] dramatiq.worker.WorkerThread | Running worker thread...
30/03/2020 15:59:12[2020-03-30 12:59:12,016 INFO] dramatiq.WorkerProcess(1) | Worker process is ready for action.

python:3.7-slim

dramatiq --version
1.8.1
dramatiq --processes ${WORKER_PROCESSES} --threads 4 src.queue.tasks -Q ${QUEUE_NAME}
@Bogdanp
Owner

Bogdanp commented Apr 4, 2020

Tell me more about what you're doing at the worker level that is affecting your forks. I think it'd be safe to move the forks to after worker init, but I want to understand your use case better.

Also, are you able to work around this problem by making the forks wait/retry?
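One shape the "wait/retry" workaround could take is a small retry loop around whatever starts the exposition server. This is a minimal sketch, not Dramatiq API: `start_with_retry`, its parameters, and the assumption that a failed start surfaces as `OSError` are all hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def start_with_retry(start: Callable[[], T], attempts: int = 5, delay: float = 1.0) -> T:
    """Call `start` until it succeeds, sleeping `delay` seconds after each failure.

    Hypothetical helper: `start` stands in for something that binds and serves
    the metrics port; OSError is treated as "not ready yet, try again".
    """
    for attempt in range(1, attempts + 1):
        try:
            return start()
        except OSError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay)
    raise ValueError("attempts must be >= 1")
```

In a custom fork function this might look like `start_with_retry(_run_exposition_server, attempts=10, delay=1.0)`, assuming that function raises `OSError` when it cannot bind.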

@Krukov
Author

Krukov commented Apr 4, 2020

I'll try...
We have a custom Prometheus middleware that overrides the after_process_boot method to initialize the metrics, because we need to add some extra labels and metrics. Those labels are based on the response from an HTTP service, so we make a request in the constructor of our middleware...
I think I can try to move that logic out of the middleware's init to somewhere else to speed up worker initialization, and I hope that helps.

@Krukov
Author

Krukov commented Apr 4, 2020

I can try to fix this problem myself by contributing to Dramatiq, if you prefer.

@Bogdanp
Owner

Bogdanp commented Apr 4, 2020

I think it shouldn't be a hard change to make, and I'd prefer to make it myself since there are a few rough edges in the cli module. I just want to make sure I have a good understanding of the problem and of the implications of making the fork functions wait for initialization.

Can you post that middleware, maybe with irrelevant stuff stripped?

@Krukov
Author

Krukov commented Apr 6, 2020

I was wrong: we are not redefining __init__. Our middleware:

import os

import requests
from dramatiq.common import current_millis
from dramatiq.middleware.prometheus import Prometheus as OriginalPrometheus, DB_PATH


class PrometheusMiddleware(OriginalPrometheus):

    default_labels = ("queue_name", "actor_name")

    def _init_meta(self):
        self._meta = requests.get("...").json()

    def get_labels(self):
        return list(self.default_labels) + list(self._meta.keys())

    def get_message_labels(self, message):
        return (message.queue_name, message.actor_name, *self._meta.values())

    def after_process_boot(self, broker):
        self._init_meta()
        os.environ["prometheus_multiproc_dir"] = DB_PATH

        # This import MUST happen at runtime, after process boot and
        # after the env variable has been set up.
        import prometheus_client as prom

        labels = self.get_labels()
        self.logger.debug("Setting up metrics...")
        registry = prom.CollectorRegistry()
        self.total_messages = prom.Counter(
            "dramatiq_messages_total", "The total number of messages processed.", labels, registry=registry
        )
        ...  # remaining metrics, copied from OriginalPrometheus
        self.add_custom_metrics(prom, labels, registry)

    def after_nack(self, broker, message):
        labels = self.get_message_labels(message)
        self.total_rejected_messages.labels(*labels).inc()

    def after_enqueue(self, broker, message, delay):
        if "retries" in message.options:
            labels = self.get_message_labels(message)
            self.total_retried_messages.labels(*labels).inc()

    def before_delay_message(self, broker, message):
        ...

    def before_process_message(self, broker, message):
        ...

    def after_process_message(self, broker, message, *, result=None, exception=None):
        del result
        labels = self.get_message_labels(message)
        message_start_time = self.message_start_times.pop(message.message_id, current_millis())
        message_duration = current_millis() - message_start_time
        self.message_durations.labels(*labels).observe(message_duration)
        self.inprogress_messages.labels(*labels).dec()
        self.total_messages.labels(*labels).inc()
        if exception is not None:
            self.total_errored_messages.labels(*labels).inc()

        self.set_custom_metrics(message)

    after_skip_message = after_process_message

    def set_custom_metrics(self, message):
        ...

The elided "..." parts are copy-pasted from the OriginalPrometheus middleware.
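The middleware depends on get_labels() and get_message_labels() yielding label names and label values in the same order. Because dicts preserve insertion order (guaranteed since Python 3.7), `self._meta.keys()` and `self._meta.values()` line up position by position. A standalone sketch of that pairing, with the HTTP response replaced by a hypothetical dict and the Dramatiq message by a `SimpleNamespace` stand-in:

```python
from types import SimpleNamespace

DEFAULT_LABELS = ("queue_name", "actor_name")


def get_labels(meta: dict) -> list:
    # Label *names*: the defaults first, then the keys of the service response.
    return [*DEFAULT_LABELS, *meta.keys()]


def get_message_labels(meta: dict, message) -> tuple:
    # Label *values* in the same order: dict iteration order is insertion order,
    # so meta.values() lines up with meta.keys() above.
    return (message.queue_name, message.actor_name, *meta.values())


meta = {"region": "eu", "cluster": "blue"}  # hypothetical service response
message = SimpleNamespace(queue_name="default", actor_name="gather")  # stand-in message
print(dict(zip(get_labels(meta), get_message_labels(meta, message))))
# {'queue_name': 'default', 'actor_name': 'gather', 'region': 'eu', 'cluster': 'blue'}
```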

@Bogdanp
Owner

Bogdanp commented Apr 18, 2020

Thanks! Just to make sure changing this would fix things for you, can you try making your own version of the fork function that sleeps for a few seconds before calling _run_exposition_server?

Something along these lines:

from time import sleep


def run_prometheus_fork():
    # Wait a few seconds so the worker processes can finish booting first.
    sleep(10)
    from dramatiq.middleware.prometheus import _run_exposition_server
    _run_exposition_server()

@Ecno92

Ecno92 commented May 9, 2020

We can confirm this issue also occurs with the default Prometheus middleware. We have been able to work around it with a --fork-function variant based on @Bogdanp's suggestion:

import logging
from time import sleep

from dramatiq.middleware.prometheus import _run_exposition_server

logger = logging.getLogger(__name__)


def run_prometheus_fork():
    sleep(10)
    logger.debug("Starting Prometheus server after 10s sleep.")
    try:
        _run_exposition_server()
    except OSError:
        # Binding failed, most likely because the exposition server is already running.
        logger.debug("Prometheus server already started.")
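The `except OSError` branch works because binding a TCP socket to a port that another socket is already listening on fails with `OSError` (typically "Address already in use"). A minimal standalone check using plain sockets; `port_in_use_demo` is a hypothetical helper, and 127.0.0.1 with an OS-assigned port is used instead of the exposition server's real host and port:

```python
import socket


def port_in_use_demo() -> bool:
    """Return True if a second bind to an already-listening port raises OSError."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    first.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    first.listen()
    port = first.getsockname()[1]

    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        second.bind(("127.0.0.1", port))  # same address while `first` is listening
    except OSError:
        return True  # e.g. EADDRINUSE, the case the fork function swallows
    finally:
        second.close()
        first.close()
    return False


print(port_in_use_demo())
```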

We noticed that we could reproduce the issue by increasing the number of processes from 2 (matching the CPU count) to something higher, like 4-20.
We are running the worker with --threads 1.

Bogdanp added this to the v1.9.0 milestone May 10, 2020
@Bogdanp
Owner

Bogdanp commented May 10, 2020

Thanks, @Ecno92. I think there is definitely a race here that I'll have to fix. I'll take a look sometime this week.
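A thread-based model of the kind of race discussed here, assuming (as Krukov suggested at the top of the thread) that it is between a slow worker init and the main process collecting the list of fork functions. This is not Dramatiq's code: threads stand in for processes, and `run_startup`, `published_forks`, and the `threading.Event` readiness signal are hypothetical.

```python
import threading
import time

# Model only: `worker` stands in for the worker process that publishes the fork
# list, and `published_forks` for the shared list the main process reads.


def run_startup(wait_for_worker: bool, init_delay: float = 0.3) -> list:
    published_forks: list = []
    forks_published = threading.Event()

    def worker() -> None:
        time.sleep(init_delay)  # slow middleware init, e.g. an HTTP request at boot
        published_forks.append("_run_exposition_server")
        forks_published.set()  # signal that the fork list is complete

    thread = threading.Thread(target=worker)
    thread.start()
    if wait_for_worker:
        forks_published.wait()  # proposed ordering: wait for worker init first
    forks_to_start = list(published_forks)  # snapshot taken by the "main process"
    thread.join()
    return forks_to_start


print(run_startup(wait_for_worker=False))  # typically empty: read before publish
print(run_startup(wait_for_worker=True))   # the fork is found
```

Without the wait, the outcome depends on how long init takes relative to the read, which matches the intermittent behavior reported above.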

Bogdanp closed this as completed in a8ff8a1 Jun 8, 2020
@ClimenteA

This worked for me:

In the prometheus.py file, select a free port using the socketserver module:

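import socketserver


def _run_exposition_server():
    # Patched copy of the function in dramatiq/middleware/prometheus.py; get_logger,
    # HTTPServer, HTTP_HOST and _metrics_handler are assumed to be the names already
    # defined or imported in that module.
    logger = get_logger(__name__, "_run_exposition_server")
    logger.debug("Starting exposition server...")

    httpd = None
    try:
        # Ask the OS for a free port by binding a throwaway server to port 0.
        with socketserver.TCPServer(("localhost", 0), None) as s:
            free_port = s.server_address[1]

        address = (HTTP_HOST, free_port)  # was: (HTTP_HOST, HTTP_PORT)
        httpd = HTTPServer(address, _metrics_handler)
        logger.debug("Exposition server listening on port %d.", free_port)
        httpd.serve_forever()
    except KeyboardInterrupt:
        logger.debug("Stopping exposition server...")
        if httpd is not None:  # the interrupt may arrive before the server exists
            httpd.shutdown()

    return 0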
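The snippet above finds a free port by binding a throwaway server, closing it, and then binding HTTPServer to the same number, which leaves a short window in which another process could take the port. A sketch of an alternative, assuming a random port is acceptable: bind HTTPServer to port 0 directly and read the assigned port from `server_address`. `_HelloHandler` and `serve_on_free_port` are hypothetical stand-ins, not Dramatiq's metrics handler.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class _HelloHandler(BaseHTTPRequestHandler):
    """Hypothetical stand-in for the metrics handler: answers every GET with "ok"."""

    def do_GET(self) -> None:
        body = b"ok\n"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args) -> None:
        pass  # silence the default per-request logging to stderr


def serve_on_free_port(host: str = "127.0.0.1") -> HTTPServer:
    # Port 0 makes the OS pick a free port at bind time, so there is no gap
    # between "find a free port" and "bind to it".
    httpd = HTTPServer((host, 0), _HelloHandler)
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    return httpd


httpd = serve_on_free_port()
port = httpd.server_address[1]  # the port the OS actually assigned
print("exposition server listening on port", port)

conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
conn.request("GET", "/metrics")
response = conn.getresponse()
status, body = response.status, response.read()
conn.close()

httpd.shutdown()      # stop serve_forever() in the background thread
httpd.server_close()  # release the listening socket
print(status, body)
```

Either way, a randomly assigned port is not one Prometheus can be configured to scrape in advance, so the chosen port has to be logged or published somewhere.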
