Skip to content

Commit

Permalink
feat(workflow): remove unnecessary workflows from rerun/update (#2341)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Sep 23, 2021
1 parent 34297be commit 2505c9d
Show file tree
Hide file tree
Showing 16 changed files with 574 additions and 224 deletions.
15 changes: 13 additions & 2 deletions renku/cli/rerun.py
Expand Up @@ -43,10 +43,12 @@

from renku.cli.utils.callback import ClickCallback
from renku.core import errors
from renku.core.commands.format.activity import tabulate_activities
from renku.core.commands.rerun import rerun_command


@click.command()
@click.option("--dry-run", "-n", is_flag=True, default=False, help="Show a preview of plans that will be executed.")
@click.option(
"--from",
"sources",
Expand All @@ -55,11 +57,20 @@
help="Start an execution from this file.",
)
@click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1, required=True)
def rerun(sources, paths):
def rerun(dry_run, sources, paths):
"""Recreate files generated by a sequence of ``run`` commands."""
communicator = ClickCallback()

try:
rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths)
result = (
rerun_command()
.with_communicator(communicator)
.build()
.execute(dry_run=dry_run, sources=sources, paths=paths)
)
except errors.NothingToExecuteError:
exit(1)
else:
if dry_run:
activities, modified_inputs = result.output
click.echo(tabulate_activities(activities, modified_inputs))
15 changes: 13 additions & 2 deletions renku/cli/update.py
Expand Up @@ -119,17 +119,28 @@

from renku.cli.utils.callback import ClickCallback
from renku.core import errors
from renku.core.commands.format.activity import tabulate_activities
from renku.core.commands.update import update_command


@click.command()
@click.option("--all", "-a", "update_all", is_flag=True, default=False, help="Update all outdated files.")
@click.option("--dry-run", "-n", is_flag=True, default=False, help="Show a preview of plans that will be executed.")
@click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1)
def update(update_all, paths):
def update(update_all, dry_run, paths):
"""Update existing files by rerunning their outdated workflow."""
communicator = ClickCallback()

try:
update_command().with_communicator(communicator).build().execute(update_all=update_all, paths=paths)
result = (
update_command()
.with_communicator(communicator)
.build()
.execute(update_all=update_all, dry_run=dry_run, paths=paths)
)
except errors.NothingToExecuteError:
exit(1)
else:
if dry_run:
activities, modified_inputs = result.output
click.echo(tabulate_activities(activities, modified_inputs))
54 changes: 54 additions & 0 deletions renku/core/commands/format/activity.py
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
#
# Copyright 2021 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Serializers for activities."""

from collections import namedtuple
from typing import List, Set

from renku.core.commands.format.tabulate import tabulate
from renku.core.models.provenance.activity import Activity
from renku.core.utils.os import are_paths_related


def tabulate_activities(activities: List[Activity], modified_inputs: Set[str]):
    """Return some info about the activities in a tabular form.

    One row is built per activity, in the order the activities are given (``sort=False`` is passed to
    ``tabulate``). Each row holds the plan name, the activity's ``ended_at_time``, the used paths that are
    related to a modified path, and the generated paths.

    :param activities: Activities to display; expected to be already ordered by the caller.
    :param modified_inputs: Paths that are considered modified before the first activity is processed.
        The passed collection is only read; it is never changed by this function.
    :return: Whatever ``tabulate`` returns for the collected rows.
    """
    fields = "plan, execution_date, modified_inputs, outputs"
    ActivityDisplay = namedtuple("ActivityDisplay", fields)

    # NOTE: Work on a private copy; an in-place ``|=`` on the argument would leak generated paths into the caller's set
    changed_paths = set(modified_inputs)
    collection = []

    for activity in activities:
        modified_usages = {
            u.entity.path for u in activity.usages if any(are_paths_related(u.entity.path, m) for m in changed_paths)
        }
        generations = {g.entity.path for g in activity.generations}
        # NOTE: Outputs of this activity count as modified inputs for the activities that come after it
        changed_paths |= generations

        collection.append(
            ActivityDisplay(
                activity.association.plan.name,
                activity.ended_at_time,
                ", ".join(sorted(modified_usages)),
                ", ".join(sorted(generations)),
            )
        )

    return tabulate(collection=collection, columns=fields, columns_mapping=ACTIVITY_DISPLAY_COLUMNS, sort=False)


