From b210037fef7b1d732d78a833097732c3320587ba Mon Sep 17 00:00:00 2001 From: Mohammad Alisafaee Date: Tue, 22 Jun 2021 19:55:02 +0200 Subject: [PATCH] feat(core): new dataset provenance --- renku/core/commands/dataset.py | 27 ++-- renku/core/commands/rerun.py | 2 +- renku/core/commands/run.py | 2 +- renku/core/commands/storage.py | 2 +- renku/core/commands/update.py | 2 +- renku/core/incubation/graph.py | 34 ++--- .../management/command_builder/database.py | 3 +- renku/core/management/datasets.py | 73 ++++------ renku/core/management/repository.py | 7 + renku/core/management/storage.py | 12 +- renku/core/models/dataset.py | 91 ++++++++++--- tests/cli/test_datasets.py | 127 ++++++++++-------- tests/cli/test_integration_datasets.py | 18 +-- tests/core/commands/test_dataset.py | 16 ++- tests/core/commands/test_storage.py | 6 - tests/core/incubation/test_command.py | 7 +- tests/fixtures/graph.py | 16 +++ 17 files changed, 256 insertions(+), 189 deletions(-) diff --git a/renku/core/commands/dataset.py b/renku/core/commands/dataset.py index 2110806744..67fd78e482 100644 --- a/renku/core/commands/dataset.py +++ b/renku/core/commands/dataset.py @@ -103,7 +103,7 @@ def create_dataset_helper( def create_dataset(): """Return a command for creating an empty dataset in the current repo.""" command = Command().command(create_dataset_helper).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -158,7 +158,7 @@ def _edit_dataset( def edit_dataset(): """Command for editing dataset metadata.""" command = Command().command(_edit_dataset).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -286,7 +286,11 @@ def _add_to_dataset( def add_to_dataset(): """Create a command for adding data to datasets.""" command = Command().command(_add_to_dataset).lock_dataset() - return command.require_migration().with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS) + return ( + command.require_migration() + .with_database(write=True) + .with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS) + ) def _list_files(datasets=None, creators=None, include=None, exclude=None, format=None, columns=None): @@ -355,7 +359,7 @@ def _file_unlink(name, include, exclude, client: LocalClient, yes=False): def file_unlink(): """Command for removing matching files from a dataset.""" command = Command().command(_file_unlink).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -383,7 +387,7 @@ def _remove_dataset(name, client: LocalClient): def remove_dataset(): """Command for deleting a dataset.""" command = Command().command(_remove_dataset).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -585,7 +589,7 @@ def _import_dataset( def import_dataset(): """Create a command for importing datasets.""" command = Command().command(_import_dataset).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -725,7 +729,12 @@ def _update_datasets(names, creators, include, exclude, ref, delete, client: Loc def update_datasets(): """Command for updating datasets.""" command = Command().command(_update_datasets).lock_dataset() - return command.require_migration().require_clean().with_commit(commit_only=DATASET_METADATA_PATHS) + return ( + command.require_migration() + .require_clean() + .with_database(write=True) + .with_commit(commit_only=DATASET_METADATA_PATHS) + ) def _include_exclude(file_path, include=None, exclude=None): @@ -808,7 +817,7 @@ def _tag_dataset(name, tag, description, client: LocalClient, force=False): def tag_dataset(): """Command for creating a new tag for a dataset.""" command = Command().command(_tag_dataset).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() @@ -828,7 +837,7 @@ def _remove_dataset_tags(name, tags, client: LocalClient): def remove_dataset_tags(): """Command for removing tags from a dataset.""" command = Command().command(_remove_dataset_tags).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() diff --git a/renku/core/commands/rerun.py b/renku/core/commands/rerun.py index db91dd811d..0a32a49589 100644 --- a/renku/core/commands/rerun.py +++ b/renku/core/commands/rerun.py @@ -30,9 +30,9 @@ def rerun_workflows(): .command(_rerun_workflows) .require_migration() .require_clean() - .with_commit() .require_nodejs() .with_database(write=True) + .with_commit() ) diff --git a/renku/core/commands/run.py b/renku/core/commands/run.py index 6339c8fa86..a6e8bc73a3 100644 --- a/renku/core/commands/run.py +++ b/renku/core/commands/run.py @@ -35,7 +35,7 @@ def run_command(): """Tracking work on a specific problem.""" - return Command().command(_run_command).require_migration().require_clean().with_commit().with_database(write=True) + return Command().command(_run_command).require_migration().require_clean().with_database(write=True).with_commit() @inject.autoparams() diff --git a/renku/core/commands/storage.py b/renku/core/commands/storage.py index ab1055207f..5c49d4cad2 100644 --- a/renku/core/commands/storage.py +++ b/renku/core/commands/storage.py @@ -47,7 +47,7 @@ def _fix_lfs(paths, client: LocalClient): def fix_lfs_command(): """Fix lfs command.""" - return Command().command(_fix_lfs).require_clean().with_commit(commit_if_empty=False).with_database(write=True) + return Command().command(_fix_lfs).require_clean().with_database(write=True).with_commit(commit_if_empty=False) @inject.autoparams() diff --git a/renku/core/commands/update.py b/renku/core/commands/update.py index 433e9a2aa7..5546b989fa 100644 --- a/renku/core/commands/update.py +++ b/renku/core/commands/update.py @@ -44,9 +44,9 @@ def update_workflows(): .command(_update_workflows) .require_migration() .require_clean() - .with_commit() .require_nodejs() .with_database(write=True) + .with_commit() ) diff --git a/renku/core/incubation/graph.py b/renku/core/incubation/graph.py index 9b6a3e245a..692ab3de7b 100644 --- a/renku/core/incubation/graph.py +++ b/renku/core/incubation/graph.py @@ -62,7 +62,7 @@ def generate_graph(): """Return a command for generating the graph.""" command = Command().command(_generate_graph).lock_project() - return command.require_migration().with_commit(commit_only=GRAPH_METADATA_PATHS).with_database(write=True) + return command.require_migration().with_database(write=True).with_commit(commit_only=GRAPH_METADATA_PATHS) @inject.autoparams() @@ -99,9 +99,9 @@ def process_datasets(commit): date = commit.authored_datetime for dataset in datasets: - client.datasets_provenance.update_dataset(dataset, revision=revision, date=date) + client.update_datasets_provenance(dataset, revision=revision, date=date, commit_database=False) for dataset in deleted_datasets: - client.datasets_provenance.remove_dataset(dataset, revision=revision, date=date) + client.update_datasets_provenance(dataset, revision=revision, date=date, commit_database=False, remove=True) commits = list(client.repo.iter_commits(paths=[f"{client.workflow_path}/*.yaml", ".renku/datasets/*/*.yml"])) n_commits = len(commits) @@ -109,15 +109,13 @@ def process_datasets(commit): if force: client.remove_graph_files() - client.remove_datasets_provenance_file() - elif client.has_graph_files() or client.has_datasets_provenance(): + elif client.has_graph_files(): raise errors.OperationError("Graph metadata exists. Use ``--force`` to regenerate it.") # database = Database.from_path(path=client.database_path) # update_injected_database(database) client.initialize_graph() - client.initialize_datasets_provenance() for n, commit in enumerate(commits, start=1): communication.echo(f"Processing commits {n}/{n_commits}", end="\r") @@ -132,7 +130,6 @@ def process_datasets(commit): communication.echo("") communication.warn(f"Cannot process commit '{commit.hexsha}' - Exception: {traceback.format_exc()}") - client.datasets_provenance.to_json() database.commit() @@ -181,7 +178,7 @@ def _status(client: LocalClient, database: Database): def update(): """Return a command for generating the graph.""" command = Command().command(_update).lock_project().with_database(write=True) - return command.require_migration().with_commit(commit_if_empty=False).require_clean().require_nodejs() + return command.require_migration().require_clean().require_nodejs().with_commit(commit_if_empty=False) @inject.autoparams() @@ -239,8 +236,9 @@ def _export_graph(format, workflows_only, strict, client: LocalClient): pg = ProvenanceGraph.from_json(client.provenance_graph_path, lazy=True) - if not workflows_only: - pg.rdf_graph.parse(location=str(client.datasets_provenance_path), format="json-ld") + # TODO: Add dataset provenance to graph + # if not workflows_only: + # pg.rdf_graph.parse(location=str(client.datasets_provenance_path), format="json-ld") graph = pg.rdf_graph @@ -395,14 +393,14 @@ def _validate_graph(rdf_graph, format): def create_dataset(): """Return a command for creating an empty dataset in the current repo.""" command = Command().command(_create_dataset).lock_dataset() - return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS) + return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS) @inject.autoparams() def _create_dataset(name, client: LocalClient, title=None, description="", creators=None, keywords=None): """Create a dataset in the repository.""" - if not client.has_datasets_provenance(): - raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate-dataset`.") + if not client.has_graph_files(): + raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate`.") return create_dataset_helper(name=name, title=title, description=description, creators=creators, keywords=keywords) @@ -410,7 +408,11 @@ def _create_dataset(name, client: LocalClient, title=None, description="", creat def add_to_dataset(): """Return a command for adding data to a dataset.""" command = Command().command(_add_to_dataset).lock_dataset() - return command.require_migration().with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS) + return ( + command.require_migration() + .with_database(write=True) + .with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS) + ) @inject.autoparams() @@ -427,8 +429,8 @@ def _add_to_dataset( ref=None, ): """Add data to a dataset.""" - if not client.has_datasets_provenance(): - raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate-dataset`.") + if not client.has_graph_files(): + raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate`.") if len(urls) == 0: raise errors.UsageError("No URL is specified") diff --git a/renku/core/management/command_builder/database.py b/renku/core/management/command_builder/database.py index aff51018d5..b5b4c7507c 100644 --- a/renku/core/management/command_builder/database.py +++ b/renku/core/management/command_builder/database.py @@ -29,13 +29,12 @@ class DatabaseCommand(Command): POST_ORDER = 5 def __init__(self, builder: Command, write: bool = False, path: str = None) -> None: - """__init__ of ProjectLock.""" self._builder = builder self._write = write self._path = path def _pre_hook(self, builder: Command, context: dict, *args, **kwargs) -> None: - """Lock the project.""" + """Create a Database singleton.""" if "client" not in context: raise ValueError("Commit builder needs a LocalClient to be set.") diff --git a/renku/core/management/datasets.py b/renku/core/management/datasets.py index 597b3974e3..e25dc80892 100644 --- a/renku/core/management/datasets.py +++ b/renku/core/management/datasets.py @@ -44,7 +44,9 @@ from yagup import GitURL from renku.core import errors +from renku.core.incubation.database import Database from renku.core.management.clone import clone +from renku.core.management.command_builder import inject from renku.core.management.command_builder.command import replace_injected_client from renku.core.management.config import RENKU_HOME from renku.core.models.dataset import DatasetProvenance @@ -126,42 +128,23 @@ def renku_pointers_path(self): path.mkdir(exist_ok=True) return path - @property - def datasets_provenance(self): - """Return dataset provenance if available.""" - if not self.has_datasets_provenance(): - return - if not self._datasets_provenance: - self._datasets_provenance = DatasetProvenance.from_json(self.datasets_provenance_path) - - return self._datasets_provenance - - def update_datasets_provenance(self, dataset, remove=False): + @inject.autoparams() + def update_datasets_provenance( + self, dataset, database: Database, *, remove=False, revision: str = None, date=None, commit_database=True + ): """Update datasets provenance for a dataset.""" - if not self.has_datasets_provenance(): + if not self.has_graph_files(): return + datasets_provenance = DatasetProvenance.from_database(database) + if remove: - self.datasets_provenance.remove_dataset(dataset=dataset, client=self) + datasets_provenance.remove_dataset(dataset=dataset, client=self, revision=revision, date=date) else: - self.datasets_provenance.update_dataset(dataset=dataset, client=self) - - self.datasets_provenance.to_json() - - def has_datasets_provenance(self): - """Return true if dataset provenance exists.""" - return self.datasets_provenance_path.exists() - - def remove_datasets_provenance_file(self): - """Remove dataset provenance.""" - try: - self.datasets_provenance_path.unlink() - except FileNotFoundError: - pass + datasets_provenance.update_dataset(dataset=dataset, client=self, revision=revision, date=date) - def initialize_datasets_provenance(self): - """Create empty dataset provenance file.""" - self.datasets_provenance_path.write_text("[]") + if commit_database: + database.commit() def datasets_from_commit(self, commit=None): """Return datasets defined in a commit.""" @@ -257,10 +240,12 @@ def with_dataset(self, name=None, create=False, immutable=False): dataset.to_yaml() + @inject.autoparams() @contextmanager - def with_dataset_provenance(self, name=None, create=False): + def with_dataset_provenance(self, database: Database, *, name=None, create=False): """Yield a dataset's metadata from dataset provenance.""" - dataset = self.load_dataset_from_provenance(name=name) + datasets_provenance = DatasetProvenance.from_database(database) + dataset = datasets_provenance.get_by_name(name=name) clean_up_required = False dataset_ref = None path = None @@ -291,17 +276,6 @@ def with_dataset_provenance(self, name=None, create=False): dataset.to_yaml(os.path.join(self.path, dataset.path, self.METADATA)) - def load_dataset_from_provenance(self, name, strict=False): - """Load latest dataset's metadata from dataset provenance file.""" - dataset = None - if name: - dataset = self.datasets_provenance.get_latest_by_name(name) - - if not dataset and strict: - raise errors.DatasetNotFound(name=name) - - return dataset - def create_dataset( self, name=None, title=None, description=None, creators=None, keywords=None, images=None, safe_image_paths=None ): @@ -651,7 +625,7 @@ def _add_from_local(self, dataset, path, external, destination): else: # Check if file is in the project and return it path_in_repo = None - if self._is_external_file(src): + if self.is_external_file(src): path_in_repo = path else: try: @@ -800,7 +774,7 @@ def _add_from_git(self, url, sources, destination, ref): new_files.append(path_in_dst_repo) - if remote_client._is_external_file(src): + if remote_client.is_external_file(src): operation = (src.resolve(), dst, "symlink") else: operation = (src, dst, "move") @@ -1040,7 +1014,7 @@ def move_files(self, files, to_dataset, commit): modified_datasets[dataset.name] = dataset modified = copy.copy(modified) modified.update_metadata(path=dst, commit=commit) - modified.external = self._is_external_file(self.path / dst) + modified.external = self.is_external_file(self.path / dst) dataset.update_files(modified) communication.update_progress(progress_name, amount=1) @@ -1156,7 +1130,7 @@ def update_dataset_git_files(self, files, ref, delete=False): if src.exists(): # Fetch file if it is tracked by Git LFS self._fetch_lfs_files(repo_path, {based_on.path}) - if remote_client._is_external_file(src): + if remote_client.is_external_file(src): self.remove_file(dst) self._create_external_file(src.resolve(), dst) else: @@ -1296,7 +1270,7 @@ def remove_file(filepath): except FileNotFoundError: pass - def _is_external_file(self, path): + def is_external_file(self, path): """Checks if a path within repo is an external file.""" if not Path(path).is_symlink() or not self._is_path_within_repo(path): return False @@ -1488,7 +1462,7 @@ def _check_url(url): if not is_git: # NOTE: Check if the url is a redirect. url = requests.head(url, allow_redirects=True).url - u = parse.urlparse(url) + _ = parse.urlparse(url) else: try: Repo(u.path, search_parent_directories=True) @@ -1501,6 +1475,7 @@ def _check_url(url): DATASET_METADATA_PATHS = [ + Path(RENKU_HOME) / "metadata", # TODO: Replace with proper constant RepositoryApiMixin.DATABASE_PATH Path(RENKU_HOME) / DatasetsApiMixin.DATASETS, Path(RENKU_HOME) / DatasetsApiMixin.DATASET_IMAGES, Path(RENKU_HOME) / DatasetsApiMixin.POINTERS, diff --git a/renku/core/management/repository.py b/renku/core/management/repository.py index 96a5ff80cb..54f6be425b 100644 --- a/renku/core/management/repository.py +++ b/renku/core/management/repository.py @@ -526,11 +526,14 @@ def initialize_graph(self, database: Database): self.database_path.mkdir(parents=True, exist_ok=True) + from renku.core.models.dataset import Dataset from renku.core.models.provenance.activity import Activity from renku.core.models.workflow.plan import Plan database.add_index(name="activities", value_type=Activity, attribute="id") database.add_index(name="plans", value_type=Plan, attribute="id") + database.add_index(name="datasets", value_type=Dataset, attribute="name") + database.add_index(name="datasets-provenance", value_type=Dataset, attribute="id") database.commit() @@ -548,6 +551,10 @@ def remove_graph_files(self): shutil.rmtree(self.database_path) except FileNotFoundError: pass + try: + self.datasets_provenance_path.unlink() + except FileNotFoundError: + pass def init_repository(self, force=False, user=None, initial_branch=None): """Initialize an empty Renku repository.""" diff --git a/renku/core/management/storage.py b/renku/core/management/storage.py index 928bcbd173..d19741a06e 100644 --- a/renku/core/management/storage.py +++ b/renku/core/management/storage.py @@ -34,7 +34,6 @@ from renku.core import errors from renku.core.incubation.database import Database from renku.core.management.command_builder.command import inject -from renku.core.models.dataset import DatasetProvenance from renku.core.models.provenance.activity import Collection from renku.core.models.provenance.provenance_graph import ProvenanceGraph from renku.core.utils import communication @@ -591,10 +590,7 @@ def _map_checksum_old(entity, checksum_mapping): activity._p_changed = True # NOTE: Update datasets provenance - datasets_provenance = DatasetProvenance.from_json(self.datasets_provenance_path) - - for dataset in datasets_provenance.datasets: - for file_ in dataset.files: - _map_checksum_old(file_.entity, sha_mapping) - - datasets_provenance.to_json() + # TODO: Fix dataset provenance + # for dataset in datasets_provenance.datasets: + # for file_ in dataset.files: + # _map_checksum_old(file_.entity, sha_mapping) diff --git a/renku/core/models/dataset.py b/renku/core/models/dataset.py index 9e0e14f15a..4e2836e73d 100644 --- a/renku/core/models/dataset.py +++ b/renku/core/models/dataset.py @@ -16,6 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Models representing datasets.""" + from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Union @@ -29,7 +30,8 @@ from renku.core.management.command_builder.command import inject from renku.core.models import datasets as old_datasets from renku.core.models.calamus import DateTimeList, JsonLDSchema, Nested, Uri, fields, prov, renku, schema -from renku.core.models.datasets import DatasetFileSchema, generate_dataset_file_url, is_dataset_name_valid +from renku.core.models.datasets import generate_dataset_file_url, is_dataset_name_valid +from renku.core.models.entities import generate_label from renku.core.models.entity import Entity, NewEntitySchema from renku.core.models.provenance.agents import Person, PersonSchema from renku.core.utils import communication @@ -85,16 +87,6 @@ def get_default_url(self): else: raise NotImplementedError("Either url_id or url_str has to be set") - @property - def value(self): - """Returns the url value as string.""" - if self.url_str: - return self.url_str - elif self.url_id: - return self.url_id - else: - raise NotImplementedError("Either url_id or url_str has to be set") - class DatasetTag: """Represents a Tag of an instance of a dataset.""" @@ -194,13 +186,52 @@ def is_absolute(self): return bool(urlparse(self.content_url).netloc) +class RemoteEntity: + """Reference to an Entity in a remote repo.""" + + def __init__(self, *, commit_sha: str, id: str = None, path: Union[Path, str], url: str): + self.commit_sha: str = commit_sha + self.id = id or RemoteEntity.generate_id(commit_sha, path) + self.path: str = str(path) + self.url = url + + @staticmethod + def generate_id(commit_sha: str, path: Union[Path, str]) -> str: + """Generate an id.""" + path = quote(str(path)) + return f"/remote-entity/{commit_sha}/{path}" + + @classmethod + def from_dataset_file(cls, dataset_file: Optional[old_datasets.DatasetFile]) -> Optional["RemoteEntity"]: + """Create an instance by converting from renku.core.models.datasets.DatasetFile.""" + if not dataset_file: + return + commit_sha = dataset_file._label.rsplit("@", maxsplit=1)[-1] + return cls(commit_sha=commit_sha, path=dataset_file.path, url=dataset_file.url) + + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, RemoteEntity): + return False + return self.commit_sha == other.commit_sha and self.path == other.path and self.url == other.url + + def __hash__(self): + return hash((self.commit_sha, self.path, self.url)) + + def to_dataset_file(self) -> old_datasets.DatasetFile: + """Return an instance of renku.core.models.datasets.DatasetFile.""" + label = generate_label(self.path, self.commit_sha) + return old_datasets.DatasetFile(label=label, path=self.path, source=self.url, url=self.url) + + class DatasetFile: """A file in a dataset.""" def __init__( self, *, - based_on=None, + based_on: RemoteEntity = None, date_added: datetime = None, date_deleted: datetime = None, entity: Entity, @@ -211,7 +242,7 @@ def __init__( ): assert isinstance(entity, Entity), f"Invalid entity type: '{entity}'" - self.based_on = based_on + self.based_on: RemoteEntity = based_on self.date_added: datetime = fix_timezone(date_added) or local_now() self.date_deleted: datetime = fix_timezone(date_deleted) self.entity: Entity = entity @@ -226,7 +257,7 @@ def from_path(cls, client, path: Union[str, Path]) -> "DatasetFile": entity = Entity.from_revision(client=client, path=path) return cls( entity=entity, - # TODO: Set is_external + is_external=client.is_external_file(path), url=generate_dataset_file_url(client=client, filepath=entity.path), ) @@ -237,12 +268,12 @@ def from_dataset_file(cls, dataset_file: old_datasets.DatasetFile, client, revis entity = Entity.from_revision(client=client, path=dataset_file.path, revision=revision) return cls( - based_on=dataset_file.based_on, # TODO: Convert based_on + based_on=RemoteEntity.from_dataset_file(dataset_file.based_on), date_added=dataset_file.added, entity=entity, is_external=dataset_file.external, source=dataset_file.source, - url=generate_dataset_file_url(client=client, filepath=entity.path), # TODO: Fix url + url=generate_dataset_file_url(client=client, filepath=entity.path), ) @staticmethod @@ -259,9 +290,9 @@ def is_equal_to(self, other: "DatasetFile"): NOTE: id is generated randomly and should not be included in this comparison. """ - # TODO: Include based_on return ( - self.date_added == other.date_added + self.based_on == other.based_on + and self.date_added == other.date_added and self.date_deleted == other.date_deleted and self.entity == other.entity and self.is_external == other.is_external @@ -284,7 +315,7 @@ def to_dataset_file(self, client, revision="HEAD") -> Optional[old_datasets.Data client=client, revision=revision, added=self.date_added, - based_on=self.based_on, + based_on=self.based_on.to_dataset_file() if self.based_on else None, external=self.is_external, id=None, path=self.entity.path, @@ -326,8 +357,8 @@ def __init__( ): if not is_dataset_name_valid(name): raise errors.ParameterError(f"Invalid dataset name: {name}") - # TODO Verify identifier to be valid + # TODO Verify identifier to be valid self.identifier = identifier or str(uuid4()) self.id = id or Dataset.generate_id(identifier=self.identifier) self.name = name @@ -505,7 +536,7 @@ def to_dataset(self, client) -> old_datasets.Dataset: files=self._convert_to_dataset_files(client), id=None, identifier=self.identifier, - images=[image.to_image_object() for image in self.images], + images=[image.to_image_object() for image in self.images] if self.images else None, in_language=self.in_language.to_language() if self.in_language else None, keywords=self.keywords, license=self.license, @@ -672,6 +703,22 @@ class Meta: position = fields.Integer(schema.position) +class RemoteEntitySchema(JsonLDSchema): + """RemoteEntity schema.""" + + class Meta: + """Meta class.""" + + rdf_type = [prov.Entity, schema.DigitalDocument] + model = RemoteEntity + unknown = EXCLUDE + + commit_sha = fields.String(renku.commit_sha) + id = fields.Id() + path = fields.String(prov.atLocation) + url = fields.String(schema.url) + + class NewDatasetFileSchema(JsonLDSchema): """DatasetFile schema.""" @@ -682,7 +729,7 @@ class Meta: model = DatasetFile unknown = EXCLUDE - based_on = Nested(schema.isBasedOn, DatasetFileSchema, missing=None, propagate_client=False) + based_on = Nested(schema.isBasedOn, RemoteEntitySchema, missing=None) date_added = DateTimeList(schema.dateCreated, format="iso", extra_formats=("%Y-%m-%d",)) date_deleted = fields.DateTime(prov.invalidatedAtTime, missing=None, allow_none=True, format="iso") entity = Nested(prov.entity, NewEntitySchema) diff --git a/tests/cli/test_datasets.py b/tests/cli/test_datasets.py index 7d347c1388..53c92bbc0d 100644 --- a/tests/cli/test_datasets.py +++ b/tests/cli/test_datasets.py @@ -34,6 +34,7 @@ from renku.core.management.config import RENKU_HOME from renku.core.management.datasets import DatasetsApiMixin from renku.core.management.repository import DEFAULT_DATA_DIR as DATA_DIR +from renku.core.models.dataset import Dataset from renku.core.models.refs import LinkReference from renku.core.utils.urls import get_slug from tests.utils import assert_dataset_is_mutated @@ -1809,7 +1810,7 @@ def test_immutability_after_remove(directory_tree, runner, client): @pytest.mark.parametrize("use_graph", [False, True]) -def test_datasets_provenance_after_create(runner, client_with_new_graph, use_graph): +def test_datasets_provenance_after_create(runner, client_with_new_graph, use_graph, datasets_provenance): """Test datasets provenance is updated after creating a dataset.""" args = [ "dataset", @@ -1830,9 +1831,9 @@ def test_datasets_provenance_after_create(runner, client_with_new_graph, use_gra ] if use_graph: args = ["graph"] + args - assert 0 == runner.invoke(cli, args).exit_code + assert 0 == runner.invoke(cli, args, catch_exceptions=False).exit_code - dataset = next(client_with_new_graph.datasets_provenance.get_by_name("my-data")) + dataset = datasets_provenance(client_with_new_graph).get_by_name("my-data") assert "Long Title" == dataset.title assert "my-data" == dataset.name @@ -1842,19 +1843,20 @@ def test_datasets_provenance_after_create(runner, client_with_new_graph, use_gra assert "John Smiths" in [c.name for c in dataset.creators] assert "john.smiths@mail.ch" in [c.email for c in dataset.creators] assert {"keyword-1", "keyword-2"} == set(dataset.keywords) - assert client_with_new_graph.project._id == dataset.project._id + # assert client_with_new_graph.project._id == dataset.project._id # TODO: Re-enable this assert not client_with_new_graph.repo.is_dirty() -def test_datasets_provenance_after_edit(runner, client_with_new_graph): +def test_datasets_provenance_after_edit(runner, client_with_new_graph, datasets_provenance): """Test datasets provenance is updated after editing a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "create", "my-data"]).exit_code assert 0 == runner.invoke(cli, ["dataset", "edit", "my-data", "-k", "new-data"]).exit_code dataset = client_with_new_graph.load_dataset("my-data") - current_version = client_with_new_graph.datasets_provenance.get(dataset.identifier) - old_version = client_with_new_graph.datasets_provenance.get(dataset.original_identifier) + datasets_provenance = datasets_provenance(client_with_new_graph) + current_version = datasets_provenance.get_by_id(Dataset.generate_id(dataset.identifier)) + old_version = datasets_provenance.get_by_id(Dataset.generate_id(dataset.original_identifier)) assert current_version.identifier != old_version.identifier assert current_version.name == old_version.name @@ -1862,23 +1864,25 @@ def test_datasets_provenance_after_edit(runner, client_with_new_graph): assert {"new-data"} == set(current_version.keywords) -def test_datasets_provenance_after_add(runner, client_with_new_graph, directory_tree): +def test_datasets_provenance_after_add(runner, client_with_new_graph, directory_tree, datasets_provenance): """Test datasets provenance is updated after adding data to a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "add", "my-data", "-c", str(directory_tree / "file1")]).exit_code - dataset = next(client_with_new_graph.datasets_provenance.get_by_name("my-data")) + dataset = datasets_provenance(client_with_new_graph).get_by_name("my-data") path = os.path.join(DATA_DIR, "my-data", "file1") - file_ = dataset.find_file(path) + file = dataset.find_file(path) object_hash = client_with_new_graph.repo.git.rev_parse(f"HEAD:{path}") - assert object_hash in file_.entity._id - assert path in file_.entity._id - assert object_hash == file_.entity.checksum - assert path == file_.entity.path + assert object_hash in file.entity.id + assert path in file.entity.id + assert object_hash == file.entity.checksum + assert path == file.entity.path @pytest.mark.parametrize("use_graph", [False, True]) -def test_datasets_provenance_not_updated_after_same_add(runner, client_with_new_graph, directory_tree, use_graph): +def test_datasets_provenance_not_updated_after_same_add( + runner, client_with_new_graph, directory_tree, datasets_provenance, use_graph +): """Test datasets provenance is not updated if adding same files multiple times.""" command = ["graph", "dataset", "add"] if use_graph else ["dataset", "add"] assert 0 == runner.invoke(cli, command + ["my-data", "--create", str(directory_tree)]).exit_code @@ -1887,78 +1891,99 @@ def test_datasets_provenance_not_updated_after_same_add(runner, client_with_new_ assert 1 == runner.invoke(cli, command + ["my-data", str(directory_tree)]).exit_code commit_sha_after = client_with_new_graph.repo.head.object.hexsha - datasets = list(client_with_new_graph.datasets_provenance.get_by_name("my-data")) + datasets_provenance = datasets_provenance(client_with_new_graph) + provenance = datasets_provenance.get_provenance() - assert 1 == len(datasets) + assert 1 == len(provenance) assert commit_sha_before == commit_sha_after -def test_datasets_provenance_after_file_unlink(runner, client_with_new_graph, directory_tree): +def test_datasets_provenance_after_file_unlink(runner, client_with_new_graph, directory_tree, datasets_provenance): """Test datasets provenance is updated after removing data.""" assert 0 == runner.invoke(cli, ["dataset", "add", "my-data", "-c", str(directory_tree)]).exit_code assert 0 == runner.invoke(cli, ["dataset", "unlink", "my-data", "--include", "*/dir1/*"], input="y").exit_code dataset = client_with_new_graph.load_dataset("my-data") - current_version = client_with_new_graph.datasets_provenance.get(dataset.identifier) - old_version = client_with_new_graph.datasets_provenance.get(dataset.original_identifier) + datasets_provenance = datasets_provenance(client_with_new_graph) + current_version = datasets_provenance.get_by_id(Dataset.generate_id(dataset.identifier)) + old_version = datasets_provenance.get_by_id(Dataset.generate_id(dataset.original_identifier)) path = os.path.join(DATA_DIR, "my-data", directory_tree.name, "file1") - assert 1 == len(current_version.files) - assert {path} == {f.entity.path for f in current_version.files} + # NOTE: Files are not removed but they are marked as deleted + assert 3 == len(current_version.files) + existing_files = [f for f in current_version.files if not f.is_deleted()] + assert 1 == len(existing_files) + assert {path} == {f.entity.path for f in existing_files} assert 3 == len(old_version.files) + assert current_version.identifier != current_version.original_identifier -def test_datasets_provenance_after_remove(runner, client_with_new_graph, directory_tree): +def test_datasets_provenance_after_remove(runner, client_with_new_graph, directory_tree, datasets_provenance): """Test datasets provenance is updated after removing a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "add", "my-data", "-c", str(directory_tree)]).exit_code assert 0 == runner.invoke(cli, ["dataset", "rm", "my-data"]).exit_code - datasets = client_with_new_graph.datasets_provenance.get_by_name("my-data") - current_version = next(d for d in datasets if d.identifier != d.original_identifier) + datasets_provenance = datasets_provenance(client_with_new_graph) + dataset = datasets_provenance.get_by_name("my-data") + + assert dataset is None + + provenance = datasets_provenance.get_provenance() + last_version = next(d for d in provenance if d.identifier != d.original_identifier) - assert current_version.date_deleted is not None + assert 2 == len(provenance) + assert last_version.is_deleted() @pytest.mark.serial -def test_datasets_provenance_after_update(runner, client_with_new_graph, directory_tree): +def test_datasets_provenance_after_update(runner, client_with_new_graph, directory_tree, datasets_provenance): """Test datasets provenance is updated after updating a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "add", "-c", "--external", "my-data", str(directory_tree)]).exit_code directory_tree.joinpath("file1").write_text("some updates") assert 0 == runner.invoke(cli, ["dataset", "update", "--external"]).exit_code - dataset = client_with_new_graph.load_dataset("my-data") - current_version = client_with_new_graph.datasets_provenance.get(dataset.identifier) + datasets_provenance = datasets_provenance(client_with_new_graph) + current_version = datasets_provenance.get_by_name("my-data") assert current_version.identifier != current_version.original_identifier -def test_datasets_provenance_after_adding_tag(runner, client_with_new_graph): +def test_datasets_provenance_after_adding_tag(runner, client_with_new_graph, datasets_provenance): """Test datasets provenance is updated after tagging a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "create", "my-data"]).exit_code + commit_sha_before = client_with_new_graph.repo.head.object.hexsha + assert 0 == runner.invoke(cli, ["dataset", "tag", "my-data", "42.0"]).exit_code - datasets = list(client_with_new_graph.datasets_provenance.get_by_name("my-data")) + datasets_provenance = datasets_provenance(client_with_new_graph) + provenance = datasets_provenance.get_provenance() + current_version = datasets_provenance.get_by_name("my-data") + commit_sha_after = client_with_new_graph.repo.head.object.hexsha - assert 1 == len(datasets) - assert "42.0" in [t.name for t in datasets[0].tags] + assert 1 == len(provenance) + assert current_version.identifier == current_version.original_identifier + assert commit_sha_before != commit_sha_after + assert not client_with_new_graph.repo.is_dirty() -def test_datasets_provenance_after_removing_tag(runner, client_with_new_graph): +def test_datasets_provenance_after_removing_tag(runner, client_with_new_graph, datasets_provenance): """Test datasets provenance is updated after removing a dataset's tag.""" assert 0 == runner.invoke(cli, ["dataset", "create", "my-data"]).exit_code assert 0 == runner.invoke(cli, ["dataset", "tag", "my-data", "42.0"]).exit_code assert 0 == runner.invoke(cli, ["dataset", "rm-tags", "my-data", "42.0"]).exit_code - datasets = list(client_with_new_graph.datasets_provenance.get_by_name("my-data")) + datasets_provenance = datasets_provenance(client_with_new_graph) + provenance = datasets_provenance.get_provenance() + current_version = datasets_provenance.get_by_name("my-data") - assert 1 == len(datasets) - assert "42.0" not in [t.name for t in datasets[0].tags] + assert 1 == len(provenance) + assert current_version.identifier == current_version.original_identifier -def test_datasets_provenance_multiple(runner, client_with_new_graph, directory_tree): +def test_datasets_provenance_multiple(runner, client_with_new_graph, directory_tree, datasets_provenance): """Test datasets provenance is updated after multiple dataset operations.""" assert 0 == runner.invoke(cli, ["dataset", "create", "my-data"]).exit_code version_1 = client_with_new_graph.load_dataset("my-data") @@ -1971,30 +1996,22 @@ def test_datasets_provenance_multiple(runner, client_with_new_graph, directory_t assert 0 == runner.invoke(cli, ["dataset", "unlink", "my-data", "--include", "*/dir1/*"], input="y").exit_code version_5 = client_with_new_graph.load_dataset("my-data") - datasets_provenance = client_with_new_graph.datasets_provenance - - assert datasets_provenance.get(version_1.identifier) - assert datasets_provenance.get(version_2.identifier) - assert datasets_provenance.get(version_3.identifier) - assert datasets_provenance.get(version_4.identifier) - assert datasets_provenance.get(version_5.identifier) - + datasets_provenance = datasets_provenance(client_with_new_graph) -def test_datasets_provenance_get_latest(runner, client_with_new_graph, directory_tree): - """Test getting last dataset mutation.""" - assert 0 == runner.invoke(cli, ["dataset", "create", "my-data"]).exit_code - assert 0 == runner.invoke(cli, ["dataset", "edit", "my-data", "-k", "new-data"]).exit_code - assert 0 == runner.invoke(cli, ["dataset", "add", "my-data", str(directory_tree)]).exit_code - assert 0 == runner.invoke(cli, ["dataset", "tag", "my-data", "42.0"]).exit_code + assert datasets_provenance.get_by_id(Dataset.generate_id(version_1.identifier)) + assert datasets_provenance.get_by_id(Dataset.generate_id(version_2.identifier)) + assert datasets_provenance.get_by_id(Dataset.generate_id(version_3.identifier)) + assert datasets_provenance.get_by_id(Dataset.generate_id(version_4.identifier)) + assert datasets_provenance.get_by_id(Dataset.generate_id(version_5.identifier)) dataset = client_with_new_graph.load_dataset("my-data") - datasets_provenance = client_with_new_graph.datasets_provenance + dataset_in_provenance = datasets_provenance.get_by_name("my-data") - assert dataset.identifier == datasets_provenance.get_latest_by_name("my-data").identifier + assert dataset.identifier == dataset_in_provenance.identifier def test_datasets_provenance_add_file(runner, client_with_new_graph, directory_tree): - """Test getting last dataset mutation.""" + """Test add to dataset using graph command.""" file1 = str(directory_tree.joinpath("file1")) assert 0 == runner.invoke(cli, ["graph", "dataset", "add", "--create", "my-data", file1]).exit_code dir1 = str(directory_tree.joinpath("dir1")) diff --git a/tests/cli/test_integration_datasets.py b/tests/cli/test_integration_datasets.py index 1502891255..d3fb3a693c 100644 --- a/tests/cli/test_integration_datasets.py +++ b/tests/cli/test_integration_datasets.py @@ -1572,18 +1572,17 @@ def test_import_returns_last_dataset_version(runner, client, url): @pytest.mark.integration @flaky(max_runs=10, min_passes=1) -def test_datasets_provenance_after_import(runner, client_with_new_graph): +def test_datasets_provenance_after_import(runner, client_with_new_graph, datasets_provenance): """Test dataset provenance is updated after importing a dataset.""" assert 0 == runner.invoke(cli, ["dataset", "import", "-y", "--name", "my-data", "10.7910/DVN/F4NUMR"]).exit_code - dataset = next(client_with_new_graph.datasets_provenance.get_by_name("my-data"), None) - - assert dataset is not None + datasets_provenance = datasets_provenance(client_with_new_graph) + assert datasets_provenance.get_by_name("my-data") is not None @pytest.mark.integration @flaky(max_runs=10, min_passes=1) -def test_datasets_provenance_after_git_update(client_with_new_graph, runner): +def test_datasets_provenance_after_git_update(client_with_new_graph, runner, datasets_provenance): """Test dataset provenance is updated after an update.""" url = "https://github.com/SwissDataScienceCenter/renku-jupyter.git" @@ -1592,22 +1591,19 @@ def test_datasets_provenance_after_git_update(client_with_new_graph, runner): assert 0 == runner.invoke(cli, ["dataset", "update"], catch_exceptions=False).exit_code - dataset = client_with_new_graph.load_dataset("my-data") - current_version = client_with_new_graph.datasets_provenance.get(dataset.identifier) - + current_version = datasets_provenance(client_with_new_graph).get_by_name("my-data") assert current_version.identifier != current_version.original_identifier @pytest.mark.integration @flaky(max_runs=10, min_passes=1) -def test_datasets_provenance_after_external_provider_update(client_with_new_graph, runner): +def test_datasets_provenance_after_external_provider_update(client_with_new_graph, runner, datasets_provenance): """Test dataset provenance is not updated after an update from an external provider.""" doi = "10.5281/zenodo.2658634" assert 0 == runner.invoke(cli, ["dataset", "import", "-y", "--name", "my-data", doi]).exit_code assert 0 == runner.invoke(cli, ["dataset", "update", "my-data"]).exit_code - dataset = client_with_new_graph.load_dataset("my-data") - current_version = client_with_new_graph.datasets_provenance.get(dataset.identifier) + current_version = datasets_provenance(client_with_new_graph).get_by_name("my-data") assert current_version.identifier != current_version.original_identifier diff --git a/tests/core/commands/test_dataset.py b/tests/core/commands/test_dataset.py index 42ad4a7945..9328d40d6e 100644 --- a/tests/core/commands/test_dataset.py +++ b/tests/core/commands/test_dataset.py @@ -163,7 +163,9 @@ def read_value(key): def test_create_dataset_custom_message(project): """Test create dataset custom message.""" - create_dataset().with_commit_message("my dataset").build().execute("ds1", title="", description="", creators=[]) + create_dataset().with_commit_message("my dataset").with_database(write=True).build().execute( + "ds1", title="", description="", creators=[] + ) last_commit = Repo(".").head.commit assert "my dataset" == last_commit.message @@ -171,7 +173,9 @@ def test_create_dataset_custom_message(project): def test_list_datasets_default(project): """Test a default dataset listing.""" - create_dataset().with_commit_message("my dataset").build().execute("ds1", title="", description="", creators=[]) + create_dataset().with_commit_message("my dataset").with_database(write=False).build().execute( + "ds1", title="", description="", creators=[] + ) datasets = list_datasets().build().execute().output @@ -181,7 +185,9 @@ def test_list_datasets_default(project): def test_list_files_default(project, tmpdir): """Test a default file listing.""" - create_dataset().with_commit_message("my dataset").build().execute("ds1", title="", description="", creators=[]) + create_dataset().with_commit_message("my dataset").with_database(write=False).build().execute( + "ds1", title="", description="", creators=[] + ) data_file = tmpdir / Path("some-file") data_file.write_text("1,2,3", encoding="utf-8") @@ -195,8 +201,8 @@ def test_list_files_default(project, tmpdir): def test_unlink_default(directory_tree, client): """Test unlink default behaviour.""" with chdir(client.path): - create_dataset().build().execute("dataset") - add_to_dataset().build().execute([str(directory_tree / "dir1")], "dataset") + create_dataset().with_database(write=True).build().execute("dataset") + add_to_dataset().with_database(write=True).build().execute([str(directory_tree / "dir1")], "dataset") with pytest.raises(ParameterError): file_unlink().build().execute("dataset", (), ()) diff --git a/tests/core/commands/test_storage.py b/tests/core/commands/test_storage.py index fa8271c39c..b965c14934 100644 --- a/tests/core/commands/test_storage.py +++ b/tests/core/commands/test_storage.py @@ -105,7 +105,6 @@ def test_lfs_migrate(runner, project, client): client.repo.git.add("*") client.repo.index.commit("add files") - dataset_checksum = client.repo.head.commit.tree["dataset_file"].hexsha result = runner.invoke(cli, ["graph", "generate"]) assert 0 == result.exit_code @@ -132,8 +131,6 @@ def test_lfs_migrate(runner, project, client): changed_files = client.repo.head.commit.stats.files.keys() assert ".renku/metadata/activities" not in changed_files - assert dataset_checksum not in (client.path / ".renku" / "dataset.json").read_text() - def test_lfs_migrate_no_changes(runner, project, client): """Test ``renku storage migrate`` command without broken files.""" @@ -170,7 +167,6 @@ def test_lfs_migrate_explicit_path(runner, project, client): client.repo.git.add("*") client.repo.index.commit("add files") - dataset_checksum = client.repo.head.commit.tree["dataset_file"].hexsha result = runner.invoke(cli, ["graph", "generate"]) assert 0 == result.exit_code @@ -188,6 +184,4 @@ def test_lfs_migrate_explicit_path(runner, project, client): assert previous_head != client.repo.head.commit.hexsha - assert dataset_checksum in (client.path / ".renku" / "dataset.json").read_text() - assert "oid sha256:" in (client.path / "regular_file").read_text() diff --git a/tests/core/incubation/test_command.py b/tests/core/incubation/test_command.py index 9b052bade7..57452d3355 100644 --- a/tests/core/incubation/test_command.py +++ b/tests/core/incubation/test_command.py @@ -29,12 +29,15 @@ def test_dataset_add_command(project, tmp_path): .require_clean() .require_migration() .with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS) - .lock_project() + .with_database(write=True) + .lock_dataset() .command(_add_to_dataset) .build() ) - create_dataset().with_commit_message("my dataset").build().execute("ds1", title="", description="", creators=[]) + create_dataset().with_commit_message("my dataset").with_database(write=True).build().execute( + "ds1", title="", description="", creators=[] + ) data_file = tmp_path / "some-file" data_file.write_text("1,2,3", encoding="utf-8") diff --git a/tests/fixtures/graph.py b/tests/fixtures/graph.py index 8df08981d3..30d9a0fe84 100644 --- a/tests/fixtures/graph.py +++ b/tests/fixtures/graph.py @@ -27,3 +27,19 @@ def client_with_new_graph(client): generate_graph().build().execute(force=True) yield client + + +@pytest.fixture +def datasets_provenance(): + """A function to return DatasetProvenance for a client.""" + from renku.core.incubation.database import Database + from renku.core.models.dataset import DatasetProvenance + + def get_datasets_provenance(client): + """Return dataset provenance if available.""" + assert client.has_graph_files() + + database = Database.from_path(client.database_path) + return DatasetProvenance.from_database(database) + + return get_datasets_provenance