Skip to content

Commit

Permalink
Introduce Filesystem abstraction
Browse files Browse the repository at this point in the history
Introduces an abstraction for interaction with the file system. This
makes testing easier and faster, and improves the general API. As a
consequence, LazyDict can be completely removed.

In the process a multitude of linting warnings were also fixed.
  • Loading branch information
dansondergaard committed Mar 31, 2020
1 parent 46eb90c commit 3af34a9
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 103 deletions.
45 changes: 29 additions & 16 deletions src/gwf/core.py
@@ -1,4 +1,3 @@
import functools
import logging
import os
import os.path
Expand All @@ -7,7 +6,7 @@

from .backends import Status
from .exceptions import WorkflowError
from .utils import LazyDict, cache, timer
from .utils import cache, timer
from .workflow import Workflow

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -208,16 +207,28 @@ def __contains__(self, target_name):
return target_name in self.targets


def _fileinfo(path):
try:
st = os.stat(path)
except FileNotFoundError:
return None
else:
return st.st_mtime
class CachedFilesystem:
def __init__(self):
self._cache = {}

def _lookup_file(self, path):
if path not in self._cache:
try:
st = os.stat(path)
except FileNotFoundError:
self._cache[path] = None
else:
self._cache[path] = st.st_mtime
return self._cache[path]

def exists(self, path):
return self._lookup_file(path) is not None

FileCache = functools.partial(LazyDict, valfunc=_fileinfo)
def changed_at(self, path):
st = self._lookup_file(path)
if st is None:
raise FileNotFoundError(path)
return st


class Scheduler:
Expand All @@ -240,7 +251,7 @@ class Scheduler:
system and that is not provided by another target.
"""

def __init__(self, graph, backend, dry_run=False, file_cache=FileCache()):
def __init__(self, graph, backend, dry_run=False, filesystem=CachedFilesystem()):
"""
:param gwf.Graph graph:
Graph of the workflow.
Expand All @@ -255,7 +266,7 @@ def __init__(self, graph, backend, dry_run=False, file_cache=FileCache()):
self.backend = backend
self.dry_run = dry_run

self._file_cache = file_cache
self._filesystem = filesystem
self._pretend_known = set()

def prepare_target_options(self, target):
Expand Down Expand Up @@ -354,7 +365,7 @@ def should_run(self, target):
# Check whether all input files actually exists are are being provided
# by another target. If not, it's an error.
for path in target.flattened_inputs():
if path in self.graph.unresolved and self._file_cache[path] is None:
if path in self.graph.unresolved and not self._filesystem.exists(path):
msg = (
'File "{}" is required by "{}", but does not exist and is not '
"provided by any target in the workflow."
Expand All @@ -366,7 +377,7 @@ def should_run(self, target):
return True

for path in target.flattened_outputs():
if self._file_cache[path] is None:
if not self._filesystem.exists(path):
logger.debug(
"%s should run because its output file %s does not exist",
target,
Expand All @@ -379,7 +390,8 @@ def should_run(self, target):
return False

youngest_in_ts, youngest_in_path = max(
(self._file_cache[path], path) for path in target.flattened_inputs()
(self._filesystem.changed_at(path), path)
for path in target.flattened_inputs()
)
logger.debug(
"%s is the youngest input file of %s with timestamp %s",
Expand All @@ -389,7 +401,8 @@ def should_run(self, target):
)

oldest_out_ts, oldest_out_path = min(
(self._file_cache[path], path) for path in target.flattened_outputs()
(self._filesystem.changed_at(path), path)
for path in target.flattened_outputs()
)
logger.debug(
"%s is the oldest output file of %s with timestamp %s",
Expand Down
26 changes: 0 additions & 26 deletions src/gwf/utils.py
Expand Up @@ -100,32 +100,6 @@ def load_workflow(basedir, filename, objname):
)


class LazyDict(dict):
"""A dict which lazily computes values for keys using `valfunc`.
When accessing an key in the dict, it will check whether the key exists. If it does, the value is returned
immediately. If not, `valfunc` will be called on the key and the return value will be assigned as the value of the
key. For example::
>>> d = LazyDict(valfunc=lambda k: k + 1)
>>> 0 in d
False
>>> d[0]
1
>>> d[100]
101
"""

def __init__(self, valfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.valfunc = valfunc

def __getitem__(self, item):
if not super().__contains__(item):
super().__setitem__(item, self.valfunc(item))
return super().__getitem__(item)


class PersistableDict(UserDict):
"""A dictionary which can persist itself to JSON."""

Expand Down

0 comments on commit 3af34a9

Please sign in to comment.