Skip to content

Commit

Permalink
feat(core): show status for specific paths (#2287)
Browse files Browse the repository at this point in the history
* feat(core): show status for specific paths
* fix(core) fix serialization of zope interfaces (#2294)

Co-authored-by: Ralf Grubenmann <ralf.grubenmann@sdsc.ethz.ch>
  • Loading branch information
m-alisafaee and Panaetius committed Sep 1, 2021
1 parent 9f5dcce commit d69a91c
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 82 deletions.
59 changes: 37 additions & 22 deletions renku/cli/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,45 @@
Inspecting a repository
~~~~~~~~~~~~~~~~~~~~~~~
Displays paths of outputs which were generated from newer input files
and paths of files that have been used in different versions.
``renku status`` command can be used to check if there are output files in
a repository that are outdated and need to be re-generated. Output files get
outdated due to changes in input data or source code (i.e. dependencies).
The first paths are what need to be recreated by running ``renku update``.
See more in section about :ref:`renku update <cli-update>`.
This command shows a list of output files that need to be updated along with
a list of modified inputs for each file. It also displays deleted input files,
if any.
To check specific input or output files, you can pass them to this command:
.. code-block:: console
$ renku status path/to/file1 path/to/file2
In this case, renku only checks if the specified path or paths are modified or
outdated and need an update, instead of checking all inputs and outputs.
The paths mentioned in the output are made relative to the current directory
if you are working in a subdirectory (this is on purpose, to help
cutting and pasting to other commands). They also contain first 8 characters
of the corresponding commit identifier after the ``#`` (hash). If the file was
imported from another repository, the short name of the repository is shown
together with the filename before ``@``.
cutting and pasting to other commands).
"""

import click

from renku.cli.utils.callback import ClickCallback
from renku.core.commands.status import get_status
from renku.core.commands.status import get_status_command


@click.command()
@click.pass_context
def status(ctx):
@click.argument("paths", type=click.Path(exists=True, dir_okay=False), nargs=-1)
def status(ctx, paths):
"""Show a status of the repository."""
communicator = ClickCallback()
result = get_status().with_communicator(communicator).build().execute()
result = get_status_command().with_communicator(communicator).build().execute(paths=paths)

stales, modified, deleted = result.output
stales, stale_activities, modified, deleted = result.output

if not modified and not deleted:
if not stales and not deleted and not stale_activities:
click.secho("Everything is up-to-date.", fg="green")
return

Expand All @@ -58,15 +67,17 @@ def status(ctx):
f"Outdated outputs({len(stales)}):\n"
# TODO: Enable once renku workflow visualize is implemented
# " (use `renku workflow visualize [<file>...]` to see the full lineage)\n"
" (use `renku update [<file>...]` to generate the file from its latest inputs)\n"
" (use `renku update --all` to generate the file from its latest inputs)\n"
)
for k, v in stales.items():
paths = click.style(", ".join(sorted(v)), fg="red", bold=True)
click.echo(f"\t{k}:{paths}")
click.echo()
paths = click.style(", ".join(sorted(v)), fg="blue", bold=True)
output = click.style(k, fg="red", bold=True)
click.echo(f"\t{output}: {paths}")
else:
click.secho("All files were generated from the latest inputs.", fg="green")

click.echo()

if modified:
click.echo(
f"Modified inputs({len(modified)}):\n"
Expand All @@ -78,13 +89,17 @@ def status(ctx):
click.echo()

if deleted:
click.echo(
"Deleted files used to generate outputs:\n"
" (use `git show <sha1>:<file>` to see the file content for the given revision)\n"
)
click.echo("Deleted files used to generate outputs:\n")
for v in deleted:
click.echo(click.style(f"\t{v}", fg="blue", bold=True))
click.echo()

if stale_activities:
click.echo(f"Outdated activities that have no outputs({len(stale_activities)}):\n")
for k, v in stale_activities.items():
paths = click.style(", ".join(sorted(v)), fg="blue", bold=True)
activity = click.style(k, fg="red", bold=True)
click.echo(f"\t{activity}: {paths}")
click.echo()

ctx.exit(1 if stales else 0)
ctx.exit(1 if stales or stale_activities else 0)
98 changes: 49 additions & 49 deletions renku/core/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,88 +17,88 @@
# limitations under the License.
"""Renku show command."""

import os
from collections import defaultdict
from typing import List, Tuple
from typing import List, Set, Tuple

from git import GitCommandError
from git import Repo

from renku.core.management.command_builder import inject
from renku.core.management.command_builder.command import Command
from renku.core.management.interface.activity_gateway import IActivityGateway
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.provenance.activity import Activity, Usage
from renku.core.utils import communication
from renku.core.utils.git import get_object_hash
from renku.core.utils.os import get_relative_path_to_cwd, get_relative_paths


def _get_relative_path(client, path):
"""Get a relative path to current working directory."""
return str((client.path / path).resolve().relative_to(os.getcwd()))


def get_status():
def get_status_command():
"""Show a status of the repository."""
return Command().command(_get_status).require_migration().require_clean().with_database(write=False)


@inject.autoparams()
def _get_status(client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway):
client = client_dispatcher.current_client
def _get_status(client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
def get_dependant_activities_from(start_activity):
"""Return a set of activity and all its downstream activities."""
all_activities = activity_gateway.get_downstream_activities(start_activity)
all_activities.add(start_activity)
return all_activities

def mark_generations_as_stale(activity):
for generation in activity.generations:
generation_path = get_relative_path_to_cwd(client.path / generation.entity.path)
stale_outputs[generation_path].add(usage_path)

latest_activities = activity_gateway.get_latest_activity_per_plan().values()
client = client_dispatcher.current_client

if client.has_external_files():
communication.warn(
"Changes in external files are not detected automatically. To update external files run "
"`renku dataset update -e`."
)
paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)

try:
communication.echo("On branch {0}".format(client.repo.active_branch))
except TypeError:
communication.warn("Git HEAD is detached!\n Please move back to your working branch to use renku\n")
latest_activities = activity_gateway.get_latest_activity_per_plan().values()

modified, deleted = _get_modified_paths(activities=latest_activities)
modified, deleted = _get_modified_paths(activities=latest_activities, repo=client.repo)

if not modified and not deleted:
return None, None, None
return None, None, None, None

stales = defaultdict(set)
modified_inputs = set()
stale_outputs = defaultdict(set)
stale_activities = defaultdict(set)

for activity, usage in modified:
usage_path = _get_relative_path(client, usage.entity.path)
for generation in activity.generations:
generation_path = _get_relative_path(client, generation.entity.path)
stales[generation_path].add(usage_path)
downstream_activities = activity_gateway.get_downstream_activities(activity)
paths = [_get_relative_path(client, g.entity.path) for a in downstream_activities for g in a.generations]
for p in paths:
stales[p].add(usage_path)
for start_activity, usage in modified:
usage_path = get_relative_path_to_cwd(client.path / usage.entity.path)

modified = {_get_relative_path(client, v[1].entity.path) for v in modified}
activities = get_dependant_activities_from(start_activity)

deleted = {_get_relative_path(client, d) for d in deleted}
if not paths or usage.entity.path in paths: # add all downstream activities
modified_inputs.add(usage_path)
for activity in activities:
if len(activity.generations) == 0:
stale_activities[activity.id].add(usage_path)
else:
mark_generations_as_stale(activity)
else:
for activity in activities:
if any(g.entity.path in paths for g in activity.generations):
modified_inputs.add(usage_path)
mark_generations_as_stale(activity)

return stales, modified, deleted
deleted = {get_relative_path_to_cwd(client.path / d) for d in deleted if not paths or d in paths}

return stale_outputs, stale_activities, modified_inputs, deleted

@inject.autoparams()
def _get_modified_paths(
activities: List[Activity], client_dispatcher: IClientDispatcher
) -> Tuple[List[Tuple[Activity, Usage]], List[Tuple[Activity, Usage]]]:
"""Get modified and deleted usages/inputs of a list of activities."""
client = client_dispatcher.current_client

def _get_modified_paths(activities: List[Activity], repo: Repo) -> Tuple[Set[Tuple[Activity, Usage]], Set[str]]:
"""Get modified and deleted usages/inputs of a list of activities."""
modified = set()
deleted = set()

for activity in activities:
for usage in activity.usages:
try:
current_checksum = client.repo.git.rev_parse(f"HEAD:{str(usage.entity.path)}")
except GitCommandError:
current_checksum = get_object_hash(repo=repo, path=usage.entity.path)
if not current_checksum:
deleted.add(usage.entity.path)
else:
if current_checksum != usage.entity.checksum:
modified.add((activity, usage))
elif current_checksum != usage.entity.checksum:
modified.add((activity, usage))

return modified, deleted
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def _process_workflows(client: LocalClient, activity_gateway: IActivityGateway,
pass


@inject.autoparams()
@inject.autoparams("client_dispatcher")
def _process_run_to_new_activity(process_run: old_schema.ProcessRun, client_dispatcher: IClientDispatcher) -> Activity:
"""Convert a ProcessRun to a new Activity."""
assert not isinstance(process_run, old_schema.WorkflowRun)
Expand Down
5 changes: 5 additions & 0 deletions renku/core/metadata/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from persistent.interfaces import IPickleCache
from ZODB.utils import z64
from zope.interface import implementer
from zope.interface.interface import InterfaceClass

from renku.core import errors
from renku.core.metadata.immutable import Immutable
Expand Down Expand Up @@ -633,6 +634,10 @@ def _serialize_helper(self, object):
value = object.isoformat()
elif isinstance(object, tuple):
value = tuple(self._serialize_helper(value) for value in object)
elif isinstance(object, (InterfaceClass)):
# NOTE: Zope interfaces are weird, they're a class with type InterfaceClass, but need to be deserialized
# as the class (without instantiation)
return {"@type": TYPE_TYPE, "@value": f"{object.__module__}.{object.__name__}"}
elif isinstance(object, type):
# NOTE: We're storing a type, not an instance
return {"@type": TYPE_TYPE, "@value": get_type_name(object)}
Expand Down
7 changes: 3 additions & 4 deletions renku/core/metadata/gateway/database_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ def load_downstream_relations(token, catalog, cache, database_dispatcher: IDatab
return btree[token]


# NOTE: Transitive query factory is needed for transitive (follow more than 1 edge) queries
downstream_transitive_factory = TransposingTransitive("downstream", "upstream")


class DatabaseGateway(IDatabaseGateway):
"""Gateway for base database operations."""

Expand All @@ -101,6 +97,9 @@ def initialize(self) -> None:
"""Initialize the database."""
database = self.database_dispatcher.current_database

# NOTE: Transitive query factory is needed for transitive (follow more than 1 edge) queries
downstream_transitive_factory = TransposingTransitive("downstream", "upstream")

database.clear()

database.add_index(name="activities", object_type=Activity, attribute="id")
Expand Down
47 changes: 47 additions & 0 deletions renku/core/utils/os.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
#
# Copyright 2018-2021 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OS utility functions."""

import os
from pathlib import Path
from typing import List, Union

from renku.core import errors


def get_relative_path_to_cwd(path: Union[Path, str]) -> str:
    """Express ``path`` relative to the current working directory.

    The path is first made absolute and normalized lexically, so relative inputs are interpreted
    against the current working directory and ``..`` segments are collapsed. The result may start
    with ``..`` when ``path`` lies outside the current working directory.

    :param path: the path to convert, absolute or relative.
    :return: ``path`` relative to the current working directory, as a string.
    """
    working_directory = os.getcwd()
    normalized = os.path.abspath(path)
    return os.path.relpath(normalized, working_directory)


def get_relative_paths(base: Union[Path, str], paths: List[Union[Path, str]]) -> List[str]:
    """Return a list of paths relative to a base path.

    Each entry of ``paths`` is joined onto ``base`` (an absolute entry replaces ``base``, as with
    ``os.path.join``), normalized lexically, and then expressed relative to ``base``.

    :param base: directory the results are made relative to; it may be relative to the current
        working directory or contain ``..`` segments.
    :param paths: paths to convert, either relative to ``base`` or absolute.
    :return: the converted paths as strings, in the same order as ``paths``.
    :raises errors.ParameterError: if a path is not located within ``base``.
    """
    # NOTE: Do not use os.path.realpath or Path.resolve() because they resolve symlinks
    # NOTE: Normalize the base as well; ``Path.relative_to`` compares path components literally, so a relative
    # base (e.g. ".") or one containing ".." would never match the normalized absolute path computed below.
    absolute_base = os.path.abspath(base)

    relative_paths = []

    for path in paths:
        absolute_path = os.path.abspath(os.path.join(absolute_base, path))
        try:
            relative_path = Path(absolute_path).relative_to(absolute_base)
        except ValueError as error:
            raise errors.ParameterError(f"Path '{path}' is not within base path '{base}'") from error

        relative_paths.append(str(relative_path))

    return relative_paths

0 comments on commit d69a91c

Please sign in to comment.