Skip to content

Commit

Permalink
feat(core): add renku rerun command (#2319)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Sep 15, 2021
1 parent c047ed9 commit c61a5ab
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 379 deletions.
78 changes: 8 additions & 70 deletions renku/cli/rerun.py
Expand Up @@ -36,92 +36,30 @@
$ renku rerun C
If you would like to recreate a file which was one of several produced by
a tool, then these files must be recreated as well. See the explanation in
:ref:`updating siblings <cli-update-with-siblings>`.
Note that all other outputs of the executed workflow will be recreated as well.
"""

import os
import sys
from pathlib import Path

import click

from renku.cli.utils.callback import ClickCallback
from renku.core import errors
from renku.core.commands.options import option_siblings
from renku.core.commands.rerun import rerun_workflows
from renku.core.management.command_builder import inject
from renku.core.management.interface.client_dispatcher import IClientDispatcher


def show_inputs(workflow):
"""Show workflow inputs and exit."""
for input_ in workflow.inputs:
click.echo("{id}: {default}".format(id=input_._id, default=input_.consumes.path))
sys.exit(0)


@inject.autoparams()
def edit_inputs(workflow, client_dispatcher: IClientDispatcher):
"""Edit workflow inputs."""
client = client_dispatcher.current_client
for input_ in workflow.inputs:
new_path = click.prompt("{0._id}".format(input_), default=input_.consumes.path)
input_.consumes.path = str(Path(os.path.abspath(new_path)).relative_to(client.path))

try:
input_.consumes.commit = client.find_previous_commit(input_.consumes.path)
except KeyError:
raise errors.DirtyRepository(f"Please commit {input_.consumes.path} before using it as an input.")

input_.consumes._id = input_.consumes.default_id()
input_.consumes._label = input_.consumes.default_label()

for step in workflow.subprocesses:
for parameter in step.process.parameters:
parameter.default_value = click.prompt("{0._id}".format(parameter), default=parameter.default_value)

return workflow
from renku.core.commands.rerun import rerun_command


@click.command()
@click.option("--revision", default="HEAD")
@click.option(
"--from",
"roots",
"sources",
type=click.Path(exists=True, dir_okay=False),
multiple=True,
help="Start an execution from this file.",
)
@option_siblings
@click.option(
"--default-inputs",
"inputs",
default=True,
flag_value=lambda workflow: workflow,
help="Use default inputs.",
type=click.types.UnprocessedParamType(),
)
@click.option(
"--show-inputs",
"inputs",
flag_value=show_inputs,
help=show_inputs.__doc__,
type=click.types.UnprocessedParamType(),
)
@click.option(
"--edit-inputs",
"inputs",
flag_value=edit_inputs,
help=edit_inputs.__doc__,
type=click.types.UnprocessedParamType(),
)
@click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1, required=True)
def rerun(revision, roots, siblings, inputs, paths):
def rerun(sources, paths):
"""Recreate files generated by a sequence of ``run`` commands."""
communicator = ClickCallback()

rerun_workflows().with_communicator(communicator).build().execute(
revision=revision, roots=roots, siblings=siblings, inputs=inputs, paths=paths
)
try:
rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths)
except errors.NothingToExecuteError:
exit(1)
52 changes: 0 additions & 52 deletions renku/core/commands/options.py
Expand Up @@ -19,8 +19,6 @@

import click

from renku.core.errors import RenkuException

from .git import set_git_isolation


Expand All @@ -45,56 +43,6 @@ def install_completion(ctx, attr, value): # pragma: no cover
)


def check_siblings(graph, outputs):
"""Check that all outputs have their siblings listed."""
siblings = set()
for node in outputs:
siblings |= graph.siblings(node)

siblings = {node.path for node in siblings}
missing = siblings - {node.path for node in outputs}
missing = {m for m in missing if all(not m.startswith(node.path) for node in outputs)}

if missing:
msg = "Include the files above in the command " "or use the --with-siblings option."
raise RenkuException(
"There are missing output siblings:\n\n"
"\t{0}\n\n{1}".format("\n\t".join(click.style(path, fg="red") for path in missing), msg)
)
return outputs


def with_siblings(graph, outputs):
"""Include all missing siblings."""
siblings = set()
for node in outputs:
siblings |= graph.siblings(node)
return siblings


option_check_siblings = click.option(
"--check-siblings",
"siblings",
flag_value=check_siblings,
default=True,
help=check_siblings.__doc__,
type=click.types.UnprocessedParamType(),
)
option_with_siblings = click.option(
"--with-siblings",
"siblings",
flag_value=with_siblings,
default=True,
help=with_siblings.__doc__,
type=click.types.UnprocessedParamType(),
)


def option_siblings(func):
"""Combine siblings options."""
return option_check_siblings(option_with_siblings(func))


option_external_storage_requested = click.option(
"external_storage_requested",
"--external-storage/--no-external-storage",
Expand Down
91 changes: 68 additions & 23 deletions renku/core/commands/rerun.py
Expand Up @@ -15,17 +15,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku rerun command."""
"""Renku ``rerun`` command."""

from collections import defaultdict
from typing import List, Set

from renku.core import errors
from renku.core.commands.update import execute_workflow
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.provenance.activity import Activity
from renku.core.utils import communication
from renku.core.utils.metadata import add_activity_if_recent
from renku.core.utils.os import get_relative_paths


def rerun_workflows():
def rerun_command():
"""Recreate files generated by a sequence of ``run`` commands."""
return (
Command()
.command(_rerun_workflows)
.command(_rerun)
.require_migration()
.require_clean()
.require_nodejs()
Expand All @@ -35,23 +45,58 @@ def rerun_workflows():


@inject.autoparams()
def _rerun_workflows(revision, roots, siblings, inputs, paths, client_dispatcher: IClientDispatcher):
pass
# TODO: implement with new database
# graph = Graph(client)
# outputs = graph.build(paths=paths, revision=revision)

# # Check or extend siblings of outputs.
# outputs = siblings(graph, outputs)
# output_paths = {node.path for node in outputs}

# # Normalize and check all starting paths.
# roots = {graph.normalize_path(root) for root in roots}
# output_paths -= roots
# outputs = [o for o in outputs if o.path not in roots]

# # Generate workflow and check inputs.
# # NOTE The workflow creation is done before opening a new file.
# workflow = inputs(graph.as_workflow(input_paths=roots, output_paths=output_paths, outputs=outputs))

# execute_workflow(workflow=workflow, output_paths=output_paths, command_name="rerun", update_commits=False)
def _rerun(
sources: List[str], paths: List[str], client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway
):
client = client_dispatcher.current_client

sources = sources or []
sources = get_relative_paths(base=client.path, paths=sources)
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

activities = _get_activities(paths, sources, activity_gateway)

if len(activities) == 0:
raise errors.NothingToExecuteError()

plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="rerun")


def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]:
all_activities = defaultdict(set)

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]
add_activity_if_recent(activity=activity, activities=existing_activities)

