diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index 69ab771450..02565f9027 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -15,6 +15,7 @@ class GraphLayout(SchedulerPlugin): def __init__(self, scheduler): self.x = {} self.y = {} + self.collision = {} self.scheduler = scheduler self.index = {} self.index_edge = {} @@ -62,6 +63,14 @@ def update_graph(self, scheduler, dependencies=None, priority=None, y = self.next_y self.next_y += 1 + if (x, y) in self.collision: + old_x, old_y = x, y + x, y = self.collision[(x, y)] + y += 0.1 + self.collision[old_x, old_y] = (x, y) + else: + self.collision[(x, y)] = (x, y) + self.x[key] = x self.y[key] = y self.index[key] = self.next_index @@ -85,6 +94,11 @@ def transition(self, key, start, finish, *args, **kwargs): for dep in task.dependencies: self.visible_edge_updates.append((self.index_edge.pop((dep.key, key)), 'False')) + try: + del self.collision[(self.x[key], self.y[key])] + except KeyError: + pass + for collection in [self.x, self.y, self.index]: del collection[key] diff --git a/distributed/diagnostics/tests/test_graph_layout.py b/distributed/diagnostics/tests/test_graph_layout.py index f48168ee3f..63ecb0c700 100644 --- a/distributed/diagnostics/tests/test_graph_layout.py +++ b/distributed/diagnostics/tests/test_graph_layout.py @@ -1,3 +1,5 @@ +import operator + from distributed.utils_test import gen_cluster, inc from distributed.diagnostics import GraphLayout from distributed import wait @@ -79,3 +81,16 @@ def test_forget(c, s, a, b): assert not gl.y assert not gl.index assert not gl.index_edge + assert not gl.collision + + +@gen_cluster(client=True) +def test_unique_positions(c, s, a, b): + gl = GraphLayout(s) + + x = c.submit(inc, 1) + ys = [c.submit(operator.add, x, i) for i in range(5)] + yield wait(ys) + + y_positions = [(gl.x[k], gl.y[k]) for k in gl.x] + assert len(y_positions) == len(set(y_positions))