Race condition in redis broker join #480

Closed
7 tasks done
CaselIT opened this issue Mar 14, 2022 · 0 comments · Fixed by #481

Issues

GitHub issues are for bugs. If you have questions, please ask them on the discussion board.

Checklist

  • Does your title concisely summarize the problem?
  • Did you include a minimal, reproducible example?
  • What OS are you using?
  • What version of Dramatiq are you using?
  • What did you do?
  • What did you expect would happen?
  • What happened?

What OS are you using?

Windows 10 / Ubuntu 20.04

What version of Dramatiq are you using?

1.12.0 and 1.12.3

What did you do?

A test in an automated pipeline checks that an actor enqueued with a delay has been executed by the time the broker's join returns.

What did you expect would happen?

The actor runs correctly.

What happened?

Sometimes the actor does not run, because of a race condition.

The following example code reproduces the race condition:

import os
import dramatiq
import pytest
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import AgeLimit, Retries, ShutdownNotifications
import pathlib
import logging

logging.basicConfig()
logging.getLogger("dramatiq").setLevel(logging.DEBUG)

_MIDDLEWARES = [AgeLimit(), ShutdownNotifications(), Retries(max_retries=3)]
redis_broker = RedisBroker(
    host="localhost",
    middleware=_MIDDLEWARES,
    namespace=f"ns-{os.getpid()}",
    password="redis-passwd",
)
dramatiq.set_broker(redis_broker)

@dramatiq.actor
def bar(s):
    pathlib.Path(f"file{s}.canary").touch(exist_ok=False)
    print(s)

@pytest.fixture(scope="module")
def wM():
    w = dramatiq.Worker(redis_broker)
    w.start()
    yield w
    w.stop()

N = 500
@pytest.fixture(scope="module")
def clean():
    def do_clean():
        for n in range(N):
            pathlib.Path(f"file{n}.canary").unlink(missing_ok=True)
    do_clean()
    yield
    do_clean()

@pytest.mark.parametrize("n", range(N))
def test(n, wM, clean):
    bar.send_with_options(kwargs={"s": str(n)}, delay=10)
    wM.broker.join(queue_name="default")
    wM.join()
    assert pathlib.Path(f"file{n}.canary").exists()

This can be run with pytest using pytest -x. Note that the failure can be quite rare in some environments, so a single run may not be sufficient. Parallelizing with -n (pytest-xdist) seems to help reproduce it, but is not required.

To investigate the issue, I added some logging to the Redis broker's join operation:

diff --git a/dramatiq/brokers/redis.py b/dramatiq/brokers/redis.py   
index 0f37313..838595d 100644
--- a/dramatiq/brokers/redis.py
+++ b/dramatiq/brokers/redis.py
@@ -233,7 +233,10 @@ class RedisBroker(Broker):
 
             size = 0
             for name in (queue_name, dq_name(queue_name)):
-                size += self.do_qsize(name)
+                current = self.do_qsize(name)
+                self.logger.debug('queue %s size %s', name, current)
+                size += current
+            self.logger.debug('join total size %s', size)
 
             if size == 0:
                 return

With this change in place, a failure logs the following:

===================================================================== FAILURES ======================================================================
_____________________________________________________________________ test[300] _____________________________________________________________________
[gw1] win32 -- Python 3.8.12 path\to\python.exe

n = 300, wM = <dramatiq.worker.Worker object at 0x0000012105930070>, clean = None

    @pytest.mark.parametrize("n", range(500))
    def test(n, wM, clean):
        bar.send_with_options(kwargs={"s": str(n)}, delay=10)
        wM.broker.join(queue_name="default")
        wM.join()
>       assert pathlib.Path(f"file{n}.canary").exists()
E       AssertionError: assert False
E        +  where False = <bound method Path.exists of WindowsPath('file300.canary')>()
E        +    where <bound method Path.exists of WindowsPath('file300.canary')> = WindowsPath('file300.canary').exists
E        +      where WindowsPath('file300.canary') = <class 'pathlib.Path'>('file300.canary')
E        +        where <class 'pathlib.Path'> = pathlib.Path

tests\a_test.py:70: AssertionError
----------------------------------------------------------------- Captured log call -----------------------------------------------------------------
DEBUG    dramatiq.broker.RedisBroker:redis.py:184 Enqueueing message '0619fc48-c787-4125-999e-8d439321e0ed' on queue 'default.DQ'.
DEBUG    dramatiq.broker.RedisBroker:redis.py:238 queue default size 0
DEBUG    dramatiq.broker.RedisBroker:redis.py:238 queue default.DQ size 1
DEBUG    dramatiq.broker.RedisBroker:redis.py:240 join total size 1
DEBUG    dramatiq.worker.ConsumerThread(default.DQ):worker.py:320 Pushing message '0619fc48-c787-4125-999e-8d439321e0ed' onto delay queue.
DEBUG    dramatiq.broker.RedisBroker:redis.py:184 Enqueueing message '0619fc48-c787-4125-999e-8d439321e0ed' on queue 'default'.
DEBUG    dramatiq.broker.RedisBroker:redis.py:238 queue default size 0
DEBUG    dramatiq.worker.ConsumerThread(default.DQ):worker.py:350 Acknowledging message '0619fc48-c787-4125-999e-8d439321e0ed'.
DEBUG    dramatiq.broker.RedisBroker:redis.py:238 queue default.DQ size 0
DEBUG    dramatiq.broker.RedisBroker:redis.py:240 join total size 0
============================================================== short test summary info ==============================================================

The race condition seems to be the following:

  • the message delay has elapsed and the worker thread is in the middle of processing the message from the delay queue
  • in the main thread, the default queue size is checked and returns 0
  • the worker thread pushes the message to the default queue, then acks the message in the delay queue
  • in the main thread, the delay queue size is checked and returns 0
  • the join returns, even though a message is present in the default queue.
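
The sequence above can be replayed by hand with a small in-memory model. This is only a sketch: the two Redis queues are modelled as a plain dict of sizes, and `replay_reported_interleaving` is a hypothetical helper, not part of Dramatiq.

```python
def replay_reported_interleaving():
    """Replay the steps listed above against a toy model of the two queues."""
    # Assumption: one delayed message is waiting in the delay queue.
    sizes = {"default": 0, "default.DQ": 1}
    total = 0

    total += sizes["default"]      # main thread: default queue size is 0
    sizes["default"] += 1          # worker: push the message to the default queue
    sizes["default.DQ"] -= 1       # worker: ack the message in the delay queue
    total += sizes["default.DQ"]   # main thread: delay queue size is 0

    return total, sizes


total, sizes = replay_reported_interleaving()
print("size seen by join:", total)
print("actual queue sizes:", sizes)
```

In this model the join sees a total of 0 while the default queue still holds one message, which matches the logged failure.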

The fix seems to be very simple: swap the order in which the queues are checked, so that the delay queue is read before the default queue.

diff --git a/dramatiq/brokers/redis.py b/dramatiq/brokers/redis.py
index 0f37313..ce4aa98 100644
--- a/dramatiq/brokers/redis.py
+++ b/dramatiq/brokers/redis.py
@@ -232,7 +232,7 @@ class RedisBroker(Broker):
                 raise QueueJoinTimeout(queue_name)

             size = 0
-            for name in (queue_name, dq_name(queue_name)):
+            for name in (dq_name(queue_name), queue_name):
                 size += self.do_qsize(name)

             if size == 0:
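
As a rough sanity check of the swapped order, the sketch below enumerates every interleaving of the worker's two steps (push to the default queue, then ack in the delay queue) with the two size reads done by join. It is a toy model, not Dramatiq code: `observed_total` and `can_miss_message` are hypothetical names, and it assumes the message stays counted in the default queue for the duration of the check.

```python
from itertools import combinations

# The worker's steps, in the order described in this issue.
WORKER_STEPS = ("push_to_main", "ack_delayed")


def observed_total(read_order, reader_slots):
    """Return the size join would compute for one interleaving.

    There are 4 slots in total. The reads happen at `reader_slots`;
    the worker's steps fill the remaining slots in order.
    """
    sizes = {"default": 0, "default.DQ": 1}  # one delayed message pending
    reads = iter(read_order)
    steps = iter(WORKER_STEPS)
    total = 0
    for slot in range(4):
        if slot in reader_slots:
            total += sizes[next(reads)]
        elif next(steps) == "push_to_main":
            sizes["default"] += 1
        else:
            sizes["default.DQ"] -= 1
    return total


def can_miss_message(read_order):
    """True if some interleaving makes join see a total size of 0."""
    return any(
        observed_total(read_order, set(slots)) == 0
        for slots in combinations(range(4), 2)
    )


original_order_misses = can_miss_message(("default", "default.DQ"))
swapped_order_misses = can_miss_message(("default.DQ", "default"))
print("original order can miss the message:", original_order_misses)
print("swapped order can miss the message:", swapped_order_misses)
```

Under these assumptions only the original order can observe an empty total while the message is still pending. With the delay queue read first, the message is seen either in the delay queue (before the ack) or in the default queue (after the push).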

With this change in place, I ran the test above 20 times without any failure.

I'm opening a PR with the suggested fix.
