Skip to content

Commit

Permalink
Embed scatter variable in run id & subdirectory name (#599)
Browse files Browse the repository at this point in the history
Including a heuristic stringification of the scatter variable facilitates navigation of the logs and run directory tree. The tag length is controlled by [scheduler] scatter_tag_max, which can be set to 0 to disable the new tagging.
  • Loading branch information
mlin authored Nov 3, 2022
1 parent ec4ccee commit 0dc9a21
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 13 deletions.
8 changes: 8 additions & 0 deletions WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ container_backend = docker_swarm
# new tasks, but leave those still running to succeed or fail on their own. The latter mode might
# be useful with call caching (see below) to avoid discarding all work done by other tasks.
fail_fast = true
# When scattering over an array, attempt to derive a stringification of the scatter variable, of
# length at most scatter_tag_max, to embed in the "run ID" of calls inside the scatter. This
# facilitates navigation of logs and subdirectory paths, by tagging which item is being processed
# in the i'th scatter iteration. But the tag is derived heuristically, depending on the scatter
# variable type. If the derived tags are undesirable then set scatter_tag_max <= 0, disabling them
# in favor of the scatter array index only.
# (New in v1.8.0)
scatter_tag_max = 16


[docker_swarm]
Expand Down
95 changes: 83 additions & 12 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import traceback
import pickle
import threading
import regex
from concurrent import futures
from typing import Optional, List, Set, Tuple, NamedTuple, Dict, Union, Iterable, Callable, Any
from contextlib import ExitStack
Expand Down Expand Up @@ -353,7 +354,14 @@ def _do_job(
)

if isinstance(job.node, (Tree.Scatter, Tree.Conditional)):
for newjob in _scatter(self.workflow, job.node, env, job.scatter_stack, stdlib):
for newjob in _scatter(
self.workflow,
job.node,
env,
job.scatter_stack,
stdlib,
cfg.get_int("scheduler", "scatter_tag_max"),
):
self._schedule(newjob)
# the section node itself has no outputs, so return an empty env
return Env.Bindings()
Expand Down Expand Up @@ -451,6 +459,7 @@ def _scatter(
env: Env.Bindings[Value.Base],
scatter_stack: List[Tuple[str, Env.Binding[Value.Base]]],
stdlib: StdLib.Base,
max_tag: int,
) -> Iterable[_Job]:
# we'll be tracking, for each body node ID, the IDs of the potentially multiple corresponding
# jobs scheduled
Expand All @@ -476,6 +485,7 @@ def _scatter(

# for each array element, schedule an instance of the body subgraph
last_scatter_indices = None
scatter_tags = _scatter_tags(array, max_tag)
for i, array_i in enumerate(array):

# scatter bookkeeping: format the index as a left-zero-padded string so that it'll sort
Expand All @@ -485,6 +495,8 @@ def _scatter(
assert isinstance(section, Tree.Scatter)
str_i = str(i).zfill(digits)
assert len(str_i) <= digits
if scatter_tags[i]:
str_i += "-" + scatter_tags[i]
scatter_stack_i = scatter_stack_i + [(str_i, Env.Binding(section.variable, array_i))]
scatter_indices_i = [p[0] for p in scatter_stack_i]
assert last_scatter_indices is None or last_scatter_indices < scatter_indices_i
Expand Down Expand Up @@ -542,24 +554,83 @@ def _append_scatter_indices(node_id: str, scatter_indices: List[str]) -> str:
return "-".join([node_id] + scatter_indices)


def _scatter_tags(array: List[Optional[Value.Base]], max_tag: int) -> List[str]:
# Given an array of values, compute an array of names for each item that strives for useful
# human-readability, and can be embedded in the run id/directory safely. This is to help the
# operator navigate the run logs & directory tree, looking for specific items.

# stringify each item and split each string into a list of alphanumeric components
any = False
delimiters = regex.compile("[^0-9a-zA-Z]+")
items = []
for i, array_i in enumerate(array):
if (
isinstance(array_i, Value.Base)
and not isinstance(array_i, Value.Null)
and not (isinstance(array_i, Value.Int) and array_i.value == i)
):
items.append(delimiters.split(json.dumps(array_i.json)))
any = True
else:
items.append([])
if not any or max_tag <= 0:
return [""] * len(array)

# compute & remove those lists' longest common prefix & suffix
lcp = _longest_common_prefix(items)
if lcp:
items = [item[lcp:] for item in items]
lcs = _longest_common_prefix([list(reversed(item)) for item in items])
if lcs:
items = [item[:-lcs] for item in items]

# concatenate remaining components
tags = ["".join(item) for item in items]

# truncate to first max_tag characters
tags_pfx = [tag[:max_tag].rstrip("-") for tag in tags]
if sum(1 for tag in tags_pfx if len(tag)) == sum(1 for tag in set(tags_pfx) if len(tag)):
return tags_pfx
# if those weren't unique, then try suffix; if those aren't unique either, then give up to
# avoid generating misleading tags.
tags_sfx = [tag[-max_tag:].lstrip("-") for tag in tags]
return (
tags_sfx
if sum(1 for tag in tags_sfx if len(tag)) == sum(1 for tag in set(tags_sfx) if len(tag))
else ([""] * len(items))
)


def _longest_common_prefix(items: List[List[str]]) -> int:
ans = 0
for i, item in enumerate(items):
if i == 0:
ans = len(item)
else:
ans = min(ans, len(item))
prev_item = items[i - 1]
j = 0
while j < ans and item[j] == prev_item[j]:
j += 1
ans = j
if ans == 0:
return 0
return ans


def _gather(
gather: Tree.Gather, dependencies: Dict[str, Env.Bindings[Value.Base]]
) -> Env.Bindings[Value.Base]:
# important: the dependency job IDs must sort lexicographically in the desired array order!
dep_ids = sorted(dependencies.keys())

# since it would be so awful to permute the array silently, lets verify the ID order
if isinstance(gather.section, Tree.Scatter):
dep_id_prefix = None
dep_id_values = []
for dep_id in dep_ids:
dep_id_fields = dep_id.split("-")
if dep_id_prefix is not None:
assert dep_id_fields[:-1] == dep_id_prefix
else:
dep_id_prefix = dep_id_fields[:-1]
dep_id_values.append(int(dep_id_fields[-1]))
assert dep_id_values == list(range(len(dep_ids)))
if len(dep_ids) > 1:
assert isinstance(gather.section, Tree.Scatter)
dep_ids_split = [dep_id.split("-") for dep_id in dep_ids]
dep_id_lcp = _longest_common_prefix(dep_ids_split)
for i, dep_id_i in enumerate(dep_ids_split):
assert i == int(dep_id_i[dep_id_lcp])

# figure out names of the values to gather, either the name if the referenced decl,
# or each output of the referenced call.
Expand Down
5 changes: 4 additions & 1 deletion stubs/regex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterator, Tuple
from typing import Any, Iterator, Tuple, List

POSIX: int
UNICODE: int
Expand All @@ -11,6 +11,9 @@ def fullmatch(self, string: str) -> Any:
# Type stub only (body elided with `...`): declares that sub() takes a replacement string and a
# subject string, and returns a str.
def sub(self, repl: str, string: str) -> str:
...

# Type stub only (body elided with `...`): declares that split() takes the string to split and
# returns the resulting pieces as a list of str.
def split(self, string: str) -> List[str]:
...

# Type stub only (body elided with `...`): declares that compile() accepts a pattern, optional
# flags (default 0) and further keyword arguments, and returns a Pattern.
def compile(pattern, flags=0, **kwargs) -> Pattern:
...

Expand Down
38 changes: 38 additions & 0 deletions tests/test_6workflowrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,44 @@ def test_scatters(self):
{"left": 3, "right": 1}
])

def test_scatter_tags(self):
    # Run a doubly-nested scatter over first & last names, then check that the path of some
    # output file embeds each expected "-<index>-<tag>" pair for the outer and inner scatter.
    outputs = self._test_workflow("""
version 1.0
workflow scatter_tags {
input {
Array[String] firsts = ["Alyssa P.", "Ben"]
Array[String] lasts = ["Hacker", "Bitdiddle0123456", "Bitdiddle01234567"]
}
scatter (first in firsts) {
scatter (last in lasts) {
call hello {
input:
who = "~{first} ~{last}"
}
}
}
output {
Array[File] messages = flatten(hello.message)
}
}
task hello {
input {
String who
}
command {
echo "Hello, ~{who}!" > message.txt
}
output {
File message = "message.txt"
}
}
""")
    # Note the expected tag for the third last name is its trailing 16 characters
    # ("itdiddle01234567"), and "Alyssa P." is expected to appear as "AlyssaP".
    expected_tags = (
        "-0-AlyssaP-0-Hacker/",
        "-0-AlyssaP-1-Bitdiddle0123456/",
        "-0-AlyssaP-2-itdiddle01234567/",
        "-1-Ben-0-Hacker/",
        "-1-Ben-1-Bitdiddle0123456/",
        "-1-Ben-2-itdiddle01234567/",
    )
    for tag in expected_tags:
        # any() yields False when nothing matches, so a missing tag is reported as an ordinary
        # assertion failure naming the tag, instead of an uncaught StopIteration from next().
        self.assertTrue(
            any(tag in os.path.realpath(fn) for fn in outputs["messages"]),
            "no output path contains " + tag,
        )

def test_ifs(self):
outputs = self._test_workflow("""
version 1.0
Expand Down

0 comments on commit 0dc9a21

Please sign in to comment.