feat(cli): allow CompositePlans to be created based on activities (#2385)
Panaetius committed Oct 5, 2021
1 parent 0d2167b commit 011f618
Showing 10 changed files with 442 additions and 186 deletions.
74 changes: 56 additions & 18 deletions renku/cli/workflow.py
@@ -85,21 +85,34 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For more complex workflows consisting of several steps, you can use the
``renku workflow group`` command. This creates a new workflow that has
``renku workflow compose`` command. This creates a new workflow that has
substeps.
The basic usage is:
.. code-block:: console
$ renku run --name step1 -- command
$ renku run --name step2 -- command
$ renku workflow group my-grouped-workflow step1 step2
$ renku run --name step1 -- cp input intermediate
$ renku run --name step2 -- cp intermediate output
$ renku workflow compose my-composed-workflow step1 step2
This would create a new workflow ``my-grouped-workflow`` that consists
This would create a new workflow ``my-composed-workflow`` that consists
of ``step1`` and ``step2`` as steps. This new workflow is just
like any other workflow in renku in that it can be executed, exported
or grouped with other workflows.
or composed with other workflows.
Workflows can also be composed based on past activities and their
inputs/outputs, using the ``--from`` and ``--to`` parameters. This finds
chains of activities from inputs to outputs and then adds them to the
composed plan, applying mappings (see below) where appropriate to make
sure the correct values for execution are used in the composite. This
also means that all the parameters in the used plans are exposed on the
composed plan directly.
In the example above, this would be:
.. code-block:: console
$ renku workflow compose --from input --to output my-composed-workflow
You can expose parameters of child steps on the parent workflow using
``--map``/``-m`` arguments followed by a mapping expression. Mapping expressions
@@ -113,12 +126,12 @@
An absolute expression in the example above could be ``step1.my_dataset``
to refer to the input, output or argument named ``my_dataset`` on the step
``step1``. A relative expression could be ``@step2.@output1`` to refer
to the first output of the second step of the grouped workflow.
to the first output of the second step of the composed workflow.
Valid relative expressions are ``@input<n>``, ``@output<n>`` and ``@param<n>``
for the n'th input, output or argument of a step, respectively. For referring
to steps inside a grouped workflow, you can use ``@step<n>``. For referencing
a mapping on a grouped workflow, you can use ``@mapping<n>``. Of course, the
to steps inside a composed workflow, you can use ``@step<n>``. For referencing
a mapping on a composed workflow, you can use ``@mapping<n>``. Of course, the
names of the objects for all these cases also work.
The expressions can also be combined using ``,`` if a mapping should point
@@ -130,9 +143,9 @@
.. code-block:: console
$ renku workflow group --map input_file=step1.@input2 \
$ renku workflow compose --map input_file=step1.@input2 \
--map output_file=@step1.my-output,@step2.step2s_output \
my-grouped-workflow step1 step2
my-composed-workflow step1 step2
This would create a mapping called ``input_file`` on the parent workflow that
points to the second input of ``step1`` and a mapping called ``output_file``
@@ -145,13 +158,13 @@
.. code-block:: console
$ renku workflow group --map input_file=step1.@input2 \
$ renku workflow compose --map input_file=step1.@input2 \
--set input_file=data.csv \
my-grouped-workflow step1 step2
my-composed-workflow step1 step2
This would lead to ``data.csv`` being used for the second input of
``step1`` when ``my-grouped-workflow`` is executed (if it isn't overridden
``step1`` when ``my-composed-workflow`` is executed (if it isn't overridden
at execution time).
You can add a description to the mappings to make them more human-readable
@@ -161,9 +174,9 @@
.. code-block:: console
$ renku workflow group --map input_file=step1.@input2 \
$ renku workflow compose --map input_file=step1.@input2 \
-p input_file="The dataset to process" \
my-grouped-workflow step1 step2
my-composed-workflow step1 step2
You can also expose all inputs, outputs or parameters of child steps by
using ``--map-inputs``, ``--map-outputs`` or ``--map-params``, respectively.
@@ -186,7 +199,7 @@
Renku can also add links for you automatically based on the default values
of inputs and outputs, where inputs/outputs that have the same path get
linked in the grouped run. To do this, pass the ``--link-all`` flag.
linked in the composed run. To do this, pass the ``--link-all`` flag.
.. warning:: Due to workflows having to be directed acyclic graphs, cycles
in the dependencies are not allowed. E.g. step1 depending on step2
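
The mapping expressions described in the docstring above can be illustrated with a small parser. This is only a sketch of the documented syntax, not renku's own implementation: the function name `parse_mapping_target` and the `(kind, value)` tuple layout are hypothetical, and it assumes that `<n>` is 1-based (the docstring says ``@output1`` is the first output) and that a single, non-comma-combined target is passed in.

```python
import re
from typing import List, Tuple, Union

# Relative segments documented in the docstring: @input<n>, @output<n>,
# @param<n>, @step<n> and @mapping<n>. Anything else is treated as a name.
_RELATIVE_SEGMENT = re.compile(r"^@(input|output|param|step|mapping)([1-9][0-9]*)$")


def parse_mapping_target(expression: str) -> List[Tuple[str, Union[str, int]]]:
    """Split a dotted mapping target into (kind, value) pairs.

    Hypothetical helper: relative segments become (kind, n) with a 1-based
    index, all other segments become ("name", segment).
    """
    parts: List[Tuple[str, Union[str, int]]] = []
    for segment in expression.split("."):
        match = _RELATIVE_SEGMENT.match(segment)
        if match:
            parts.append((match.group(1), int(match.group(2))))
        else:
            parts.append(("name", segment))
    return parts


relative = parse_mapping_target("@step2.@output1")
absolute = parse_mapping_target("step1.my_dataset")
mixed = parse_mapping_target("step1.@input2")
```

Here `relative` describes the first output of the second step, `absolute` refers to both the step and the parameter by name, and `mixed` combines a step name with a positional input, as in the ``--map input_file=step1.@input2`` example.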
@@ -428,8 +441,22 @@ def remove(name, force):
@click.option("--map-all", is_flag=True, help="Combination of --map-inputs, --map-outputs, --map-params.")
@click.option("--link-all", is_flag=True, help="Automatically link steps based on default values.")
@click.option("--keyword", multiple=True, help="List of keywords for the workflow.")
@click.option(
"--from",
"sources",
type=click.Path(exists=True, dir_okay=False),
multiple=True,
help="Start a composite plan from this file as input.",
)
@click.option(
"--to",
"sinks",
type=click.Path(exists=True, dir_okay=True),
multiple=True,
help="End a composite plan at this file as output.",
)
@click.argument("name", required=True)
@click.argument("steps", nargs=-1, required=True, type=click.UNPROCESSED)
@click.argument("steps", nargs=-1, type=click.UNPROCESSED)
def compose(
description,
mappings,
@@ -442,11 +469,20 @@ def compose(
map_all,
link_all,
keyword,
sources,
sinks,
name,
steps,
):
"""Create a composite workflow consisting of multiple steps."""

if (sources or sinks) and steps:
click.secho(ERROR + "--from/--to cannot be used at the same time as passing run/step names.")
exit(1)
elif not (sources or sinks or steps):
click.secho(ERROR + "Either --from/--to or passing run/step names is required.")
exit(1)

if map_all:
map_inputs = map_outputs = map_params = True

@@ -466,6 +502,8 @@ def compose(
link_all=link_all,
keywords=keyword,
steps=steps,
sources=sources,
sinks=sinks,
)
)
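
The argument check added to `compose` in this file makes `--from`/`--to` and explicit step names mutually exclusive, while still requiring one of the two. A minimal sketch of that rule as a pure function follows. The name `check_compose_arguments` and the choice to return a message instead of printing and exiting are assumptions made for illustration, not part of the commit.

```python
from typing import Optional, Sequence


def check_compose_arguments(
    sources: Sequence[str], sinks: Sequence[str], steps: Sequence[str]
) -> Optional[str]:
    """Return an error message for an invalid combination, else None.

    Hypothetical helper mirroring the two branches in ``compose``.
    """
    uses_paths = bool(sources or sinks)
    if uses_paths and steps:
        # Activity-based composition and explicit steps cannot be mixed.
        return "--from/--to cannot be used at the same time as passing run/step names."
    if not uses_paths and not steps:
        # Nothing was given that a composite plan could be built from.
        return "Either --from/--to or run/step names are required."
    return None


ok_from_paths = check_compose_arguments(["input"], ["output"], [])
ok_from_steps = check_compose_arguments([], [], ["step1", "step2"])
mixed_error = check_compose_arguments(["input"], [], ["step1"])
empty_error = check_compose_arguments([], [], [])
```

Keeping the rule in a function without side effects makes it easy to test without invoking the CLI. Making `steps` optional in the Click declaration is what allows the `--from`/`--to` form to be reached at all.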

13 changes: 11 additions & 2 deletions renku/core/commands/format/activity.py
@@ -28,7 +28,7 @@
def tabulate_activities(activities: List[Activity], modified_inputs: Set[str]):
"""Return some info about the activities in a tabular form."""
collection = []
fields = "plan, execution_date, modified_inputs, outputs"
fields = "plan, execution_date, modified_inputs, outputs, command"
ActivityDisplay = namedtuple("ActivityDisplay", fields)

for activity in activities:
@@ -41,7 +41,15 @@ def tabulate_activities(activities: List[Activity], modified_inputs: Set[str]):

modified_usages = ", ".join(sorted(modified_usages))
generations = ", ".join(sorted(generations))
collection.append(ActivityDisplay(plan, activity.ended_at_time, modified_usages, generations))
collection.append(
ActivityDisplay(
plan,
activity.ended_at_time,
modified_usages,
generations,
" ".join(activity.plan_with_values.to_argv(with_streams=True)),
)
)

return tabulate(collection=collection, columns=fields, columns_mapping=ACTIVITY_DISPLAY_COLUMNS, sort=False)

@@ -51,4 +59,5 @@ def tabulate_activities(activities: List[Activity], modified_inputs: Set[str]):
"execution_date": ("execution_date", "date executed"),
"modified_inputs": ("modified_inputs", "modified inputs"),
"outputs": ("outputs", None),
"command": ("command", None),
}
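
The change to `tabulate_activities` adds a fifth `command` field to the row tuple and fills it by joining the plan's argument vector with spaces. The sketch below shows that shape with plain values. The helper `make_row` and the sample data are hypothetical, and the list passed as `argv` stands in for whatever `plan_with_values.to_argv(with_streams=True)` returns.

```python
from collections import namedtuple
from typing import Iterable, Sequence

# Same field string as in the diff; namedtuple accepts comma-separated names.
fields = "plan, execution_date, modified_inputs, outputs, command"
ActivityDisplay = namedtuple("ActivityDisplay", fields)


def make_row(
    plan: str,
    execution_date: str,
    modified_inputs: Iterable[str],
    outputs: Iterable[str],
    argv: Sequence[str],
) -> ActivityDisplay:
    """Build one display row (hypothetical helper for illustration)."""
    return ActivityDisplay(
        plan,
        execution_date,
        ", ".join(sorted(modified_inputs)),
        ", ".join(sorted(outputs)),
        " ".join(argv),  # plain join, as in the diff; arguments are not shell-quoted
    )


row = make_row("step1", "2021-10-05", {"b.csv", "a.csv"}, {"intermediate"}, ["cp", "input", "intermediate"])
```

Because the join does not quote arguments, a value containing spaces would not round-trip through a shell. That is acceptable for a display column, but worth keeping in mind if the string is ever copied and re-executed.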
47 changes: 3 additions & 44 deletions renku/core/commands/rerun.py
@@ -17,18 +17,14 @@
# limitations under the License.
"""Renku ``rerun`` command."""

from collections import defaultdict
from typing import List, Set
from typing import List

from renku.core import errors
from renku.core.commands.workflow import execute_workflow
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.workflow import sort_activities
from renku.core.models.provenance.activity import Activity
from renku.core.utils import communication
from renku.core.utils.metadata import add_activity_if_recent
from renku.core.management.workflow.activity import get_activities_until_paths, sort_activities
from renku.core.utils.os import get_relative_paths


@@ -60,7 +56,7 @@ def _rerun(
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

activities = _get_activities(paths, sources, activity_gateway)
activities = list(get_activities_until_paths(paths, sources, activity_gateway))

if len(activities) == 0:
raise errors.NothingToExecuteError()
@@ -72,40 +68,3 @@ def _rerun(
plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="rerun")


def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]:
all_activities = defaultdict(set)

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]
add_activity_if_recent(activity=activity, activities=existing_activities)

for path in paths:
activities = activity_gateway.get_activities_by_generation(path)

if len(activities) == 0:
communication.warn(f"Path '{path}' is not generated by any workflows.")
continue

latest_activity = max(activities, key=lambda a: a.ended_at_time)

upstream_chains = activity_gateway.get_upstream_activity_chains(latest_activity)

if sources:
# NOTE: Add the activity to check if it also matches the condition
upstream_chains.append((latest_activity,))
# NOTE: Only include paths that is using at least one of the sources
upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[-1].usages)]

# NOTE: Include activity only if any of its upstream match the condition
if upstream_chains:
include_newest_activity(latest_activity)
else:
include_newest_activity(latest_activity)

for chain in upstream_chains:
for activity in chain:
include_newest_activity(activity)

return {a for activities in all_activities.values() for a in activities}
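
The source filter in the removed `_get_activities` helper keeps an upstream chain only if the last activity in that chain uses at least one of the requested source paths. A simplified sketch of that single step is shown below. `ToyActivity` and `filter_chains_by_sources` are hypothetical stand-ins that flatten `usages[...].entity.path` into a tuple of strings, and the sample chains are assumed for illustration.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class ToyActivity:
    """Hypothetical stand-in for an activity: a name plus the paths it used."""

    name: str
    used_paths: Tuple[str, ...] = ()


Chain = Tuple[ToyActivity, ...]


def filter_chains_by_sources(chains: Sequence[Chain], sources: Sequence[str]) -> List[Chain]:
    """Keep chains whose final activity uses at least one source path."""
    wanted = set(sources)
    return [chain for chain in chains if wanted.intersection(chain[-1].used_paths)]


step1 = ToyActivity("step1", ("input",))
step2 = ToyActivity("step2", ("intermediate",))

# As in the original code, the activity itself is appended as a one-element
# chain so that it is also checked against the condition.
chains: List[Chain] = [(step1,), (step2,)]
kept = filter_chains_by_sources(chains, ["input"])
```

With `sources=["input"]`, only the chain ending in `step1` survives, so `step2` is included in the rerun only because one of its upstream chains matched.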
2 changes: 1 addition & 1 deletion renku/core/commands/update.py
@@ -28,7 +28,7 @@
from renku.core.management.command_builder.command import Command
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.workflow import sort_activities
from renku.core.management.workflow.activity import sort_activities
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths