Skip to content

Commit

Permalink
feat(core): make update work with new storage
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Sep 2, 2021
1 parent ad622bc commit ec3bc7b
Show file tree
Hide file tree
Showing 17 changed files with 550 additions and 431 deletions.
41 changes: 13 additions & 28 deletions renku/cli/update.py
Expand Up @@ -20,12 +20,14 @@
Recreating outdated files
~~~~~~~~~~~~~~~~~~~~~~~~~
The information about dependencies for each file in the repository is generated
from information stored in the underlying Git repository.
The information about dependencies for each file in a Renku project is stored
in various metadata.
A minimal dependency graph is generated for each outdated file stored in the
repository. It means that only the necessary steps will be executed and the
workflow used to orchestrate these steps is stored in the repository.
When an update command is executed, Renku looks into the most recent execution
of each workflow and checks which one is outdated (i.e. at least one of its
inputs is modified). It generates a minimal dependency graph for each outdated
file stored in the repository. It means that only the necessary steps will be
executed.
Assume that the following history for the file ``H`` exists.
Expand Down Expand Up @@ -53,7 +55,7 @@
.. code-block:: console
$ renku update E
$ renku update E H
* Update all files by simply running
Expand Down Expand Up @@ -96,44 +98,27 @@
\
(D)
An attempt to update a single file would fail with the following error.
.. code-block:: console
$ renku update C
Error: There are missing output siblings:
B
D
Include the files above in the command or use --with-siblings option.
An attempt to update a single file would update its siblings as well.
The following commands will produce the same result.
.. code-block:: console
$ renku update --with-siblings C
$ renku update C
$ renku update B C D
"""

import click

from renku.cli.utils.callback import ClickCallback
from renku.core.commands.options import option_siblings
from renku.core.commands.update import update_workflows
from renku.core.commands.update import update_command


@click.command()
@click.option("--revision", default="HEAD")
@click.option("--no-output", is_flag=True, default=False, help="Display commands without output files.")
@click.option("--all", "-a", "update_all", is_flag=True, default=False, help="Update all outdated files.")
@option_siblings
@click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1)
def update(revision, no_output, update_all, siblings, paths):
def update(update_all, paths):
"""Update existing files by rerunning their outdated workflow."""
communicator = ClickCallback()

update_workflows().with_communicator(communicator).build().execute(
revision=revision, no_output=no_output, update_all=update_all, siblings=siblings, paths=paths
)
update_command().with_communicator(communicator).build().execute(update_all=update_all, paths=paths)
2 changes: 1 addition & 1 deletion renku/core/commands/status.py
Expand Up @@ -15,7 +15,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku show command."""
"""Renku ``status`` command."""

