Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Client connection pool semaphore attaches to the Client's event loop #3546

Merged
merged 5 commits into from Mar 24, 2020

Conversation

jrbourbeau
Copy link
Member

When creating a Client a ConnectionPool is created when the Client superclass constructor is called here

super(Client, self).__init__(
connection_args=self.connection_args,
io_loop=self.loop,
serializers=serializers,
deserializers=deserializers,
timeout=timeout,
)

Currently, when operating in in synchronous mode, the ConnectionPool semaphore attaches the main thread's event loop instead of the Client's event loop (the same situation as outlined in #3397 (comment)).

When we need to acquire the ConnectionPool semaphore, we end up getting errors about it being attached to the wrong loop. For example, the following example

import asyncio
from distributed import Client

if __name__ == "__main__":

    with Client() as c:
        x = c.sync(asyncio.gather, *[c.scheduler.who_has() for _ in range(1000)])
        print(x)

currently results in a RuntimeError: Future <Future pending> attached to a different loop

Full traceback:
Traceback (most recent call last):
  File "test-sempahore.py", line 8, in <module>
    x = c.sync(asyncio.gather, *[c.scheduler.who_has() for _ in range(1000)])
  File "/Users/jbourbeau/github/Quansight-Labs/distributed/distributed/client.py", line 779, in sync
    return sync(
  File "/Users/jbourbeau/github/Quansight-Labs/distributed/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/Users/jbourbeau/github/Quansight-Labs/distributed/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/Users/jbourbeau/miniconda/envs/distributed-ql/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/jbourbeau/github/Quansight-Labs/distributed/distributed/core.py", line 754, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/Users/jbourbeau/github/Quansight-Labs/distributed/distributed/core.py", line 891, in connect
    await self.semaphore.acquire()
  File "/Users/jbourbeau/miniconda/envs/distributed-ql/lib/python3.8/asyncio/locks.py", line 496, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-548' coro=<PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc() running at /Users/jbourbeau/github/Quansight-Labs/distributed/distributed/core.py:754> cb=[gather.<locals>._done_callback() at /Users/jbourbeau/miniconda/envs/distributed-ql/lib/python3.8/asyncio/tasks.py:751]> got Future <Future pending> attached to a different loop

(Note that I had to run $ ulimit -n 5000 locally in my terminal to increase the number of allowed connections and trigger acquiring the semaphore)

This PR delays the creation of ConnectionPool.semaphore to happen inside an async function so it attaches to the correct event loop (like was done in #3437).

cc @mrocklin

@mrocklin
Copy link
Member

mrocklin commented Mar 3, 2020

In principle this seems fine. The alternative is to create this in the async def start function that tends to be called during startup. If this is possible I would prefer it just because it seems a bit simpler. If it's a pain to do so though then we can probably go ahead with this.

@jrbourbeau
Copy link
Member Author

Thanks for the feedback @mrocklin!

The alternative is to create this in the async def start function that tends to be called during startup

I think I've addressed this in my latest commit, but let me know if I misunderstood and you were referring to a different start method

@mrocklin
Copy link
Member

mrocklin commented Mar 6, 2020

Ah, if this requires moving the entire constructor over then that's probably too disruptive (I think?). I would expect the superclass constructor to be called during construction.

I hadn't realized that the semaphore was being created within the superclass. I think that the clean way to do what I was proposing would be for the superclass to also have an async def _start method, put the semaphore creation in there, and then have subclasses call that during their own start method. Maybe?

@jrbourbeau
Copy link
Member Author

I added Node.start and ConnectionPool.start async def methods. The ConnectionPool.start method is where the semaphore is created and Node.start calls down to the underlying ConnectionPool.start method.

As a note, I initially tried to move the creation of a Node's ConnectionPool inside Node.start, but this didn't work well with existing code that expected a Node.rpc attribute to exist upon creation of a Node.

@jrbourbeau
Copy link
Member Author

@mrocklin when you get a moment (not urgent) could you let me know if the current set of changes is preferred over the original changes (6aeeec0). I ended up making ConnectionPools awaitable which seemed cleaner than calling a seperate .start method

@mrocklin
Copy link
Member

could you let me know if the current set of changes is preferred over the original changes

I have some thoughts, but they're not particularly strong. I'm curious, what do you think we should do here? As you've been doing more and more maintenance work on dask/distributed my guess is that you're developing your own aesthetic sense of how things should be done here.

@jrbourbeau
Copy link
Member Author

I'm happy with the current state of this PR. I like that we're defining asyncio synchronization primitives inside async functions instead of doing a try/except in a normal function. That gives me more confidence that we'll attach to the correct event loop. For instance, with the original try/except approach, if someone called ConnectionPoo._validate the Semaphore could unintentionally attach to the wrong event loop

@mrocklin mrocklin merged commit dd28d08 into dask:master Mar 24, 2020
@mrocklin
Copy link
Member

Grand. Merging!

@jrbourbeau jrbourbeau deleted the fix-semaphore-loop branch March 24, 2020 23:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants