Skip to content

Commit

Permalink
Remove yaml support
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Feb 28, 2024
1 parent 1602d74 commit 851d631
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 71 deletions.
36 changes: 10 additions & 26 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from dask.optimization import SubgraphCallable
from dask.typing import no_default
from dask.utils import (
_deprecated_kwarg,
apply,
ensure_dict,
format_bytes,
Expand Down Expand Up @@ -4232,12 +4233,13 @@ def scheduler_info(self, **kwargs):
self.sync(self._update_scheduler_info)
return self._scheduler_identity

@_deprecated_kwarg("format", None)

Check warning on line 4236 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L4236

Added line #L4236 was not covered by tests
def dump_cluster_state(
self,
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = ("run_spec",),
format: Literal["msgpack", "yaml"] = "msgpack",
format: Literal["msgpack"] = "msgpack",
**storage_options,
):
"""Extract a dump of the entire cluster state and persist to disk or a URL.
Expand All @@ -4247,7 +4249,7 @@ def dump_cluster_state(
can be large. On a large or long-running cluster, this can take several minutes.
The scheduler may be unresponsive while the dump is processed.
Results will be stored in a dict::
Results will be stored in a dict in a ``.msgpack.gz`` file::
{
"scheduler": {...}, # scheduler state
Expand All @@ -4264,11 +4266,15 @@ def dump_cluster_state(
}
}
To read::
import gzip, msgpack
with gzip.open("filename") as fd:
state = msgpack.unpack(fd)
Parameters
----------
filename:
The path or URL to write to. The appropriate file suffix (``.msgpack.gz`` or
``.yaml``) will be appended automatically.
The path or URL to write to. The ``.msgpack.gz`` file suffix is appended automatically.
Must be a path supported by :func:`fsspec.open` (like ``s3://my-bucket/cluster-dump``,
or ``cluster-dumps/dump``). See ``write_from_scheduler`` to control whether
Expand Down Expand Up @@ -4296,25 +4302,6 @@ def dump_cluster_state(
Defaults to exclude ``run_spec``, which is the serialized user code.
This is typically not required for debugging. To allow serialization
of this, pass an empty tuple.
format:
Either ``"msgpack"`` or ``"yaml"``. If msgpack is used (default),
the output will be stored in a gzipped file as msgpack.
To read::
import gzip, msgpack
with gzip.open("filename") as fd:
state = msgpack.unpack(fd)
or::
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
with open("filename") as fd:
state = yaml.load(fd, Loader=Loader)
**storage_options:
Any additional arguments to :func:`fsspec.open` when writing to a URL.
"""
Expand All @@ -4332,7 +4319,6 @@ async def _dump_cluster_state(
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE,
format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options,
):
filename = str(filename)
Expand All @@ -4343,14 +4329,12 @@ async def _dump_cluster_state(
await self.scheduler.dump_cluster_state_to_url(
url=filename,
exclude=exclude,
format=format,
**storage_options,
)
else:
await cluster_dump.write_state(
partial(self.scheduler.get_cluster_state, exclude=exclude),
filename,
format,
**storage_options,
)

Expand Down
63 changes: 28 additions & 35 deletions distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
from collections import defaultdict
from collections.abc import Awaitable, Callable, Collection, Mapping
from pathlib import Path
from typing import IO, Any, Literal
from typing import Any, Literal

Check warning on line 9 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L9

Added line #L9 was not covered by tests

import msgpack

from dask.typing import Key
from dask.utils import _deprecated_kwarg

Check warning on line 14 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L14

Added line #L14 was not covered by tests

from distributed._stories import scheduler_story as _scheduler_story
from distributed._stories import worker_story as _worker_story

DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack" | "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack"] = "msgpack"

Check warning on line 19 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L19

Added line #L19 was not covered by tests
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)


Expand All @@ -28,48 +29,40 @@ def _tuple_to_list(node):
return node


@_deprecated_kwarg("format", None)

Check warning on line 32 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L32

Added line #L32 was not covered by tests
async def write_state(
get_state: Callable[[], Awaitable[Any]],
url: str,
format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
format: Literal["msgpack"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: dict[str, Any],
) -> None:
"Await a cluster dump, then serialize and write it to a path"
if format == "msgpack":
mode = "wb"
suffix = ".msgpack.gz"
if not url.endswith(suffix):
url += suffix
writer = msgpack.pack
elif format == "yaml":
import yaml

mode = "w"
suffix = ".yaml"
if not url.endswith(suffix):
url += suffix

def writer(state: dict, f: IO) -> None:
# YAML adds unnecessary `!!python/tuple` tags; convert tuples to lists to avoid them.
# Unnecessary for msgpack, since tuples and lists are encoded the same.
yaml.dump(_tuple_to_list(state), f)

pass
elif format == "yaml": # type: ignore[unreachable]
raise ValueError(

Check warning on line 43 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L41-L43

Added lines #L41 - L43 were not covered by tests
"The 'yaml' format is not supported anymore; please use 'msgpack' instead."
)
else:
raise ValueError(
f"Unsupported format {format!r}. Possible values are 'msgpack' or 'yaml'."
f"Unsupported format {format!r}; please use 'msgpack' instead."
)

suffix = ".msgpack.gz"
if not url.endswith(suffix):
url += suffix

Check warning on line 53 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L51-L53

Added lines #L51 - L53 were not covered by tests

# Eagerly open the file to catch any errors before doing the full dump
# NOTE: `compression="infer"` will automatically use gzip via the `.gz` suffix
# This module is the only place where fsspec is used and it is a relatively
# heavy import. Do lazy import to reduce import time
import fsspec

with fsspec.open(url, mode, compression="infer", **storage_options) as f:
with fsspec.open(url, "wb", compression="infer", **storage_options) as f:

Check warning on line 61 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L61

Added line #L61 was not covered by tests
state = await get_state()
# Write from a thread so we don't block the event loop quite as badly
# (the writer will still hold the GIL a lot though).
await asyncio.to_thread(writer, state, f)
await asyncio.to_thread(msgpack.pack, state, f)

Check warning on line 65 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L65

Added line #L65 was not covered by tests


def load_cluster_dump(url: str, **kwargs: Any) -> dict:
Expand All @@ -78,8 +71,8 @@ def load_cluster_dump(url: str, **kwargs: Any) -> dict:
Parameters
----------
url : str
Name of the disk artefact. This should have either a
``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format.
Name of the disk artefact. This should have a
``.msgpack.gz`` suffix.
**kwargs :
Extra arguments passed to :func:`fsspec.open`.
Expand All @@ -89,23 +82,23 @@ def load_cluster_dump(url: str, **kwargs: Any) -> dict:
The cluster state at the time of the dump.
"""
if url.endswith(".msgpack.gz"):
mode = "rb"
reader = msgpack.unpack
elif url.endswith(".yaml"):
import yaml
pass

Check warning on line 85 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L85

Added line #L85 was not covered by tests

mode = "r"
reader = yaml.safe_load
elif url.endswith(".yaml"):
raise ValueError(

Check warning on line 88 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L87-L88

Added lines #L87 - L88 were not covered by tests
"The 'yaml' format is not supported anymore; cluster dumps area written in"
" the 'msgpack' format instead."
)
else:
raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")
raise ValueError(f"url ({url}) must have a .msgpack.gz suffix")

Check warning on line 93 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L93

Added line #L93 was not covered by tests

kwargs.setdefault("compression", "infer")
# This module is the only place where fsspec is used and it is a relatively
# heavy import. Do lazy import to reduce import time
import fsspec

with fsspec.open(url, mode, **kwargs) as f:
return reader(f)
with fsspec.open(url, "rb", **kwargs) as f:
return msgpack.unpack(f)

Check warning on line 101 in distributed/cluster_dump.py

View check run for this annotation

Codecov / codecov/patch

distributed/cluster_dump.py#L100-L101

Added lines #L100 - L101 were not covered by tests


class DumpArtefact(Mapping):
Expand Down
11 changes: 3 additions & 8 deletions distributed/diagnostics/cluster_dump.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from __future__ import annotations

from collections.abc import Collection
from typing import Any, Literal
from typing import Any

from distributed.cluster_dump import (
DEFAULT_CLUSTER_DUMP_EXCLUDE,
DEFAULT_CLUSTER_DUMP_FORMAT,
)
from distributed.cluster_dump import DEFAULT_CLUSTER_DUMP_EXCLUDE
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.scheduler import Scheduler

Expand All @@ -24,18 +21,16 @@ def __init__(
self,
url: str,
exclude: Collection[str] = DEFAULT_CLUSTER_DUMP_EXCLUDE,
format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: dict[str, Any],
):
self.url = url
self.exclude = exclude
self.format = format_
self.storage_options = storage_options

async def start(self, scheduler: Scheduler) -> None:
self.scheduler = scheduler

async def before_close(self) -> None:
await self.scheduler.dump_cluster_state_to_url(
self.url, self.exclude, self.format, **self.storage_options
self.url, self.exclude, format="msgpack", **self.storage_options
)
5 changes: 3 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@
from tornado.ioloop import IOLoop

import dask
import dask.utils
from dask.base import TokenizationError, normalize_token, tokenize
from dask.core import get_deps, iskey, validate_key
from dask.typing import Key, no_default
from dask.utils import (
_deprecated_kwarg,
ensure_dict,
format_bytes,
format_time,
Expand Down Expand Up @@ -3988,11 +3988,12 @@ async def get_cluster_state(
"versions": {"scheduler": self.versions(), "workers": worker_versions},
}

@_deprecated_kwarg("format", None)

Check warning on line 3991 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L3991

Added line #L3991 was not covered by tests
async def dump_cluster_state_to_url(
self,
url: str,
exclude: Collection[str],
format: Literal["msgpack", "yaml"],
format: Literal["msgpack"] = "msgpack",
**storage_options: dict[str, Any],
) -> None:
"Write a cluster state dump to an fsspec-compatible URL."
Expand Down

0 comments on commit 851d631

Please sign in to comment.