Skip to content

Commit

Permalink
Merge 9b8fade into a1556f0
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Nov 2, 2020
2 parents a1556f0 + 9b8fade commit 39e5675
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 29 deletions.
70 changes: 46 additions & 24 deletions WDL/runtime/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging
import shutil
from pathlib import Path
from typing import Dict, Any, Optional, List
from typing import Dict, Any, Optional, List, Iterable, Union
from contextlib import AbstractContextManager
from urllib.parse import urlparse, urlunparse
from fnmatch import fnmatchcase
Expand Down Expand Up @@ -105,15 +105,20 @@ def get(
)
if cache:
self._logger.notice(_("call cache hit", cache_path=file_path)) # pyre-fixme
file_list = []
# check output and input files

def get_files(file):
file_list.append(file)

Value.rewrite_env_files(cache, get_files)
Value.rewrite_env_files(inputs, get_files)
if file_coherence_checker.check_files(file_path, file_list):
file_list = set()
dir_list = set()
# check output and input file timestamps

def get_files(v: Union[Value.File, Value.Directory]):
if isinstance(v, Value.File):
file_list.add(v.value)
else:
assert isinstance(v, Value.Directory)
dir_list.add(v.value)

Value.rewrite_env_paths(cache, get_files)
Value.rewrite_env_paths(inputs, get_files)
if file_coherence_checker.check_files(file_path, file_list, dir_list):
return cache
return None

Expand Down Expand Up @@ -392,19 +397,34 @@ def __init__(self, cfg: config.Loader, logger: logging.Logger):

self._downloadable = downloadable

def check_files(self, cache_file_path: str, files: list) -> bool:
def check_files(self, cache_file_path: str, files: Iterable[str], dirs: Iterable[str]) -> bool:
if self.cache_file_modification_time == 0.0:
self.cache_file_modification_time = self.get_last_modified_time(cache_file_path)
for file_path in files:

def raiser(exc):
raise exc

for directory, path in itertools.chain(
((False, f) for f in files), ((True, d) for d in dirs)
):
try:
if not self._downloadable(self._cfg, file_path):
self.check_cache_younger_than_file(output_file_path=file_path)
except (FileNotFoundError, CacheOutputFileAgeError):
if not self._downloadable(self._cfg, path):
self.check_cache_younger_than_file(path)
if directory:
# check everything in directory
for root, subdirs, subfiles in os.walk(
path, onerror=raiser, followlinks=False
):
for subdir in subdirs:
self.check_cache_younger_than_file(os.path.join(root, subdir))
for fn in subfiles:
self.check_cache_younger_than_file(os.path.join(root, fn))
except (FileNotFoundError, NotADirectoryError, CacheOutputFileAgeError):
self._logger.warning(
_(
"cache entry invalid due to deleted or modified file",
cache_path=file_path,
file_changed=file_path,
"cache entry invalid due to deleted or modified file/directory",
cache_path=path,
file_changed=path,
)
)
try:
Expand All @@ -413,22 +433,24 @@ def check_files(self, cache_file_path: str, files: list) -> bool:
self._logger.warning(
_(
"unable to delete invalidated cache entry",
cache_path=file_path,
cache_path=path,
error=str(exn),
)
)
return False
return True

def get_last_modified_time(self, file_path: str) -> float:
    """
    Return the modification time of ``file_path`` in nanoseconds since the epoch.

    If ``file_path`` is a symlink, consider both the link itself and its target,
    and return whichever mtime is newer: a cache entry must be invalidated when
    either the link or the underlying file has changed.

    :raises FileNotFoundError: if the path (or a symlink's target) doesn't exist;
        the caller treats this as a cache-invalidating condition.
    """
    # max mtime of the path itself (lstat-like) & of the symlink target (if applicable)
    return max(
        os.stat(file_path, follow_symlinks=False).st_mtime_ns,
        os.stat(file_path, follow_symlinks=True).st_mtime_ns,
    )

def check_cache_younger_than_file(self, output_file_path: str) -> bool:
output_file_modification_time = self.get_last_modified_time(output_file_path)
# self._logger.debug(_("check_cache_younger_than_file", path=output_file_path,
# mtime=output_file_modification_time/1e9, cache_mtime=self.cache_file_modification_time/1e9))
if self.cache_file_modification_time >= output_file_modification_time:
return True
else:
Expand Down
65 changes: 60 additions & 5 deletions tests/test_8cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import tempfile
import time
import unittest
import subprocess
from unittest.mock import MagicMock, patch

from WDL import values_from_json, values_to_json
from WDL.runtime.cache import CallCache
from .context import WDL