for path in paths:
activities = activity_gateway.get_activities_by_generation(path)

if len(activities) == 0:
communication.warn(f"Path '{path}' is not generated by any workflows.")
continue

latest_activity = max(activities, key=lambda a: a.ended_at_time)

upstream_chains = activity_gateway.get_upstream_activity_chains(latest_activity)

if sources:
# NOTE: Add the activity to check if it also matches the condition
upstream_chains.append((latest_activity,))
# NOTE: Only include paths that is using at least one of the sources
upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[0].usages)]

# NOTE: Include activity only if any of its upstream match the condition
if upstream_chains:
include_newest_activity(latest_activity)
else:
include_newest_activity(latest_activity)

for chain in upstream_chains:
for activity in chain:
include_newest_activity(activity)

return {a for activities in all_activities.values() for a in activities}
23 changes: 2 additions & 21 deletions renku/core/commands/update.py
Expand Up @@ -37,7 +37,7 @@
from renku.core.models.workflow.plan import Plan
from renku.core.utils.datetime8601 import local_now
from renku.core.utils.git import add_to_git
from renku.core.utils.metadata import get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths
from renku.version import __version__, version_url

Expand Down Expand Up @@ -92,28 +92,9 @@ def _get_downstream_activities(
"""Return an ordered list of activities so that an activities comes before all its downstream activities."""
all_activities = defaultdict(set)

def have_identical_inputs_and_outputs(activity1, activity2):
return sorted(u.entity.path for u in activity1.usages) == sorted(
u.entity.path for u in activity2.usages
) and sorted(g.entity.path for g in activity1.generations) == sorted(
g.entity.path for g in activity2.generations
)

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]

if activity in existing_activities:
return

for existing_activity in existing_activities:
if have_identical_inputs_and_outputs(activity, existing_activity):
if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer
existing_activities.remove(existing_activity)
existing_activities.add(activity)
return

# No similar activity was found
existing_activities.add(activity)
add_activity_if_recent(activity=activity, activities=existing_activities)

def does_activity_generate_any_paths(activity):
is_same = any(g.entity.path in paths for g in activity.generations)
Expand Down
12 changes: 11 additions & 1 deletion renku/core/management/interface/activity_gateway.py
Expand Up @@ -18,7 +18,8 @@
"""Renku activity gateway interface."""

from abc import ABC
from typing import Dict, List, Set, Tuple
from pathlib import Path
from typing import Dict, List, Set, Tuple, Union

from renku.core.models.provenance.activity import Activity, ActivityCollection, Usage
from renku.core.models.workflow.plan import AbstractPlan
Expand All @@ -43,12 +44,21 @@ def get_all_generation_paths(self) -> List[str]:
"""Return all generation paths."""
raise NotImplementedError

def get_activities_by_generation(self, path: Union[Path, str]) -> List[Activity]:
"""Return the list of all activities that generate a path."""
raise NotImplementedError

def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]:
"""Get downstream activities that depend on this activity."""
raise NotImplementedError

def get_downstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
"""Get a list of tuples of all downstream paths of this activity."""
raise NotImplementedError

def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
"""Get a list of tuples of all upstream paths of this activity."""
raise NotImplementedError

def get_all_activities(self) -> List[Activity]:
"""Get all activities in the project."""
Expand Down

0 comments on commit c61a5ab

Please sign in to comment.