Skip to content

Commit

Permalink
feat(cli): option to skip metadata update (#3025)
Browse files Browse the repository at this point in the history
Co-authored-by: Ralf Grubenmann <ralf.grubenmann@sdsc.ethz.ch>
  • Loading branch information
olevski and Panaetius committed Jul 19, 2022
1 parent b1a803e commit c89aba7
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 44 deletions.
9 changes: 7 additions & 2 deletions renku/command/rerun.py
Expand Up @@ -29,9 +29,14 @@
from renku.core.workflow.execute import execute_workflow_graph


def rerun_command():
def rerun_command(skip_metadata_update: bool):
"""Recreate files generated by a sequence of ``run`` commands."""
return Command().command(_rerun).require_migration().require_clean().with_database(write=True).with_commit()
command = Command().command(_rerun).require_migration().require_clean()
if skip_metadata_update:
command = command.with_database(write=False)
else:
command = command.with_database(write=True).with_commit()
return command


@inject.autoparams()
Expand Down
9 changes: 7 additions & 2 deletions renku/command/update.py
Expand Up @@ -36,9 +36,14 @@
from renku.core.workflow.execute import execute_workflow_graph


def update_command():
def update_command(skip_metadata_update: bool):
"""Update existing files by rerunning their outdated workflow."""
return Command().command(_update).require_migration().require_clean().with_database(write=True).with_commit()
command = Command().command(_update).require_migration().require_clean()
if skip_metadata_update:
command = command.with_database(write=False)
else:
command = command.with_database(write=True).with_commit()
return command


@inject.autoparams()
Expand Down
22 changes: 14 additions & 8 deletions renku/command/workflow.py
Expand Up @@ -82,23 +82,29 @@ def workflow_outputs_command():
return Command().command(workflow_outputs).require_migration().with_database(write=False)


def execute_workflow_command():
def execute_workflow_command(skip_metadata_update: bool):
"""Command that executes a workflow."""
return (
Command().command(execute_workflow).require_migration().require_clean().with_database(write=True).with_commit()
)
command = Command().command(execute_workflow).require_migration().require_clean()
if skip_metadata_update:
command = command.with_database(write=False)
else:
command = command.with_database(write=True).with_commit()
return command


def visualize_graph_command():
"""Execute the graph visualization command."""
return Command().command(visualize_graph).require_migration().with_database(write=False)


def iterate_workflow_command():
def iterate_workflow_command(skip_metadata_update: bool):
"""Command that executes several workflows given a set of variables."""
return (
Command().command(iterate_workflow).require_migration().require_clean().with_database(write=True).with_commit()
)
command = Command().command(iterate_workflow).require_migration().require_clean()
if skip_metadata_update:
command = command.with_database(write=False)
else:
command = command.with_database(write=True).with_commit()
return command


def revert_activity_command():
Expand Down
9 changes: 7 additions & 2 deletions renku/ui/cli/rerun.py
Expand Up @@ -51,6 +51,10 @@
If the output didn't change, it will be removed from git and re-added to ensure
that the re-execution is properly tracked.
In some cases it may be desirable to avoid updating the renku metadata
and to avoid committing that update or any other change in the repository when the
rerun command is used. In that case, pass the ``--skip-metadata-update``
flag to ``renku rerun``.
.. cheatsheet::
:group: Running
Expand All @@ -70,6 +74,7 @@

@click.command()
@click.option("--dry-run", "-n", is_flag=True, default=False, help="Show a preview of plans that will be executed.")
@click.option("--skip-metadata-update", is_flag=True, help="Do not update the metadata store for the execution.")
@click.option(
"--from",
"sources",
Expand All @@ -90,7 +95,7 @@
@click.option(
"config", "-c", "--config", metavar="<config file>", help="YAML file containing configuration for the provider."
)
def rerun(dry_run, sources, paths, provider, config):
def rerun(dry_run, skip_metadata_update, sources, paths, provider, config):
"""Recreate files generated by a sequence of ``run`` commands."""
from renku.command.format.activity import tabulate_activities
from renku.command.rerun import rerun_command
Expand All @@ -99,7 +104,7 @@ def rerun(dry_run, sources, paths, provider, config):

try:
result = (
rerun_command()
rerun_command(skip_metadata_update=skip_metadata_update)
.with_communicator(communicator)
.build()
.execute(dry_run=dry_run, sources=sources, paths=paths, provider=provider, config=config)
Expand Down
10 changes: 8 additions & 2 deletions renku/ui/cli/update.py
Expand Up @@ -76,6 +76,11 @@
.. note:: If there were uncommitted changes then the command fails.
Check :program:`git status` to see details.
In some cases it may be desirable to avoid updating the renku metadata
and to avoid committing that update or any other change in the repository when the
update command is run. In that case, pass the ``--skip-metadata-update``
flag to ``renku update``.
.. cheatsheet::
:group: Running
:command: $ renku update [--all] [<path>...]
Expand Down Expand Up @@ -167,7 +172,8 @@
"config", "-c", "--config", metavar="<config file>", help="YAML file containing configuration for the provider."
)
@click.option("-i", "--ignore-deleted", is_flag=True, help="Ignore deleted paths.")
def update(update_all, dry_run, paths, provider, config, ignore_deleted):
@click.option("--skip-metadata-update", is_flag=True, help="Do not update the metadata store for the execution.")
def update(update_all, dry_run, paths, provider, config, ignore_deleted, skip_metadata_update):
"""Update existing files by rerunning their outdated workflow."""
from renku.command.format.activity import tabulate_activities
from renku.command.update import update_command
Expand All @@ -176,7 +182,7 @@ def update(update_all, dry_run, paths, provider, config, ignore_deleted):

try:
result = (
update_command()
update_command(skip_metadata_update=skip_metadata_update)
.with_communicator(communicator)
.build()
.execute(
Expand Down
19 changes: 16 additions & 3 deletions renku/ui/cli/workflow.py
Expand Up @@ -149,6 +149,11 @@
parameter <param-name>'s value.
:extended:
In some cases it may be desirable to avoid updating the renku metadata
and to avoid committing that update or any other change in the repository when a
workflow is executed. In that case, pass the ``--skip-metadata-update``
flag to ``renku workflow execute``.
Iterate Plans
*************
Expand Down Expand Up @@ -212,6 +217,11 @@
``10``, ``20`` and ``30`` and producing the output files ``output_0.txt``,
``output_1.txt`` and ``output_2.txt`` in this order.
In some cases it may be desirable to avoid updating the renku metadata
and to avoid committing that update or any other change in the repository when a
workflow is iterated. In that case, pass the ``--skip-metadata-update``
flag to ``renku workflow iterate``.
Exporting Plans
***************
Expand Down Expand Up @@ -1083,12 +1093,14 @@ def outputs(ctx, paths):
type=click.Path(exists=True, dir_okay=False),
help="YAML file containing parameter mappings to be used.",
)
@click.option("--skip-metadata-update", is_flag=True, help="Do not update the metadata store for the execution.")
@click.argument("name_or_id", required=True, shell_complete=_complete_workflows)
def execute(
provider,
config,
set_params,
values,
skip_metadata_update,
name_or_id,
):
"""Execute a given workflow."""
Expand All @@ -1097,7 +1109,7 @@ def execute(
communicator = ClickCallback()

result = (
execute_workflow_command()
execute_workflow_command(skip_metadata_update=skip_metadata_update)
.with_communicator(communicator)
.build()
.execute(
Expand Down Expand Up @@ -1196,6 +1208,7 @@ def visualize(sources, columns, exclude_files, ascii, interactive, no_color, pag


@workflow.command()
@click.option("--skip-metadata-update", is_flag=True, help="Do not update the metadata store for the execution.")
@click.option(
"mapping_path",
"--mapping",
Expand Down Expand Up @@ -1223,7 +1236,7 @@ def visualize(sources, columns, exclude_files, ascii, interactive, no_color, pag
@click.option("mappings", "-m", "--map", multiple=True, help="Mapping for a workflow parameter.")
@click.option("config", "-c", "--config", metavar="<config file>", help="YAML file containing config for the provider.")
@click.argument("name_or_id", required=True, shell_complete=_complete_workflows)
def iterate(name_or_id, mappings, mapping_path, dry_run, provider, config):
def iterate(name_or_id, mappings, mapping_path, dry_run, provider, config, skip_metadata_update):
"""Execute a workflow by iterating through a range of provided parameters."""
from renku.command.view_model.plan import PlanViewModel
from renku.command.workflow import iterate_workflow_command, show_workflow_command
Expand All @@ -1241,7 +1254,7 @@ def iterate(name_or_id, mappings, mapping_path, dry_run, provider, config):
_print_composite_plan(plan)

communicator = ClickCallback()
iterate_workflow_command().with_communicator(communicator).build().execute(
iterate_workflow_command(skip_metadata_update=skip_metadata_update).with_communicator(communicator).build().execute(
name_or_id=name_or_id,
mapping_path=mapping_path,
mappings=mappings,
Expand Down
19 changes: 17 additions & 2 deletions tests/cli/test_rerun.py
Expand Up @@ -25,13 +25,16 @@
import pytest

from renku.core.plugin.provider import available_workflow_providers
from renku.infrastructure.gateway.activity_gateway import ActivityGateway
from renku.infrastructure.gateway.plan_gateway import PlanGateway
from renku.infrastructure.repository import Repository
from renku.ui.cli import cli
from tests.utils import format_result_exception, write_and_commit_file


@pytest.mark.parametrize("provider", available_workflow_providers())
def test_rerun(project, renku_cli, provider):
@pytest.mark.parametrize("skip_metadata_update", [True, False])
def test_rerun(project, client, client_database_injection_manager, renku_cli, provider, skip_metadata_update):
"""Test rerun."""
output = Path(project) / "output.txt"

Expand All @@ -42,7 +45,19 @@ def test_rerun(project, renku_cli, provider):
content = output.read_text().strip()

def rerun():
assert 0 == renku_cli("rerun", "-p", provider, output).exit_code
cmd = ["rerun", "-p", provider]
if skip_metadata_update:
cmd.append("--skip-metadata-update")
cmd.append(output)
assert 0 == renku_cli(*cmd).exit_code
with client_database_injection_manager(client):
plans = PlanGateway().get_all_plans()
activities = ActivityGateway().get_all_activities()
assert len(plans) == 1
if skip_metadata_update:
assert len(activities) == 1
else:
assert len(activities) > 1
return output.read_text().strip()

for _ in range(10):
Expand Down
64 changes: 44 additions & 20 deletions tests/cli/test_update.py
Expand Up @@ -33,7 +33,8 @@


@pytest.mark.parametrize("provider", available_workflow_providers())
def test_update(runner, client, renku_cli, client_database_injection_manager, provider):
@pytest.mark.parametrize("skip_metadata_update", [True, False])
def test_update(runner, client, renku_cli, client_database_injection_manager, provider, skip_metadata_update):
"""Test output is updated when source changes."""
source = os.path.join(client.path, "source.txt")
output = os.path.join(client.path, "output.txt")
Expand All @@ -45,17 +46,22 @@ def test_update(runner, client, renku_cli, client_database_injection_manager, pr

write_and_commit_file(client.repository, source, "changed content")

exit_code, activity = renku_cli("update", "-p", provider, "--all")
cmd = ["update", "-p", provider, "--all"]
if skip_metadata_update:
cmd.append("--skip-metadata-update")
exit_code, activity = renku_cli(*cmd)

assert 0 == exit_code
plan = activity.association.plan
assert previous_activity.association.plan.id == plan.id
assert isinstance(plan, Plan)

assert "changed content" == Path(output).read_text()

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)
if skip_metadata_update:
assert activity is None
else:
plan = activity.association.plan
assert previous_activity.association.plan.id == plan.id
assert isinstance(plan, Plan)
result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)

with client_database_injection_manager(client):
activity_gateway = ActivityGateway()
Expand All @@ -64,12 +70,20 @@ def test_update(runner, client, renku_cli, client_database_injection_manager, pr
# NOTE: No ActivityCollection is created if update include only one activity
assert [] == activity_collections

if skip_metadata_update:
assert len(activity_gateway.get_all_activities()) == 1
else:
assert len(activity_gateway.get_all_activities()) == 2

result = runner.invoke(cli, ["graph", "export", "--format", "json-ld", "--strict"])
assert 0 == result.exit_code, format_result_exception(result)


@pytest.mark.parametrize("provider", available_workflow_providers())
def test_update_multiple_steps(runner, client, renku_cli, client_database_injection_manager, provider):
@pytest.mark.parametrize("skip_metadata_update", [True, False])
def test_update_multiple_steps(
runner, client, renku_cli, client_database_injection_manager, provider, skip_metadata_update
):
"""Test update in a multi-step workflow."""
source = os.path.join(client.path, "source.txt")
intermediate = os.path.join(client.path, "intermediate.txt")
Expand All @@ -84,27 +98,37 @@ def test_update_multiple_steps(runner, client, renku_cli, client_database_inject

write_and_commit_file(client.repository, source, "changed content")

exit_code, activities = renku_cli("update", "-p", provider, "--all")
cmd = ["update", "-p", provider, "--all"]
if skip_metadata_update:
cmd.append("--skip-metadata-update")
exit_code, activities = renku_cli(*cmd)

assert 0 == exit_code
plans = [a.association.plan for a in activities]
assert 2 == len(plans)
assert isinstance(plans[0], Plan)
assert isinstance(plans[1], Plan)
assert {p.id for p in plans} == {activity1.association.plan.id, activity2.association.plan.id}

assert "changed content" == Path(intermediate).read_text()
assert "changed content" == Path(output).read_text()

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)
if skip_metadata_update:
assert activities is None
else:
plans = [a.association.plan for a in activities]
assert 2 == len(plans)
assert isinstance(plans[0], Plan)
assert isinstance(plans[1], Plan)
assert {p.id for p in plans} == {activity1.association.plan.id, activity2.association.plan.id}
result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)

with client_database_injection_manager(client):
activity_gateway = ActivityGateway()
activity_collections = activity_gateway.get_all_activity_collections()

assert 1 == len(activity_collections)
assert {a.id for a in activities} == {a.id for a in activity_collections[0].activities}
all_activities = activity_gateway.get_all_activities()
if skip_metadata_update:
assert len(all_activities) == 2
else:
assert 1 == len(activity_collections)
assert {a.id for a in activities} == {a.id for a in activity_collections[0].activities}
assert len(all_activities) == 4


@pytest.mark.parametrize("provider", available_workflow_providers())
Expand Down

0 comments on commit c89aba7

Please sign in to comment.