Skip to content

Commit

Permalink
feat(service): add workflow export endpoint (#3212)
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed Nov 21, 2022
1 parent dacddde commit bb50f86
Show file tree
Hide file tree
Showing 21 changed files with 548 additions and 121 deletions.
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -172,6 +172,7 @@ Postgresql
powerline
pre
prepend
prepended
prepending
preprocessed
preprocessing
Expand Down
129 changes: 76 additions & 53 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Expand Up @@ -83,7 +83,7 @@ importlib-resources = { version = ">=5.4.0,<5.10.0", python = "<3.9.0" }
inject = "<4.4.0,>=4.3.0"
isort = { version = "<5.10.2,>=5.3.2", optional = true }
jinja2 = { version = ">=2.11.3,<3.1.3" }
marshmallow = { version = ">=3.14.0,<3.18.0", optional = true }
marshmallow = { version = ">=3.18.0", optional = true }
marshmallow-oneofschema = { version=">=3.0.1,<4.0.0", optional = true }
mypy = {version = ">=0.942,<1.0", optional = true}
networkx = "<2.7,>=2.6.0"
Expand Down Expand Up @@ -123,7 +123,7 @@ rdflib = "<7.0,>=6.0.0"
redis = { version = ">=3.5.3,<4.2.0", optional = true }
renku-sphinx-theme = { version = ">=0.2.0", optional = true }
requests = ">=2.23.0,<2.28.2"
responses = { version = ">=0.7.0,<0.22.0", optional = true }
responses = { version = ">=0.7.0,<=0.22.0", optional = true }
rich = ">=9.3.0,<12.6.0"
rq = { version = "==1.11.0", optional = true }
rq-scheduler = { version = "==0.11.0", optional = true }
Expand Down Expand Up @@ -343,6 +343,7 @@ files = [
"renku/**/*.py",
"tests/**/*.py"
]
implicit_optional = true

[[tool.mypy.overrides]]
module = [
Expand Down
2 changes: 2 additions & 0 deletions renku/command/command_builder/command.py
Expand Up @@ -365,6 +365,8 @@ def command(self, operation: Callable):
def working_directory(self, directory: str) -> "Command":
"""Set the working directory for the command.
WARNING: Should not be used in the core service.
Args:
directory(str): The working directory to work in.
Expand Down
2 changes: 1 addition & 1 deletion renku/core/plugin/dataset_provider.py
Expand Up @@ -27,7 +27,7 @@


@hookspec
def dataset_provider() -> "Type[ProviderApi]":
def dataset_provider() -> "Type[ProviderApi]": # type: ignore[empty-body]
"""Plugin Hook for different dataset providers.
Returns:
Expand Down
2 changes: 1 addition & 1 deletion renku/core/plugin/session.py
Expand Up @@ -26,7 +26,7 @@


@hookspec
def session_provider() -> Tuple[ISessionProvider, str]:
def session_provider() -> Tuple[ISessionProvider, str]: # type: ignore[empty-body]
"""Plugin Hook for ``session`` sub-command.
Returns:
Expand Down
21 changes: 16 additions & 5 deletions renku/core/plugin/workflow.py
Expand Up @@ -30,7 +30,7 @@


@hookspec
def workflow_format() -> Tuple[IWorkflowConverter, List[str]]: # type: ignore
def workflow_format() -> Tuple[IWorkflowConverter, List[str]]: # type: ignore[empty-body]
"""Plugin Hook for ``workflow export`` call.
Can be used to export renku workflows in different formats.
Expand All @@ -44,9 +44,14 @@ def workflow_format() -> Tuple[IWorkflowConverter, List[str]]: # type: ignore


@hookspec(firstresult=True)
def workflow_convert(
workflow: Plan, basedir: Path, output: Optional[Path], output_format: Optional[str]
) -> str: # type: ignore
def workflow_convert( # type: ignore[empty-body]
workflow: Plan,
basedir: Path,
output: Optional[Path],
output_format: Optional[str],
resolve_paths: bool,
nest_workflows: bool,
) -> str:
"""Plugin Hook for ``workflow export`` call.
Can be used to export renku workflows in different formats.
Expand Down Expand Up @@ -82,7 +87,13 @@ class WorkflowConverterProtocol(Protocol):
"""Typing protocol to specify type of the workflow converter hook."""

def __call__(
self, workflow: Plan, basedir: Path, output: Optional[Path] = None, output_format: Optional[str] = None
self,
workflow: Plan,
basedir: Path,
output: Optional[Path] = None,
output_format: Optional[str] = None,
resolve_paths: Optional[bool] = None,
nest_workflows: Optional[bool] = None,
) -> str:
"""Dummy method to let mypy know the type of the hook implementation."""
raise NotImplementedError()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/util/git.py
Expand Up @@ -250,7 +250,7 @@ def get_full_repository_path(url: Optional[str]) -> str:
Returns:
The hostname plus path extracted from the URL.
"""
if not str:
if str is None:
return ""

parsed_url = parse_git_url(url)
Expand Down
5 changes: 5 additions & 0 deletions renku/core/util/yaml.py
Expand Up @@ -72,6 +72,11 @@ def write_yaml(path, data):
yaml.dump(data, fp, default_flow_style=False, Dumper=Dumper)


def dumps_yaml(data) -> str:
"""Convert YAML data to a YAML string."""
return yaml.dump(data)


def load_yaml(data):
"""Load YAML data and return its content as a dict."""
return yaml.load(data, Loader=NoDatesSafeLoader) or {}
126 changes: 86 additions & 40 deletions renku/core/workflow/converters/cwl.py
Expand Up @@ -21,15 +21,15 @@
import re
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from uuid import uuid4

import cwl_utils.parser.cwl_v1_2 as cwl

from renku.core import errors
from renku.core.plugin import hookimpl
from renku.core.plugin.provider import RENKU_ENV_PREFIX
from renku.core.util.yaml import write_yaml
from renku.core.util.yaml import dumps_yaml, write_yaml
from renku.core.workflow.concrete_execution_graph import ExecutionGraph
from renku.domain_model.workflow.composite_plan import CompositePlan
from renku.domain_model.workflow.converters import IWorkflowConverter
Expand Down Expand Up @@ -94,38 +94,80 @@ def workflow_format(self):

@hookimpl
def workflow_convert(
self, workflow: Union[CompositePlan, Plan], basedir: Path, output: Optional[Path], output_format: Optional[str]
):
"""Converts the specified workflow to CWL format."""
self,
workflow: Union[CompositePlan, Plan],
basedir: Path,
output: Optional[Path],
output_format: Optional[str],
resolve_paths: Optional[bool],
nest_workflows: Optional[bool],
) -> str:
"""Converts the specified workflow to CWL format.
Args:
worflow(Union[CompositePlan, Plan]): The plan or composite plan to be converted to cwl.
basedir(Path): The path of the base location used as a prefix for all workflow input and outputs.
output(Optional[Path]): The file where the CWL specification should be saved,
if None then no file is created.
output_format(Optional[str]): Not used. Only YAML is generated, regardless of what is provided.
resolve_paths(Optional[bool]): Whether to make all paths absolute and resolve all symlinks,
True by default.
nest_workflows(Optional[bool]): Whether nested CWL workflows should be used or each sub-workflow should be
a separate file, False by default.
Returns:
The contents of the CWL workflow as string. If nested workflows are used then only the parent
specification is returned.
"""
filename = None

if resolve_paths is None:
resolve_paths = True

if output:
if output.is_dir():
tmpdir = output
filename = None
else:
tmpdir = output.parent
filename = output
else:
tmpdir = Path(tempfile.mkdtemp())

cwl_workflow: Union[cwl.Workflow, CommandLineTool]
if isinstance(workflow, CompositePlan):
path = CWLExporter._convert_composite(
workflow, tmpdir, basedir, filename=filename, output_format=output_format
)
cwl_workflow = CWLExporter._convert_composite(workflow, basedir, resolve_paths=resolve_paths)
if nest_workflows:
# INFO: There is only one parent workflow with all children embedded in it
if cwl_workflow.requirements is None:
cwl_workflow.requirements = []
cwl_workflow.requirements.append(cwl.SubworkflowFeatureRequirement())
else:
# INFO: The parent composite worfklow references other workflow files,
# write the child workflows in separate files and reference them in parent
for step in cast(List[WorkflowStep], cwl_workflow.steps):
step_filename = Path(f"{uuid4()}.cwl")
step_path = (tmpdir / step_filename).resolve()
write_yaml(step_path, step.run.save())
step.run = str(step_path)
if filename is None:
filename = Path(f"parent_{uuid4()}.cwl")
else:
_, path = CWLExporter._convert_step(
workflow, tmpdir, basedir, filename=filename, output_format=output_format
)
cwl_workflow = CWLExporter._convert_step(workflow, basedir, resolve_paths=resolve_paths)
if filename is None:
filename = Path(f"{uuid4()}.cwl")

return path.read_text()
cwl_workflow_dict: Dict[str, Any] = cwl_workflow.save()
path = (tmpdir / filename).resolve()
write_yaml(path, cwl_workflow_dict)
return dumps_yaml(cwl_workflow_dict)

@staticmethod
def _sanitize_id(id):
return re.sub(r"/|-", "_", id)

@staticmethod
def _convert_composite(
workflow: CompositePlan, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str]
):
def _convert_composite(workflow: CompositePlan, basedir: Path, resolve_paths: bool) -> cwl.Workflow:
"""Converts a composite plan to a CWL file."""
inputs: Dict[str, str] = {}
arguments = {}
Expand All @@ -145,10 +187,8 @@ def _convert_composite(
import networkx as nx

for i, wf in enumerate(nx.topological_sort(graph.workflow_graph)):
cwl_workflow, path = CWLExporter._convert_step(
workflow=wf, tmpdir=tmpdir, basedir=basedir, filename=None, output_format=output_format
)
step = WorkflowStep(in_=[], out=[], run=str(path), id="step_{}".format(i))
step_clitool = CWLExporter._convert_step(workflow=wf, basedir=basedir, resolve_paths=resolve_paths)
step = WorkflowStep(in_=[], out=[], run=step_clitool, id="step_{}".format(i))

for input in wf.inputs:
input_path = input.actual_value
Expand Down Expand Up @@ -192,11 +232,17 @@ def _convert_composite(
# check types of paths and add as top level inputs/outputs
for path, id_ in inputs.items():
type_ = "Directory" if os.path.isdir(path) else "File"
location = Path(path)
if resolve_paths:
location = location.resolve()
location_str = str(location.as_uri())
else:
location_str = str(location)
workflow_object.inputs.append(
cwl.WorkflowInputParameter(
id=id_,
type=type_,
default={"location": Path(path).resolve().as_uri(), "class": type_},
default={"location": location_str, "class": type_},
)
)

Expand All @@ -211,19 +257,12 @@ def _convert_composite(
id="output_{}".format(index), outputSource="{}/{}".format(step_id, id_), type=type_
)
)
if filename is None:
filename = Path("parent_{}.cwl".format(uuid4()))

output = workflow_object.save()
path = (tmpdir / filename).resolve()
write_yaml(path, output)
return path
return workflow_object

@staticmethod
def _convert_step(
workflow: Plan, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str]
):
"""Converts a single workflow step to a CWL file."""
def _convert_step(workflow: Plan, basedir: Path, resolve_paths: bool) -> CommandLineTool:
"""Converts a single workflow step to a CWL CommandLineTool."""
stdin, stdout, stderr = None, None, None

