
Reuse of keys in blockwise fusion can cause spurious KeyErrors on distributed cluster #9888

Open
fjetter opened this issue Jan 27, 2023 · 13 comments
Labels
highlevelgraph Issues relating to HighLevelGraphs. needs attention It's been a while since this was pushed on. Needs attention from the owner or a maintainer.

Comments

@fjetter
Member

fjetter commented Jan 27, 2023

Subsequent Blockwise layers are currently fused into a single layer. This reduces the number of tasks and the overhead, and is generally a good thing to do. However, the fused output currently does not generate unique key names, which is a problem from a UX perspective but can also cause severe failures when executed on the distributed scheduler, since distributed assumes that a task key uniquely identifies the entire task. While it is true that the data output of the fused key and the non-fused key is identical, the runspec and the local topology are intentionally very different. For example, a fused task may not have any dependencies while the non-fused task does.
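For intuition, here is a minimal sketch of the effect at the low-level graph layer (key and function names are made up for illustration): before fusion the output task depends on an intermediate task; after fusion the same output key maps to a task with no dependencies at all, because the whole chain is inlined into one runspec.

def load(n):
    return list(range(n))

def double(xs):
    return [x * 2 for x in xs]

# Unfused: ('double-abc', 0) depends on ('load-xyz', 0)
unfused = {
    ("load-xyz", 0): (load, 5),
    ("double-abc", 0): (double, ("load-xyz", 0)),
}

# Fused: the output key is reused, but the task now has a different
# runspec and no dependencies
fused = {
    ("double-abc", 0): (double, (load, 5)),
}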

An example where this matters is the following (the async code is not strictly necessary, but the condition is a bit difficult to trigger and this helps; paste this code into a Jupyter notebook and run it a couple of times):

import asyncio

import dask
import dask.dataframe as dd
from distributed import Client, Scheduler, Worker

async with (
    Scheduler() as s,
    Worker(s.address) as a,
    Worker(s.address) as b,
    Client(s.address, asynchronous=True) as c,
):
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-10",
        dtypes={"x": float, "y": float},
        freq="100 s",
    )
    # Persist a shuffled version of the dataframe ...
    out = dd.shuffle.shuffle(df, "x")
    out = out.persist()

    # ... wait until tasks show up on one of the workers ...
    while not a.tasks:
        await asyncio.sleep(0.05)

    del out

    # ... then compute a slightly different graph that reuses the same keys.
    out = dd.shuffle.shuffle(df, "x")
    x, y = c.compute([df.x.size, out.x.size])
    x, y = await c.gather([x, y])

Note how the initial shuffle is persisted and then a slightly different version of the graph is computed again below. From what I can gather, the initial persist fuses the keys while the later compute does not (or maybe it's the other way round, I'm not sure; either way, that is a different issue).

This specific reproducer actually triggers (though not every time) a KeyError in a worker's data buffer while trying to read data.

2023-01-27 16:28:34,648 - distributed.worker - ERROR - Exception during execution of task ('size-chunk-0482e0de93343089cd64837d139d9a80-49c0404470df4695b3f5aa383f11c345', 3).
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2366, in _prepare_args_for_execution
    data[k] = self.data[k]
  File "/Users/fjetter/workspace/distributed/distributed/spill.py", line 257, in __getitem__
    return super().__getitem__(key)
  File "/Users/fjetter/mambaforge/envs/dask-distributed-310/lib/python3.10/site-packages/zict/buffer.py", line 108, in __getitem__
    raise KeyError(key)
KeyError: "('simple-shuffle-539a650502e21de2dabd2021c9c9e684', 3)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2247, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2370, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
KeyError: "('simple-shuffle-539a650502e21de2dabd2021c9c9e684', 3)"

This is caused by the dependency relations between tasks no longer being accurate on the scheduler: it considers a task "ready", i.e. all dependencies in memory, too soon, which causes the failure on the worker.
When validation is activated, the scheduler catches these cases earlier and raises appropriate AssertionErrors. This is not checked at runtime for performance reasons and is typically not necessary, since we rely on the assumption that keys identify a task uniquely.
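As a side note, validation can be switched on explicitly when debugging this kind of problem (it is far too slow for production). A minimal sketch, assuming the standard distributed configuration keys distributed.scheduler.validate and distributed.worker.validate:

import dask
from distributed import Client, LocalCluster

# With validation on, scheduler and workers check internal invariants
# (e.g. dependency consistency) on every transition and raise
# AssertionError as soon as the state becomes inconsistent.
with dask.config.set({
    "distributed.scheduler.validate": True,
    "distributed.worker.validate": True,
}):
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)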

Apart from this artificial example, we do have internal reports about such a spurious KeyError in combination with an xgboost workload.

@github-actions github-actions bot added the needs triage Needs a response from a contributor label Jan 27, 2023
@fjetter fjetter added highlevelgraph Issues relating to HighLevelGraphs. and removed needs triage Needs a response from a contributor labels Jan 27, 2023
@crusaderky
Collaborator

crusaderky commented Jan 30, 2023

If I understood correctly:

  1. x is in memory on a
  2. a receives the stimulus to compute y, which depends on x. y transitions to executing.
  3. before y can reach _prepare_args_for_execution, y is cancelled and x is released (which deletes it from the SpillBuffer).
  4. before Worker.execute can complete, a receives a stimulus to execute a new task, which has the same key as y but different spec and crucially no dependency on x.
  5. y goes into resumed state
  6. Worker.execute terminates with KeyError, raised by _prepare_args_for_execution. If the task was still cancelled this would not be a problem as the error would be ignored.

I don't think this should be in dask/dask. This is (yet another) fragility of the cancelled state and the dask/dask graph shouldn't work around it. It should be OK to submit to the scheduler a new, different task with the same key as a released future.

@crusaderky
Collaborator

crusaderky commented Jan 30, 2023

Actually - I don't think you need to change the graph. You can get this error if you resubmit the same exact tasks.

  1. x is in memory on a
  2. a receives the stimulus to compute y, which depends on x. y transitions to executing.
  3. before y can reach _prepare_args_for_execution, y is cancelled and x is released (which deletes it from the SpillBuffer).
  4. client resubmits x and y
  5. x transitions to memory on b
  6. before Worker.execute can complete on a, a receives {op: compute-task, key: y, who_has: {x: b.address}}
  7. y transitions back to executing (_transition_cancelled_waiting)
  8. Worker.execute terminates with KeyError, raised by _prepare_args_for_execution.

This requires x to go through its whole lifecycle between client, scheduler, and worker b before a has had the chance of doing even a single round of the event loop. The easiest way to achieve this is to hamstring a's event loop by spilling many GiBs worth of data.

@fjetter
Member Author

fjetter commented Jan 30, 2023

TL;DR: The current system assumes strongly, in many places, that a key uniquely identifies a task, not just its output. Reusing a key for a different task violates this assumption, and distributed can fail on multiple levels.


The scheduler has no way to understand that the tasks are new and different. The problem has nothing to do with the Worker; this error merely surfaces there. As stated above, if validation is on, this raises in various places on the scheduler (I found three different assertions that could be triggered depending on queuing/no-queuing and some timing; here, here, and I forgot where the third one was).

If you inspect closely what happens when executing the above script (the del is not necessary, it is merely there to be explicit), you'll see that the second update-graph contains an assign-* task with a different runspec but the same key as in the first update-graph. This new state is then not handled properly by the scheduler, since the scheduler assumes in many places that a key uniquely identifies a task.

What happens here is that the scheduler already knows a certain task A that doesn't have any dependencies (first persist, with task fusion). A subsequent update-graph then submits the same task, but now with dependencies. This causes the previous TaskState object to be updated, i.e. if task A without dependencies is in state ready, it is updated to now have dependencies. After this update the TaskState object is corrupt and would be caught by the scheduler validation rules, but instead we're submitting it to the worker where it somehow ends up being executed.
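To make "same key, different runspec" concrete, here is a minimal sketch at the raw graph level (key and function names are made up; whether such a resubmission actually corrupts the scheduler depends on timing and on the state the key is in):

from dask.base import tokenize

def inc(x):
    return x + 1

def add(x, y):
    return x + y

key = ("assign-deadbeef", 0)  # made-up key standing in for the fused/unfused output

# First update-graph: the key has no dependencies (fused form)
graph_1 = {key: (inc, 1)}

# Second update-graph: the same key now depends on another task (unfused form)
graph_2 = {
    ("chunk-cafebabe", 0): (inc, 1),
    key: (add, ("chunk-cafebabe", 0), 1),
}

# Same key, different runspec: this is what the scheduler cannot reconcile,
# because it assumes a key uniquely identifies a task
assert tokenize(graph_1[key]) != tokenize(graph_2[key])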

It may be true that the worker could handle this better but the scheduler is also corrupted already.

It should be OK to submit to the scheduler a new, different task with the same key as a released future.

I believe this is where the first fallacy is hidden. In the code above, what actually happens is

  • update-graph (from the persist)
  • Client gets NPartitions future handles
  • update-graph (from compute; uses same keys with different spec+topologies)
    and only then...
  • client-releases-keys for every future

i.e. the scheduler doesn't even know that the futures were released. This timing is unavoidable due to how refcounting works.

Even if you disregard this release chain, the same issue can very easily be constructed with a second client and different graphs.

@fjetter
Member Author

fjetter commented Jan 30, 2023

@crusaderky I can't quite follow your example here, but it is a different issue. I'm talking about the guarantee that a task has to be uniquely identifiable by its key; that guarantee is violated here and causes all sorts of failures in distributed. If you found something else, please open a new ticket.


I believe this issue is caused by this line reusing root instead of generating a new key.

root,

@rjzamora @jrbourbeau Can either one of you motivate why blockwise fusion layers cannot generate a new key? @jrbourbeau you mentioned this recently as a UX problem that could be solved by a "better version of HLGs". I feel like I'm missing some context.

@rjzamora
Member

Can either one of you motivate why blockwise fusion layers cannot generate a new key?

Agree that this is a significant problem with HLGs. The reason is the simple fact that there is no (formal/robust) mechanism to "replay" the construction of a graph after the names/properties for a subset of its tasks have been changed. That is, you can't really change the name of a Blockwise layer from "operation-c-*" to "operation-abc-*" after it has been fused with its dependencies, because you would also need to materialize the rest of the graph and manually replace all task keys beginning with "operation-c-*" with "operation-abc-*". So, to clarify: you can change the name, but it forces graph materialization.

@fjetter
Member Author

fjetter commented Jan 30, 2023

The reason is the simple fact that there is no (formal/robust) mechanism to "replay" the construction of a graph after the names/properties for a subset of its tasks has been changed.

So this is a limitation because there might've been an "accidental" materialization before the fusing?

@rjzamora
Member

rjzamora commented Jan 30, 2023

So this is a limitation because there might've been an "accidental" materialization before the fusing?

Yes and no. The limitation is that there is no API to replace a layer in the HLG (e.g. hlg_graph.replace(<old-key>=new_layer)), because (1) there is no formal rule linking the output task names to the Layer name, and (2) [EDIT: mostly just this second reason] many HLG Layers are simply wrapping already-materialized dictionaries (on purpose). Therefore, renaming the output tasks of a Blockwise Layer will often require you to materialize the entire graph.
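To illustrate why this forces materialization, here is a hedged sketch with two materialized layers (layer names and tasks are made up): the dependent layer spells out the upstream keys literally inside its tasks, so renaming the upstream layer means rewriting every task of the dependent layer as well, and for a lazy Blockwise layer that in turn means materializing it first.

from dask.highlevelgraph import HighLevelGraph, MaterializedLayer

def inc(x):
    return x + 1

# Layer "b" and a dependent layer "c" that references b's keys by name
layers = {
    "b": MaterializedLayer({("b", i): (inc, i) for i in range(3)}),
    "c": MaterializedLayer({("c", i): (inc, ("b", i)) for i in range(3)}),
}
dependencies = {"b": set(), "c": {"b"}}
hlg = HighLevelGraph(layers, dependencies)

# Renaming "b" to "b-fused" is not just a rename of b's own keys:
# every task in "c" embeds ("b", i) literally and must be rewritten too.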

@gjoseph92
Collaborator

xref #8635. Like @rjzamora said:

if you change the name of the layer, you also have to change the names referenced in the dependent layers. That's just not currently possible with HLG layers—we could feasibly do it to MaterializedLayers, but in general, there's no rename_input_key interface on a Layer.

@fjetter
Member Author

fjetter commented Jan 31, 2023

xref #8635. Like @rjzamora said:

if you change the name of the layer, you also have to change the names referenced in the dependent layers. That's just not currently possible with HLG layers—we could feasibly do it to MaterializedLayers, but in general, there's no rename_input_key interface on a Layer.

I'm still trying to wrap my head around it but I'm getting there.

IIUC this is mostly about layer dependencies? Best case, updating layer dependencies is simply exchanging a single key; worst case, a dependent of that Blockwise layer is a MaterializedLayer, in which case we'd need to walk the entire graph to rename the key. Is this roughly correct?

@rjzamora
Member

IIUC this is mostly about layer dependencies?

Yes. The problem is that you would need to regenerate all dependent layers, and there is no clear way to do this.

Why there is no clear way to do this: There is no formal mechanism to change the dependencies for an existing Layer. The required Layer API is very limited, and does not provide a way to set the dependencies. Therefore, different logic would need to be used to change dependencies for different layers. For the MaterializedLayer case we can do this manually. However, we can't use the same logic for other layers unless we materialize them first.

Given that the logic here would be "ugly" with the existing Layer API, the ideal solution is probably to update the Layer API to include a clear mechanism for tracking dependencies. However, we have been waiting to clean up serialization. I have personally been waiting to remove the Mapping foundation as well.
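For the materialized case, the "manual" route could look roughly like the hypothetical helper below; rename_input_key is not an existing Layer method, this is only a sketch of what such an interface might do, and it only works because the mapping is already materialized.

from dask.highlevelgraph import MaterializedLayer

def rename_input_key(layer: MaterializedLayer, old: str, new: str) -> MaterializedLayer:
    # Hypothetical helper: rewrite every reference to keys whose layer name
    # is `old` so that it points at `new`; the layer's own output keys are
    # left untouched.
    def swap(obj):
        # Task keys in dask graphs are tuples like (name, index, ...)
        if isinstance(obj, tuple) and obj and obj[0] == old:
            return (new, *obj[1:])
        if isinstance(obj, tuple):
            return tuple(swap(o) for o in obj)
        if isinstance(obj, list):
            return [swap(o) for o in obj]
        return obj

    return MaterializedLayer({k: swap(v) for k, v in layer.mapping.items()})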

@fjetter
Member Author

fjetter commented Jan 31, 2023

Thanks, that clears things up for me.

@crusaderky
Collaborator

Found a nice textbook example in the distributed tests https://github.com/dask/distributed/pull/8185/files#r1479864806

@crusaderky
Collaborator

(repost from dask/distributed#8185 (comment))
Found another use case:

import dask
import dask.dataframe as dd

# df: any pandas DataFrame with columns "a" and "b"
ddf = dd.from_pandas(df, npartitions=4)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    ddf = ddf.set_index("a").sort_values("b")
    result = ddf.compute()

This code:

  1. sends some keys to the scheduler on set_index,
  2. waits for the result to return,
  3. then sends keys to the scheduler on sort_values,
  4. waits for the result to return,
  5. and finally sends the computation proper with compute.

Some keys are the same, but run_spec changes; this occasionally triggers a race condition where the scheduler didn't have the time to forget those keys yet.

This is (fortuitously) not causing failures because the key is always close to its released->forgotten transition when this happens (although in other cases it may be in memory; I'm not sure).

If I add a line print(tokenize(ts.run_spec)) to _generate_taskstates, you'll get:

('assign-f0dc4405b24a6c54160e65ddfeb867eb', 0) 60dfef541bc8fe30d091a769b90d6fae
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 0) 6d2d7e33c168fc1a0d0902905b811aca
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 1) 4c91541c56f2890b6a7dce01aa06fab9
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 1) 81485c4a5cf24f8cedf4fe5dcf84acee
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 2) 96326838be2ca074533222b80e986acd
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 2) f0096bf0edd773d5a48fe6f8495ac4f2
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 3) 153143fb0b822dbeeb17601e883b1e45
('assign-f0dc4405b24a6c54160e65ddfeb867eb', 3) c0fb8a09ad926986969e39adf100a1a3
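A quick way to spot this kind of collision from the client side, before anything reaches the scheduler, is to compare the materialized graphs of the two computations. A sketch, assuming dask collections that expose __dask_graph__ (the helper name is made up):

from dask.base import tokenize

def find_respecified_keys(graph_a: dict, graph_b: dict) -> set:
    # Keys present in both graphs whose task definitions differ; these are
    # the keys that would reach the scheduler twice with the same name but
    # a different run_spec.
    shared = set(graph_a) & set(graph_b)
    return {k for k in shared if tokenize(graph_a[k]) != tokenize(graph_b[k])}

# Usage sketch:
# graph_a = dict(first_collection.__dask_graph__())
# graph_b = dict(second_collection.__dask_graph__())
# find_respecified_keys(graph_a, graph_b)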

@github-actions github-actions bot added the needs attention It's been a while since this was pushed on. Needs attention from the owner or a maintainer. label Mar 18, 2024
coroa added a commit to IAMconsortium/concordia that referenced this issue Mar 22, 2024
They are currently triggering dask warnings related to a bug (dask/dask#9888). Retry including them after it's fixed.