Skip to content

Commit

Permalink
Merge 0d63ec9 into cff8beb
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Mar 14, 2020
2 parents cff8beb + 0d63ec9 commit bd54502
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 62 deletions.
138 changes: 137 additions & 1 deletion WDL/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import logging
import signal
import threading
import time
import copy
import fcntl
from time import sleep
from datetime import datetime
from contextlib import contextmanager
from contextlib import contextmanager, AbstractContextManager
from typing import (
Tuple,
Dict,
Expand All @@ -23,6 +25,7 @@
Optional,
Callable,
Generator,
IO,
Any,
)
from types import FrameType
Expand Down Expand Up @@ -587,3 +590,136 @@ def _impl() -> Generator[Any, Any, None]: # pyre-fixme
raise
finally:
chain.close()


@export
class FlockHolder(AbstractContextManager):
    """
    Context manager exposing a method to take an advisory lock on a file (flock) and hold it until
    context exit. The context manager is reentrant; locks are released upon exit of the outermost
    nested context.
    """

    _lock: threading.Lock  # guards _flocks
    _flocks: Dict[str, Tuple[IO[Any], bool]]  # realpath -> (open file holding the flock, exclusive)
    _entries: int  # current context nesting depth
    _logger: logging.Logger

    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
        """
        :param logger: parent logger; a child named "FlockHolder" is derived from it. If omitted,
                       the top-level logger named "FlockHolder" is used.
        """
        self._lock = threading.Lock()
        self._flocks = {}
        self._entries = 0
        self._logger = (
            logger.getChild("FlockHolder") if logger else logging.getLogger("FlockHolder")
        )

    def __enter__(self) -> "FlockHolder":
        # no flocks may be outstanding unless we're already inside a context
        assert self._entries > 0 or not self._flocks
        self._entries += 1
        return self

    def __exit__(self, *exc_details) -> None:  # pyre-fixme
        """
        Leave one nesting level. On leaving the outermost level, close every held file (thereby
        releasing its flock). All files are attempted even if some fail to close; the first such
        failure is re-raised afterwards.
        """
        assert self._entries > 0, "FlockHolder context exited prematurely"
        self._entries -= 1
        if self._entries == 0:
            exn = None
            with self._lock:
                for fn, (fh, exclusive) in self._flocks.items():
                    self._logger.debug(StructuredLogMessage("close", file=fn, exclusive=exclusive))
                    try:
                        fh.close()
                    except Exception as exn2:
                        # remember only the first failure; keep closing the rest
                        exn = exn or exn2
                self._flocks = {}
            if exn:
                raise exn

    def __del__(self) -> None:
        assert self._entries == 0 and not self._flocks, "FlockHolder context was not exited"

    def flock(  # pyre-fixme
        self,
        filename: str,
        mode: str = "rb",
        exclusive: bool = False,
        wait: bool = False,
        update_atime: bool = False,
    ) -> IO[Any]:
        """
        Open a file and take an advisory lock on it. The file is closed and the lock released upon
        exit of the outermost context. Returns the open file, which the caller shouldn't close
        (this is taken care of).

        If this holder already has a lock on the file and a shared lock is requested, the
        previously opened file is returned as-is (regardless of ``mode`` and ``update_atime``).

        :param filename: file to open & lock
        :param mode: open() mode
        :param exclusive: True to open an exclusive lock (default: shared lock)
        :param wait: True to wait as long as needed to obtain the lock, otherwise (default) raise
                     OSError if the lock isn't available immediately. Self-deadlock is possible;
                     see Python fcntl.flock docs for further details.
        :param update_atime: True to 'touch -a' the file after obtaining the lock
        """
        assert self._entries, "FlockHolder.flock() used out of context"
        while True:
            realfilename = os.path.realpath(filename)
            # The lock synchronizes self._flocks. It is held for the whole attempt, so the
            # lookup and the registration below are atomic with respect to other threads; with
            # wait=True this also means other threads' flock() calls on this holder wait for us.
            with self._lock:
                if realfilename in self._flocks and not exclusive:
                    self._logger.debug(
                        StructuredLogMessage(
                            "reuse prior flock",
                            filename=filename,
                            realpath=realfilename,
                            exclusive=self._flocks[realfilename][1],
                        )
                    )
                    return self._flocks[realfilename][0]
                openfile = open(realfilename, mode)
                try:
                    op = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
                    if not wait:
                        op |= fcntl.LOCK_NB
                    self._logger.debug(
                        StructuredLogMessage(
                            "flock",
                            file=filename,
                            realpath=realfilename,
                            exclusive=exclusive,
                            wait=wait,
                        )
                    )
                    fcntl.flock(openfile, op)
                    # the flock will release whenever we ultimately openfile.close()

                    file_st = os.stat(openfile.fileno())

                    # The filename link could have been replaced or removed in the instant between
                    # our open() and flock() syscalls.
                    # - if it was removed, the following os.stat will trigger FileNotFoundError,
                    #   which is reasonable to propagate.
                    # - if it was replaced, the subsequent condition won't hold, and we'll loop
                    #   around to try again on the replacement file.
                    filename_st = os.stat(realfilename)
                    self._logger.debug(
                        StructuredLogMessage(
                            "flocked",
                            file=filename,
                            realpath=realfilename,
                            exclusive=exclusive,
                            name_inode=filename_st.st_ino,
                            fd_inode=file_st.st_ino,
                        )
                    )
                    if (
                        filename_st.st_dev == file_st.st_dev
                        and filename_st.st_ino == file_st.st_ino
                    ):
                        if update_atime:
                            # touch -a, only once we know we locked the file the name refers
                            # to; mtime is carried over unchanged
                            os.utime(
                                openfile.fileno(), ns=(int(time.time() * 1e9), file_st.st_mtime_ns)
                            )
                        assert realfilename not in self._flocks
                        self._flocks[realfilename] = (openfile, exclusive)
                        return openfile
                except BaseException:
                    # never leak the descriptor (or a lock held through it) on any failure
                    openfile.close()
                    raise
                # we locked a file that the name no longer refers to; drop it and retry
                openfile.close()
