Commit

Merge e6913bd into f7627fe
mlin committed Mar 5, 2020
2 parents f7627fe + e6913bd commit 8e3a2b8
Showing 3 changed files with 133 additions and 104 deletions.
218 changes: 124 additions & 94 deletions WDL/CLI.py
@@ -1209,138 +1209,168 @@ def ensure_cromwell_jar(jarfile=None):
def fill_localize_subparser(subparsers):
localize_parser = subparsers.add_parser(
"localize",
help="Download URIs found in Cromwell-style input JSON and rewrite",
description=f"Download URIs found in Cromwell-style input JSON, and rewrite it with the local filenames.",
help="Download URI input files to local cache",
description=f"Prime the local file download cache with URI File inputs found in Cromwell-style input JSON",
)
localize_parser.add_argument(
"wdlfile", metavar="DOC.wdl", type=str, help="WDL document filename/URI"
"wdlfile",
metavar="DOC.wdl",
type=str,
help="WDL document filename/URI",
default=None,
nargs="?",
)
localize_parser.add_argument(
"infile",
metavar="INPUT.json",
type=str,
help="input JSON filename (- for standard input) or literal object",
default=None,
nargs="?",
)
localize_parser.add_argument(
"name",
metavar="NAME",
type=str,
nargs="?",
default=None,
help="short name to include in local paths (default: basename of JSON file)",
"--task",
metavar="TASK_NAME",
help="name of task (for WDL documents with multiple tasks & no workflow)",
)
localize_parser.add_argument(
"-d",
"--dir",
metavar="DIR",
dest="run_dir",
help="base directory in which to store downloaded files",
"--uri",
metavar="URI",
action="append",
help="additional URI to process; or, omit WDL & JSON and just specify one or more --uri",
)
localize_parser.add_argument(
"-o",
metavar="LOCAL.json",
type=str,
dest="outfile",
help="write transformed JSON to file instead of standard output",
"--no-cache",
action="store_true",
help="if a URI is already cached, re-download and replace it",
)
localize_parser.add_argument(
"--task",
metavar="TASK_NAME",
help="name of task (for WDL documents with multiple tasks & no workflow)",
"--cfg",
metavar="FILE",
type=str,
default=None,
help="configuration file to load (in preference to file named by MINIWDL_CFG environment, or XDG_CONFIG_{HOME,DIRS}/miniwdl.cfg)",
)
return localize_parser


def localize(
wdlfile, infile, name=None, outfile=None, task=None, path=None, check_quant=True, **kwargs
wdlfile=None,
infile=None,
uri=None,
no_cache=False,
task=None,
cfg=None,
path=None,
check_quant=True,
**kwargs,
):
# load WDL document
doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source)

# parse the provided input JSON
logging.basicConfig(level=NOTICE_LEVEL)
logger = logging.getLogger("miniwdl-localize")
logger.critical(
"DEPRECATION NOTICE: `miniwdl localize` will soon be retired, superseded by the configurable URI download cache in >= v0.7.x"
install_coloredlogs(logger)

cfg_arg = None
if cfg:
assert os.path.isfile(cfg), "--cfg file not found"
cfg_arg = [cfg]
cfg = runtime.config.Loader(logger, filenames=cfg_arg)
cache_cfg = cfg["download_cache"]
original_get = cache_cfg.get_bool("get")
if original_get and no_cache:
cfg.override({"download_cache": {"get": False}})
logger.notice(
_(
"effective configuration",
put=cache_cfg.get_bool("put"),
get=cache_cfg.get_bool("get"),
dir=cache_cfg["dir"],
ignore_query=cache_cfg.get_bool("ignore_query"),
enable_patterns=cache_cfg.get_list("enable_patterns"),
disable_patterns=cache_cfg.get_list("disable_patterns"),
)
)
cfg = runtime.config.Loader(logger)

def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)
uri = uri or []
uri = set(uri)

target, input_env, input_json = runner_input(
doc, [], infile, [], task=task, check_required=False, file_found=file_found
)
if infile:
# load WDL document
doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source)

# read input JSON
name = name or os.path.basename(infile).split(".")[0]
def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)

# scan for Files that appear to be downloadable URIs
def scan(x):
if isinstance(x, Value.File) and runtime.download.able(cfg, x.value):
yield x.value
for y in x.children:
yield from scan(y)
target, input_env, input_json = runner_input(
doc, [], infile, [], task=task, check_required=False, file_found=file_found
)

uris = set()
for b in input_env:
uris |= set(scan(b.value))
# scan for Files that appear to be downloadable URIs
def scan(x):
if isinstance(x, Value.File) and runtime.download.able(cfg, x.value):
yield x.value
for y in x.children:
yield from scan(y)

if uris:
# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)
for b in input_env:
uri |= set(scan(b.value))

# cheesy trick: provide the list of URIs as File inputs to a dummy workflow, causing the
# runtime to download them
localizer_wdl = (
"""
version 1.0
workflow localize_%s {
input {
Array[File] uris
}
output {
Array[File] files = uris
}
}
"""
% name
if not uri:
logger.warning(
"nothing to do; if inputs use special URI schemes, make sure necessary downloader plugin(s) are installed and enabled"
)
localizer = parse_document(localizer_wdl)
localizer.typecheck()
cfg = runtime.config.Loader(logger)
subdir, outputs = runtime.run(
cfg,
localizer.workflow,
values_from_json({"uris": list(uris)}, localizer.workflow.available_inputs),
sys.exit(0)

if not cache_cfg.get_bool("put"):
logger.error(
'configuration section "download_cache", option "put" (env MINIWDL__DOWNLOAD_CACHE__PUT) must be true for this operation to be effective'
)
sys.exit(2)

# recover the mapping of URIs to downloaded files
uri_to_file = {}
assert isinstance(outputs["files"], Value.Array)
for uri, elt in zip(uris, outputs["files"].value):
assert isinstance(elt, Value.File) and os.path.isfile(elt.value)
uri_to_file[uri] = elt.value

# rewrite the input Env to replace URIs with filenames
def rewrite(x):
if isinstance(x, Value.File) and x.value in uri_to_file:
x.value = uri_to_file[x.value]
for y in x.children:
rewrite(y)
cache = runtime.cache.CallCache(cfg)
disabled = [u for u in uri if not cache.download_path(u)]
if disabled:
logger.notice(_("URIs found but not cacheable per configuration", uri=disabled))
uri = list(uri - set(disabled))

for b in input_env:
rewrite(b.value)
if not uri:
logger.warning("nothing to do; check configured enable_patterns and disable_patterns")
sys.exit(0)
logger.notice(_("starting downloads", uri=uri))

# write out the possibly-modified JSON
result_json = values_to_json(
input_env, namespace=(target.name if isinstance(target, Workflow) else "")
# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)

# cheesy trick: provide the list of URIs as File inputs to a dummy workflow, causing the
# runtime to download & cache them
localizer_wdl = """
version 1.0
workflow localize {
input {
Array[File] uris
}
output {
Array[File] files = uris
}
}
"""
localizer = parse_document(localizer_wdl)
localizer.typecheck()
cfg = runtime.config.Loader(logger)
subdir, outputs = runtime.run(
cfg,
localizer.workflow,
values_from_json({"uris": uri}, localizer.workflow.available_inputs),
run_dir=os.environ.get("TMPDIR", "/tmp"),
_cache=cache,
)
if outfile in [None, "", "-"]:
print(json.dumps(result_json, indent=2))
else:
with open(outfile, "w") as outp:
print(json.dumps(result_json, indent=2), file=outp)

logger.notice(
_("success", files=[os.path.realpath(p) for p in values_to_json(outputs)["files"]])
)
if not original_get:
logger.warning(
"""future runs won't use the cache unless configuration section "download_cache", key "get" (env MINIWDL__DOWNLOAD_CACHE__GET) is set to true"""
)


def die(msg, status=2):
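The net effect of the rework: both positional arguments become optional, the old name positional and -d/-o options are dropped, and --uri, --no-cache, and --cfg are new (--task is retained). Because the command now writes into the shared download cache, it exits with an error unless the cache's "put" option is enabled, per the error path above. A usage sketch — the file and URI names here are hypothetical, for illustration only:

# prime the cache with the URI File inputs found in an input template
miniwdl localize my_workflow.wdl my_inputs.json

# or omit the WDL & JSON and name one or more URIs directly
miniwdl localize --uri https://example.org/ref.fa --uri https://example.org/ref.fa.fai

# re-download already-cached URIs, replacing the cached copies
miniwdl localize my_workflow.wdl my_inputs.json --no-cache

# load settings from an explicit configuration file
miniwdl localize my_workflow.wdl my_inputs.json --cfg my.cfg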
8 changes: 5 additions & 3 deletions docs/runner_reference.md
@@ -64,6 +64,8 @@ The download cache functionality must be enabled in the configuration. The relev

Details:

* Enabling the cache changes where downloaded files are stored: if the cache is enabled, they're stored in the cache directory; otherwise, they're stored under the triggering run directory.
* The cache is keyed by URI: when a workflow starts with a URI file input, a cached file is used if previously stored for the same URI. This doesn't depend on which task/workflow is running, and doesn't use checksums or timestamps of the file contents. Therefore, the cache should only be used with immutable remote files, or if there's no need for immediate coherence with remote content changes.
* Miniwdl doesn't internally manage the cache's total storage usage; but to support such processes, it updates the access timestamp (atime) and opens a shared `flock()` on any cached file it's using. The script [examples/clean_download_cache.sh](https://github.com/chanzuckerberg/miniwdl/blob/master/examples/clean_download_cache.sh) illustrates a process to evict the least-recently used cache files that can be exclusively flocked (the latter condition needed only if the cleaner must run alongside concurrent workflows).
* Enabling the cache changes **where downloaded files are stored**: if the cache is enabled, they're stored in the cache directory; otherwise, they're stored under the triggering run directory.
* The cache is **keyed by URI**: when a workflow starts with a URI file input, a cached file is used if previously stored for the same URI. This doesn't depend on which task/workflow is running, and doesn't use checksums or timestamps of the file contents. Therefore, the cache should only be used with immutable remote files, or if there's no need for immediate coherence with remote content changes.
* If needed, the `miniwdl localize` subcommand can **"prime" the local cache** with URIs found in a given JSON input template (or a simple list of URIs) before actually running any workflow.
* Cached files that are no longer needed can simply be **deleted from the cache directory**, once they're no longer in use by a running workflow.
* Miniwdl itself doesn't delete files from the cache, but to support a **cleanup process**, it updates the access timestamp (atime) and opens a shared `flock()` on any cached file it's using. The script [examples/clean_download_cache.sh](https://github.com/chanzuckerberg/miniwdl/blob/master/examples/clean_download_cache.sh) illustrates a process to shrink the cache to a desired maximum size, by evicting the least-recently used files that can be exclusively flocked (the latter condition needed only if the cleaner must run alongside concurrent workflows).
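As a concrete sketch of the cache-priming flow described in the bullets above (the directory and file names are hypothetical):

export MINIWDL__DOWNLOAD_CACHE__PUT=true
export MINIWDL__DOWNLOAD_CACHE__GET=true
export MINIWDL__DOWNLOAD_CACHE__DIR=/mnt/miniwdl_cache

# download the URI File inputs once, storing them under the cache directory
miniwdl localize my_workflow.wdl my_inputs.json

# later runs then reuse the cached copies instead of re-downloading
miniwdl run my_workflow.wdl -i my_inputs.json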
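And a simplified sketch of the eviction pattern that the atime/flock behavior enables (assumes GNU find and util-linux flock; the real clean_download_cache.sh additionally shrinks toward a target cache size rather than deleting every unlocked file):

# delete cached files oldest-access-first, skipping any a running workflow holds flocked
find "$MINIWDL__DOWNLOAD_CACHE__DIR/files" -type f -printf '%A@ %p\n' | sort -n | cut -d' ' -f2- |
while read -r f; do
    flock --nonblock --exclusive "$f" rm -- "$f" || true
done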
11 changes: 4 additions & 7 deletions tests/runner.t
@@ -255,10 +255,7 @@ workflow my_workflow {
}
}
EOF
$miniwdl localize -o local_inputs.json localize_me.wdl uri_inputs.json
fn=$(jq -r '.["my_workflow.files"][0]' local_inputs.json)
test -f "$fn"
is "$?" "0" "localized robots.txt"
fn=$(jq -r '.["my_workflow.files"][1]' local_inputs.json)
test -f "$fn"
is "$?" "0" "localized alyssa_ben.txt"
MINIWDL__DOWNLOAD_CACHE__PUT=true MINIWDL__DOWNLOAD_CACHE__DIR="${DN}/test_localize/cache" MINIWDL__DOWNLOAD_CACHE__ENABLE_PATTERNS='["*"]' MINIWDL__DOWNLOAD_CACHE__DISABLE_PATTERNS='["*/alyssa_ben.txt"]' \
$miniwdl localize localize_me.wdl uri_inputs.json --uri gs://gcp-public-data-landsat/LC08/01/044/034/LC08_L1GT_044034_20130330_20170310_01_T2/LC08_L1GT_044034_20130330_20170310_01_T2_MTL.txt > localize.stdout
is "$?" "0" "localize exit code"
is "$(find "${DN}/test_localize/cache/files" -type f | wc -l)" "2" "localize cache"
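For reference, the environment variables in this test map onto an INI-style configuration file that could instead be passed with the new --cfg flag — a hypothetical equivalent, not part of the committed test:

cat > localize.cfg << EOF
[download_cache]
put = true
dir = ${DN}/test_localize/cache
enable_patterns = ["*"]
disable_patterns = ["*/alyssa_ben.txt"]
EOF
$miniwdl localize localize_me.wdl uri_inputs.json --cfg localize.cfg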
