Skip to content

Commit

Permalink
test for #2420
Browse files Browse the repository at this point in the history
  • Loading branch information
tjb900 committed Jan 3, 2019
1 parent 9195971 commit eb2a66b
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions distributed/tests/test_worker.py
Expand Up @@ -1271,3 +1271,53 @@ def test_startup2():
# Final exception test
with pytest.raises(ZeroDivisionError):
yield c.register_worker_callbacks(setup=lambda: 1 / 0)


@gen_cluster(client=True)
def test_worker_keeps_data_gh2420(c, s, a, b):
# Make a piece of data which is slow to serialize, to trigger the race
# Put data X on worker A
# Give task Y to worker B so that it fetches X from A
# Wait until the request has started
# Cancel the task on B
# Wait until the transfer is done
# Ensure that the scheduler's who_has for X is accurate
class SlowSerializeObj(object):
"""An object which is slow to serialize and deserialize."""
def __getstate__(self):
import time
time.sleep(1)
return {}

def __setstate__(self, d):
import time
time.sleep(1)

x = c.submit(SlowSerializeObj, workers=a.address)
yield wait(x)
assert not a.executing
assert x.key in a.data

def bogus_task(bogus_input):
import time
time.sleep(1)
return 2

y = c.submit(bogus_task, x, workers=b.address)

# This is so that y has dependents, and is released rather than
# forgotten (you would think that might be the same...)
z = c.submit(bogus_task, y, workers=b.address)
yield c._cancel(y)

yield gen.sleep(5)
try:
yield wait(y)
except:
pass

# for entry in b.log:
# print("b log: %s" % str(entry))

assert (x.key in a.data) == (a.address in s.who_has[x.key])
assert (x.key in b.data) == (b.address in s.who_has[x.key])

0 comments on commit eb2a66b

Please sign in to comment.