79 changes: 23 additions & 56 deletions WDL/runtime/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,35 @@
"""

import os
import time
import fcntl
import logging
import threading
from typing import Iterator, Dict, Any, Optional, Set, List, IO
from contextlib import contextmanager, ExitStack
from urllib.parse import urlparse, urlunparse
from fnmatch import fnmatchcase
from . import config
from .. import Env, Value, Type
from .._util import StructuredLogMessage as _
from .._util import StructuredLogMessage as _, FlockHolder


class CallCache:
_cfg: config.Loader
_lock: threading.Lock
_flocked_files: Set[str]
_flocks: List[ExitStack]
_flocker: FlockHolder
_logger: logging.Logger

def __init__(self, cfg: config.Loader):
def __init__(self, cfg: config.Loader, logger: logging.Logger):
self._cfg = cfg
self._lock = threading.Lock()
self._flocked_files = set()
self._flocks = []

def _flock(self, filenames: List[str]) -> None:
# open shared flocks on the specified filenames (all or none)
filenames2 = set(os.path.realpath(fn) for fn in filenames)
with self._lock:
filenames2 = filenames2 - self._flocked_files
if filenames2:
with ExitStack() as stack:
for fn in filenames2:
stack.enter_context(_open_and_flock(fn)) # pylint: disable=no-member
self._flocked_files |= filenames2
self._flocks.append(stack.pop_all()) # pylint: disable=no-member
self._logger = logger.getChild("CallCache")
self._flocker = FlockHolder(self._logger)
self._flocker.__enter__()

def __del__(self):
with self._lock:
for lock in self._flocks:
lock.close()
self._flocker.__exit__()

def get(
self, logger: logging.Logger, key: str, output_types: Env.Bindings[Type.Base]
self,
key: str,
output_types: Env.Bindings[Type.Base],
logger: Optional[logging.Logger] = None,
) -> Optional[Env.Bindings[Value.Base]]:
"""
Resolve cache key to call outputs, if available, or None. When matching outputs are found,
Expand All @@ -57,7 +42,9 @@ def get(
"""
raise NotImplementedError()

def put(self, logger: logging.Logger, key: str, outputs: Env.Bindings[Value.Base]) -> None:
def put(
self, key: str, outputs: Env.Bindings[Value.Base], logger: Optional[logging.Logger] = None
) -> None:
"""
Store call outputs for future reuse
"""
Expand Down Expand Up @@ -105,17 +92,18 @@ def download_path(self, uri: str) -> Optional[str]:
)
return None

def get_download(self, logger: logging.Logger, uri: str) -> Optional[str]:
def get_download(self, uri: str, logger: Optional[logging.Logger] = None) -> Optional[str]:
"""
Return filename of the cached download of uri, if available. If so then opens a shared
flock on the local file, which will remain for the life of the CallCache object.
"""
logger = logger.getChild("CallCache") if logger else self._logger
p = self.download_path(uri)
if not (self._cfg["download_cache"].get_bool("get") and p and os.path.isfile(p)):
logger.debug(_("no download cache hit", uri=uri, cache_path=p))
return None
try:
self._flock([p])
self._flocker.flock(p, update_atime=True)
logger.info(_("found in download cache", uri=uri, cache_path=p))
return p
except Exception as exn:
Expand All @@ -129,11 +117,14 @@ def get_download(self, logger: logging.Logger, uri: str) -> Optional[str]:
)
return None

def put_download(self, logger: logging.Logger, uri: str, filename: str) -> str:
def put_download(
self, uri: str, filename: str, logger: Optional[logging.Logger] = None,
) -> str:
"""
Move the downloaded file to the cache location & return the new path; or if the uri isn't
cacheable, return the given path.
"""
logger = logger.getChild("CallCache") if logger else self._logger
ans = filename
if self._cfg["download_cache"].get_bool("put"):
p = self.download_path(uri)
Expand All @@ -142,29 +133,5 @@ def put_download(self, logger: logging.Logger, uri: str, filename: str) -> str:
os.rename(filename, p)
logger.info(_("stored in download cache", uri=uri, cache_path=p))
ans = p
self._flock([ans])
self._flocker.flock(ans, update_atime=True)
return ans


@contextmanager
def _open_and_flock(
    filename: str, mode: str = "rb", exclusive: bool = False, wait: bool = False
) -> Iterator[IO[Any]]:
    """
    Context manager: open ``filename`` with the given mode, take a flock on it (shared unless
    ``exclusive``; non-blocking unless ``wait``), refresh its atime, and yield the open file. The
    file is closed on leaving the context.
    """
    lock_flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
    if not wait:
        lock_flags |= fcntl.LOCK_NB
    while True:
        with open(filename, mode) as handle:
            fcntl.flock(handle, lock_flags)
            # the name may have been re-linked between open() and flock(); compare the identity
            # of what the name points to now against what we actually locked
            path_stat = os.stat(filename)
            fd_stat = os.stat(handle.fileno())
            if (path_stat.st_dev, path_stat.st_ino) != (fd_stat.st_dev, fd_stat.st_ino):
                # locked a stale file: leaving the with-block closes it, then try again
                continue
            # touch -a (mtime carried over unchanged)
            os.utime(handle.fileno(), ns=(int(time.time() * 1e9), fd_stat.st_mtime_ns))
            yield handle
            return
        # the flock should expire automatically once the file has been closed
4 changes: 2 additions & 2 deletions WDL/runtime/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ def run_cached(
Cached download logic: returns the file from the cache if available; otherwise, runs the
download and puts it into the cache before returning
"""
cached = cache.get_download(logger, uri)
cached = cache.get_download(uri, logger=logger)
if cached:
return True, cached
if not cfg["download_cache"].get_bool("put") or not cache.download_path(uri):
return False, run(cfg, logger, uri, run_dir=run_dir, **kwargs)
# run the download within the cache directory
run_dir = os.path.join(cfg["download_cache"]["dir"], "ops")
filename = run(cfg, logger, uri, run_dir=run_dir, **kwargs)
return False, cache.put_download(logger, uri, os.path.realpath(filename))
return False, cache.put_download(uri, os.path.realpath(filename), logger=logger)


def gsutil_downloader(
Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ def run_local_task(
)
)
logger.info(_("thread", ident=threading.get_ident()))
cache = _cache or CallCache(cfg)
cache = _cache or CallCache(cfg, logger)

try:
# start plugin coroutines and process inputs through them
Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def run_local_workflow(
),
futures.ThreadPoolExecutor(max_workers=16),
)
cache = _cache or CallCache(cfg)
cache = _cache or CallCache(cfg, logger)
try:
# run workflow state machine
outputs = _workflow_main_loop(
Expand Down
12 changes: 11 additions & 1 deletion tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,8 @@ def test_download_input_files(self):

@log_capture()
def test_download_cache(self, capture):
cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()))
logger = logging.getLogger(self.id())
cfg = WDL.runtime.config.Loader(logger)
cfg.override({
"download_cache": {
"put": True,
Expand Down Expand Up @@ -884,6 +885,15 @@ def test_download_cache(self, capture):
self.assertTrue("downloaded: 1" in logs[0])
self.assertTrue("cached: 1" in logs[1])

# quick test CallCache reentrancy
cache = WDL.runtime.cache.CallCache(cfg, logger)
self.assertIsNotNone(cache.get_download("https://google.com/robots.txt", logger=logger))
self.assertIsNotNone(cache.get_download("https://google.com/robots.txt", logger=logger))
self.assertEqual(len(cache._flocker._flocks), 1)
with self.assertRaises(OSError):
cache._flocker.flock(cache.download_path("https://google.com/robots.txt"), exclusive=True)
del cache

def test_workdir_ownership(self):
# verify that everything within working directory is owned by the invoking user
txt = R"""
Expand Down

0 comments on commit bd54502

Please sign in to comment.