Clean up usage around plain rpc #6082

Merged · fjetter merged 4 commits into dask:main from usage_of_plain_rpc on Apr 8, 2022

Conversation

@fjetter (Member) commented Apr 7, 2022

While looking into #6080 I realized that plain rpc instances are incredibly rare; they have mostly been replaced by ConnectionPool and PooledRPCCall.
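
For context, a rough sketch of the pooled pattern that replaced them, written from memory of the distributed.core API; the limit value, the explicit start() call, and the identity() example call are assumptions and may not match every version:

    import asyncio

    from distributed.core import ConnectionPool

    async def identities(addresses):
        # The pool hands out lightweight PooledRPCCall objects and manages
        # the underlying comms itself, instead of one plain rpc per target.
        pool = ConnectionPool(limit=16)
        await pool.start()  # assumed to be required in recent versions
        try:
            calls = [pool(addr) for addr in addresses]  # no comms opened yet
            return await asyncio.gather(*(c.identity() for c in calls))
        finally:
            await pool.close()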

The three places I found them being used are listed below, and all of them did something wrong:

  • gather_from_workers instantiates a couple but never uses them
  • SpecCluster._close uses one to call a handler close that doesn't exist
  • Scheduler.restart uses a couple of rpc instances to connect to the nannies. This may leave dangling comms or rpcs if exceptions pop up, e.g. during teardown; an async exit stack should be preferred (see the sketch below)

I think that for SpecCluster and Scheduler.restart the pool would work just fine, but I didn't want to change too much in one go, and this should already help.
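
A minimal sketch of the exit-stack pattern mentioned in the last bullet, assuming a hypothetical restart_nannies helper and that the nannies expose a restart handler; the actual change in this PR may differ:

    import asyncio
    import contextlib

    from distributed.core import rpc

    async def restart_nannies(nanny_addresses):
        # Hypothetical helper for illustration only.
        async with contextlib.AsyncExitStack() as stack:
            nannies = []
            for address in nanny_addresses:
                nanny = rpc(address)
                # Register cleanup immediately, so an exception while talking
                # to one nanny cannot leave the other rpcs or comms dangling.
                stack.push_async_callback(nanny.close_rpc)
                nannies.append(nanny)
            await asyncio.gather(*(nanny.restart() for nanny in nannies))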

@fjetter (Member Author) commented Apr 7, 2022

If the rpc is not closed with close_rpc, rpc.active relies on the finalizer to be called. As we've seen recently with the profiler thread, this can be a problem, particularly with the hot loop in #6081
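
To illustrate, a hedged sketch (the identity call is just a convenient example handler): closing the rpc explicitly releases its comms deterministically, whereas otherwise rpc.active only drops once the garbage collector happens to run the finalizer.

    from distributed.core import rpc

    async def fetch_identity(address):
        worker = rpc(address)
        try:
            return await worker.identity()
        finally:
            # Deterministic cleanup; without this, the rpc is only cleaned up
            # whenever its finalizer eventually runs.
            await worker.close_rpc()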

@fjetter (Member Author) commented Apr 7, 2022

@dask/gpu I am seeing some errors on gpuCI that I cannot understand. Can somebody have a look and tell me if this might be related to the changes I'm proposing?

    @pytest.fixture()
    def event_loop(scope="function"):
        loop = asyncio.new_event_loop()
        loop.set_exception_handler(handle_exception)
        ucp.reset()
        yield loop
>       ucp.reset()

distributed/comm/tests/test_ucx.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def reset():
        """Resets the UCX library by shutting down all of UCX.
    
        The library is initiated at next API call.
        """
        global _ctx
        if _ctx is not None:
            weakref_ctx = weakref.ref(_ctx)
            _ctx = None
            gc.collect()
            if weakref_ctx() is not None:
                msg = (
                    "Trying to reset UCX but not all Endpoints and/or Listeners "
                    "are closed(). The following objects are still referencing "
                    "ApplicationContext: "
                )
                for o in gc.get_referrers(weakref_ctx()):
                    msg += "\n  %s" % str(o)
>               raise UCXError(msg)
E               ucp._libs.exceptions.UCXError: Trying to reset UCX but not all Endpoints and/or Listeners are closed(). The following objects are still referencing ApplicationContext: 
E                 {'_ep': <ucp._libs.ucx_api.UCXEndpoint object at 0x7fa458abe660>, '_ctx': <ucp.core.ApplicationContext object at 0x7fa5188c1430>, '_send_count': 36, '_recv_count': 9, '_finished_recv_count': 9, '_shutting_down_peer': False, '_close_after_n_recv': None, '_tags': {'msg_send': 9391619810627728242, 'msg_recv': 5969494221044562742, 'ctrl_send': 15099518454196155731, 'ctrl_recv': 5379707559227366159}}

/opt/conda/envs/dask/lib/python3.9/site-packages/ucp/core.py:938: UCXError

distributed.comm.tests.test_ucx.test_simple (from pytest)

Failing for the past 1 build (Since [#2320](https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/2320/CUDA_VER=11.5,LINUX_VER=ubuntu18.04,PYTHON_VER=3.9,RAPIDS_VER=22.06/) )
[Took 3.4 sec.](https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/2320/CUDA_VER=11.5,LINUX_VER=ubuntu18.04,PYTHON_VER=3.9,RAPIDS_VER=22.06/testReport/junit/distributed.comm.tests/test_ucx/test_simple/history)

@pentschev (Member) commented:

@fjetter yes, I think they are. Locally I don't see those errors on current main, but I do with this branch. Those errors normally show up when we don't clean up the endpoint and/or objects in order, or at all. I haven't looked at the changes here in detail yet, but could you see these changes potentially causing any of that?

@fjetter (Member Author) commented Apr 7, 2022

> normally show up when we don't clean up the endpoint and/or objects in order, or at all

What exactly are "endpoints" in this context? And what objects are we talking about?

@pentschev (Member) commented:

> What exactly are "endpoints" in this context? And what objects are we talking about?

Apologies, I should have been more specific. An endpoint is an object that provides a connection to an object of the same type (an endpoint) in a remote process, e.g. a client and a server. UCX establishes an Endpoint on each side, and each can send/recv data only with the remote Endpoint it is connected to (i.e., an Endpoint can't connect to multiple remote processes). The objects I meant are the actual UCX-Py Endpoint objects. So it looks like an Endpoint is still dangling (possibly related to how close()/abort() is being called).
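
For illustration, a rough sketch of that one-to-one Endpoint pairing with UCX-Py; the API names (create_listener, create_endpoint, get_address, close, reset) are written from memory and may not match the exact version used on gpuCI:

    import asyncio

    import ucp

    async def main():
        async def handle(ep):
            # Server-side Endpoint, paired with exactly one client Endpoint.
            buf = bytearray(4)
            await ep.recv(buf)
            await ep.close()

        listener = ucp.create_listener(handle, 13337)
        ep = await ucp.create_endpoint(ucp.get_address(), listener.port)
        await ep.send(bytearray(b"ping"))
        await ep.close()  # without this, the ApplicationContext stays referenced
        listener.close()
        ucp.reset()  # succeeds only once no Endpoints/Listeners are dangling

    asyncio.run(main())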

Diff context:

    await asyncio.gather(*self._futures)

    if self.scheduler_comm:
        await self.scheduler_comm.close_rpc()

@pentschev (Member):
@fjetter I narrowed the UCX errors down to this line. If I simply replace it with the original await self.scheduler_comm.close(close_workers=True), those errors go away. Could you explain what the difference between the two should be in this particular context?

@fjetter (Member Author):

From how I read the code, a call like

    self.scheduler_comm.close(close_workers=True)

can never actually work, since there is no RPC handler called close; we'd just never know, because we suppress all OSErrors. I replaced it with the proper RPC handler, terminate.

If we're not calling self.scheduler_comm.close_rpc, we need to rely on the finalizer, which is something I'd like to avoid.
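
A hedged sketch of the shape this gives the cleanup, assuming terminate and close_rpc behave as described above; illustrative only, not the literal diff:

    import asyncio

    async def close_cluster(cluster):
        # `cluster` stands in for a SpecCluster-like object.
        await asyncio.gather(*cluster._futures)

        if cluster.scheduler_comm:
            # `close` is not a registered RPC handler, so the old call only
            # looked like it worked because OSErrors were suppressed; call
            # the real handler instead.
            await cluster.scheduler_comm.terminate()
            # Close the rpc explicitly rather than relying on its finalizer.
            await cluster.scheduler_comm.close_rpc()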

@pentschev (Member):
The latest change passes locally. If you think that solution is reasonable/appropriate, then I'm good with it too; the gpuCI tests already passed.

@pentschev (Member) left a review:

Minor typo suggestion, but otherwise this looks good. Thanks @fjetter for also helping me figure out the issue with UCX and ultimately for fixing it.

Review suggestion on distributed/scheduler.py (outdated; resolved)
Co-authored-by: Peter Andreas Entschev <peter@entschev.com>
@fjetter (Member Author) commented Apr 7, 2022

Thanks for your help and review @pentschev !

github-actions bot (Contributor) commented Apr 8, 2022

Unit Test Results

    18 files ±0   18 suites ±0   9h 17m 25s ⏱️ +12m 50s
    2 724 tests ±0   2 641 ✔️ +1   82 💤 ±0   1 ❌ -1
    24 364 runs ±0   23 144 ✔️ -1   1 218 💤 +1   2 ❌ ±0

For more details on these failures, see this check.

Results for commit ca9894a. ± Comparison against base commit 6e30766.

@fjetter (Member Author) commented Apr 8, 2022

Failing tests are due to #6088

@fjetter fjetter merged commit 336b844 into dask:main Apr 8, 2022
@fjetter fjetter deleted the usage_of_plain_rpc branch April 8, 2022 09:46