From 9d95e3de3256e2caf91931f222f2d65f5a258c92 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Mar 2022 19:07:23 -0600 Subject: [PATCH 01/24] improve ergonomics of `to_yamls` Add print statements to show progress, option to run in background thread, and more easily cross-referenceable names for worker directories. YAML dumping is still so slow! Why? --- distributed/cluster_dump.py | 57 ++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 2d3a400b256..077867acbcf 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -261,6 +261,8 @@ def to_yamls( "transition_log", "workers", ), + background: bool = False, + log: bool | None = None, ): """ Splits the Dump Artefact into a tree of yaml files with @@ -289,7 +291,33 @@ def to_yamls( into separate yaml files. Keys that are not in this iterable are compacted into a ``general.yaml`` file. + background: + If True, run in a separate thread in the background. + Returns the `threading.Thread` object immediately. + log: + Print progress updates if True. Defaults to None, which means + False if ``background`` is True, and True otherwise. 
""" + if background: + import threading + + t = threading.Thread( + target=self.to_yamls, + name="to-yamls", + kwargs=dict( + root_dir=root_dir, + worker_expand_keys=worker_expand_keys, + scheduler_expand_keys=scheduler_expand_keys, + background=False, + log=log if log is not None else False, + ), + ) + t.start() + return t + + if log is None: + log = True + import yaml root_dir = Path(root_dir) if root_dir else Path.cwd() @@ -298,31 +326,40 @@ def to_yamls( worker_expand_keys = set(worker_expand_keys) workers = self.dump["workers"] - for info in workers.values(): + for i, (addr, info) in enumerate(workers.items()): try: - worker_id = info["id"] + worker_name = self.dump["scheduler"]["workers"][addr]["name"] except KeyError: - continue - - worker_state = self._compact_state(info, worker_expand_keys) + if log: + print(f" Worker {addr} unknown to scheduler") + worker_name = addr.replace("://", "-").replace("/", "_") - log_dir = root_dir / worker_id + log_dir = root_dir / worker_name log_dir.mkdir(parents=True, exist_ok=True) + if log: + print(f"Dumping worker {i:>4}/{len(workers)} to {log_dir}") + + worker_state = self._compact_state(info, worker_expand_keys) + for name, _logs in worker_state.items(): filename = str(log_dir / f"{name}.yaml") with open(filename, "w") as fd: yaml.dump(_logs, fd, Dumper=dumper) context = "scheduler" - scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys) - log_dir = root_dir / context log_dir.mkdir(parents=True, exist_ok=True) - # Compact smaller keys into a general dict - for name, _logs in scheduler_state.items(): + if log: + print(f"Dumping scheduler to {log_dir}") + + # Compact smaller keys into a general dict + scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys) + for i, (name, _logs) in enumerate(scheduler_state.items()): filename = str(log_dir / f"{name}.yaml") + if log: + print(f" Dumping {i:>2}/{len(scheduler_state)} {filename}") with open(filename, "w") as fd: 
yaml.dump(_logs, fd, Dumper=dumper) From 1cc6908b84016825a33a69a7adc110898d6ef084 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Mar 2022 19:12:38 -0600 Subject: [PATCH 02/24] update `dump_cluster_state` docs --- distributed/client.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index ce7652e762b..0ec0b0db27b 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3899,21 +3899,17 @@ def dump_cluster_state( Either ``"msgpack"`` or ``"yaml"``. If msgpack is used (default), the output will be stored in a gzipped file as msgpack. - To read:: + ``"msgpack"`` is generally preferred, since it's far faster to write + and far smaller on disk. If you want to examine the dump in human-readable + format, consider creating the dump in msgpack format, then (in an IPython session + or separate script), using `DumpArtefact.to_yamls` to create a directory + tree of separate YAML files, which are easier to work with. - import gzip, msgpack - with gzip.open("filename") as fd: - state = msgpack.unpack(fd) + To read:: - or:: + from distributed.cluster_dump import DumpArtefact + dump = DumpArtefact.from_url("filename") - import yaml - try: - from yaml import CLoader as Loader - except ImportError: - from yaml import Loader - with open("filename") as fd: - state = yaml.load(fd, Loader=Loader) **storage_options: Any additional arguments to :func:`fsspec.open` when writing to a URL. 
""" From 8dbe0306dfd92b6dfd45f4020c16cb390758e56d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Mar 2022 19:17:51 -0600 Subject: [PATCH 03/24] expand `incoming_transfer_log` --- distributed/cluster_dump.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 077867acbcf..fbb234b9497 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -251,7 +251,13 @@ def _compact_state(self, state: dict, expand_keys: set[str]): def to_yamls( self, root_dir: str | Path | None = None, - worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"), + worker_expand_keys: Collection[str] = ( + "config", + "incoming_transfer_log", + "log", + "logs", + "tasks", + ), scheduler_expand_keys: Collection[str] = ( "events", "extensions", From 69fb54c2d6560b48a6d8d8c19cbadafebc9dc7eb Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Mar 2022 20:07:26 -0600 Subject: [PATCH 04/24] just use worker addrs, it's more common --- distributed/cluster_dump.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index fbb234b9497..4ab6dcc1633 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -333,13 +333,7 @@ def to_yamls( workers = self.dump["workers"] for i, (addr, info) in enumerate(workers.items()): - try: - worker_name = self.dump["scheduler"]["workers"][addr]["name"] - except KeyError: - if log: - print(f" Worker {addr} unknown to scheduler") - worker_name = addr.replace("://", "-").replace("/", "_") - + worker_name = addr.replace("://", "-").replace("/", "_") log_dir = root_dir / worker_name log_dir.mkdir(parents=True, exist_ok=True) From 18aeb3d8073b208d395c8183e59ee3830a784268 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 25 Mar 2022 17:05:37 -0600 Subject: [PATCH 05/24] daemon thread for background dump --- distributed/cluster_dump.py | 3 ++- 1 
file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 4ab6dcc1633..1861b39783a 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -298,7 +298,7 @@ def to_yamls( Keys that are not in this iterable are compacted into a ``general.yaml`` file. background: - If True, run in a separate thread in the background. + If True, run in a separate daemon thread in the background. Returns the `threading.Thread` object immediately. log: Print progress updates if True. Defaults to None, which means @@ -317,6 +317,7 @@ def to_yamls( background=False, log=log if log is not None else False, ), + daemon=True, ) t.start() return t From eae40693aeb0738a48fa74d973aaff1aac40b23f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 25 Mar 2022 17:48:24 -0600 Subject: [PATCH 06/24] worker stories by worker and worker story yamls --- distributed/cluster_dump.py | 44 ++++++++++++++++++++++++++++--------- distributed/stories.py | 24 +++++++++++++++++--- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 1861b39783a..87b4f081453 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -145,6 +145,10 @@ def _extract_tasks(self, state: str | None, context: dict): else: return list(context.values()) + @staticmethod + def _slugify_addr(addr: str) -> str: + return addr.replace("://", "-").replace("/", "_") + def scheduler_tasks_in_state(self, state: str | None = None) -> list: """ Parameters @@ -198,22 +202,43 @@ def scheduler_story(self, *key_or_stimulus_id: str) -> dict: return dict(stories) - def worker_story(self, *key_or_stimulus_id: str) -> dict: + def worker_stories(self, *key_or_stimulus_id: str) -> dict: """ Returns ------- stories : dict - A dict of stories for the keys/stimulus ID's in ``*key_or_stimulus_id`.` + A dict for each worker of the story for all the keys/stimulus IDs + in 
``*key_or_stimulus_id`.` """ keys = set(key_or_stimulus_id) - stories = defaultdict(list) + return { + addr: _worker_story(keys, wlog, datetimes=True) + for addr, worker_dump in self.dump["workers"].items() + if isinstance(worker_dump, dict) and (wlog := worker_dump.get("log")) + } + + def worker_stories_to_yamls( + self, root_dir: str | Path | None = None, *key_or_stimulus_id: str + ) -> None: + """ + Write the results of `worker_stories` to separate YAML files per worker. + """ + import yaml - for worker_dump in self.dump["workers"].values(): - if isinstance(worker_dump, dict) and "log" in worker_dump: - for story in _worker_story(keys, worker_dump["log"]): - stories[story[0]].append(tuple(story)) + root_dir = Path(root_dir) if root_dir else Path.cwd() - return dict(stories) + stories = self.worker_stories(*key_or_stimulus_id) + for i, (addr, story) in enumerate(stories.items()): + worker_dir = root_dir / self._slugify_addr(addr) + worker_dir.mkdir(parents=True, exist_ok=True) + path = ( + worker_dir + / f"story-{key_or_stimulus_id[0] if len(key_or_stimulus_id) == 1 else key_or_stimulus_id}.yaml" + ) + + print(f"Dumping story {i:>3}/{len(stories)} to {path}") + with open(path, "w") as f: + yaml.dump(story, f, Dumper=yaml.CSafeDumper) def missing_workers(self) -> list: """ @@ -334,8 +359,7 @@ def to_yamls( workers = self.dump["workers"] for i, (addr, info) in enumerate(workers.items()): - worker_name = addr.replace("://", "-").replace("/", "_") - log_dir = root_dir / worker_name + log_dir = root_dir / self._slugify_addr(addr) log_dir.mkdir(parents=True, exist_ok=True) if log: diff --git a/distributed/stories.py b/distributed/stories.py index d17e54df53f..02d371db900 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -1,4 +1,7 @@ -from typing import Iterable +from __future__ import annotations + +from datetime import datetime +from typing import Iterable, TypeVar def scheduler_story(keys: set, transition_log: Iterable) -> list: @@ -19,7 +22,7 @@ 
def scheduler_story(keys: set, transition_log: Iterable) -> list: return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] -def worker_story(keys: set, log: Iterable) -> list: +def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list: """Creates a story from the worker log given a set of keys describing tasks or stimuli. @@ -29,16 +32,31 @@ def worker_story(keys: set, log: Iterable) -> list: A set of task `keys` or `stimulus_id`'s log : iterable The worker log + datetimes : bool + Whether to convert timestamps into `datetime.datetime` objects + (default False) Returns ------- story : list """ return [ - msg + _msg_with_datetime(msg) if datetimes else msg for msg in log if any(key in msg for key in keys) or any( key in c for key in keys for c in msg if isinstance(c, (tuple, list, set)) ) ] + + +T = TypeVar("T", list, tuple) + + +def _msg_with_datetime(msg: T) -> T: + dt = msg[-1] + try: + dt = datetime.fromtimestamp(dt) + except (TypeError, ValueError): + pass + return msg[:-1] + type(msg)((dt,)) From c9aa85b95b2b749d307231718064b710fd4d38cd Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Mar 2022 13:09:47 -0600 Subject: [PATCH 07/24] i+1 in prints --- distributed/cluster_dump.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 87b4f081453..2053d0f1eee 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -236,7 +236,7 @@ def worker_stories_to_yamls( / f"story-{key_or_stimulus_id[0] if len(key_or_stimulus_id) == 1 else key_or_stimulus_id}.yaml" ) - print(f"Dumping story {i:>3}/{len(stories)} to {path}") + print(f"Dumping story {i+1:>3}/{len(stories)} to {path}") with open(path, "w") as f: yaml.dump(story, f, Dumper=yaml.CSafeDumper) @@ -363,7 +363,7 @@ def to_yamls( log_dir.mkdir(parents=True, exist_ok=True) if log: - print(f"Dumping worker {i:>4}/{len(workers)} to {log_dir}") + print(f"Dumping worker 
{i+1:>4}/{len(workers)} to {log_dir}") worker_state = self._compact_state(info, worker_expand_keys) @@ -384,7 +384,7 @@ def to_yamls( for i, (name, _logs) in enumerate(scheduler_state.items()): filename = str(log_dir / f"{name}.yaml") if log: - print(f" Dumping {i:>2}/{len(scheduler_state)} {filename}") + print(f" Dumping {i+1:>2}/{len(scheduler_state)} {filename}") with open(filename, "w") as fd: yaml.dump(_logs, fd, Dumper=dumper) From e41d6ed5ee3dd68dcfb48fc728d4376b6cbab2c7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Mar 2022 14:07:04 -0600 Subject: [PATCH 08/24] YAML block literals for logs --- distributed/cluster_dump.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 2053d0f1eee..76f94fb7f5d 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -4,8 +4,9 @@ from collections import defaultdict from collections.abc import Mapping +from contextlib import contextmanager, nullcontext from pathlib import Path -from typing import IO, Any, Awaitable, Callable, Collection, Literal +from typing import IO, TYPE_CHECKING, Any, Awaitable, Callable, Collection, Literal import fsspec import msgpack @@ -14,6 +15,9 @@ from distributed.stories import scheduler_story as _scheduler_story from distributed.stories import worker_story as _worker_story +if TYPE_CHECKING: + import yaml + def _tuple_to_list(node): if isinstance(node, (list, tuple)): @@ -370,7 +374,8 @@ def to_yamls( for name, _logs in worker_state.items(): filename = str(log_dir / f"{name}.yaml") with open(filename, "w") as fd: - yaml.dump(_logs, fd, Dumper=dumper) + with _block_literals(dumper) if name == "logs" else nullcontext(): + yaml.dump(_logs, fd, Dumper=dumper) context = "scheduler" log_dir = root_dir / context @@ -387,4 +392,24 @@ def to_yamls( print(f" Dumping {i+1:>2}/{len(scheduler_state)} {filename}") with open(filename, "w") as fd: - yaml.dump(_logs, 
fd, Dumper=dumper) + with _block_literals(dumper) if name == "logs" else nullcontext(): + yaml.dump(_logs, fd, Dumper=dumper) + + +@contextmanager +def _block_literals(dumper: type[yaml.Dumper | yaml.CDumper]): + "Contextmanager to use literal-block YAML syntax for multiline strings. Not thread-safe." + # based on https://stackoverflow.com/a/33300001/17100540 + original_respresenter = dumper.yaml_representers[str] + + def represent_str(self, data: str): + if "\n" in data: + return self.represent_scalar("tag:yaml.org,2002:str", data, style="|") + return self.represent_scalar("tag:yaml.org,2002:str", data) + + dumper.add_representer(str, represent_str) + + try: + yield + finally: + dumper.add_representer(str, original_respresenter) From 1f0871f4b335afcbc6a7cf4482f0c62102df225c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Mar 2022 16:36:32 -0600 Subject: [PATCH 09/24] split outgoing transfer log and pending data Why does pending data have so much in it?? xref https://github.com/dask/distributed/issues/5960#issuecomment-1080592939 --- distributed/cluster_dump.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 76f94fb7f5d..9c1437f109a 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -283,6 +283,8 @@ def to_yamls( worker_expand_keys: Collection[str] = ( "config", "incoming_transfer_log", + "outgoing_transfer_log", + "pending_data_per_worker", "log", "logs", "tasks", From a4b50132b5b80b2e38e649740a54763ec677e8df Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Mar 2022 16:58:23 -0600 Subject: [PATCH 10/24] format timestamps in `to_yamls` --- distributed/cluster_dump.py | 7 +++++++ distributed/stories.py | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 9c1437f109a..f047fc255d7 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -12,6 +12,7 @@ 
import msgpack from distributed.compatibility import to_thread +from distributed.stories import msg_with_datetime from distributed.stories import scheduler_story as _scheduler_story from distributed.stories import worker_story as _worker_story @@ -375,7 +376,10 @@ def to_yamls( for name, _logs in worker_state.items(): filename = str(log_dir / f"{name}.yaml") + if name == "log": + _logs = list(map(msg_with_datetime, _logs)) with open(filename, "w") as fd: + with _block_literals(dumper) if name == "logs" else nullcontext(): yaml.dump(_logs, fd, Dumper=dumper) @@ -393,6 +397,9 @@ def to_yamls( if log: print(f" Dumping {i+1:>2}/{len(scheduler_state)} {filename}") + if name == "transition_log": + _logs = list(map(msg_with_datetime, _logs)) + with open(filename, "w") as fd: with _block_literals(dumper) if name == "logs" else nullcontext(): yaml.dump(_logs, fd, Dumper=dumper) diff --git a/distributed/stories.py b/distributed/stories.py index 02d371db900..4f05710a91e 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -41,7 +41,7 @@ def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list: story : list """ return [ - _msg_with_datetime(msg) if datetimes else msg + msg_with_datetime(msg) if datetimes else msg for msg in log if any(key in msg for key in keys) or any( @@ -53,7 +53,7 @@ def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list: T = TypeVar("T", list, tuple) -def _msg_with_datetime(msg: T) -> T: +def msg_with_datetime(msg: T) -> T: dt = msg[-1] try: dt = datetime.fromtimestamp(dt) From 4b3e0c2595d29dc0069fbfb6d2cff8bd3fa83949 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Mar 2022 16:59:01 -0600 Subject: [PATCH 11/24] separate clients from scheduler general the `wants_what` can be extremely long --- distributed/cluster_dump.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index f047fc255d7..353ea20b7f4 100644 --- 
a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -291,6 +291,7 @@ def to_yamls( "tasks", ), scheduler_expand_keys: Collection[str] = ( + "clients", "events", "extensions", "log", From 7f35ba8a35ab743f140c8b46d8aa61c523afa2d6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 12 Apr 2022 09:25:31 -0600 Subject: [PATCH 12/24] handle failed workers in to_yamls --- distributed/cluster_dump.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 353ea20b7f4..d0edcecd34d 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -367,6 +367,11 @@ def to_yamls( workers = self.dump["workers"] for i, (addr, info) in enumerate(workers.items()): + if not isinstance(info, dict): + if log: + print(f"Skipping worker {i+1:>3}/{len(workers)} - {info}") + continue + log_dir = root_dir / self._slugify_addr(addr) log_dir.mkdir(parents=True, exist_ok=True) From fb8a0a4e4eb40c9ff75c9d5b5c5dc12d556dd130 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 13 Apr 2022 17:28:59 -0600 Subject: [PATCH 13/24] simplify `scheduler_story` --- distributed/cluster_dump.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index d0edcecd34d..501c903599a 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,7 +2,6 @@ from __future__ import annotations -from collections import defaultdict from collections.abc import Mapping from contextlib import contextmanager, nullcontext from pathlib import Path @@ -190,22 +189,15 @@ def worker_tasks_in_state(self, state: str | None = None) -> list: return tasks - def scheduler_story(self, *key_or_stimulus_id: str) -> dict: + def scheduler_story(self, *key_or_stimulus_id: str) -> list: """ Returns ------- - stories : dict - A list of stories for the keys/stimulus ID's in ``*key_or_stimulus_id``. 
+ story : list + A list of events for the keys/stimulus ID's in ``*key_or_stimulus_id``. """ - stories = defaultdict(list) - - log = self.dump["scheduler"]["transition_log"] keys = set(key_or_stimulus_id) - - for story in _scheduler_story(keys, log): - stories[story[0]].append(tuple(story)) - - return dict(stories) + return _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) def worker_stories(self, *key_or_stimulus_id: str) -> dict: """ From b96eb5f1d1e72a0cbbb91c2af81967a98bb26c58 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 13 Apr 2022 18:07:32 -0600 Subject: [PATCH 14/24] datetimes in scheduler_story --- distributed/cluster_dump.py | 4 +++- distributed/stories.py | 13 +++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 501c903599a..ea7134a7185 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -197,7 +197,9 @@ def scheduler_story(self, *key_or_stimulus_id: str) -> list: A list of events for the keys/stimulus ID's in ``*key_or_stimulus_id``. """ keys = set(key_or_stimulus_id) - return _scheduler_story(keys, self.dump["scheduler"]["transition_log"]) + return _scheduler_story( + keys, self.dump["scheduler"]["transition_log"], datetimes=True + ) def worker_stories(self, *key_or_stimulus_id: str) -> dict: """ diff --git a/distributed/stories.py b/distributed/stories.py index 4f05710a91e..eb5354efa12 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -4,7 +4,9 @@ from typing import Iterable, TypeVar -def scheduler_story(keys: set, transition_log: Iterable) -> list: +def scheduler_story( + keys: set, transition_log: Iterable, datetimes: bool = False +) -> list: """Creates a story from the scheduler transition log given a set of keys describing tasks or stimuli. 
@@ -14,12 +16,19 @@ def scheduler_story(keys: set, transition_log: Iterable) -> list: A set of task `keys` or `stimulus_id`'s log : iterable The scheduler transition log + datetimes : bool + Whether to convert timestamps into `datetime.datetime` objects + (default False) Returns ------- story : list """ - return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] + return [ + msg_with_datetime(t) if datetimes else t + for t in transition_log + if t[0] in keys or keys.intersection(t[3]) + ] def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list: From 8ad979d7cf30fac42cb0cdd634ddae4e96c26115 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 13 Apr 2022 18:07:46 -0600 Subject: [PATCH 15/24] scheduler short stories --- distributed/cluster_dump.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index ea7134a7185..d5a51cfbc00 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -201,6 +201,28 @@ def scheduler_story(self, *key_or_stimulus_id: str) -> list: keys, self.dump["scheduler"]["transition_log"], datetimes=True ) + def scheduler_short_story(self, *key_or_stimulus_id: str) -> list[str]: + """ + Returns + ------- + story : list + A list of just the final states for the keys/stimulus ID's in ``*key_or_stimulus_id``. + """ + return [x[2] for x in self.scheduler_story(*key_or_stimulus_id)] + + def scheduler_short_stories(self, *key_or_stimulus_id: str) -> dict[str, list[str]]: + """ + Returns + ------- + stories : dict + A dict of the short story for each key or stimulus ID. Keys missing from the logs are dropped. 
+ """ + return { + k: s + for k, s in ((k, self.scheduler_short_story(k)) for k in key_or_stimulus_id) + if s + } + def worker_stories(self, *key_or_stimulus_id: str) -> dict: """ Returns From ae2b35c9d4656fb471f788ad1170742b163f8f7b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 13 Apr 2022 18:08:09 -0600 Subject: [PATCH 16/24] timestamps in scheduler events --- distributed/cluster_dump.py | 8 +++++++- distributed/stories.py | 8 +++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index d5a51cfbc00..e8cf71a19e1 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -420,7 +420,13 @@ def to_yamls( print(f" Dumping {i+1:>2}/{len(scheduler_state)} {filename}") if name == "transition_log": - _logs = list(map(msg_with_datetime, _logs)) + _logs = [msg_with_datetime(e) for e in _logs] + + if name == "events": + _logs = { + k: [msg_with_datetime(e, idx=0) for e in events] + for k, events in _logs.items() + } with open(filename, "w") as fd: with _block_literals(dumper) if name == "logs" else nullcontext(): diff --git a/distributed/stories.py b/distributed/stories.py index eb5354efa12..53505d8d2bd 100644 --- a/distributed/stories.py +++ b/distributed/stories.py @@ -62,10 +62,12 @@ def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list: T = TypeVar("T", list, tuple) -def msg_with_datetime(msg: T) -> T: - dt = msg[-1] +def msg_with_datetime(msg: T, idx: int = -1) -> T: + if idx < 0: + idx = len(msg) + idx + dt = msg[idx] try: dt = datetime.fromtimestamp(dt) except (TypeError, ValueError): pass - return msg[:-1] + type(msg)((dt,)) + return msg[:idx] + type(msg)((dt,)) + msg[idx + 1 :] From 7717ee9c3fab38ace0871de656b7244e2ab6b2b5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 14:55:43 -0400 Subject: [PATCH 17/24] fix literal --- distributed/cluster_dump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 0241c1bd41e..4508b5b7304 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -28,7 +28,7 @@ if TYPE_CHECKING: import yaml -DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack" | "yaml"] = "msgpack" +DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack", "yaml"] = "msgpack" DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",) From 9c69c2540f41881ddc46c706b7359527441fcd1b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 15:19:39 -0400 Subject: [PATCH 18/24] `enumerate(..., 1)` trick --- distributed/cluster_dump.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 4508b5b7304..b05648bc89b 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -262,7 +262,7 @@ def worker_stories_to_yamls( root_dir = Path(root_dir) if root_dir else Path.cwd() stories = self.worker_stories(*key_or_stimulus_id) - for i, (addr, story) in enumerate(stories.items()): + for i, (addr, story) in enumerate(stories.items(), 1): worker_dir = root_dir / self._slugify_addr(addr) worker_dir.mkdir(parents=True, exist_ok=True) path = ( @@ -270,7 +270,7 @@ def worker_stories_to_yamls( / f"story-{key_or_stimulus_id[0] if len(key_or_stimulus_id) == 1 else key_or_stimulus_id}.yaml" ) - print(f"Dumping story {i+1:>3}/{len(stories)} to {path}") + print(f"Dumping story {i:>3}/{len(stories)} to {path}") with open(path, "w") as f: yaml.dump(story, f, Dumper=yaml.CSafeDumper) @@ -393,17 +393,17 @@ def to_yamls( worker_expand_keys = set(worker_expand_keys) workers = self.dump["workers"] - for i, (addr, info) in enumerate(workers.items()): + for i, (addr, info) in enumerate(workers.items(), 1): if not isinstance(info, dict): if log: - print(f"Skipping worker {i+1:>3}/{len(workers)} - {info}") + print(f"Skipping worker {i:>3}/{len(workers)} - {info}") continue log_dir = root_dir / 
self._slugify_addr(addr) log_dir.mkdir(parents=True, exist_ok=True) if log: - print(f"Dumping worker {i+1:>4}/{len(workers)} to {log_dir}") + print(f"Dumping worker {i:>4}/{len(workers)} to {log_dir}") worker_state = self._compact_state(info, worker_expand_keys) @@ -425,10 +425,10 @@ def to_yamls( # Compact smaller keys into a general dict scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys) - for i, (name, _logs) in enumerate(scheduler_state.items()): + for i, (name, _logs) in enumerate(scheduler_state.items(), 1): filename = str(log_dir / f"{name}.yaml") if log: - print(f" Dumping {i+1:>2}/{len(scheduler_state)} {filename}") + print(f" Dumping {i:>2}/{len(scheduler_state)} {filename}") if name == "transition_log": _logs = [msg_with_datetime(e) for e in _logs] From 3716dfa19573c7d82b2cdf0cd74cce2d37560c4c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 15:24:10 -0400 Subject: [PATCH 19/24] add `processing_on` --- distributed/cluster_dump.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index b05648bc89b..3ee461a65f3 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -166,6 +166,13 @@ def _extract_tasks(self, state: str | None, context: dict[str, dict]) -> list[di def _slugify_addr(addr: str) -> str: return addr.replace("://", "-").replace("/", "_") + def processing_on(self) -> dict[str, str]: + "Tasks currently in ``processing`` on the scheduler, and which worker they're processing on" + return { + t["key"]: t["processing_on"] + for t in self.scheduler_tasks_in_state("processing") + } + def scheduler_tasks_in_state(self, state: str | None = None) -> list: """ Parameters From 83d66c1fda8577aa23056f31350f1e6492bc6591 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 18:02:44 -0400 Subject: [PATCH 20/24] fix tests --- distributed/cluster_dump.py | 17 ++++++++++------- distributed/tests/test_cluster_dump.py | 24 
++++++++++++------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 3ee461a65f3..eaf37c9e733 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -209,7 +209,7 @@ def worker_tasks_in_state(self, state: str | None = None) -> list: return tasks - def scheduler_story(self, *key_or_stimulus_id: str) -> list: + def scheduler_story(self, *key_or_stimulus_id: str, datetimes: bool = True) -> list: """ Returns ------- @@ -217,9 +217,12 @@ def scheduler_story(self, *key_or_stimulus_id: str) -> list: A list of events for the keys/stimulus ID's in ``*key_or_stimulus_id``. """ keys = set(key_or_stimulus_id) - return _scheduler_story( - keys, self.dump["scheduler"]["transition_log"], datetimes=True - ) + return [ + tuple(s) + for s in _scheduler_story( + keys, self.dump["scheduler"]["transition_log"], datetimes=datetimes + ) + ] def scheduler_short_story(self, *key_or_stimulus_id: str) -> list[str]: """ @@ -243,7 +246,7 @@ def scheduler_short_stories(self, *key_or_stimulus_id: str) -> dict[str, list[st if s } - def worker_stories(self, *key_or_stimulus_id: str) -> dict: + def worker_stories(self, *key_or_stimulus_id: str, datetimes: bool = True) -> dict: """ Returns ------- @@ -253,7 +256,7 @@ def worker_stories(self, *key_or_stimulus_id: str) -> dict: """ keys = set(key_or_stimulus_id) return { - addr: _worker_story(keys, wlog, datetimes=True) + addr: [tuple(s) for s in _worker_story(keys, wlog, datetimes=datetimes)] for addr, worker_dump in self.dump["workers"].items() if isinstance(worker_dump, dict) and (wlog := worker_dump.get("log")) } @@ -279,7 +282,7 @@ def worker_stories_to_yamls( print(f"Dumping story {i:>3}/{len(stories)} to {path}") with open(path, "w") as f: - yaml.dump(story, f, Dumper=yaml.CSafeDumper) + yaml.dump([list(s) for s in story], f, Dumper=yaml.CSafeDumper) def missing_workers(self) -> list: """ diff --git a/distributed/tests/test_cluster_dump.py 
b/distributed/tests/test_cluster_dump.py index a69238e9ec5..24d604bbf18 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -124,16 +124,13 @@ async def test_cluster_dump_story(c, s, a, b, tmp_path): dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") - story = dump.scheduler_story("f1", "f2") - assert story.keys() == {"f1", "f2"} + scheduler_story = dump.scheduler_story("f1", "f2", datetimes=False) + assert_story(scheduler_story, s.story("f1", "f2")) - for k, task_story in story.items(): - assert_story(task_story, s.story(k)) - - story = dump.worker_story("f1", "f2") - assert story.keys() == {"f1", "f2"} - for k, task_story in story.items(): - assert_story(task_story, a.state.story(k) + b.state.story(k)) + worker_stories = dump.worker_stories("f1", "f2", datetimes=False) + assert worker_stories.keys() == {a.address, b.address} + assert_story(worker_stories[a.address], a.state.story("f1", "f2")) + assert_story(worker_stories[b.address], b.state.story("f1", "f2")) @gen_cluster(client=True) @@ -154,6 +151,7 @@ async def test_cluster_dump_to_yamls(c, s, a, b, tmp_path): dump.to_yamls(yaml_path) scheduler_files = { + "clients.yaml", "events.yaml", "extensions.yaml", "general.yaml", @@ -166,20 +164,22 @@ async def test_cluster_dump_to_yamls(c, s, a, b, tmp_path): scheduler_yaml_path = yaml_path / "scheduler" expected = {scheduler_yaml_path / f for f in scheduler_files} - assert expected == set(scheduler_yaml_path.iterdir()) + assert set(scheduler_yaml_path.iterdir()) == expected worker_files = { "config.yaml", "general.yaml", + "incoming_transfer_log.yaml", "log.yaml", "logs.yaml", + "outgoing_transfer_log.yaml", "tasks.yaml", } for worker in (a, b): - worker_yaml_path = yaml_path / worker.id + worker_yaml_path = yaml_path / dump._slugify_addr(worker.address) expected = {worker_yaml_path / f for f in worker_files} - assert expected == set(worker_yaml_path.iterdir()) + assert set(worker_yaml_path.iterdir()) == expected # 
Internal dictionary state compaction # has not been destructive of the original dictionary From 47a63a726aef3215f19d26142ba4567c86ad2303 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 18:21:45 -0400 Subject: [PATCH 21/24] fix `test_cluster_dump_plugin` --- distributed/diagnostics/tests/test_cluster_dump_plugin.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/distributed/diagnostics/tests/test_cluster_dump_plugin.py b/distributed/diagnostics/tests/test_cluster_dump_plugin.py index 8c99a567ac1..cd2f81401b3 100644 --- a/distributed/diagnostics/tests/test_cluster_dump_plugin.py +++ b/distributed/diagnostics/tests/test_cluster_dump_plugin.py @@ -2,7 +2,7 @@ from distributed.cluster_dump import DumpArtefact from distributed.diagnostics.cluster_dump import ClusterDump -from distributed.utils_test import gen_cluster, inc +from distributed.utils_test import assert_story, gen_cluster, inc @gen_cluster(client=True) @@ -19,5 +19,7 @@ async def test_cluster_dump_plugin(c, s, *workers, tmp_path): await s.close() dump = DumpArtefact.from_url(str(dump_file)) - assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys()) - assert {f1.key, f2.key} == set(dump.worker_story(f1.key, f2.key).keys()) + assert_story( + s.story(f1.key, f2.key), dump.scheduler_story(f1.key, f2.key, datetimes=False) + ) + assert len(dump["workers"]) == len(workers) From 2b0b23ddff217ad9ce68ccaca2f2ddf3f3f9370e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 13:14:47 -0600 Subject: [PATCH 22/24] and as separate files --- distributed/cluster_dump.py | 2 ++ distributed/tests/test_cluster_dump.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index eaf37c9e733..e4e41197097 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -322,11 +322,13 @@ def to_yamls( root_dir: str | Path | None = None, worker_expand_keys: Collection[str] = ( "config", + 
"data", "incoming_transfer_log", "outgoing_transfer_log", "pending_data_per_worker", "log", "logs", + "stimulus_log", "tasks", ), scheduler_expand_keys: Collection[str] = ( diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 24d604bbf18..a573cf270bb 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -168,10 +168,12 @@ async def test_cluster_dump_to_yamls(c, s, a, b, tmp_path): worker_files = { "config.yaml", + "data.yaml", "general.yaml", "incoming_transfer_log.yaml", "log.yaml", "logs.yaml", + "stimulus_log.yaml", "outgoing_transfer_log.yaml", "tasks.yaml", } From 1c8de3df1c66399989318f7944f2c9f62fa72978 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 13:43:23 -0600 Subject: [PATCH 23/24] add `worker_short_stories` --- distributed/cluster_dump.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index e4e41197097..b4603d9f1e3 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -261,6 +261,26 @@ def worker_stories(self, *key_or_stimulus_id: str, datetimes: bool = True) -> di if isinstance(worker_dump, dict) and (wlog := worker_dump.get("log")) } + def worker_short_stories(self, key_or_stimulus_id: str) -> dict[str, list[str]]: + """ + Returns + ------- + stories : dict + A dict of the short story for the key or stimulus ID, for each worker. + Workers missing from the logs are dropped. 
+ """ + return { + addr: [ + x[2] + for x in story + if x[0] == key_or_stimulus_id or x[-1] == key_or_stimulus_id + ] + for addr, story in self.worker_stories( + key_or_stimulus_id, datetimes=False + ).items() + if story + } + def worker_stories_to_yamls( self, root_dir: str | Path | None = None, *key_or_stimulus_id: str ) -> None: From 5bbaaa49b78e71b5c9036060982cac9ed8f2d0c6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 23 Sep 2022 13:32:28 -0600 Subject: [PATCH 24/24] sometimes keys are null somehow? --- distributed/cluster_dump.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index b4603d9f1e3..c545a0987af 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -5,6 +5,7 @@ import threading from collections.abc import Mapping from contextlib import contextmanager, nullcontext +from functools import partial from pathlib import Path from typing import ( IO, @@ -99,7 +100,7 @@ def load_cluster_dump(url: str, **kwargs: Any) -> dict: """ if url.endswith(".msgpack.gz"): mode = "rb" - reader = msgpack.unpack + reader = partial(msgpack.unpack, strict_map_key=False) elif url.endswith(".yaml"): import yaml