Merge 46ce4ef into 705ec20
mlin committed Oct 30, 2020
2 parents 705ec20 + 46ce4ef commit 31b8283
Showing 15 changed files with 651 additions and 291 deletions.
243 changes: 159 additions & 84 deletions WDL/CLI.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions WDL/StdLib.py
@@ -154,6 +154,7 @@ def _devirtualize_filename(self, filename: str) -> str:
on the local host. Subclasses may further wish to forbid access to files outside of a
designated directory or whitelist (by raising an exception)
"""
# TODO: add directory: bool argument when we have stdlib functions that take Directory
raise NotImplementedError()

def _write(
@@ -180,6 +181,7 @@ def _virtualize_filename(self, filename: str) -> str:
from a local path in write_dir, 'virtualize' into the filename as it should present in a
File value
"""
# TODO: add directory: bool argument when we have stdlib functions that take Directory
raise NotImplementedError()

def _override_static(self, name: str, f: Callable) -> None:
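The _devirtualize_filename/_virtualize_filename hooks are where a runtime backend maps between WDL File values and host paths. A minimal illustrative sketch of that contract, using a hypothetical stand-in class (not one of miniwdl's actual StdLib.Base subclasses):

```python
import os


class ExamplePathMapper:
    """Hypothetical stand-in for a StdLib.Base subclass; illustrates the two hooks only."""

    def __init__(self, allowed_root: str):
        self.allowed_root = os.path.realpath(allowed_root)

    def _devirtualize_filename(self, filename: str) -> str:
        # map the File value onto a host path, refusing anything outside allowed_root
        path = os.path.realpath(filename)
        if os.path.commonpath([path, self.allowed_root]) != self.allowed_root:
            raise PermissionError("denied access to " + filename)
        return path

    def _virtualize_filename(self, filename: str) -> str:
        # a file written locally under write_dir is presented to the WDL program as-is
        return filename
```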
78 changes: 60 additions & 18 deletions WDL/_util.py
@@ -14,6 +14,7 @@
import subprocess
import shutil
import urllib
import hashlib
from time import sleep
from datetime import datetime
from contextlib import contextmanager, AbstractContextManager
@@ -174,6 +175,23 @@ def write_atomic(contents: str, filename: str, end: str = "\n") -> None:
os.rename(tn, filename)


@export
def rmtree_atomic(path: str) -> None:
"""
Recursively delete a directory (or single file) after first renaming it to a temporary name in
the same parent directory. The atomic rename step ensures a "partial" directory won't be left
behind in its original location, should anything go wrong whilst deleting its contents.
"""
path = os.path.abspath(path)
assert path and path.strip("/")
tmp_path = os.path.join(
os.path.dirname(path), "._rmtree_atomic_" + hashlib.sha256(path.encode("utf-8")).hexdigest()
)
shutil.rmtree(tmp_path, ignore_errors=True)
os.renames(path, tmp_path)
shutil.rmtree(tmp_path)
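A short usage sketch (hypothetical path): the entry is first renamed to a hashed temporary name in the same parent directory, so an interrupted deletion leaves no partial tree at the original name.

```python
from WDL._util import rmtree_atomic

# hypothetical stale cache entry being evicted
rmtree_atomic("/tmp/miniwdl_download_cache/dirs/s3/my-bucket/_ref/hg38")
```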


@export
def write_values_json(
values_env: "Env.Bindings[Value.Base]", filename: str, namespace: str = "" # noqa
@@ -560,6 +578,29 @@ def parse_byte_size(s: str) -> int:
return int(N)


@export
def pathsize(path: str) -> int:
"""
get byte size of file, or total size of all files under directory & subdirectories (symlinks
excluded)
"""

if not os.path.isdir(path):
return os.path.getsize(path)

def raiser(exc: OSError):
raise exc

ans = 0
for root, subdirs, files in os.walk(path, onerror=raiser, followlinks=False):
for fn in files:
fn = os.path.join(root, fn)
if not os.path.islink(fn):
ans += os.path.getsize(fn)

return ans
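For a regular file this is just os.path.getsize; for a directory it walks the tree and sums regular files, skipping symlinks rather than following them. A tiny usage sketch with hypothetical paths:

```python
from WDL._util import pathsize

print(pathsize("/etc/hostname"))  # size of one file, in bytes
print(pathsize("/var/log"))       # sum of all non-symlink files under the directory tree
```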


def splitall(path: str) -> List[str]:
"""
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch04s16.html
@@ -716,7 +757,7 @@ class FlockHolder(AbstractContextManager):
"""

_lock: threading.Lock
_flocks: Dict[str, Tuple[IO[Any], bool]]
_flocks: Dict[str, Tuple[int, bool]]
_entries: int
_logger: logging.Logger

@@ -739,10 +780,10 @@ def __exit__(self, *exc_details) -> None: # pyre-fixme
if self._entries == 0:
exn = None
with self._lock:
for fn, (fh, exclusive) in self._flocks.items():
for fn, (fd, exclusive) in self._flocks.items():
self._logger.debug(StructuredLogMessage("close", file=fn, exclusive=exclusive))
try:
fh.close()
os.close(fd)
except Exception as exn2:
exn = exn or exn2
self._flocks = {}
@@ -752,21 +793,21 @@ def __exit__(self, *exc_details) -> None: # pyre-fixme
def __del__(self) -> None:
assert self._entries == 0 and not self._flocks, "FlockHolder context was not exited"

def flock( # pyre-fixme
def flock(
self,
filename: str,
mode: str = "",
mode: Optional[int] = None,
exclusive: bool = False,
wait: bool = False,
update_atime: bool = False,
) -> IO[Any]:
) -> int:
"""
Open a file and an advisory lock on it. The file is closed and the lock released upon exit
of the outermost context. Returns the open file, which the caller shouldn't close (this is
taken care of).
Open a file/directory and an advisory lock on it. The file is closed and the lock released
upon exit of the outermost context. Returns the open file descriptor, which the caller
shouldn't close (this is taken care of).
:param filename: file to open & lock
:param mode: open() mode, default: "r+b" if exclusive else "rb"
:param filename: file/directory to open & lock
:param mode: os.open() mode flags, default: os.O_RDWR if exclusive else os.O_RDONLY
:param exclusive: True to open an exclusive lock (default: shared lock)
:param wait: True to wait as long as needed to obtain the lock, otherwise (default) raise
OSError if the lock isn't available immediately. Self-deadlock is possible;
@@ -787,7 +828,10 @@ def flock( # pyre-fixme
)
)
return self._flocks[realfilename][0]
openfile = open(realfilename, mode or ("r+b" if exclusive else "rb"))
openfile = os.open(
realfilename,
mode if mode is not None else (os.O_RDWR if exclusive else os.O_RDONLY),
)
try:
op = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
if not wait:
@@ -804,11 +848,9 @@ def flock( # pyre-fixme
fcntl.flock(openfile, op)
# the flock will release whenever we ultimately openfile.close()

file_st = os.stat(openfile.fileno())
file_st = os.stat(openfile)
if update_atime:
os.utime(
openfile.fileno(), ns=(int(time.time() * 1e9), file_st.st_mtime_ns)
)
os.utime(openfile, ns=(int(time.time() * 1e9), file_st.st_mtime_ns))

# Even if all concurrent processes obey the advisory flocks, the filename link
# could have been replaced or removed in the duration between our open() and
@@ -836,9 +878,9 @@ def flock( # pyre-fixme
self._flocks[realfilename] = (openfile, exclusive)
return openfile
except:
openfile.close()
os.close(openfile)
raise
openfile.close()
os.close(openfile) # NOT finally -- for next while-loop iteration
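Because flock() now returns a raw file descriptor instead of an open file object, a caller that wants a Python file interface can read through a duplicate descriptor, so that closing the wrapper doesn't close the descriptor FlockHolder still owns. A hedged usage sketch (hypothetical filename):

```python
import logging
import os

from WDL._util import FlockHolder

logger = logging.getLogger("example")
with FlockHolder(logger) as flocker:
    # shared lock on a (hypothetical) cached file, held until the outermost context exits
    fd = flocker.flock(
        "/tmp/miniwdl_download_cache/files/https/example.com/_ref/genome.fa",
        wait=True,
        update_atime=True,
    )
    with os.fdopen(os.dup(fd), "rb") as reader:  # dup so reader.close() leaves fd open
        header = reader.read(64)
# descriptor closed and lock released here
```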


@export
67 changes: 49 additions & 18 deletions WDL/runtime/cache.py
@@ -9,6 +9,7 @@
import itertools
import os
import logging
import shutil
from pathlib import Path
from typing import Dict, Any, Optional, List
from contextlib import AbstractContextManager
@@ -24,6 +25,7 @@
StructuredLogMessage as _,
FlockHolder,
write_atomic,
rmtree_atomic,
)


@@ -36,13 +38,15 @@ class CallCache(AbstractContextManager):
# the course of the current workflow run, but not eligible for persistent caching in future
# runs; we just want to remember them for potential reuse later in the current run.
_workflow_downloads: Dict[str, str]
_workflow_directory_downloads: Dict[str, str]
_lock: Lock

def __init__(self, cfg: config.Loader, logger: logging.Logger):
self._cfg = cfg
self._logger = logger.getChild("CallCache")
self._flocker = FlockHolder(self._logger)
self._workflow_downloads = {}
self._workflow_directory_downloads = {}
self._lock = Lock()
self.call_cache_dir = cfg["call_cache"]["dir"]

@@ -134,13 +138,13 @@ def put(self, task_key: str, input_digest: str, outputs: Env.Bindings[Value.Base
# specialized caching logic for file downloads (not sensitive to the downloader task details,
# and looked up in URI-derived folder structure instead of sqlite db)

def download_path(self, uri: str) -> Optional[str]:
def download_path(self, uri: str, directory: bool = False) -> Optional[str]:
"""
Based on the input download uri, compute the local file path at which the cached copy
should exist (or None if the uri is not cacheable)
"""
# check if URI is properly formatted & normalize
parts = urlparse(uri)
parts = urlparse(uri.rstrip("/"))
if (
parts.scheme
and parts.netloc
@@ -158,32 +162,46 @@ def download_path(self, uri: str) -> Optional[str]:
):
(dn, fn) = os.path.split(parts.path)
if fn:
# formulate path
# formulate local subdirectory of the cache directory in which to put the
# cached item, manipulating the URI path to ensure consistent local nesting
# depth (that's assumed by clean_download_cache.sh when it's looking for items
# to clean up)
dn = dn.strip("/")
if dn:
dn = dn.replace("_", "__")
dn = dn.replace("/", "_")
dn = "_" + dn
return os.path.join(
self._cfg["download_cache"]["dir"],
"files",
("dirs" if directory else "files"),
parts.scheme,
parts.netloc,
dn,
fn,
)
return None
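To make the escaping concrete (hypothetical URIs, default cache directory, and assuming the URIs pass the cacheability checks elided above): underscores in the URI path are doubled, slashes become single underscores, and a leading underscore marks the encoded segment.

```python
import logging

from WDL.runtime import cache, config

logger = logging.getLogger("example")
cc = cache.CallCache(config.Loader(logger), logger)

print(cc.download_path("https://example.com/ref/v1/genome.fa"))
#  -> /tmp/miniwdl_download_cache/files/https/example.com/_ref_v1/genome.fa
print(cc.download_path("s3://my-bucket/ref_data/hg38/", directory=True))
#  -> /tmp/miniwdl_download_cache/dirs/s3/my-bucket/_ref__data/hg38
```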

def get_download(self, uri: str, logger: Optional[logging.Logger] = None) -> Optional[str]:
def get_download(
self, uri: str, directory: bool = False, logger: Optional[logging.Logger] = None
) -> Optional[str]:
"""
Return filename of the cached download of uri, if available. If so then opens a shared
flock on the local file, which will remain for the life of the CallCache object.
flock on the local file/directory, which will remain for the life of the CallCache object.
"""
if directory:
uri = uri.rstrip("/")
with self._lock:
if uri in self._workflow_downloads:
if directory and uri in self._workflow_directory_downloads:
return self._workflow_directory_downloads[uri]
elif not directory and uri in self._workflow_downloads:
return self._workflow_downloads[uri]
logger = logger.getChild("CallCache") if logger else self._logger
p = self.download_path(uri)
if not (self._cfg["download_cache"].get_bool("get") and p and os.path.isfile(p)):
p = self.download_path(uri, directory=directory)
if not (
self._cfg["download_cache"].get_bool("get")
and p
and ((directory and os.path.isdir(p)) or (not directory and os.path.isfile(p)))
):
logger.debug(_("no download cache hit", uri=uri, cache_path=p))
return None
try:
@@ -201,47 +219,60 @@ def get_download(self, uri: str, logger: Optional[logging.Logger] = None) -> Opt
)
return None

def put_download(self, uri: str, filename: str, logger: Optional[logging.Logger] = None) -> str:
def put_download(
self,
uri: str,
filename: str,
directory: bool = False,
logger: Optional[logging.Logger] = None,
) -> str:
"""
Move the downloaded file to the cache location & return the new path; or if the uri isn't
cacheable, return the given path.
"""
if directory:
uri = uri.rstrip("/")
logger = logger.getChild("CallCache") if logger else self._logger
ans = filename
p = self.download_cacheable(uri)
p = self.download_cacheable(uri, directory=directory)
if p:
# if a file at the cache location has appeared whilst we were downloading, replace it
# iff we can exclusive-flock it
with FlockHolder(logger) as replace_flock:
try:
replace_flock.flock(p, mode="rb", exclusive=True)
replace_flock.flock(p, mode=os.O_RDONLY, exclusive=True)
except FileNotFoundError:
pass
except OSError:
logger.warning(
_(
"existing cached file in use; leaving downloaded in-place",
"existing cache entry in use; leaving downloaded in-place",
uri=uri,
downloaded=filename,
cache_path=p,
)
)
p = None
if p:
os.makedirs(os.path.dirname(p), exist_ok=True)
os.rename(filename, p)
if directory and os.path.isdir(p):
rmtree_atomic(p)
os.renames(filename, p)
# the renames() op should be atomic, because the download operation should have
# been run under the cache directory (download.py:run_cached)
logger.info(_("stored in download cache", uri=uri, cache_path=p))
ans = p
if not p:
with self._lock:
self._workflow_downloads[uri] = ans
(self._workflow_directory_downloads if directory else self._workflow_downloads)[
uri
] = ans
self.flock(ans)
return ans
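The intended call pattern for a downloader is to probe the cache first and hand the finished download back to it afterwards; put_download either moves the result into the cache (replacing a stale entry via rmtree_atomic when it can take an exclusive flock) or returns the scratch path unchanged. A hedged sketch, with the actual fetch step abstracted as a callable:

```python
import logging
from typing import Callable

from WDL.runtime import cache


def fetch_directory(
    cc: cache.CallCache,
    uri: str,
    downloader: Callable[[str], str],
    logger: logging.Logger,
) -> str:
    """Illustrative only; not miniwdl's actual downloader plumbing."""
    cached = cc.get_download(uri, directory=True, logger=logger)
    if cached:
        return cached  # cache hit; cc holds a shared flock on it for its lifetime
    local_copy = downloader(uri)  # fetch into scratch space, ideally under the cache dir
    return cc.put_download(uri, local_copy, directory=True, logger=logger)
```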

def download_cacheable(self, uri: str) -> Optional[str]:
def download_cacheable(self, uri: str, directory: bool = False) -> Optional[str]:
if not self._cfg["download_cache"].get_bool("put"):
return None
return self.download_path(uri)
return self.download_path(uri, directory=directory)

def flock(self, filename: str, exclusive: bool = False) -> None:
self._flocker.flock(filename, update_atime=True, exclusive=exclusive)
16 changes: 14 additions & 2 deletions WDL/runtime/config.py
@@ -308,10 +308,22 @@ def _parse_list(v: str) -> List[Any]:

DEFAULT_PLUGINS = {
"file_download": [
importlib_metadata.EntryPoint(
group="miniwdl.plugin.file_download",
name="s3",
value="WDL.runtime.download:awscli_downloader",
),
importlib_metadata.EntryPoint(
group="miniwdl.plugin.file_download",
name="gs",
value="WDL.runtime.download:gsutil_downloader",
),
],
"directory_download": [
importlib_metadata.EntryPoint(
group="miniwdl.plugin.directory_download",
name="s3",
value="WDL.runtime.download:awscli_directory_downloader",
)
],
"task": [],
@@ -330,8 +342,8 @@ def load_all_plugins(cfg: Loader, group: str) -> Iterable[Tuple[bool, Any]]:
assert group in DEFAULT_PLUGINS.keys(), group
enable_patterns = cfg["plugins"].get_list("enable_patterns")
disable_patterns = cfg["plugins"].get_list("disable_patterns")
for plugin in importlib_metadata.entry_points().get(
f"miniwdl.plugin.{group}", DEFAULT_PLUGINS[group]
for plugin in DEFAULT_PLUGINS[group] + list(
importlib_metadata.entry_points().get(f"miniwdl.plugin.{group}", [])
):
enabled = next(
(pat for pat in enable_patterns if fnmatchcase(plugin.value, pat)), False
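Since load_all_plugins now concatenates the built-ins with whatever entry points are installed, a third-party package can add a directory downloader simply by advertising one in the new miniwdl.plugin.directory_download group. A hedged setup.py sketch with hypothetical package and module names:

```python
from setuptools import setup

setup(
    name="miniwdl-myblobstore",            # hypothetical plugin package
    py_modules=["miniwdl_myblobstore"],
    entry_points={
        "miniwdl.plugin.directory_download": [
            "myblob = miniwdl_myblobstore:directory_downloader",
        ],
    },
)
```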
8 changes: 4 additions & 4 deletions WDL/runtime/config_templates/default.cfg
@@ -81,12 +81,12 @@ as_user = false


[download_cache]
# When a File input is supplied with a URI to be downloaded, store the downloaded file in a certain
# directory where it can later be found and reused for the same input URI.
# When File or Directory inputs are given URIs to be downloaded, store the downloaded copy in a
# local directory where it can later be found and reused for the same input URI.
put = false
# Enable retrieval of File input URIs from the local cache
# Enable retrieval of File/Directory input URIs from the local cache
get = false
# Base directory for local download cache
# Base directory for the local download cache.
dir = /tmp/miniwdl_download_cache
# Remove URI query strings for cache key/lookup purposes. By default, downloads from URIs with
# query strings are never cached (neither put nor get).
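These keys are read at runtime through the config loader, mirroring the get_bool calls in cache.py above; a minimal sketch of checking them programmatically, assuming the default section and key names:

```python
import logging

from WDL.runtime import config

cfg = config.Loader(logging.getLogger("example"))
if cfg["download_cache"].get_bool("put") or cfg["download_cache"].get_bool("get"):
    print("download cache enabled, rooted at", cfg["download_cache"]["dir"])
else:
    print("download cache disabled")
```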