from collections import defaultdict
from typing import List, Set, Tuple
Expand Down
237 changes: 169 additions & 68 deletions renku/core/commands/update.py
Expand Up @@ -15,29 +15,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku update command."""
"""Renku ``update`` command."""

import uuid
from typing import Generator, List, Union

import networkx
from git import Actor

from renku.core.commands.cwl_runner import execute
from renku.core.errors import ParameterError
from renku.core.management.command_builder import inject
from renku.core.management.command_builder.command import Command
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.management.workflow.converters.cwl import CWLConverter
from renku.core.management.workflow.plan_factory import delete_indirect_files_list
from renku.core.utils.git import add_to_git
from renku.core.models.provenance.activity import Activity
from renku.core.models.workflow.composite_plan import CompositePlan, PlanCollection
from renku.core.models.workflow.plan import Plan
from renku.core.utils.datetime8601 import local_now
from renku.core.utils.git import add_to_git, get_modified_entities
from renku.core.utils.os import get_relative_paths
from renku.version import __version__, version_url


def update_workflows():
def update_command():
"""Update existing files by rerunning their outdated workflow."""
return (
Command()
.command(_update_workflows)
.command(_update)
.require_migration()
.require_clean()
.require_nodejs()
Expand All @@ -46,96 +51,192 @@ def update_workflows():
)


def _update_workflows(revision, no_output, update_all, siblings, paths):
@inject.autoparams()
def _update(update_all, client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
if not paths and not update_all:
raise ParameterError("Either PATHS or --all/-a should be specified.")
if paths and update_all:
raise ParameterError("Cannot use PATHS and --all/-a at the same time.")

# TODO: Implement this properly with new database
# graph = Graph()
# outputs = graph.build(revision=revision, can_be_cwl=no_output, paths=paths)
# outputs = {node for node in outputs if graph.need_update(node)}
# if not outputs:
# communication.echo("All files were generated from the latest inputs.")
# sys.exit(0)
client = client_dispatcher.current_client

# # Check or extend siblings of outputs.
# outputs = siblings(graph, outputs)
# output_paths = {node.path for node in outputs if _safe_path(node.path)}
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

# # Get all clean nodes.
# input_paths = {node.path for node in graph.nodes} - output_paths
modified_activities = _get_modified_activities(client, activity_gateway)
ordered_activities = _get_ordered_downstream_activities(modified_activities, activity_gateway, paths)

# # Store the generated workflow used for updating paths.
# workflow = graph.as_workflow(input_paths=input_paths, output_paths=output_paths, outputs=outputs)
update_plan = _create_plan_from_activity_list(ordered_activities)

# execute_workflow(workflow, output_paths, command_name="update", update_commits=True)
execute_workflow(workflow=update_plan, command_name="update")


@inject.autoparams()
def execute_workflow(
workflow,
output_paths,
command_name,
update_commits,
client_dispatcher: IClientDispatcher,
activity_gateway: IActivityGateway,
def _get_modified_activities(client, activity_gateway) -> Generator[Activity, None, None]:
    """Return the latest activities that have at least one modified input.

    The candidates are the values of ``activity_gateway.get_latest_activity_per_plan()``.
    The entities used by those activities are passed to ``get_modified_entities``
    together with ``client.repo``; an activity is kept when any entity it used is in
    the first element of that result.
    """
    candidates = activity_gateway.get_latest_activity_per_plan().values()

    # Collect the entity of every usage across all candidate activities (lazily).
    entities_in_use = (usage.entity for candidate in candidates for usage in candidate.usages)
    # Only the first element of the returned pair is needed here.
    modified_entities, _ = get_modified_entities(entities=entities_in_use, repo=client.repo)

    def uses_modified_entity(candidate):
        """Tell whether any usage of ``candidate`` refers to a modified entity."""
        return any(usage.entity in modified_entities for usage in candidate.usages)

    return (candidate for candidate in candidates if uses_modified_entity(candidate))


def _get_ordered_downstream_activities(
starting_activities: Generator[Activity, None, None], activity_gateway: IActivityGateway, paths: List[str]
):
"""Execute a Run with/without subprocesses."""
client = client_dispatcher.current_client
"""Return an ordered list of activities so that an activity comes before all its downstream activities."""
graph = networkx.DiGraph()

wf, path = CWLConverter.convert(workflow, client.path)
# Don't compute paths if storage is disabled.
if client.check_external_storage():
# Make sure all inputs are pulled from a storage.
paths_ = (i.consumes.path for i in workflow.inputs)
client.pull_paths_from_storage(*paths_)
activities = set(starting_activities)
while activities:
activity = activities.pop()
child_activities = activity_gateway.get_downstream_activities(activity, max_depth=1)

delete_indirect_files_list(client.path)
if len(child_activities) > 0:
activities |= child_activities
for child in child_activities:
graph.add_edge(activity, child)
elif activity not in graph:
graph.add_node(activity)

# Execute the workflow and relocate all output files.
# FIXME get new output paths for edited tools
# output_paths = {path for _, path in workflow.iter_output_files()}
execute(path, output_paths=output_paths)
if paths:
tail_activities = {activity for activity in graph if any(g.entity.path in paths for g in activity.generations)}

paths = [o.produces.path for o in workflow.outputs]
# NOTE: Tail nodes and their ancestors are required for an update
required_activities = tail_activities.copy()
for activity in tail_activities:
parents = networkx.ancestors(graph, activity)
required_activities.update(parents)

add_to_git(client.repo.git, *paths)
original_graph = graph.copy()
# Exclude non-required activities
for activity in original_graph:
if activity not in required_activities:
graph.remove_node(activity)

if client.repo.is_dirty():
commit_msg = f"renku {command_name}: committing {len(paths)} newly added files"
return list(networkx.algorithms.dag.topological_sort(graph))

committer = Actor(f"renku {__version__}", version_url)

client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True)
def _create_plan_from_activity_list(activities: List[Activity]) -> Union[Plan, PlanCollection]:
"""Create a CompositePlan by using Plans from an activity list."""
plans = [a.to_plan() for a in activities]

workflow_name = f"{uuid.uuid4().hex}_{command_name}.yaml"
if len(plans) == 1:
return plans[0]

path = client.workflow_path / workflow_name
return PlanCollection(id=PlanCollection.generate_id(), plans=plans, name=f"plan-collection-{uuid.uuid4().hex}")

workflow.update_id_and_label_from_commit_path(client, client.repo.head.commit, path)

# TODO: implement properly with new database
# if not workflow.subprocesses: # Update parameters if there is only one step
# _update_run_parameters(run=workflow, working_dir=client.path)
@inject.autoparams()
def execute_workflow(
workflow: Union[CompositePlan, Plan, PlanCollection],
command_name,
client_dispatcher: IClientDispatcher,
activity_gateway: IActivityGateway,
):
"""Execute a Run with/without subprocesses."""
client = client_dispatcher.current_client

# NOTE: Pull inputs from Git LFS or other storage backends
if client.check_external_storage():
inputs = [i.actual_value for i in workflow.inputs]
client.pull_paths_from_storage(*inputs)

# cls = WorkflowRun if workflow.subprocesses else ProcessRun
# activity = cls.from_run(run=workflow, path=path, update_commits=update_commits)
# activity.to_yaml(path=path)
# client.add_to_activity_index(activity)
delete_indirect_files_list(client.path)

# activity_gateway.add(activity)
started_at_time = local_now()

modified_outputs = _execute_workflow_helper(workflow=workflow, client=client)

# def _update_run_parameters(run, working_dir):
ended_at_time = local_now()

# default_parameters = {p.name: p for p in run.run_parameters}
add_to_git(client.repo.git, *modified_outputs)

# indirect_parameters = read_indirect_parameters(working_dir)
# for name, value in indirect_parameters.items():
# id_ = RunParameter.generate_id(run_id=run._id, name=name)
# parameter = RunParameter(id=id_, name=name, value=value)
# default_parameters[name] = parameter
if client.repo.is_dirty():
postfix = "s" if len(modified_outputs) > 1 else ""
commit_msg = f"renku {command_name}: committing {len(modified_outputs)} modified file{postfix}"
committer = Actor(f"renku {__version__}", version_url)
client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True)

# run.run_parameters = list(default_parameters.values())
activity = Activity.from_plan(plan=workflow, started_at_time=started_at_time, ended_at_time=ended_at_time)

activity_gateway.add(activity)


# TODO: This function is created as a patch from renku/core/commands/workflow.py::_execute_workflow and
# renku/core/management/workflow/providers/cwltool_provider.py::CWLToolProvider::workflow_execute in the ``workflow
# execute`` PR (renku-python/pull/2273). Once the PR is merged remove this function and refactor
# renku/core/commands/workflow.py::_execute_workflow to accept a Plan and use it here.
def _execute_workflow_helper(workflow: Union[Plan, PlanCollection], client):
    """Execute a given workflow using cwltool and move its outputs into the repository.

    The workflow is exported to a temporary CWL file, loaded and run through
    ``cwltool.factory.Factory``, and every output location reported by cwltool is moved
    from cwltool's output directories to the same relative path below ``client.path``.

    Returns the result of ``client.remove_unmodified`` applied to the destination paths.
    Raises ``errors.RenkuException`` if cwltool signals ``WorkflowStatus``.
    """
    import os
    import shutil
    import sys
    import tempfile
    from pathlib import Path
    from urllib.parse import unquote

    import cwltool.factory
    from cwltool.context import LoadingContext, RuntimeContext

    from renku.core import errors
    from renku.core.commands.echo import progressbar
    from renku.core.management.workflow.converters.cwl import CWLExporter

    # NOTE(review): joined with ``/`` below, so presumably a ``pathlib.Path`` -- confirm.
    basedir = client.path

    def remove_prefix(location, prefix="file://"):
        """Drop the leading ``prefix`` (if any) and decode percent-escapes."""
        if location.startswith(prefix):
            return unquote(location[len(prefix) :])
        return unquote(location)

    def is_inside(path, directory):
        """Return True if ``path`` is located below ``directory``.

        The comparison is done at a path-separator boundary so that a sibling such as
        ``results.txt`` is not mistaken for something inside ``results``.
        """
        return path.startswith(directory.rstrip(os.sep) + os.sep)

    with tempfile.NamedTemporaryFile() as f:
        # Export the plan to a CWL file.
        converter = CWLExporter()
        converter.workflow_convert(workflow=workflow, basedir=basedir, output=Path(f.name), output_format=None)

        # Run the CWL file with cwltool under a replaced ``sys.argv``; the caller's value is
        # always restored, even if loading or executing the workflow fails.
        argv = sys.argv
        sys.argv = ["cwltool"]
        try:
            # Keep all environment variables.
            runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True}
            loading_args = {"relax_path_checks": True}

            runtime_context = RuntimeContext(kwargs=runtime_args)
            loading_context = LoadingContext(kwargs=loading_args)

            factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context)
            process = factory.make(os.path.relpath(str(f.name)))

            try:
                outputs = process()
            except cwltool.factory.WorkflowStatus as e:
                # Chain the cwltool error so the original cause stays visible in tracebacks.
                raise errors.RenkuException("WorkflowExecuteError()") from e
        finally:
            sys.argv = argv

        # Move outputs to correct location in the repository.
        output_dirs = process.factory.executor.output_dirs

        locations = {remove_prefix(output["location"]) for output in outputs.values()}
        # Make sure not to move an output if its containing directory gets moved.
        locations = {
            location
            for location in locations
            if not any(is_inside(location, other) for other in locations if other != location)
        }

        output_paths = []
        with progressbar(locations, label="Moving outputs") as bar:
            for location in bar:
                for output_dir in output_dirs:
                    if not is_inside(location, output_dir):
                        continue

                    output_path = location[len(output_dir) :].lstrip(os.path.sep)
                    destination = basedir / output_path
                    output_paths.append(destination)
                    if destination.is_dir():
                        # Replace an existing directory: remove it and move the new one into its parent.
                        shutil.rmtree(str(destination))
                        destination = destination.parent
                    shutil.move(location, str(destination))
                    # The location has been moved; do not try to match it against other output dirs.
                    break

    return client.remove_unmodified(output_paths)
2 changes: 1 addition & 1 deletion renku/core/management/interface/activity_gateway.py
Expand Up @@ -35,7 +35,7 @@ def get_plans_and_usages_for_latest_activities(self) -> Dict[AbstractPlan, List[
"""Get all usages associated with a plan by its latest activity."""
raise NotImplementedError

def get_downstream_activities(self, activity: Activity) -> Set[Activity]:
def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]:
"""Get downstream activities that depend on this activity."""
raise NotImplementedError

Expand Down

0 comments on commit ec3bc7b

Please sign in to comment.