diff --git a/src/gwf/core.py b/src/gwf/core.py
index 07b886ca..fe15e3b9 100644
--- a/src/gwf/core.py
+++ b/src/gwf/core.py
@@ -1,4 +1,3 @@
-import functools
 import logging
 import os
 import os.path
@@ -7,7 +6,7 @@
 
 from .backends import Status
 from .exceptions import WorkflowError
-from .utils import LazyDict, cache, timer
+from .utils import cache, timer
 from .workflow import Workflow
 
 logger = logging.getLogger(__name__)
@@ -208,16 +207,28 @@ def __contains__(self, target_name):
         return target_name in self.targets
 
 
-def _fileinfo(path):
-    try:
-        st = os.stat(path)
-    except FileNotFoundError:
-        return None
-    else:
-        return st.st_mtime
+class CachedFilesystem:
+    def __init__(self):
+        self._cache = {}
 
+    def _lookup_file(self, path):
+        if path not in self._cache:
+            try:
+                st = os.stat(path)
+            except FileNotFoundError:
+                self._cache[path] = None
+            else:
+                self._cache[path] = st.st_mtime
+        return self._cache[path]
+
+    def exists(self, path):
+        return self._lookup_file(path) is not None
 
-FileCache = functools.partial(LazyDict, valfunc=_fileinfo)
+    def changed_at(self, path):
+        st = self._lookup_file(path)
+        if st is None:
+            raise FileNotFoundError(path)
+        return st
 
 
 class Scheduler:
@@ -240,7 +251,7 @@ class Scheduler:
     system and that is not provided by another target.
     """
 
-    def __init__(self, graph, backend, dry_run=False, file_cache=FileCache()):
+    def __init__(self, graph, backend, dry_run=False, filesystem=CachedFilesystem()):
         """
         :param gwf.Graph graph:
             Graph of the workflow.
@@ -255,7 +266,7 @@ def __init__(self, graph, backend, dry_run=False, file_cache=FileCache()):
         self.backend = backend
         self.dry_run = dry_run
 
-        self._file_cache = file_cache
+        self._filesystem = filesystem
         self._pretend_known = set()
 
     def prepare_target_options(self, target):
@@ -354,7 +365,7 @@ def should_run(self, target):
         # Check whether all input files actually exists are are being provided
         # by another target. If not, it's an error.
         for path in target.flattened_inputs():
-            if path in self.graph.unresolved and self._file_cache[path] is None:
+            if path in self.graph.unresolved and not self._filesystem.exists(path):
                 msg = (
                     'File "{}" is required by "{}", but does not exist and is not '
                     "provided by any target in the workflow."
                 )
@@ -366,7 +377,7 @@
             return True
 
         for path in target.flattened_outputs():
-            if self._file_cache[path] is None:
+            if not self._filesystem.exists(path):
                 logger.debug(
                     "%s should run because its output file %s does not exist",
                     target,
@@ -379,7 +390,8 @@
             return False
 
         youngest_in_ts, youngest_in_path = max(
-            (self._file_cache[path], path) for path in target.flattened_inputs()
+            (self._filesystem.changed_at(path), path)
+            for path in target.flattened_inputs()
         )
         logger.debug(
             "%s is the youngest input file of %s with timestamp %s",
@@ -389,7 +401,8 @@
         )
 
         oldest_out_ts, oldest_out_path = min(
-            (self._file_cache[path], path) for path in target.flattened_outputs()
+            (self._filesystem.changed_at(path), path)
+            for path in target.flattened_outputs()
         )
         logger.debug(
             "%s is the oldest output file of %s with timestamp %s",
diff --git a/src/gwf/utils.py b/src/gwf/utils.py
index 48af5380..0d7485a6 100644
--- a/src/gwf/utils.py
+++ b/src/gwf/utils.py
@@ -100,32 +100,6 @@ def load_workflow(basedir, filename, objname):
     )
 
 
-class LazyDict(dict):
-    """A dict which lazily computes values for keys using `valfunc`.
-
-    When accessing an key in the dict, it will check whether the key exists. If it does, the value is returned
-    immediately. If not, `valfunc` will be called on the key and the return value will be assigned as the value of the
-    key. For example::
-
-    >>> d = LazyDict(valfunc=lambda k: k + 1)
-    >>> 0 in d
-    False
-    >>> d[0]
-    1
-    >>> d[100]
-    101
-    """
-
-    def __init__(self, valfunc, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.valfunc = valfunc
-
-    def __getitem__(self, item):
-        if not super().__contains__(item):
-            super().__setitem__(item, self.valfunc(item))
-        return super().__getitem__(item)
-
-
 class PersistableDict(UserDict):
     """A dictionary which can persist itself to JSON."""
 
diff --git a/tests/test_core.py b/tests/test_core.py
index 8be24b52..8172c735 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -289,6 +289,22 @@ def test_graph_raises_circular_dependency_error():
         Graph.from_targets({"Target1": t1, "Target2": t2, "Target3": t3})
 
 
+class FakeFilesystem:
+    def __init__(self):
+        self._files = {}
+
+    def add_file(self, path, changed_at):
+        self._files[path] = changed_at
+
+    def exists(self, path):
+        return path in self._files
+
+    def changed_at(self, path):
+        if path not in self._files:
+            raise FileNotFoundError(path)
+        return self._files[path]
+
+
 class TestShouldRun(unittest.TestCase):
     def setUp(self):
         workflow = Workflow(working_dir="/some/dir")
@@ -309,7 +325,10 @@ def setUp(self):
 
         self.graph = Graph.from_targets(workflow.targets)
         self.backend = DummyBackend()
-        self.scheduler = Scheduler(graph=self.graph, backend=self.backend)
+        self.filesystem = FakeFilesystem()
+        self.scheduler = Scheduler(
+            graph=self.graph, backend=self.backend, filesystem=self.filesystem
+        )
 
     def test_target_should_run_if_one_of_its_dependencies_does_not_exist(self):
         with self.assertLogs(level="DEBUG") as logs:
@@ -339,7 +358,9 @@ def test_target_should_run_if_it_is_a_sink(self):
             "TestTarget", inputs=[], outputs=[], options={}, working_dir="/some/dir"
         )
         graph = Graph.from_targets({"TestTarget": target})
-        scheduler = Scheduler(graph=graph, backend=DummyBackend())
+        scheduler = Scheduler(
+            graph=graph, backend=DummyBackend(), filesystem=self.filesystem
+        )
         with self.assertLogs(level="DEBUG") as logs:
             self.assertTrue(scheduler.schedule(target))
             self.assertEqual(
@@ -358,55 +379,48 @@ def test_target_should_not_run_if_it_is_a_source_and_all_outputs_exist(self):
         )
 
         graph = Graph.from_targets(workflow.targets)
-        scheduler = Scheduler(graph=graph, backend=DummyBackend())
-        mock_file_cache = {
-            "/some/dir/test_output1.txt": 1,
-            "/some/dir/test_output2.txt": 2,
-        }
-        with patch.dict(scheduler._file_cache, mock_file_cache):
-            self.assertFalse(scheduler.should_run(target))
+        self.filesystem.add_file("/some/dir/test_output1.txt", changed_at=1)
+        self.filesystem.add_file("/some/dir/test_output2.txt", changed_at=2)
+
+        scheduler = Scheduler(
+            graph=graph, backend=DummyBackend(), filesystem=self.filesystem
+        )
+        self.assertFalse(scheduler.should_run(target))
 
     def test_should_run_if_any_input_file_is_newer_than_any_output_file(self):
-        mock_file_cache = {
-            "/some/dir/test_output1.txt": 0,
-            "/some/dir/test_output2.txt": 1,
-            "/some/dir/test_output3.txt": 3,
-            "/some/dir/final_output.txt": 2,
-        }
+        self.filesystem.add_file("/some/dir/test_output1.txt", changed_at=0)
+        self.filesystem.add_file("/some/dir/test_output2.txt", changed_at=1)
+        self.filesystem.add_file("/some/dir/test_output3.txt", changed_at=3)
+        self.filesystem.add_file("/some/dir/final_output.txt", changed_at=2)
 
-        with patch.dict(self.scheduler._file_cache, mock_file_cache):
-            self.assertFalse(self.scheduler.should_run(self.target1))
-            self.assertFalse(self.scheduler.should_run(self.target2))
-            self.assertFalse(self.scheduler.should_run(self.target3))
-            self.assertTrue(self.scheduler.should_run(self.target4))
+        self.assertFalse(self.scheduler.should_run(self.target1))
+        self.assertFalse(self.scheduler.should_run(self.target2))
+        self.assertFalse(self.scheduler.should_run(self.target3))
+        self.assertTrue(self.scheduler.should_run(self.target4))
 
     def test_should_run_not_run_if_all_outputs_are_newer_then_the_inputs(self):
-        mock_file_cache = {
-            "/some/dir/test_output1.txt": 0,
-            "/some/dir/test_output2.txt": 1,
-            "/some/dir/test_output3.txt": 3,
-            "/some/dir/final_output.txt": 4,
-        }
+        self.filesystem.add_file("/some/dir/test_output1.txt", changed_at=0)
+        self.filesystem.add_file("/some/dir/test_output2.txt", changed_at=1)
+        self.filesystem.add_file("/some/dir/test_output3.txt", changed_at=3)
+        self.filesystem.add_file("/some/dir/final_output.txt", changed_at=4)
 
-        with patch.dict(self.scheduler._file_cache, mock_file_cache):
-            self.assertFalse(self.scheduler.should_run(self.target1))
-            self.assertFalse(self.scheduler.should_run(self.target2))
-            self.assertFalse(self.scheduler.should_run(self.target3))
-            self.assertFalse(self.scheduler.should_run(self.target4))
+        self.assertFalse(self.scheduler.should_run(self.target1))
+        self.assertFalse(self.scheduler.should_run(self.target2))
+        self.assertFalse(self.scheduler.should_run(self.target3))
+        self.assertFalse(self.scheduler.should_run(self.target4))
 
 
 def test_exception_if_input_file_is_not_provided_and_output_file_exists():
     workflow = Workflow(working_dir="/some/dir")
     target = workflow.target("TestTarget", inputs=["in.txt"], outputs=["out.txt"])
-
     graph = Graph.from_targets(workflow.targets)
     backend = DummyBackend()
-    scheduler = Scheduler(
-        graph=graph,
-        backend=backend,
-        file_cache={"/some/dir/in.txt": None, "/some/dir/out.txt": 1},
-    )
+
+    filesystem = FakeFilesystem()
+    filesystem.add_file("/some/dir/out.txt", changed_at=1)
+
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=filesystem)
 
     with pytest.raises(WorkflowError):
         scheduler.should_run(target)
@@ -429,11 +443,11 @@ def test_scheduling_submitted_target(backend, monkeypatch):
         "TestTarget", inputs=[], outputs=[], options={}, working_dir="/some/dir"
     )
     graph = Graph.from_targets({"TestTarget": target})
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: True)
 
     backend.submit(target, dependencies=set())
     assert len(backend.submit.call_args_list) == 1
 
-    assert scheduler.schedule(target) == True
+    assert scheduler.schedule(target)
     assert len(backend.submit.call_args_list) == 1
@@ -442,9 +456,9 @@ def test_scheduling_unsubmitted_target(backend, monkeypatch):
         "TestTarget", inputs=[], outputs=[], options={}, working_dir="/some/dir"
     )
     graph = Graph.from_targets({"TestTarget": target})
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
    monkeypatch.setattr(scheduler, "should_run", lambda t: True)
 
-    assert scheduler.schedule(target) == True
+    assert scheduler.schedule(target)
     assert len(backend.submit.call_args_list) == 1
     assert call(target, dependencies=set()) in backend.submit.call_args_list
@@ -458,9 +472,7 @@ def test_non_existing_files_not_provided_by_other_target(backend):
     target = Target(
         "TestTarget",
         inputs=["test_input.txt"],
         outputs=[],
         options={},
         working_dir="/some/dir",
     )
     graph = Graph.from_targets({"TestTarget": target})
-    scheduler = Scheduler(
-        graph=graph, backend=backend, file_cache={"/some/dir/test_input.txt": None}
-    )
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
 
     with pytest.raises(WorkflowError):
         scheduler.schedule(target)
@@ -474,10 +486,12 @@ def test_existing_files_not_provided_by_other_target(backend):
         working_dir="/some/dir",
     )
     graph = Graph.from_targets({"TestTarget": target})
-    scheduler = Scheduler(
-        graph=graph, backend=backend, file_cache={"/some/dir/test_input.txt": 0}
-    )
-    assert scheduler.schedule(target) == True
+
+    filesystem = FakeFilesystem()
+    filesystem.add_file("/some/dir/test_input.txt", changed_at=0)
+
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=filesystem)
+    assert scheduler.schedule(target)
 
 
 def test_scheduling_target_with_deps_that_are_not_submitted(backend, monkeypatch):
@@ -496,9 +510,9 @@ def test_scheduling_target_with_deps_that_are_not_submitted(backend, monkeypatch
         working_dir="/some/dir",
     )
     graph = Graph.from_targets({"TestTarget1": target1, "TestTarget2": target2})
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: True)
-    assert scheduler.schedule(target2) == True
+    assert scheduler.schedule(target2)
     assert len(backend.submit.call_args_list) == 2
     assert call(target1, dependencies=set()) in backend.submit.call_args_list
     assert call(target2, dependencies=set([target1])) in backend.submit.call_args_list
@@ -536,9 +550,9 @@ def test_scheduling_target_with_deep_deps_that_are_not_submitted(backend, monkey
     graph = Graph.from_targets(
         {"target1": target1, "target2": target2, "target3": target3, "target4": target4}
     )
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: True)
-    assert scheduler.schedule(target4) == True
+    assert scheduler.schedule(target4)
     assert len(backend.submit.call_args_list) == 4
     assert call(target1, dependencies=set()) in backend.submit.call_args_list
     assert call(target2, dependencies=set([target1])) in backend.submit.call_args_list
@@ -578,9 +592,9 @@ def test_scheduling_branch_and_join_structure(backend, monkeypatch):
     graph = Graph.from_targets(
         {"target1": target1, "target2": target2, "target3": target3, "target4": target4}
     )
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: True)
-    assert scheduler.schedule(target4) == True
+    assert scheduler.schedule(target4)
     assert len(backend.submit.call_args_list) == 4
     assert call(target1, dependencies=set([])) in backend.submit.call_args_list
     assert call(target2, dependencies=set([target1])) in backend.submit.call_args_list
@@ -626,12 +640,12 @@ def test_scheduling_branch_and_join_structure_with_previously_submitted_dependen
     graph = Graph.from_targets(
         {"target1": target1, "target2": target2, "target3": target3, "target4": target4}
     )
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: True)
 
     backend.submit(target1, dependencies=set())
 
-    assert scheduler.schedule(target4) == True
+    assert scheduler.schedule(target4)
     assert len(backend.submit.call_args_list) == 4
     assert call(target2, dependencies=set([target1])) in backend.submit.call_args_list
     assert call(target3, dependencies=set([target1])) in backend.submit.call_args_list
@@ -666,9 +680,9 @@ def test_scheduling_non_submitted_targets_that_should_not_run(backend, monkeypat
     graph = Graph.from_targets(
         {"TestTarget1": target1, "TestTarget2": target2, "TestTarget3": target3}
     )
-    scheduler = Scheduler(graph=graph, backend=backend)
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     monkeypatch.setattr(scheduler, "should_run", lambda t: False)
 
-    assert scheduler.schedule(target3) == False
+    assert not scheduler.schedule(target3)
 
     assert backend.submit.call_args_list == []
@@ -937,7 +951,7 @@ def test_scheduler_injects_target_defaults_into_target_options_on_submit(mocker)
     mocker.patch.object(backend, "submit", autospec=True)
 
     graph = Graph.from_targets({"TestTarget1": target1, "TestTarget2": target2})
-    scheduler = Scheduler(graph=graph, backend=backend, file_cache={})
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     scheduler.schedule(target1)
 
     assert target1.options == {"cores": 1, "memory": "1g"}
@@ -962,7 +976,7 @@ def test_scheduler_warns_user_when_submitting_target_with_unsupported_option(
 
     graph = Graph.from_targets({"TestTarget": target})
 
-    scheduler = Scheduler(graph=graph, backend=backend, file_cache={})
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     scheduler.schedule(target)
 
     assert target.options == {"cores": 1, "memory": "1g"}
@@ -989,7 +1003,7 @@ def test_scheduler_removes_options_with_none_value(mocker):
 
     graph = Graph.from_targets({"TestTarget": target})
 
-    scheduler = Scheduler(graph=graph, backend=backend, file_cache={})
+    scheduler = Scheduler(graph=graph, backend=backend, filesystem=FakeFilesystem())
     scheduler.schedule(target)
 
     assert target.options == {"memory": "1g"}
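Note, not part of the patch: a minimal sketch of how the new filesystem seam is meant to be wired, assuming the gwf package with this diff applied. The graph, backend, and target objects below are placeholders, and InMemoryFilesystem is a hypothetical stand-in mirroring the FakeFilesystem helper added to tests/test_core.py; only CachedFilesystem and Scheduler come from src/gwf/core.py as changed above.

# Production wiring: Scheduler only calls exists()/changed_at(), and
# CachedFilesystem memoizes one os.stat() per path for the whole scheduling pass.
from gwf.core import CachedFilesystem, Scheduler

fs = CachedFilesystem()
print(fs.exists("/tmp/does-not-exist.txt"))  # False, and the miss is now cached
# fs.changed_at("/tmp/does-not-exist.txt")   # would raise FileNotFoundError


# Test wiring: any object with exists()/changed_at() satisfies the seam, so tests
# inject an in-memory double instead of patching the old private _file_cache dict.
class InMemoryFilesystem:
    def __init__(self):
        self._files = {}

    def add_file(self, path, changed_at):
        self._files[path] = changed_at

    def exists(self, path):
        return path in self._files

    def changed_at(self, path):
        if path not in self._files:
            raise FileNotFoundError(path)
        return self._files[path]


# scheduler = Scheduler(graph=graph, backend=backend, filesystem=InMemoryFilesystem())
# scheduler.schedule(target)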