class TestTaskRunner(unittest.TestCase):
class TestCallCache(unittest.TestCase):
test_wdl: str = R"""
version 1.0
task hello_blank {
Expand Down Expand Up @@ -341,7 +342,7 @@ def test_cache_not_used_when_output_files_updated_after_cache_creation(self):
inputs = {"who": "Bethie"}
self._run(self.test_wdl_with_output_files, inputs, cfg=self.cfg)
# change modified time on outputs
time.sleep(2)
time.sleep(0.1)
for x in glob.glob(f"{self._dir}/*_hello/out/a_tsv/*"):
os.utime(x)

Expand All @@ -356,7 +357,7 @@ def test_cache_not_used_when_output_files_but_not__sym_links_updated_after_cache
inputs = {"who": "Bethie"}
self._run(self.test_wdl_with_output_files, inputs, cfg=self.cfg)
# change modified time on outputs
time.sleep(2)
time.sleep(0.1)
for x in glob.glob(f"{self._dir}/*_hello/out/a_tsv/*"):
os.utime(x, follow_symlinks=False)

Expand Down Expand Up @@ -393,13 +394,13 @@ def test_cache_not_used_when_file_in_array_recently_updated(self):
"""

self._run(wdl, inputs, cfg=self.cfg)
time.sleep(2)
#check cache used
mock = MagicMock(side_effect=WDL.runtime.task._try_task)
with patch('WDL.runtime.task._try_task', mock):
self._run(wdl, inputs, cfg=self.cfg)
self.assertEqual(mock.call_count, 0)
# change time
time.sleep(0.1)
for x in glob.glob(f"{self._dir}/*_return_file_array/work/files_out/file1"):
os.utime(x)
# check cache not used
Expand Down Expand Up @@ -432,16 +433,70 @@ def test_cache_not_used_when_input_file_recently_updated(self):
"""

self._run(wdl, inputs, cfg=self.cfg)
time.sleep(2)
#check cache used
mock = MagicMock(side_effect=WDL.runtime.task._try_task)
with patch('WDL.runtime.task._try_task', mock):
self._run(wdl, inputs, cfg=self.cfg)
self.assertEqual(mock.call_count, 0)
# change time on input file
time.sleep(0.1)
for x in glob.glob(f"{self._dir}/butterfinger"):
os.utime(x)
# check cache not used
with patch('WDL.runtime.task._try_task', mock):
self._run(wdl, inputs, cfg=self.cfg)
self.assertEqual(mock.call_count, 1)

def test_directory_coherence(self):
    """
    Exercise call-cache coherence checking for Directory inputs/outputs:
    the cached result must be reused while the directory tree is unchanged,
    and invalidated when any file inside it is touched, or when an entry
    (here, a symlink) is added to or removed from the tree.
    """
    # test outputting files/subdirectories inside input Directory
    wdl = R"""
    version development
    task t {
        input {
            Directory d
        }
        command {}
        output {
            Array[File] files = ["~{d}/alice.txt", "~{d}/sub/bob.txt"]
            Array[Directory] dirs = ["~{d}/sub/dir"]
        }
    }
    """
    # build the input directory tree: d/alice.txt, d/sub/bob.txt, d/sub/dir/carol.txt
    os.makedirs(os.path.join(self._dir, "d/sub/dir"))
    with open(os.path.join(self._dir, "d/alice.txt"), mode="w") as outfile:
        print("Alice", file=outfile)
    with open(os.path.join(self._dir, "d/sub/bob.txt"), mode="w") as outfile:
        print("Bob", file=outfile)
    with open(os.path.join(self._dir, "d/sub/dir/carol.txt"), mode="w") as outfile:
        print("Carol", file=outfile)
    inp = {"d": os.path.join(self._dir, "d")}
    outp = self._run(wdl, inp, cfg=self.cfg)

    # exercise the deprecated rewrite_env_files entry point for coverage
    WDL.Value.rewrite_env_files(outp[1], lambda fn: fn)

    # wrap _try_task so call_count reveals whether the cache was used
    mock = MagicMock(side_effect=WDL.runtime.task._try_task)
    with patch('WDL.runtime.task._try_task', mock):
        # control: unchanged tree -> cache hit, task not re-run
        self._run(wdl, inp, cfg=self.cfg)
        self.assertEqual(mock.call_count, 0)

        # touch a file & check cache invalidated
        subprocess.run(["touch", os.path.join(self._dir, "d/sub/dir/carol.txt")], check=True)
        self._run(wdl, inp, cfg=self.cfg)
        self.assertEqual(mock.call_count, 1)

        # add a symlink (changes directory contents) -> cache invalidated again
        time.sleep(0.1)
        os.symlink("sub/dir", os.path.join(self._dir, "d/link1"))
        self._run(wdl, inp, cfg=self.cfg)
        self.assertEqual(mock.call_count, 2)

        # delete the symlink -> cache invalidated again
        time.sleep(0.1)
        os.unlink(os.path.join(self._dir, "d/link1"))
        self._run(wdl, inp, cfg=self.cfg)
        self.assertEqual(mock.call_count, 3)

        # control: tree stable again -> cache hit, count unchanged
        self._run(wdl, inp, cfg=self.cfg)
        self.assertEqual(mock.call_count, 3)

0 comments on commit 39e5675

Please sign in to comment.