diff --git a/cwltool.py b/cwltool.py index a3b9c6928..0cdd91a14 100755 --- a/cwltool.py +++ b/cwltool.py @@ -6,6 +6,7 @@ """ import sys + from cwltool import main if __name__ == "__main__": diff --git a/cwltool/__main__.py b/cwltool/__main__.py index 2b15c84f5..4d84a444f 100644 --- a/cwltool/__main__.py +++ b/cwltool/__main__.py @@ -1,4 +1,5 @@ -from . import main import sys +from . import main + sys.exit(main.main()) diff --git a/cwltool/builder.py b/cwltool/builder.py index 40cb789bd..595bdbdea 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -1,13 +1,15 @@ import copy -from .utils import aslist -from . import expression + import avro import schema_salad.validate as validate from schema_salad.sourceline import SourceLine from typing import Any, Callable, Text, Type, Union + +from . import expression from .errors import WorkflowException +from .pathmapper import PathMapper, adjustFileObjs, normalizeFilesDirs from .stdfsaccess import StdFsAccess -from .pathmapper import PathMapper, adjustFileObjs, adjustDirObjs, normalizeFilesDirs +from .utils import aslist CONTENT_LIMIT = 64 * 1024 @@ -18,8 +20,8 @@ def substitute(value, replace): # type: (Text, Text) -> Text else: return value + replace -class Builder(object): +class Builder(object): def __init__(self): # type: () -> None self.names = None # type: avro.schema.Names self.schemaDefs = None # type: Dict[Text, Dict[Text, Any]] @@ -39,8 +41,12 @@ def __init__(self): # type: () -> None self.build_job_script = None # type: Callable[[List[str]], Text] self.debug = False # type: bool - def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]): + def bind_input(self, schema, datum, lead_pos=None, tail_pos=None): # type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]] + if tail_pos is None: + tail_pos = [] + if lead_pos is None: + lead_pos = [] bindings = [] # type: List[Dict[Text,Text]] binding = None # type: Dict[Text,Any] if "inputBinding" in schema and isinstance(schema["inputBinding"], dict): @@ -137,7 +143,6 @@ def _capture_files(f): if schema["type"] == "Directory": self.files.append(datum) - # Position to front of the sort key if binding: for bi in bindings: @@ -198,7 +203,7 @@ def do_eval(self, ex, context=None, pull_image=True, recursive=False): # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any if recursive: if isinstance(ex, dict): - return {k: self.do_eval(v, context, pull_image, recursive) for k,v in ex.iteritems()} + return {k: self.do_eval(v, context, pull_image, recursive) for k, v in ex.iteritems()} if isinstance(ex, list): return [self.do_eval(v, context, pull_image, recursive) for v in ex] diff --git a/cwltool/cwlrdf.py b/cwltool/cwlrdf.py index 950357b06..af94966b6 100644 --- a/cwltool/cwlrdf.py +++ b/cwltool/cwlrdf.py @@ -1,11 +1,12 @@ -import json import urlparse -from .process import Process -from schema_salad.ref_resolver import Loader, ContextType + +from rdflib import Graph from schema_salad.jsonld_context import makerdf -from rdflib import Graph, plugin, URIRef -from rdflib.serializer import Serializer -from typing import Any, Dict, IO, Text, Union +from schema_salad.ref_resolver import ContextType +from typing import Any, Dict, IO, Text + +from .process import Process + def gather(tool, ctx): # type: (Process, ContextType) -> Graph g = Graph() @@ -16,14 +17,16 @@ def visitor(t): tool.visit(visitor) return g + def printrdf(wf, ctx, sr, stdout): # type: (Process, ContextType, Text, IO[Any]) -> None stdout.write(gather(wf, ctx).serialize(format=sr)) + def lastpart(uri): # type: (Any) -> Text uri = Text(uri) if "/" in uri: - return uri[uri.rindex("/")+1:] + return uri[uri.rindex("/") + 1:] else: return uri @@ -84,6 +87,7 @@ def dot_with_parameters(g, stdout): # type: (Graph, IO[Any]) -> None for (inp,) in qres: stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(inp))) + def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None dotname = {} # type: Dict[Text,Text] clusternode = {} @@ -163,7 +167,7 @@ def printdot(wf, ctx, stdout, include_parameters=False): stdout.write("digraph {") - #g.namespace_manager.qname(predicate) + # g.namespace_manager.qname(predicate) if include_parameters: dot_with_parameters(g, stdout) diff --git a/cwltool/docker.py b/cwltool/docker.py index 812852ff9..0b9ef202b 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -1,15 +1,18 @@ -import subprocess import logging -import sys -import requests import os -from .errors import WorkflowException import re +import subprocess +import sys import tempfile -from typing import Any, Text, Union + +import requests +from typing import Text + +from .errors import WorkflowException _logger = logging.getLogger("cwltool") + def get_image(dockerRequirement, pull_image, dry_run=False): # type: (Dict[Text, Text], bool, bool) -> bool found = False @@ -44,7 +47,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False): with open(os.path.join(dockerfile_dir, "Dockerfile"), "w") as df: df.write(dockerRequirement["dockerFile"]) cmd = ["docker", "build", "--tag=%s" % - str(dockerRequirement["dockerImageId"]), dockerfile_dir] + str(dockerRequirement["dockerImageId"]), dockerfile_dir] _logger.info(Text(cmd)) if not dry_run: subprocess.check_call(cmd, stdout=sys.stderr) @@ -62,7 +65,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False): _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"]) req = requests.get(dockerRequirement["dockerLoad"], stream=True) n = 0 - for chunk in req.iter_content(1024*1024): + for chunk in req.iter_content(1024 * 1024): n += len(chunk) _logger.info("\r%i bytes" % (n)) loadproc.stdin.write(chunk) @@ -73,7 +76,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False): found = True elif "dockerImport" in dockerRequirement: cmd = ["docker", "import", str(dockerRequirement["dockerImport"]), - str(dockerRequirement["dockerImageId"])] + str(dockerRequirement["dockerImageId"])] _logger.info(Text(cmd)) if not dry_run: subprocess.check_call(cmd, stdout=sys.stderr) diff --git a/cwltool/docker_uid.py b/cwltool/docker_uid.py index 11223c74b..5b63e833d 100644 --- a/cwltool/docker_uid.py +++ b/cwltool/docker_uid.py @@ -1,5 +1,6 @@ import subprocess -from typing import Text, Union + +from typing import Text def docker_vm_uid(): # type: () -> int diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 9ca34d6db..54590a84b 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -1,32 +1,28 @@ -import shutil -from functools import partial -import json import copy -import os -import glob -import logging import hashlib +import json +import logging +import os import re -import urlparse +import shutil import tempfile -import errno +import urlparse +from functools import partial -import avro.schema import schema_salad.validate as validate -from schema_salad.ref_resolver import file_uri, uri_file_path import shellescape +from schema_salad.ref_resolver import file_uri, uri_file_path +from schema_salad.sourceline import SourceLine, indent from typing import Any, Callable, cast, Generator, Text, Union -from .process import Process, shortname, uniquename, getListing, normalizeFilesDirs, compute_checksums +from .builder import CONTENT_LIMIT, substitute, Builder, adjustFileObjs +from .pathmapper import adjustDirObjs from .errors import WorkflowException -from .utils import aslist -from . import expression -from .builder import CONTENT_LIMIT, substitute, Builder, adjustFileObjs, adjustDirObjs -from .pathmapper import PathMapper from .job import CommandLineJob +from .pathmapper import PathMapper +from .process import Process, shortname, uniquename, getListing, normalizeFilesDirs, compute_checksums from .stdfsaccess import StdFsAccess - -from schema_salad.sourceline import SourceLine, indent +from .utils import aslist ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") ACCEPTLIST_EN_RELAXED_RE = re.compile(r"^[ a-zA-Z0-9._+-]+$") # with spaces @@ -36,6 +32,7 @@ _logger = logging.getLogger("cwltool") + class ExpressionTool(Process): def __init__(self, toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> None @@ -60,7 +57,7 @@ def run(self, **kwargs): # type: (**Any) -> None self.output_callback(ev, "success") except Exception as e: _logger.warn(u"Failed to evaluate expression:\n%s", - e, exc_info=kwargs.get('debug')) + e, exc_info=kwargs.get('debug')) self.output_callback({}, "permanentFail") def job(self, joborder, output_callback, **kwargs): @@ -83,6 +80,7 @@ def remove_path(f): # type: (Dict[Text, Any]) -> None if "path" in f: del f["path"] + def revmap_file(builder, outdir, f): # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None] @@ -104,7 +102,7 @@ def revmap_file(builder, outdir, f): if revmap_f: f["location"] = revmap_f[1] elif path.startswith(builder.outdir): - f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir)+1:]) + f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:]) return f if "path" in f: @@ -115,10 +113,11 @@ def revmap_file(builder, outdir, f): f["location"] = revmap_f[1] return f elif path.startswith(builder.outdir): - f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir)+1:]) + f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:]) return f else: - raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input file pass through." % (path, builder.outdir)) + raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input " + u"file pass through." % (path, builder.outdir)) raise WorkflowException(u"Output File object is missing both `location` and `path` fields: %s" % f) @@ -139,6 +138,7 @@ def run(self, **kwargs): self.outdir, kwargs.get("compute_checksum", True)), "success") + # map files to assigned path inside a container. We need to also explicitly # walk over input as implicit reassignment doesn't reach everything in builder.bindings def check_adjust(builder, f): @@ -151,6 +151,7 @@ def check_adjust(builder, f): raise WorkflowException("Invalid filename: '%s' contains illegal characters" % (f["basename"])) return f + class CommandLineTool(Process): def __init__(self, toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> None @@ -191,7 +192,7 @@ def job(self, joborder, output_callback, **kwargs): cmdline = ["docker", "run", dockerimg] + cmdline keydict = {u"cmdline": cmdline} - for _,f in cachebuilder.pathmapper.items(): + for _, f in cachebuilder.pathmapper.items(): if f.type == "File": st = os.stat(f.resolved) keydict[f.resolved] = [st.st_size, int(st.st_mtime * 1000)] @@ -205,11 +206,11 @@ def job(self, joborder, output_callback, **kwargs): if r["class"] in interesting and r["class"] not in keydict: keydict[r["class"]] = r - keydictstr = json.dumps(keydict, separators=(',',':'), sort_keys=True) + keydictstr = json.dumps(keydict, separators=(',', ':'), sort_keys=True) cachekey = hashlib.md5(keydictstr).hexdigest() _logger.debug("[job %s] keydictstr is %s -> %s", jobname, - keydictstr, cachekey) + keydictstr, cachekey) jobcache = os.path.join(kwargs["cachedir"], cachekey) jobcachepending = jobcache + ".pending" @@ -235,11 +236,12 @@ def rm_pending_output_callback(output_callback, jobcachepending, if processStatus == "success": os.remove(jobcachepending) output_callback(outputs, processStatus) + output_callback = cast( Callable[..., Any], # known bug in mypy # https://github.com/python/mypy/issues/797 partial(rm_pending_output_callback, output_callback, - jobcachepending)) + jobcachepending)) builder = self._init_job(joborder, **kwargs) @@ -260,12 +262,11 @@ def rm_pending_output_callback(output_callback, jobcachepending, if _logger.isEnabledFor(logging.DEBUG): _logger.debug(u"[job %s] initializing from %s%s", - j.name, - self.tool.get("id", ""), - u" as part of %s" % kwargs["part_of"] if "part_of" in kwargs else "") + j.name, + self.tool.get("id", ""), + u" as part of %s" % kwargs["part_of"] if "part_of" in kwargs else "") _logger.debug(u"[job %s] %s", j.name, json.dumps(joborder, indent=4)) - builder.pathmapper = None make_path_mapper_kwargs = kwargs if "stagedir" in make_path_mapper_kwargs: @@ -275,7 +276,8 @@ def rm_pending_output_callback(output_callback, jobcachepending, builder.requirements = j.requirements if _logger.isEnabledFor(logging.DEBUG): - _logger.debug(u"[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4)) + _logger.debug(u"[job %s] path mappings is %s", j.name, + json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4)) _check_adjust = partial(check_adjust, builder) @@ -334,7 +336,7 @@ def rm_pending_output_callback(output_callback, jobcachepending, ls.append(et) else: ls.append(builder.do_eval(t)) - for i,t in enumerate(ls): + for i, t in enumerate(ls): if "entry" in t: if isinstance(t["entry"], basestring): ls[i] = { @@ -381,8 +383,9 @@ def rm_pending_output_callback(output_callback, jobcachepending, def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] + ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] try: - ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] + fs_access = builder.make_fs_access(outdir) custom_output = fs_access.join(outdir, "cwl.output.json") if fs_access.exists(custom_output): @@ -395,7 +398,8 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): with SourceLine(ports, i, WorkflowException): fragment = shortname(port["id"]) try: - ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum) + ret[fragment] = self.collect_output(port, builder, outdir, fs_access, + compute_checksum=compute_checksum) except Exception as e: _logger.debug( u"Error collecting output for parameter '%s'" @@ -406,9 +410,9 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): if ret: adjustFileObjs(ret, - cast(Callable[[Any], Any], # known bug in mypy - # https://github.com/python/mypy/issues/797 - partial(revmap_file, builder, outdir))) + cast(Callable[[Any], Any], # known bug in mypy + # https://github.com/python/mypy/issues/797 + partial(revmap_file, builder, outdir))) adjustFileObjs(ret, remove_path) adjustDirObjs(ret, remove_path) normalizeFilesDirs(ret) @@ -438,7 +442,7 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr for gb in globpatterns: if gb.startswith(outdir): - gb = gb[len(outdir)+1:] + gb = gb[len(outdir) + 1:] elif gb == ".": gb = outdir elif gb.startswith("/"): @@ -464,7 +468,7 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr checksum = hashlib.sha1() while contents != "": checksum.update(contents) - contents = f.read(1024*1024) + contents = f.read(1024 * 1024) files["checksum"] = "sha1$%s" % checksum.hexdigest() f.seek(0, 2) filesize = f.tell() @@ -523,8 +527,7 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr if not r and optional: r = None - if (not r and isinstance(schema["type"], dict) and - schema["type"]["type"] == "record"): + if (not r and isinstance(schema["type"], dict) and schema["type"]["type"] == "record"): out = {} for f in schema["type"]["fields"]: out[shortname(f["name"])] = self.collect_output( # type: ignore diff --git a/cwltool/errors.py b/cwltool/errors.py index 6bf187cf2..9df1236fa 100644 --- a/cwltool/errors.py +++ b/cwltool/errors.py @@ -1,5 +1,6 @@ class WorkflowException(Exception): pass + class UnsupportedRequirement(WorkflowException): pass diff --git a/cwltool/expression.py b/cwltool/expression.py index 96cbe99fe..72e4d449f 100644 --- a/cwltool/expression.py +++ b/cwltool/expression.py @@ -1,21 +1,16 @@ -import subprocess +import copy import json import logging -import os import re -import copy from typing import Any, AnyStr, Union, Text, Dict, List -import schema_salad.validate as validate -import schema_salad.ref_resolver -from .utils import aslist, get_feature -from .errors import WorkflowException from . import sandboxjs -from . import docker +from .errors import WorkflowException _logger = logging.getLogger("cwltool") + def jshead(engineConfig, rootvars): # type: (List[Text], Dict[Text, Any]) -> Text return u"\n".join(engineConfig + [u"var %s = %s;" % (k, json.dumps(v, indent=4)) for k, v in rootvars.items()]) @@ -29,11 +24,13 @@ def jshead(engineConfig, rootvars): segment_re = re.compile(segments, flags=re.UNICODE) param_re = re.compile(r"\((%s)%s*\)$" % (seg_symbol, segments), flags=re.UNICODE) -JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None] +JSON = Union[Dict[Any, Any], List[Any], Text, int, long, float, bool, None] + class SubstitutionError(Exception): pass + def scanner(scan): # type: (Text) -> List[int] DEFAULT = 0 DOLLAR = 1 @@ -58,13 +55,13 @@ def scanner(scan): # type: (Text) -> List[int] elif state == BACKSLASH: stack.pop() if stack[-1] == DEFAULT: - return [i-1, i+1] + return [i - 1, i + 1] elif state == DOLLAR: if c == '(': - start = i-1 + start = i - 1 stack.append(PAREN) elif c == '{': - start = i-1 + start = i - 1 stack.append(BRACE) else: stack.pop() @@ -74,7 +71,7 @@ def scanner(scan): # type: (Text) -> List[int] elif c == ')': stack.pop() if stack[-1] == DOLLAR: - return [start, i+1] + return [start, i + 1] elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': @@ -85,7 +82,7 @@ def scanner(scan): # type: (Text) -> List[int] elif c == '}': stack.pop() if stack[-1] == DOLLAR: - return [start, i+1] + return [start, i + 1] elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': @@ -103,10 +100,12 @@ def scanner(scan): # type: (Text) -> List[int] i += 1 if len(stack) > 1: - raise SubstitutionError("Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:])) + raise SubstitutionError( + "Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:])) else: return None + def next_seg(remain, obj): # type: (Text, Any) -> Any if remain: m = segment_re.match(remain) @@ -139,6 +138,7 @@ def next_seg(remain, obj): # type: (Text, Any) -> Any else: return obj + def evaluator(ex, jslib, obj, fullJS=False, timeout=None, debug=False): # type: (Text, Text, Dict[Text, Any], bool, int, bool) -> JSON m = param_re.match(ex) @@ -150,7 +150,10 @@ def evaluator(ex, jslib, obj, fullJS=False, timeout=None, debug=False): elif fullJS: return sandboxjs.execjs(ex, jslib, timeout=timeout, debug=debug) else: - raise sandboxjs.JavascriptException("Syntax error in parameter reference '%s' or used Javascript code without specifying InlineJavascriptRequirement.", ex) + raise sandboxjs.JavascriptException( + "Syntax error in parameter reference '%s' or used Javascript code without specifying InlineJavascriptRequirement.", + ex) + def interpolate(scan, rootvars, timeout=None, fullJS=None, jslib="", debug=False): @@ -162,7 +165,7 @@ def interpolate(scan, rootvars, parts.append(scan[0:w[0]]) if scan[w[0]] == '$': - e = evaluator(scan[w[0]+1:w[1]], jslib, rootvars, fullJS=fullJS, + e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, fullJS=fullJS, timeout=timeout, debug=debug) if w[0] == 0 and w[1] == len(scan): return e @@ -171,7 +174,7 @@ def interpolate(scan, rootvars, leaf = leaf[1:-1] parts.append(leaf) elif scan[w[0]] == '\\': - e = scan[w[1]-1] + e = scan[w[1] - 1] parts.append(e) scan = scan[w[1]:] @@ -179,6 +182,7 @@ def interpolate(scan, rootvars, parts.append(scan) return ''.join(parts) + def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, context=None, pull_image=True, timeout=None, debug=False): # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool) -> Any @@ -190,7 +194,7 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, rootvars = { u"inputs": jobinput, u"self": context, - u"runtime": runtime } + u"runtime": runtime} if isinstance(ex, (str, Text)): fullJS = False diff --git a/cwltool/factory.py b/cwltool/factory.py index be2468b8d..eeb4a7cff 100644 --- a/cwltool/factory.py +++ b/cwltool/factory.py @@ -1,11 +1,13 @@ -from . import main -from . import load_tool -from . import workflow import os -from .process import Process + from typing import Any, Text, Union, Tuple from typing import Callable as tCallable -import argparse + +from . import load_tool +from . import main +from . import workflow +from .process import Process + class WorkflowStatus(Exception): def __init__(self, out, status): @@ -14,6 +16,7 @@ def __init__(self, out, status): self.out = out self.status = status + class Callable(object): def __init__(self, t, factory): # type: (Process, Factory) -> None self.t = t @@ -29,6 +32,7 @@ def __call__(self, **kwargs): else: return out + class Factory(object): def __init__(self, makeTool=workflow.defaultMakeTool, executor=main.single_job_executor, diff --git a/cwltool/flatten.py b/cwltool/flatten.py index 477444aad..0e3e437ce 100644 --- a/cwltool/flatten.py +++ b/cwltool/flatten.py @@ -1,5 +1,6 @@ from typing import Any, Callable, List, cast + # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html diff --git a/cwltool/job.py b/cwltool/job.py index 8b331a87c..8f1b9718a 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -1,26 +1,24 @@ -import subprocess -import io -import os -import tempfile -import glob +import functools import json import logging -import sys -import requests -from . import docker -from .process import get_feature, empty_subtree, stageFiles -from .errors import WorkflowException +import os +import re import shutil import stat -import re +import subprocess +import sys +import tempfile + import shellescape -import string -from .docker_uid import docker_vm_uid +from typing import (Any, Callable, Union, Iterable, MutableMapping, + IO, Text, Tuple) + +from . import docker from .builder import Builder -from typing import (Any, Callable, Union, Iterable, Mapping, MutableMapping, - IO, cast, Text, Tuple) +from .docker_uid import docker_vm_uid +from .errors import WorkflowException from .pathmapper import PathMapper -import functools +from .process import get_feature, empty_subtree, stageFiles _logger = logging.getLogger("cwltool") @@ -91,8 +89,8 @@ def deref_links(outputs): # type: (Any) -> None for v in outputs: deref_links(v) -class CommandLineJob(object): +class CommandLineJob(object): def __init__(self): # type: () -> None self.builder = None # type: Builder self.joborder = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]] @@ -121,7 +119,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, if not os.path.exists(self.outdir): os.makedirs(self.outdir) - #with open(os.path.join(outdir, "cwl.input.json"), "w") as fp: + # with open(os.path.join(outdir, "cwl.input.json"), "w") as fp: # json.dump(self.joborder, fp) runtime = [] # type: List[Text] @@ -173,7 +171,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, euid = docker_vm_uid() or os.geteuid() - if kwargs.get("no_match_user",None) is False: + if kwargs.get("no_match_user", None) is False: runtime.append(u"--user=%s" % (euid)) if rm_container: @@ -186,7 +184,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, # runtime.append("--env=HOME=/tmp") runtime.append("--env=HOME=%s" % self.builder.outdir) - for t,v in self.environment.items(): + for t, v in self.environment.items(): runtime.append(u"--env=%s=%s" % (t, v)) runtime.append(img_id) @@ -216,7 +214,8 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, _logger.info(u"[job %s] %s$ %s%s%s%s", self.name, self.outdir, - " \\\n ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in (runtime + self.command_line)]), + " \\\n ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in + (runtime + self.command_line)]), u' < %s' % self.stdin if self.stdin else '', u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '', u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '') @@ -240,6 +239,7 @@ def linkoutdir(src, tgt): if src == item.resolved: os.symlink(item.target, tgt) break + stageFiles(generatemapper, linkoutdir) stdin_path = None @@ -291,6 +291,7 @@ def linkoutdir(src, tgt): if os.path.islink(tgt): os.remove(tgt) os.symlink(src, tgt) + stageFiles(generatemapper, linkoutdir, ignoreWritable=True) outputs = self.collect_outputs(self.outdir) @@ -335,14 +336,14 @@ def linkoutdir(src, tgt): def _job_popen( - commands, # type: List[str] - stdin_path, # type: Text - stdout_path, # type: Text - stderr_path, # type: Text - env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] - cwd, # type: Text - job_dir=None, # type: Text - build_job_script=None, # type: Callable[[List[str]], Text] + commands, # type: List[str] + stdin_path, # type: Text + stdout_path, # type: Text + stderr_path, # type: Text + env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] + cwd, # type: Text + job_dir=None, # type: Text + build_job_script=None, # type: Callable[[List[str]], Text] ): # type: (...) -> int diff --git a/cwltool/load_tool.py b/cwltool/load_tool.py index 7096bfc68..d197aadce 100644 --- a/cwltool/load_tool.py +++ b/cwltool/load_tool.py @@ -1,33 +1,33 @@ # pylint: disable=unused-import """Loads a CWL document.""" -import os -import uuid import logging +import os import re import urlparse +import uuid -from typing import Any, AnyStr, Callable, cast, Dict, Text, Tuple, Union -from ruamel.yaml.comments import CommentedSeq, CommentedMap -from avro.schema import Names import requests.sessions - -from schema_salad.ref_resolver import Loader, Fetcher, file_uri -import schema_salad.validate as validate -from schema_salad.validate import ValidationException import schema_salad.schema as schema +from avro.schema import Names +from ruamel.yaml.comments import CommentedSeq, CommentedMap +from schema_salad.ref_resolver import Loader, Fetcher, file_uri from schema_salad.sourceline import cmap +from schema_salad.validate import ValidationException +from typing import Any, Callable, cast, Dict, Text, Tuple, Union -from . import update from . import process -from .process import Process, shortname +from . import update from .errors import WorkflowException +from .process import Process, shortname _logger = logging.getLogger("cwltool") -def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]] + +def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]] resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text] - fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] + fetcher_constructor=None + # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] ): # type: (...) -> Tuple[Loader, CommentedMap, Text] """Retrieve a CWL document.""" @@ -62,12 +62,13 @@ def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]] return document_loader, workflowobj, uri + def _convert_stdstreams_to_files(workflowobj): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None if isinstance(workflowobj, dict): if ('class' in workflowobj - and workflowobj['class'] == 'CommandLineTool'): + and workflowobj['class'] == 'CommandLineTool'): if 'outputs' in workflowobj: for out in workflowobj['outputs']: for streamtype in ['stdout', 'stderr']: @@ -106,13 +107,15 @@ def _convert_stdstreams_to_files(workflowobj): for entry in workflowobj: _convert_stdstreams_to_files(entry) -def validate_document(document_loader, # type: Loader - workflowobj, # type: CommentedMap - uri, # type: Text + +def validate_document(document_loader, # type: Loader + workflowobj, # type: CommentedMap + uri, # type: Text enable_dev=False, # type: bool - strict=True, # type: bool - preprocess_only=False, # type: bool - fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] + strict=True, # type: bool + preprocess_only=False, # type: bool + fetcher_constructor=None + # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] ): # type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text] """Validate a CWL document.""" @@ -159,7 +162,7 @@ def validate_document(document_loader, # type: Loader processobj = None # type: Union[CommentedMap, CommentedSeq, unicode] document_loader = Loader(sch_document_loader.ctx, schemagraph=sch_document_loader.graph, - idx=document_loader.idx, cache=sch_document_loader.cache, + idx=document_loader.idx, cache=sch_document_loader.cache, fetcher_constructor=fetcher_constructor) workflowobj["id"] = fileuri @@ -171,9 +174,9 @@ def validate_document(document_loader, # type: Loader if not isinstance(processobj, dict): raise ValidationException("Draft-2 workflows must be a dict.") metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}), - "$schemas": processobj.get("$schemas", []), - "cwlVersion": processobj["cwlVersion"]}, - fn=fileuri)) + "$schemas": processobj.get("$schemas", []), + "cwlVersion": processobj["cwlVersion"]}, + fn=fileuri)) _convert_stdstreams_to_files(workflowobj) @@ -193,11 +196,11 @@ def validate_document(document_loader, # type: Loader def make_tool(document_loader, # type: Loader - avsc_names, # type: Names - metadata, # type: Dict[Text, Any] - uri, # type: Text - makeTool, # type: Callable[..., Process] - kwargs # type: dict + avsc_names, # type: Names + metadata, # type: Dict[Text, Any] + uri, # type: Text + makeTool, # type: Callable[..., Process] + kwargs # type: dict ): # type: (...) -> Process """Make a Python CWL object.""" @@ -236,16 +239,17 @@ def make_tool(document_loader, # type: Loader def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]] - makeTool, # type: Callable[..., Process] - kwargs=None, # type: dict + makeTool, # type: Callable[..., Process] + kwargs=None, # type: dict enable_dev=False, # type: bool - strict=True, # type: bool - resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text] + strict=True, # type: bool + resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text] fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] ): # type: (...) -> Process - document_loader, workflowobj, uri = fetch_document(argsworkflow, resolver=resolver, fetcher_constructor=fetcher_constructor) + document_loader, workflowobj, uri = fetch_document(argsworkflow, resolver=resolver, + fetcher_constructor=fetcher_constructor) document_loader, avsc_names, processobj, metadata, uri = validate_document( document_loader, workflowobj, uri, enable_dev=enable_dev, strict=strict, fetcher_constructor=fetcher_constructor) diff --git a/cwltool/main.py b/cwltool/main.py index 35c4f2e27..83dd9f112 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -1,39 +1,33 @@ #!/usr/bin/env python import argparse +import functools import json +import logging import os import sys -import logging -import copy import tempfile -import ruamel.yaml as yaml -import urlparse -import hashlib -import pkg_resources # part of setuptools -import functools -import rdflib +import pkg_resources # part of setuptools import requests -from typing import (Union, Any, AnyStr, cast, Callable, Dict, Sequence, Text, - Tuple, Type, IO) - -from schema_salad.ref_resolver import Loader, Fetcher, file_uri, uri_file_path +import ruamel.yaml as yaml import schema_salad.validate as validate -import schema_salad.jsonld_context -import schema_salad.makedoc +from schema_salad.ref_resolver import Loader, Fetcher, file_uri, uri_file_path from schema_salad.sourceline import strip_dup_lineno +from typing import (Union, Any, AnyStr, cast, Callable, Dict, Sequence, Text, + Tuple, IO) +from . import draft2tool from . import workflow -from .errors import WorkflowException, UnsupportedRequirement +from .builder import adjustFileObjs +from .pathmapper import adjustDirObjs from .cwlrdf import printrdf, printdot -from .process import shortname, Process, getListing, relocateOutputs, cleanIntermediate, scandeps, normalizeFilesDirs +from .errors import WorkflowException, UnsupportedRequirement from .load_tool import fetch_document, validate_document, make_tool -from . import draft2tool +from .pack import pack +from .process import shortname, Process, getListing, relocateOutputs, cleanIntermediate, scandeps, normalizeFilesDirs from .resolver import tool_resolver -from .builder import adjustFileObjs, adjustDirObjs from .stdfsaccess import StdFsAccess -from .pack import pack _logger = logging.getLogger("cwltool") @@ -65,12 +59,12 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--rm-container", action="store_true", default=True, - help="Delete Docker container used by jobs after they exit (default)", - dest="rm_container") + help="Delete Docker container used by jobs after they exit (default)", + dest="rm_container") exgroup.add_argument("--leave-container", action="store_false", - default=True, help="Do not delete Docker container used by jobs after they exit", - dest="rm_container") + default=True, help="Do not delete Docker container used by jobs after they exit", + dest="rm_container") parser.add_argument("--tmpdir-prefix", type=Text, help="Path prefix for temporary directories", @@ -78,29 +72,29 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--tmp-outdir-prefix", type=Text, - help="Path prefix for intermediate output directories", - default="tmp") + help="Path prefix for intermediate output directories", + default="tmp") exgroup.add_argument("--cachedir", type=Text, default="", - help="Directory to cache intermediate workflow outputs to avoid recomputing steps.") + help="Directory to cache intermediate workflow outputs to avoid recomputing steps.") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--rm-tmpdir", action="store_true", default=True, - help="Delete intermediate temporary directories (default)", - dest="rm_tmpdir") + help="Delete intermediate temporary directories (default)", + dest="rm_tmpdir") exgroup.add_argument("--leave-tmpdir", action="store_false", - default=True, help="Do not delete intermediate temporary directories", - dest="rm_tmpdir") + default=True, help="Do not delete intermediate temporary directories", + dest="rm_tmpdir") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--move-outputs", action="store_const", const="move", default="move", - help="Move output files to the workflow output directory and delete intermediate output directories (default).", - dest="move_outputs") + help="Move output files to the workflow output directory and delete intermediate output directories (default).", + dest="move_outputs") exgroup.add_argument("--leave-outputs", action="store_const", const="leave", default="move", - help="Leave output files in intermediate output directories.", - dest="move_outputs") + help="Leave output files in intermediate output directories.", + dest="move_outputs") exgroup.add_argument("--copy-outputs", action="store_const", const="copy", default="move", help="Copy output files to the workflow output directory, don't delete intermediate output directories.", @@ -108,10 +102,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-pull", default=True, action="store_true", - help="Try to pull Docker images", dest="enable_pull") + help="Try to pull Docker images", dest="enable_pull") exgroup.add_argument("--disable-pull", default=True, action="store_false", - help="Do not try to pull Docker images", dest="enable_pull") + help="Do not try to pull Docker images", dest="enable_pull") parser.add_argument("--rdf-serializer", help="Output RDF serialization format used by --print-rdf (one of turtle (default), n3, nt, xml)", @@ -124,8 +118,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--print-rdf", action="store_true", - help="Print corresponding RDF graph for workflow and exit") - exgroup.add_argument("--print-dot", action="store_true", help="Print workflow visualization in graphviz format and exit") + help="Print corresponding RDF graph for workflow and exit") + exgroup.add_argument("--print-dot", action="store_true", + help="Print workflow visualization in graphviz format and exit") exgroup.add_argument("--print-pre", action="store_true", help="Print CWL document after preprocessing.") exgroup.add_argument("--print-deps", action="store_true", help="Print CWL document dependencies.") exgroup.add_argument("--print-input-deps", action="store_true", help="Print input object document dependencies.") @@ -134,7 +129,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.") exgroup = parser.add_mutually_exclusive_group() - exgroup.add_argument("--strict", action="store_true", help="Strict validation (unrecognized or out of place fields are error)", + exgroup.add_argument("--strict", action="store_true", + help="Strict validation (unrecognized or out of place fields are error)", default=True, dest="strict") exgroup.add_argument("--non-strict", action="store_false", help="Lenient validation (ignore unrecognized fields)", default=True, dest="strict") @@ -147,12 +143,12 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool") parser.add_argument("--relative-deps", choices=['primary', 'cwd'], - default="primary", help="When using --print-deps, print paths " - "relative to primary file or current working directory.") + default="primary", help="When using --print-deps, print paths " + "relative to primary file or current working directory.") parser.add_argument("--enable-dev", action="store_true", help="Allow loading and running development versions " - "of CWL spec.", default=False) + "of CWL spec.", default=False) parser.add_argument("--default-container", help="Specify a default docker container that will be used if the workflow fails to specify one.") @@ -160,26 +156,26 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Disable passing the current uid to 'docker run --user`") parser.add_argument("--disable-net", action="store_true", help="Use docker's default networking for containers;" - " the default is to enable networking.") + " the default is to enable networking.") parser.add_argument("--custom-net", type=Text, help="Will be passed to `docker run` as the '--net' " - "parameter. Implies '--enable-net'.") + "parameter. Implies '--enable-net'.") parser.add_argument("--on-error", type=str, help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. " - "Default is 'stop'.", default="stop", choices=("stop", "continue")) + "Default is 'stop'.", default="stop", choices=("stop", "continue")) exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--compute-checksum", action="store_true", default=True, - help="Compute checksum of contents while collecting outputs", - dest="compute_checksum") + help="Compute checksum of contents while collecting outputs", + dest="compute_checksum") exgroup.add_argument("--no-compute-checksum", action="store_false", - help="Do not compute checksum of contents while collecting outputs", - dest="compute_checksum") + help="Do not compute checksum of contents while collecting outputs", + dest="compute_checksum") parser.add_argument("--relax-path-checks", action="store_true", - default=False, help="Relax requirements on path names. Currently " - "allows spaces.", dest="relax_path_checks") + default=False, help="Relax requirements on path names. Currently " + "allows spaces.", dest="relax_path_checks") parser.add_argument("workflow", type=Text, nargs="?", default=None) parser.add_argument("job_order", nargs=argparse.REMAINDER) @@ -200,14 +196,14 @@ def output_callback(out, processStatus): output_dirs = set() finaloutdir = kwargs.get("outdir") - kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get("tmp_outdir_prefix") else tempfile.mkdtemp() + kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( + "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) jobReqs = None if "cwl:requirements" in job_order_object: jobReqs = job_order_object["cwl:requirements"] - elif ("cwl:defaults" in t.metadata and "cwl:requirements" in - t.metadata["cwl:defaults"]): + elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]): jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"] if jobReqs: for req in jobReqs: @@ -244,6 +240,7 @@ def output_callback(out, processStatus): else: return (None, "permanentFail") + class FSAction(argparse.Action): objclass = None # type: Text @@ -256,9 +253,10 @@ def __init__(self, option_strings, dest, nargs=None, **kwargs): def __call__(self, parser, namespace, values, option_string=None): # type: (argparse.ArgumentParser, argparse.Namespace, Union[AnyStr, Sequence[Any], None], AnyStr) -> None setattr(namespace, - self.dest, # type: ignore - {"class": self.objclass, - "location": file_uri(str(os.path.abspath(cast(AnyStr, values))))}) + self.dest, # type: ignore + {"class": self.objclass, + "location": file_uri(str(os.path.abspath(cast(AnyStr, values))))}) + class FSAppendAction(argparse.Action): objclass = None # type: Text @@ -283,90 +281,94 @@ def __call__(self, parser, namespace, values, option_string=None): {"class": self.objclass, "location": file_uri(str(os.path.abspath(cast(AnyStr, values))))}) + class FileAction(FSAction): objclass = "File" + class DirectoryAction(FSAction): objclass = "Directory" + class FileAppendAction(FSAppendAction): objclass = "File" + class DirectoryAppendAction(FSAppendAction): objclass = "Directory" def add_argument(toolparser, name, inptype, records, description="", - default=None): + default=None): # type: (argparse.ArgumentParser, Text, Any, List[Text], Text, Any) -> None - if len(name) == 1: - flag = "-" - else: - flag = "--" - - required = True - if isinstance(inptype, list): - if inptype[0] == "null": - required = False - if len(inptype) == 2: - inptype = inptype[1] - else: - _logger.debug(u"Can't make command line argument from %s", inptype) - return None - - ahelp = description.replace("%", "%%") - action = None # type: Union[argparse.Action, Text] - atype = None # type: Any - - if inptype == "File": - action = cast(argparse.Action, FileAction) - elif inptype == "Directory": - action = cast(argparse.Action, DirectoryAction) - elif isinstance(inptype, dict) and inptype["type"] == "array": - if inptype["items"] == "File": - action = cast(argparse.Action, FileAppendAction) - elif inptype["items"] == "Directory": - action = cast(argparse.Action, DirectoryAppendAction) - else: - action = "append" - elif isinstance(inptype, dict) and inptype["type"] == "enum": - atype = Text - elif isinstance(inptype, dict) and inptype["type"] == "record": - records.append(name) - for field in inptype['fields']: - fieldname = name+"."+shortname(field['name']) - fieldtype = field['type'] - fielddescription = field.get("doc", "") - add_argument( - toolparser, fieldname, fieldtype, records, - fielddescription) - return - if inptype == "string": - atype = Text - elif inptype == "int": - atype = int - elif inptype == "double": - atype = float - elif inptype == "float": - atype = float - elif inptype == "boolean": - action = "store_true" - - if default: - required = False - - if not atype and not action: - _logger.debug(u"Can't make command line argument from %s", inptype) - return None + if len(name) == 1: + flag = "-" + else: + flag = "--" - if inptype != "boolean": - typekw = { 'type': atype } + required = True + if isinstance(inptype, list): + if inptype[0] == "null": + required = False + if len(inptype) == 2: + inptype = inptype[1] + else: + _logger.debug(u"Can't make command line argument from %s", inptype) + return None + + ahelp = description.replace("%", "%%") + action = None # type: Union[argparse.Action, Text] + atype = None # type: Any + + if inptype == "File": + action = cast(argparse.Action, FileAction) + elif inptype == "Directory": + action = cast(argparse.Action, DirectoryAction) + elif isinstance(inptype, dict) and inptype["type"] == "array": + if inptype["items"] == "File": + action = cast(argparse.Action, FileAppendAction) + elif inptype["items"] == "Directory": + action = cast(argparse.Action, DirectoryAppendAction) else: - typekw = {} + action = "append" + elif isinstance(inptype, dict) and inptype["type"] == "enum": + atype = Text + elif isinstance(inptype, dict) and inptype["type"] == "record": + records.append(name) + for field in inptype['fields']: + fieldname = name + "." + shortname(field['name']) + fieldtype = field['type'] + fielddescription = field.get("doc", "") + add_argument( + toolparser, fieldname, fieldtype, records, + fielddescription) + return + if inptype == "string": + atype = Text + elif inptype == "int": + atype = int + elif inptype == "double": + atype = float + elif inptype == "float": + atype = float + elif inptype == "boolean": + action = "store_true" + + if default: + required = False + + if not atype and not action: + _logger.debug(u"Can't make command line argument from %s", inptype) + return None + + if inptype != "boolean": + typekw = {'type': atype} + else: + typekw = {} - toolparser.add_argument( # type: ignore - flag + name, required=required, help=ahelp, action=action, - default=default, **typekw) + toolparser.add_argument( # type: ignore + flag + name, required=required, help=ahelp, action=action, + default=default, **typekw) def generate_parser(toolparser, tool, namemap, records): @@ -431,16 +433,17 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False, for record_name in records: record = {} record_items = { - k:v for k,v in cmd_line.iteritems() + k: v for k, v in cmd_line.iteritems() if k.startswith(record_name)} for key, value in record_items.iteritems(): - record[key[len(record_name)+1:]] = value + record[key[len(record_name) + 1:]] = value del cmd_line[key] cmd_line[str(record_name)] = record if cmd_line["job_order"]: try: - input_basedir = args.basedir if args.basedir else os.path.abspath(os.path.dirname(cmd_line["job_order"])) + input_basedir = args.basedir if args.basedir else os.path.abspath( + os.path.dirname(cmd_line["job_order"])) job_order_object = loader.resolve_ref(cmd_line["job_order"]) except Exception as e: _logger.error(Text(e), exc_info=args.debug) @@ -448,7 +451,7 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False, else: job_order_object = {"id": args.workflow} - job_order_object.update({namemap[k]: v for k,v in cmd_line.items()}) + job_order_object.update({namemap[k]: v for k, v in cmd_line.items()}) if _logger.isEnabledFor(logging.DEBUG): _logger.debug(u"Parsed job order from command line: %s", json.dumps(job_order_object, indent=4)) @@ -471,7 +474,7 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False, if print_input_deps: printdeps(job_order_object, loader, stdout, relative_deps, "", - basedir=file_uri(input_basedir+"/")) + basedir=file_uri(input_basedir + "/")) return 0 def pathToLoc(p): @@ -483,7 +486,7 @@ def pathToLoc(p): adjustFileObjs(job_order_object, pathToLoc) normalizeFilesDirs(job_order_object) adjustDirObjs(job_order_object, cast(Callable[..., Any], - functools.partial(getListing, make_fs_access(input_basedir)))) + functools.partial(getListing, make_fs_access(input_basedir)))) if "cwl:tool" in job_order_object: del job_order_object["cwl:tool"] @@ -492,6 +495,7 @@ def pathToLoc(p): return (job_order_object, input_basedir) + def makeRelative(base, ob): u = ob.get("location", ob.get("path")) if ":" in u.split("/")[0] and not u.startswith("file://"): @@ -501,6 +505,7 @@ def makeRelative(base, ob): u = uri_file_path(u) ob["location"] = os.path.relpath(u, base) + def printdeps(obj, document_loader, stdout, relative_deps, uri, basedir=None): # type: (Dict[Text, Any], Loader, IO[Any], bool, Text, Text) -> None deps = {"class": "File", @@ -510,8 +515,8 @@ def loadref(b, u): return document_loader.fetch(document_loader.fetcher.urljoin(b, u)) sf = scandeps( - basedir if basedir else uri, obj, set(("$import", "run")), - set(("$include", "$schemas", "location")), loadref) + basedir if basedir else uri, obj, {"$import", "run"}, + {"$include", "$schemas", "location"}, loadref) if sf: deps["secondaryFiles"] = sf @@ -528,6 +533,7 @@ def loadref(b, u): stdout.write(json.dumps(deps, indent=4)) + def print_pack(document_loader, processobj, uri, metadata): # type: (Loader, Union[Dict[unicode, Any], List[Dict[unicode, Any]]], unicode, Dict[unicode, Any]) -> str packed = pack(document_loader, processobj, uri, metadata) @@ -536,6 +542,7 @@ def print_pack(document_loader, processobj, uri, metadata): else: return json.dumps(packed["$graph"][0], indent=4) + def versionstring(): # type: () -> Text pkg = pkg_resources.require("cwltool") @@ -546,7 +553,7 @@ def versionstring(): def main(argsl=None, # type: List[str] - args=None, # type: argparse.Namespace + args=None, # type: argparse.Namespace executor=single_job_executor, # type: Callable[..., Tuple[Dict[Text, Any], Text]] makeTool=workflow.defaultMakeTool, # type: Callable[..., Process] selectResources=None, # type: Callable[[Dict[Text, int]], Dict[Text, int]] @@ -577,29 +584,29 @@ def main(argsl=None, # type: List[str] # If caller provided custom arguments, it may be not every expected # option is set, so fill in no-op defaults to avoid crashing when # dereferencing them in args. - for k,v in {'print_deps': False, - 'print_pre': False, - 'print_rdf': False, - 'print_dot': False, - 'relative_deps': False, - 'tmp_outdir_prefix': 'tmp', - 'tmpdir_prefix': 'tmp', - 'print_input_deps': False, - 'cachedir': None, - 'quiet': False, - 'debug': False, - 'version': False, - 'enable_dev': False, - 'strict': True, - 'rdf_serializer': None, - 'basedir': None, - 'tool_help': False, - 'workflow': None, - 'job_order': None, - 'pack': False, - 'on_error': 'continue', - 'relax_path_checks': False, - 'validate': False}.iteritems(): + for k, v in {'print_deps': False, + 'print_pre': False, + 'print_rdf': False, + 'print_dot': False, + 'relative_deps': False, + 'tmp_outdir_prefix': 'tmp', + 'tmpdir_prefix': 'tmp', + 'print_input_deps': False, + 'cachedir': None, + 'quiet': False, + 'debug': False, + 'version': False, + 'enable_dev': False, + 'strict': True, + 'rdf_serializer': None, + 'basedir': None, + 'tool_help': False, + 'workflow': None, + 'job_order': None, + 'pack': False, + 'on_error': 'continue', + 'relax_path_checks': False, + 'validate': False}.iteritems(): if not hasattr(args, k): setattr(args, k, v) @@ -625,7 +632,8 @@ def main(argsl=None, # type: List[str] draft2tool.ACCEPTLIST_RE = draft2tool.ACCEPTLIST_EN_RELAXED_RE try: - document_loader, workflowobj, uri = fetch_document(args.workflow, resolver=resolver, fetcher_constructor=fetcher_constructor) + document_loader, workflowobj, uri = fetch_document(args.workflow, resolver=resolver, + fetcher_constructor=fetcher_constructor) if args.print_deps: printdeps(workflowobj, document_loader, stdout, args.relative_deps, uri) @@ -649,7 +657,7 @@ def main(argsl=None, # type: List[str] return 0 tool = make_tool(document_loader, avsc_names, metadata, uri, - makeTool, vars(args)) + makeTool, vars(args)) if args.print_rdf: printrdf(tool, document_loader.ctx, args.rdf_serializer, stdout) @@ -682,7 +690,7 @@ def main(argsl=None, # type: List[str] if getattr(args, dirprefix) and getattr(args, dirprefix) != 'tmp': sl = "/" if getattr(args, dirprefix).endswith("/") or dirprefix == "cachedir" else "" setattr(args, dirprefix, - os.path.abspath(getattr(args, dirprefix))+sl) + os.path.abspath(getattr(args, dirprefix)) + sl) if not os.path.exists(os.path.dirname(getattr(args, dirprefix))): try: os.makedirs(os.path.dirname(getattr(args, dirprefix))) @@ -711,10 +719,10 @@ def main(argsl=None, # type: List[str] del args.workflow del args.job_order (out, status) = executor(tool, job_order_object[0], - makeTool=makeTool, - select_resources=selectResources, - make_fs_access=make_fs_access, - **vars(args)) + makeTool=makeTool, + select_resources=selectResources, + make_fs_access=make_fs_access, + **vars(args)) # This is the workflow output, it needs to be written if out is not None: @@ -741,7 +749,7 @@ def locToPath(p): except (validate.ValidationException) as exc: _logger.error(u"Input object failed validation:\n%s", exc, - exc_info=args.debug) + exc_info=args.debug) return 1 except UnsupportedRequirement as exc: _logger.error( @@ -759,7 +767,6 @@ def locToPath(p): " %s", exc, exc_info=args.debug) return 1 - return 0 finally: _logger.removeHandler(stderr_handler) _logger.addHandler(defaultStreamHandler) diff --git a/cwltool/pack.py b/cwltool/pack.py index 6d41f6e00..30f95cdea 100644 --- a/cwltool/pack.py +++ b/cwltool/pack.py @@ -2,10 +2,10 @@ import urlparse from schema_salad.ref_resolver import Loader +from typing import Union, Any, cast, Callable, Dict, Text -from .process import scandeps, shortname, uniquename +from .process import shortname, uniquename -from typing import Union, Any, cast, Callable, Dict, Tuple, Type, IO, Text def flatten_deps(d, files): # type: (Any, Set[Text]) -> None if isinstance(d, list): @@ -19,6 +19,7 @@ def flatten_deps(d, files): # type: (Any, Set[Text]) -> None if "listing" in d: flatten_deps(d["listing"], files) + def find_run(d, loadref, runs): # type: (Any, Callable[[Text, Text], Union[Dict, List, Text]], Set[Text]) -> None if isinstance(d, list): for s in d: @@ -31,6 +32,7 @@ def find_run(d, loadref, runs): # type: (Any, Callable[[Text, Text], Union[Dict for s in d.values(): find_run(s, loadref, runs) + def find_ids(d, ids): # type: (Any, Set[Text]) -> None if isinstance(d, list): for s in d: @@ -42,10 +44,11 @@ def find_ids(d, ids): # type: (Any, Set[Text]) -> None for s in d.values(): find_ids(s, ids) + def replace_refs(d, rewrite, stem, newstem): # type: (Any, Dict[Text, Text], Text, Text) -> None if isinstance(d, list): - for s,v in enumerate(d): + for s, v in enumerate(d): if isinstance(v, (str, unicode)): if v in rewrite: d[s] = rewrite[v] @@ -54,7 +57,7 @@ def replace_refs(d, rewrite, stem, newstem): else: replace_refs(v, rewrite, stem, newstem) elif isinstance(d, dict): - for s,v in d.items(): + for s, v in d.items(): if isinstance(v, (str, unicode)): if v in rewrite: d[s] = rewrite[v] @@ -62,13 +65,14 @@ def replace_refs(d, rewrite, stem, newstem): d[s] = newstem + v[len(stem):] replace_refs(v, rewrite, stem, newstem) + def pack(document_loader, processobj, uri, metadata): # type: (Loader, Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Dict[Text, Text]) -> Dict[Text, Any] def loadref(b, u): # type: (Text, Text) -> Union[Dict, List, Text] return document_loader.resolve_ref(u, base_url=b)[0] - runs = set((uri,)) + runs = {uri} find_run(processobj, loadref, runs) ids = set() # type: Set[Text] @@ -76,7 +80,7 @@ def loadref(b, u): find_ids(document_loader.resolve_ref(f)[0], ids) names = set() # type: Set[Text] - rewrite = {} # type: Dict[Text, Text] + rewrite = {} # type: Dict[Text, Text] mainpath, _ = urlparse.urldefrag(uri) @@ -101,7 +105,7 @@ def rewrite_id(r, mainuri): rewrite_id(r, uri) packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"] - } # type: Dict[Text, Any] + } # type: Dict[Text, Any] schemas = set() # type: Set[Text] for r in sorted(runs): @@ -127,6 +131,6 @@ def rewrite_id(r, mainuri): for r in rewrite: v = rewrite[r] - replace_refs(packed, rewrite, r+"/" if "#" in r else r+"#", v+"/") + replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/") return packed diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 3e1aa2e3f..2bdfbc517 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -1,20 +1,22 @@ -import os +import collections import logging +import os import stat -import collections -import uuid import urllib import urlparse +import uuid from functools import partial -from typing import Any, Callable, Set, Text, Tuple, Union + import schema_salad.validate as validate -from schema_salad.sourceline import SourceLine from schema_salad.ref_resolver import uri_file_path +from schema_salad.sourceline import SourceLine +from typing import Any, Callable, Set, Text, Tuple, Union _logger = logging.getLogger("cwltool") MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type"]) + def adjustFiles(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply a mapping function to each File path in the object `rec`.""" @@ -27,6 +29,7 @@ def adjustFiles(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]] for d in rec: adjustFiles(d, op) + def adjustFileObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each File object in the object `rec`.""" @@ -39,6 +42,7 @@ def adjustFileObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[An for d in rec: adjustFileObjs(d, op) + def adjustDirObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each Directory object in the object `rec`.""" @@ -52,6 +56,7 @@ def adjustDirObjs(rec, op): for d in rec: adjustDirObjs(d, op) + def normalizeFilesDirs(job): # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]]) -> None def addLocation(d): @@ -59,7 +64,8 @@ def addLocation(d): if d["class"] == "File" and ("contents" not in d): raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.") if d["class"] == "Directory" and ("listing" not in d or "basename" not in d): - raise validate.ValidationException("Anonymous directory object must have 'listing' and 'basename' fields.") + raise validate.ValidationException( + "Anonymous directory object must have 'listing' and 'basename' fields.") d["location"] = "_:" + Text(uuid.uuid4()) if "basename" not in d: d["basename"] = Text(uuid.uuid4()) @@ -79,6 +85,7 @@ def abspath(src, basedir): # type: (Text, Text) -> Text ab = src if os.path.isabs(src) else os.path.join(basedir, src) return ab + def dedup(listing): # type: (List[Any]) -> List[Any] marksub = set() @@ -102,7 +109,6 @@ def mark(d): class PathMapper(object): - """Mapping of files from relative path provided in the file to a tuple of (absolute local path, absolute container path) diff --git a/cwltool/process.py b/cwltool/process.py index ca0b19550..92f3903d8 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -1,39 +1,36 @@ -import os -import json +import abc import copy +import errno +import hashlib +import json import logging -import pprint +import os +import shutil import stat import tempfile -import glob import urlparse -from collections import Iterable -import errno -import shutil import uuid -import hashlib +from collections import Iterable -import abc -import schema_salad.validate as validate -import schema_salad.schema -from schema_salad.ref_resolver import Loader, file_uri -from schema_salad.sourceline import SourceLine import avro.schema -from typing import (Any, AnyStr, Callable, cast, Dict, List, Generator, IO, Text, - Tuple, Union) +import schema_salad.schema +import schema_salad.validate as validate +from pkg_resources import resource_stream +from rdflib import Graph from rdflib import URIRef from rdflib.namespace import RDFS, OWL -from rdflib import Graph -from pkg_resources import resource_stream - - from ruamel.yaml.comments import CommentedSeq, CommentedMap +from schema_salad.ref_resolver import Loader, file_uri +from schema_salad.sourceline import SourceLine +from typing import (Any, AnyStr, Callable, cast, Dict, List, Generator, Text, + Tuple, Union) -from .utils import aslist, get_feature -from .stdfsaccess import StdFsAccess -from .builder import Builder, adjustFileObjs, adjustDirObjs +from .builder import Builder, adjustFileObjs +from .pathmapper import adjustDirObjs from .errors import WorkflowException, UnsupportedRequirement -from .pathmapper import PathMapper, abspath, normalizeFilesDirs +from .pathmapper import PathMapper, normalizeFilesDirs +from .stdfsaccess import StdFsAccess +from .utils import aslist, get_feature _logger = logging.getLogger("cwltool") @@ -61,31 +58,32 @@ salad_files = ('metaschema.yml', 'metaschema_base.yml', - 'salad.md', - 'field_name.yml', - 'import_include.md', - 'link_res.yml', - 'ident_res.yml', - 'vocab_res.yml', - 'vocab_res.yml', - 'field_name_schema.yml', - 'field_name_src.yml', - 'field_name_proc.yml', - 'ident_res_schema.yml', - 'ident_res_src.yml', - 'ident_res_proc.yml', - 'link_res_schema.yml', - 'link_res_src.yml', - 'link_res_proc.yml', - 'vocab_res_schema.yml', - 'vocab_res_src.yml', - 'vocab_res_proc.yml') + 'salad.md', + 'field_name.yml', + 'import_include.md', + 'link_res.yml', + 'ident_res.yml', + 'vocab_res.yml', + 'vocab_res.yml', + 'field_name_schema.yml', + 'field_name_src.yml', + 'field_name_proc.yml', + 'ident_res_schema.yml', + 'ident_res_src.yml', + 'ident_res_proc.yml', + 'link_res_schema.yml', + 'link_res_src.yml', + 'link_res_proc.yml', + 'vocab_res_schema.yml', + 'vocab_res_src.yml', + 'vocab_res_proc.yml') SCHEMA_CACHE = {} # type: Dict[Text, Tuple[Loader, Union[avro.schema.Names, avro.schema.SchemaParseException], Dict[Text, Any], Loader]] SCHEMA_FILE = None # type: Dict[Text, Any] SCHEMA_DIR = None # type: Dict[Text, Any] SCHEMA_ANY = None # type: Dict[Text, Any] + def get_schema(version): # type: (Text) -> Tuple[Loader, Union[avro.schema.Names, avro.schema.SchemaParseException], Dict[Text,Any], Loader] @@ -108,7 +106,7 @@ def get_schema(version): try: res = resource_stream( __name__, 'schemas/%s/salad/schema_salad/metaschema/%s' - % (version, f)) + % (version, f)) cache["https://w3id.org/cwl/salad/schema_salad/metaschema/" + f] = res.read() res.close() @@ -120,6 +118,7 @@ def get_schema(version): return SCHEMA_CACHE[version] + def shortname(inputid): # type: (Text) -> Text d = urlparse.urlparse(inputid) @@ -128,6 +127,7 @@ def shortname(inputid): else: return d.path.split(u"/")[-1] + def checkRequirements(rec, supportedProcessRequirements): # type: (Any, Iterable[Any]) -> None if isinstance(rec, dict): @@ -142,6 +142,7 @@ def checkRequirements(rec, supportedProcessRequirements): for d in rec: checkRequirements(d, supportedProcessRequirements) + def adjustFilesWithSecondary(rec, op, primary=None): """Apply a mapping function to each File path in the object `rec`, propagating the primary file associated with a group of secondary files. @@ -159,6 +160,7 @@ def adjustFilesWithSecondary(rec, op, primary=None): for d in rec: adjustFilesWithSecondary(d, op, primary) + def getListing(fs_access, rec): # type: (StdFsAccess, Dict[Text, Any]) -> None if "listing" not in rec: @@ -174,6 +176,7 @@ def getListing(fs_access, rec): listing.append({"class": "File", "location": ld}) rec["listing"] = listing + def stageFiles(pm, stageFunc, ignoreWritable=False): # type: (PathMapper, Callable[..., Any], bool) -> None for f, p in pm.items(): @@ -187,6 +190,7 @@ def stageFiles(pm, stageFunc, ignoreWritable=False): with open(p.target, "w") as n: n.write(p.resolved.encode("utf-8")) + def collectFilesAndDirs(obj, out): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], List[Dict[Text, Any]]) -> None if isinstance(obj, dict): @@ -199,6 +203,7 @@ def collectFilesAndDirs(obj, out): for l in obj: collectFilesAndDirs(l, out) + def relocateOutputs(outputObj, outdir, output_dirs, action): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Set[Text], Text) -> Union[Dict[Text, Any], List[Dict[Text, Any]]] if action not in ("move", "copy"): @@ -232,6 +237,7 @@ def _check_adjust(f): return outputObj + def cleanIntermediate(output_dirs): # type: (Set[Text]) -> None for a in output_dirs: if os.path.exists(a) and empty_subtree(a): @@ -256,17 +262,17 @@ def formatSubclassOf(fmt, cls, ontology, visited): uriRefFmt = URIRef(fmt) - for s,p,o in ontology.triples( (uriRefFmt, RDFS.subClassOf, None) ): + for s, p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)): # Find parent classes of `fmt` and search upward if formatSubclassOf(o, cls, ontology, visited): return True - for s,p,o in ontology.triples( (uriRefFmt, OWL.equivalentClass, None) ): + for s, p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)): # Find equivalent classes of `fmt` and search horizontally if formatSubclassOf(o, cls, ontology, visited): return True - for s,p,o in ontology.triples( (None, OWL.equivalentClass, uriRefFmt) ): + for s, p, o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)): # Find equivalent classes of `fmt` and search horizontally if formatSubclassOf(s, cls, ontology, visited): return True @@ -282,7 +288,9 @@ def checkFormat(actualFile, inputFormats, ontology): for inpf in aslist(inputFormats): if af["format"] == inpf or formatSubclassOf(af["format"], inpf, ontology, set()): return - raise validate.ValidationException(u"Incompatible file format %s required format(s) %s" % (af["format"], inputFormats)) + raise validate.ValidationException( + u"Incompatible file format %s required format(s) %s" % (af["format"], inputFormats)) + def fillInDefaults(inputs, job): # type: (List[Dict[Text, Text]], Dict[Text, Union[Dict[Text, Any], List, Text]]) -> None @@ -309,13 +317,14 @@ def avroize_type(field_type, name_prefix=""): elif isinstance(field_type, dict): if field_type["type"] in ("enum", "record"): if "name" not in field_type: - field_type["name"] = name_prefix+Text(uuid.uuid4()) + field_type["name"] = name_prefix + Text(uuid.uuid4()) if field_type["type"] == "record": avroize_type(field_type["fields"], name_prefix) if field_type["type"] == "array": avroize_type(field_type["items"], name_prefix) return field_type + class Process(object): __metaclass__ = abc.ABCMeta @@ -339,9 +348,9 @@ def __init__(self, toolpath_object, **kwargs): if SCHEMA_FILE is None: get_schema("v1.0") SCHEMA_ANY = cast(Dict[Text, Any], - SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"]) + SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"]) SCHEMA_FILE = cast(Dict[Text, Any], - SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"]) + SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"]) SCHEMA_DIR = cast(Dict[Text, Any], SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"]) @@ -363,7 +372,7 @@ def __init__(self, toolpath_object, **kwargs): checkRequirements(self.tool, supportedProcessRequirements) self.validate_hints(kwargs["avsc_names"], self.tool.get("hints", []), - strict=kwargs.get("strict")) + strict=kwargs.get("strict")) self.schemaDefs = {} # type: Dict[Text,Dict[Text, Any]] @@ -407,14 +416,17 @@ def __init__(self, toolpath_object, **kwargs): self.inputs_record_schema = schema_salad.schema.make_valid_avro(self.inputs_record_schema, {}, set()) avro.schema.make_avsc_object(self.inputs_record_schema, self.names) except avro.schema.SchemaParseException as e: - raise validate.ValidationException(u"Got error `%s` while processing inputs of %s:\n%s" % (Text(e), self.tool["id"], json.dumps(self.inputs_record_schema, indent=4))) + raise validate.ValidationException(u"Got error `%s` while processing inputs of %s:\n%s" % + (Text(e), self.tool["id"], + json.dumps(self.inputs_record_schema, indent=4))) try: self.outputs_record_schema = schema_salad.schema.make_valid_avro(self.outputs_record_schema, {}, set()) avro.schema.make_avsc_object(self.outputs_record_schema, self.names) except avro.schema.SchemaParseException as e: - raise validate.ValidationException(u"Got error `%s` while processing outputs of %s:\n%s" % (Text(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4))) - + raise validate.ValidationException(u"Got error `%s` while processing outputs of %s:\n%s" % + (Text(e), self.tool["id"], + json.dumps(self.outputs_record_schema, indent=4))) def _init_job(self, joborder, **kwargs): # type: (Dict[Text, Text], **Any) -> Builder @@ -436,7 +448,7 @@ def _init_job(self, joborder, **kwargs): builder = Builder() builder.job = cast(Dict[Text, Union[Dict[Text, Any], List, - Text]], copy.deepcopy(joborder)) + Text]], copy.deepcopy(joborder)) # Validate job order try: @@ -459,13 +471,15 @@ def _init_job(self, joborder, **kwargs): dockerReq, is_req = self.get_requirement("DockerRequirement") if dockerReq and is_req and not kwargs.get("use_container"): - raise WorkflowException("Document has DockerRequirement under 'requirements' but use_container is false. DockerRequirement must be under 'hints' or use_container must be true.") + raise WorkflowException( + "Document has DockerRequirement under 'requirements' but use_container is false. DockerRequirement must be under 'hints' or use_container must be true.") builder.make_fs_access = kwargs.get("make_fs_access") or StdFsAccess builder.fs_access = builder.make_fs_access(kwargs["basedir"]) if dockerReq and kwargs.get("use_container"): - builder.outdir = builder.fs_access.realpath(dockerReq.get("dockerOutputDirectory") or kwargs.get("docker_outdir") or "/var/spool/cwl") + builder.outdir = builder.fs_access.realpath( + dockerReq.get("dockerOutputDirectory") or kwargs.get("docker_outdir") or "/var/spool/cwl") builder.tmpdir = builder.fs_access.realpath(kwargs.get("docker_tmpdir") or "/tmp") builder.stagedir = builder.fs_access.realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl") else: @@ -541,18 +555,18 @@ def evalResources(self, builder, kwargs): for a in ("cores", "ram", "tmpdir", "outdir"): mn = None mx = None - if resourceReq.get(a+"Min"): - mn = builder.do_eval(resourceReq[a+"Min"]) - if resourceReq.get(a+"Max"): - mx = builder.do_eval(resourceReq[a+"Max"]) + if resourceReq.get(a + "Min"): + mn = builder.do_eval(resourceReq[a + "Min"]) + if resourceReq.get(a + "Max"): + mx = builder.do_eval(resourceReq[a + "Max"]) if mn is None: mn = mx elif mx is None: mx = mn if mn: - request[a+"Min"] = mn - request[a+"Max"] = mx + request[a + "Min"] = mn + request[a + "Max"] = mx if kwargs.get("select_resources"): return kwargs["select_resources"](request) @@ -570,8 +584,8 @@ def validate_hints(self, avsc_names, hints, strict): sl = SourceLine(hints, i, validate.ValidationException) with sl: if avsc_names.get_name(r["class"], "") is not None: - plain_hint = dict((key,r[key]) for key in r if key not in - self.doc_loader.identifiers) # strip identifiers + plain_hint = dict((key, r[key]) for key in r if key not in + self.doc_loader.identifiers) # strip identifiers validate.validate_ex( avsc_names.get_name(plain_hint["class"], ""), plain_hint, strict=strict) @@ -589,6 +603,7 @@ def job(self, job_order, output_callbacks, **kwargs): # type: (Dict[Text, Text], Callable[[Any, Any], Any], **Any) -> Generator[Any, None, None] return None + def empty_subtree(dirpath): # type: (Text) -> bool # Test if a directory tree contains any files (does not count empty # subdirectories) @@ -610,6 +625,7 @@ def empty_subtree(dirpath): # type: (Text) -> bool _names = set() # type: Set[Text] + def uniquename(stem, names=None): # type: (Text, Set[Text]) -> Text global _names if names is None: @@ -622,6 +638,7 @@ def uniquename(stem, names=None): # type: (Text, Set[Text]) -> Text names.add(u) return u + def nestdir(base, deps): # type: (Text, Dict[Text, Any]) -> Dict[Text, Any] dirname = os.path.dirname(base) + "/" @@ -639,6 +656,7 @@ def nestdir(base, deps): } return deps + def mergedirs(listing): # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]] r = [] # type: List[Dict[Text, Any]] @@ -654,6 +672,7 @@ def mergedirs(listing): r.extend(ents.itervalues()) return r + def scandeps(base, doc, reffields, urlfields, loadref, urljoin=urlparse.urljoin): # type: (Text, Any, Set[Text], Set[Text], Callable[[Text, Text], Any], Callable[[Text, Text], Text]) -> List[Dict[Text, Text]] r = [] # type: List[Dict[Text, Text]] @@ -725,14 +744,15 @@ def scandeps(base, doc, reffields, urlfields, loadref, urljoin=urlparse.urljoin) return r + def compute_checksums(fs_access, fileobj): if "checksum" not in fileobj: checksum = hashlib.sha1() with fs_access.open(fileobj["location"], "rb") as f: - contents = f.read(1024*1024) + contents = f.read(1024 * 1024) while contents != "": checksum.update(contents) - contents = f.read(1024*1024) + contents = f.read(1024 * 1024) f.seek(0, 2) filesize = f.tell() fileobj["checksum"] = "sha1$%s" % checksum.hexdigest() diff --git a/cwltool/resolver.py b/cwltool/resolver.py index cf753f362..8a08fbba3 100644 --- a/cwltool/resolver.py +++ b/cwltool/resolver.py @@ -1,12 +1,11 @@ -import os import logging -import urllib -import urlparse +import os from schema_salad.ref_resolver import file_uri _logger = logging.getLogger("cwltool") + def resolve_local(document_loader, uri): if uri.startswith("/"): return None @@ -24,6 +23,7 @@ def resolve_local(document_loader, uri): return file_uri(s) return None + def tool_resolver(document_loader, uri): for r in [resolve_local]: ret = r(document_loader, uri) diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py index 70a9f1976..21c3a532f 100644 --- a/cwltool/sandboxjs.py +++ b/cwltool/sandboxjs.py @@ -1,15 +1,16 @@ -import subprocess -import json -import threading +import cStringIO import errno +import json import logging -import select import os - -import cStringIO +import select +import subprocess +import threading from cStringIO import StringIO -from typing import Any, Dict, List, Mapping, Text, TypeVar, Union + from pkg_resources import resource_stream +from typing import Any, Dict, List, Mapping, Text, Union + class JavascriptException(Exception): pass @@ -17,12 +18,13 @@ class JavascriptException(Exception): _logger = logging.getLogger("cwltool") -JSON = Union[Dict[Text,Any], List[Any], Text, int, long, float, bool, None] +JSON = Union[Dict[Text, Any], List[Any], Text, int, long, float, bool, None] localdata = threading.local() have_node_slim = False + def new_js_proc(): # type: () -> subprocess.Popen @@ -33,7 +35,8 @@ def new_js_proc(): trynodes = ("nodejs", "node") for n in trynodes: try: - nodejs = subprocess.Popen([n, "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + nodejs = subprocess.Popen([n, "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) break except OSError as e: if e.errno == errno.ENOENT: @@ -80,7 +83,8 @@ def execjs(js, jslib, timeout=None, debug=False): # type: (Union[Mapping, Text] nodejs = localdata.proc - fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js)) + fn = u"\"use strict\";\n%s\n(function()%s)()" %\ + (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js)) killed = [] @@ -97,7 +101,7 @@ def term(): tm = threading.Timer(timeout, term) tm.start() - stdin_buf = StringIO(json.dumps(fn)+"\n") + stdin_buf = StringIO(json.dumps(fn) + "\n") stdout_buf = StringIO() stderr_buf = StringIO() @@ -132,9 +136,9 @@ def fn_linenum(): # type: () -> Text ofs = 0 maxlines = 99 if len(lines) > maxlines: - ofs = len(lines)-maxlines + ofs = len(lines) - maxlines lines = lines[-maxlines:] - return u"\n".join(u"%02i %s" % (i+ofs+1, b) for i, b in enumerate(lines)) + return u"\n".join(u"%02i %s" % (i + ofs + 1, b) for i, b in enumerate(lines)) def stdfmt(data): # type: (unicode) -> unicode if "\n" in data: @@ -142,7 +146,8 @@ def stdfmt(data): # type: (unicode) -> unicode return data if debug: - info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" % (nodejs.returncode, fn_linenum(), stdfmt(stdoutdata), stdfmt(stderrdata)) + info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" %\ + (nodejs.returncode, fn_linenum(), stdfmt(stdoutdata), stdfmt(stderrdata)) else: info = stdfmt(stderrdata) @@ -155,4 +160,5 @@ def stdfmt(data): # type: (unicode) -> unicode try: return json.loads(stdoutdata) except ValueError as e: - raise JavascriptException(u"%s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (e, fn_linenum(), stdoutdata, stderrdata)) + raise JavascriptException(u"%s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % + (e, fn_linenum(), stdoutdata, stderrdata)) diff --git a/cwltool/stdfsaccess.py b/cwltool/stdfsaccess.py index 2db2a5dfa..302235ea6 100644 --- a/cwltool/stdfsaccess.py +++ b/cwltool/stdfsaccess.py @@ -1,11 +1,13 @@ -from typing import Any, BinaryIO, Text -from .pathmapper import abspath import glob import os + from schema_salad.ref_resolver import file_uri +from typing import BinaryIO, Text + +from .pathmapper import abspath -class StdFsAccess(object): +class StdFsAccess(object): def __init__(self, basedir): # type: (Text) -> None self.basedir = basedir diff --git a/cwltool/update.py b/cwltool/update.py index 762ad0317..f549d1a65 100644 --- a/cwltool/update.py +++ b/cwltool/update.py @@ -1,18 +1,17 @@ -import sys -import urlparse +import copy import json import re import traceback -import copy +import urlparse -from schema_salad.ref_resolver import Loader import schema_salad.validate -from typing import Any, Callable, Dict, List, Text, Tuple, Union # pylint: disable=unused-import - from ruamel.yaml.comments import CommentedSeq, CommentedMap +from schema_salad.ref_resolver import Loader +from typing import Any, Callable, Dict, Text, Tuple, Union # pylint: disable=unused-import from .utils import aslist + def findId(doc, frg): # type: (Any, Any) -> Dict if isinstance(doc, dict): if "id" in doc and doc["id"] == frg: @@ -29,6 +28,7 @@ def findId(doc, frg): # type: (Any, Any) -> Dict return f return None + def fixType(doc): # type: (Any) -> Any if isinstance(doc, list): for i, f in enumerate(doc): @@ -42,6 +42,7 @@ def fixType(doc): # type: (Any) -> Any return "#" + doc return doc + def _draft2toDraft3dev1(doc, loader, baseuri, update_steps=True): # type: (Any, Loader, Text, bool) -> Any try: @@ -82,7 +83,6 @@ def _draft2toDraft3dev1(doc, loader, baseuri, update_steps=True): doc["requirements"] = [] doc["requirements"].append({"class": "MultipleInputFeatureRequirement"}) - for a in doc: doc[a] = _draft2toDraft3dev1(doc[a], loader, baseuri) @@ -99,6 +99,7 @@ def _draft2toDraft3dev1(doc, loader, baseuri, update_steps=True): err = doc["name"] raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc())) + def draft2toDraft3dev1(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (_draft2toDraft3dev1(doc, loader, baseuri), "draft-3.dev1") @@ -106,6 +107,7 @@ def draft2toDraft3dev1(doc, loader, baseuri): digits = re.compile("\d+") + def updateScript(sc): # type: (Text) -> Text sc = sc.replace("$job", "inputs") sc = sc.replace("$tmpdir", "runtime.tmpdir") @@ -124,7 +126,7 @@ def _updateDev2Script(ent): # type: (Any) -> Any if not sp[0]: sp.pop(0) front = sp.pop(0) - sp = [Text(i) if digits.match(i) else "'"+i+"'" + sp = [Text(i) if digits.match(i) else "'" + i + "'" for i in sp] if front == "job": return u"$(inputs[%s])" % ']['.join(sp) @@ -167,7 +169,7 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri): if r["class"] == "ExpressionEngineRequirement": if "engineConfig" in r: doc["requirements"].append({ - "class":"InlineJavascriptRequirement", + "class": "InlineJavascriptRequirement", "expressionLib": [updateScript(sc) for sc in aslist(r["engineConfig"])] }) added = True @@ -179,7 +181,7 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri): else: doc["requirements"] = [] if not added: - doc["requirements"].append({"class":"InlineJavascriptRequirement"}) + doc["requirements"].append({"class": "InlineJavascriptRequirement"}) elif isinstance(doc, list): for i, a in enumerate(doc): @@ -192,6 +194,7 @@ def draftDraft3dev1toDev2(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (_draftDraft3dev1toDev2(doc, loader, baseuri), "draft-3.dev2") + def _draftDraft3dev2toDev3(doc, loader, baseuri): # type: (Any, Loader, Text) -> Any try: @@ -236,6 +239,7 @@ def _draftDraft3dev2toDev3(doc, loader, baseuri): import traceback raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc())) + def draftDraft3dev2toDev3(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (_draftDraft3dev2toDev3(doc, loader, baseuri), "draft-3.dev3") @@ -298,6 +302,7 @@ def draftDraft3dev3toDev4(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (_draftDraft3dev3toDev4(doc, loader, baseuri), "draft-3.dev4") + def _draftDraft3dev4toDev5(doc, loader, baseuri): # type: (Any, Loader, Text) -> Any try: @@ -332,10 +337,12 @@ def draftDraft3dev4toDev5(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (_draftDraft3dev4toDev5(doc, loader, baseuri), "draft-3.dev5") + def draftDraft3dev5toFinal(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] return (doc, "draft-3") + def _draft3toDraft4dev1(doc, loader, baseuri): # type: (Any, Loader, Text) -> Any if isinstance(doc, dict): @@ -369,11 +376,13 @@ def fixup(f): # type: (Text) -> Text return doc + def draft3toDraft4dev1(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for draft-3 to draft-4.dev1.""" return (_draft3toDraft4dev1(doc, loader, baseuri), "draft-4.dev1") + def _draft4Dev1toDev2(doc, loader, baseuri): # type: (Any, Loader, Text) -> Any if isinstance(doc, dict): @@ -389,6 +398,7 @@ def _draft4Dev1toDev2(doc, loader, baseuri): return doc + def draft4Dev1toDev2(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for draft-4.dev1 to draft-4.dev2.""" @@ -423,11 +433,13 @@ def _draft4Dev2toDev3(doc, loader, baseuri): return doc + def draft4Dev2toDev3(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for draft-4.dev2 to draft-4.dev3.""" return (_draft4Dev2toDev3(doc, loader, baseuri), "draft-4.dev3") + def _draft4Dev3to1_0dev4(doc, loader, baseuri): # type: (Any, Loader, Text) -> Any if isinstance(doc, dict): @@ -441,16 +453,19 @@ def _draft4Dev3to1_0dev4(doc, loader, baseuri): doc[i] = _draft4Dev3to1_0dev4(a, loader, baseuri) return doc + def draft4Dev3to1_0dev4(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for draft-4.dev3 to v1.0.dev4.""" return (_draft4Dev3to1_0dev4(doc, loader, baseuri), "v1.0.dev4") + def v1_0dev4to1_0(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for v1.0.dev4 to v1.0.""" return (doc, "v1.0") + def v1_0to1_1_0dev1(doc, loader, baseuri): # type: (Any, Loader, Text) -> Tuple[Any, Text] """Public updater for v1.0 to v1.1.0-dev1.""" @@ -482,11 +497,13 @@ def v1_0to1_1_0dev1(doc, loader, baseuri): LATEST = "v1.0" + def identity(doc, loader, baseuri): # pylint: disable=unused-argument # type: (Any, Loader, Text) -> Tuple[Any, Union[Text, Text]] """The default, do-nothing, CWL document upgrade function.""" return (doc, doc["cwlVersion"]) + def checkversion(doc, metadata, enable_dev): # type: (Union[CommentedSeq, CommentedMap], CommentedMap, bool) -> Tuple[Dict[Text, Any], Text] # pylint: disable=line-too-long """Checks the validity of the version of the give CWL document. @@ -526,6 +543,7 @@ def checkversion(doc, metadata, enable_dev): return (cdoc, version) + def update(doc, loader, baseuri, enable_dev, metadata): # type: (Union[CommentedSeq, CommentedMap], Loader, Text, bool, Any) -> dict diff --git a/cwltool/utils.py b/cwltool/utils.py index 2f1abf31a..8f341c0a9 100644 --- a/cwltool/utils.py +++ b/cwltool/utils.py @@ -2,12 +2,14 @@ from typing import Any, Tuple + def aslist(l): # type: (Any) -> List[Any] if isinstance(l, list): return l else: return [l] + def get_feature(self, feature): # type: (Any, Any) -> Tuple[Any, bool] for t in reversed(self.requirements): if t["class"] == feature: diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 16912a0e1..0e4f59739 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -1,32 +1,27 @@ import copy +import functools +import json import logging import random -import os -from collections import namedtuple -import functools -import urlparse import tempfile -import shutil -import json - -from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union +from collections import namedtuple import schema_salad.validate as validate from schema_salad.sourceline import SourceLine +from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union -from . import job from . import draft2tool -from .utils import aslist -from .process import Process, get_feature, empty_subtree, shortname, uniquename -from .errors import WorkflowException from . import expression +from .errors import WorkflowException from .load_tool import load_tool - +from .process import Process, shortname, uniquename +from .utils import aslist _logger = logging.getLogger("cwltool") WorkflowStateItem = namedtuple('WorkflowStateItem', ['parameter', 'value']) + def defaultMakeTool(toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> Process if not isinstance(toolpath_object, dict): @@ -39,7 +34,10 @@ def defaultMakeTool(toolpath_object, **kwargs): elif toolpath_object["class"] == "Workflow": return Workflow(toolpath_object, **kwargs) - raise WorkflowException(u"Missing or invalid 'class' field in %s, expecting one of: CommandLineTool, ExpressionTool, Workflow" % toolpath_object["id"]) + raise WorkflowException( + u"Missing or invalid 'class' field in %s, expecting one of: CommandLineTool, ExpressionTool, Workflow" % + toolpath_object["id"]) + def findfiles(wo, fn=None): # type: (Any, List) -> List[Dict[Text, Any]] if fn is None: @@ -74,24 +72,25 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom): return False return True elif linkMerge: - if iid not in inputobj: - inputobj[iid] = [] - if linkMerge == "merge_nested": - inputobj[iid].append(src.value) - elif linkMerge == "merge_flattened": - if isinstance(src.value, list): - inputobj[iid].extend(src.value) - else: - inputobj[iid].append(src.value) + if iid not in inputobj: + inputobj[iid] = [] + if linkMerge == "merge_nested": + inputobj[iid].append(src.value) + elif linkMerge == "merge_flattened": + if isinstance(src.value, list): + inputobj[iid].extend(src.value) else: - raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge) - return True + inputobj[iid].append(src.value) + else: + raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge) + return True elif valueFrom is not None or can_assign_src_to_sink(src.parameter["type"], sinktype) or sinktype == "Any": # simply assign the value from state to input inputobj[iid] = copy.deepcopy(src.value) return True return False + def can_assign_src_to_sink(src, sink): # type: (Any, Any) -> bool """Check for identical type specifications, ignoring extra keys like inputBinding. """ @@ -114,6 +113,7 @@ def can_assign_src_to_sink(src, sink): # type: (Any, Any) -> bool return src == sink return False + def _compare_records(src, sink): # type: (Dict[Text, Any], Dict[Text, Any]) -> bool """Compare two records, ensuring they have compatible fields. @@ -121,6 +121,7 @@ def _compare_records(src, sink): This handles normalizing record names, which will be relative to workflow step, so that they can be compared. """ + def _rec_fields(rec): # type: (Dict[Text, Any]) -> Dict[Text, Any] out = {} for field in rec["fields"]: @@ -133,14 +134,15 @@ def _rec_fields(rec): # type: (Dict[Text, Any]) -> Dict[Text, Any] for key in sinkfields.iterkeys(): if (not can_assign_src_to_sink( srcfields.get(key, "null"), sinkfields.get(key, "null")) - and sinkfields.get(key) is not None): + and sinkfields.get(key) is not None): _logger.info("Record comparison failure for %s and %s\n" "Did not match fields for %s: %s and %s" % (src["name"], sink["name"], key, srcfields.get(key), - sinkfields.get(key))) + sinkfields.get(key))) return False return True + def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceField, incomplete=False): # type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text, bool) -> Dict[Text, Any] inputobj = {} # type: Dict[Text, Any] @@ -150,7 +152,7 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel iid = shortname(iid) if sourceField in inp: if (isinstance(inp[sourceField], list) and not - supportsMultipleInput): + supportsMultipleInput): raise WorkflowException( "Workflow contains multiple inbound links to a single " "parameter but MultipleInputFeatureRequirement is not " @@ -161,13 +163,13 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel if not match_types( inp["type"], state[src], iid, inputobj, inp.get("linkMerge", ("merge_nested" - if len(connections) > 1 else None)), + if len(connections) > 1 else None)), valueFrom=inp.get("valueFrom")): raise WorkflowException( u"Type mismatch between source '%s' (%s) and " "sink '%s' (%s)" % (src, - state[src].parameter["type"], inp["id"], - inp["type"])) + state[src].parameter["type"], inp["id"], + inp["type"])) elif src not in state: raise WorkflowException( u"Connect source '%s' on parameter '%s' does not " @@ -184,7 +186,6 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel class WorkflowJobStep(object): - def __init__(self, step): # type: (Any) -> None self.step = step self.tool = step.tool @@ -203,7 +204,6 @@ def job(self, joborder, output_callback, **kwargs): class WorkflowJob(object): - def __init__(self, workflow, **kwargs): # type: (Workflow, **Any) -> None self.workflow = workflow @@ -221,7 +221,8 @@ def __init__(self, workflow, **kwargs): self.name = uniquename(u"workflow %s" % kwargs.get("name", shortname(self.workflow.tool.get("id", "embedded")))) - _logger.debug(u"[%s] initialized from %s", self.name, self.tool.get("id", "workflow embedded in %s" % kwargs.get("part_of"))) + _logger.debug(u"[%s] initialized from %s", self.name, + self.tool.get("id", "workflow embedded in %s" % kwargs.get("part_of"))) def receive_output(self, step, outputparms, jobout, processStatus): # type: (WorkflowJobStep, List[Dict[Text,Text]], Dict[Text,Text], Text) -> None @@ -275,13 +276,14 @@ def try_make_job(self, step, **kwargs): if "valueFrom" in i} if len(valueFrom) > 0 and not bool(self.workflow.get_requirement("StepInputExpressionRequirement")[0]): - raise WorkflowException("Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements") + raise WorkflowException( + "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements") - vfinputs = {shortname(k): v for k,v in inputobj.iteritems()} + vfinputs = {shortname(k): v for k, v in inputobj.iteritems()} def postScatterEval(io): # type: (Dict[Text, Any]) -> Dict[Text, Any] - shortio = {shortname(k): v for k,v in io.iteritems()} + shortio = {shortname(k): v for k, v in io.iteritems()} def valueFromFunc(k, v): # type: (Any, Any) -> Any if k in valueFrom: @@ -290,7 +292,8 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any None, None, {}, context=v) else: return v - return {k: valueFromFunc(k, v) for k,v in io.items()} + + return {k: valueFromFunc(k, v) for k, v in io.items()} if "scatter" in step.tool: scatter = aslist(step.tool["scatter"]) @@ -303,20 +306,20 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any jobs = dotproduct_scatter(step, inputobj, scatter, cast( # known bug with mypy # https://github.com/python/mypy/issues/797 - Callable[[Any], Any],callback), **kwargs) + Callable[[Any], Any], callback), **kwargs) elif method == "nested_crossproduct": jobs = nested_crossproduct_scatter(step, inputobj, - scatter, cast(Callable[[Any], Any], callback), - # known bug in mypy - # https://github.com/python/mypy/issues/797 - **kwargs) + scatter, cast(Callable[[Any], Any], callback), + # known bug in mypy + # https://github.com/python/mypy/issues/797 + **kwargs) elif method == "flat_crossproduct": jobs = cast(Generator, flat_crossproduct_scatter(step, inputobj, scatter, cast(Callable[[Any], Any], - # known bug in mypy - # https://github.com/python/mypy/issues/797 + # known bug in mypy + # https://github.com/python/mypy/issues/797 callback), 0, **kwargs)) else: if _logger.isEnabledFor(logging.DEBUG): @@ -358,7 +361,8 @@ def job(self, joborder, output_callback, **kwargs): elif "default" in i: self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(i["default"])) else: - raise WorkflowException(u"Input '%s' not in input object and does not have a default value." % (i["id"])) + raise WorkflowException( + u"Input '%s' not in input object and does not have a default value." % (i["id"])) for s in self.steps: for out in s.tool["outputs"]: @@ -406,7 +410,8 @@ def job(self, joborder, output_callback, **kwargs): supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]) try: - wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource", incomplete=True) + wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource", + incomplete=True) except WorkflowException as e: _logger.error(u"[%s] Cannot collect workflow output: %s", self.name, e) wo = {} @@ -426,7 +431,7 @@ def __init__(self, toolpath_object, **kwargs): kwargs["hints"] = self.hints makeTool = kwargs.get("makeTool") - self.steps = [WorkflowStep(step, n, **kwargs) for n,step in enumerate(self.tool.get("steps", []))] + self.steps = [WorkflowStep(step, n, **kwargs) for n, step in enumerate(self.tool.get("steps", []))] random.shuffle(self.steps) # TODO: statically validate data links instead of doing it at runtime. @@ -449,7 +454,6 @@ def visit(self, op): class WorkflowStep(Process): - def __init__(self, toolpath_object, pos, **kwargs): # type: (Dict[Text, Any], int, **Any) -> None if "id" in toolpath_object: @@ -497,8 +501,11 @@ def __init__(self, toolpath_object, pos, **kwargs): if stepfield == "in": param["type"] = "Any" else: - raise WorkflowException("[%s] Workflow step output '%s' not found in the outputs of the tool (expected one of '%s')" % ( - self.id, shortname(step_entry), "', '".join([shortname(tool_entry["id"]) for tool_entry in self.embedded_tool.tool[toolfield]]))) + raise WorkflowException( + "[%s] Workflow step output '%s' not found in the outputs of the tool (expected one of '%s')" % ( + self.id, shortname(step_entry), "', '".join( + [shortname(tool_entry["id"]) for tool_entry in + self.embedded_tool.tool[toolfield]]))) param["id"] = inputid toolpath_object[toolfield].append(param) @@ -507,7 +514,8 @@ def __init__(self, toolpath_object, pos, **kwargs): if self.embedded_tool.tool["class"] == "Workflow": (feature, _) = self.get_requirement("SubworkflowFeatureRequirement") if not feature: - raise WorkflowException("Workflow contains embedded workflow but SubworkflowFeatureRequirement not in requirements") + raise WorkflowException( + "Workflow contains embedded workflow but SubworkflowFeatureRequirement not in requirements") if "scatter" in self.tool: (feature, _) = self.get_requirement("ScatterFeatureRequirement") @@ -525,7 +533,8 @@ def __init__(self, toolpath_object, pos, **kwargs): inp_map = {i["id"]: i for i in inputparms} for s in scatter: if s not in inp_map: - raise WorkflowException(u"Scatter parameter '%s' does not correspond to an input parameter of this step, inputs are %s" % (s, inp_map.keys())) + raise WorkflowException(u"Scatter parameter '%s' does not correspond to an input parameter of this " + u"step, inputs are %s" % (s, inp_map.keys())) inp_map[s]["type"] = {"type": "array", "items": inp_map[s]["type"]} @@ -542,7 +551,7 @@ def __init__(self, toolpath_object, pos, **kwargs): def receive_output(self, output_callback, jobout, processStatus): # type: (Callable[...,Any], Dict[Text, Text], Text) -> None - #_logger.debug("WorkflowStep output from run is %s", jobout) + # _logger.debug("WorkflowStep output from run is %s", jobout) output = {} for i in self.tool["outputs"]: field = shortname(i["id"]) @@ -577,7 +586,6 @@ def visit(self, op): class ReceiveScatterOutput(object): - def __init__(self, output_callback, dest): # type: (Callable[..., Any], Dict[Text,List[Text]]) -> None self.dest = dest @@ -588,7 +596,7 @@ def __init__(self, output_callback, dest): def receive_scatter_output(self, index, jobout, processStatus): # type: (int, Dict[Text, Text], Text) -> None - for k,v in jobout.items(): + for k, v in jobout.items(): self.dest[k][index] = v if processStatus != "success": @@ -605,6 +613,7 @@ def setTotal(self, total): # type: (int) -> None if self.completed == self.total: self.output_callback(self.dest, self.processStatus) + def parallel_steps(steps, rc, kwargs): # type: (List[Generator], ReceiveScatterOutput, Dict[str, Any]) -> Generator while rc.completed < rc.total: made_progress = False @@ -628,6 +637,7 @@ def parallel_steps(steps, rc, kwargs): # type: (List[Generator], ReceiveScatter if not made_progress and rc.completed < rc.total: yield None + def dotproduct_scatter(process, joborder, scatter_keys, output_callback, **kwargs): # type: (WorkflowJobStep, Dict[Text, Any], List[Text], Callable[..., Any], **Any) -> Generator l = None @@ -677,16 +687,17 @@ def nested_crossproduct_scatter(process, joborder, scatter_keys, output_callback jo = kwargs["postScatterEval"](jo) steps.append(process.job(jo, functools.partial(rc.receive_scatter_output, n), **kwargs)) else: + # known bug with mypy, https://github.com/python/mypy/issues/797 + casted = cast(Callable[[Any], Any], functools.partial(rc.receive_scatter_output, n)) steps.append(nested_crossproduct_scatter(process, jo, - scatter_keys[1:], cast( # known bug with mypy - # https://github.com/python/mypy/issues/797g - Callable[[Any], Any], - functools.partial(rc.receive_scatter_output, n)), **kwargs)) + scatter_keys[1:], + casted, **kwargs)) rc.setTotal(l) return parallel_steps(steps, rc, kwargs) + def crossproduct_size(joborder, scatter_keys): # type: (Dict[Text, Any], List[Text]) -> int scatter_key = scatter_keys[0] @@ -700,6 +711,7 @@ def crossproduct_size(joborder, scatter_keys): sum += crossproduct_size(joborder, scatter_keys[1:]) return sum + def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, startindex, **kwargs): # type: (WorkflowJobStep, Dict[Text, Any], List[Text], Union[ReceiveScatterOutput,Callable[..., Any]], int, **Any) -> Union[List[Generator], Generator] scatter_key = scatter_keys[0] diff --git a/ez_setup.py b/ez_setup.py index 50e0dfc69..e4e7b6ee9 100755 --- a/ez_setup.py +++ b/ez_setup.py @@ -6,18 +6,16 @@ Run this script to install or upgrade setuptools. """ +import contextlib +import optparse import os +import platform import shutil +import subprocess import sys import tempfile -import zipfile -import optparse -import subprocess -import platform import textwrap -import contextlib -import warnings - +import zipfile from distutils import log try: @@ -70,7 +68,6 @@ def _build_egg(egg, archive_filename, to_dir): class ContextualZipFile(zipfile.ZipFile): - """Supplement ZipFile class to support context manager for Python 2.6.""" def __enter__(self): @@ -176,13 +173,13 @@ def _conflict_bail(VC_err, version): unsafe to unload it. Bail out. """ conflict_tmpl = textwrap.dedent(""" - The required version of setuptools (>={version}) is not available, - and can't be installed while this script is running. Please - install a more recent version first, using - 'easy_install -U setuptools'. + The required version of setuptools (>={version}) is not available, + and can't be installed while this script is running. Please + install a more recent version first, using + 'easy_install -U setuptools'. - (Currently using {VC_err.args[0]!r}) - """) + (Currently using {VC_err.args[0]!r}) + """) msg = conflict_tmpl.format(**locals()) sys.stderr.write(msg) sys.exit(2) @@ -192,7 +189,7 @@ def _unload_pkg_resources(): del_modules = [ name for name in sys.modules if name.startswith('pkg_resources') - ] + ] for mod_name in del_modules: del sys.modules[mod_name] @@ -244,6 +241,8 @@ def has_powershell(): except Exception: return False return True + + download_file_powershell.viable = has_powershell @@ -260,6 +259,8 @@ def has_curl(): except Exception: return False return True + + download_file_curl.viable = has_curl @@ -276,6 +277,8 @@ def has_wget(): except Exception: return False return True + + download_file_wget.viable = has_wget @@ -291,6 +294,8 @@ def download_file_insecure(url, target): # Write all the data in one block to avoid creating a partial file. with open(target, "wb") as dst: dst.write(data) + + download_file_insecure.viable = lambda: True @@ -362,9 +367,9 @@ def _parse_args(): default=DEFAULT_VERSION, ) parser.add_option( - '--to-dir', - help="Directory to save (and re-use) package", - default=DEFAULT_SAVE_DIR, + '--to-dir', + help="Directory to save (and re-use) package", + default=DEFAULT_SAVE_DIR, ) options, args = parser.parse_args() # positional arguments are ignored @@ -372,13 +377,13 @@ def _parse_args(): def _download_args(options): - """Return args for download_setuptools function from cmdline args.""" - return dict( - version=options.version, - download_base=options.download_base, - downloader_factory=options.downloader_factory, - to_dir=options.to_dir, - ) + """Return args for download_setuptools function from cmdline args.""" + return dict( + version=options.version, + download_base=options.download_base, + downloader_factory=options.downloader_factory, + to_dir=options.to_dir, + ) def main(): @@ -387,5 +392,6 @@ def main(): archive = download_setuptools(**_download_args(options)) return _install(archive, _build_install_args(options)) + if __name__ == '__main__': sys.exit(main()) diff --git a/gittaggers.py b/gittaggers.py index 53dda8fd8..d15f6886f 100644 --- a/gittaggers.py +++ b/gittaggers.py @@ -1,10 +1,10 @@ -from setuptools.command.egg_info import egg_info import subprocess import time +from setuptools.command.egg_info import egg_info + class EggInfoFromGit(egg_info): - """Tag the build with git commit timestamp. If a build tag has already been set (e.g., "egg_info -b", building diff --git a/setup.py b/setup.py index a3d610648..72c57d387 100755 --- a/setup.py +++ b/setup.py @@ -1,18 +1,18 @@ #!/usr/bin/env python import ez_setup + ez_setup.use_setuptools() import os -import sys -import shutil import setuptools.command.egg_info as egg_info_cmd -from setuptools import setup, find_packages +from setuptools import setup SETUP_DIR = os.path.dirname(__file__) README = os.path.join(SETUP_DIR, 'README.rst') try: import gittaggers + tagger = gittaggers.EggInfoFromGit except ImportError: tagger = egg_info_cmd.egg_info @@ -53,8 +53,8 @@ test_suite='tests', tests_require=[], entry_points={ - 'console_scripts': [ "cwltool=cwltool.main:main" ] + 'console_scripts': ["cwltool=cwltool.main:main"] }, zip_safe=True, cmdclass={'egg_info': tagger}, -) + ) diff --git a/tests/test_examples.py b/tests/test_examples.py index a133c7b0f..794a11b30 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1,14 +1,13 @@ import unittest -import json -import cwltool.draft2tool as tool + import cwltool.expression as expr import cwltool.factory -import cwltool.process import cwltool.pathmapper +import cwltool.process import cwltool.workflow -class TestParamMatching(unittest.TestCase): +class TestParamMatching(unittest.TestCase): def test_params(self): self.assertTrue(expr.param_re.match("(foo)")) self.assertTrue(expr.param_re.match("(foo.bar)")) @@ -52,19 +51,19 @@ def test_params(self): } }, "lst": ["A", "B"] - } + } self.assertEqual(expr.interpolate("$(foo)", inputs), inputs["foo"]) for pattern in ("$(foo.bar)", - "$(foo['bar'])", - "$(foo[\"bar\"])"): + "$(foo['bar'])", + "$(foo[\"bar\"])"): self.assertEqual(expr.interpolate(pattern, inputs), inputs["foo"]["bar"]) for pattern in ("$(foo.bar.baz)", - "$(foo['bar'].baz)", - "$(foo['bar'][\"baz\"])", - "$(foo.bar['baz'])"): + "$(foo['bar'].baz)", + "$(foo['bar'][\"baz\"])", + "$(foo.bar['baz'])"): self.assertEqual(expr.interpolate(pattern, inputs), "zab1") self.assertEqual(expr.interpolate("$(foo['b ar'].baz)", inputs), 2) @@ -78,14 +77,14 @@ def test_params(self): self.assertEqual(expr.interpolate("$(lst['length'])", inputs), 2) for pattern in ("-$(foo.bar)", - "-$(foo['bar'])", - "-$(foo[\"bar\"])"): + "-$(foo['bar'])", + "-$(foo[\"bar\"])"): self.assertEqual(expr.interpolate(pattern, inputs), """-{"baz": "zab1"}""") for pattern in ("-$(foo.bar.baz)", - "-$(foo['bar'].baz)", - "-$(foo['bar'][\"baz\"])", - "-$(foo.bar['baz'])"): + "-$(foo['bar'].baz)", + "-$(foo['bar'][\"baz\"])", + "-$(foo.bar['baz'])"): self.assertEqual(expr.interpolate(pattern, inputs), "-zab1") self.assertEqual(expr.interpolate("-$(foo['b ar'].baz)", inputs), "-2") @@ -93,16 +92,15 @@ def test_params(self): self.assertEqual(expr.interpolate("-$(foo[\"b\\'ar\"].baz)", inputs), "-true") self.assertEqual(expr.interpolate("-$(foo['b\\\"ar'].baz)", inputs), "-null") - for pattern in ("$(foo.bar) $(foo.bar)", - "$(foo['bar']) $(foo['bar'])", - "$(foo[\"bar\"]) $(foo[\"bar\"])"): + "$(foo['bar']) $(foo['bar'])", + "$(foo[\"bar\"]) $(foo[\"bar\"])"): self.assertEqual(expr.interpolate(pattern, inputs), """{"baz": "zab1"} {"baz": "zab1"}""") for pattern in ("$(foo.bar.baz) $(foo.bar.baz)", - "$(foo['bar'].baz) $(foo['bar'].baz)", - "$(foo['bar'][\"baz\"]) $(foo['bar'][\"baz\"])", - "$(foo.bar['baz']) $(foo.bar['baz'])"): + "$(foo['bar'].baz) $(foo['bar'].baz)", + "$(foo['bar'][\"baz\"]) $(foo['bar'][\"baz\"])", + "$(foo.bar['baz']) $(foo.bar['baz'])"): self.assertEqual(expr.interpolate(pattern, inputs), "zab1 zab1") self.assertEqual(expr.interpolate("$(foo['b ar'].baz) $(foo['b ar'].baz)", inputs), "2 2") @@ -110,8 +108,8 @@ def test_params(self): self.assertEqual(expr.interpolate("$(foo[\"b\\'ar\"].baz) $(foo[\"b\\'ar\"].baz)", inputs), "true true") self.assertEqual(expr.interpolate("$(foo['b\\\"ar'].baz) $(foo['b\\\"ar'].baz)", inputs), "null null") -class TestFactory(unittest.TestCase): +class TestFactory(unittest.TestCase): def test_factory(self): f = cwltool.factory.Factory() echo = f.make("tests/echo.cwl") @@ -140,6 +138,7 @@ def test_partial_output(self): else: self.fail("Should have raised WorkflowStatus") + class TestScanDeps(unittest.TestCase): def test_scandeps(self): obj = { @@ -198,9 +197,9 @@ def loadref(base, p): raise Exception("test case can't load things") sc = cwltool.process.scandeps(obj["id"], obj, - set(("$import", "run")), - set(("$include", "$schemas", "location")), - loadref) + {"$import", "run"}, + {"$include", "$schemas", "location"}, + loadref) sc.sort(key=lambda k: k["basename"]) @@ -209,34 +208,34 @@ def loadref(base, p): "class": "File", "location": "file:///example/bar.cwl" }, - { - "basename": "data.txt", - "class": "File", - "location": "file:///example/data.txt" - }, - { - "basename": "data2", - "class": "Directory", - "location": "file:///example/data2", - "listing": [{ - "basename": "data3.txt", + { + "basename": "data.txt", "class": "File", - "location": "file:///example/data3.txt", - "secondaryFiles": [{ + "location": "file:///example/data.txt" + }, + { + "basename": "data2", + "class": "Directory", + "location": "file:///example/data2", + "listing": [{ + "basename": "data3.txt", "class": "File", - "basename": "data5.txt", - "location": "file:///example/data5.txt" + "location": "file:///example/data3.txt", + "secondaryFiles": [{ + "class": "File", + "basename": "data5.txt", + "location": "file:///example/data5.txt" + }] }] - }] - }, { + }, { "basename": "data4.txt", "class": "File", "location": "file:///example/data4.txt" - }], sc) + }], sc) sc = cwltool.process.scandeps(obj["id"], obj, - set(("run"),), - set(), loadref) + set(("run"), ), + set(), loadref) sc.sort(key=lambda k: k["basename"]) @@ -246,41 +245,42 @@ def loadref(base, p): "location": "file:///example/bar.cwl" }], sc) + class TestDedup(unittest.TestCase): def test_dedup(self): ex = [{ "class": "File", "location": "file:///example/a" }, - { - "class": "File", - "location": "file:///example/a" - }, - { - "class": "File", - "location": "file:///example/d" - }, - { - "class": "Directory", - "location": "file:///example/c", - "listing": [{ + { + "class": "File", + "location": "file:///example/a" + }, + { "class": "File", "location": "file:///example/d" + }, + { + "class": "Directory", + "location": "file:///example/c", + "listing": [{ + "class": "File", + "location": "file:///example/d" + }] }] - }] self.assertEquals([{ "class": "File", "location": "file:///example/a" }, - { - "class": "Directory", - "location": "file:///example/c", - "listing": [{ - "class": "File", - "location": "file:///example/d" - }] - }], cwltool.pathmapper.dedup(ex)) + { + "class": "Directory", + "location": "file:///example/c", + "listing": [{ + "class": "File", + "location": "file:///example/d" + }] + }], cwltool.pathmapper.dedup(ex)) class TestTypeCompare(unittest.TestCase): @@ -304,31 +304,30 @@ def test_typecompare(self): def test_recordcompare(self): src = { 'fields': [{ - 'type': { 'items': 'string', 'type': 'array' }, + 'type': {'items': 'string', 'type': 'array'}, 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec/description' }, - { - 'type': { 'items': 'File', 'type': 'array' }, - 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec/vrn_file' - }], - 'type': 'record', - 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec' + { + 'type': {'items': 'File', 'type': 'array'}, + 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec/vrn_file' + }], + 'type': 'record', + 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec' } sink = { 'fields': [{ 'type': {'items': 'string', 'type': 'array'}, 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/steps/vc_output_record.cwl#vc_rec/vc_rec/description' }, - { - 'type': {'items': 'File', 'type': 'array'}, - 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/steps/vc_output_record.cwl#vc_rec/vc_rec/vrn_file' - }], + { + 'type': {'items': 'File', 'type': 'array'}, + 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/steps/vc_output_record.cwl#vc_rec/vc_rec/vrn_file' + }], 'type': 'record', 'name': u'file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/steps/vc_output_record.cwl#vc_rec/vc_rec'} self.assertTrue(cwltool.workflow.can_assign_src_to_sink(src, sink)) - def test_lifting(self): # check that lifting the types of the process outputs to the workflow step # fails if the step 'out' doesn't match. diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 2a0f06870..7845fb9b6 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -1,17 +1,13 @@ import unittest -import schema_salad.ref_resolver + import schema_salad.main +import schema_salad.ref_resolver import schema_salad.schema -from schema_salad.jsonld_context import makerdf -from pkg_resources import Requirement, resource_filename, ResolutionError # type: ignore -import rdflib -import ruamel.yaml as yaml -import json -import os +from cwltool.load_tool import load_tool from cwltool.main import main from cwltool.workflow import defaultMakeTool -from cwltool.load_tool import load_tool + class FetcherTest(unittest.TestCase): def test_fetcher(self): @@ -19,7 +15,7 @@ class TestFetcher(schema_salad.ref_resolver.Fetcher): def __init__(self, a, b): pass - def fetch_text(self, url): # type: (unicode) -> unicode + def fetch_text(self, url): # type: (unicode) -> unicode if url == "baz:bar/foo.cwl": return """ cwlVersion: v1.0 @@ -42,4 +38,5 @@ def test_resolver(d, a): load_tool("foo.cwl", defaultMakeTool, resolver=test_resolver, fetcher_constructor=TestFetcher) - self.assertEquals(0, main(["--print-pre", "--debug", "foo.cwl"], resolver=test_resolver, fetcher_constructor=TestFetcher)) + self.assertEquals(0, main(["--print-pre", "--debug", "foo.cwl"], resolver=test_resolver, + fetcher_constructor=TestFetcher)) diff --git a/tests/test_pack.py b/tests/test_pack.py index dd4fad476..7193cd054 100644 --- a/tests/test_pack.py +++ b/tests/test_pack.py @@ -1,11 +1,11 @@ -import unittest import json import os +import unittest from functools import partial -from cwltool.load_tool import fetch_document, validate_document import cwltool.pack import cwltool.workflow +from cwltool.load_tool import fetch_document, validate_document from cwltool.main import makeRelative from cwltool.process import adjustFileObjs, adjustDirObjs diff --git a/tests/test_pathmapper.py b/tests/test_pathmapper.py index b54c89a3f..a5302140a 100644 --- a/tests/test_pathmapper.py +++ b/tests/test_pathmapper.py @@ -1,17 +1,11 @@ import unittest -import json -import cwltool.draft2tool as tool -import cwltool.expression as expr -import cwltool.factory + from cwltool.pathmapper import PathMapper class TestPathMapper(unittest.TestCase): - def test_subclass(self): - class SubPathMapper(PathMapper): - def __init__(self, referenced_files, basedir, stagedir, new): super(SubPathMapper, self).__init__(referenced_files, basedir, stagedir) self.new = new diff --git a/tests/test_relax_path_checks.py b/tests/test_relax_path_checks.py index 403d8e97e..4012d0a1e 100644 --- a/tests/test_relax_path_checks.py +++ b/tests/test_relax_path_checks.py @@ -1,12 +1,11 @@ import unittest from tempfile import NamedTemporaryFile + from cwltool.main import main class ToolArgparse(unittest.TestCase): - - - script=''' + script = ''' #!/usr/bin/env cwl-runner cwlVersion: v1.0 class: CommandLineTool @@ -33,7 +32,8 @@ def test_spaces_in_input_files(self): main(["--debug", f.name, '--input', spaces.name]), 1) self.assertEquals( main(["--debug", "--relax-path-checks", f.name, '--input', - spaces.name]), 0) + spaces.name]), 0) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_toolargparse.py b/tests/test_toolargparse.py index 40983088a..959b630a4 100644 --- a/tests/test_toolargparse.py +++ b/tests/test_toolargparse.py @@ -1,12 +1,11 @@ import unittest from tempfile import NamedTemporaryFile + from cwltool.main import main class ToolArgparse(unittest.TestCase): - - - script=''' + script = ''' #!/usr/bin/env cwl-runner cwlVersion: v1.0 class: CommandLineTool @@ -26,7 +25,7 @@ class ToolArgparse(unittest.TestCase): baseCommand: [cat] ''' - script2=''' + script2 = ''' #!/usr/bin/env cwl-runner cwlVersion: v1.0 class: CommandLineTool @@ -44,7 +43,7 @@ class ToolArgparse(unittest.TestCase): stdout: foo ''' - script3=''' + script3 = ''' #!/usr/bin/env cwl-runner cwlVersion: v1.0 @@ -94,11 +93,10 @@ def test_record(self): f.flush() try: self.assertEquals(main([f.name, '--foo.one', 'README.rst', - '--foo.two', 'test']), 0) + '--foo.two', 'test']), 0) except SystemExit as e: self.assertEquals(e.code, 0) - if __name__ == '__main__': unittest.main()