Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework some tests related to gather_dep #6472

Merged
merged 7 commits into from Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions distributed/diagnostics/tests/test_eventstream.py
Expand Up @@ -30,14 +30,13 @@ async def test_eventstream(c, s, *workers):
name: collections.deque(maxlen=100)
for name in "start duration key name color worker worker_thread y alpha".split()
}
workers = dict()
workers = {}
for msg in es.buffer:
task_stream_append(lists, msg, workers)

assert len([n for n in lists["name"] if n.startswith("transfer")]) == 2
assert sum(n == "transfer-sum" for n in lists["name"]) == 2
for name, color in zip(lists["name"], lists["color"]):
if name == "transfer":
assert color == "red"
assert (name == "transfer-sum") == (color == "red")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tightened tested conditions


assert any(c == "black" for c in lists["color"])

Expand Down
28 changes: 17 additions & 11 deletions distributed/tests/test_worker.py
Expand Up @@ -745,17 +745,18 @@ async def test_clean_nbytes(c, s, a, b):


@pytest.mark.parametrize("as_deps", [True, False])
@gen_cluster(client=True, nthreads=[("", 1)] * 20)
async def test_gather_many_small(c, s, a, *workers, as_deps):
"""If the dependencies of a given task are very small, do not limit the
number of concurrent outgoing connections
@gen_cluster(client=True, nthreads=[("", 1)] * 21)
async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
"""If the dependencies of a given task are very small, do not limit the number of
concurrent outgoing connections. If multiple small fetches from the same worker are
scheduled all at once, they will result in a single call to gather_dep.
"""
a.total_out_connections = 2
futures = await c.scatter(
{f"x{i}": i for i in range(100)},
workers=[w.address for w in workers],
workers=[w.address for w in snd_workers],
)
assert all(w.data for w in workers)
assert all(w.data for w in snd_workers)

if as_deps:
future = c.submit(lambda _: None, futures, key="y", workers=[a.address])
Expand All @@ -765,13 +766,18 @@ async def test_gather_many_small(c, s, a, *workers, as_deps):
while len(a.data) < 100:
await asyncio.sleep(0.01)

types = list(pluck(0, a.log))
req = [i for i, t in enumerate(types) if t == "request-dep"]
recv = [i for i, t in enumerate(types) if t == "receive-dep"]
assert len(req) == len(recv) == 19
assert min(recv) > max(req)
assert a.comm_nbytes == 0

story = a.story("request-dep", "receive-dep")
assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker
# All GatherDep instructions are fired at the same time; each fetches all keys
# available on the sender worker
for ev in story[:20]:
assert ev[0] == "request-dep"
assert len(ev[2]) > 1
for ev in story[20:]:
assert ev[0] == "receive-dep"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tighten tested condition + clarifications



@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_multiple_transfers(c, s, w1, w2, w3):
Expand Down