feat(dataset): import dataset at specific tags
m-alisafaee committed Jun 7, 2022
1 parent e9f0a00 commit 7feff9d
Showing 31 changed files with 839 additions and 496 deletions.
2 changes: 1 addition & 1 deletion docs/reference/models/refs.rst
@@ -18,5 +18,5 @@
File References
===============

.. automodule:: renku.domain_model.refs
.. automodule:: renku.core.migration.models.refs
:members:
2 changes: 0 additions & 2 deletions renku/command/checks/__init__.py
@@ -22,7 +22,6 @@
from .external import check_missing_external_files
from .githooks import check_git_hooks_installed
from .migration import check_migration
from .references import check_missing_references
from .storage import check_lfs_info
from .validate_shacl import check_datasets_structure, check_project_structure

@@ -38,6 +37,5 @@
"check_migration",
"check_missing_external_files",
"check_missing_files",
"check_missing_references",
"check_project_structure",
)
51 changes: 0 additions & 51 deletions renku/command/checks/references.py

This file was deleted.

3 changes: 2 additions & 1 deletion renku/command/format/dataset_files.py
@@ -157,7 +157,7 @@ def json(records, **kwargs):

DATASET_FILES_COLUMNS = {
"added": ("date_added", "added"),
"commit": ("entity.checksum", "commit"),
"checksum": ("entity.checksum", "checksum"),
"creators": ("creators_csv", "creators"),
"creators_full": ("creators_full_csv", "creators"),
"dataset": ("title", "dataset"),
@@ -167,6 +167,7 @@ def json(records, **kwargs):
"dataset_name": ("dataset_name", "dataset name"),
"size": ("size", None),
"lfs": ("is_lfs", "lfs"),
"source": ("source", None),
}

DATASET_FILES_COLUMNS_ALIGNMENTS = {"size": "right"}
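
The column table above pairs a user-facing column name with an attribute path on the dataset-file record and an optional header; this commit renames the ``commit`` column to ``checksum`` and adds a ``source`` column. A minimal sketch of how such a dotted attribute path could be resolved (a hypothetical resolver, not the formatter Renku actually uses):

from functools import reduce

def resolve_column(record, attribute_path: str):
    """Follow a dotted attribute path such as "entity.checksum" on a file record."""
    # getattr is applied step by step: record -> record.entity -> record.entity.checksum
    return reduce(getattr, attribute_path.split("."), record)
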
4 changes: 2 additions & 2 deletions renku/command/move.py
@@ -175,8 +175,8 @@ def _warn_about_ignored_destinations(destinations, client_dispatcher: IClientDis

ignored = client.find_ignored_paths(*destinations)
if ignored:
ignored = "\n\t".join((str(Path(p).relative_to(client.path)) for p in ignored))
communication.warn(f"The following moved path match .gitignore:\n\t{ignored}")
ignored_str = "\n\t".join((str(Path(p).relative_to(client.path)) for p in ignored))
communication.warn(f"The following moved path match .gitignore:\n\t{ignored_str}")


@inject.autoparams()
6 changes: 4 additions & 2 deletions renku/core/dataset/constant.py
@@ -21,14 +21,16 @@

from renku.core.constant import RENKU_HOME
from renku.core.management.repository import RepositoryApiMixin
from renku.domain_model.refs import LinkReference

POINTERS = "pointers"
"""Directory for storing external pointer files."""

DATASET_IMAGES = "dataset_images"
"""Directory for dataset images."""

REFS = "refs"
"""Define a name of the folder with references in the Renku folder."""


def renku_dataset_images_path(client):
"""Return a ``Path`` instance of Renku dataset metadata folder."""
@@ -46,6 +48,6 @@ def renku_pointers_path(client):
Path(RENKU_HOME) / RepositoryApiMixin.DATABASE_PATH,
Path(RENKU_HOME) / DATASET_IMAGES,
Path(RENKU_HOME) / POINTERS,
Path(RENKU_HOME) / LinkReference.REFS,
Path(RENKU_HOME) / REFS,
".gitattributes",
]
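
``REFS`` now lives with the other dataset constants instead of being read from ``LinkReference``, and the metadata path list above composes it with ``RENKU_HOME`` directly. A hypothetical helper in the same pattern as the accessors above (not part of this commit), using the ``client.path`` attribute that also appears in ``renku/command/move.py``:

def renku_refs_path(client):
    """Return a ``Path`` to the references folder inside the Renku metadata directory."""
    # client.path is the repository root; RENKU_HOME is the Renku metadata folder.
    return client.path / RENKU_HOME / REFS
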
98 changes: 61 additions & 37 deletions renku/core/dataset/dataset.py
@@ -369,7 +369,7 @@ def export_dataset(name, provider_name, publish, tag, client_dispatcher: IClient
except KeyError:
raise errors.ParameterError("Unknown provider.")

provider.set_parameters(**kwargs)
provider.set_export_parameters(**kwargs)

selected_tag = None
tags = datasets_provenance.get_all_tags(dataset) # type: ignore
@@ -424,6 +424,7 @@ def import_dataset(
previous_dataset=None,
delete=False,
gitlab_token=None,
**kwargs,
):
"""Import data from a 3rd party provider or another renku project.
@@ -449,11 +450,13 @@

assert provider is not None

provider.set_import_parameters(**kwargs)

try:
record = provider.find_record(uri, gitlab_token=gitlab_token)
provider_dataset: ProviderDataset = record.as_dataset(client)
files: List[ProviderDatasetFile] = record.files_info
total_size = 0
total_size = 0.0

if not yes:
communication.echo(
@@ -477,9 +480,9 @@

communication.confirm(text_prompt, abort=True, warning=True)

for file_ in files:
if file_.size_in_mb is not None:
total_size += file_.size_in_mb
for file in files:
if file.size_in_mb is not None:
total_size += file.size_in_mb

total_size *= 2**20

@@ -509,7 +512,7 @@
with_metadata=provider_dataset,
force=True,
extract=extract,
all_at_once=True,
is_import=True,
destination_names=names,
total_size=total_size,
overwrite=True,
@@ -535,39 +538,45 @@
if not provider_dataset.data_dir:
raise errors.OperationError(f"Data directory for dataset must be set: {provider_dataset.name}")

sources = []

if record.datadir_exists:
sources = [f"{provider_dataset.data_dir}/*"]

for file in files:
try:
Path(file.path).relative_to(provider_dataset.data_dir)
except ValueError: # Files that are not in dataset's data directory
sources.append(file.path)
if provider_dataset.version: # NOTE: A tag was specified for import
sources, checksums = zip(*[(f.path, f.checksum) for f in files]) # type: ignore
else:
sources = [f.path for f in files] # type: ignore
checksums = None

new_dataset = add_data_to_dataset(
urls=[record.project_url],
dataset_name=name,
sources=sources,
checksums=checksums,
with_metadata=provider_dataset,
is_renku_import=True,
create=not previous_dataset,
overwrite=True,
repository=record.repository,
clear_files_before=True,
dataset_datadir=provider_dataset.data_dir,
force=True, # NOTE: Force-add to include any ignored files
)

if previous_dataset:
_update_datasets_metadata(new_dataset, previous_dataset, delete, provider_dataset.same_as)

if provider_dataset.version:
add_dataset_tag(
dataset_name=new_dataset.name,
tag=provider_dataset.version,
description=f"Tag {provider_dataset.version} created by renku import",
)

record.import_images(new_dataset)

database_dispatcher.current_database.commit()


@inject.autoparams()
def update_datasets(
names,
names: List[str],
creators,
include,
exclude,
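
In the import hunk above, requesting a tag means ``provider_dataset.version`` is set: every file path is then paired with the checksum recorded at that tag, so the add step pins exact blobs instead of the branch head, and the same tag is recreated on the imported copy via ``add_dataset_tag``. A minimal sketch of that selection logic, with a simplified stand-in for ``ProviderDatasetFile``:

from typing import List, NamedTuple, Optional, Tuple

class FileInfo(NamedTuple):
    """Simplified stand-in for ProviderDatasetFile."""
    path: str
    checksum: str

def sources_for_import(files: List[FileInfo], tag: Optional[str]) -> Tuple[List[str], Optional[List[str]]]:
    """Return (sources, checksums); checksums are only pinned when a tag was requested."""
    if tag and files:
        # Pin every file to the checksum it had at the requested tag.
        sources, checksums = zip(*[(f.path, f.checksum) for f in files])
        return list(sources), list(checksums)
    # No tag: add files from the current head and let the provider resolve checksums.
    return [f.path for f in files], None

For example, ``sources_for_import([FileInfo("data/a.csv", "abc123")], tag="v1")`` yields ``(["data/a.csv"], ["abc123"])``.
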
@@ -594,41 +603,56 @@ def update_datasets(
client_dispatcher(IClientDispatcher): Injected client dispatcher.
dataset_gateway(IDatasetGateway): Injected dataset gateway.
"""
from renku.core.dataset.providers.renku import RenkuProvider

if not update_all and not names and not include and not exclude and not dry_run:
raise errors.ParameterError("No update criteria is specified")

client = client_dispatcher.current_client

imported_datasets: List[Dataset] = []
imported_dataset_updates: List[Dataset] = []

all_datasets = dataset_gateway.get_all_active_datasets()
imported_datasets = [d for d in all_datasets if d.same_as]

if names and update_all:
raise errors.ParameterError("Cannot pass dataset names when updating all datasets")
elif (include or exclude) and update_all:
raise errors.ParameterError("Cannot specify include and exclude filters when updating all datasets")
elif (include or exclude) and names and any(d.same_as for d in all_datasets if d.name in names):
elif (include or exclude) and names and any(d for d in imported_datasets if d.name in names):
raise errors.IncompatibleParametersError(a="--include/--exclude", b="imported datasets")

names_provided = bool(names)
names = names or [d.name for d in all_datasets]

# NOTE: update imported datasets
if not include and not exclude:
for dataset in all_datasets:
if names and dataset.name not in names or not dataset.same_as:
must_match_records = False

for dataset in imported_datasets:
if dataset.name not in names:
continue

uri = dataset.same_as.url
if isinstance(uri, dict):
uri = cast(str, uri.get("@id"))
uri = dataset.same_as.value # type: ignore
provider, _ = ProviderFactory.from_uri(uri)

if not provider:
continue

record = provider.find_record(uri)

if record.is_last_version(uri) and record.version == dataset.version:
if isinstance(provider, RenkuProvider) and dataset.version is not None:
tags = dataset_gateway.get_all_tags(dataset=dataset)
tag = next((t for t in tags if t.name == dataset.version), None)
# NOTE: Do not update Renku dataset that are imported from a specific version
if tag is not None and tag.dataset_id.value == dataset.id:
communication.echo(
f"Skipped updating imported Renku dataset '{dataset.name}' with tag '{tag.name}'"
)
names.remove(dataset.name)
continue

if record.is_last_version(uri) and record.is_version_equal_to(dataset):
names.remove(dataset.name)
continue

if not dry_run:
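
The update path above now refuses to update Renku datasets that were imported at a fixed tag: if the dataset records a ``version`` and one of its own tags matches it, the dataset is reported as skipped and removed from the update list. A minimal sketch of that check, with simplified stand-ins for the dataset object and the tags returned by the gateway:

def is_pinned_to_tag(dataset, tags) -> bool:
    """Return True when the dataset was imported at a specific tag and must not be updated."""
    if dataset.version is None:
        return False
    # The tag counts as a pin only if it points at this exact dataset version.
    tag = next((t for t in tags if t.name == dataset.version), None)
    return tag is not None and tag.dataset_id.value == dataset.id
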
Expand All @@ -651,25 +675,25 @@ def update_datasets(

communication.echo(f"Updated dataset '{dataset.name}' from remote provider")

if names:
names.remove(dataset.name)
imported_datasets.append(dataset)
names.remove(dataset.name)
imported_dataset_updates.append(dataset)
else:
imported_datasets = [d for d in all_datasets if d.same_as]
must_match_records = True

imported_datasets_view_models = [DatasetViewModel.from_dataset(d) for d in imported_datasets]
imported_dataset_updates_view_models = [DatasetViewModel.from_dataset(d) for d in imported_dataset_updates]

if names_provided and not names:
return imported_datasets_view_models, []
if not names:
return imported_dataset_updates_view_models, []

# NOTE: Exclude all imported dataset from individual file filter
records = filter_dataset_files(
names=names, creators=creators, include=include, exclude=exclude, ignore=[d.name for d in imported_datasets]
)

if not records:
if imported_datasets:
return imported_datasets_view_models, []
raise errors.ParameterError("No files matched the criteria.")
if must_match_records:
raise errors.ParameterError("No files matched the criteria.")
return imported_dataset_updates_view_models, []

git_files = []
unique_remotes = set()
@@ -730,7 +754,7 @@ def update_datasets(
dataset_files_view_models = [
DatasetFileViewModel.from_dataset_file(cast(DatasetFile, f), f.dataset) for f in updated_files + deleted_files
]
return imported_datasets_view_models, dataset_files_view_models
return imported_dataset_updates_view_models, dataset_files_view_models


def show_dataset(name: str, tag: Optional[str] = None):