Exception in WorkerPlugin is not handled correctly #6906

@vepadulano

Description

I have an application that may write errors to stderr (which I then save to a file). These errors can sometimes cause the worker to die and the Nanny to restart it. I was hoping to use a worker plugin to read the errors the worker wrote to the file and raise an exception accordingly. After reading #4297, it seems that the only place where exceptions are actually handled is the WorkerPlugin's setup method. Yet with the following reproducer, even a simple exception raised there is not surfaced properly:

import inspect
import os

from dask.distributed import LocalCluster, Client, WorkerPlugin
from dask import delayed


class MyWorkerPlugin(WorkerPlugin):
    def setup(self, *args, **kwargs):
        with open("worker_plugin.txt", "a") as f:
            f.write("#"*20 + "\n")
            f.write(f"BEGIN {inspect.stack()[0][3]}\n")
            f.write(f"{args}\n")
            f.write(f"{kwargs}\n")
            f.write(f"END {inspect.stack()[0][3]}\n")
            f.write("#"*20 + "\n")

        if os.path.exists("error_log.txt"):
            with open("worker_plugin.txt", "a") as f:
                f.write("FOUND ERROR WHEN STARTING WORKER\n")
            raise RuntimeError("Error during worker setup!")

        super().setup(*args, **kwargs)

    def transition(self, *args, **kwargs):
        with open("worker_plugin.txt", "a") as f:
            f.write("#"*20 + "\n")
            f.write(f"BEGIN {inspect.stack()[0][3]}\n")
            f.write(f"{args}\n")
            f.write(f"{kwargs}\n")
            f.write(f"END {inspect.stack()[0][3]}\n")
            f.write("#"*20 + "\n")

        super().transition(*args, **kwargs)

    def teardown(self, *args, **kwargs):
        with open("worker_plugin.txt", "a") as f:
            f.write("#"*20 + "\n")
            f.write(f"BEGIN {inspect.stack()[0][3]}\n")
            f.write(f"{args}\n")
            f.write(f"{kwargs}\n")
            f.write(f"END {inspect.stack()[0][3]}\n")
            f.write("#"*20 + "\n")

        super().teardown(*args, **kwargs)


def create_connection():
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=True)
    client = Client(cluster)
    client.register_worker_plugin(MyWorkerPlugin())
    return client


@delayed
def task():
    print("IN TASK")
    with open("error_log.txt", "w") as f:
        f.write("An error was triggered.\n")
    os._exit(0)  # Used to trigger worker restart


def main():
    client = create_connection()
    task().compute()


if __name__ == "__main__":
    raise SystemExit(main())

Notice that the os._exit call is made in the delayed task to reproduce the scenario of a worker dying and being restarted after a crash. As the code shows, the WorkerPlugin's setup method checks whether the error log exists and, if so, raises an exception. Unfortunately, this doesn't work as intended:

$: python test.py
IN TASK
2022-08-18 15:38:27,372 - distributed.nanny - WARNING - Restarting worker
IN TASK
2022-08-18 15:38:27,842 - distributed.nanny - WARNING - Restarting worker
IN TASK
2022-08-18 15:38:28,303 - distributed.nanny - WARNING - Restarting worker
2022-08-18 15:38:28,599 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-bc_8koau', purging
IN TASK
2022-08-18 15:38:28,778 - distributed.nanny - WARNING - Restarting worker
Traceback (most recent call last):
  File "/home/vpadulan/projects/rootcode/dask-segfault-repro/trigger-error-from-log-file/test.py", line 69, in <module>
    raise SystemExit(main())
  File "/home/vpadulan/projects/rootcode/dask-segfault-repro/trigger-error-from-log-file/test.py", line 65, in main
    task().compute()
  File "/home/vpadulan/.local/lib/python3.10/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/dask/base.py", line 598, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/client.py", line 3001, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/client.py", line 2175, in gather
    return self.sync(
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
    return sync(
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/home/vpadulan/.local/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/vpadulan/.local/lib/python3.10/site-packages/distributed/client.py", line 2038, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('task-54fed301-8571-4966-9392-0380265630ea', <WorkerState 'tcp://127.0.0.1:44289', name: 0, status: closed, memory: 0, processing: 1>)

The nanny restarts the worker, but the exception is never raised; the application keeps going until the scheduler kills the worker after too many retries.
The output log file from the worker plugin correctly shows that the error was found:

####################
BEGIN setup
()
{'worker': <Worker 'tcp://127.0.0.1:40513', name: 0, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
END setup
####################
####################
BEGIN transition
('task-54fed301-8571-4966-9392-0380265630ea', 'released', 'waiting')
{}
END transition
####################
####################
BEGIN transition
('task-54fed301-8571-4966-9392-0380265630ea', 'waiting', 'ready')
{}
END transition
####################
####################
BEGIN transition
('task-54fed301-8571-4966-9392-0380265630ea', 'ready', 'executing')
{}
END transition
####################
####################
BEGIN setup
()
{'worker': <Worker 'tcp://127.0.0.1:38679', name: 0, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
END setup
####################
FOUND ERROR WHEN STARTING WORKER
[...]
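To make the observed behaviour easier to discuss, here is a minimal pure-Python model of it. This is an illustrative sketch of the restart loop as seen from the outside, not distributed's actual implementation: a supervisor (playing the Nanny's role) treats any exception raised by the plugin's setup hook as just another failed worker start and retries, so the RuntimeError never reaches the client, which eventually only sees the KilledWorker error.

```python
class PluginError(RuntimeError):
    """Stands in for the RuntimeError raised by MyWorkerPlugin.setup."""


def plugin_setup(error_log_exists):
    # Stand-in for MyWorkerPlugin.setup: fail if the error log is present.
    if error_log_exists:
        raise PluginError("Error during worker setup!")


def supervise(max_restarts=3):
    """Model of a nanny-like restart loop that swallows setup errors."""
    restarts = 0
    while restarts < max_restarts:
        try:
            # The first start succeeds (no error log yet); every restart
            # after the crash finds the log and raises.
            plugin_setup(error_log_exists=restarts > 0)
        except PluginError:
            # The exception is swallowed here: the supervisor only counts
            # a failed start and tries again, instead of propagating it.
            restarts += 1
            continue
        # The worker "runs" and immediately crashes (os._exit in the task).
        restarts += 1
    # After too many restarts the client sees KilledWorker, never PluginError.
    return "KilledWorker"


print(supervise())  # -> KilledWorker
```

Under this model, the plugin's exception can only ever change *how* a worker start fails, never abort the computation from the client's point of view.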

Bottom line, I have two questions:

  1. Why is setup the only place where a WorkerPlugin can raise an exception that is properly handled?
  2. Why is the exception being handled only if it's written at the main scope of the function?
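Regarding question 1, my working assumption (a sketch of the general pattern, not distributed's actual source) is an asymmetry in how the hooks are invoked: a setup call that the registration path awaits directly can let an exception reach the caller, while hooks fired later from inside the worker's event loop, such as transition, are typically wrapped defensively so failures are logged rather than re-raised. The names call_setup/call_transition below are hypothetical:

```python
import asyncio

log = []


async def call_setup(setup):
    # Registration-style invocation: nothing catches the exception here,
    # so it reaches whoever awaits the registration.
    await setup()


async def call_transition(transition):
    # Event-style invocation: the runtime shields itself from hook
    # failures and only records them.
    try:
        transition()
    except Exception as exc:
        log.append(f"logged: {exc}")


def failing_transition():
    raise RuntimeError("transition failed")


async def failing_setup():
    raise RuntimeError("setup failed")


async def main():
    await call_transition(failing_transition)  # exception swallowed, only logged
    try:
        await call_setup(failing_setup)
    except RuntimeError as exc:                # exception propagates to caller
        log.append(f"propagated: {exc}")


asyncio.run(main())
print(log)  # -> ['logged: transition failed', 'propagated: setup failed']
```

If that assumption holds, it would explain why setup is the only hook whose exceptions are even potentially visible outside the worker.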

Setup

Python 3.10
Dask 2022.7.1
