Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): make rerun work with new storage #2319

Merged
merged 2 commits into from Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 8 additions & 70 deletions renku/cli/rerun.py
Expand Up @@ -36,92 +36,30 @@

$ renku rerun C

If you would like to recreate a file which was one of several produced by
a tool, then these files must be recreated as well. See the explanation in
:ref:`updating siblings <cli-update-with-siblings>`.
Note that all other outputs of the executed workflow will be recreated as well.
"""

import os
import sys
from pathlib import Path

import click

from renku.cli.utils.callback import ClickCallback
from renku.core import errors
from renku.core.commands.options import option_siblings
from renku.core.commands.rerun import rerun_workflows
from renku.core.management.command_builder import inject
from renku.core.management.interface.client_dispatcher import IClientDispatcher


def show_inputs(workflow):
"""Show workflow inputs and exit."""
for input_ in workflow.inputs:
click.echo("{id}: {default}".format(id=input_._id, default=input_.consumes.path))
sys.exit(0)


@inject.autoparams()
def edit_inputs(workflow, client_dispatcher: IClientDispatcher):
"""Edit workflow inputs."""
client = client_dispatcher.current_client
for input_ in workflow.inputs:
new_path = click.prompt("{0._id}".format(input_), default=input_.consumes.path)
input_.consumes.path = str(Path(os.path.abspath(new_path)).relative_to(client.path))

try:
input_.consumes.commit = client.find_previous_commit(input_.consumes.path)
except KeyError:
raise errors.DirtyRepository(f"Please commit {input_.consumes.path} before using it as an input.")

input_.consumes._id = input_.consumes.default_id()
input_.consumes._label = input_.consumes.default_label()

for step in workflow.subprocesses:
for parameter in step.process.parameters:
parameter.default_value = click.prompt("{0._id}".format(parameter), default=parameter.default_value)

return workflow
from renku.core.commands.rerun import rerun_command


@click.command()
@click.option("--revision", default="HEAD")
@click.option(
"--from",
"roots",
"sources",
type=click.Path(exists=True, dir_okay=False),
multiple=True,
help="Start an execution from this file.",
)
@option_siblings
@click.option(
"--default-inputs",
"inputs",
default=True,
flag_value=lambda workflow: workflow,
help="Use default inputs.",
type=click.types.UnprocessedParamType(),
)
@click.option(
"--show-inputs",
"inputs",
flag_value=show_inputs,
help=show_inputs.__doc__,
type=click.types.UnprocessedParamType(),
)
@click.option(
"--edit-inputs",
"inputs",
flag_value=edit_inputs,
help=edit_inputs.__doc__,
type=click.types.UnprocessedParamType(),
)
@click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1, required=True)
def rerun(revision, roots, siblings, inputs, paths):
def rerun(sources, paths):
"""Recreate files generated by a sequence of ``run`` commands."""
communicator = ClickCallback()

rerun_workflows().with_communicator(communicator).build().execute(
revision=revision, roots=roots, siblings=siblings, inputs=inputs, paths=paths
)
try:
rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths)
except errors.NothingToExecuteError:
exit(1)
52 changes: 0 additions & 52 deletions renku/core/commands/options.py
Expand Up @@ -19,8 +19,6 @@

import click

from renku.core.errors import RenkuException

from .git import set_git_isolation


Expand All @@ -45,56 +43,6 @@ def install_completion(ctx, attr, value): # pragma: no cover
)


def check_siblings(graph, outputs):
"""Check that all outputs have their siblings listed."""
siblings = set()
for node in outputs:
siblings |= graph.siblings(node)

siblings = {node.path for node in siblings}
missing = siblings - {node.path for node in outputs}
missing = {m for m in missing if all(not m.startswith(node.path) for node in outputs)}

if missing:
msg = "Include the files above in the command " "or use the --with-siblings option."
raise RenkuException(
"There are missing output siblings:\n\n"
"\t{0}\n\n{1}".format("\n\t".join(click.style(path, fg="red") for path in missing), msg)
)
return outputs


def with_siblings(graph, outputs):
"""Include all missing siblings."""
siblings = set()
for node in outputs:
siblings |= graph.siblings(node)
return siblings


option_check_siblings = click.option(
"--check-siblings",
"siblings",
flag_value=check_siblings,
default=True,
help=check_siblings.__doc__,
type=click.types.UnprocessedParamType(),
)
option_with_siblings = click.option(
"--with-siblings",
"siblings",
flag_value=with_siblings,
default=True,
help=with_siblings.__doc__,
type=click.types.UnprocessedParamType(),
)


def option_siblings(func):
"""Combine siblings options."""
return option_check_siblings(option_with_siblings(func))


option_external_storage_requested = click.option(
"external_storage_requested",
"--external-storage/--no-external-storage",
Expand Down
91 changes: 68 additions & 23 deletions renku/core/commands/rerun.py
Expand Up @@ -15,17 +15,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku rerun command."""
"""Renku ``rerun`` command."""

from collections import defaultdict
from typing import List, Set

from renku.core import errors
from renku.core.commands.update import execute_workflow
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.provenance.activity import Activity
from renku.core.utils import communication
from renku.core.utils.metadata import add_activity_if_recent
from renku.core.utils.os import get_relative_paths