# Column definitions handed to ``tabulate`` as ``columns_mapping`` by ``tabulate_activities``. Keys match the
# ``ActivityDisplay`` field names; each value is a 2-tuple whose first item repeats the key.
# NOTE(review): the second item looks like an optional header label (``None`` = keep the default) -- confirm
# against ``_make_headers`` in ``renku/core/commands/format/tabulate.py``.
ACTIVITY_DISPLAY_COLUMNS = {
    "plan": ("plan", None),
    "execution_date": ("execution_date", "date executed"),
    "modified_inputs": ("modified_inputs", "modified inputs"),
    "outputs": ("outputs", None),
}
1 change: 1 addition & 0 deletions renku/core/commands/format/graph.py
Expand Up @@ -381,6 +381,7 @@ def rdf(graph, strict=False):
"""Valid formatting options."""

GRAPH_FORMATS = {
"jsonld": jsonld,
"json-ld": jsonld,
"json-ld-graph": jsonld_graph,
"nt": nt,
Expand Down
15 changes: 8 additions & 7 deletions renku/core/commands/format/tabulate.py
Expand Up @@ -24,7 +24,7 @@
from renku.core.models.tabulate import tabulate as tabulate_


def tabulate(collection, columns, columns_mapping, columns_alignments=None):
def tabulate(collection, columns, columns_mapping, columns_alignments=None, sort=True):
"""Format collection with a tabular output."""
if not columns:
raise errors.ParameterError("Columns cannot be empty.")
Expand All @@ -34,12 +34,13 @@ def tabulate(collection, columns, columns_mapping, columns_alignments=None):
headers, alignments = _make_headers(columns, columns_mapping, columns_alignments)

# Sort based on the first requested field
attr = list(headers.keys())[0]
try:
getter = attrgetter(attr)
collection = sorted(collection, key=lambda d: getter(d))
except TypeError:
pass
if sort:
try:
attr = list(headers.keys())[0]
getter = attrgetter(attr)
collection = sorted(collection, key=lambda d: getter(d))
except TypeError:
pass

alignments = alignments if collection else None # To avoid a tabulate bug

Expand Down
13 changes: 11 additions & 2 deletions renku/core/commands/rerun.py
Expand Up @@ -25,6 +25,7 @@
from renku.core.management.command_builder.command import Command, inject
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.workflow import sort_activities
from renku.core.models.provenance.activity import Activity
from renku.core.utils import communication
from renku.core.utils.metadata import add_activity_if_recent
Expand All @@ -46,7 +47,11 @@ def rerun_command():

@inject.autoparams()
def _rerun(
sources: List[str], paths: List[str], client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway
dry_run: bool,
sources: List[str],
paths: List[str],
client_dispatcher: IClientDispatcher,
activity_gateway: IActivityGateway,
):
client = client_dispatcher.current_client

Expand All @@ -60,6 +65,10 @@ def _rerun(
if len(activities) == 0:
raise errors.NothingToExecuteError()

activities = sort_activities(activities)
if dry_run:
return activities, set(sources)

plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="rerun")
Expand Down Expand Up @@ -87,7 +96,7 @@ def include_newest_activity(activity):
# NOTE: Add the activity to check if it also matches the condition
upstream_chains.append((latest_activity,))
# NOTE: Only include paths that are using at least one of the sources
upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[0].usages)]
upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[-1].usages)]

# NOTE: Include activity only if any of its upstream match the condition
if upstream_chains:
Expand Down
20 changes: 13 additions & 7 deletions renku/core/commands/update.py
Expand Up @@ -19,7 +19,7 @@

from collections import defaultdict
from pathlib import Path
from typing import List, Set
from typing import List, Set, Tuple

from renku.core import errors
from renku.core.commands.workflow import execute_workflow
Expand All @@ -28,6 +28,7 @@
from renku.core.management.command_builder.command import Command
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.workflow import sort_activities
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_paths
Expand All @@ -47,9 +48,9 @@ def update_command():


