From 4c67b0bb1bc8e30a20f04494cbde9e9646643cb0 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 21:05:18 -0600 Subject: [PATCH 1/7] 10ms penalty per transfer I'd like to incorporate measured latency somehow too instead of a magic 10ms, but it's a start. --- distributed/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 04496b26e0..f9b1d32ad4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3416,13 +3416,15 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: dts: TaskState nbytes: Py_ssize_t comm_bytes: Py_ssize_t = 0 + xfers: Py_ssize_t = 0 for dts in ts._dependencies: if ws not in dts._who_has: nbytes = dts.get_nbytes() comm_bytes += nbytes + xfers += 1 stack_time: double = ws._occupancy / ws._nthreads - start_time: double = stack_time + comm_bytes / self._bandwidth + start_time: double = stack_time + comm_bytes / self._bandwidth + xfers * 0.01 if ts._actor: return (len(ws._actors), start_time, ws._nbytes) From b4ebbeeb82328c2eff04119516fb488a8a6fd822 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 21:17:35 -0600 Subject: [PATCH 2/7] Amortize transfer cost As discussed in https://github.com/dask/distributed/pull/5325. The idea is that if a key we need has many dependents, we should amortize the cost of transferring it to a new worker, since those other dependencies could then run on the new worker more cheaply. "We'll probably have to move this at some point anyway, might as well do it now." This isn't actually intended to encourage transfers though. It's more meant to discourage transferring keys that could have just stayed in one place. The goal is that if A and B are on different workers, and we're the only task that will ever need A, but plenty of other tasks will need B, we should schedule alongside A even if B is a bit larger to move. But this is all a theory and needs some tests. --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f9b1d32ad4..00cc3d0b9c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3415,12 +3415,13 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: """ dts: TaskState nbytes: Py_ssize_t - comm_bytes: Py_ssize_t = 0 + comm_bytes: double = 0 xfers: Py_ssize_t = 0 for dts in ts._dependencies: if ws not in dts._who_has: nbytes = dts.get_nbytes() - comm_bytes += nbytes + # amortize transfer cost over all waiters + comm_bytes += nbytes / len(dts._waiters) xfers += 1 stack_time: double = ws._occupancy / ws._nthreads From cbc145a797881c9e1fda5548722d7cc9e6646fa4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 17 Sep 2021 13:34:05 -0600 Subject: [PATCH 3/7] Add fixed transfer cost in `get_comm_cost` as well --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 00cc3d0b9c..1c810f4ad4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3304,7 +3304,8 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> double: nbytes: Py_ssize_t = 0 for dts in deps: nbytes += dts._nbytes - return nbytes / self._bandwidth + # Add a fixed 10ms penalty per transfer. See distributed#5324 + return nbytes / self._bandwidth + 0.01 * len(deps) @ccall def get_task_duration(self, ts: TaskState, default: double = -1) -> double: From a6c23158f72c5a538a8a2167762220ce8b1c628c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 4 May 2022 20:40:15 +0100 Subject: [PATCH 4/7] Update distributed/scheduler.py --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 304cd5803e..e093c04223 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2808,8 +2808,8 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: comm_bytes += nbytes / len(dts.waiters) xfers += 1 - stack_time: double = ws.occupancy / ws.nthreads - start_time: double = stack_time + comm_bytes / self.bandwidth + xfers * 0.01 + stack_time = ws.occupancy / ws.nthreads + start_time = stack_time + comm_bytes / self.bandwidth + xfers * 0.01 if ts.actor: return (len(ws.actors), start_time, ws.nbytes) From 5f181518e81736e659702549857321993ee2dc77 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 4 May 2022 20:42:33 +0100 Subject: [PATCH 5/7] Update distributed/scheduler.py --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e093c04223..a557aa419c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2809,6 +2809,7 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: xfers += 1 stack_time = ws.occupancy / ws.nthreads + # Add a fixed 10ms penalty per transfer. See distributed#5324 start_time = stack_time + comm_bytes / self.bandwidth + xfers * 0.01 if ts.actor: From 01a0e8c2b0e979ddb363622306f8794ff09666f8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 4 May 2022 20:45:20 +0100 Subject: [PATCH 6/7] Update distributed/scheduler.py --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a557aa419c..d511473cc4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2800,7 +2800,7 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: """ dts: TaskState comm_bytes: int = 0 - xfers: int = 0 + xfers = 0 for dts in ts.dependencies: if ws not in dts.who_has: nbytes = dts.get_nbytes() From e3d62f66f9b84eff5c97f3288c9651531a03303c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 4 May 2022 22:29:45 +0100 Subject: [PATCH 7/7] Update distributed/scheduler.py --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d511473cc4..873a41be56 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2688,7 +2688,7 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> float: for dts in deps: nbytes += dts.nbytes # Add a fixed 10ms penalty per transfer. See distributed#5324 - return nbytes / self._bandwidth + 0.01 * len(deps) + return nbytes / self.bandwidth + 0.01 * len(deps) def get_task_duration(self, ts: TaskState) -> float: """Get the estimated computation cost of the given task (not including