Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add workflow execute subcommand #2273

Merged
merged 16 commits into from Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -117,7 +117,7 @@ This command allows execution of a workflow template using a specified runner/pr

Syntax examples:
```
$ renku workflow execute --provider <provider> --config <config file> <workflow name> --mapping <mapping>
$ renku workflow execute --provider <provider> --config <config file> <workflow name> --values <file>
$ renku workflow execute --provider <provider> --config <config file> <workflow name> --set learning_rate=0.9 --set step3.learning_rate=0.1 --set step3.result_file=/tmp/myresult.txt
```

Expand Down
65 changes: 64 additions & 1 deletion renku/cli/workflow.py
Expand Up @@ -230,20 +230,23 @@
from rich.console import Console
from rich.markdown import Markdown

from renku.cli.utils.callback import ClickCallback
from renku.core.commands.echo import ERROR
from renku.core.commands.format.workflow import WORKFLOW_COLUMNS, WORKFLOW_FORMATS
from renku.core.commands.view_model.composite_plan import CompositePlanViewModel
from renku.core.commands.view_model.plan import PlanViewModel
from renku.core.commands.workflow import (
compose_workflow_command,
edit_workflow_command,
execute_workflow_command,
export_workflow_command,
list_workflows_command,
remove_workflow_command,
show_workflow_command,
workflow_inputs_command,
workflow_outputs_command,
)
from renku.core.plugins.provider import available_workflow_providers
from renku.core.plugins.workflow import supported_formats


Expand Down Expand Up @@ -551,8 +554,13 @@ def edit(workflow_name, name, description, set_params, map_params, rename_params
)
def export(workflow_name, format, output, values):
"""Export workflow."""
communicator = ClickCallback()

result = (
export_workflow_command().build().execute(name_or_id=workflow_name, format=format, output=output, values=values)
export_workflow_command()
.with_communicator(communicator)
.build()
.execute(name_or_id=workflow_name, format=format, output=output, values=values)
)

if not output:
Expand Down Expand Up @@ -599,3 +607,58 @@ def outputs(ctx, paths):
p not in output_paths and all(Path(o) not in Path(p).parents for o in output_paths) for p in paths
):
ctx.exit(1)


@workflow.command()
@click.option(
"provider",
"-p",
"--provider",
default="cwltool",
show_default=True,
type=click.Choice(available_workflow_providers(), case_sensitive=False),
help="The workflow engine to use.",
)
@click.option("config", "-c", "--config", metavar="<config file>", help="YAML file containing config for the provider.")
@click.option(
"set_params",
"-s",
"--set",
multiple=True,
metavar="<parameter>=<value>",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work with CompositePlanValueResolver? In there we look for a parameters or a steps keys. It's also probably good to add a test which uses this flag.

help="Set <value> for a <parameter> to be used in execution.",
)
@click.option(
"--values",
metavar="<file>",
type=click.Path(exists=True, dir_okay=False),
help="YAML file containing parameter mappings to be used.",
)
@click.argument("name_or_id", required=True)
def execute(
provider,
config,
set_params,
values,
name_or_id,
):
"""Execute a given workflow."""
communicator = ClickCallback()

result = (
execute_workflow_command()
.with_communicator(communicator)
.build()
.execute(
name_or_id=name_or_id,
provider=provider,
config=config,
values=values,
set_params=set_params,
)
)

if result.output:
click.echo(
"Unchanged files:\n\n\t{0}".format("\n\t".join(click.style(path, fg="yellow") for path in result.output))
)
2 changes: 1 addition & 1 deletion renku/core/commands/rerun.py
Expand Up @@ -21,7 +21,7 @@
from typing import List, Set

from renku.core import errors
from renku.core.commands.update import execute_workflow
from renku.core.commands.workflow import execute_workflow
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
Expand Down
142 changes: 2 additions & 140 deletions renku/core/commands/update.py
Expand Up @@ -17,29 +17,20 @@
# limitations under the License.
"""Renku ``update`` command."""

import uuid
from collections import defaultdict
from pathlib import Path
from typing import List, Set

from git import Actor

from renku.core import errors
from renku.core.commands.workflow import execute_workflow
from renku.core.errors import ParameterError
from renku.core.management.command_builder import inject
from renku.core.management.command_builder.command import Command
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.interface.plan_gateway import IPlanGateway
from renku.core.management.workflow.plan_factory import delete_indirect_files_list
from renku.core.models.provenance.activity import Activity, ActivityCollection
from renku.core.models.workflow.composite_plan import CompositePlan
from renku.core.models.workflow.plan import Plan
from renku.core.utils.datetime8601 import local_now
from renku.core.utils.git import add_to_git
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths
from renku.version import __version__, version_url


