Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cwltool/cwlprov/ro.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ def guess_mediatype(
return aggregates

def add_uri(self, uri: str, timestamp: Optional[datetime.datetime] = None) -> Aggregate:
"""Add the given URI to this RO."""
self.self_check()
aggr: Aggregate = {"uri": uri}
aggr["createdOn"], aggr["createdBy"] = self._self_made(timestamp=timestamp)
Expand Down
174 changes: 3 additions & 171 deletions cwltool/cwlrdf.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import urllib
from codecs import StreamWriter
from collections.abc import Iterator
from typing import IO, Any, Optional, TextIO, Union, cast
"""RDF output."""

from typing import IO, Any

from rdflib import Graph
from rdflib.query import ResultRow
from ruamel.yaml.comments import CommentedMap
from schema_salad.jsonld_context import makerdf
from schema_salad.utils import ContextType
Expand Down Expand Up @@ -38,172 +36,6 @@ def lastpart(uri: Any) -> str:
return uri2


def dot_with_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
    """Write Graphviz statements for the workflow in ``g``, parameters included.

    Five SELECT queries are run against ``g`` in turn, and for every result
    row one or more node/edge statements are written to ``stdout``:

    1. each step that has a typed ``cwl:run`` gets a labelled node,
    2. each step input with a source gets a box node plus the edges
       ``source -> input -> step``,
    3. each step output gets a box node plus the edge ``step -> output``,
    4. each workflow output with a source gets an octagon node plus the edge
       ``source -> output``,
    5. each workflow input gets an octagon node.

    Only statements are written; no enclosing ``digraph { ... }`` is emitted
    by this function.

    :param g: graph to query.
    :param stdout: text sink that receives the statements.
    """
    step_node = '"{}" [label="{}"]\n'
    box_node = '"%s" [shape=box]\n'
    octagon_node = '"%s" [shape=octagon]\n'
    plain_edge = '"{}" -> "{}" [label="{}"]\n'

    def select(sparql: str) -> Iterator[ResultRow]:
        # Every query below is a SELECT, so iterating yields ResultRow items.
        return cast(Iterator[ResultRow], g.query(sparql))

    def edge(tail: object, head: object) -> None:
        # All edges are written with an empty label.
        stdout.write(plain_edge.format(lastpart(tail), lastpart(head), ""))

    # 1. Steps, labelled "<step> (<run>)".
    step_rows = select(
        """SELECT ?step ?run ?runtype
WHERE {
?step cwl:run ?run .
?run rdf:type ?runtype .
}"""
    )
    for step, run, _ in step_rows:
        node_id = lastpart(step)
        label = f"{lastpart(step)} ({lastpart(run)})"
        stdout.write(step_node.format(node_id, label))

    # 2. Step inputs and the links feeding them.
    step_input_rows = select(
        """SELECT ?step ?inp ?source
WHERE {
?wf Workflow:steps ?step .
?step cwl:inputs ?inp .
?inp cwl:source ?source .
}"""
    )
    for step, inp, source in step_input_rows:
        stdout.write(box_node % lastpart(inp))
        edge(source, inp)
        edge(inp, step)

    # 3. Step outputs.
    step_output_rows = select(
        """SELECT ?step ?out
WHERE {
?wf Workflow:steps ?step .
?step cwl:outputs ?out .
}"""
    )
    for step, out in step_output_rows:
        stdout.write(box_node % lastpart(out))
        edge(step, out)

    # 4. Workflow-level outputs and their sources.
    wf_output_rows = select(
        """SELECT ?out ?source
WHERE {
?wf cwl:outputs ?out .
?out cwl:source ?source .
}"""
    )
    for out, source in wf_output_rows:
        stdout.write(octagon_node % lastpart(out))
        edge(source, out)

    # 5. Workflow-level inputs.
    wf_input_rows = select(
        """SELECT ?inp
WHERE {
?wf rdf:type cwl:Workflow .
?wf cwl:inputs ?inp .
}"""
    )
    for (inp,) in wf_input_rows:
        stdout.write(octagon_node % lastpart(inp))


def dot_without_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
    """Write Graphviz statements for the steps in ``g``, omitting parameters.

    The output starts with ``compound=true``. Steps that belong to a workflow
    which is itself run by some step (a subworkflow) are wrapped in a
    ``subgraph "cluster_..."`` block; every step whose run type is not
    ``cwl#Workflow`` gets a node labelled with the fragment of its URI.
    Finally one edge is written per distinct (source step, sink step) pair;
    when either end runs a subworkflow the edge is re-pointed at the first
    step seen in that cluster and tagged with ``ltail``/``lhead``.

    :param g: graph to query.
    :param stdout: text sink that receives the statements.
    """

    def select(sparql: str) -> Iterator[ResultRow]:
        # Every query below is a SELECT, so iterating yields ResultRow items.
        return cast(Iterator[ResultRow], g.query(sparql))

    node_ids: dict[str, str] = {}  # graph term -> DOT identifier
    cluster_entry = {}  # subworkflow -> first step seen inside it

    stdout.write("compound=true\n")

    # Workflows that are themselves the ``run`` target of some step.
    subworkflows = {
        run
        for (run,) in select(
            """SELECT ?run
WHERE {
?wf rdf:type cwl:Workflow .
?wf Workflow:steps ?step .
?step cwl:run ?run .
?run rdf:type cwl:Workflow .
} ORDER BY ?wf"""
        )
    }

    step_rows = select(
        """SELECT ?wf ?step ?run ?runtype
WHERE {
?wf rdf:type cwl:Workflow .
?wf Workflow:steps ?step .
?step cwl:run ?run .
?run rdf:type ?runtype .
} ORDER BY ?wf"""
    )

    # Rows arrive ordered by workflow, so one "currently open" marker is
    # enough to know when a subgraph block has to be closed or opened.
    open_cluster: Optional[str] = None
    for wf, step, _run, runtype in step_rows:
        if step not in node_ids:
            node_ids[step] = lastpart(step)

        if wf != open_cluster:
            if open_cluster is not None:
                stdout.write("}\n")
            is_subworkflow = wf in subworkflows
            if is_subworkflow:
                if wf not in node_ids:
                    node_ids[wf] = "cluster_" + lastpart(wf)
                stdout.write(
                    f'subgraph "{node_ids[wf]}" {{ label="{lastpart(wf)}"\n'  # noqa: B907
                )
                cluster_entry[wf] = step
            open_cluster = wf if is_subworkflow else None

        if str(runtype) != "https://w3id.org/cwl/cwl#Workflow":
            fragment = urllib.parse.urldefrag(str(step)).fragment
            stdout.write(f'"{node_ids[step]}" [label="{fragment}"]\n')  # noqa: B907

    if open_cluster is not None:
        stdout.write("}\n")

    link_rows = select(
        """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
WHERE {
?wf1 Workflow:steps ?src .
?wf2 Workflow:steps ?sink .
?src cwl:out ?out .
?inp cwl:source ?out .
?sink cwl:in ?inp .
?src cwl:run ?srcrun .
?sink cwl:run ?sinkrun .
}"""
    )

    for src, sink, srcrun, sinkrun in link_rows:
        tail, head = src, sink
        attrs = ""
        if srcrun in cluster_entry:
            # Edge leaves a cluster: anchor it on the cluster's entry step.
            attrs += 'ltail="%s"' % node_ids[srcrun]
            tail = cluster_entry[srcrun]
        if sinkrun in cluster_entry:
            # Edge enters a cluster: anchor it on the cluster's entry step.
            attrs += ' lhead="%s"' % node_ids[sinkrun]
            head = cluster_entry[sinkrun]
        stdout.write(f'"{node_ids[tail]}" -> "{node_ids[head]}" [{attrs}]\n')  # noqa: B907


def printdot(
wf: Process,
ctx: ContextType,
Expand Down
1 change: 1 addition & 0 deletions cwltool/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, t: Process, factory: "Factory") -> None:

def __call__(self, **kwargs):
# type: (**Any) -> Union[str, Optional[CWLObjectType]]
"""Invoke the tool."""
runtime_context = self.factory.runtime_context.copy()
runtime_context.basedir = os.getcwd()
out, status = self.factory.executor(self.t, kwargs, runtime_context)
Expand Down
1 change: 1 addition & 0 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def relink_initialworkdir(


def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
    """Return ``None`` unconditionally; every argument is ignored.

    NOTE(review): the signature looks like a stand-in for a compiled
    pattern's ``search`` that never matches -- confirm against the callers.
    """
    no_match: Optional[Match[str]] = None
    return no_match


Expand Down
3 changes: 3 additions & 0 deletions cwltool/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


def resolve_local(document_loader: Optional[Loader], uri: str) -> Optional[str]:
"""Use the local resolver to find the target of the URI."""
pathpart, frag = urllib.parse.urldefrag(uri)

try:
Expand Down Expand Up @@ -41,6 +42,7 @@ def resolve_local(document_loader: Optional[Loader], uri: str) -> Optional[str]:


def tool_resolver(document_loader: Loader, uri: str) -> Optional[str]:
"""Try both the local resolver and the GA4GH TRS resolver, in that order."""
for r in [resolve_local, resolve_ga4gh_tool]:
ret = r(document_loader, uri)
if ret is not None:
Expand All @@ -63,6 +65,7 @@ def tool_resolver(document_loader: Loader, uri: str) -> Optional[str]:


def resolve_ga4gh_tool(document_loader: Loader, uri: str) -> Optional[str]:
"""Use the GA4GH TRS API to resolve a tool reference."""
path, version = uri.partition(":")[::2]
if not version:
version = "latest"
Expand Down
1 change: 1 addition & 0 deletions cwltool/stdfsaccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def listdir(self, fn: str) -> list[str]:
return [abspath(urllib.parse.quote(entry), fn) for entry in os.listdir(self._abs(fn))]

def join(self, path: str, *paths: str) -> str:
    """Join one or more path segments intelligently.

    Delegates directly to :func:`os.path.join`; no instance state is used.

    :param path: the leading path segment.
    :param paths: any further segments to append, in order.
    :return: the combined path as produced by ``os.path.join``.
    """
    return os.path.join(path, *paths)

def realpath(self, path: str) -> str:
Expand Down
Loading