Description
Issues
Checklist
- Does your title concisely summarize the problem?
- Did you include a minimal, reproducible example?
- What OS are you using?
- What version of Dramatiq are you using?
- What did you do?
- What did you expect would happen?
- What happened?
What OS are you using?
Ubuntu 20.04.3 LTS
What version of Dramatiq are you using?
1.12.1, 1.12.2 (both versions are affected)
What did you do?
- Started a new process from an actor using subprocess.Popen(geckodriver).
- Communicated with the process.
- Called geckodriver.terminate().
- Called geckodriver.wait() to wait until the process terminates.
What did you expect would happen?
- The subprocess terminates.
- The subprocess can be terminated from the shell by issuing a SIGTERM.
What happened?
Worker thread hung on geckodriver.wait() indefinitely and the process could not be terminated by SIGTERM.
Beginning with Dramatiq 1.12.1 (8da5157), the worker process calls signal.pthread_sigmask during its initialization procedure to block SIGTERM and other signals until proper handlers are registered. However, the signal mask is inherited by all threads and processes started from the current thread.
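The inheritance described above can be reproduced without Dramatiq. The following is a minimal sketch, assuming a POSIX system; the helper name child_ignores_sigterm and the sleeping Python child are illustrative stand-ins for the worker thread and geckodriver, not part of Dramatiq.

```python
import signal
import subprocess
import sys

# The three signals the issue reports as blocked in worker threads.
SIGNALS = {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}


def child_ignores_sigterm(timeout=1.0):
    """Return True if a child started while SIGTERM is blocked survives terminate().

    Hypothetical helper for illustration only.
    """
    # Block the signals in the calling thread, as the worker does at boot.
    old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, SIGNALS)
    try:
        # The child inherits the calling thread's signal mask across fork/exec.
        proc = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(30)"]
        )
    finally:
        # Restore the caller's original mask; the child keeps its own copy.
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)

    proc.terminate()  # sends SIGTERM, which stays pending in the child
    try:
        proc.wait(timeout=timeout)
        return False
    except subprocess.TimeoutExpired:
        return True
    finally:
        # SIGKILL cannot be blocked, so this always cleans up the child.
        proc.kill()
        proc.wait()
```

If the sketch behaves as expected, child_ignores_sigterm() returns True: the child only goes away once SIGKILL is sent, which matches the hang on geckodriver.wait().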
When a worker starts:
- Main process blocks SIGHUP, SIGINT and SIGTERM signals.
- Forks all worker processes; each starts with signals blocked.
- Each worker process:
- Spawns its worker threads.
- Registers signal handlers.
- Unblocks signals.
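The ordering in the steps above can be sketched with plain threads. This is an illustration under stated assumptions, not Dramatiq's actual boot code: boot_and_report is a hypothetical name, and handler registration is left out because it does not affect the signal mask.

```python
import signal
import threading

SIGNALS = {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}


def boot_and_report():
    """Mimic the boot order and return (booting_thread_mask, worker_thread_mask)."""
    masks = {}
    release = threading.Event()

    def worker():
        # Read the mask only after the booting thread has unblocked its signals.
        release.wait()
        # SIG_BLOCK with an empty set changes nothing and returns the current mask.
        masks["worker"] = signal.pthread_sigmask(signal.SIG_BLOCK, [])

    # Signals are blocked before anything else happens.
    old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, SIGNALS)
    try:
        # The worker thread is spawned while the signals are still blocked.
        thread = threading.Thread(target=worker)
        thread.start()
        # Unblocking here affects the calling thread only.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, SIGNALS)
        masks["boot"] = signal.pthread_sigmask(signal.SIG_BLOCK, [])
        release.set()
        thread.join()
    finally:
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
    return masks["boot"], masks["worker"]
```

The worker thread's mask should still contain all three signals even though the thread that spawned it has already unblocked them, because each thread keeps its own copy of the mask from the moment it is created.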
This causes all worker threads to be spawned with signals blocked, and all processes started within these threads also inherit the sigmask. This prevents any process started from a worker thread from reacting to SIGTERM unless it changes its own sigmask.
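Until the boot order is changed, one possible workaround on the actor side is to unblock the signals in the calling thread just for the duration of the Popen call, so that the child starts with a clean mask. This is only a sketch: signals_unblocked and spawn_terminable are hypothetical helpers and are not provided by Dramatiq.

```python
import contextlib
import signal
import subprocess

SIGNALS = {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}


@contextlib.contextmanager
def signals_unblocked(signals=SIGNALS):
    """Temporarily unblock `signals` in the calling thread (hypothetical helper)."""
    old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, signals)
    try:
        yield
    finally:
        # Put the thread's previous mask back, whatever it was.
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)


def spawn_terminable(args, **popen_kwargs):
    """Start a subprocess whose inherited mask does not block SIGTERM."""
    with signals_unblocked():
        return subprocess.Popen(args, **popen_kwargs)
```

Inside an actor, replacing subprocess.Popen(...) with spawn_terminable(...) should let terminate() followed by wait() return as it did on 1.11.0. The mask is changed only around process creation, which keeps the window in which the worker thread itself has the signals unblocked short.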
This might also affect any threads/processes:
- Started from hooks in middlewares (after_process_boot, before_worker_boot etc.).
- forks in middlewares.
- Fork functions given via CLI.
Minimum code to reproduce the bug
worker.py:
import signal
import dramatiq
@dramatiq.actor
def get_sigmask():
    current_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
    print(f'current_sigmask = {current_sigmask}')
From Python console:
>>> worker.get_sigmask.send()
Output:
$ dramatiq -p 1 -t 1 setup_broker worker
[2022-01-15 23:16:21,666] [PID 789] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.12.1' is booting up.
[2022-01-15 23:16:21,665] [PID 793] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2022-01-15 23:16:21,666] [PID 798] [MainThread] [dramatiq.ForkProcess(0)] [INFO] Fork process 'dramatiq.middleware.prometheus:_run_exposition_server' is ready for action.
current_sigmask = {<Signals.SIGHUP: 1>, <Signals.SIGINT: 2>, <Signals.SIGTERM: 15>}
Expected output (works on 1.11.0):
$ dramatiq -p 1 -t 1 setup_broker worker
[2022-01-15 23:49:07,579] [PID 827] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.11.0' is booting up.
[2022-01-15 23:49:07,578] [PID 831] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2022-01-15 23:49:07,580] [PID 836] [MainThread] [dramatiq.ForkProcess(0)] [INFO] Fork process 'dramatiq.middleware.prometheus:_run_exposition_server' is ready for action.
current_sigmask = set()