def update_command():
Expand Down Expand Up @@ -121,132 +112,3 @@ def does_activity_generate_any_paths(activity):
include_newest_activity(activity)

return {a for activities in all_activities.values() for a in activities}


@inject.autoparams()
def execute_workflow(
plans: List[Plan],
command_name,
client_dispatcher: IClientDispatcher,
activity_gateway: IActivityGateway,
plan_gateway: IPlanGateway,
):
"""Execute a Run with/without subprocesses."""
client = client_dispatcher.current_client

# NOTE: Pull inputs from Git LFS or other storage backends
if client.check_external_storage():
inputs = [i.actual_value for p in plans for i in p.inputs]
client.pull_paths_from_storage(*inputs)

delete_indirect_files_list(client.path)

started_at_time = local_now()

modified_outputs = _execute_workflow_helper(plans=plans, client=client)

ended_at_time = local_now()

add_to_git(client.repo.git, *modified_outputs)

if client.repo.is_dirty():
postfix = "s" if len(modified_outputs) > 1 else ""
commit_msg = f"renku {command_name}: committing {len(modified_outputs)} modified file{postfix}"
committer = Actor(f"renku {__version__}", version_url)
client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True)

activities = []

for plan in plans:
# NOTE: Update plans are copies of Plan objects. We need to use the original Plan objects to avoid duplicates.
original_plan = plan_gateway.get_by_id(plan.id)
activity = Activity.from_plan(plan=original_plan, started_at_time=started_at_time, ended_at_time=ended_at_time)
activity_gateway.add(activity)
activities.append(activity)

if len(activities) > 1:
activity_collection = ActivityCollection(activities=activities)
activity_gateway.add_activity_collection(activity_collection)


# TODO: This function is created as a patch from renku/core/commands/workflow.py::_execute_workflow and
# renku/core/management/workflow/providers/cwltool_provider.py::CWLToolProvider::workflow_execute in the ``workflow
# execute`` PR (renku-python/pull/2273). Once the PR is merged remove this function and refactor
# renku/core/commands/workflow.py::_execute_workflow to accept a Plan and use it here.
def _execute_workflow_helper(plans: List[Plan], client):
"""Executes a given workflow using cwltool."""
import os
import shutil
import sys
import tempfile
from pathlib import Path
from urllib.parse import unquote

import cwltool.factory
from cwltool.context import LoadingContext, RuntimeContext

from renku.core import errors
from renku.core.commands.echo import progressbar

basedir = client.path

# NOTE: Create a ``CompositePlan`` because ``workflow_covert`` expects it
workflow = CompositePlan(id=CompositePlan.generate_id(), plans=plans, name=f"plan-collection-{uuid.uuid4().hex}")

with tempfile.NamedTemporaryFile() as f:
# export Plan to cwl
from renku.core.management.workflow.converters.cwl import CWLExporter

converter = CWLExporter()
converter.workflow_convert(workflow=workflow, basedir=basedir, output=Path(f.name), output_format=None)

# run cwl with cwltool
argv = sys.argv
sys.argv = ["cwltool"]

runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True}
loading_args = {"relax_path_checks": True}

# Keep all environment variables.
runtime_context = RuntimeContext(kwargs=runtime_args)
loading_context = LoadingContext(kwargs=loading_args)

factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context)
process = factory.make(os.path.relpath(str(f.name)))

try:
outputs = process()
except cwltool.factory.WorkflowStatus:
raise errors.RenkuException("WorkflowExecuteError()")

sys.argv = argv

# Move outputs to correct location in the repository.
output_dirs = process.factory.executor.output_dirs

def remove_prefix(location, prefix="file://"):
if location.startswith(prefix):
return unquote(location[len(prefix) :])
return unquote(location)

locations = {remove_prefix(output["location"]) for output in outputs.values()}
# make sure to not move an output if it's containing directory gets moved
locations = {
location for location in locations if not any(location.startswith(d) for d in locations if location != d)
}

output_paths = []
with progressbar(locations, label="Moving outputs") as bar:
for location in bar:
for output_dir in output_dirs:
if location.startswith(output_dir):
output_path = location[len(output_dir) :].lstrip(os.path.sep)
destination = basedir / output_path
output_paths.append(destination)
if destination.is_dir():
shutil.rmtree(str(destination))
destination = destination.parent
shutil.move(location, str(destination))
continue

return client.remove_unmodified(output_paths)