Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): new dataset provenance #2181

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
65 changes: 39 additions & 26 deletions renku/core/commands/dataset.py
Expand Up @@ -21,6 +21,7 @@
import urllib
from collections import OrderedDict
from pathlib import Path
from typing import Optional

import click
import git
Expand All @@ -37,7 +38,8 @@
from renku.core.management.command_builder import inject
from renku.core.management.command_builder.command import Command
from renku.core.management.datasets import DATASET_METADATA_PATHS
from renku.core.models.datasets import DatasetDetailsJson, Url, generate_default_name
from renku.core.models.dataset import DatasetsProvenance
from renku.core.models.datasets import DatasetDetailsJson, DatasetTag, Url, generate_default_name
from renku.core.models.provenance.agents import Person
from renku.core.models.refs import LinkReference
from renku.core.models.tabulate import tabulate
Expand Down Expand Up @@ -72,12 +74,13 @@ def list_datasets():
def create_dataset_helper(
name,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
title=None,
description="",
creators=None,
keywords=None,
images=None,
safe_image_paths=[],
safe_image_paths=None,
):
"""Create a dataset in the repository."""
if not creators:
Expand All @@ -95,15 +98,15 @@ def create_dataset_helper(
safe_image_paths=safe_image_paths,
)

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return dataset


