Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Sep 13, 2021
1 parent 8d8160b commit 2aa8abd
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
6 changes: 5 additions & 1 deletion renku/cli/rerun.py
Expand Up @@ -42,6 +42,7 @@
import click

from renku.cli.utils.callback import ClickCallback
from renku.core import errors
from renku.core.commands.rerun import rerun_command


Expand All @@ -58,4 +59,7 @@ def rerun(sources, paths):
"""Recreate files generated by a sequence of ``run`` commands."""
communicator = ClickCallback()

rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths)
try:
rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths)
except errors.NothingToExecuteError:
exit(1)
26 changes: 9 additions & 17 deletions renku/core/commands/rerun.py
Expand Up @@ -16,14 +16,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku ``rerun`` command."""

from collections import defaultdict
from typing import List
from typing import List, Set

from renku.core import errors
from renku.core.commands.update import execute_workflow
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.provenance.activity import Activity
from renku.core.utils import communication
from renku.core.utils.metadata import add_activity_if_recent
from renku.core.utils.os import get_relative_paths


Expand Down Expand Up @@ -53,32 +57,20 @@ def _rerun(

activities = _get_activities(paths, sources, activity_gateway)

if not activities:
exit(1)
if len(activities) == 0:
raise errors.NothingToExecuteError()

plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="rerun")


def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway):
def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]:
all_activities = defaultdict(set)

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]

if activity in existing_activities:
return

for existing_activity in existing_activities:
if activity.has_identical_inputs_and_outputs_as(existing_activity):
if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer
existing_activities.remove(existing_activity)
existing_activities.add(activity)
return

# No similar activity was found
existing_activities.add(activity)
add_activity_if_recent(activity=activity, activities=existing_activities)

for path in paths:
activities = activity_gateway.get_activities_by_generation(path)
Expand Down
16 changes: 2 additions & 14 deletions renku/core/commands/update.py
Expand Up @@ -37,7 +37,7 @@
from renku.core.models.workflow.plan import Plan
from renku.core.utils.datetime8601 import local_now
from renku.core.utils.git import add_to_git
from renku.core.utils.metadata import get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths
from renku.version import __version__, version_url

Expand Down Expand Up @@ -94,19 +94,7 @@ def _get_downstream_activities(

def include_newest_activity(activity):
existing_activities = all_activities[activity.association.plan.id]

if activity in existing_activities:
return

for existing_activity in existing_activities:
if activity.has_identical_inputs_and_outputs_as(existing_activity):
if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer
existing_activities.remove(existing_activity)
existing_activities.add(activity)
return

# No similar activity was found
existing_activities.add(activity)
add_activity_if_recent(activity=activity, activities=existing_activities)

def does_activity_generate_any_paths(activity):
is_same = any(g.entity.path in paths for g in activity.generations)
Expand Down
16 changes: 16 additions & 0 deletions renku/core/utils/metadata.py
Expand Up @@ -94,3 +94,19 @@ def get_modified_activities(
modified.add((activity, entity))

return modified, deleted


def add_activity_if_recent(activity: Activity, activities: Set[Activity]):
"""Add ``activity`` to ``activities`` if it's not in the set or is the latest executed instance."""
if activity in activities:
return

for existing_activity in activities:
if activity.has_identical_inputs_and_outputs_as(existing_activity):
if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer
activities.remove(existing_activity)
activities.add(activity)
return

# NOTE: No similar activity was found
activities.add(activity)
2 changes: 1 addition & 1 deletion tests/cli/test_rerun.py
Expand Up @@ -171,7 +171,7 @@ def test_rerun_with_edited_inputs(project, run, no_lfs_warning):


def test_rerun_with_no_execution(project, runner):
"""Test update and rerun of an input."""
"""Test rerun when no workflow is executed."""
repo = git.Repo(project)
input = os.path.join(project, "data", "input.txt")
write_and_commit_file(repo, input, "content")
Expand Down

0 comments on commit 2aa8abd

Please sign in to comment.