Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions distributed/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,19 @@ def acquire(self, stream=None, name=None, id=None, timeout=None):
future = gen.with_timeout(timedelta(seconds=timeout), future)
try:
yield future
except gen.TimeoutError:
result = False
else:
result = True
finally:
event2 = self.events[name].popleft()
assert event is event2
assert name not in self.ids
self.ids[name] = id
else:
result = True
if result:
assert name not in self.ids
self.ids[name] = id
raise gen.Return(result)

def release(self, stream=None, name=None, id=None):
with log_errors():
Expand Down Expand Up @@ -97,6 +105,10 @@ def acquire(self, timeout=None):
--------
>>> lock = Lock('x') # doctest: +SKIP
>>> lock.acquire(timeout=1) # doctest: +SKIP

Returns
-------
True or False whether or not it sucessfully acquired the lock
"""
result = self.client.sync(self.client.scheduler.lock_acquire,
name=self.name, id=self.id, timeout=timeout)
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/py3_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def f():
assert c.asynchronous == True
async with Lock('x'):
lock2 = Lock('x')
with pytest.raises(gen.TimeoutError):
await lock2.acquire(timeout=0.1)
result = await lock2.acquire(timeout=0.1)
assert result is False

loop.run_sync(f)
14 changes: 9 additions & 5 deletions distributed/tests/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@ def f(x):
def test_timeout(c, s, a, b):
locks = s.extensions['locks']
lock = Lock('x')
yield lock.acquire()
result = yield lock.acquire()
assert result is True
assert locks.ids['x'] == lock.id

lock2 = Lock('x')
assert lock.id != lock2.id

start = time()
with pytest.raises(gen.TimeoutError):
yield lock2.acquire(timeout=0.1)
result = yield lock2.acquire(timeout=0.1)
stop = time()
assert stop - start < 0.3
assert result is False
assert locks.ids['x'] == lock.id
assert not locks.events['x']

yield lock.release()

Expand All @@ -64,8 +69,7 @@ def test_timeout_sync(loop):
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as c:
with Lock('x') as lock:
with pytest.raises(gen.TimeoutError):
Lock('x').acquire(timeout=0.1)
assert Lock('x').acquire(timeout=0.1) is False


@gen_cluster(client=True)
Expand Down