@inject.autoparams()
def _update(update_all, client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
if not paths and not update_all:
raise ParameterError("Either PATHS or --all/-a should be specified.")
def _update(update_all, dry_run, client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
if not paths and not update_all and not dry_run:
raise ParameterError("Either PATHS, --all/-a, or --dry-run/-n should be specified.")
if paths and update_all:
raise ParameterError("Cannot use PATHS and --all/-a at the same time.")

Expand All @@ -58,23 +59,28 @@ def _update(update_all, client_dispatcher: IClientDispatcher, activity_gateway:
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

modified_activities = _get_modified_activities(client.repo, activity_gateway)
modified_activities, modified_paths = _get_modified_activities_and_paths(client.repo, activity_gateway)
activities = _get_downstream_activities(modified_activities, activity_gateway, paths)

if len(activities) == 0:
raise errors.NothingToExecuteError()

# NOTE: When updating we only want to eliminate activities that are overridden, not their parents
activities = sort_activities(activities, remove_overridden_parents=False)
if dry_run:
return activities, modified_paths

plans = [a.plan_with_values for a in activities]

execute_workflow(plans=plans, command_name="update")


def _get_modified_activities(repo, activity_gateway) -> Set[Activity]:
def _get_modified_activities_and_paths(repo, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
"""Return latest activities that one of their inputs is modified."""
latest_activities = activity_gateway.get_latest_activity_per_plan().values()
modified, _ = get_modified_activities(activities=latest_activities, repo=repo)

return {a for a, _ in modified}
return {a for a, _ in modified}, {e.path for _, e in modified}


def _get_downstream_activities(
Expand Down
106 changes: 106 additions & 0 deletions renku/core/management/workflow/__init__.py
Expand Up @@ -16,3 +16,109 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku workflow management."""

import itertools
from collections import defaultdict
from typing import List, Set

import networkx

from renku.core.models.provenance.activity import Activity


def sort_activities(activities: List[Activity], remove_overridden_parents=True) -> List[Activity]:
    """Return the activities in an order that respects their dependencies and execution order.

    A directed graph is built with one node per activity. An edge goes from an activity that generates a path
    to every activity that uses that path. Activities that generate a common path and are otherwise unrelated
    are ordered with ``Activity.compare_to``. An activity whose generated paths are all overridden that way is
    dropped from the result.

    :param activities: Activities to sort.
    :param remove_overridden_parents: When true, predecessors of a dropped activity are examined as well and
        dropped if all of their generated paths only feed dropped activities.
    :return: The remaining activities in topological order.
    :raises ValueError: If the dependency graph has a cycle or two activities cannot be ordered.
    """
    graph = networkx.DiGraph()

    # NOTE: path -> activities that use / generate that path
    consumers = defaultdict(set)
    producers = defaultdict(set)

    # NOTE: activity -> generated paths that are re-generated later by another activity (or only feed removed ones)
    overridden_activities = defaultdict(set)

    def link(parent, child, path: str):
        # NOTE: Only the first path that connects two activities is stored on the edge
        if not graph.has_edge(parent, child):
            graph.add_edge(parent, child, path=path)

    def add_dependency_edges():
        for activity in activities:
            # NOTE: Make sure that activity is in the graph in case it has no connection to others
            graph.add_node(activity)

            for usage in activity.usages:
                used_path = usage.entity.path
                consumers[used_path].add(activity)
                for producer in producers[used_path]:
                    link(producer, activity, used_path)

            for generation in activity.generations:
                generated_path = generation.entity.path
                producers[generated_path].add(activity)
                for consumer in consumers[generated_path]:
                    link(activity, consumer, generated_path)

    def order_generators(candidates: Set[Activity], path):
        for a, b in itertools.combinations(candidates, 2):
            already_ordered = networkx.has_path(graph, a, b) or networkx.has_path(graph, b, a)
            if already_ordered:
                continue

            # NOTE: More recent activity should be executed after the other one
            # NOTE: This won't introduce a cycle in the graph because there is no other path between the two nodes
            comparison = a.compare_to(b)
            if comparison < 0:
                earlier, later = a, b
            elif comparison > 0:
                earlier, later = b, a
            else:
                raise ValueError(f"Cannot create an order between activities {a.id} and {b.id}")

            graph.add_edge(earlier, later)
            overridden_activities[earlier].add(path)

    def add_execution_order_edges():
        for path, generators in producers.items():
            # NOTE: Order multiple activities that generate a common path
            if len(generators) > 1:
                order_generators(generators, path)

    def drop_overridden_activities():
        removable = set()
        pending = set(overridden_activities.keys())

        while pending:
            activity = pending.pop()
            generated_paths = {g.entity.path for g in activity.generations}
            if generated_paths != overridden_activities[activity]:
                continue

            # NOTE: All generated paths are overridden; there is no point in executing the activity
            removable.add(activity)

            if not remove_overridden_parents:
                continue

            # NOTE: Check if its parents can be removed as well
            for parent in graph.predecessors(activity):
                if parent in removable:
                    continue
                edge = graph.get_edge_data(parent, activity)
                if edge and "path" in edge:
                    overridden_activities[parent].add(edge["path"])
                    pending.add(parent)

        # NOTE: Nodes are removed only after the loop so that predecessors stay reachable while processing
        for activity in removable:
            graph.remove_node(activity)

    add_dependency_edges()

    if not networkx.algorithms.dag.is_directed_acyclic_graph(graph):
        raise ValueError("Cannot find execution order: Project has cyclic dependencies.")

    add_execution_order_edges()
    drop_overridden_activities()

    return list(networkx.topological_sort(graph))

0 comments on commit 2505c9d

Please sign in to comment.