inputs = list(workflow.inputs)
Expand Down Expand Up @@ -276,7 +315,7 @@ def _convert_step(
tool_object.inputs.append(arg)

for input_ in inputs:
tool_input = CWLExporter._convert_input(input_, basedir)
tool_input = CWLExporter._convert_input(input_, basedir, resolve_paths=resolve_paths)

workdir_req.listing.append(
cwl.Dirent(entry="$(inputs.{})".format(tool_input.id), entryname=input_.actual_value, writable=False)
Expand All @@ -299,12 +338,18 @@ def _convert_step(
workdir_req.listing.append(
cwl.Dirent(entry="$(inputs.input_renku_metadata)", entryname=".renku", writable=False)
)
location = basedir / ".renku"
if resolve_paths:
location = location.resolve()
location_str = location.as_uri()
else:
location_str = str(location)
tool_object.inputs.append(
cwl.CommandInputParameter(
id="input_renku_metadata",
type="Directory",
inputBinding=None,
default={"location": (basedir / ".renku").resolve().as_uri(), "class": "Directory"},
default={"location": location_str, "class": "Directory"},
)
)

Expand All @@ -315,12 +360,7 @@ def _convert_step(
if environment_variables:
tool_object.requirements.append(cwl.EnvVarRequirement(environment_variables)) # type: ignore

output = tool_object.save()
if filename is None:
filename = Path("{}.cwl".format(uuid4()))
path = (tmpdir / filename).resolve()
write_yaml(path, output)
return output, path
return tool_object

@staticmethod
def _convert_parameter(parameter: CommandParameter):
Expand All @@ -347,7 +387,7 @@ def _convert_parameter(parameter: CommandParameter):
)

@staticmethod
def _convert_input(input: CommandInput, basedir: Path):
def _convert_input(input: CommandInput, basedir: Path, resolve_paths: bool):
"""Converts an input to a CWL input."""
type_ = (
"Directory"
Expand All @@ -371,13 +411,19 @@ def _convert_input(input: CommandInput, basedir: Path):
prefix = prefix[:-1]
separate = True

location = basedir / input.actual_value
if resolve_paths:
location = location.resolve()
location_str = location.as_uri()
else:
location_str = str(location)
return cwl.CommandInputParameter(
id=sanitized_id,
type=type_,
inputBinding=cwl.CommandLineBinding(position=position, prefix=prefix, separate=separate)
if position or prefix
else None,
default={"location": (basedir / input.actual_value).resolve().as_uri(), "class": type_},
default={"location": location_str, "class": type_},
)

@staticmethod
Expand Down

0 comments on commit bb50f86

Please sign in to comment.