feat(dataset): import dataset at specific tags #2926

Merged · 6 commits · Jun 10, 2022 · Changes from 5 commits
2 changes: 1 addition & 1 deletion docs/reference/models/refs.rst
@@ -18,5 +18,5 @@
File References
===============

.. automodule:: renku.domain_model.refs
.. automodule:: renku.core.migration.models.refs
:members:
2 changes: 0 additions & 2 deletions renku/command/checks/__init__.py
@@ -22,7 +22,6 @@
from .external import check_missing_external_files
from .githooks import check_git_hooks_installed
from .migration import check_migration
from .references import check_missing_references
from .storage import check_lfs_info
from .validate_shacl import check_datasets_structure, check_project_structure

@@ -38,6 +37,5 @@
"check_migration",
"check_missing_external_files",
"check_missing_files",
"check_missing_references",
"check_project_structure",
)
51 changes: 0 additions & 51 deletions renku/command/checks/references.py

This file was deleted.

3 changes: 2 additions & 1 deletion renku/command/format/dataset_files.py
@@ -157,7 +157,7 @@ def json(records, **kwargs):

DATASET_FILES_COLUMNS = {
"added": ("date_added", "added"),
"commit": ("entity.checksum", "commit"),
"checksum": ("entity.checksum", "checksum"),
"creators": ("creators_csv", "creators"),
"creators_full": ("creators_full_csv", "creators"),
"dataset": ("title", "dataset"),
@@ -167,6 +167,7 @@ def json(records, **kwargs):
"dataset_name": ("dataset_name", "dataset name"),
"size": ("size", None),
"lfs": ("is_lfs", "lfs"),
"source": ("source", None),
}

DATASET_FILES_COLUMNS_ALIGNMENTS = {"size": "right"}
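
The two new entries above ("checksum", which replaces "commit", and "source") follow the existing column-spec convention: each display name maps to an attribute path on the file record plus an optional header label. As a rough illustration only (not the renku implementation; render_row is a made-up helper), such a spec can be resolved like this:

from operator import attrgetter

# Illustration only -- hypothetical helper, not part of this diff.
COLUMNS = {
    "checksum": ("entity.checksum", "checksum"),
    "source": ("source", None),
}

def render_row(record, selected_columns):
    """Return (header, value) pairs for the requested column names."""
    row = []
    for name in selected_columns:
        attr_path, header = COLUMNS[name]
        value = attrgetter(attr_path)(record)  # e.g. record.entity.checksum
        row.append((header or name, value))
    return row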
4 changes: 2 additions & 2 deletions renku/command/move.py
@@ -175,8 +175,8 @@ def _warn_about_ignored_destinations(destinations, client_dispatcher: IClientDis

ignored = client.find_ignored_paths(*destinations)
if ignored:
ignored = "\n\t".join((str(Path(p).relative_to(client.path)) for p in ignored))
communication.warn(f"The following moved path match .gitignore:\n\t{ignored}")
ignored_str = "\n\t".join((str(Path(p).relative_to(client.path)) for p in ignored))
communication.warn(f"The following moved path match .gitignore:\n\t{ignored_str}")


@inject.autoparams()
6 changes: 4 additions & 2 deletions renku/core/dataset/constant.py
@@ -21,14 +21,16 @@

from renku.core.constant import RENKU_HOME
from renku.core.management.repository import RepositoryApiMixin
from renku.domain_model.refs import LinkReference

POINTERS = "pointers"
"""Directory for storing external pointer files."""

DATASET_IMAGES = "dataset_images"
"""Directory for dataset images."""

REFS = "refs"
"""Define a name of the folder with references in the Renku folder."""


def renku_dataset_images_path(client):
"""Return a ``Path`` instance of Renku dataset metadata folder."""
@@ -46,6 +48,6 @@ def renku_pointers_path(client):
Path(RENKU_HOME) / RepositoryApiMixin.DATABASE_PATH,
Path(RENKU_HOME) / DATASET_IMAGES,
Path(RENKU_HOME) / POINTERS,
Path(RENKU_HOME) / LinkReference.REFS,
Path(RENKU_HOME) / REFS,
".gitattributes",
]
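
With the LinkReference import dropped, the name of the references folder now lives in this module as the REFS constant. A minimal sketch of how the constant resolves to a path, assuming RENKU_HOME is ".renku" (renku_refs_path is a hypothetical helper, not part of this diff):

from pathlib import Path

RENKU_HOME = ".renku"  # assumed value of renku.core.constant.RENKU_HOME
REFS = "refs"          # module-level constant introduced in this diff

def renku_refs_path(project_root: Path) -> Path:
    # Hypothetical helper: the references folder inside the Renku directory.
    return project_root / RENKU_HOME / REFS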
104 changes: 67 additions & 37 deletions renku/core/dataset/dataset.py
@@ -369,7 +369,7 @@ def export_dataset(name, provider_name, publish, tag, client_dispatcher: IClient
except KeyError:
raise errors.ParameterError("Unknown provider.")

provider.set_parameters(**kwargs)
provider.set_export_parameters(**kwargs)

selected_tag = None
tags = datasets_provenance.get_all_tags(dataset) # type: ignore
@@ -424,6 +425,7 @@ def import_dataset(
previous_dataset=None,
delete=False,
gitlab_token=None,
**kwargs,
):
"""Import data from a 3rd party provider or another renku project.

@@ -449,11 +450,13 @@

assert provider is not None

provider.set_import_parameters(**kwargs)

try:
record = provider.find_record(uri, gitlab_token=gitlab_token)
provider_dataset: ProviderDataset = record.as_dataset(client)
files: List[ProviderDatasetFile] = record.files_info
total_size = 0
total_size = 0.0

if not yes:
communication.echo(
@@ -477,9 +480,9 @@

communication.confirm(text_prompt, abort=True, warning=True)

for file_ in files:
if file_.size_in_mb is not None:
total_size += file_.size_in_mb
for file in files:
if file.size_in_mb is not None:
total_size += file.size_in_mb

total_size *= 2**20

@@ -509,7 +512,7 @@
with_metadata=provider_dataset,
force=True,
extract=extract,
all_at_once=True,
is_import=True,
destination_names=names,
total_size=total_size,
overwrite=True,
@@ -535,39 +538,51 @@
if not provider_dataset.data_dir:
raise errors.OperationError(f"Data directory for dataset must be set: {provider_dataset.name}")

sources = []

if record.datadir_exists:
sources = [f"{provider_dataset.data_dir}/*"]

for file in files:
try:
Path(file.path).relative_to(provider_dataset.data_dir)
except ValueError: # Files that are not in dataset's data directory
sources.append(file.path)
if provider_dataset.version: # NOTE: A tag was specified for import
sources, checksums = zip(*[(f.path, f.checksum) for f in files]) # type: ignore
else:
sources = [f.path for f in files] # type: ignore
checksums = None

new_dataset = add_data_to_dataset(
urls=[record.project_url],
dataset_name=name,
sources=sources,
checksums=checksums,
with_metadata=provider_dataset,
is_renku_import=True,
create=not previous_dataset,
overwrite=True,
repository=record.repository,
clear_files_before=True,
dataset_datadir=provider_dataset.data_dir,
force=True, # NOTE: Force-add to include any ignored files
)

if previous_dataset:
_update_datasets_metadata(new_dataset, previous_dataset, delete, provider_dataset.same_as)

if provider_dataset.tag:
add_dataset_tag(
dataset_name=new_dataset.name,
tag=provider_dataset.tag.name,
description=provider_dataset.tag.description,
)
elif provider_dataset.version:
add_dataset_tag(
dataset_name=new_dataset.name,
tag=provider_dataset.version,
description=f"Tag {provider_dataset.version} created by renku import",
)

record.import_images(new_dataset)

database_dispatcher.current_database.commit()


@inject.autoparams()
def update_datasets(
names,
names: List[str],
creators,
include,
exclude,
@@ -594,41 +609,56 @@ def update_datasets(
client_dispatcher(IClientDispatcher): Injected client dispatcher.
dataset_gateway(IDatasetGateway): Injected dataset gateway.
"""
from renku.core.dataset.providers.renku import RenkuProvider

if not update_all and not names and not include and not exclude and not dry_run:
raise errors.ParameterError("No update criteria is specified")

client = client_dispatcher.current_client

imported_datasets: List[Dataset] = []
imported_dataset_updates: List[Dataset] = []

all_datasets = dataset_gateway.get_all_active_datasets()
imported_datasets = [d for d in all_datasets if d.same_as]

if names and update_all:
raise errors.ParameterError("Cannot pass dataset names when updating all datasets")
elif (include or exclude) and update_all:
raise errors.ParameterError("Cannot specify include and exclude filters when updating all datasets")
elif (include or exclude) and names and any(d.same_as for d in all_datasets if d.name in names):
elif (include or exclude) and names and any(d for d in imported_datasets if d.name in names):
raise errors.IncompatibleParametersError(a="--include/--exclude", b="imported datasets")

names_provided = bool(names)
names = names or [d.name for d in all_datasets]

# NOTE: update imported datasets
if not include and not exclude:
for dataset in all_datasets:
if names and dataset.name not in names or not dataset.same_as:
must_match_records = False

for dataset in imported_datasets:
if dataset.name not in names:
continue

uri = dataset.same_as.url
if isinstance(uri, dict):
uri = cast(str, uri.get("@id"))
uri = dataset.same_as.value # type: ignore
provider, _ = ProviderFactory.from_uri(uri)

if not provider:
continue

record = provider.find_record(uri)

if record.is_last_version(uri) and record.version == dataset.version:
if isinstance(provider, RenkuProvider) and dataset.version is not None:
tags = dataset_gateway.get_all_tags(dataset=dataset)
tag = next((t for t in tags if t.name == dataset.version), None)
# NOTE: Do not update Renku dataset that are imported from a specific version
if tag is not None and tag.dataset_id.value == dataset.id:
communication.echo(
f"Skipped updating imported Renku dataset '{dataset.name}' with tag '{tag.name}'"
)
names.remove(dataset.name)
continue

if record.is_last_version(uri) and record.is_version_equal_to(dataset):
names.remove(dataset.name)
continue

if not dry_run:
Expand All @@ -651,25 +681,25 @@ def update_datasets(

communication.echo(f"Updated dataset '{dataset.name}' from remote provider")

if names:
names.remove(dataset.name)
imported_datasets.append(dataset)
names.remove(dataset.name)
imported_dataset_updates.append(dataset)
else:
imported_datasets = [d for d in all_datasets if d.same_as]
must_match_records = True

imported_datasets_view_models = [DatasetViewModel.from_dataset(d) for d in imported_datasets]
imported_dataset_updates_view_models = [DatasetViewModel.from_dataset(d) for d in imported_dataset_updates]

if names_provided and not names:
return imported_datasets_view_models, []
if not names:
return imported_dataset_updates_view_models, []

# NOTE: Exclude all imported dataset from individual file filter
records = filter_dataset_files(
names=names, creators=creators, include=include, exclude=exclude, ignore=[d.name for d in imported_datasets]
)

if not records:
if imported_datasets:
return imported_datasets_view_models, []
raise errors.ParameterError("No files matched the criteria.")
if must_match_records:
raise errors.ParameterError("No files matched the criteria.")
return imported_dataset_updates_view_models, []

git_files = []
unique_remotes = set()
@@ -730,7 +760,7 @@ def update_datasets(
dataset_files_view_models = [
DatasetFileViewModel.from_dataset_file(cast(DatasetFile, f), f.dataset) for f in updated_files + deleted_files
]
return imported_datasets_view_models, dataset_files_view_models
return imported_dataset_updates_view_models, dataset_files_view_models


def show_dataset(name: str, tag: Optional[str] = None):
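
Taken together, the dataset.py changes make the import flow tag-aware: when a tag is requested (provider_dataset.version is set), file paths are imported together with their checksums, and the resulting dataset is tagged so that a later dataset update can recognize that it is pinned to a specific version and skip it. The sketch below condenses that flow; signatures are simplified and several keyword arguments from the diff are omitted, so treat it as a reading aid rather than the actual function:

from renku.core.dataset.dataset_add import add_data_to_dataset  # module path assumed
from renku.core.dataset.tag import add_dataset_tag              # module path assumed

def import_files_at_tag(record, provider_dataset, files, name):
    # A tag was specified for the import: keep each file's checksum so the
    # data can be fetched at exactly that version.
    if provider_dataset.version:
        sources, checksums = zip(*[(f.path, f.checksum) for f in files])
    else:
        sources, checksums = [f.path for f in files], None

    new_dataset = add_data_to_dataset(  # simplified call; see the diff for the full argument list
        urls=[record.project_url],
        dataset_name=name,
        sources=list(sources),
        checksums=list(checksums) if checksums else None,
        with_metadata=provider_dataset,
        is_renku_import=True,
    )

    # Record the imported tag on the new dataset so that updates can detect
    # that it is pinned to a specific version.
    if provider_dataset.tag:
        add_dataset_tag(
            dataset_name=new_dataset.name,
            tag=provider_dataset.tag.name,
            description=provider_dataset.tag.description,
        )
    elif provider_dataset.version:
        add_dataset_tag(
            dataset_name=new_dataset.name,
            tag=provider_dataset.version,
            description=f"Tag {provider_dataset.version} created by renku import",
        )
    return new_dataset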