
RFC / ENH: Add Semaphore extension #2690

Closed
fjetter wants to merge 1 commit into dask:master from fjetter:semaphore_extension

Conversation

@fjetter
Member

@fjetter fjetter commented May 13, 2019

This implements a distributed Semaphore. The implementation is inspired by the kazoo Semaphore implementation (cf. here), with some obvious modifications since we're not using ZooKeeper here.

The internal data structure of this semaphore is a bit complex since we do not support any notion of an ephemeral node, i.e. a value which expires together with the session. In the dask.distributed context, I believe, this translates to the Client. Therefore, the Semaphore tracks which lease stems from which Semaphore client instance and stores the associated Client ID. If the client is lost or closed, the semaphore will eventually release all of its acquired leases.
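
To illustrate, here is a stripped-down sketch of the bookkeeping this implies (the class and method names are made up for the example; the actual implementation lives in distributed/semaphore.py):

from collections import defaultdict

class LeaseBookkeeping:
    # scheduler-side view: every lease is tied to the Client that created it
    def __init__(self):
        # client id -> semaphore name -> set of lease identifiers
        self.leases_per_client = defaultdict(lambda: defaultdict(set))

    def acquire(self, name, client_id, lease_id):
        self.leases_per_client[client_id][name].add(lease_id)

    def client_lost(self, client_id):
        # once the scheduler notices the client is gone, all of its
        # outstanding leases can be released in one sweep
        self.leases_per_client.pop(client_id, None)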

This behavior is quite important for resilience: if a worker is shut down ungracefully, I want the leases to be released eventually; otherwise my application may be blocked. In early tests I was using a plain tornado Semaphore without the complex state logic, but this didn't work out for us.

Instead of opening an issue I wanted to discuss this over a bit of code:

  • Is an implementation like this desirable as part of the project, or is it out of scope?
  • If we decide to merge this, I would propose replacing the Lock implementation (or rather merging the two; Lock ~ Semaphore(max_leases=1), see the sketch below), since the complexity is quite high and maintaining both is probably not desirable.
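
To make the second point concrete, the merged version could look roughly like this (hypothetical sketch, not part of this PR):

class Lock(Semaphore):
    # a Lock is simply a Semaphore that hands out at most one lease
    def __init__(self, name=None, client=None):
        super().__init__(max_leases=1, name=name, client=client)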

Member

@mrocklin mrocklin left a comment


Thanks for the PR @fjetter. At this point I've only briefly skimmed things. The test suite at least looks promising and, without going deeply into it, nothing in the implementation raises any warnings.

I wonder if @cicdw or @jcrist would be interested in reviewing this? They have both shown interest in distributed internals, and have some experience with tornado/async code in Dask.

yield sem.acquire()
semY = Semaphore(name="y")

with Client(s.address, asynchronous=True, name="ClientB") as clientB:
Member


I would have expected this to have to be async with Client. I'm surprised that the normal context manager would work here. (We have tests for Python 3 syntax in the tests/py3_test_*.py files)

Member Author


We're not defining these tests with async def to create the coroutines, but with tornado.gen.coroutine, which presumably takes care of the matter somehow. With that definition, async with raises a SyntaxError.
The tests in tests/py3_test_*.py use the proper py3 syntax with async def. Do you think it is worthwhile/good practice to write these async tests as proper async def coroutines instead of relying on the tornado conversion?
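
For concreteness, the two styles side by side (assuming the Semaphore grows an async context manager for the second variant):

from tornado import gen

@gen.coroutine
def test_semaphore():            # py2-compatible style used here
    sem = Semaphore(name="x")
    yield sem.acquire()
    yield sem.release()

async def test_semaphore_py3():  # style used in tests/py3_test_*.py
    sem = Semaphore(name="x")
    async with sem:              # only valid inside an async def
        pass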

@fjetter
Member Author

fjetter commented May 14, 2019

Test failures on appveyor seem to be unrelated (timeouts for bokeh, profiling, etc.). Is this a known issue?

@mrocklin
Member

Test failures on appveyor seem to be unrelated (timeouts for bokeh, profiling, etc.). Is this a known issue?

There are currently known intermittent failures, yes. I wouldn't be too worried yet if you see an unexpected test fail.

@mrocklin
Member

@cicdw I noticed that you 👍 the comment about reviewing. Does that mean that you're interested in reviewing this? No pressure (everyone is busy) just seeing if we should expect something from you (which would be great) or try to find someone else to review.

@cicdw
Contributor

cicdw commented May 15, 2019

@mrocklin I've given it a once-over and will do another pass, but I wouldn't wait for any substantive feedback from me.

Contributor

@cicdw cicdw left a comment


Two minor questions

Comment thread distributed/semaphore.py
self, stream=None, name=None, client=None, timeout=None, max_leases=None
):
if name not in self.leases:
assert isinstance(max_leases, int), max_leases
Contributor


Question: is there a reason an assert is preferred to a more explicit ValueError here?

Member Author


I would probably even remove this; I used it a bit for debugging. If we keep it, a ValueError or TypeError would make the most sense.

Comment thread distributed/semaphore.py
yield self.release()

def __getstate__(self):
return (self.name, self.client.scheduler.address, self.max_leases)
Contributor


Should id also be serialized here, or do we want Semaphores which have gone through the serialization / deserialization process to be considered different?

Member Author


TL;DR: I think we should not serialize the ID. I should probably add a test for this behavior.

Consider the following:

import dask
from dask import delayed
from distributed import Semaphore

def access_database(sem, *args):
    with sem:             # hold a lease while talking to the database
        data = ...        # perform the actual database access here
    return data

sem = Semaphore(name="my_database", max_leases=2)
tasks = []
for ix in range(10):
    tasks.append(
        delayed(access_database)(sem, ix)
    )

dask.compute(tasks)

This is how I would probably write a piece of code to download data from a DB. I can rely on dask.distributed to serialize the class for me; otherwise I would need to pass all initialization parameters to the function and initialize the objects myself to keep everything consistent.
Even with the serialization approach, I still require the semaphore instance of every job to be unique by ID; otherwise the tracking is off, and the mechanism for tracking leases by ID and client would not work.
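
A sketch of the resulting pair of methods; __getstate__ is the one from the diff above, while the __setstate__ body is only my assumption of what the counterpart does:

import uuid

class Semaphore:
    ...

    def __getstate__(self):
        # note: self.id is deliberately NOT part of the state
        return (self.name, self.client.scheduler.address, self.max_leases)

    def __setstate__(self, state):
        name, address, max_leases = state
        self.name = name
        self.max_leases = max_leases
        # every deserialized copy becomes a distinct lease holder
        self.id = uuid.uuid4().hex
        # re-attaching to a client via the scheduler address is elided here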

Member

@mrocklin mrocklin left a comment


OK, there are a few nitpicky comments below, but first, some larger thoughts:

  • As I said before, the test suite looks great. Thanks for doing this.
  • I'm starting to use Python3-only code. If you wanted to switch to async/await I'd be fine with that, but either way works (switching to await does have slightly different semantics, so I wouldn't be surprised if some things broke).
  • I am curious whether it would make sense to use actual Tornado semaphores in the scheduler extension rather than events, leases, and locks, i.e. how much of the tricky logic here we can hand off to a single-machine asynchronous semaphore. Every Dask Semaphore would create a Tornado semaphore in the Scheduler; when clients acquired or released the Dask Semaphore, they would ask the semaphore in the Scheduler to acquire/release as well, and we would hand all timeout/collision logic off to the single-machine semaphore. Dealing with clients going away unexpectedly would still be an issue, but it might not be too bad. To me this seems attractive from a maintenance perspective, but I suspect that I am missing some important details here (rough sketch below).
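
A rough sketch of what I have in mind (names are made up, and this ignores the lost-client problem entirely):

from tornado import gen, locks

class SemaphoreExtension:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.semaphores = {}  # semaphore name -> tornado.locks.Semaphore

    def create(self, comm=None, name=None, max_leases=1):
        if name not in self.semaphores:
            self.semaphores[name] = locks.Semaphore(max_leases)

    @gen.coroutine
    def acquire(self, comm=None, name=None):
        # all waiting/ordering logic is delegated to the tornado semaphore
        yield self.semaphores[name].acquire()

    def release(self, comm=None, name=None):
        self.semaphores[name].release()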

Comment thread distributed/semaphore.py

@gen.coroutine
def create(
self, stream=None, name=None, client=None, timeout=None, max_leases=None
Member


I recommend renaming stream= to comm=. This switch happened a while ago but apparently not all code was updated.

Member


I'm actually a little surprised that this worked. We must be using a positional argument somewhere.

Comment thread distributed/semaphore.py
# We should make sure that the client is already properly registered with the scheduler
# otherwise the lease validation will mop up every acquired release immediately
while client not in self.scheduler.clients:
yield
Member


Suggested change
- yield
+ yield gen.sleep(0.100)

Otherwise this stresses out the event loop.

Comment thread distributed/semaphore.py
# otherwise the lease validation will mop up every acquired release immediately
while client not in self.scheduler.clients:
yield
with (yield self.locks[name].acquire()):
Member


Why do we need an async lock around this block? There are no yield points within it, so I wouldn't expect there to be any chance for another coroutine to run at the same time.

Comment thread distributed/semaphore.py

@gen.coroutine
def _release_value(self, name, client, identifier):
with (yield self.locks[name].acquire()):
Member


Assuming that these locks aren't necessary (I don't think they're ever used around code with yields), my guess is that a bit of this can be simplified.
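
Toy example of the underlying rule (not from this PR): on tornado's single-threaded event loop a coroutine can only be interrupted at a yield point, so a block without yields is effectively atomic:

from tornado import gen

state = {"counter": 0}

@gen.coroutine
def increment():
    # no yield between the read and the write, so no other coroutine
    # can interleave here -- a lock adds nothing
    current = state["counter"]
    state["counter"] = current + 1
    # only across a yield point could another coroutine run in between
    yield gen.moment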

Member Author


Yes, works without locks. Will remove them again.

Comment thread distributed/semaphore.py
scheduler_clients = set(self.scheduler.clients.keys())
for client in known_clients - scheduler_clients:
client_has_leases = sum(
valmap(len, self.leases_per_client[client]).values()
Member


valmap(func, d).values() -> map(func, d.values()) ?
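
i.e. the two spellings are equivalent here:

from toolz import valmap

leases = {"sem-a": {"lease-1", "lease-2"}, "sem-b": {"lease-3"}}
# both count the total number of leases across all semaphores
assert sum(valmap(len, leases).values()) == sum(map(len, leases.values())) == 3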

Comment thread distributed/semaphore.py
def __init__(self, max_leases=1, name=None, client=None):
self.client = client or _get_global_client() or get_worker().client
self.id = uuid.uuid4().hex
self.name = name or "semaphore-" + uuid.uuid4().hex
Member


reuse id here?

@mrocklin
Member

Checking in @fjetter, any thoughts on my comments above?

@fjetter
Member Author

fjetter commented May 20, 2019

  • I tried rewriting the code using async def instead of @gen.coroutine but am hitting an issue where something complains that coroutines are not serializable. I haven't tracked it down further yet.
  • Using tornado semaphores was my first prototype, but the issue of lost connections is quite important, I believe. Especially in autoscaling or otherwise volatile environments where a worker might die unexpectedly, I consider this extremely important. As an alternative to tracking leases and releasing them upon losing the client, I could imagine a maximum lease time being a viable option (see the sketch after this list). Tracking an acquisition and expiring it after a certain amount of time is probably equally complex and probably less powerful, though.
  • I encountered another issue which I need to track down. In our environment I receive connection timeouts when semaphores cannot be acquired. I haven't understood this issue yet, but my first guess would be that if semaphore_timeout < connect_timeout we get a connection timeout (worker dies, computation is canceled). I'll try to catch this with another test to be sure I'm not on the wrong track here.
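
Purely for illustration, the lease-time alternative from the second bullet might look like this (hypothetical; none of these names exist in the PR):

import time

MAX_LEASE_TIME = 60.0  # seconds; made-up value for the example

class LeaseTable:
    def __init__(self):
        self.acquired_at = {}  # (semaphore name, lease id) -> timestamp

    def acquire(self, name, lease_id):
        self.acquired_at[(name, lease_id)] = time.time()

    def expire_stale(self):
        # run periodically on the scheduler; drops leases whose holders
        # failed to release (or refresh) them in time
        now = time.time()
        for key, ts in list(self.acquired_at.items()):
            if now - ts > MAX_LEASE_TIME:
                del self.acquired_at[key]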

Due to the complexity of this implementation, I was asking whether this is considered in scope for dask.distributed.
IMHO, a distributed semaphore needs to be able to deal with worker loss to be useful, at least for production-level services.

@mrocklin
Member

I tried rewriting the code using async def instead of @gen.coroutine but am hitting an issue where something complains that coroutines are not serializable. I haven't tracked it down further yet.

It's totally fine to leave this with @gen.coroutine. There's no need to switch if you don't want to.

Using tornado semaphores was my first prototype, but the issue of lost connections is quite important, I believe. Especially in autoscaling or otherwise volatile environments where a worker might die unexpectedly, I consider this extremely important. As an alternative to tracking leases and releasing them upon losing the client, I could imagine a maximum lease time being a viable option. Tracking an acquisition and expiring it after a certain amount of time is probably equally complex and probably less powerful, though.

OK. Mostly I wanted to verify that we weren't missing an obvious simplification here. If you've already tried this and have some confidence that it's not the right approach then I'm happy to drop it.

Due to the complexity of this implementation, I was asking whether this is considered in scope for dask.distributed.

Yes, this seems in-scope to me. I will be glad to have it around. Several other groups have asked for similar functionality.

@fjetter
Member Author

fjetter commented May 21, 2019

I'll have another look at the implementation and the issues I mentioned in the following days.

@mrocklin what's your stance on replacing the Lock implementation with a single-valued semaphore? The Lock implementation also looks quite complex and uses similar logic based on tornado events, and I'd argue that maintaining both implementations isn't necessary. Moreover, the Lock implementation would also benefit from newly introduced features like releasing locks on client loss.

@mrocklin
Member

mrocklin commented May 21, 2019 via email

@fjetter
Member Author

fjetter commented Feb 5, 2020

FYI: We're currently taking another look at this. Essentially we intend to migrate the code to asyncio and cover a few more edge cases which came up in an internal review. Will ping again if there is progress.

@lr4d lr4d mentioned this pull request Mar 13, 2020
@fjetter
Member Author

fjetter commented Mar 30, 2020

Closing this now in favor of #3573

@fjetter fjetter closed this Mar 30, 2020