Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9d95e3d
improve ergonomics of `to_yamls`
gjoseph92 Mar 25, 2022
1cc6908
update `dump_cluster_state` docs
gjoseph92 Mar 25, 2022
8dbe030
expand `incoming_transfer_log`
gjoseph92 Mar 25, 2022
69fb54c
just use worker addrs, it's more common
gjoseph92 Mar 25, 2022
18aeb3d
daemon thread for background dump
gjoseph92 Mar 25, 2022
eae4069
worker stories by worker and worker story yamls
gjoseph92 Mar 25, 2022
c9aa85b
i+1 in prints
gjoseph92 Mar 28, 2022
e41d6ed
YAML block literals for logs
gjoseph92 Mar 28, 2022
1f0871f
split outgoing transfer log and pending data
gjoseph92 Mar 28, 2022
a4b5013
format timestamps in `to_yamls`
gjoseph92 Mar 28, 2022
4b3e0c2
separate clients from scheduler general
gjoseph92 Mar 28, 2022
7f35ba8
handle failed workers in to_yamls
gjoseph92 Apr 12, 2022
fb8a0a4
simplify `scheduler_story`
gjoseph92 Apr 13, 2022
b96eb5f
datetimes in scheduler_story
gjoseph92 Apr 14, 2022
8ad979d
scheduler short stories
gjoseph92 Apr 14, 2022
ae2b35c
timestamps in scheduler events
gjoseph92 Apr 14, 2022
19a1622
Merge remote-tracking branch 'upstream/main' into cluster-dump-helpers
gjoseph92 Jul 19, 2022
7717ee9
fix literal
gjoseph92 Jul 19, 2022
9c69c25
`enumerate(..., 1)` trick
gjoseph92 Jul 19, 2022
3716dfa
add `processing_on`
gjoseph92 Jul 19, 2022
83d66c1
fix tests
gjoseph92 Jul 19, 2022
47a63a7
fix `test_cluster_dump_plugin`
gjoseph92 Jul 19, 2022
2b0b23d
and as separate files
gjoseph92 Aug 4, 2022
1c8de3d
add `worker_short_stories`
gjoseph92 Aug 4, 2022
5bbaaa4
sometimes keys are null somehow?
gjoseph92 Sep 23, 2022
c3cbee3
Merge remote-tracking branch 'upstream/main' into cluster-dump-helpers
gjoseph92 Oct 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions distributed/_stories.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,73 @@
from __future__ import annotations

from typing import Iterable
from datetime import datetime
from typing import Iterable, TypeVar


def scheduler_story(
keys_or_stimuli: set[str], transition_log: Iterable[tuple]
) -> list[tuple]:
keys: set, transition_log: Iterable, datetimes: bool = False
) -> list:
"""Creates a story from the scheduler transition log given a set of keys
describing tasks or stimuli.

Parameters
----------
keys_or_stimuli : set[str]
Task keys or stimulus_id's
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The scheduler transition log
datetimes : bool
Whether to convert timestamps into `datetime.datetime` objects
(default False)

Returns
-------
story : list[tuple]
story : list
"""
return [
t
msg_with_datetime(t) if datetimes else t
for t in transition_log
if t[0] in keys_or_stimuli or keys_or_stimuli.intersection(t[3])
if t[0] in keys or keys.intersection(t[3])
]


def worker_story(keys_or_stimuli: set[str], log: Iterable[tuple]) -> list:
def worker_story(keys: set, log: Iterable, datetimes: bool = False) -> list:
"""Creates a story from the worker log given a set of keys
describing tasks or stimuli.

Parameters
----------
keys_or_stimuli : set[str]
Task keys or stimulus_id's
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The worker log
datetimes : bool
Whether to convert timestamps into `datetime.datetime` objects
(default False)

Returns
-------
story : list[str]
story : list
"""
return [
msg
msg_with_datetime(msg) if datetimes else msg
for msg in log
if any(key in msg for key in keys_or_stimuli)
if any(key in msg for key in keys)
or any(
key in c
for key in keys_or_stimuli
for c in msg
if isinstance(c, (tuple, list, set))
key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
)
]


T = TypeVar("T", list, tuple)


def msg_with_datetime(msg: T, idx: int = -1) -> T:
if idx < 0:
idx = len(msg) + idx
dt = msg[idx]
try:
dt = datetime.fromtimestamp(dt)
except (TypeError, ValueError):
pass
return msg[:idx] + type(msg)((dt,)) + msg[idx + 1 :]
20 changes: 8 additions & 12 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4071,21 +4071,17 @@ def dump_cluster_state(
Either ``"msgpack"`` or ``"yaml"``. If msgpack is used (default),
the output will be stored in a gzipped file as msgpack.

To read::
``"msgpack"`` is generally preferred, since it's far faster to write
and far smaller on disk. If you want to examine the dump in human-readable
format, consider creating the dump in msgpack format, then (in an IPython session
or separate script), using `DumpArtefact.to_yamls` to create a directory
tree of separate YAML files, which are easier to work with.

import gzip, msgpack
with gzip.open("filename") as fd:
state = msgpack.unpack(fd)
To read::

or::
from distributed.cluster_dump import DumpArtefact
dump = DumpArtefact.from_url("filename")

import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
with open("filename") as fd:
state = yaml.load(fd, Loader=Loader)
**storage_options:
Any additional arguments to :func:`fsspec.open` when writing to a URL.
"""
Expand Down
Loading