From ce87313be1bf3d723a4a317e75ae806263a8f52f Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 22 Aug 2022 18:48:31 +0100 Subject: [PATCH] Optimise scheduler.get_comm_cost set difference Computing set(A).difference(B) is O(max(len(A), len(B))). When estimating the communication cost of a task's dependencies it is usual that the number of dependencies (A) will be small but the number of tasks the worker has (B) is large. In this case it is better to manually construct the set difference by iterating over A and checking if each element is in B. Performing a left.merge(right, on="key", how="inner") of a distributed dataframe with eight workers with chunks_per_worker * rows_per_chunk held constant, I observe the following timings using the tcp communication protocol: | chunks_per_worker | rows_per_chunk | before | after | |-------------------|----------------|--------|-------| | 100 | 50000 | 75s | 48s | | 10 | 500000 | ~9s | ~9s | | 1 | 5000000 | ~8s | ~8s | --- distributed/scheduler.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e38447c70b..cb73a0d0dc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2621,7 +2621,17 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> float: on the given worker. """ dts: TaskState - deps: set = ts.dependencies.difference(ws.has_what) + deps: set + if 10 * len(ts.dependencies) < len(ws.has_what): + # In the common case where the number of dependencies is + # much less than the number of tasks that we have, + # construct the set of deps that require communication in + # O(len(dependencies)) rather than O(len(has_what)) time. 
+ # Factor of 10 is a guess at the overhead of explicit + # iteration as opposed to just calling set.difference + deps = {dep for dep in ts.dependencies if dep not in ws.has_what} + else: + deps = ts.dependencies.difference(ws.has_what) nbytes: int = 0 for dts in deps: nbytes += dts.nbytes