Skip to content

Commit

Permalink
Merge d3d874f into f7627fe
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Mar 12, 2020
2 parents f7627fe + d3d874f commit 706ec9b
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 111 deletions.
218 changes: 124 additions & 94 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,138 +1209,168 @@ def ensure_cromwell_jar(jarfile=None):
def fill_localize_subparser(subparsers):
localize_parser = subparsers.add_parser(
"localize",
help="Download URIs found in Cromwell-style input JSON and rewrite",
description=f"Download URIs found in Cromwell-style input JSON, and rewrite it with the local filenames.",
help="Download URI input files to local cache",
description=f"Prime the local file download cache with URI File inputs found in Cromwell-style input JSON",
)
localize_parser.add_argument(
"wdlfile", metavar="DOC.wdl", type=str, help="WDL document filename/URI"
"wdlfile",
metavar="DOC.wdl",
type=str,
help="WDL document filename/URI",
default=None,
nargs="?",
)
localize_parser.add_argument(
"infile",
metavar="INPUT.json",
type=str,
help="input JSON filename (- for standard input) or literal object",
default=None,
nargs="?",
)
localize_parser.add_argument(
"name",
metavar="NAME",
type=str,
nargs="?",
default=None,
help="short name to include in local paths (default: basename of JSON file)",
"--task",
metavar="TASK_NAME",
help="name of task (for WDL documents with multiple tasks & no workflow)",
)
localize_parser.add_argument(
"-d",
"--dir",
metavar="DIR",
dest="run_dir",
help="base directory in which to store downloaded files",
"--uri",
metavar="URI",
action="append",
help="additional URI to process; or, omit WDL & JSON and just specify one or more --uri",
)
localize_parser.add_argument(
"-o",
metavar="LOCAL.json",
type=str,
dest="outfile",
help="write transformed JSON to file instead of standard output",
"--no-cache",
action="store_true",
help="if a URI is already cached, re-download and replace it",
)
localize_parser.add_argument(
"--task",
metavar="TASK_NAME",
help="name of task (for WDL documents with multiple tasks & no workflow)",
"--cfg",
metavar="FILE",
type=str,
default=None,
help="configuration file to load (in preference to file named by MINIWDL_CFG environment, or XDG_CONFIG_{HOME,DIRS}/miniwdl.cfg)",
)
return localize_parser


def localize(
wdlfile, infile, name=None, outfile=None, task=None, path=None, check_quant=True, **kwargs
wdlfile=None,
infile=None,
uri=None,
no_cache=False,
task=None,
cfg=None,
path=None,
check_quant=True,
**kwargs,
):
# load WDL document
doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source)

# parse the provided input JSON
logging.basicConfig(level=NOTICE_LEVEL)
logger = logging.getLogger("miniwdl-localize")
logger.critical(
"DEPRECATION NOTICE: `miniwdl localize` will soon be retired, superseded by the configurable URI download cache in >= v0.7.x"
install_coloredlogs(logger)

cfg_arg = None
if cfg:
assert os.path.isfile(cfg), "--cfg file not found"
cfg_arg = [cfg]
cfg = runtime.config.Loader(logger, filenames=cfg_arg)
cache_cfg = cfg["download_cache"]
original_get = cache_cfg.get_bool("get")
if original_get and no_cache:
cfg.override({"download_cache": {"get": False}})
logger.notice(
_(
"effective configuration",
put=cache_cfg.get_bool("put"),
get=cache_cfg.get_bool("get"),
dir=cache_cfg["dir"],
ignore_query=cache_cfg.get_bool("ignore_query"),
enable_patterns=cache_cfg.get_list("enable_patterns"),
disable_patterns=cache_cfg.get_list("disable_patterns"),
)
)
cfg = runtime.config.Loader(logger)

def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)
uri = uri or []
uri = set(uri)

target, input_env, input_json = runner_input(
doc, [], infile, [], task=task, check_required=False, file_found=file_found
)
if infile:
# load WDL document
doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source)

