Skip to content

Commit

Permalink
Packing refactor (#177)
Browse files Browse the repository at this point in the history
* Refactor 'pack into single graph' functionality into separate module.
* Don't modify input step object in WorkflowStep, copy it instead.
* Use Text type
* Add test for packing feature.
* Sort output pack output for deterministic results.
  • Loading branch information
tetron committed Aug 29, 2016
1 parent 8a685e6 commit 0433da3
Show file tree
Hide file tree
Showing 11 changed files with 404 additions and 67 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -8,6 +8,7 @@ eggs/
.eggs/
*.egg-info/
*.egg
.tox/

# Editor Temps
.*.sw?
Expand Down
70 changes: 3 additions & 67 deletions cwltool/main.py
Expand Up @@ -30,6 +30,7 @@
from . import draft2tool
from .builder import adjustFileObjs, adjustDirObjs
from .stdfsaccess import StdFsAccess
from .pack import pack

_logger = logging.getLogger("cwltool")

Expand Down Expand Up @@ -495,74 +496,9 @@ def makeRelative(ob):

stdout.write(json.dumps(deps, indent=4))

def flatten_deps(d, files): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
flatten_deps(s, files)
elif isinstance(d, dict):
files.add(d["location"])
if "secondaryFiles" in d:
flatten_deps(d["secondaryFiles"], files)

def find_run(d, runs): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
find_run(s, runs)
elif isinstance(d, dict):
if "run" in d and isinstance(d["run"], (Text, Text)):
runs.add(d["run"])
for s in d.values():
find_run(s, runs)

def replace_refs(d, rewrite, stem, newstem):
# type: (Any, Dict[Text, Text], Text, Text) -> None
if isinstance(d, list):
for s,v in enumerate(d):
if isinstance(v, (str, Text)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
else:
replace_refs(v, rewrite, stem, newstem)
elif isinstance(d, dict):
if "run" in d and isinstance(d["run"], (str, Text)):
d["run"] = rewrite[d["run"]]
for s,v in d.items():
if isinstance(v, (str, Text)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
replace_refs(v, rewrite, stem, newstem)

def print_pack(document_loader, processobj, uri, metadata):
# type: (Loader, Any, Text, Dict[Text, Text]) -> Text
def loadref(b, u):
# type: (Text, Text) -> Union[Dict, List, Text]
return document_loader.resolve_ref(u, base_url=b)[0]
deps = scandeps(uri, processobj, set(("run",)), set(), loadref)

fdeps = set((uri,))
flatten_deps(deps, fdeps)

runs = set() # type: Set[Text]
for f in fdeps:
find_run(document_loader.idx[f], runs)

rewrite = {}
if isinstance(processobj, list):
for p in processobj:
rewrite[p["id"]] = "#" + shortname(p["id"])
else:
rewrite[uri] = "#main"

for r in runs:
rewrite[r] = "#" + shortname(r)

packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"]
} # type: Dict[Text, Any]
for r,v in rewrite.items():
dc = cast(Dict[Text, Any], copy.deepcopy(document_loader.idx[r]))
dc["id"] = v
dc["name"] = v
replace_refs(dc, rewrite, r+"/" if "#" in r else r+"#", v+"/")
packed["$graph"].append(dc)

# type: (Loader, Union[Dict[unicode, Any], List[Dict[unicode, Any]]], unicode, Dict[unicode, Any]) -> str
packed = pack(document_loader, processobj, uri, metadata)
if len(packed["$graph"]) > 1:
return json.dumps(packed, indent=4)
else:
Expand Down
84 changes: 84 additions & 0 deletions cwltool/pack.py
@@ -0,0 +1,84 @@
import copy
import json

from schema_salad.ref_resolver import Loader

from .process import scandeps, shortname

from typing import Union, Any, cast, Callable, Dict, Tuple, Type, IO, Text

def flatten_deps(d, files): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
flatten_deps(s, files)
elif isinstance(d, dict):
files.add(d["location"])
if "secondaryFiles" in d:
flatten_deps(d["secondaryFiles"], files)

def find_run(d, runs): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
find_run(s, runs)
elif isinstance(d, dict):
if "run" in d and isinstance(d["run"], (str, unicode)):
runs.add(d["run"])
for s in d.values():
find_run(s, runs)

def replace_refs(d, rewrite, stem, newstem):
# type: (Any, Dict[Text, Text], Text, Text) -> None
if isinstance(d, list):
for s,v in enumerate(d):
if isinstance(v, (str, unicode)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
else:
replace_refs(v, rewrite, stem, newstem)
elif isinstance(d, dict):
if "package" in d:
raise Exception("where the fuck did this come from %s" % json.dumps(d, indent=4))
if "run" in d and isinstance(d["run"], (str, unicode)):
d["run"] = rewrite[d["run"]]
for s,v in d.items():
if isinstance(v, (str, unicode)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
replace_refs(v, rewrite, stem, newstem)

def pack(document_loader, processobj, uri, metadata):
# type: (Loader, Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Dict[Text, Text]) -> Dict[Text, Any]
def loadref(b, u):
# type: (Text, Text) -> Union[Dict, List, Text]
return document_loader.resolve_ref(u, base_url=b)[0]
deps = scandeps(uri, processobj, set(("run",)), set(), loadref)

fdeps = set((uri,))
flatten_deps(deps, fdeps)

runs = set() # type: Set[Text]
for f in fdeps:
find_run(document_loader.idx[f], runs)

rewrite = {}
if isinstance(processobj, list):
for p in processobj:
rewrite[p["id"]] = "#" + shortname(p["id"])
else:
rewrite[uri] = "#main"

for r in runs:
rewrite[r] = "#" + shortname(r)

packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"]
} # type: Dict[Text, Any]

for r in sorted(rewrite.keys()):
v = rewrite[r]
dc = cast(Dict[Text, Any], copy.deepcopy(document_loader.idx[r]))
dc["id"] = v
for n in ("name", "package", "cwlVersion"):
if n in dc:
del dc[n]
replace_refs(dc, rewrite, r+"/" if "#" in r else r+"#", v+"/")
packed["$graph"].append(dc)

return packed
30 changes: 30 additions & 0 deletions cwltool/process.py
Expand Up @@ -312,6 +312,17 @@ class Process(object):

def __init__(self, toolpath_object, **kwargs):
# type: (Dict[Text, Any], **Any) -> None
"""
kwargs:
metadata: tool document metadata
requirements: inherited requirements
hints: inherited hints
loader: schema_salad.ref_resolver.Loader used to load tool document
avsc_names: CWL Avro schema object used to validate document
strict: flag to determine strict validation (fail on unrecognized fields)
"""

self.metadata = kwargs.get("metadata", {}) # type: Dict[Text,Any]
self.names = None # type: avro.schema.Names

Expand All @@ -338,6 +349,9 @@ def __init__(self, toolpath_object, **kwargs):
if "loader" in kwargs:
self.formatgraph = kwargs["loader"].graph

self.doc_loader = kwargs["loader"]
self.doc_schema = kwargs["avsc_names"]

checkRequirements(self.tool, supportedProcessRequirements)
self.validate_hints(kwargs["avsc_names"], self.tool.get("hints", []),
strict=kwargs.get("strict"))
Expand Down Expand Up @@ -395,6 +409,22 @@ def __init__(self, toolpath_object, **kwargs):

def _init_job(self, joborder, **kwargs):
# type: (Dict[Text, Text], **Any) -> Builder
"""
kwargs:
eval_timeout: javascript evaluation timeout
use_container: do/don't use Docker when DockerRequirement hint provided
make_fs_access: make an FsAccess() object with given basedir
basedir: basedir for FsAccess
docker_outdir: output directory inside docker for this job
docker_tmpdir: tmpdir inside docker for this job
docker_stagedir: stagedir inside docker for this job
outdir: outdir on host for this job
tmpdir: tmpdir on host for this job
stagedir: stagedir on host for this job
select_resources: callback to select compute resources
"""

builder = Builder()
builder.job = cast(Dict[Text, Union[Dict[Text, Any], List,
Text]], copy.deepcopy(joborder))
Expand Down
1 change: 1 addition & 0 deletions cwltool/workflow.py
Expand Up @@ -443,6 +443,7 @@ def __init__(self, toolpath_object, pos, **kwargs):
u"Tool definition %s failed validation:\n%s" %
(toolpath_object["run"], validate.indent(str(v))))

self.tool = toolpath_object = copy.deepcopy(toolpath_object)
for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")):
toolpath_object[toolfield] = []
for step_entry in toolpath_object[stepfield]:
Expand Down
17 changes: 17 additions & 0 deletions tests/test_pack.py
@@ -0,0 +1,17 @@
import unittest
import json
from cwltool.load_tool import fetch_document, validate_document
import cwltool.pack
import cwltool.workflow

class TestPack(unittest.TestCase):
def test_pack(self):
self.maxDiff = None

document_loader, workflowobj, uri = fetch_document("tests/wf/revsort.cwl")
document_loader, avsc_names, processobj, metadata, uri = validate_document(
document_loader, workflowobj, uri)
packed = cwltool.pack.pack(document_loader, processobj, uri, metadata)
with open("tests/wf/expect_packed.cwl") as f:
expect_packed = json.load(f)
self.assertEqual(expect_packed, packed)
125 changes: 125 additions & 0 deletions tests/wf/expect_packed.cwl
@@ -0,0 +1,125 @@
{
"cwlVersion": "v1.0",
"$graph": [
{
"inputs": [
{
"doc": "The input file to be processed.",
"type": "File",
"id": "#main/input"
},
{
"default": true,
"doc": "If true, reverse (decending) sort",
"type": "boolean",
"id": "#main/reverse_sort"
}
],
"doc": "Reverse the lines in a document, then sort those lines.",
"class": "Workflow",
"steps": [
{
"out": [
"#main/rev/output"
],
"run": "#revtool.cwl",
"id": "#main/rev",
"in": [
{
"source": "#main/input",
"id": "#main/rev/input"
}
]
},
{
"out": [
"#main/sorted/output"
],
"run": "#sorttool.cwl",
"id": "#main/sorted",
"in": [
{
"source": "#main/rev/output",
"id": "#main/sorted/input"
},
{
"source": "#main/reverse_sort",
"id": "#main/sorted/reverse"
}
]
}
],
"outputs": [
{
"outputSource": "#main/sorted/output",
"type": "File",
"id": "#main/output",
"doc": "The output with the lines reversed and sorted."
}
],
"id": "#main",
"hints": [
{
"dockerPull": "debian:8",
"class": "DockerRequirement"
}
]
},
{
"inputs": [
{
"inputBinding": {},
"type": "File",
"id": "#revtool.cwl/input"
}
],
"stdout": "output.txt",
"doc": "Reverse each line using the `rev` command",
"baseCommand": "rev",
"class": "CommandLineTool",
"outputs": [
{
"outputBinding": {
"glob": "output.txt"
},
"type": "File",
"id": "#revtool.cwl/output"
}
],
"id": "#revtool.cwl"
},
{
"inputs": [
{
"inputBinding": {
"position": 1,
"prefix": "--reverse"
},
"type": "boolean",
"id": "#sorttool.cwl/reverse"
},
{
"inputBinding": {
"position": 2
},
"type": "File",
"id": "#sorttool.cwl/input"
}
],
"stdout": "output.txt",
"doc": "Sort lines using the `sort` command",
"baseCommand": "sort",
"class": "CommandLineTool",
"outputs": [
{
"outputBinding": {
"glob": "output.txt"
},
"type": "File",
"id": "#sorttool.cwl/output"
}
],
"id": "#sorttool.cwl"
}
]
}
6 changes: 6 additions & 0 deletions tests/wf/revsort-job.json
@@ -0,0 +1,6 @@
{
"input": {
"class": "File",
"location": "whale.txt"
}
}

0 comments on commit 0433da3

Please sign in to comment.