Alternatives for current ensure_communicating #6497

Closed
fjetter opened this issue Jun 3, 2022 · 12 comments · Fixed by #6462, #6587, #6623 or dask/dask-benchmarks#50
Labels: discussion (Discussing a topic with no specific actions yet)

Comments

@fjetter (Member) commented Jun 3, 2022

We've recently been refactoring the worker state machine (#5736)

The new system works great for single-task transitions but feels awkward as soon as we try to treat groups or batches of tasks similarly, which is particularly important for fetching remote data. We do not want to perform one network request per key; instead we want to perform one request per remote worker and fetch as many keys from it as is reasonable.

So far this mechanism has worked by running Worker.(_)ensure_communicating frequently; it iterates over all tasks and schedules a fetch/gather_dep (pseudocode below):

def ensure_communicating(...):
    skip = []
    while self.data_needed and free_slots_for_fetch:
        ts = self.data_needed.pop()  # highest-priority task
        worker = pick_good_worker(ts)  # not busy, maybe localhost, etc.
        if not worker:
            skip.append(ts)
            continue
        more_keys_on_worker = select_keys_for_gather(worker)
        schedule_gather_dep(ts, more_keys_on_worker, worker)  # schedules a coroutine that actually fetches the data
    self.data_needed.update(skip)  # data_needed is a heap, so this is non-linear

This has a couple of problems:

  1. It's unclear when to run this function. Past iterations just ran it frequently (on every incoming request and in various code places where we guessed there might be something that needs transitioning).
  2. We're iterating over all keys, all the time. This is bad for performance, although not critical on the worker (see Remove EnsureCommunicatingAfterTransitions #6462 (comment)).
  3. If we call this too soon, e.g. within a released->fetch transition, we may make poor scheduling decisions by fetching tasks prematurely, since we haven't worked off all fetch recommendations before calling it. For this reason, the EnsureCommunicatingAfterTransitions instruction was introduced (see also Remove EnsureCommunicatingAfterTransitions #6462) to mimic the earlier behavior.

Proposal

I think we can get rid of Worker.data_needed entirely. Since we now store everything in heaps, Worker.data_needed_per_worker carries everything we need if we change how ensure_communicating works, which might have other interesting side effects.

To illustrate this, consider the following two helper functions

def get_most_important_worker_to_fetch_from(self):
    """
    Get the remote worker that holds the _most important_ data we should fetch.
    This is typically inferred by looking at task priorities.
    This could also be used to modify what we consider the most important worker
    to fetch from, e.g. due to load/busyness, memory pressure, etc.
    """
    # Linear in the number of workers
    res = None
    prio = None
    for w, heap in self.data_needed_per_worker.items():
        nprio = heap.peek().priority  # priority of this worker's best task
        if w not in self.busy_workers and (res is None or nprio > prio):
            res = w
            prio = nprio
    return res

def get_tasks_for_worker(self, w, limit=42):
    """Get a list of tasks held by worker `w` that together amount to roughly `limit` estimated bytes (the last task may overshoot)"""
    # This is more or less the current `Worker._select_keys_for_gather`
    # Linear in the number of tasks in state `fetch` that live on remote worker `w`
    res = []
    size = 0
    for ts in self.data_needed_per_worker[w]:
        size += ts.get_nbytes()
        res.append(ts)
        if size > limit:
            break
    return res

With these two helpers we should be able to construct a new, simplified version if we switch our loop from iterating over tasks to iterating over workers:

def _new_ensure_communicate_with_tasks_by_worker():
    if network_limits_not_reached and (worker := self.get_most_important_worker_to_fetch_from()):
        tasks_to_fetch = self.get_tasks_for_worker(worker)
        instruction = GatherDep(
            worker,
            tasks_to_fetch,
        )

Or we could even change the GatherDep instruction to be worker-only and delay the decision of which tasks to fetch until the instruction is acted on, which typically only happens after all transitions in Worker.transitions have been worked off:

def _new_ensure_communicate_with_tasks_by_worker():
    if network_limits_not_reached and (worker := self.get_most_important_worker_to_fetch_from()):
        instruction = GatherDep(worker)

def gather_dep(self, worker):
    tasks_to_fetch = self.get_tasks_for_worker(worker)
    ...
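For illustration only, the two instruction shapes discussed above could look roughly like the following (hypothetical dataclass names, not the actual definitions in the worker state machine):

from dataclasses import dataclass

@dataclass
class GatherDepWithTasks:
    """Variant 1: the state machine decides up front which tasks to fetch."""
    worker: str
    to_gather: set  # keys selected by get_tasks_for_worker()

@dataclass
class GatherDepWorkerOnly:
    """Variant 2: only the worker is decided; task selection is deferred
    until Worker.gather_dep() acts on the instruction."""
    worker: str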

cc @crusaderky

@crusaderky (Collaborator) commented Jun 6, 2022

While I'm warm to deduplicating data_needed vs. data_needed_per_worker, I don't see how any of this removes the problem of when to trigger _new_ensure_communicate_with_tasks_by_worker.

My suggestion:

  • Remove data_needed_per_worker entirely, and keep only data_needed.
  • in_flight_workers becomes {worker: GatherDep}, where the GatherDep instruction contains all tasks to be fetched from that worker.
  • Every iteration of _ensure_communicating fetches the topmost task from data_needed and adds it to the matching GatherDep for that worker. Then it adds the GatherDep to instructions (possibly the same object multiple times).

_ensure_communicating stops iterating when any one of the following happens:

  1. there are no tasks left in data_needed
  2. the total nbytes in flight across all workers exceeds a global threshold
    self.comm_nbytes >= self.comm_threshold_bytes
  3. the number of saturated pending GatherDep commands exceeds a threshold
    sum(gd.nbytes >= self.target_message_size for gd in self.in_flight_workers.values()) >= self.total_out_connections

Individual GatherDep instructions stop being enriched when their own nbytes exceed a per-request threshold (self.in_flight_workers[worker].nbytes >= self.target_message_size); when that happens, tasks are moved to skipped_worker_in_flight_or_busy (read below).
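A rough sketch of that loop, assuming a mutable GatherDep with `to_gather`/`nbytes` fields and a hypothetical `_pick_worker` helper; busy-worker handling and the bookkeeping on request completion are omitted:

def _ensure_communicating(self) -> list:
    # Sketch only; names and field shapes are assumptions based on the
    # description above, not the actual implementation.
    instructions = []
    while self.data_needed:  # stop condition 1: no tasks left
        # stop condition 2: global bytes-in-flight limit reached
        if self.comm_nbytes >= self.comm_threshold_bytes:
            break
        # stop condition 3: too many saturated pending GatherDep requests
        saturated = sum(
            gd.nbytes >= self.target_message_size
            for gd in self.in_flight_workers.values()
        )
        if saturated >= self.total_out_connections:
            break

        ts = self.data_needed.pop()  # highest-priority task overall
        worker = self._pick_worker(ts)  # e.g. a non-busy worker in ts.who_has
        gd = self.in_flight_workers.get(worker)
        if gd is not None and gd.nbytes >= self.target_message_size:
            # this worker's request is already saturated; park the task
            self.skipped_worker_in_flight_or_busy.add(ts)
            continue
        if gd is None:
            gd = self.in_flight_workers[worker] = GatherDep(worker, set(), 0)
        gd.to_gather.add(ts.key)
        gd.nbytes += ts.get_nbytes()
        self.comm_nbytes += ts.get_nbytes()
        instructions.append(gd)  # possibly the same object multiple times
    return instructions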

At the top of _handle_instructions, there's a simple object deduplication:

instructions = list({id(instr): instr for instr in instructions}.values())

_ensure_communicating is still called as soon as possible, e.g. at every transition to fetch. However, as a performance tweak,
skipped_worker_in_flight_or_busy becomes an instance variable: a set which must lose keys every time one would otherwise remove keys from data_needed. Instead of being flushed back into data_needed at the end of each iteration of _ensure_communicating, it is partially flushed

  1. every time a worker exits flight or busy state
  2. every time update_who_has adds a new worker to a task in fetch state

Flushing skipped_worker_in_flight_or_busy back into data_needed is O(n) in len(skipped_worker_in_flight_or_busy) and O(n log n) in the number of tasks being added back in, but the key point is that it happens a lot less frequently than it does now.
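A minimal sketch of the first trigger (a worker exits flight/busy), assuming data_needed exposes push() and tasks carry who_has; the update_who_has trigger would be analogous:

def _flush_skipped_tasks_for(self, worker: str) -> None:
    # Hypothetical helper: move back only the parked tasks that `worker`
    # can now serve; everything else stays in skipped_worker_in_flight_or_busy.
    readd = {
        ts for ts in self.skipped_worker_in_flight_or_busy
        if worker in ts.who_has
    }
    self.skipped_worker_in_flight_or_busy -= readd
    for ts in readd:
        self.data_needed.push(ts)  # O(log n) per re-added task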

This closes #6462 (comment) both for the case of multiple fetches from the same event and for the case of multiple fetches from multiple commands in short succession, and it makes calling _ensure_communicating twice in a row computationally negligible.

@crusaderky (Collaborator) commented:
The big difference between my proposal above and both the current system and the proposal to remove data_needed in the OP is that my proposal respects global priority. This means that you'll end up with more, individually smaller, parallel transfers from multiple workers, as you'll be more likely to hit self.comm_threshold_bytes than self.target_message_size.

e.g.

  • each key is nbytes=1
  • there are 99999 keys to fetch, uniformly distributed across 3 workers a, b, and c, with the same priority, and iteratively inserted into data_needed (a b c a b c a b c...)
  • self.comm_threshold_bytes=90
  • self.target_message_size=50

The current system will fetch 50 keys from worker a and 40 keys from worker b; my proposal would fetch 30 keys from worker a, 30 from worker b, and 30 from worker c.
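A tiny simulation of this worked example, under the simplifying assumption that the current system fills one worker's request up to target_message_size before moving to the next, while the proposal adds keys strictly in global (here round-robin) priority order; this is not a reproduction of either actual code path:

from itertools import cycle

comm_threshold_bytes = 90
target_message_size = 50
workers = ["a", "b", "c"]

def current_policy():
    # fill one worker's request at a time, up to target_message_size each,
    # until the global threshold is hit
    fetched = dict.fromkeys(workers, 0)
    total = 0
    for w in workers:
        while total < comm_threshold_bytes and fetched[w] < target_message_size:
            fetched[w] += 1
            total += 1
        if total >= comm_threshold_bytes:
            break
    return fetched

def proposed_policy():
    # add keys in global priority order, i.e. round-robin across workers here
    fetched = dict.fromkeys(workers, 0)
    total = 0
    for w in cycle(workers):
        if total >= comm_threshold_bytes:
            break
        if fetched[w] < target_message_size:
            fetched[w] += 1
            total += 1
    return fetched

print(current_policy())   # {'a': 50, 'b': 40, 'c': 0}
print(proposed_policy())  # {'a': 30, 'b': 30, 'c': 30}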

crusaderky self-assigned this Jun 9, 2022
@fjetter (Member, Author) commented Jun 13, 2022

While I'm warm to deduplicating data_needed vs. data_needed_per_worker, I don't see how any of this removes the problem of when to trigger _new_ensure_communicate_with_tasks_by_worker.

The proposal that includes an instruction GatherDep(worker: str) without explicit tasks would delay the collection of tasks until the instruction is executed. This collection would be trivially achieved by reusing the data_needed_per_worker (heap)set.
Assuming that we perform all transitions before we execute any instructions, this would resolve our problem, wouldn't it?

fjetter added the discussion label Jun 13, 2022
@fjetter (Member, Author) commented Jun 13, 2022

ensure_communicating would effectively be reduced to simply inferring whether or not we need to communicate and not what to communicate.

@crusaderky (Collaborator) commented Jun 13, 2022

The proposal that includes an instruction GatherDep(worker: str) without explicit tasks would delay the collection of tasks until the instruction is executed.
ensure_communicating would effectively be reduced to simply inferring whether or not we need to communicate and not what to communicate.

I see this as a major antipattern with respect to the current worker state machine refactoring, as it would require the whole contents of _select_keys_for_gather to be moved out of WorkerState and into Worker.gather_dep. This in turn would require a new hack, StartedGatheringEvent, to allow for the transition from fetch to flight.
The check for self.comm_nbytes < self.comm_threshold_bytes would also need to move there.

@fjetter (Member, Author) commented Jun 13, 2022

I see this as a major antipattern with respect to the current worker state machine refactoring, as it would require the whole contents of _select_keys_for_gather to be moved out of WorkerState and into Worker.gather_dep. This in turn would require a new hack, StartedGatheringEvent, to allow for the transition from fetch to flight.
The check for self.comm_nbytes < self.comm_threshold_bytes would also need to move there.

Good point.


Taking a step back, I'm wondering how we'd approach this from a TDD perspective.

  1. It should only involve WorkerState and not Worker(Server)
  2. The event log should have roughly the following two items (modulo irrelevant kwargs)

This could be roughly written up as something like the following

assert ComputeTaskEvent(key='f1', who_has={"f2": {"workerB"}, "f3": {"workerB"}, ...}) in state.event_log
...
assert GatherDep(worker="workerB", to_gather={"f2", "f3"}) in state.instructions_log

and also

assert AcquireReplicasEvent(who_has={"f2": {"workerB"}, "f3": {"workerB"}, ...}) in state.event_log

assert GatherDep(worker="workerB", to_gather={"f2", "f3"}) in state.instructions_log

correct?

I'm wondering if we are already able to write such a test down, after #6566


About the actual problem, I'm wondering whether the order of transitioning could be used to influence this. I think right now the above requests would play out like this (depth-first):

  • f1(released)
  • f1(fetch)
  • f1(flight)
  • f2(released)
  • f2(fetch)
  • f2(flight)

Would it help if we instead performed breadth-first transitioning? (See the sketch after this list.)

  • f1(released)
  • f2(released)
  • f1(fetch)
  • f2(fetch)
  • f1(flight)
  • f2(flight)
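For concreteness, breadth-first here would mean something like the following sketch; the _transition signature and its (recommendations, instructions) return shape are assumptions for illustration, not the actual API:

def _transitions_breadth_first(self, recommendations, *, stimulus_id):
    # Hypothetical sketch: work off one full layer of recommendations before
    # acting on the recommendations those transitions produced, so e.g. all
    # released->fetch transitions happen before any fetch->flight transition.
    instructions = []
    while recommendations:
        current, recommendations = recommendations, {}
        for ts, finish in current.items():
            recs, instrs = self._transition(ts, finish, stimulus_id=stimulus_id)
            recommendations.update(recs)
            instructions.extend(instrs)
    return instructions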

@crusaderky (Collaborator) commented:
Breadth-first would help. It's not necessary with my design above, though - could you comment on it?

For testing, I envision something like

instructions = state.handle_stimulus(AcquireReplicasEvent(...))
assert instructions == [
    GatherDep(...),
    GatherDep(...),
    GatherDep(...),
]
assert state.tasks["x"].state == "flight"
assert state.tasks["y"].state == "fetch"

@fjetter (Member, Author) commented Jun 14, 2022

could you comment on it?

sure, I'll need to give it another pass

For testing, I envision something like

I would actually prefer to leave the task states out of it in this case. I think the events and instructions should be enough and would make for great high-level tests.

@fjetter (Member, Author) commented Jun 14, 2022

IIUC your algorithm would make poor decisions in the following case

Below, data_needed is shown as a sorted list in a simplified format, omitting key, priority, etc.; only nbytes and who_has are shown:

Worker.comm_threshold_bytes = 100
[
    # nbytes, who_has
    (1, {"A"}),  # -> new GatherDep for A
    (1000, {"C"}),  # -> new GatherDep for C
    # <--- Break since sum(AllGatherDepNbytes) > Worker.comm_threshold_bytes
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
]

I think fetching all tasks from A and then fetching the task from C is better.

Your proposal about saturated GatherDep requests is interesting. For the sake of completeness, if we simply limit by the number of outgoing connections, we'd be exposed to a situation similar to the one above with the following queue:

Worker.total_out_connections = 3
[
    # nbytes, who_has
    (1, {"A"}),  # -> new GatherDep for A
    (1, {"B"}),  # -> new GatherDep for B
    (1, {"C"}),  # -> new GatherDep for C
    # <--- Break since count(outgoing_requests) >= Worker.total_out_connections
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
    (1, {"A"}),
]

Only filtering by saturated requests would not have this problem. However, I don't feel great about this proposal since it would effectively reduce total_out_connections (or its follow-up) to a mere suggestion/recommendation rather than a limit, which makes me wonder why we'd even want to introduce the limit in the first place. I expect our server infrastructure to be capable of handling many concurrent connections, and busyness is controlled on the remote / incoming-request (Worker.get_data) end. The only reason I'd want to throttle our data fetching is to protect CPU and memory consumption, and both mostly scale with size_of_bytes (i.e. de-/serialization overhead), don't they?

It is interesting because it raises the question of what this local/per-request limit should actually be. Do we even need a per-request limit / target_message_size? Every message is already naturally capped by comm_threshold_bytes, i.e. individual requests are always smaller than that anyhow. If the local worker can handle it, why can't the remote? The remote might need to handle many (total_in_connections) of these requests and we don't want it to die. Maybe remote busyness should just work by inspecting size_of_bytes_requested, and we'd drop the local limiting entirely. Would that simplify things?


Zooming out a bit, there are a couple of questions that connect our two proposals, which I believe can be discussed mostly separately:

  1. How do we iterate over the set of tasks and workers?

1A) Maintain data_needed_per_worker

to_fetch = defaultdict(list)
leftover_data_per_worker = {}
while data_per_worker and not above_global_size_threshold():
    worker, tasks = pick_global_best_worker(data_per_worker)
    if worker is None:  # E.g. all in-flight. Handled by pick_global_best_worker
        break
    while tasks:
        ts = tasks.pop()
        if above_local_threshold(worker, ts):
            leftover_data_per_worker[worker] = tasks
            break
        to_fetch[worker].append(ts)

self.data_per_worker.update(leftover_data_per_worker)

1B) Keep data_needed

to_fetch = defaultdict(list)
self.skipped_worker_in_flight_or_busy = []
while data_needed and not above_global_size_threshold():
    ts = data_needed.pop()

    worker = pick_best_worker_for_task(ts)
    if above_local_threshold(worker, ts):
        self.skipped_worker_in_flight_or_busy.append(ts)
        continue
    to_fetch[worker].append(ts)

# TODO: IIUC your proposal envisions skipped_worker_in_flight_or_busy being consolidated elsewhere?

We could engage in a thorough runtime analysis and try to estimate which is faster. My gut feeling tells me mine should fare better, even with the amortized cost of maintaining data_needed_per_worker (which we're currently paying already). I think this shouldn't matter in practice since, on the worker, the number of tasks and the number of remote workers should be relatively small. However, you are suggesting introducing skipped_worker_in_flight_or_busy as a performance optimization, which worries me a bit.
The big difference I see is that we have different opportunities to optimize the control flow via pick_global_best_worker vs pick_best_worker_for_task. My gut tells me 1A is more powerful for eventually including load, busyness, etc. in this decision. I am also a bit worried about the additional complexity of skipped_worker_in_flight_or_busy becoming an instance variable. Nonlocal state is scary and prone to errors.

  2. What kind of limits do we want to impose?

We may not need local limits; I'm not sure. I see room for this in both proposals and we can kick it down the road. I just had the feeling it was more important in your proposal than in mine because you talked about skipped_worker_in_flight_or_busy becoming an instance variable. I had the impression that if we dropped request-specific limiting, this would not be necessary.

  3. How do we propagate this information from the transition system back to the instruction without moving transition logic to the server class?

I think this is the big one. I see a couple of options

3A) Mutate things in place, as you described. I think there are nuances, but overall this would amount to either mutating issued instructions or mutating some state the instruction references (e.g. the instruction points to a dict/set on the WorkerState that we keep updating). Honestly, this feels like cheating and is as opposed to the new design as letting the server perform transitions. This proposal would harm replayability / lineage and I would like to avoid it.

3B) Delay ensure_communicating artificially after all. We could just call _ensure_communicating every time after self._transitions in handle_stimulus and not worry about this problem anymore. I think this would still be better than before when we called it after every stream comm and occasionally whenever we felt the need for it.
We might still want to refactor some of the loop logic to clean up the batching logic / _select_keys_for_gather but that is less urgent.

3C) Use breadth-first transitions. This might benefit us in other areas as well since it would allow us to consider priorities more strictly. FWIW, I encountered an ordering issue with transitions on the scheduler side before; see #6248 (comment).

3D) Perform some aggregation before _handle_instructions, i.e.

merge_instructions([
    GatherDepDecision(worker="A", key="foo"),
    GatherDepDecision(worker="A", key="bar"),
    GatherDepDecision(worker="B", key="baz"),
])
>> {GatherDep(worker="A", keys={"foo", "bar"}), GatherDep(worker="B", keys={"baz"})}
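A minimal sketch of such a merge step, assuming GatherDepDecision(worker, key) and GatherDep(worker, keys) shaped as in the example above (ordering of the merged instructions is simplified):

from collections import defaultdict

def merge_instructions(instructions):
    # Collapse per-key GatherDepDecision instructions into one GatherDep per
    # worker; any other instruction passes through unchanged.
    keys_by_worker = defaultdict(set)
    merged = []
    for instr in instructions:
        if isinstance(instr, GatherDepDecision):
            keys_by_worker[instr.worker].add(instr.key)
        else:
            merged.append(instr)
    merged.extend(
        GatherDep(worker=worker, keys=keys)
        for worker, keys in keys_by_worker.items()
    )
    return merged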

Of course, these three questions are related, but I think we can make any combination work with appropriate adjustments.

Conclusion

  • I don't have a strong preference about which way we iterate (workers vs. tasks). I slightly favor per-worker because I think we have more room for improvement there. If we need a performance optimization for the per-task approach just to make it work, I would again pick the per-worker case.
  • I am worried about mutating instructions and would like to avoid this (see question 3 above).

At the top of _handle_instructions, there's a simple object deduplication:

FWIW, I've come across use cases for an insertion-ordered set a couple of times now. We might want to consider adding such a collection instead of doing any deduplication. It should also be trivial to implement if we reuse the HeapSet.
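For illustration, a dict-backed insertion-ordered set would already give deduplication while preserving order; a minimal sketch, not the HeapSet-based variant suggested above:

class InsertionOrderedSet:
    # Minimal sketch backed by a dict, which preserves insertion order
    # (guaranteed since Python 3.7). Only the handful of methods needed for
    # deduplicating instructions are shown; items must be hashable.
    def __init__(self, items=()):
        self._d = {item: None for item in items}

    def add(self, item):
        self._d.setdefault(item, None)

    def discard(self, item):
        self._d.pop(item, None)

    def __contains__(self, item):
        return item in self._d

    def __iter__(self):
        return iter(self._d)

    def __len__(self):
        return len(self._d)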

@crusaderky (Collaborator) commented Jun 16, 2022

3C) Use breadth-first transitions. This might benefit us in other areas as well since it would allow us to consider priorities more strictly. FWIW, I encountered an ordering issue with transitions on the scheduler side before; see #6248 (comment).

I played with this and it doesn't work: https://github.com/crusaderky/distributed/tree/WSMR/breadth_first

Story of the new test test_cluster_gather_deps:

- ('x1', 'ensure-task-exists', 'released', 'test', 1655388950.2006707)
- ('x2', 'ensure-task-exists', 'released', 'test', 1655388950.2006752)
- ('x3', 'ensure-task-exists', 'released', 'test', 1655388950.2006783)
- ('gather-dependencies', '127.0.0.1:1235', {'x1'}, 'test', 1655388950.2007372)
- ('x1', 'released', 'fetch', 'fetch', {'x1': ('flight', '127.0.0.1:1235')}, 'test', 1655388950.2007432)
- ('x2', 'released', 'fetch', 'fetch', {}, 'test', 1655388950.2007544)
- ('x3', 'released', 'fetch', 'fetch', {}, 'test', 1655388950.2007656)
- ('x1', 'fetch', 'flight', 'flight', {}, 'test', 1655388950.2007716)

Depth-first (without the change to _transitions), it would be:

- ('x1', 'ensure-task-exists', 'released', 'test', 1655388950.2006707)
- ('x2', 'ensure-task-exists', 'released', 'test', 1655388950.2006752)
- ('x3', 'ensure-task-exists', 'released', 'test', 1655388950.2006783)
- ('gather-dependencies', '127.0.0.1:1235', {'x1'}, 'test', 1655388950.2007372)
- ('x1', 'released', 'fetch', 'fetch', {'x1': ('flight', '127.0.0.1:1235')}, 'test', 1655388950.2007432)
- ('x1', 'fetch', 'flight', 'flight', {}, 'test', 1655388950.2007716)
- ('x2', 'released', 'fetch', 'fetch', {}, 'test', 1655388950.2007544)
- ('x3', 'released', 'fetch', 'fetch', {}, 'test', 1655388950.2007656)

Breadth-first actually doesn't change anything. We're calling transition_released_fetch sequentially on the 3 tasks; each of them invokes _ensure_communicating, so the worker is moved to in_flight_workers by the first task.

3B) Delay ensure_communicating artificially after all. We could just call _ensure_communicating every time after self._transitions in handle_stimulus and not worry about this problem anymore. I think this would still be better than before when we called it after every stream comm and occasionally whenever we felt the need for it.

I appreciate the pragmatism of this one. However, it must be accompanied by a solution to the O(n log n) issue for unschedulable tasks, since on its own it would further exacerbate it.

3D) Perform some aggregation before _handle_instructions

This is also reasonable.

@crusaderky (Collaborator) commented:
I've implemented 3B in #6462, and removed data_needed in #6587.