def create_dataset():
    """Return a command for creating an empty dataset in the current repo."""
    builder = Command().command(create_dataset_helper).lock_dataset()
    builder = builder.require_migration().with_database(write=True)
    return builder.with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand All @@ -113,10 +116,11 @@ def _edit_dataset(
description,
creators,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
keywords=None,
images=[],
images=None,
skip_image_update=False,
safe_image_paths=[],
safe_image_paths=None,
):
"""Edit dataset metadata."""
creator_objs, no_email_warnings = _construct_creators(creators, ignore_email=True)
Expand Down Expand Up @@ -149,16 +153,15 @@ def _edit_dataset(
return [], no_email_warnings

dataset.to_yaml()

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return updated, no_email_warnings


def edit_dataset():
    """Command for editing dataset metadata."""
    builder = Command().command(_edit_dataset).lock_dataset().require_migration()
    return builder.with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -212,6 +215,7 @@ def _add_to_dataset(
urls,
name,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
external=False,
force=False,
overwrite=False,
Expand Down Expand Up @@ -270,7 +274,7 @@ def _add_to_dataset(

dataset.update_metadata_from(with_metadata)

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)
return dataset
except DatasetNotFound:
raise DatasetNotFound(
Expand All @@ -286,7 +290,11 @@ def _add_to_dataset(
def add_to_dataset():
    """Create a command for adding data to datasets."""
    builder = Command().command(_add_to_dataset).lock_dataset()
    builder = builder.require_migration().with_database(write=True)
    # raise_if_empty: an add that touches no files raises instead of committing nothing.
    return builder.with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS)


def _list_files(datasets=None, creators=None, include=None, exclude=None, format=None, columns=None):
Expand Down Expand Up @@ -314,7 +322,7 @@ def list_files():


@inject.autoparams()
def _file_unlink(name, include, exclude, client: LocalClient, yes=False):
def _file_unlink(name, include, exclude, client: LocalClient, datasets_provenance: DatasetsProvenance, yes=False):
"""Remove matching files from a dataset."""
if not include and not exclude:
raise ParameterError(
Expand Down Expand Up @@ -347,24 +355,24 @@ def _file_unlink(name, include, exclude, client: LocalClient, yes=False):
dataset.unlink_file(item.path)

dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return records


def file_unlink():
    """Command for removing matching files from a dataset."""
    builder = Command().command(_file_unlink).lock_dataset().require_migration()
    return builder.with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
def _remove_dataset(name, client: LocalClient):
def _remove_dataset(name, client: LocalClient, datasets_provenance: DatasetsProvenance):
"""Delete a dataset."""
dataset = client.load_dataset(name=name, strict=True)
dataset.mutate()
dataset.to_yaml()
client.update_datasets_provenance(dataset, remove=True)
datasets_provenance.remove(dataset=dataset, client=client)

client.repo.git.add(dataset.path)
client.repo.index.commit("renku dataset rm: final mutation")
Expand All @@ -383,7 +391,7 @@ def _remove_dataset(name, client: LocalClient):
def remove_dataset():
    """Command for deleting a dataset."""
    builder = Command().command(_remove_dataset).lock_dataset()
    builder = builder.require_migration().with_database(write=True)
    return builder.with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -585,7 +593,7 @@ def _import_dataset(
def import_dataset():
    """Create a command for importing datasets."""
    builder = Command().command(_import_dataset).lock_dataset().require_migration()
    return builder.with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -725,7 +733,12 @@ def _update_datasets(names, creators, include, exclude, ref, delete, client: Loc
def update_datasets():
    """Command for updating datasets."""
    builder = Command().command(_update_datasets).lock_dataset()
    builder = builder.require_migration().require_clean().with_database(write=True)
    return builder.with_commit(commit_only=DATASET_METADATA_PATHS)


def _include_exclude(file_path, include=None, exclude=None):
Expand Down Expand Up @@ -792,7 +805,7 @@ def _filter(client: LocalClient, names=None, creators=None, include=None, exclud


@inject.autoparams()
def _tag_dataset(name, tag, description, client: LocalClient, force=False):
def _tag_dataset(name, tag, description, client: LocalClient, datasets_provenance: DatasetsProvenance, force=False):
"""Creates a new tag for a dataset."""
dataset = client.load_dataset(name, strict=True)

Expand All @@ -802,17 +815,17 @@ def _tag_dataset(name, tag, description, client: LocalClient, force=False):
raise ParameterError(e)
else:
dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)


def tag_dataset():
    """Command for creating a new tag for a dataset."""
    builder = Command().command(_tag_dataset).lock_dataset()
    builder = builder.require_migration().with_database(write=True)
    return builder.with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
def _remove_dataset_tags(name, tags, client: LocalClient):
def _remove_dataset_tags(name, tags, client: LocalClient, datasets_provenance: DatasetsProvenance):
"""Removes tags from a dataset."""
dataset = client.load_dataset(name, strict=True)

Expand All @@ -822,13 +835,13 @@ def _remove_dataset_tags(name, tags, client: LocalClient):
raise ParameterError(e)
else:
dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)


def remove_dataset_tags():
    """Command for removing tags from a dataset."""
    builder = Command().command(_remove_dataset_tags).lock_dataset().require_migration()
    return builder.with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -858,7 +871,7 @@ def _prompt_access_token(exporter):
return communication.prompt(text_prompt, type=str)


def _prompt_tag_selection(tags):
def _prompt_tag_selection(tags) -> Optional[DatasetTag]:
"""Prompt user to chose a tag or <HEAD>."""
# Prompt user to select a tag to export
tags = sorted(tags, key=lambda t: t.created)
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/rerun.py
Expand Up @@ -30,9 +30,9 @@ def rerun_workflows():
.command(_rerun_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/run.py
Expand Up @@ -35,7 +35,7 @@

def run_command():
    """Tracking work on a specific problem."""
    builder = Command().command(_run_command).require_migration().require_clean()
    return builder.with_database(write=True).with_commit()


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/storage.py
Expand Up @@ -47,7 +47,7 @@ def _fix_lfs(paths, client: LocalClient):

def fix_lfs_command():
    """Fix lfs command."""
    builder = Command().command(_fix_lfs).require_clean().with_database(write=True)
    return builder.with_commit(commit_if_empty=False)


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/update.py
Expand Up @@ -44,9 +44,9 @@ def update_workflows():
.command(_update_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
8 changes: 8 additions & 0 deletions renku/core/errors.py
Expand Up @@ -474,3 +474,11 @@ def __init__(self):
"Please install it, for details see https://nodejs.org/en/download/package-manager/"
)
super(NodeNotFoundError, self).__init__(msg)


class ObjectNotFoundError(RenkuException):
    """Raise when an object is not found in the storage."""

    def __init__(self, filename):
        """Embed exception and build a custom message.

        :param filename: Name or path of the object that could not be found.
        """
        # Bug fix: the original f-string contained no placeholder, so
        # ``filename`` was ignored and every message read the same. Include
        # the missing object's name so the error is actionable.
        super().__init__(f"Cannot find object: '{filename}'")