def rerun_workflows():
def rerun_command():
"""Recreate files generated by a sequence of ``run`` commands."""
return (
Command()
.command(_rerun_workflows)
.command(_rerun)
.require_migration()
.require_clean()
.require_nodejs()
Expand All @@ -35,23 +45,58 @@ def rerun_workflows():


@inject.autoparams()
def _rerun_workflows(revision, roots, siblings, inputs, paths, client_dispatcher: IClientDispatcher):
pass
# TODO: implement with new database
# graph = Graph(client)
# outputs = graph.build(paths=paths, revision=revision)

# # Check or extend siblings of outputs.
# outputs = siblings(graph, outputs)
# output_paths = {node.path for node in outputs}

# # Normalize and check all starting paths.
# roots = {graph.normalize_path(root) for root in roots}
# output_paths -= roots
# outputs = [o for o in outputs if o.path not in roots]

# # Generate workflow and check inputs.
# # NOTE The workflow creation is done before opening a new file.
# workflow = inputs(graph.as_workflow(input_paths=roots, output_paths=output_paths, outputs=outputs))

# execute_workflow(workflow=workflow, output_paths=output_paths, command_name="rerun", update_commits=False)
def _rerun(
sources: List[str], paths: List[str], client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway
):
client = client_dispatcher.current_client

sources = sources or []
sources = get_relative_paths(base=client.path, paths=sources)
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

activities = _get_activities(paths, sources, activity_gateway)

if len(activities) == 0:
raise errors.NothingToExecuteError()

plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="rerun")


def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]:
all_activities = defaultdict(set)

def include_newest_activity(activity):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a duplicate of the one used in update, would make sense to unify them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a more general utility function and used it here. There is still a duplicated line but I believe it's more readable and re-usable this way.

existing_activities = all_activities[activity.association.plan.id]
add_activity_if_recent(activity=activity, activities=existing_activities)

for path in paths:
activities = activity_gateway.get_activities_by_generation(path)

if len(activities) == 0:
communication.warn(f"Path '{path}' is not generated by any workflows.")
continue

latest_activity = max(activities, key=lambda a: a.ended_at_time)

upstream_chains = activity_gateway.get_upstream_activity_chains(latest_activity)

if sources:
# NOTE: Add the activity to check if it also matches the condition
upstream_chains.append((latest_activity,))
# NOTE: Only include paths that is using at least one of the sources
upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[0].usages)]

# NOTE: Include activity only if any of its upstream match the condition
if upstream_chains:
include_newest_activity(latest_activity)
else:
include_newest_activity(latest_activity)

for chain in upstream_chains:
for activity in chain:
include_newest_activity(activity)

return {a for activities in all_activities.values() for a in activities}
23 changes: 2 additions & 21 deletions renku/core/commands/update.py
Expand Up @@ -37,7 +37,7 @@
from renku.core.models.workflow.plan import Plan
from renku.core.utils.datetime8601 import local_now
from renku.core.utils.git import add_to_git
from renku.core.utils.metadata import get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths
from renku.version import __version__, version_url

Expand Down Expand Up @@ -92,28 +92,9 @@ def _get_downstream_activities(
"""Return an ordered list of activities so that an activities comes before all its downstream activities."""
all_activities = defaultdict(set)

def have_identical_inputs_and_outputs(activity1, activity2):
return sorted(u.entity.path for u in activity1.usages) == sorted(
u.entity.path for u in activity2.usages
) and sorted(g.entity.path for g in activity1.generations) == sorted(
g.entity.path for g in activity2.generations
)

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]

if activity in existing_activities:
return

for existing_activity in existing_activities:
if have_identical_inputs_and_outputs(activity, existing_activity):
if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer
existing_activities.remove(existing_activity)
existing_activities.add(activity)
return

# No similar activity was found
existing_activities.add(activity)
add_activity_if_recent(activity=activity, activities=existing_activities)

def does_activity_generate_any_paths(activity):
is_same = any(g.entity.path in paths for g in activity.generations)
Expand Down
12 changes: 11 additions & 1 deletion renku/core/management/interface/activity_gateway.py
Expand Up @@ -18,7 +18,8 @@
"""Renku activity gateway interface."""

from abc import ABC
from typing import Dict, List, Set, Tuple
from pathlib import Path
from typing import Dict, List, Set, Tuple, Union

from renku.core.models.provenance.activity import Activity, ActivityCollection, Usage
from renku.core.models.workflow.plan import AbstractPlan
Expand All @@ -43,12 +44,21 @@ def get_all_generation_paths(self) -> List[str]:
"""Return all generation paths."""
raise NotImplementedError

def get_activities_by_generation(self, path: Union[Path, str]) -> List[Activity]:
"""Return the list of all activities that generate a path."""
raise NotImplementedError

m-alisafaee marked this conversation as resolved.
Show resolved Hide resolved
def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]:
"""Get downstream activities that depend on this activity."""
raise NotImplementedError

def get_downstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
"""Get a list of tuples of all downstream paths of this activity."""
raise NotImplementedError

def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]:
"""Get a list of tuples of all upstream paths of this activity."""
raise NotImplementedError

m-alisafaee marked this conversation as resolved.
Show resolved Hide resolved
def get_all_activities(self) -> List[Activity]:
"""Get all activities in the project."""
Expand Down