Skip to content

Commit

Permalink
runner: set _LAST symlink to last-created run directory (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Apr 8, 2020
1 parent 57fbf69 commit fd8ab70
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 20 deletions.
5 changes: 5 additions & 0 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ def runner(
logger = logging.getLogger("miniwdl-run")
install_coloredlogs(logger)

if os.geteuid() == 0:
logger.warning(
"running as root; non-root users should be able to `miniwdl run` as long as they're in the `docker` group"
)

# load configuration & apply command-line overrides
cfg_arg = None
if cfg:
Expand Down
38 changes: 27 additions & 11 deletions WDL/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
import copy
import fcntl
import subprocess
from time import sleep
from datetime import datetime
from contextlib import contextmanager, AbstractContextManager
Expand Down Expand Up @@ -183,7 +184,7 @@ def write_values_json(


@export
def provision_run_dir(name: str, run_dir: Optional[str] = None) -> str:
def provision_run_dir(name: str, run_dir: Optional[str], last_link: bool = False) -> str:
here = (
(run_dir in [".", "./"] or run_dir.endswith("/.") or run_dir.endswith("/./"))
if run_dir
Expand All @@ -192,27 +193,41 @@ def provision_run_dir(name: str, run_dir: Optional[str] = None) -> str:
run_dir = os.path.abspath(run_dir or os.getcwd())

if here:
# user wants to use run_dir exactly
os.makedirs(run_dir, exist_ok=True)
return run_dir

# create timestamp-named directory
now = datetime.today()
run_dir2 = os.path.join(run_dir, now.strftime("%Y%m%d_%H%M%S") + "_" + name)
new_dir = os.path.join(run_dir, now.strftime("%Y%m%d_%H%M%S") + "_" + name)
try:
os.makedirs(run_dir2, exist_ok=False)
return run_dir2
os.makedirs(new_dir, exist_ok=False)
except FileExistsError:
pass
new_dir = None

while True:
run_dir2 = os.path.join(
while not new_dir:
# it already exists; try adding milliseconds
new_dir = os.path.join(
run_dir,
now.strftime("%Y%m%d_%H%M%S_") + str(int(now.microsecond / 1000)).zfill(3) + "_" + name,
)
try:
os.makedirs(run_dir2, exist_ok=False)
return run_dir2
os.makedirs(new_dir, exist_ok=False)
except FileExistsError:
new_dir = None
sleep(1e-3)
assert new_dir

# update the _LAST link
if last_link:
last_link_name = os.path.join(run_dir, "_LAST")
if not os.path.lexists(last_link_name) or os.path.islink(last_link_name):
new_dir_basename = os.path.basename(new_dir)
tmp_link_name = last_link_name + "." + new_dir_basename
os.symlink(new_dir_basename, tmp_link_name)
os.rename(tmp_link_name, last_link_name)

return new_dir


@export
Expand Down Expand Up @@ -700,8 +715,9 @@ def flock( # pyre-fixme
openfile.fileno(), ns=(int(time.time() * 1e9), file_st.st_mtime_ns)
)

# The filename link could have been replaced or removed in the instant between
# our open() and flock() syscalls.
# Even if all concurrent processes obey the advisory flocks, the filename link
# could have been replaced or removed in the duration between our open() and
# fcntl() syscalls.
# - if it was removed, the following os.stat will trigger FileNotFoundError,
# which is reasonable to propagate.
# - if it was replaced, the subsequent condition won't hold, and we'll loop
Expand Down
1 change: 0 additions & 1 deletion WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ copy_input_files = false
# arising from files with multiple hardlinks! See also [task_runtime] delete_work, below.
output_hardlinks = false


[task_runtime]
# Effective maximum values of runtime.cpu and runtime.memory (bytes), which evaluated values are
# rounded down to. 0 = detect host resources, -1 = do not apply a limit.
Expand Down
4 changes: 1 addition & 3 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@ def _run(
# poll for container exit
while exit_code is None:
# spread out work over the GIL
# TODO: adaptive interval before container starts running (poll less frequently
# if it's already been waiting a long time)
time.sleep(random.uniform(1.0, 2.0))
if terminating():
raise Terminated() from None
Expand Down Expand Up @@ -678,7 +676,7 @@ def run_local_task(
# provision run directory and log file
run_id = run_id or task.name
_run_id_stack = _run_id_stack or []
run_dir = provision_run_dir(task.name, run_dir)
run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)

logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
logger = logging.getLogger(".".join(logger_prefix))
Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def run_local_workflow(
# provision run directory and log file
run_id = run_id or workflow.name
_run_id_stack = _run_id_stack or []
run_dir = provision_run_dir(workflow.name, run_dir)
run_dir = provision_run_dir(workflow.name, run_dir, last_link=not _run_id_stack)

logger_prefix = logger_prefix or ["wdl"]
logger_id = logger_prefix + ["w:" + run_id]
Expand Down
4 changes: 3 additions & 1 deletion docs/runner_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ The top-level run directory also contains:
* `rerun` can be "sourced" to run the WDL (as found in the original location, possibly updated) using the same inputs
* The top-level `workflow.log` file is "flocked" while `miniwdl run` is still in progress (`task.log` if running a task directly)

The miniwdl source repository includes several [example scripts](https://github.com/chanzuckerberg/miniwdl/tree/master/examples) illustrating how this structure can inform production automation (e.g. retrieving error messages, uploading output files).
When miniwdl creates a new timestamp-named subdirectory for a run, it also creates a symbolic link `_LAST` to it in the same parent directory. (For convenience referring to the most recent run; should not be relied upon if multiple runs can start concurrently.)

The miniwdl source repository includes several [example scripts](https://github.com/chanzuckerberg/miniwdl/tree/master/examples) illustrating how the structures described here can inform production automation (e.g. retrieving error messages, uploading output files).

## Configuration

Expand Down
7 changes: 4 additions & 3 deletions tests/runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap
export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH"
miniwdl="python3 -m WDL"

plan tests 52
plan tests 53

$miniwdl run_self_test
is "$?" "0" "run_self_test"
Expand Down Expand Up @@ -223,9 +223,9 @@ task second {
}
EOF

$miniwdl run multitask.wdl --task second | tee stdout
$miniwdl run multitask.wdl --task second
is "$?" "0" "multitask"
is "$(jq -r '.outputs["second.msg"]' stdout)" "two" "multitask stdout"
is "$(jq -r '.["second.msg"]' _LAST/outputs.json)" "two" "multitask stdout & _LAST"

cat << 'EOF' > mv_input_file.wdl
version 1.0
Expand All @@ -244,6 +244,7 @@ EOF

$miniwdl run --copy-input-files mv_input_file.wdl file=quick
is "$?" "0" "copy input files"
is "$(basename `jq -r '.["mv_input_file.xxx"]' _LAST/outputs.json`)" "xxx" "updated _LAST"

cat << 'EOF' > uri_inputs.json
{"my_workflow.files": ["https://google.com/robots.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/master/tests/alyssa_ben.txt"]}
Expand Down

0 comments on commit fd8ab70

Please sign in to comment.