diff --git a/dvc/output/base.py b/dvc/output/base.py index 2e555119cb..87cfd224c3 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -155,6 +155,9 @@ def checksum(self): def checksum(self, checksum): self.info[self.remote.PARAM_CHECKSUM] = checksum + def get_checksum(self): + return self.remote.get_checksum(self.path_info) + @property def is_dir_checksum(self): return self.remote.is_dir_checksum(self.checksum) @@ -167,7 +170,7 @@ def save_info(self): return self.remote.save_info(self.path_info) def changed_checksum(self): - return self.checksum != self.remote.get_checksum(self.path_info) + return self.checksum != self.get_checksum() def changed_cache(self, filter_info=None): if not self.use_cache or not self.checksum: diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 8aa95af4a2..4d2fa5dde1 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -102,7 +102,6 @@ def _reproduce_stages( G, stages, downstream=False, - ignore_build_cache=False, single_item=False, **kwargs ): @@ -172,7 +171,7 @@ def _reproduce_stages( try: ret = _reproduce_stage(stage, **kwargs) - if len(ret) != 0 and ignore_build_cache: + if len(ret) != 0 and kwargs.get("ignore_build_cache", False): # NOTE: we are walking our pipeline from the top to the # bottom. If one stage is changed, it will be reproduced, # which tells us that we should force reproducing all of diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 806cc8da65..7de4c4c698 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -57,6 +57,9 @@ def run(self, fname=None, no_exec=False, **kwargs): self.check_modified_graph([stage], self.pipeline_stages) if not no_exec: - stage.run(no_commit=kwargs.get("no_commit", False)) + stage.run( + no_commit=kwargs.get("no_commit", False), + ignore_build_cache=kwargs.get("ignore_build_cache", False), + ) dvcfile.dump(stage, update_dvcfile=True) return stage diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 1120772ed6..17989497ab 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -24,7 +24,7 @@ MissingDep, MissingDataSource, ) -from . import params +from . import params, cache as stage_cache from dvc.utils import dict_md5 from dvc.utils import fix_env from dvc.utils import relpath @@ -482,6 +482,8 @@ def save(self): self.md5 = self._compute_md5() + stage_cache.save(self) + @staticmethod def _changed_entries(entries): return [ @@ -608,7 +610,9 @@ def _run(self): raise StageCmdFailedError(self) @rwlocked(read=["deps"], write=["outs"]) - def run(self, dry=False, no_commit=False, force=False): + def run( + self, dry=False, no_commit=False, force=False, ignore_build_cache=False + ): if (self.cmd or self.is_import) and not self.locked and not dry: self.remove_outs(ignore_remove=False, force=False) @@ -643,16 +647,20 @@ def run(self, dry=False, no_commit=False, force=False): self.check_missing_outputs() else: - logger.info("Running command:\n\t{}".format(self.cmd)) if not dry: + if not force and not ignore_build_cache: + stage_cache.restore(self) + if ( not force and not self.is_callback and not self.always_changed and self._already_cached() ): + logger.info("Stage is cached, skipping.") self.checkout() else: + logger.info("Running command:\n\t{}".format(self.cmd)) self._run() if not dry: diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py new file mode 100644 index 0000000000..6f567a847f --- /dev/null +++ b/dvc/stage/cache.py @@ -0,0 +1,75 @@ +import os +import json +import hashlib + +from dvc.utils.fs import makedirs + + +def _sha256(string): + return hashlib.sha256(string.encode()).hexdigest() + + +def _get_hash(stage): + if not stage.cmd or not stage.deps or not stage.outs: + return None + + string = _sha256(stage.cmd) + for dep in stage.deps: + if not dep.def_path or not dep.get_checksum(): + return None + + string += _sha256(dep.def_path) + string += _sha256(dep.get_checksum()) + + for out in stage.outs: + if not out.def_path or out.persist: + return None + + string += _sha256(out.def_path) + + return _sha256(string) + + +def _get_cache(stage): + return { + "cmd": stage.cmd, + "deps": {dep.def_path: dep.get_checksum() for dep in stage.deps}, + "outs": {out.def_path: out.get_checksum() for out in stage.outs}, + } + + +def _get_cache_path(stage): + sha = _get_hash(stage) + if not sha: + return None + + cache_dir = os.path.join(stage.repo.cache.local.cache_dir, "stages") + + return os.path.join(cache_dir, sha[:2], sha) + + +def save(stage): + path = _get_cache_path(stage) + if not path or os.path.exists(path): + return + + dpath = os.path.dirname(path) + makedirs(dpath, exist_ok=True) + with open(path, "w+") as fobj: + json.dump(_get_cache(stage), fobj) + + +def restore(stage): + path = _get_cache_path(stage) + if not path or not os.path.exists(path): + return + + with open(path, "r") as fobj: + cache = json.load(fobj) + + outs = {out.def_path: out for out in stage.outs} + for def_path, checksum in cache["outs"].items(): + outs[def_path].checksum = checksum + + for dep in stage.deps: + dep.save() diff --git a/tests/func/__pycache__/tmpvpl_3i8b b/tests/func/__pycache__/tmpvpl_3i8b new file mode 100644 index 0000000000..7bc8bc6420 Binary files /dev/null and b/tests/func/__pycache__/tmpvpl_3i8b differ diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index eecc3e4001..901a04a961 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -1290,7 +1290,9 @@ def test(self): ["repro", self._get_stage_target(self.stage), "--no-commit"] ) self.assertEqual(ret, 0) - self.assertFalse(os.path.exists(self.dvc.cache.local.cache_dir)) + self.assertEqual( + os.listdir(self.dvc.cache.local.cache_dir), ["stages"] + ) class TestReproAlreadyCached(TestRepro):