# read input JSON
name = name or os.path.basename(infile).split(".")[0]
def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)

# scan for Files that appear to be downloadable URIs
def scan(x):
if isinstance(x, Value.File) and runtime.download.able(cfg, x.value):
yield x.value
for y in x.children:
yield from scan(y)
target, input_env, input_json = runner_input(
doc, [], infile, [], task=task, check_required=False, file_found=file_found
)

uris = set()
for b in input_env:
uris |= set(scan(b.value))
# scan for Files that appear to be downloadable URIs
def scan(x):
if isinstance(x, Value.File) and runtime.download.able(cfg, x.value):
yield x.value
for y in x.children:
yield from scan(y)

if uris:
# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)
for b in input_env:
uri |= set(scan(b.value))

# cheesy trick: provide the list of URIs as File inputs to a dummy workflow, causing the
# runtime to download them
localizer_wdl = (
"""
version 1.0
workflow localize_%s {
input {
Array[File] uris
}
output {
Array[File] files = uris
}
}
"""
% name
if not uri:
logger.warning(
"nothing to do; if inputs use special URI schemes, make sure necessary downloader plugin(s) are installed and enabled"
)
localizer = parse_document(localizer_wdl)
localizer.typecheck()
cfg = runtime.config.Loader(logger)
subdir, outputs = runtime.run(
cfg,
localizer.workflow,
values_from_json({"uris": list(uris)}, localizer.workflow.available_inputs),
sys.exit(0)

if not cache_cfg.get_bool("put"):
logger.error(
'configuration section "download_cache", option "put" (env MINIWDL__DOWNLOAD_CACHE__PUT) must be true for this operation to be effective'
)
sys.exit(2)

# recover the mapping of URIs to downloaded files
uri_to_file = {}
assert isinstance(outputs["files"], Value.Array)
for uri, elt in zip(uris, outputs["files"].value):
assert isinstance(elt, Value.File) and os.path.isfile(elt.value)
uri_to_file[uri] = elt.value

# rewrite the input Env to replace URIs with filenames
def rewrite(x):
if isinstance(x, Value.File) and x.value in uri_to_file:
x.value = uri_to_file[x.value]
for y in x.children:
rewrite(y)
cache = runtime.cache.CallCache(cfg)
disabled = [u for u in uri if not cache.download_path(u)]
if disabled:
logger.notice(_("URIs found but not cacheable per configuration", uri=disabled))
uri = list(uri - set(disabled))

for b in input_env:
rewrite(b.value)
if not uri:
logger.warning("nothing to do; check configured enable_patterns and disable_patterns")
sys.exit(0)
logger.notice(_("starting downloads", uri=uri))

# write out the possibly-modified JSON
result_json = values_to_json(
input_env, namespace=(target.name if isinstance(target, Workflow) else "")
# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)

# cheesy trick: provide the list of URIs as File inputs to a dummy workflow, causing the
# runtime to download & cache them
localizer_wdl = """
version 1.0
workflow localize {
input {
Array[File] uris
}
output {
Array[File] files = uris
}
}
"""
localizer = parse_document(localizer_wdl)
localizer.typecheck()
cfg = runtime.config.Loader(logger)
subdir, outputs = runtime.run(
cfg,
localizer.workflow,
values_from_json({"uris": uri}, localizer.workflow.available_inputs),
run_dir=os.environ.get("TMPDIR", "/tmp"),
_cache=cache,
)
if outfile in [None, "", "-"]:
print(json.dumps(result_json, indent=2))
else:
with open(outfile, "w") as outp:
print(json.dumps(result_json, indent=2), file=outp)

logger.notice(
_("success", files=[os.path.realpath(p) for p in values_to_json(outputs)["files"]])
)
if not original_get:
logger.warning(
"""future runs won't use the cache unless configuration section "download_cache", key "get" (env MINIWDL__DOWNLOAD_CACHE__GET) is set to true"""
)


def die(msg, status=2):
Expand Down
36 changes: 26 additions & 10 deletions docs/runner_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Upon run success, the JSON outputs are written to `outputs.json` in the run dire
For tasks, the run directory also contains:

* `task.log`
* `stdout.txt` and `stderr.txt` if the task container started
* `stdout.txt` and `stderr.txt` if the task container started. These files are streamed "live" as the task runs.
* `write_/` with any files written during the evaluation of WDL expressions (e.g. `write_lines()`)
* `download/` with any files downloaded from URIs in task inputs
* `work/` the working directory mounted into the task container, where the command leaves its output files
Expand All @@ -31,7 +31,7 @@ The miniwdl source repository includes several [example scripts](https://github.

## Configuration

The miniwdl runner's configuration loader sources from command-line options, environment variables, and a configuration file, in that priority order. If in doubt, running with `--debug` logs the effective configuration and sources.
The miniwdl runner's configuration loader sources from command-line options, environment variables, and a configuration file, in that priority order.

**default.cfg**

Expand All @@ -46,24 +46,40 @@ Upon starting, miniwdl looks for a custom configuration file in the following lo
1. File named by `--cfg` command-line argument
2. File named by `MINIWDL_CFG` environment variable
3. `XDG_CONFIG_HOME/miniwdl.cfg` (typically `${HOME}/.config/miniwdl.cfg`)
4. `miniwdl.cfg` in [XDG_CONFIG_DIRS](https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html) (typically `/etc/xdg/.config`)
4. `miniwdl.cfg` in [XDG_CONFIG_DIRS](https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html) (typically `/etc/xdg/miniwdl.cfg`)

Miniwdl loads *only the first file found* in this priority order, and merges its options into the defaults; so the file needs only contain selected sections & options to override.
Miniwdl loads *only the first file found* in this priority order, and merges its options into the defaults; so the file needs only contain selected sections & options to override. For example, the following overrides the default docker image (used when a task doesn't specify `runtime.docker`), leaving other defaults in place:

```
$ cat << 'EOF' > ${HOME}/.config/miniwdl.cfg
[task_runtime]
defaults = {
"docker": "ubuntu:19.10"
}
EOF
```

**Environment and command line**

Environment variables following the convention `MINIWDL__SECTION__KEY=VALUE` override cfg file and default options. Note the variable name is all-uppercase and delimited with *double* underscores (as section and key names may contain underscores).
Environment variables following the convention `MINIWDL__SECTION__KEY=VALUE` override individual cfg file and default options. Note the variable name is all-uppercase and delimited with *double* underscores (as section and key names may contain underscores). Reusing the previous example, the default docker image may be changed by setting in miniwdl's environment:

```
MINIWDL__TASK_RUNTIME__DEFAULTS='{"docker":"ubuntu:19.10"}'
```

`miniwdl run` command-line arguments override the other sources.
`miniwdl run` command-line arguments override the other sources. If in doubt, running with `--debug` logs the effective configuration and sources.

## File download cache

Miniwdl automatically downloads input files supplied as URIs. It's also able to cache these downloads in a local directory, so that multiple workflow runs can reference files by URI without downloading them repeatedly. This permits efficient use of WDL input templates referring to public databases by URI (e.g. reference genomes, sequence databases, interval lists), without having to compromise portability by rewriting them with local paths.
Miniwdl automatically downloads input files supplied as URIs instead of locally-mounted filenames. It's also able to cache these downloads in a local directory, so that multiple workflow runs can reference files by URI without downloading them repeatedly. This permits efficient use of WDL input templates referring to public databases by URI (e.g. reference genomes, sequence databases, interval lists), without having to compromise portability by rewriting them with local paths.

The download cache functionality must be enabled in the configuration. The relevant options, exemplified in the [`default.cfg`](https://github.com/chanzuckerberg/miniwdl/blob/master/WDL/runtime/config_templates/default.cfg) template, are in the `download_cache` section, especially `put = true`, `get = true`, and `dir`. Additional options such as `ignore_query`, `enable_patterns`, and `disable_patterns` provide control over which URIs will be cached. If the cache is enabled in persistent configuration, then `--no-cache` disables it for one run.

Details:

* Enabling the cache changes where downloaded files are stored: if the cache is enabled, they're stored in the cache directory; otherwise, they're stored under the triggering run directory.
* The cache is keyed by URI: when a workflow starts with a URI file input, a cached file is used if previously stored for the same URI. This doesn't depend on which task/workflow is running, and doesn't use checksums or timestamps of the file contents. Therefore, the cache should only be used with immutable remote files, or if there's no need for immediate coherence with remote content changes.
* Miniwdl doesn't internally manage the cache's total storage usage; but to support such processes, it updates the access timestamp (atime) and opens a shared `flock()` on any cached file it's using. The script [examples/clean_download_cache.sh](https://github.com/chanzuckerberg/miniwdl/blob/master/examples/clean_download_cache.sh) illustrates a process to evict the least-recently used cache files that can be exclusively flocked (the latter condition needed only if the cleaner must run alongside concurrent workflows).
* The cache is **keyed by URI**: when a workflow starts with a URI file input, a cached file is used if previously stored for the same URI. This doesn't depend on which task/workflow is running, and doesn't use checksums or timestamps of the file contents. Therefore, the cache should only be used with immutable remote files, or if there's no need for immediate coherence with remote content changes.
* Enabling the cache changes **where downloaded files are stored**: if the cache is enabled, they're stored in the cache directory; otherwise, they're stored under the triggering run directory.
* URIs excluded from the cache by the enable/disable patterns fall back to being downloaded under the current run directory. Typically, write the patterns to **include reusable reference data while excluding any run-specific inputs** that might be supplied as URIs.
* If needed, the `miniwdl localize` subcommand can **"prime" the local cache** with URIs found in a given JSON input template (or a simple list of URIs) before actually running any workflow.
* Cached files that are no longer needed can simply be **deleted from the cache directory**, once they're no longer in use by a running workflow.
* Miniwdl itself doesn't delete files from the cache, but to support an **external cleanup process**, it updates the access timestamp (atime) and opens a shared `flock()` on any cached file it's using. The script [examples/clean_download_cache.sh](https://github.com/chanzuckerberg/miniwdl/blob/master/examples/clean_download_cache.sh) illustrates a process to shrink the cache to a desired maximum size, by evicting the least-recently used files that can be exclusively flocked (the latter condition needed only if the cleaner must run alongside concurrent workflows).
11 changes: 4 additions & 7 deletions tests/runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,7 @@ workflow my_workflow {
}
}
EOF
$miniwdl localize -o local_inputs.json localize_me.wdl uri_inputs.json
fn=$(jq -r '.["my_workflow.files"][0]' local_inputs.json)
test -f "$fn"
is "$?" "0" "localized robots.txt"
fn=$(jq -r '.["my_workflow.files"][1]' local_inputs.json)
test -f "$fn"
is "$?" "0" "localized alyssa_ben.txt"
MINIWDL__DOWNLOAD_CACHE__PUT=true MINIWDL__DOWNLOAD_CACHE__DIR="${DN}/test_localize/cache" MINIWDL__DOWNLOAD_CACHE__ENABLE_PATTERNS='["*"]' MINIWDL__DOWNLOAD_CACHE__DISABLE_PATTERNS='["*/alyssa_ben.txt"]' \
$miniwdl localize localize_me.wdl uri_inputs.json --uri gs://gcp-public-data-landsat/LC08/01/044/034/LC08_L1GT_044034_20130330_20170310_01_T2/LC08_L1GT_044034_20130330_20170310_01_T2_MTL.txt > localize.stdout
is "$?" "0" "localize exit code"
is "$(find "${DN}/test_localize/cache/files" -type f | wc -l)" "2" "localize cache"

0 comments on commit 706ec9b

Please sign in to comment.