
Use pickle for graph submissions from client to scheduler #7564

Merged
Merged 27 commits into main on Mar 27, 2023

Conversation

fjetter
Member

@fjetter fjetter commented Feb 21, 2023

This is unfortunately much larger than I was hoping. There are a couple of notable changes in here:

  • This drops the dask_distributed_pack/unpack methods and uses plain pickle to ship graphs from client to scheduler
  • I refactored the update_graph method and broke it up into more or less independent methods. A lot of it is entangled, which still leaves the methods a bit messy, but I believe update_graph itself is now much easier to read. Functionally speaking, there are only a few intentional behavioral changes. I'll point them out as dedicated comments
  • Futures, Queues, etc. still implement a weird "if no client is available, don't do anything" route. This basically allows such an object to be initialized and end up in a corrupt state. This should only happen on the scheduler, but is still not great.
  • To allow graphs to be shipped via pickle, we need to implement a custom Pickler class, since otherwise user arguments in graphs may end up not being serializable. One example is h5py objects, which intentionally disallow pickling. All of these cases are already handled by our dask_(de)serialize dispatch, so the new pickler simply uses the dask serializer if one is available and falls back to ordinary pickle otherwise.
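The fallback pattern in the last bullet can be sketched with a tiny stand-in registry. Everything below is illustrative: the real code dispatches through the dask_serialize/dask_deserialize tables, not plain dicts, and H5Like merely mimics an h5py object.

```python
import io
import pickle

# Hypothetical stand-ins for the dask_serialize / dask_deserialize
# dispatch tables (the real ones are Dispatch objects in distributed).
SERIALIZERS = {}
DESERIALIZERS = {}

class H5Like:
    """Mimics an h5py-style object that forbids plain pickling."""
    def __init__(self, value):
        self.value = value
    def __reduce__(self):
        raise TypeError("h5py objects cannot be pickled")

def _ser_h5(obj):
    return {"value": obj.value}

def _deser_h5(header):
    return H5Like(header["value"])

SERIALIZERS[H5Like] = _ser_h5
DESERIALIZERS[H5Like] = _deser_h5

class DaskPickler(pickle.Pickler):
    def reducer_override(self, obj):
        # Prefer a registered custom serializer; otherwise tell pickle
        # to fall back to its normal machinery.
        try:
            serialize = SERIALIZERS[type(obj)]
            deserialize = DESERIALIZERS[type(obj)]
        except KeyError:
            return NotImplemented
        return deserialize, (serialize(obj),)

buf = io.BytesIO()
DaskPickler(buf).dump({"task": (H5Like(42), "other-arg")})
restored = pickle.loads(buf.getvalue())
assert restored["task"][0].value == 42
```

Plain `pickle.dumps` on the same graph would raise the TypeError from `__reduce__`; the override intercepts the object before pickle ever consults it.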

Minor things

  • I encountered some problems with how cancelled keys are forwarded. I rewrote the handlers around cancellation so that the client uses the batched stream to ensure ordering, while the scheduler implements this synchronously. I can break this change out if need be, but I believe it has general benefits.

Supersedes

Requires dask branch

Comment on lines 333 to 335
@pytest.mark.xfail(reason="Is this a sane thing to do?")
@gen_cluster(client=True)
async def test_compute_persisted_retries(c, s, a, b):
Member Author

This is a behavioral change. This test covers a condition where a persisted result is annotated by a subsequent compute call.

I don't believe something like this should be allowed since annotations have non-trivial effects on scheduling and we're very likely not handling all of these cases. For instance, this test is annotating a retry. The only reason why this works is because persist+compute are submitted basically in the same event loop tick such that the retry annotation is set before the task even has a chance to be computed.

Comment on lines +6044 to +6015
async def test_mixing_clients_same_scheduler(s, a, b):
async with Client(s.address, asynchronous=True) as c1, Client(
s.address, asynchronous=True
) as c2:
future = c1.submit(inc, 1)
with pytest.raises(ValueError):
c2.submit(inc, future)
assert await c2.submit(inc, future) == 3
assert not s.tasks
Member Author

This is new. I simplified the serialization of the future and if the key is known to the scheduler, a future may even be shared by clients.
This is not necessary for this change just something I noticed when playing with the futures reducer.
Different scheduler still raises, see below (but only on await)

@mrocklin
Member

Cool. Starting to review this now. Thank you for pushing this up. cc @rjzamora

Comment on lines 20 to 31
import pickle

from distributed.protocol.serialize import dask_deserialize, dask_serialize

class _DaskPickler(pickle.Pickler):
    def reducer_override(self, obj):
        try:
            # Use a registered dask serializer if one exists for this type
            serialize = dask_serialize.dispatch(type(obj))
            deserialize = dask_deserialize.dispatch(type(obj))
            return deserialize, serialize(obj)
        except TypeError:
            # No custom serializer registered: fall back to normal pickling
            return NotImplemented
Member Author

This is the new pickler class, required to allow arguments like h5py objects to be passed. Otherwise, we'd need to walk and iterate the HLGs/Layers ourselves (effectively an HLG version of our serialize protocol). I figured this is much simpler.

I guess there are more performant ways to do this. For instance, I'm not storing the frames in the buffer callback, but I'm happy to defer that to a later PR unless I'm missing something important.
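The buffer-callback idea mentioned here is the standard pickle protocol-5 out-of-band pattern. This sketch is adapted from the stdlib pickle documentation, not from this PR's code: large frames are handed to `buffer_callback` instead of being copied into the byte stream.

```python
import pickle

class ZeroCopyByteArray(bytearray):
    # Emit a PickleBuffer under protocol 5 so the payload can be picked
    # up by buffer_callback instead of being copied into the stream.
    def __reduce_ex__(self, protocol):
        if protocol >= 5:
            return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
        return type(self)._reconstruct, (bytearray(self),)

    @classmethod
    def _reconstruct(cls, obj):
        with memoryview(obj) as m:
            obj = m.obj
            if isinstance(obj, cls):
                return obj  # zero-copy: the original object came back
            return cls(obj)

payload = ZeroCopyByteArray(b"x" * 1024)
frames = []  # out-of-band frames land here, not inside `data`
data = pickle.dumps(payload, protocol=5, buffer_callback=frames.append)
restored = pickle.loads(data, buffers=frames)
assert restored == payload and len(frames) == 1
```

The pickle stream itself stays small; a transport can then ship the collected frames without an extra copy.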


Comment on lines 4354 to 4400
plugin.update_graph(
self,
client=client,
tasks=[ts.key for ts in touched_tasks],
keys=requested_keys,
dependencies=dependencies,
annotations=final_annotations,
priority=priority,
)
Member Author

The plugin API is something I'm not entirely sure what to do with. I wrote it so that our tests pass, but the API did change: I'm not passing all arguments as before, and unless we specify more explicitly what the arguments are, I don't believe we should pass them at all

The docs don't say anything https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin.update_graph

For instance, I was surprised to see that tasks are actually keys...
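Whatever the final signature ends up being, the shape of the hook as called in this diff looks roughly like this. The plugin below is a toy invoked by hand; argument names mirror the call site quoted above and are not a stable API.

```python
class GraphAuditPlugin:
    """Toy SchedulerPlugin-style object that records update_graph calls."""

    def __init__(self):
        self.calls = []

    def update_graph(self, scheduler, *, client=None, tasks=None, keys=None,
                     dependencies=None, annotations=None, priority=None,
                     **kwargs):
        # Note: per the discussion, `tasks` is a list of key strings,
        # not TaskState objects.
        self.calls.append({"client": client, "tasks": list(tasks or ())})

plugin = GraphAuditPlugin()
plugin.update_graph(
    None,  # scheduler stand-in
    client="client-abc",
    tasks=["x", "y"],
    keys={"y"},
    dependencies={"y": {"x"}},
    annotations={},
    priority={},
)
assert plugin.calls == [{"client": "client-abc", "tasks": ["x", "y"]}]
```

Accepting `**kwargs` is one cheap way for downstream plugins to survive signature changes like the one discussed here.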

Member

I don't object to changing the API. However we should check with downstream users. One user in particular I care about is @ntabris who might use this for analytics.

Member Author

One user in particular I care about is @ntabris who might use this for analytics.

I checked. From what I can tell, the plugin uses only the transition hook but not the update_graph hook

Contributor

No concerns from me, like @fjetter says we aren't using graph updates.

Member

@mrocklin mrocklin left a comment

First pass of review. I've briefly scanned everything except for scheduler.py. I'll handle that separately.

"parts_out",
"annotations",
]
return (P2PShuffleLayer, tuple(getattr(self, attr) for attr in attrs))
Member

Can I ask why this was needed?

Member Author

I subclassed the SimpleShuffleLayer mostly to avoid code duplication back then. I believe with this change we could avoid the subclassing entirely.

This is necessary because SimpleShuffleLayer implements the same reduction but hard-codes the SimpleShuffleLayer type. We could generalize this to use type(self), but I believe avoiding subclassing is the better approach overall
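The type(self) generalization mentioned above would look something like this (illustrative classes, not the real shuffle layers):

```python
import pickle

class SimpleLayerSketch:
    """Stand-in for SimpleShuffleLayer's attribute-tuple reduction."""
    _attrs = ("name", "parts_out")

    def __init__(self, name, parts_out):
        self.name = name
        self.parts_out = parts_out

    def __reduce__(self):
        # Using type(self) instead of a hard-coded class means
        # subclasses round-trip as themselves.
        return type(self), tuple(getattr(self, a) for a in self._attrs)

class P2PLayerSketch(SimpleLayerSketch):
    pass

layer = pickle.loads(pickle.dumps(P2PLayerSketch("shuffle-1", [0, 1, 2])))
assert type(layer) is P2PLayerSketch
assert layer.parts_out == [0, 1, 2]
```

With the hard-coded base class in `__reduce__`, the same round trip would silently come back as a SimpleLayerSketch, which is the bug the overridden reduction in this diff works around.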


assert "Scheduler cancels key" in slogs.getvalue()
Member

I don't really care here, but this seems like a significant relaxation of the test

Member Author

Absolutely, this was an accident.

Member Author

I don't entirely understand why I need to wait here since I'd expect the logging module to go through the handlers synchronously.

FWIW the usage of caplog is unrelated. Same problem appears with the captured_logger ctx

"client": "f",
"keys": ["z"],
}
)
Member

This is why I dislike white-box tests. There was a lot of this in early days.

Member Author

FWIW I already encountered the same test in #7502 (comment).
I believe the condition it constructs is actually not possible when using the Client API

This is why I dislike white-box tests

I actually don't consider this a "white box" test. We're not asserting on internal state of the object but are rather using a very low level API where instead a high level API would've been more useful. A different way to look at this is that we actually don't care about the low level API that much.

Member

@mrocklin mrocklin left a comment

@fjetter is the primary change to scheduler.py refactoring it out into multiple smaller methods and the changes already in the original PR? Are there other sections I should be sure to review? It's hard to tell what has changed based only on the lines-changed.


Comment on lines 4293 to 4316
self._pop_known_tasks(dsk, dependencies)

# Remove aliases
for k in list(tasks):
if tasks[k] is k:
del tasks[k]
if lost_keys := self._pop_lost_tasks(dsk, dependencies, requested_keys):
Member

FWIW I dislike pulling non-repeated functionality into methods like this. I find that the non-linearity it produces makes it harder for me to understand code. This is subjective, I acknowledge.

More thoughts here: https://matthewrocklin.com/avoid-indirection.html

Also, bonus points if you remove the walrus operator. My guess is that most devs today don't intuitively know what it does. It would be better, I think, to use another line and more basic syntax.

Member

These are just preferences, not constraints

Member Author

FWIW I dislike pulling non-repeated functionality into methods like this. I find that the non-linearity it produces makes it harder for me to understand code. This is subjective, I acknowledge.

I tend to agree here, and I was torn myself, but the update_graph method is otherwise 400 lines of highly nontrivial code, and I believe that hiding a couple of deeply nested loops and conditionals does not hurt readability. This also helps with namespace pollution.

This way I can describe what update_graph does in fewer than ten lines

  • deserialize the graph
  • materialize the graph
  • pop known and lost keys
  • generate the internal state
  • apply annotations
  • compute priorities
  • generate recommendations

I agree there is some nuance in what "popping known and lost keys" means (or other parts) but I believe this makes the code much more accessible.

My guess is that most devs today don't intuitively know what it does.

I think I do not agree with the statement that "most devs don't have an intuition about this". At the very least I don't think this is making accessibility to the code base any worse.
FWIW without my usage of walrus in this PR, there are already 16 occurrences of the operator in our code base
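The decomposition listed above can be illustrated with a toy skeleton. Every helper name here is made up for illustration; the real steps are Scheduler methods operating on much richer state.

```python
import pickle

def deserialize_graph(frames):
    # 1. deserialize the graph shipped from the client
    return pickle.loads(frames)

def materialize(hlg):
    # 2. expand the (high-level) graph into a plain key -> task dict
    return dict(hlg)

def pop_known(tasks, known_keys):
    # 3. drop keys the scheduler already has state for
    return {k: v for k, v in tasks.items() if k not in known_keys}

def compute_priorities(tasks):
    # 4-6. build internal state, apply annotations, compute priorities
    return {key: i for i, key in enumerate(sorted(tasks))}

def update_graph(frames, known_keys=frozenset()):
    tasks = pop_known(materialize(deserialize_graph(frames)), known_keys)
    priority = compute_priorities(tasks)
    # 7. emit transition recommendations for the new tasks
    return [(key, "waiting", priority[key]) for key in sorted(tasks)]

frames = pickle.dumps({"x": 1, "y": 2, "z": 3})
recs = update_graph(frames, known_keys={"x"})
assert recs == [("y", "waiting", 0), ("z", "waiting", 1)]
```

Each step being its own function is exactly the indirection being debated: the top-level flow reads in a handful of lines, at the cost of a jump per helper.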

@fjetter
Member Author

fjetter commented Feb 21, 2023

Process completed with exit code 127.

Ouch, that doesn't look good. That might be a segfault 😱

@github-actions
Contributor

github-actions bot commented Feb 21, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files ±0    26 suites ±0    12h 6m 17s ⏱️ −38m 2s
3 533 tests −3    3 426 ✔️ −2    104 💤 −1    3 ±0
44 671 runs −37    42 578 ✔️ −24    2 090 💤 −13    3 ±0

For more details on these failures, see this check.

Results for commit e003ec6. ± Comparison against base commit 9bd90c0.

This pull request removes 8 and adds 5 tests. Note that renamed tests count towards both.
distributed.diagnostics.tests.test_widgets ‑ test_multi_progressbar_widget_after_close
distributed.protocol.tests.test_to_pickle ‑ test_non_msgpack_serializable_layer
distributed.tests.test_client ‑ test_badly_serialized_input
distributed.tests.test_client ‑ test_compute_persisted_retries
distributed.tests.test_client ‑ test_fatally_serialized_input
distributed.tests.test_client ‑ test_mixing_clients
distributed.tests.test_scheduler ‑ test_filtered_communication
distributed.tests.test_worker ‑ test_run_spec_deserialize_fail
distributed.diagnostics.tests.test_scheduler_plugin ‑ test_update_graph_hook_complex
distributed.diagnostics.tests.test_scheduler_plugin ‑ test_update_graph_hook_simple
distributed.tests.test_client ‑ test_computation_store_annotations
distributed.tests.test_client ‑ test_mixing_clients_different_scheduler
distributed.tests.test_client ‑ test_mixing_clients_same_scheduler

♻️ This comment has been updated with latest results.

@fjetter
Member Author

fjetter commented Feb 21, 2023

Are there other sections I should be sure to review?

The most interesting change, I believe, is the new Pickler object, see #7564 (comment)

@fjetter
Member Author

fjetter commented Feb 21, 2023

Indeed, I am seeing a segmentation fault....

home/runner/work/_temp/3eb56efc-3510-4d06-a1e2-bc235e8b1035.sh: line 10:  2837 Aborted                 (core dumped) pytest distributed -m "not avoid_ci and not ci1" --runslow --leaks=fds,processes,threads --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID --cov=distributed --cov-report=xml
distributed/protocol/tests/test_numpy.py::test_dumps_serialize_numpy[x22] 
      2838 Done                    | tee reports/stdout

This appears to happen only on ubuntu + python3.8 (so far)

@fjetter
Member Author

fjetter commented Feb 21, 2023

I factored out the pickler change, see #7567

@fjetter
Member Author

fjetter commented Feb 21, 2023

I will follow up with another pass and try to reduce the diff tomorrow. If applicable, I'll break out smaller changes.

I believe the most controversial change may be #7567. This is something I only stumbled over today, and I would appreciate some input from somebody with more historic exposure to our serialization protocol.

@mrocklin
Member

@fjetter anything else I should be doing with this PR today?

@fjetter
Member Author

fjetter commented Feb 23, 2023

Factoring out changes to future serialization #7580

@fjetter
Member Author

fjetter commented Mar 23, 2023

Unfortunately, shuffle="tasks" is still the only shuffle method supported by dask-cudf. So, we still care about task-based shuffle performance a lot :/

I don't intend to drop it or make it any worse, don't worry :)

@fjetter
Member Author

fjetter commented Mar 24, 2023

CI is looking good on both the dask/dask and dask/distributed side. I am still struggling a bit with benchmark results. There was another priority mixup in the shuffle layer. I will update and rerun benchmarks with new results ASAP.

@fjetter
Member Author

fjetter commented Mar 24, 2023

From what I can tell, I found and fixed the last regression. Currently running another set of benchmarks to confirm. If they don't flag anything suspicious, I will move forward with merging this Monday morning unless there are any objections until then.

Comment on lines +178 to +203
def get_output_keys(self) -> set[_T_Key]:
return {(self.name, part) for part in self.parts_out}

def is_materialized(self) -> bool:
return hasattr(self, "_cached_dict")

@property
def _dict(self) -> _T_LowLevelGraph:
"""Materialize full dict representation"""
self._cached_dict: _T_LowLevelGraph
dsk: _T_LowLevelGraph
if hasattr(self, "_cached_dict"):
return self._cached_dict
else:
dsk = self._construct_graph()
self._cached_dict = dsk
return self._cached_dict

def __getitem__(self, key: _T_Key) -> tuple:
return self._dict[key]

def __iter__(self) -> Iterator[_T_Key]:
return iter(self._dict)

def __len__(self) -> int:
return len(self._dict)
Member Author

This subclassing caused another problem, so I just copied the relevant code. IMHO most of these methods should live in some kind of base class rather than in SimpleShuffleLayer. I see this pattern repeated often, but fixing it right now is out of scope.
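The copied methods all implement one pattern: a Mapping whose low-level dict is built lazily on first access and then cached. A minimal version of that pattern (illustrative, not the real layer):

```python
from collections.abc import Iterator, Mapping


class LazyLayerSketch(Mapping):
    """Mapping that materializes its task dict on first access."""

    def __init__(self, name: str, npartitions: int):
        self.name = name
        self.npartitions = npartitions

    def is_materialized(self) -> bool:
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self) -> dict:
        if not hasattr(self, "_cached_dict"):
            # Expensive for real layers, so build it once and cache it.
            self._cached_dict = {
                (self.name, i): ("make-partition", i)
                for i in range(self.npartitions)
            }
        return self._cached_dict

    def __getitem__(self, key) -> tuple:
        return self._dict[key]

    def __iter__(self) -> Iterator:
        return iter(self._dict)

    def __len__(self) -> int:
        return len(self._dict)


layer = LazyLayerSketch("shuffle", 3)
assert not layer.is_materialized()       # nothing built yet
assert layer[("shuffle", 0)] == ("make-partition", 0)
assert layer.is_materialized()           # first access materialized it
assert len(layer) == 3
```

Hoisting exactly this into a shared base class is the deduplication suggested above.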

@fjetter
Member Author

fjetter commented Mar 27, 2023

Ok, all test jobs passed.

Note: before merging I will need to revert the environment.yaml files. This will cause the builds to fail again since this needs to be merged together with the dask change in dask/dask#9988

@fjetter
Member Author

fjetter commented Mar 27, 2023

Ok, benchmark results are finally in as well. This time w/out any regressions

Wall time (right side is the null hypothesis, i.e. main vs. main to measure noise; left side is this PR).

What we can see is that there is not much to see. This change is not intended to change scheduling behavior or speed anything up. These benchmarks mostly confirm that we're dispatching the proper computations.

There is one sizable performance improvement in the test case test_trivial_workload_should_not_cause_work_stealing, which is indeed connected to the refactoring. This test case generates a couple of thousand delayed objects and computes them embarrassingly parallel. The refactoring shaves off a couple of seconds of serialization time, which is, relatively speaking, a big change for this workflow (from 12.5s down to 8s, i.e. ~36%). This also translates to other almost embarrassingly parallel graphs, e.g. test_set_index[1-p2p-False] is about 10-15s faster. Nice, but relatively speaking not as exciting.


Memory comparisons do not show any differences beyond noise.

Benchmark results available at

@fjetter
Member Author

fjetter commented Mar 27, 2023

Alright, all tests passed and I reverted the environment files. I think we're good to go.

@fjetter fjetter merged commit e45b7e5 into dask:main Mar 27, 2023
9 of 30 checks passed
@fjetter fjetter deleted the hlg_pickle branch March 27, 2023 15:50
@mrocklin
Member

mrocklin commented Mar 27, 2023 via email

crusaderky pushed a commit to crusaderky/distributed that referenced this pull request Mar 30, 2023
benjaminhwilliams added a commit to DiamondLightSource/python-tristan that referenced this pull request Jan 19, 2024
Avoid problems with serialising h5py objects in task graphs,  introduced
in 2023.4.0.

See https://distributed.dask.org/en/stable/changelog.html#v2023-4-0
and dask/distributed#7564 for details.
benjaminhwilliams added a commit to DiamondLightSource/python-tristan that referenced this pull request Jan 19, 2024
* Remove redundant setuptools constraint

* Limit Dask Distributed version to <2023.4

Avoid problems with serialising h5py objects in task graphs,  introduced
in 2023.4.0.

See https://distributed.dask.org/en/stable/changelog.html#v2023-4-0
and dask/distributed#7564 for details.