-
-
Notifications
You must be signed in to change notification settings - Fork 230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provenance Support for cwltool - Single Job Executor #676
Changes from 73 commits
0661a89
eb5cd6f
04caac0
9f1019b
2e44cab
3999772
b48e95f
7b892ea
a29ec09
757b117
0497e6e
ecc62e2
f564e5a
cc4feb5
9daf544
d492747
a27e0a6
5dd64ad
778b167
11b81d1
9d4ab3c
9a0ec6a
76c5b2b
0e82791
8bc016d
fa945d2
e6618b2
8f648a2
f18bab8
eb6f8c6
da9bdb9
9d74a2b
10ec554
e950673
df8f4c4
b30dc6d
12f3ea6
509cb3a
86dcef6
dbc94d2
371bb76
d62bd96
6cbc086
2ce5934
c4645c2
65ac0b9
69e1637
fdadf51
6cc95c1
55d5003
7985eb9
fddcc4e
b617c96
5898751
64e42c8
c19c513
0dd09df
f752266
973cff2
dee57c2
04b7baa
1c46eb4
a0a9c91
12bf53e
8b99c03
ee2bb97
affe143
40f8f70
31f6176
bd2d9bf
5248d81
abbe3b8
e27f061
c164ee1
fb2aad5
bb8f5a6
c09066d
bef92b4
a400574
ab09a22
45348dc
e5a714f
dd6c81b
7cc3807
06b456d
d13a9a0
897ef6a
ee8e420
a7ca40e
61797e0
921fc1d
97768d9
ce09136
205a95a
c800f1f
495406d
13745fb
3672c21
ff7db8b
d3d9ff0
22e2f2a
81f7e2b
7f1e64d
dbe7791
e710230
0e6300b
cf455b5
f80da96
d549db9
e7df842
5ead782
13e0bf7
54b72ed
8f3ba1c
6b1c876
f82c794
fcd9acc
623fbfb
9d48df5
1cc863d
1998505
f002ec4
c431e9d
81518f7
fbf2aac
ebb372a
4f53bef
32744ab
ec8b886
a79abbd
d581772
c70f62a
be370e6
4d06603
628c4ae
e81f497
3a51227
a699e46
3212eb9
673326b
7a326c2
0cf6dde
2187441
e4e094f
a50c171
4f9369f
2bc2f84
4b1978e
919c9eb
ad0726a
2e56f5d
d739519
ec5ae84
6004b86
debac27
0fe1ed8
7229e4f
be33530
0b4d085
d3dfd32
2e9a035
b036aaf
72307d8
865c15f
bd57c6a
e7ee228
fb0522a
d257ef4
d8e7759
0f9a8f3
4c9e747
53be19e
4ddfb99
c7421b1
3cdeaef
d6c3cf5
feab3b8
d108848
9685afc
0ea5b90
53fc04d
8a1d437
f89d05f
9110d8e
536fc57
24a96b9
c3de49d
8aa5a37
a2df93c
383b701
4db2079
7684384
9c625f0
7d56623
d15820d
7edde4a
a146089
5ffc743
6011c27
41e39f4
0e2c425
941947e
3ea49fb
47ccd50
7daca5c
65ba208
86b1594
c176a5a
f8ea873
e121c5c
89acdf5
d06462e
079983c
90c01c9
3cc1b67
74c2af1
3aff37d
d3a468c
1287114
a8cf68e
3a4bfa5
d6cfb9a
2df1775
a5e887e
fe3a427
8a0179c
a8b518f
22a8efe
e1bffed
77a232c
56ff004
f72dd27
f18dcff
67679df
3603b50
c55da30
05e3549
db0902c
f2db454
657d0d7
afa8442
6a4b32a
579f8a4
a918768
389a73c
487fea3
02753be
728bcb8
84dcdf0
1a68957
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,3 +30,5 @@ typeshed/2and3/ruamel/yaml | |
|
||
#mypy | ||
.mypy_cache/ | ||
bin/ | ||
lib/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,16 +3,22 @@ | |
import threading | ||
|
||
import os | ||
import copy | ||
import uuid | ||
import datetime | ||
import time | ||
from abc import ABCMeta, abstractmethod | ||
|
||
import prov.model as prov | ||
from typing import Dict, Text, Any, Tuple, Set, List | ||
|
||
|
||
from .builder import Builder | ||
from .errors import WorkflowException | ||
from .mutation import MutationManager | ||
from .job import JobBase | ||
from .process import relocateOutputs, cleanIntermediate, Process | ||
from .process import relocateOutputs, cleanIntermediate, Process, shortname, uniquename, get_overrides | ||
from . import loghandler | ||
from schema_salad.sourceline import SourceLine | ||
|
||
_logger = logging.getLogger("cwltool") | ||
|
||
|
@@ -36,6 +42,7 @@ def output_callback(self, out, processStatus): | |
def run_jobs(self, | ||
t, # type: Process | ||
job_order_object, # type: Dict[Text, Any] | ||
provDoc, | ||
logger, | ||
**kwargs # type: Any | ||
): | ||
|
@@ -44,6 +51,9 @@ def run_jobs(self, | |
def execute(self, t, # type: Process | ||
job_order_object, # type: Dict[Text, Any] | ||
logger=_logger, | ||
provDoc=None, | ||
engineID=None, | ||
WorkflowID=None, | ||
**kwargs # type: Any | ||
): | ||
# type: (...) -> Tuple[Dict[Text, Any], Text] | ||
|
@@ -66,7 +76,7 @@ def execute(self, t, # type: Process | |
for req in jobReqs: | ||
t.requirements.append(req) | ||
|
||
self.run_jobs(t, job_order_object, logger, **kwargs) | ||
self.run_jobs(t, job_order_object, provDoc, engineID, WorkflowID, logger, **kwargs) | ||
|
||
if self.final_output and self.final_output[0] and finaloutdir: | ||
self.final_output[0] = relocateOutputs(self.final_output[0], finaloutdir, | ||
|
@@ -87,22 +97,83 @@ class SingleJobExecutor(JobExecutor): | |
def run_jobs(self, | ||
t, # type: Process | ||
job_order_object, # type: Dict[Text, Any] | ||
document, | ||
engineUUID, | ||
WorkflowRunID, | ||
logger, | ||
**kwargs # type: Any | ||
): | ||
reference_locations={} | ||
ProvActivity_dict={} | ||
jobiter = t.job(job_order_object, | ||
self.output_callback, | ||
**kwargs) | ||
|
||
try: | ||
ro = kwargs.get("ro") | ||
for r in jobiter: | ||
if r: | ||
builder = kwargs.get("builder", None) # type: Builder | ||
|
||
if builder is not None: | ||
r.builder = builder | ||
if r.outdir: | ||
self.output_dirs.add(r.outdir) | ||
r.run(**kwargs) | ||
if ro: | ||
#here we are recording provenance of each subprocess of the workflow | ||
if ".cwl" in getattr(r, "name"): #for prospective provenance | ||
steps=[] | ||
for s in r.steps: | ||
stepname="wf:main/"+str(s.name)[5:] | ||
steps.append(stepname) | ||
print("step name is: ", stepname) | ||
document.entity(stepname, {prov.PROV_TYPE: "wfdesc:Process", "prov:type": "prov:Plan"}) | ||
#create prospective provenance recording for the workflow | ||
document.entity("wf:main", {prov.PROV_TYPE: "wfdesc:Process", "prov:type": "prov:Plan", "wfdesc:hasSubProcess=":str(steps), "prov:label":"Prospective provenance"}) | ||
customised_job={} #new job object for RO | ||
for e, i in enumerate(r.tool["inputs"]): | ||
with SourceLine(r.tool["inputs"], e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)): | ||
iid = shortname(i["id"]) | ||
if iid in job_order_object: | ||
customised_job[iid]= copy.deepcopy(job_order_object[iid]) #add the input element in dictionary for provenance | ||
elif "default" in i: | ||
customised_job[iid]= copy.deepcopy(i["default"]) #add the defualt elements in the dictionary for provenance | ||
else: | ||
raise WorkflowException( | ||
u"Input '%s' not in input object and does not have a default value." % (i["id"])) | ||
##create master-job.json and returns a dictionary with workflow level identifiers as keys and locations or actual values of the attributes as values. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please name the file. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
relativised_input_object=ro.create_job(customised_job, kwargs) #call the method to generate a file with customised job | ||
for key, value in relativised_input_object.items(): | ||
strvalue=str(value) | ||
if "data" in strvalue: | ||
shahash="data:"+value.split("/")[-1] | ||
rel_path=value[3:] | ||
reference_locations[job_order_object[key]["location"]]=relativised_input_object[key][11:] | ||
document.entity(shahash, {prov.PROV_TYPE:"wfprov:Artifact"}) | ||
#document.specializationOf(rel_path, shahash) NOTE:THIS NEEDS FIXING as it required both params as entities. | ||
else: | ||
ArtefactValue="data:"+strvalue | ||
document.entity(ArtefactValue, {prov.PROV_TYPE:"wfprov:Artifact"}) | ||
if ".cwl" not in getattr(r, "name"): | ||
if ro: | ||
ProcessRunID="run:"+str(uuid.uuid4()) | ||
#each subprocess is defined as an activity() | ||
provLabel="Run of workflow/packed.cwl#main/"+str(r.name) | ||
ProcessProvActivity = document.activity(ProcessRunID, None, None, {prov.PROV_TYPE: "wfprov:ProcessRun", "prov:label": provLabel}) | ||
if hasattr(r, 'name') and ".cwl" not in getattr(r, "name"): | ||
document.wasAssociatedWith(ProcessRunID, engineUUID, str("wf:main/"+r.name)) | ||
document.wasStartedBy(ProcessRunID, None, WorkflowRunID, datetime.datetime.now(), None, None) | ||
#this is where you run each step. so start and end time for the step | ||
r.run(document, WorkflowRunID, ProcessProvActivity, reference_locations, **kwargs) | ||
else: | ||
r.run(**kwargs) | ||
#capture workflow level outputs in the prov doc | ||
if ro: | ||
for eachOutput in self.final_output: | ||
for key, value in eachOutput.items(): | ||
outputProvRole="wf:main"+"/"+str(key) | ||
output_checksum="data:"+str(value["checksum"][5:]) | ||
document.entity(output_checksum, {prov.PROV_TYPE:"wfprov:Artifact"}) | ||
document.wasGeneratedBy(output_checksum, WorkflowRunID, datetime.datetime.now(), None, {"prov:role":outputProvRole }) | ||
else: | ||
logger.error("Workflow cannot make any more progress.") | ||
break | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,14 +12,20 @@ | |
import subprocess | ||
import sys | ||
import tempfile | ||
import prov.model as prov | ||
from abc import ABCMeta, abstractmethod | ||
from io import open | ||
from threading import Lock | ||
|
||
import shellescape | ||
|
||
import time | ||
import datetime | ||
from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows | ||
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text, | ||
Union, cast) | ||
|
||
|
||
from .builder import Builder | ||
from .errors import WorkflowException | ||
from .pathmapper import PathMapper | ||
|
@@ -170,11 +176,10 @@ def _setup(self, kwargs): # type: (Dict) -> None | |
_logger.debug(u"[job %s] initial work dir %s", self.name, | ||
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) | ||
|
||
def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | ||
def _execute(self, runtime, env, kwargs, document=None, WorkflowRunID=None, ProcessProvActivity=None,reference_locations=None, rm_tmpdir=True, move_outputs="move"): | ||
# type: (List[Text], MutableMapping[Text, Text], bool, Text) -> None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You need to update the type signature to match the changes you made. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
ro = kwargs.get("ro") | ||
scr, _ = get_feature(self, "ShellCommandRequirement") | ||
|
||
shouldquote = None # type: Callable[[Any], Any] | ||
if scr: | ||
shouldquote = lambda x: False | ||
|
@@ -189,7 +194,19 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
u' < %s' % self.stdin if self.stdin else '', | ||
u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '', | ||
u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '') | ||
|
||
if hasattr(self, "joborder"): | ||
for key, value in getattr(self, "joborder").items(): | ||
if ro: | ||
provRole=self.name+"/"+str(key) | ||
ProcessRunID=str(ProcessProvActivity._identifier) | ||
if 'location' in str(value): | ||
location=str(value['location']) | ||
if location in reference_locations: #workflow level inputs referenced as hash in prov document | ||
document.used(ProcessRunID, "data:"+str(reference_locations[location]), datetime.datetime.now(), None, {"prov:role":provRole }) | ||
else: #add checksum created by cwltool of the intermediate data products. NOTE: will only work if --compute-checksums is enabled. | ||
document.used(ProcessRunID, "data:"+str(value['checksum'][5:]), datetime.datetime.now(),None, {"prov:role":provRole }) | ||
else: #add the actual data value in the prov document | ||
document.used(ProcessRunID, "data:"+str(value), datetime.datetime.now(),None, {"prov:role":provRole }) | ||
outputs = {} # type: Dict[Text,Text] | ||
|
||
try: | ||
|
@@ -214,6 +231,7 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
stdout_path = absout | ||
|
||
commands = [Text(x) for x in (runtime + self.command_line)] | ||
|
||
job_script_contents = None # type: Text | ||
builder = getattr(self, "builder", None) # type: Builder | ||
if builder is not None: | ||
|
@@ -227,7 +245,6 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
cwd=self.outdir, | ||
job_script_contents=job_script_contents, | ||
) | ||
|
||
if self.successCodes and rcode in self.successCodes: | ||
processStatus = "success" | ||
elif self.temporaryFailCodes and rcode in self.temporaryFailCodes: | ||
|
@@ -244,6 +261,15 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
|
||
outputs = self.collect_outputs(self.outdir) | ||
outputs = bytes2str_in_dicts(outputs) # type: ignore | ||
#creating entities for the outputs produced by each step (in the provenance document) and associating them with | ||
#the ProcessRunID | ||
if ro: | ||
for key, value in outputs.items(): | ||
StepOutput_checksum="data:"+str(value["checksum"][5:]) | ||
document.entity(StepOutput_checksum, {prov.PROV_TYPE:"wfprov:SubProcessArtifact"}) | ||
stepProv="wf:main"+"/"+str(self.name)+"/"+str(key) | ||
ProcessRunID=str(ProcessProvActivity._identifier) | ||
document.wasGeneratedBy(StepOutput_checksum, ProcessRunID, datetime.datetime.now(), None, {"prov:role":stepProv}) | ||
|
||
except OSError as e: | ||
if e.errno == 2: | ||
|
@@ -263,8 +289,12 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
|
||
if processStatus != "success": | ||
_logger.warning(u"[job %s] completed %s", self.name, processStatus) | ||
if ro: | ||
document.wasEndedBy(str(ProcessProvActivity._identifier), None, WorkflowRunID, datetime.datetime.now()) | ||
else: | ||
_logger.info(u"[job %s] completed %s", self.name, processStatus) | ||
if ro: | ||
document.wasEndedBy(str(ProcessProvActivity._identifier), None, WorkflowRunID, datetime.datetime.now()) | ||
|
||
if _logger.isEnabledFor(logging.DEBUG): | ||
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4)) | ||
|
@@ -283,8 +313,8 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): | |
|
||
class CommandLineJob(JobBase): | ||
|
||
def run(self, pull_image=True, rm_container=True, | ||
rm_tmpdir=True, move_outputs="move", **kwargs): | ||
def run(self, document=None, WorkflowRunID=None, ProcessProvActivity=None,reference_locations=None, pull_image=True, rm_container=True, | ||
rm_tmpdir=True, move_outputs="move", **kwargs): | ||
# type: (bool, bool, bool, Text, **Any) -> None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Likewise for this type signature as well. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
self._setup(kwargs) | ||
|
@@ -312,7 +342,7 @@ def run(self, pull_image=True, rm_container=True, | |
stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True) | ||
relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update) | ||
|
||
self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) | ||
self._execute([], env, kwargs, document, WorkflowRunID, ProcessProvActivity,reference_locations, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) | ||
|
||
|
||
class ContainerCommandLineJob(JobBase): | ||
|
@@ -323,17 +353,18 @@ def get_from_requirements(self, r, req, pull_image, dry_run=False): | |
# type: (Dict[Text, Text], bool, bool, bool) -> Text | ||
pass | ||
|
||
|
||
# type: (bool, bool, bool, Text, **Any) -> None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the line above needs to be removed, or put where it belongs. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
@abstractmethod | ||
def create_runtime(self, env, rm_container, record_container_id, cidfile_dir, | ||
cidfile_prefix, **kwargs): | ||
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List | ||
pass | ||
|
||
def run(self, pull_image=True, rm_container=True, | ||
def run(self, document=None, WorkflowRunID=None, ProcessProvActivity=None, | ||
reference_locations=None, pull_image=True, rm_container=True, | ||
record_container_id=False, cidfile_dir="", | ||
cidfile_prefix="", | ||
rm_tmpdir=True, move_outputs="move", **kwargs): | ||
# type: (bool, bool, bool, Text, Text, bool, Text, **Any) -> None | ||
cidfile_prefix="", rm_tmpdir=True, move_outputs="move", **kwargs): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happened to the type annotations? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added back. |
||
|
||
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement") | ||
|
||
|
@@ -382,7 +413,7 @@ def run(self, pull_image=True, rm_container=True, | |
runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs) | ||
runtime.append(img_id) | ||
|
||
self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) | ||
self._execute(runtime, env, kwargs, document, WorkflowRunID, ProcessProvActivity, reference_locations, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) #included kwargs to see if the workflow has been executed using the provenance flag. | ||
|
||
|
||
def _job_popen( | ||
|
@@ -461,6 +492,7 @@ def _job_popen( | |
stderr_path=stderr_path, | ||
stdin_path=stdin_path, | ||
) | ||
|
||
with open(os.path.join(job_dir, "job.json"), "wb") as f: | ||
json.dump(job_description, codecs.getwriter('utf-8')(f), ensure_ascii=False) # type: ignore | ||
try: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest creating a separate subclass of `JobExecutor` here to keep the API simpler and cleaner, since the number of changes and `if` clauses for a single feature is large enough.