Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): use toil-cwl-runner for running workflows #2263

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 35 additions & 59 deletions renku/core/commands/cwl_runner.py
Expand Up @@ -17,10 +17,8 @@
# limitations under the License.
"""Wrap CWL runner."""

import os
import shutil
import sys
from urllib.parse import unquote
import subprocess
from pathlib import Path

import click

Expand All @@ -31,71 +29,49 @@
from .echo import progressbar


@inject.autoparams()
def execute(output_file, client: LocalClient, output_paths=None):
"""Run the generated workflow using cwltool library."""
output_paths = output_paths or set()
def execute_cwl(job, cwl_file, cwl_filename, output_paths):
"""toil.Job function for execute a provided cwl."""
tempDir = Path(job.fileStore.getLocalTempDir())
cwl = job.fileStore.readGlobalFile(cwl_file, userPath=str(tempDir / cwl_filename))

import cwltool.factory
from cwltool import workflow
from cwltool.context import LoadingContext, RuntimeContext
from cwltool.utils import visit_class
subprocess.check_call(["toil-cwl-runner", cwl])

def construct_tool_object(toolpath_object, *args, **kwargs):
"""Fix missing locations."""
protocol = "file://"
outputs = []
for output in output_paths:
output_file = job.fileStore.writeGlobalFile(output)
job.fileStore.readGlobalFile(output_file, userPath=str(tempDir / output))
outputs.append((output_file, output))
return outputs

def addLocation(d):
if "location" not in d and "path" in d:
d["location"] = protocol + d["path"]

visit_class(toolpath_object, ("File", "Directory"), addLocation)
return workflow.default_make_tool(toolpath_object, *args, **kwargs)
@inject.autoparams()
def execute(output_file, client: LocalClient, output_paths=None):
"""Run the generated workflow using toil-cwl-runner library."""
output_paths = output_paths or set()

argv = sys.argv
sys.argv = ["cwltool"]
from toil.common import Toil
from toil.job import Job
from toil.leader import FailedJobsException

# Keep all environment variables.
runtime_context = RuntimeContext(
kwargs={"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True}
)
loading_context = LoadingContext(kwargs={"construct_tool_object": construct_tool_object, "relax_path_checks": True})
options = Job.Runner.getDefaultOptions("./workflow_run")
options.logLevel = "OFF"
options.clean = "always"

factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context)
process = factory.make(os.path.relpath(str(output_file)))
try:
outputs = process()
except cwltool.factory.WorkflowStatus:
with Toil(options) as toil:
output_file = Path(output_file)
cwl_filename = output_file.name
cwl_file = toil.importFile(output_file.as_uri())
job = Job.wrapJobFn(execute_cwl, cwl_file, cwl_filename, output_paths)
locations = toil.start(job)

with progressbar(length=len(locations), label="Moving outputs") as bar:
for location in locations:
toil.exportFile(location[0], (client.path / location[1]).absolute().as_uri())
bar.update(1)
except FailedJobsException:
raise WorkflowRerunError(output_file)

sys.argv = argv

# Move outputs to correct location in the repository.
output_dirs = process.factory.executor.output_dirs

def remove_prefix(location, prefix="file://"):
if location.startswith(prefix):
return unquote(location[len(prefix) :])
return unquote(location)

locations = {remove_prefix(output["location"]) for output in outputs.values()}
# make sure to not move an output if it's containing directory gets moved
locations = {
location for location in locations if not any(location.startswith(d) for d in locations if location != d)
}

with progressbar(locations, label="Moving outputs") as bar:
for location in bar:
for output_dir in output_dirs:
if location.startswith(output_dir):
output_path = location[len(output_dir) :].lstrip(os.path.sep)
destination = client.path / output_path
if destination.is_dir():
shutil.rmtree(str(destination))
destination = destination.parent
shutil.move(location, str(destination))
continue

unchanged_paths = client.remove_unmodified(output_paths)
if unchanged_paths:
click.echo(
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -190,6 +190,7 @@ def run(self):
"rich>=9.3.0,<10.4.0",
"setuptools_scm>=3.1.0,<6.0.2",
"tabulate>=0.7.7,<0.8.10",
"toil",
"tqdm>=4.48.1,<4.60.1",
"wcmatch>=6.0.0,<8.3",
"werkzeug>=0.15.5,<2.0.2",
Expand Down