diff --git a/renku/cli/rerun.py b/renku/cli/rerun.py index 83d05b3f53..700dfa103b 100644 --- a/renku/cli/rerun.py +++ b/renku/cli/rerun.py @@ -42,6 +42,7 @@ import click from renku.cli.utils.callback import ClickCallback +from renku.core import errors from renku.core.commands.rerun import rerun_command @@ -58,4 +59,7 @@ def rerun(sources, paths): """Recreate files generated by a sequence of ``run`` commands.""" communicator = ClickCallback() - rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths) + try: + rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths) + except errors.NothingToExecuteError: + exit(1) diff --git a/renku/core/commands/rerun.py b/renku/core/commands/rerun.py index ca8b698c47..0147bd31ff 100644 --- a/renku/core/commands/rerun.py +++ b/renku/core/commands/rerun.py @@ -16,14 +16,18 @@ # See the License for the specific language governing permissions and # limitations under the License. """Renku ``rerun`` command.""" + from collections import defaultdict -from typing import List +from typing import List, Set +from renku.core import errors from renku.core.commands.update import execute_workflow from renku.core.management.command_builder.command import Command, inject from renku.core.management.interface.activity_gateway import IActivityGateway from renku.core.management.interface.client_dispatcher import IClientDispatcher +from renku.core.models.provenance.activity import Activity from renku.core.utils import communication +from renku.core.utils.metadata import add_activity_if_recent from renku.core.utils.os import get_relative_paths @@ -53,32 +57,20 @@ def _rerun( activities = _get_activities(paths, sources, activity_gateway) - if not activities: - exit(1) + if len(activities) == 0: + raise errors.NothingToExecuteError() plans = [a.plan_with_values for a in activities] execute_workflow(plans=plans, command_name="rerun") -def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway): +def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]: all_activities = defaultdict(set) def include_newest_activity(activity): existing_activities = all_activities[activity.association.plan.id] - - if activity in existing_activities: - return - - for existing_activity in existing_activities: - if activity.has_identical_inputs_and_outputs_as(existing_activity): - if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer - existing_activities.remove(existing_activity) - existing_activities.add(activity) - return - - # No similar activity was found - existing_activities.add(activity) + add_activity_if_recent(activity=activity, activities=existing_activities) for path in paths: activities = activity_gateway.get_activities_by_generation(path) diff --git a/renku/core/commands/update.py b/renku/core/commands/update.py index 94d57bc3fe..ab6d732153 100644 --- a/renku/core/commands/update.py +++ b/renku/core/commands/update.py @@ -37,7 +37,7 @@ from renku.core.models.workflow.plan import Plan from renku.core.utils.datetime8601 import local_now from renku.core.utils.git import add_to_git -from renku.core.utils.metadata import get_modified_activities +from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities from renku.core.utils.os import get_relative_paths from renku.version import __version__, version_url @@ -94,19 +94,7 @@ def _get_downstream_activities( def include_newest_activity(activity): existing_activities = all_activities[activity.association.plan.id] - - if activity in existing_activities: - return - - for existing_activity in existing_activities: - if activity.has_identical_inputs_and_outputs_as(existing_activity): - if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer - existing_activities.remove(existing_activity) - existing_activities.add(activity) - return - - # No similar activity was found - existing_activities.add(activity) + add_activity_if_recent(activity=activity, activities=existing_activities) def does_activity_generate_any_paths(activity): is_same = any(g.entity.path in paths for g in activity.generations) diff --git a/renku/core/utils/metadata.py b/renku/core/utils/metadata.py index f5b7003f43..2b4d5f3c83 100644 --- a/renku/core/utils/metadata.py +++ b/renku/core/utils/metadata.py @@ -94,3 +94,19 @@ def get_modified_activities( modified.add((activity, entity)) return modified, deleted + + +def add_activity_if_recent(activity: Activity, activities: Set[Activity]): + """Add ``activity`` to ``activities`` if it's not in the set or is the latest executed instance.""" + if activity in activities: + return + + for existing_activity in activities: + if activity.has_identical_inputs_and_outputs_as(existing_activity): + if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer + activities.remove(existing_activity) + activities.add(activity) + return + + # NOTE: No similar activity was found + activities.add(activity) diff --git a/tests/cli/test_rerun.py b/tests/cli/test_rerun.py index b0d48cb7e2..04e592bc91 100644 --- a/tests/cli/test_rerun.py +++ b/tests/cli/test_rerun.py @@ -171,7 +171,7 @@ def test_rerun_with_edited_inputs(project, run, no_lfs_warning): def test_rerun_with_no_execution(project, runner): - """Test update and rerun of an input.""" + """Test rerun when no workflow is executed.""" repo = git.Repo(project) input = os.path.join(project, "data", "input.txt") write_and_commit_file(repo, input, "content")