Skip to content

Commit

Permalink
feat(core): new dataset provenance (#2181)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Jul 9, 2021
1 parent e2e1a22 commit 19492d3
Show file tree
Hide file tree
Showing 28 changed files with 1,120 additions and 872 deletions.
65 changes: 39 additions & 26 deletions renku/core/commands/dataset.py
Expand Up @@ -21,6 +21,7 @@
import urllib
from collections import OrderedDict
from pathlib import Path
from typing import Optional

import click
import git
Expand All @@ -37,7 +38,8 @@
from renku.core.management.command_builder import inject
from renku.core.management.command_builder.command import Command
from renku.core.management.datasets import DATASET_METADATA_PATHS
from renku.core.models.datasets import DatasetDetailsJson, Url, generate_default_name
from renku.core.models.dataset import DatasetsProvenance
from renku.core.models.datasets import DatasetDetailsJson, DatasetTag, Url, generate_default_name
from renku.core.models.provenance.agents import Person
from renku.core.models.refs import LinkReference
from renku.core.models.tabulate import tabulate
Expand Down Expand Up @@ -72,12 +74,13 @@ def list_datasets():
def create_dataset_helper(
name,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
title=None,
description="",
creators=None,
keywords=None,
images=None,
safe_image_paths=[],
safe_image_paths=None,
):
"""Create a dataset in the repository."""
if not creators:
Expand All @@ -95,15 +98,15 @@ def create_dataset_helper(
safe_image_paths=safe_image_paths,
)

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return dataset


def create_dataset():
"""Return a command for creating an empty dataset in the current repo."""
command = Command().command(create_dataset_helper).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand All @@ -113,10 +116,11 @@ def _edit_dataset(
description,
creators,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
keywords=None,
images=[],
images=None,
skip_image_update=False,
safe_image_paths=[],
safe_image_paths=None,
):
"""Edit dataset metadata."""
creator_objs, no_email_warnings = _construct_creators(creators, ignore_email=True)
Expand Down Expand Up @@ -149,16 +153,15 @@ def _edit_dataset(
return [], no_email_warnings

dataset.to_yaml()

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return updated, no_email_warnings


def edit_dataset():
"""Command for editing dataset metadata."""
command = Command().command(_edit_dataset).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -212,6 +215,7 @@ def _add_to_dataset(
urls,
name,
client: LocalClient,
datasets_provenance: DatasetsProvenance,
external=False,
force=False,
overwrite=False,
Expand Down Expand Up @@ -272,7 +276,7 @@ def _add_to_dataset(

dataset.update_metadata_from(with_metadata)

client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)
return dataset
except DatasetNotFound:
raise DatasetNotFound(
Expand All @@ -288,7 +292,11 @@ def _add_to_dataset(
def add_to_dataset():
"""Create a command for adding data to datasets."""
command = Command().command(_add_to_dataset).lock_dataset()
return command.require_migration().with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS)
return (
command.require_migration()
.with_database(write=True)
.with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS)
)


def _list_files(datasets=None, creators=None, include=None, exclude=None, format=None, columns=None):
Expand Down Expand Up @@ -316,7 +324,7 @@ def list_files():


@inject.autoparams()
def _file_unlink(name, include, exclude, client: LocalClient, yes=False):
def _file_unlink(name, include, exclude, client: LocalClient, datasets_provenance: DatasetsProvenance, yes=False):
"""Remove matching files from a dataset."""
if not include and not exclude:
raise ParameterError(
Expand Down Expand Up @@ -349,24 +357,24 @@ def _file_unlink(name, include, exclude, client: LocalClient, yes=False):
dataset.unlink_file(item.path)

dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)

return records


def file_unlink():
"""Command for removing matching files from a dataset."""
command = Command().command(_file_unlink).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
def _remove_dataset(name, client: LocalClient):
def _remove_dataset(name, client: LocalClient, datasets_provenance: DatasetsProvenance):
"""Delete a dataset."""
dataset = client.load_dataset(name=name, strict=True)
dataset.mutate()
dataset.to_yaml()
client.update_datasets_provenance(dataset, remove=True)
datasets_provenance.remove(dataset=dataset, client=client)

client.repo.git.add(dataset.path)
client.repo.index.commit("renku dataset rm: final mutation")
Expand All @@ -385,7 +393,7 @@ def _remove_dataset(name, client: LocalClient):
def remove_dataset():
"""Command for deleting a dataset."""
command = Command().command(_remove_dataset).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -588,7 +596,7 @@ def _import_dataset(
def import_dataset():
"""Create a command for importing datasets."""
command = Command().command(_import_dataset).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -728,7 +736,12 @@ def _update_datasets(names, creators, include, exclude, ref, delete, client: Loc
def update_datasets():
"""Command for updating datasets."""
command = Command().command(_update_datasets).lock_dataset()
return command.require_migration().require_clean().with_commit(commit_only=DATASET_METADATA_PATHS)
return (
command.require_migration()
.require_clean()
.with_database(write=True)
.with_commit(commit_only=DATASET_METADATA_PATHS)
)


def _include_exclude(file_path, include=None, exclude=None):
Expand Down Expand Up @@ -795,7 +808,7 @@ def _filter(client: LocalClient, names=None, creators=None, include=None, exclud


@inject.autoparams()
def _tag_dataset(name, tag, description, client: LocalClient, force=False):
def _tag_dataset(name, tag, description, client: LocalClient, datasets_provenance: DatasetsProvenance, force=False):
"""Creates a new tag for a dataset."""
dataset = client.load_dataset(name, strict=True)

Expand All @@ -805,17 +818,17 @@ def _tag_dataset(name, tag, description, client: LocalClient, force=False):
raise ParameterError(e)
else:
dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)


def tag_dataset():
"""Command for creating a new tag for a dataset."""
command = Command().command(_tag_dataset).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
def _remove_dataset_tags(name, tags, client: LocalClient):
def _remove_dataset_tags(name, tags, client: LocalClient, datasets_provenance: DatasetsProvenance):
"""Removes tags from a dataset."""
dataset = client.load_dataset(name, strict=True)

Expand All @@ -825,13 +838,13 @@ def _remove_dataset_tags(name, tags, client: LocalClient):
raise ParameterError(e)
else:
dataset.to_yaml()
client.update_datasets_provenance(dataset)
datasets_provenance.add_or_update(dataset)


def remove_dataset_tags():
"""Command for removing tags from a dataset."""
command = Command().command(_remove_dataset_tags).lock_dataset()
return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)
return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -861,7 +874,7 @@ def _prompt_access_token(exporter):
return communication.prompt(text_prompt, type=str)


def _prompt_tag_selection(tags):
def _prompt_tag_selection(tags) -> Optional[DatasetTag]:
"""Prompt user to choose a tag or <HEAD>."""
# Prompt user to select a tag to export
tags = sorted(tags, key=lambda t: t.created)
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/rerun.py
Expand Up @@ -30,9 +30,9 @@ def rerun_workflows():
.command(_rerun_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/run.py
Expand Up @@ -37,7 +37,7 @@

def run_command():
"""Tracking work on a specific problem."""
return Command().command(_run_command).require_migration().require_clean().with_commit().with_database(write=True)
return Command().command(_run_command).require_migration().require_clean().with_database(write=True).with_commit()


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/storage.py
Expand Up @@ -47,7 +47,7 @@ def _fix_lfs(paths, client: LocalClient):

def fix_lfs_command():
"""Fix lfs command."""
return Command().command(_fix_lfs).require_clean().with_commit(commit_if_empty=False).with_database(write=True)
return Command().command(_fix_lfs).require_clean().with_database(write=True).with_commit(commit_if_empty=False)


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/update.py
Expand Up @@ -44,9 +44,9 @@ def update_workflows():
.command(_update_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
8 changes: 8 additions & 0 deletions renku/core/errors.py
Expand Up @@ -474,3 +474,11 @@ def __init__(self):
"Please install it, for details see https://nodejs.org/en/download/package-manager/"
)
super(NodeNotFoundError, self).__init__(msg)


class ObjectNotFoundError(RenkuException):
    """Error signalling that a requested object is missing from the storage."""

    def __init__(self, filename):
        """Build the error message from the name of the missing object.

        :param filename: name of the object that could not be found; it is
            interpolated verbatim into the exception message.
        """
        message = f"Cannot find object: '{filename}'"
        super().__init__(message)

0 comments on commit 19492d3

Please sign in to comment.