
Removal of Finalizer Causes Breakage for UCX Protocol #7726

Open
quasiben opened this issue Mar 30, 2023 · 16 comments

@quasiben
Member

As mentioned in #7639 (comment), we are seeing what we think is a bug due to the removal of the finalizer for a ThreadPoolExecutor. We are observing this behavior when using UCX/UCX-Py (see #7639 (comment) for a more detailed explanation).

Note: this MRE has no GPU/CUDA elements, only UCX/UCX-Py.

mamba create -y --name test-weakref -c rapidsai-nightly -c conda-forge python=3.10 dask distributed ucx-py

from dask.distributed import Client

if __name__ == "__main__":
    client = Client(protocol="ucx")
    print("shutting down...")
    print(client)

The above produces errors like the following:

  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/distributed/core.py", line 883, in handle_stream
    msgs = await comm.read()
  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/distributed/utils.py", line 752, in wrapper
    return await func(*args, **kwargs)
  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/distributed/comm/ucx.py", line 366, in read
    self.abort()
  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/distributed/comm/ucx.py", line 434, in abort
    self._ep.abort()
  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 564, in abort
    logger.debug("Endpoint.abort(): %s" % hex(self.uid))
  File "/datasets/bzaitlen/miniconda3/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 550, in uid
    return self._ep.handle
  File "ucp/_libs/ucx_endpoint.pyx", line 359, in ucp._libs.ucx_api.UCXEndpoint.handle.__get__
AssertionError

We think #7644 is the culprit here because when the finalizer is added back the errors are gone :). I confirmed the finalizer is being called with the following patch:

diff --git a/distributed/utils.py b/distributed/utils.py
index 4c1a66423..1a70d9f2b 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -1400,8 +1400,12 @@ def is_valid_xml(text):
     return xml.etree.ElementTree.fromstring(text) is not None


-_offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
+class CustomThreadPoolExecutor(ThreadPoolExecutor):
+    def shutdown(self, wait=True, *, cancel_future=True):
+        print("shutting down -- finalizer")

+_offload_executor = CustomThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
+weakref.finalize(_offload_executor, _offload_executor.shutdown)

 def import_term(name: str) -> AnyType:
     """Return the fully qualified term

With this patch we observe no errors, and confirmation that the finalizer is being called:

(test-weakref) bzaitlen@dgx15:~$ python test-cluster.py
shutting down...
<Client: 'ucx://127.0.0.1:49213' processes=10 threads=80, memory=0.98 TiB>
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
shutting down -- finalizer
@fjetter
Member

fjetter commented Mar 30, 2023

@quasiben can you share a bit more information about what is actually breaking? I outlined in #7639 (comment) that I am deeply confused why this is causing any breakage. I discussed how this finalizer, even if called, should not have any effect, and everything in the CPython docs suggests that this finalizer should not even be called.

I would like to ensure that we cover the functionality you require and make sure this is not just a red herring. I do not want to rely on functionality that the CPython docs tell me should not even be there.

@pentschev
Member

@quasiben can you share a bit more information about what is actually breaking? I outlined in #7639 (comment) that I am deeply confused why this is causing any breakage.

In the MRE, the breakage occurs when the client/cluster are shutting down: a bunch of exceptions are raised and everything hangs until the process ultimately times out after 60 seconds. The person who originally reported the issue, @randerzander, also mentioned it occurs during his workflows. I can't say for sure what happens there because we're trying to work on the MRE first, but I imagine it occurs when workers are getting restarted. @randerzander please add more information about when it happens if this is known.

I discuss how this finalizer, even if called, should not have any action and everything in CPython docs suggest that this finalizer should not even be called.

I think we can for now set aside the claim that it "should not be called"; it's pretty clear at this point that it is called, and even with protocol="tcp" the reproducer above proves it. As for the statement that there should not be any action, I honestly haven't looked into it in depth, but clearly it enforces some ordering of cleanup events that otherwise does not occur, or happens differently.

I would like to ensure that we cover the functionality you require and make sure this is not just a red herring. I do not want to assert on a functionality that the CPython docs tell me should not even be there.

Ultimately, I agree. The problem here seems to be that there are no guarantees on the ordering of destruction of objects. This seems to happen because the ordering is in fact unpredictable. For instance, _offload_executor gets created at distributed.utils import time, which makes it hard to reason about when things should be destroyed and to verify that we in fact destroy everything correctly, making the cleanup in Distributed very fragile. This sort of cleanup-related issue has happened multiple times over the past 2 years or so.

@wence-
Contributor

wence- commented Mar 30, 2023

FWIW, we just need any finalizer here to fix Ben's initial repro:

e.g. this runs fine for me if I add this patch:

diff --git a/distributed/utils.py b/distributed/utils.py
index 4c1a6642..64904402 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -1401,7 +1401,7 @@ def is_valid_xml(text):
 
 
 _offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
-
+weakref.finalize(_offload_executor, lambda: None)
 
 def import_term(name: str) -> AnyType:
     """Return the fully qualified term

@wence-
Contributor

wence- commented Mar 30, 2023

I think Python makes a best-effort attempt to run finalizers that contain ref-cycles at interpreter shutdown, and probably we've just been lucky that the particular combination of finalizers has been such that things run in an order that happens to work.

I don't think this is a clean solution, but it might be sufficient to patch over the issue for now.

As another data point, if I just completely cull the global _offload_executor object, Ben's issue still manifests.

@wence-
Contributor

wence- commented Mar 30, 2023

It seems to be something to do with some data-structure setup that I don't think should be relied on, but if I create a no-op finalizer before the atexit handler in distributed/deploy/spec.py is registered, then Ben's example also works fine.

That is:

Cluster closes gracefully:

diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index c35ec764..134a3bd8 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -686,6 +686,9 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny
     return workers
 
 
+weakref.finalize(lambda: None, lambda: None)
+
+
 @atexit.register
 def close_clusters():
     for cluster in list(SpecCluster._instances):

Interpreter segfaults:

diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index c35ec764..34bbe68f 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -693,3 +693,6 @@ def close_clusters():
             with suppress(gen.TimeoutError, TimeoutError):
                 if getattr(cluster, "status", Status.closed) != Status.closed:
                     cluster.close(timeout=10)
+
+
+weakref.finalize(lambda: None, lambda: None)

@quasiben
Member Author

Apologies @fjetter for not mentioning this earlier. I think you are seeing some escalation here because there is a bit of urgency for us (RAPIDS). We are going through a RAPIDS release now, had just discovered this bug yesterday (bad timing on our part), and we had/have pinned to Dask 2023.3.2. This Dask release includes PR #7644 and thus we are scrambling a bit for a short-term fix.

There was hope that we could work around this issue on our end, and that hope led me to delay mentioning our sense of urgency -- my apologies for not laying it all out right away. Perhaps @wence- and I can continue exploring down the path Lawrence mentioned in the previous issue while we simultaneously produce a release 2023.3.2.1 with the commit reverted.

@fjetter
Member

fjetter commented Mar 31, 2023

Thanks @wence-! What you are posting in #7726 (comment) is very interesting and aligns with my suspicion that there is something else going on.

Interpreter segfaults:

So much fun. What Python version was this on? The place where this is raising is interesting since it's again one of those "catch all" atexits, close_clusters. I would prefer not having any of those.


I'm happy to add any harmless patches to the code base (i.e. I'm also fine reverting the original change) if that unblocks your release, but I would like us to make progress on making this more robust. It looks like the RAPIDS team is hit by this rather frequently. If there is anything we can or should refactor to make this easier to maintain, we should talk about it. I'm counting on help from the RAPIDS team for this.
Given this bug report I don't even know what kind of assertion is breaking or what kind of resource is not being properly cleaned up.

@wence-
Contributor

wence- commented Mar 31, 2023

So much fun. What python version was this on? The place where this is raising is interesting since it's again one of those "catch all" atexits, close_clusters. I would prefer not having any of those

This is 3.10. But I think it's the combination with UCX that is causing a segfault. We've somehow already gone wrong well before we get to the segfault, so various invariants are not maintained and (sometimes) the asyncio implementation bugs out.

I'm happy to add any harmless patches to the code base (i.e. also fine reverting the original change) if that unblocks your release but I would like us to make progress on making this more robust. It looks like the RAPIDS team is hit by this rather frequently. If there is anything we can or should refactor to make this easier to maintain we should talk about this. I'm counting on help from the RAPIDS team for this.

Yeah, I would like to understand this. I am generally leery of state teardown in atexit handlers since ordering is import-order sensitive, and that seems wrong.

My reading of the tea-leaves suggests that something like the following is happening:

  1. ucx-py uses weakref finalizers to tear down internal objects. These seem to be called "correctly", i.e. the lifetime management is correct.
  2. At shutdown, however, not all distributed objects are collected in the correct order (AFAICT), so, for example, in the close_clusters atexit handler we see this backtrace (and then a hang):
Exception ignored in atexit callback: <function close_clusters at 0x7f86149e3e20>
Traceback (most recent call last):
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 695, in close_clusters
    cluster.close(timeout=10)
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/cluster.py", line 219, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 349, in sync
    return sync(
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 389, in f
    result = yield future
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 1849, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 441, in _close
    await self._correct_state()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 348, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/core.py", line 1185, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/core.py", line 1144, in live_comm
    comm = await connect(
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/core.py", line 292, in connect
    comm = await wait_for(
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 1849, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py", line 466, in connect
    ep = await ucp.create_endpoint(ip, port)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 1004, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 304, in create_endpoint
    ucx_ep = ucx_api.UCXEndpoint.create(
  File "ucp/_libs/ucx_endpoint.pyx", line 255, in ucp._libs.ucx_api.UCXEndpoint.create
AssertionError: 

What appears to be going on here is that the cluster shutdown is somehow happening after the ucx-py resources have been released, and so during finalization of the cluster there is an attempt to resurrect the comm channel, which fails because UCX has already been shut down.

I don't really understand how the object ownership model is supposed to work in this case right now (i.e. who is responsible for teardown, and in which order) in these kinds of setups where all the workers and so forth are booted in the background.

@wence-
Contributor

wence- commented Mar 31, 2023

Here's an exemplar backtrace for the segfault case. As you can see, the GC is completely hosed at this point:

#8  <signal handler called>
#9  _PyObject_IS_GC (obj=<unknown at remote 0x143159b29b44>) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:452
#10 visit_decref (parent=<optimised out>, op=<unknown at remote 0x143159b29b44>)
    at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:456
#11 list_traverse (o=0x7fb55de6e300, visit=0x555b330477b0 <visit_decref>, arg=0x7fb55de6e300)
    at /usr/local/src/conda/python-3.10.10/Objects/listobject.c:2653
#12 0x0000555b330473e5 in subtract_refs (containers=<optimised out>)
    at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:482
#13 deduce_unreachable (base=base@entry=0x555b3340ad60, unreachable=unreachable@entry=0x7fb569c217c0)
    at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1105
#14 0x0000555b3304606f in gc_collect_main (tstate=0x555b33e27eb0, generation=0, n_collected=0x7fb569c218a0, 
    n_uncollectable=0x7fb569c21898, nofail=0) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1239
#15 0x0000555b330fa98e in gc_collect_with_callback (tstate=tstate@entry=0x555b33e27eb0, generation=0)
    at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1413
#16 0x0000555b33044c50 in gc_collect_generations (tstate=0x555b33e27eb0)
    at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1468
#17 _PyObject_GC_Alloc (basicsize=56, use_calloc=0) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:2297
#18 _PyObject_GC_Malloc (basicsize=56) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:2307
#19 PyType_GenericAlloc (type=0x555b3363c3b0, nitems=0) at /usr/local/src/conda/python-3.10.10/Objects/typeobject.c:1156
#20 0x0000555b3305cc09 in type_call (kwds={'lookup_line': False, 'locals': None}, 
    args=('/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', 725, 'recv'), 
    type=0x555b3363c3b0) at /usr/local/src/conda/python-3.10.10/Objects/typeobject.c:1123
#21 _PyObject_MakeTpCall (tstate=0x555b33e27eb0, callable=<type at remote 0x555b3363c3b0>, args=<optimised out>, 
    nargs=<optimised out>, keywords=('lookup_line', 'locals')) at /usr/local/src/conda/python-3.10.10/Objects/call.c:215
#22 0x0000555b33058ec9 in _PyObject_VectorcallTstate (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>, 
    args=<optimised out>, callable=<type at remote 0x555b3363c3b0>, tstate=0x555b33e27eb0)
    at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:112
#23 _PyObject_VectorcallTstate (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>, args=<optimised out>, 
    callable=<type at remote 0x555b3363c3b0>, tstate=0x555b33e27eb0)
    at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:99
#24 PyObject_Vectorcall (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>, args=<optimised out>, 
    callable=<type at remote 0x555b3363c3b0>) at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:123
#25 call_function (kwnames=('lookup_line', 'locals'), oparg=<optimised out>, pp_stack=<synthetic pointer>, 
    trace_info=0x7fb569c21a30, tstate=<optimised out>) at /usr/local/src/conda/python-3.10.10/Python/ceval.c:5891
#26 _PyEval_EvalFrameDefault (tstate=<optimised out>, 
    f=Frame 0x7fb564daa8e0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py, line 376, in extract (klass=<type at remote 0x555b3362e910>, frame_gen=<generator at remote 0x7fb557f22110>, limit=None, lookup_lines=True, capture_locals=False, result=<StackSummary at remote 0x7fb554b583b0>, fnames={'/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', '/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py'}, f=Frame 0x7fb557f84fc0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py, line 725, in recv (self=<Endpoint(_ep=<ucp._libs.ucx_api.UCXEndpoint at remote 0x7fb557f20c80>, _ctx=<ApplicationContext(progress_tasks=[], context=<ucp._libs.ucx_api.UCXContext at remote 0x7fb55de11ea0>, worker=<ucp._libs.ucx_api.UCXWorker at remote 0x7fb55df6dd40>, blocking_progress_mode=True, epoll_fd=49) at remote 0x7fb55de30be0>, _send_count=6, _recv_count=10, _finis...(truncated), throwflag=<optimised out>) at /usr/local/src/conda/python-3.10.10/Python/ceval.c:4231
#27 0x0000555b3306f581 in _PyEval_EvalFrame (throwflag=0, 
    f=Frame 0x7fb564daa8e0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py, line 376, in extract (klass=<type at remote 0x555b3362e910>, frame_gen=<generator at remote 0x7fb557f22110>, limit=None, lookup_lines=True, capture_locals=False, result=<StackSummary at remote 0x7fb554b583b0>, fnames={'/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', '/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py'}, f=Frame 0x7fb557f84fc0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py, line 725, in recv (self=<Endpoint(_ep=<ucp._libs.ucx_api.UCXEndpoint at remote 0x7fb557f20c80>, _ctx=<ApplicationContext(progress_tasks=[], context=<ucp._libs.ucx_api.UCXContext at remote 0x7fb55de11ea0>, worker=<ucp._libs.ucx_api.UCXWorker at remote 0x7fb55df6dd40>, blocking_progress_mode=True, epoll_fd=49) at remote 0x7fb55de30be0>, _send_count=6, _recv_count=10, _finis...(truncated), tstate=0x555b33e27eb0)
    at /usr/local/src/conda/python-3.10.10/Include/internal/pycore_ceval.h:46
#28 _PyEval_Vector (kwnames=<optimised out>, argcount=<optimised out>, args=0x7fb564c21f10, locals=0x0, con=0x7fb675dcdbe0, 

With matching Python backtrace:

(gdb) py-bt
Traceback (most recent call first):
  Garbage-collecting
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 376, in extract
    result.append(FrameSummary(
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 502, in __init__
    self.stack = StackSummary.extract(
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 552, in __init__
    context = TracebackException(
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 119, in print_exception
    te = TracebackException(type(value), value, tb, limit=limit, compact=True)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 636, in formatException
    traceback.print_exception(ei[0], ei[1], tb, None, sio)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 686, in format
    record.exc_text = self.formatException(record.exc_info)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 943, in format
    return fmt.format(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1100, in emit
    msg = self.format(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1506, in error
    self._log(ERROR, msg, args, **kwargs)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1512, in exception
    self.error(msg, *args, exc_info=exc_info, **kwargs)
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 778, in __exit__
    logger.exception(exc_value)
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 751, in wrapper
    with self:
  <built-in method task_wakeup of _asyncio.Task object at remote 0x7fb55df57850>
  <built-in method run of _contextvars.Context object at remote 0x7fb55dddd840>
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 1909, in _run_once
    handle._run()
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
    self._run_once()
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 215, in start
    self.asyncio_loop.run_forever()
  File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 510, in run_loop
    loop.start()
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()

@fjetter
Member

fjetter commented Mar 31, 2023

I am generally leary of state teardown in atexit handlers since ordering is import-order sensitive and that seems wrong.

Collecting all atexit handlers in a single module and registering them in the "correct" order might already be helpful, wouldn't it? FWIW I don't think anybody likes the atexit handlers. If we can ensure proper cleanup without them somehow, that would be very interesting.
The only mechanism I am aware of that works robustly is a context manager, but ages ago the decision was made that our objects should work without the need for context managers, and I am afraid that decision is difficult to revoke by now. Particularly in notebook environments I believe this is a requirement.
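
A rough sketch of what that single-module idea could look like (the module name and handler bodies here are illustrative assumptions, not distributed's actual layout):

# distributed/_exit_handlers.py  (hypothetical module, for illustration only)
import atexit


def close_global_client():
    """Close the default Client, if one exists."""
    ...


def close_clusters():
    """Close any SpecCluster instances that are still open."""
    ...


# Registering everything in one place makes the relative order explicit.
# atexit runs handlers last-in-first-out, so the handler registered last
# here is the first one to fire at interpreter shutdown.
atexit.register(close_global_client)
atexit.register(close_clusters)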

@wence-
Contributor

wence- commented Mar 31, 2023

So there are, AFAICT, three atexit handlers:

from ._concurrent_futures_thread import _python_exit
from .client import _close_global_client
from .deploy.spec import close_clusters

# atexit.register(_python_exit)
# atexit.register(_close_global_client)
# atexit.register(close_clusters)
del _python_exit
del _close_global_client
del close_clusters

If I remove the registration of all of them and run the bug with python bug.py, the code "completes" but produces a few warnings. If I run python -X dev -X tracemalloc bug.py then I see:

shutting down...
<Client: 'ucx://127.0.0.1:58631' processes=1 threads=1, memory=124.45 GiB>
[1680262822.127658] [shallot:51402] UCXPY  ERROR Non-thread-safe operation invoked on an event loop other than the current one
Traceback (most recent call last):
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/_libs/exceptions.py", line 13, in log_errors
    yield
  File "ucp/_libs/transfer_tag.pyx", line 141, in ucp._libs.ucx_api._tag_recv_callback
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/comm.py", line 16, in _cb_func
    future.set_exception(exception)
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 755, in call_soon
    self._check_thread()
  File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 792, in _check_thread
    raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

So somehow some event loop is out of whack.

@wence-
Contributor

wence- commented Mar 31, 2023

Having done some more debugging (with @pentschev), we found that the Comm object is being torn down on a thread other than the one that created it. This is problematic since ucx-py is not thread safe, and it appears that we either end up in a deadlock (where the main thread has the UCX spinlock but not the GIL, and the communication thread has the GIL but not the spinlock), or we get a little further, release the GIL, and cleanup occurs but smashes some part of the interpreter stack and we see segfaults.

The hypothesis as to why this no-op weakref finalizer "fixes" things is that it fortuitously happens to arrange that everything that is torn down is done so on the thread that owns the resource.

So, we don't have a solution right now (certainly not a quick fix that is less dangerous than re-introducing this no-op finalization).
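
For illustration only, here is a minimal sketch (not distributed's or ucx-py's actual code) of the kind of guard that would surface this situation loudly instead of hanging, by checking that a non-thread-safe resource is torn down on the thread that created it:

import threading


class OwnedComm:
    """Toy stand-in for a non-thread-safe comm object (hypothetical)."""

    def __init__(self):
        # Remember the thread that created (and therefore owns) this comm.
        self._owner = threading.get_ident()

    def abort(self):
        # ucx-py endpoints are not thread safe; tearing one down from a
        # foreign thread is exactly the situation described above, so fail
        # loudly here rather than risk the GIL/UCX-spinlock deadlock.
        if threading.get_ident() != self._owner:
            raise RuntimeError("abort() called from a thread that does not own this comm")
        # ... release the underlying resources here ...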

wence- added a commit to wence-/dask-cuda that referenced this issue Apr 3, 2023
The last atexit finalizer in the chain of finalizers that distributed
registers needs to be one in the weakref module. Otherwise we observe
hangs in the shutdown of UCX-Py where one thread has the gil but not
the UCX spinlock and vice-versa.

To ensure this, unregister the known shutdown finalizers in
distributed, add our weakref finalizer, and then re-register them.
Along with a final re-implementation of the "python shutting down"
handler which must run before other handlers in distributed.
wence- added a commit to wence-/distributed that referenced this issue Apr 3, 2023
The first atexit finalizer in the chain of finalizers that distributed
registers needs to be one in the weakref module. Otherwise we observe
hangs in the shutdown of UCX-Py where one thread has the gil but not
the UCX spinlock and vice-versa.

To ensure this, register a no-op handler at the top of `__init__.py`.
wence- added a commit to wence-/distributed that referenced this issue Apr 3, 2023
The first atexit finalizer in the chain of finalizers that distributed
registers needs to be one in the weakref module. Otherwise we observe
hangs in the shutdown of UCX-Py where one thread has the gil but not
the UCX spinlock and vice-versa.

To ensure this, register a no-op handler at the top of `__init__.py`.
@wence-
Contributor

wence- commented Apr 4, 2023

The hypothesis as to why this no-op weakref finalizer "fixes" things is that it fortuitously happens to arrange that everything that is torn down is done so on the thread that owns the resource.

A little more detail.

tl;dr: weakref.finalize is not a no-op because it changes the state of the atexit handler registry.

From the Python docs (and also by looking at the implementation of weakref.finalize):

When the program exits, each remaining live finalizer is called unless its atexit attribute has been set to false. They are called in reverse order of creation.

(Note that these calls are always made, even if the object is still alive).

How does this work? The first time weakref.finalize is called it hooks an atexit callback in with atexit.register (let's call it weakref_atexit).

Distributed also registers some atexit handlers to run during shutdown. atexit handlers are run in last-in-first-out order.

So we can have two scenarios:

  1. weakref.finalize called for the first time before distributed registers atexit handlers
  2. weakref.finalize called for the first time only after distributed registers atexit handlers

In scenario 1 the destruction order will be distributed_atexit, weakref_atexit; in scenario 2 it will be weakref_atexit, distributed_atexit.

UCX-Py uses weakref.finalize to clean up after its objects go out of scope, but these finalizers are set at object creation time. Because the cluster objects are only torn down in distributed_atexit, these finalizers have not yet fired (the comm is still live at this point).

If we're in scenario 1, all is good: distributed_atexit is called first, the UCX-Py comm channels are still live, so messages can be sent between client/cluster/scheduler/workers, and everything shuts down. When those resources are released the UCX-Py comm is closed and things work.

In scenario 2 we have a problem. The first atexit handler to fire is weakref_atexit, which tears down the comm structures; then distributed_atexit runs and wants to send messages over the comm, only it's already torn down and there's not enough state to resurrect it, so we get hangs and other errors.
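
To make the two scenarios concrete, here is a minimal standalone sketch (distributed_atexit is just a stand-in for whatever handler distributed registers; everything else is plain stdlib behaviour, assuming no earlier weakref.finalize call in the process):

import atexit
import weakref


class Resource:
    """Stand-in for an object that ucx-py would clean up via weakref.finalize."""


def distributed_atexit():
    print("distributed_atexit (wants the comm to still be alive)")


# Scenario 2: distributed registers its atexit handler first ...
atexit.register(distributed_atexit)

# ... and weakref.finalize is called for the first time afterwards.  That first
# call registers weakref's own atexit hook (weakref_atexit), so at interpreter
# exit it fires *before* distributed_atexit (atexit is last-in-first-out), and
# the finalizer runs even though `r` is still alive.
r = Resource()
weakref.finalize(r, lambda: print("weakref finalizer (tears the resource down)"))

# Output at exit:
#   weakref finalizer (tears the resource down)
#   distributed_atexit (wants the comm to still be alive)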

@charlesbluca
Member

charlesbluca commented Apr 27, 2023

Following up here because I notice that UCX tests are failing intermittently quite often on GPU CI:

https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/6516/

I'm assuming this is related to the fact that the finalizer patch wasn't merged into main; maybe @pentschev or @wence- can confirm?

Interested in what we want to do here in the short term - should we xfail the UCX test module (or a subset of it) for now, or is there a modification we could make to the tests to unblock this trivially?
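
For the short-term xfail option, something along these lines could work (a sketch only; the marker placement and reason string are assumptions, not an agreed plan):

# At the top of the UCX test module (e.g. distributed/comm/tests/test_ucx.py)
import pytest

pytestmark = pytest.mark.xfail(
    reason="intermittent teardown failures after removal of the offload "
    "executor finalizer, see dask/distributed#7726",
    strict=False,  # tests may still pass; don't fail the suite when they do
)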

@fjetter
Member

fjetter commented May 2, 2023

Having done some more debugging (with @pentschev), we found that the Comm object is being torn down on a thread other than the one that created it.

This is interesting. I would expect all Comm objects to be created and destroyed on the main thread. Do you know which connection this is? (E.g. part of the ConnectionPool, a BatchedSend, some ad-hoc connection?)

(Sorry for the late reply, I was busy/out for a while)

@wence-
Contributor

wence- commented May 2, 2023

This is interesting. I would expect all Comm objects to be created and destroyed on the main thread. Do you know which connection this is? (E.g. part of the ConnectionPool, a BatchedSend, some ad-hoc connection?)

I don't recall, unfortunately. I think this is because the destructor is somehow being invoked by an atexit handler, which can run on a different thread (it's hard to tell from the documentation/implementation what guarantees, if any, are given).
