Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Jul 23, 2023
1 parent c0144bf commit 0b2cd2f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
23 changes: 23 additions & 0 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import pickle
import threading
import regex
import hashlib
from concurrent import futures
from typing import Optional, List, Set, Tuple, NamedTuple, Dict, Union, Iterable, Callable, Any
from contextlib import ExitStack
Expand Down Expand Up @@ -695,6 +696,28 @@ def _devirtualize_filename(self, filename: str) -> str:
)

def _virtualize_filename(self, filename: str) -> str:
    # Workflow-level write_* calls each produce a uniquely-named temp file, which would
    # otherwise defeat call caching for any downstream task consuming that file. To avoid
    # that, look the file up in the CallCache by its content digest: on a hit, hand back the
    # previously cached copy; on a miss, register this file under that digest.
    # Digesting the entire content is acceptable because the file was originally serialized
    # from miniwdl memory, so it can't be too large.
    assert not filename.endswith("/")  # FIXME if/when stdlib functions handle directories

    # SHA-256 of the file content, read in 1 MiB blocks
    digest = hashlib.sha256()
    with open(filename, "rb") as infile:
        while True:
            block = infile.read(1048576)
            if block == b"":
                break
            digest.update(block)

    # the cache key is derived solely from the content digest
    lookup_inputs = Env.Bindings().bind("file_sha256", Value.String(digest.hexdigest()))
    lookup_key = "write_/" + Value.digest_env(lookup_inputs)
    expected_outputs = Env.Bindings().bind("file", Type.File())
    hit = self.cache.get(lookup_key, lookup_inputs, expected_outputs)
    if not hit:
        # nothing cached for this content yet: put our newly-written file to the cache
        self.cache.put(lookup_key, Env.Bindings().bind("file", Value.File(filename)))
    else:
        # substitute the cached file for the one just written
        filename = hit.resolve("file").value

    # allow-list file generated by workflow
    self.state.fspath_allowlist.add(filename)
    return filename

Expand Down
12 changes: 8 additions & 4 deletions tests/test_8cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,14 @@ def test_directory_coherence(self):
input:
full_name = read_person.full_name
}
call hello as hello2 {
input:
full_name = write_lines([read_string(read_person.full_name)])
}
}
output {
Array[File] messages = hello.message
Array[File] messages = flatten([hello.message, hello2.message])
}
}
Expand All @@ -542,7 +546,7 @@ def test_directory_coherence(self):
command {}
output {
File full_name = write_lines([sep(" ", [person.first, person.last])])
File full_name = write_lines([sep(" ", select_all([person.first, person.middle, person.last]))])
}
}
Expand Down Expand Up @@ -576,7 +580,7 @@ def test_workflow_digest(self):

# ensure digest is sensitive to changes in the struct type and called task (but not the
# uncalled task, or comments/whitespace)
doc2 = WDL.parse_document(self.test_workflow_wdl.replace("String? middle", ""))
doc2 = WDL.parse_document(self.test_workflow_wdl.replace("String? middle", "String? middle Int? age"))
doc2.typecheck()
self.assertNotEqual(doc.workflow.digest, doc2.workflow.digest)

Expand Down Expand Up @@ -626,5 +630,5 @@ def test_workflow_cache(self):
print('{"first":"Alyssa","last":"Hacker","middle":"P"}', file=outfile)
_, outp2 = self._run(self.test_workflow_wdl, inp, cfg=self.cfg)
self.assertEqual(wmock.call_count, 1)
self.assertEqual(tmock.call_count, 2) # reran Alyssa, cached Ben
self.assertEqual(tmock.call_count, 3) # reran Alyssa, cached Ben
self.assertNotEqual(WDL.values_to_json(outp), WDL.values_to_json(outp2))

0 comments on commit 0b2cd2f

Please sign in to comment.