Skip to content

Commit

Permalink
feat(core): new dataset provenance
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Jun 23, 2021
1 parent 9002847 commit b210037
Show file tree
Hide file tree
Showing 17 changed files with 256 additions and 189 deletions.
27 changes: 18 additions & 9 deletions renku/core/commands/dataset.py
Expand Up @@ -103,7 +103,7 @@ def create_dataset_helper(
def create_dataset():
    """Return a command for creating an empty dataset in the current repo."""
    # NOTE(review): the paste carried both the pre- and post-change return lines;
    # the stale first `return` made the second unreachable. Keep the version that
    # opens the metadata database for writing before committing.
    command = Command().command(create_dataset_helper).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -158,7 +158,7 @@ def _edit_dataset(
def edit_dataset():
    """Command for editing dataset metadata."""
    # Removed the duplicated pre-change `return` (dead code from the diff paste);
    # the surviving line writes dataset metadata through the database.
    command = Command().command(_edit_dataset).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -286,7 +286,11 @@ def _add_to_dataset(
def add_to_dataset():
    """Create a command for adding data to datasets."""
    # Keep only the post-change builder chain: migration check, writable database,
    # then a commit restricted to dataset metadata paths (raising when empty).
    command = Command().command(_add_to_dataset).lock_dataset()
    return (
        command.require_migration()
        .with_database(write=True)
        .with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS)
    )


def _list_files(datasets=None, creators=None, include=None, exclude=None, format=None, columns=None):
Expand Down Expand Up @@ -355,7 +359,7 @@ def _file_unlink(name, include, exclude, client: LocalClient, yes=False):
def file_unlink():
    """Command for removing matching files from a dataset."""
    # Dead pre-change `return` removed; the remaining chain includes the
    # writable-database step added by this commit.
    command = Command().command(_file_unlink).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -383,7 +387,7 @@ def _remove_dataset(name, client: LocalClient):
def remove_dataset():
    """Command for deleting a dataset."""
    # Dead pre-change `return` removed; deletion must also write to the database.
    command = Command().command(_remove_dataset).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -585,7 +589,7 @@ def _import_dataset(
def import_dataset():
    """Create a command for importing datasets."""
    # Dead pre-change `return` removed; imports write dataset provenance,
    # hence the writable database.
    command = Command().command(_import_dataset).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -725,7 +729,12 @@ def _update_datasets(names, creators, include, exclude, ref, delete, client: Loc
def update_datasets():
    """Command for updating datasets."""
    # Keep only the post-change chain: migration, clean worktree, writable
    # database, then commit of dataset metadata paths.
    command = Command().command(_update_datasets).lock_dataset()
    return (
        command.require_migration()
        .require_clean()
        .with_database(write=True)
        .with_commit(commit_only=DATASET_METADATA_PATHS)
    )


def _include_exclude(file_path, include=None, exclude=None):
Expand Down Expand Up @@ -808,7 +817,7 @@ def _tag_dataset(name, tag, description, client: LocalClient, force=False):
def tag_dataset():
    """Command for creating a new tag for a dataset."""
    # Dead pre-change `return` removed; tagging updates dataset metadata and so
    # needs the writable database.
    command = Command().command(_tag_dataset).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand All @@ -828,7 +837,7 @@ def _remove_dataset_tags(name, tags, client: LocalClient):
def remove_dataset_tags():
    """Command for removing tags from a dataset."""
    # Dead pre-change `return` removed; same writable-database pattern as the
    # other dataset commands in this module.
    command = Command().command(_remove_dataset_tags).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/rerun.py
Expand Up @@ -30,9 +30,9 @@ def rerun_workflows():
.command(_rerun_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/run.py
Expand Up @@ -35,7 +35,7 @@

def run_command():
    """Tracking work on a specific problem."""
    # Keep the post-change ordering: the database step must come BEFORE the
    # commit step so database writes are captured in the commit. The stale
    # pre-change line (with_commit before with_database) was unreachable
    # duplicate text from the diff paste and has been dropped.
    return Command().command(_run_command).require_migration().require_clean().with_database(write=True).with_commit()


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/storage.py
Expand Up @@ -47,7 +47,7 @@ def _fix_lfs(paths, client: LocalClient):

def fix_lfs_command():
    """Fix lfs command."""
    # Post-change ordering kept (with_database before with_commit), matching the
    # other command builders updated in this commit.
    return Command().command(_fix_lfs).require_clean().with_database(write=True).with_commit(commit_if_empty=False)


@inject.autoparams()
Expand Down
2 changes: 1 addition & 1 deletion renku/core/commands/update.py
Expand Up @@ -44,9 +44,9 @@ def update_workflows():
.command(_update_workflows)
.require_migration()
.require_clean()
.with_commit()
.require_nodejs()
.with_database(write=True)
.with_commit()
)


Expand Down
34 changes: 18 additions & 16 deletions renku/core/incubation/graph.py
Expand Up @@ -62,7 +62,7 @@
def generate_graph():
    """Return a command for generating the graph."""
    # Dead pre-change `return` removed; the surviving chain registers the
    # writable database before restricting the commit to graph metadata paths.
    command = Command().command(_generate_graph).lock_project()
    return command.require_migration().with_database(write=True).with_commit(commit_only=GRAPH_METADATA_PATHS)


@inject.autoparams()
Expand Down Expand Up @@ -99,25 +99,23 @@ def process_datasets(commit):
date = commit.authored_datetime

for dataset in datasets:
client.datasets_provenance.update_dataset(dataset, revision=revision, date=date)
client.update_datasets_provenance(dataset, revision=revision, date=date, commit_database=False)
for dataset in deleted_datasets:
client.datasets_provenance.remove_dataset(dataset, revision=revision, date=date)
client.update_datasets_provenance(dataset, revision=revision, date=date, commit_database=False, remove=True)

commits = list(client.repo.iter_commits(paths=[f"{client.workflow_path}/*.yaml", ".renku/datasets/*/*.yml"]))
n_commits = len(commits)
commits = reversed(commits)

if force:
client.remove_graph_files()
client.remove_datasets_provenance_file()
elif client.has_graph_files() or client.has_datasets_provenance():
elif client.has_graph_files():
raise errors.OperationError("Graph metadata exists. Use ``--force`` to regenerate it.")

# database = Database.from_path(path=client.database_path)
# update_injected_database(database)

client.initialize_graph()
client.initialize_datasets_provenance()

for n, commit in enumerate(commits, start=1):
communication.echo(f"Processing commits {n}/{n_commits}", end="\r")
Expand All @@ -132,7 +130,6 @@ def process_datasets(commit):
communication.echo("")
communication.warn(f"Cannot process commit '{commit.hexsha}' - Exception: {traceback.format_exc()}")

client.datasets_provenance.to_json()
database.commit()


Expand Down Expand Up @@ -181,7 +178,7 @@ def _status(client: LocalClient, database: Database):
def update():
    """Return a command for generating the graph."""
    # Post-change ordering kept: clean-worktree and nodejs checks run before the
    # commit wrapper is attached. The database is attached on the base command.
    command = Command().command(_update).lock_project().with_database(write=True)
    return command.require_migration().require_clean().require_nodejs().with_commit(commit_if_empty=False)


@inject.autoparams()
Expand Down Expand Up @@ -239,8 +236,9 @@ def _export_graph(format, workflows_only, strict, client: LocalClient):

pg = ProvenanceGraph.from_json(client.provenance_graph_path, lazy=True)

if not workflows_only:
pg.rdf_graph.parse(location=str(client.datasets_provenance_path), format="json-ld")
# TODO: Add dataset provenance to graph
# if not workflows_only:
# pg.rdf_graph.parse(location=str(client.datasets_provenance_path), format="json-ld")

graph = pg.rdf_graph

Expand Down Expand Up @@ -395,22 +393,26 @@ def _validate_graph(rdf_graph, format):
def create_dataset():
    """Return a command for creating an empty dataset in the current repo."""
    # Dead pre-change `return` removed; graph-API dataset creation also writes
    # through the database.
    command = Command().command(_create_dataset).lock_dataset()
    return command.require_migration().with_database(write=True).with_commit(commit_only=DATASET_METADATA_PATHS)


@inject.autoparams()
def _create_dataset(name, client: LocalClient, title=None, description="", creators=None, keywords=None):
    """Create a dataset in the repository."""
    # Both the pre-change guard (has_datasets_provenance) and the post-change
    # guard (has_graph_files) were present in the paste; keep only the
    # post-change one — dataset provenance now lives in the graph files.
    if not client.has_graph_files():
        raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate`.")

    return create_dataset_helper(name=name, title=title, description=description, creators=creators, keywords=keywords)


def add_to_dataset():
    """Return a command for adding data to a dataset."""
    # Dead pre-change `return` removed; keep the post-change chain with the
    # writable database ahead of the metadata-only commit.
    command = Command().command(_add_to_dataset).lock_dataset()
    return (
        command.require_migration()
        .with_database(write=True)
        .with_commit(raise_if_empty=True, commit_only=DATASET_METADATA_PATHS)
    )


@inject.autoparams()
Expand All @@ -427,8 +429,8 @@ def _add_to_dataset(
ref=None,
):
"""Add data to a dataset."""
if not client.has_datasets_provenance():
raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate-dataset`.")
if not client.has_graph_files():
raise errors.OperationError("Dataset provenance is not generated. Run `renku graph generate`.")

if len(urls) == 0:
raise errors.UsageError("No URL is specified")
Expand Down
3 changes: 1 addition & 2 deletions renku/core/management/command_builder/database.py
Expand Up @@ -29,13 +29,12 @@ class DatabaseCommand(Command):
POST_ORDER = 5

def __init__(self, builder: Command, write: bool = False, path: str = None) -> None:
    """Initialize the database command builder step.

    The original docstring read "__init__ of ProjectLock." — a copy-paste
    error from another builder class (the commit itself deletes it).
    """
    self._builder = builder  # wrapped Command this step decorates
    self._write = write  # whether the Database is opened for writing
    self._path = path  # optional explicit database path; None uses the default

def _pre_hook(self, builder: Command, context: dict, *args, **kwargs) -> None:
"""Lock the project."""
"""Create a Database singleton."""
if "client" not in context:
raise ValueError("Commit builder needs a LocalClient to be set.")

Expand Down

0 comments on commit b210037

Please sign in to comment.