
Commit

feat(dataset): parallel data download/upload (#3358)
m-alisafaee committed Mar 20, 2023
1 parent 2a461d4 commit 3f1e707
Showing 89 changed files with 720 additions and 715 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -15,7 +15,7 @@ repos:
- id: mixed-line-ending
- id: trailing-whitespace
- repo: https://github.com/psf/black
-rev: 22.10.0
+rev: 23.1.0
hooks:
- id: black
additional_dependencies: ["click==8.0.4"]
9 changes: 9 additions & 0 deletions conftest.py
@@ -79,3 +79,12 @@ def pytest_configure(config):

os.environ["RENKU_SKIP_MIN_VERSION_CHECK"] = "1"
os.environ["RENKU_DISABLE_VERSION_CHECK"] = "1"
# NOTE: Set an env var during during tests to mark that Renku is running in a test session.
os.environ["RENKU_RUNNING_UNDER_TEST"] = "1"


def pytest_unconfigure(config):
"""Hook that is called by pytest after all tests are executed."""
os.environ.pop("RENKU_SKIP_MIN_VERSION_CHECK", None)
os.environ.pop("RENKU_DISABLE_VERSION_CHECK", None)
os.environ.pop("RENKU_RUNNING_UNDER_TEST", None)
14 changes: 9 additions & 5 deletions docs/reference/core.rst
@@ -256,11 +256,11 @@ Utilities
:members:
:show-inheritance:

-.. automodule:: renku.core.util.file_size
+.. automodule:: renku.core.util.git
:members:
:show-inheritance:

-.. automodule:: renku.core.util.git
+.. automodule:: renku.core.util.jwt
:members:
:show-inheritance:

@@ -280,15 +280,19 @@ Utilities
:members:
:show-inheritance:

-.. automodule:: renku.core.util.urls
+.. automodule:: renku.core.util.ssh
:members:
:show-inheritance:

-.. automodule:: renku.core.util.util
+.. automodule:: renku.core.util.tabulate
:members:
:show-inheritance:

-.. automodule:: renku.core.util.uuid
+.. automodule:: renku.core.util.urls
:members:
:show-inheritance:

+.. automodule:: renku.core.util.util
+    :members:
+    :show-inheritance:

318 changes: 156 additions & 162 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -74,7 +74,7 @@ inject = "<4.4.0,>=4.3.0"
jinja2 = { version = ">=2.11.3,<3.1.3" }
networkx = "<2.7,>=2.6.0"
numpy = ">=1.20.0,<1.22.0"
packaging = "<22.0,>=21.3"
packaging = "<24.0,>=23.0"
pathspec = "<1.0.0,>=0.8.0"
patool = "==1.12"
pluggy = "==1.0.0"
@@ -121,7 +121,7 @@ sentry-sdk = { version = ">=1.5.11,<1.5.12", extras = ["flask"], optional = tru
walrus = { version = ">=0.8.2,<0.10.0", optional = true }

[tool.poetry.group.dev.dependencies]
black = "==22.10.0"
black = "==23.1.0"
flake8 = ">=6.0.0,<7.0.0"
Flake8-pyproject = "==1.2.2"
isort = "<5.10.2,>=5.3.2"
4 changes: 2 additions & 2 deletions renku/command/dataset.py
@@ -17,7 +17,7 @@
"""Repository datasets management."""

from renku.command.command_builder.command import Command
-from renku.core.constant import CONFIG_LOCAL_PATH, DATASET_METADATA_PATHS
+from renku.core.constant import DATASET_METADATA_PATHS
from renku.core.dataset.dataset import (
create_dataset,
edit_dataset,
@@ -130,7 +130,7 @@ def list_tags_command():
def pull_cloud_storage_command():
"""Command for pulling/copying data from a cloud storage."""
command = Command().command(pull_cloud_storage).lock_dataset().with_database(write=True)
-return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS + [CONFIG_LOCAL_PATH])
+return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS)


def mount_cloud_storage_command(unmount: bool):
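
Note: command builders like pull_cloud_storage_command are normally built and executed by the CLI layer. A rough usage sketch, assuming the usual renku command-builder build()/execute() pattern and the name/location parameters of pull_cloud_storage (both assumptions, not shown in this diff):

    from renku.command.dataset import pull_cloud_storage_command

    # Building applies the migration check, dataset lock, database write and the
    # auto-commit of dataset metadata configured above; execute() runs the wrapped function.
    result = pull_cloud_storage_command().build().execute(name="my-dataset", location=None)
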
5 changes: 2 additions & 3 deletions renku/core/__init__.py
@@ -1,6 +1,5 @@
#
-# Copyright 2017-2023- Swiss Data Science Center (SDSC)
-# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+# Copyright Swiss Data Science Center (SDSC). A partnership between
+# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
5 changes: 2 additions & 3 deletions renku/core/config.py
@@ -1,6 +1,5 @@
#
-# Copyright 2020 - Swiss Data Science Center (SDSC)
-# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+# Copyright Swiss Data Science Center (SDSC). A partnership between
+# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
5 changes: 2 additions & 3 deletions renku/core/constant.py
@@ -1,6 +1,5 @@
#
-# Copyright 2017-2023 - Swiss Data Science Center (SDSC)
-# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+# Copyright Swiss Data Science Center (SDSC). A partnership between
+# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
125 changes: 72 additions & 53 deletions renku/core/dataset/dataset.py
@@ -50,6 +50,7 @@
    delete_dataset_file,
    delete_path,
    get_absolute_path,
+    get_file_size,
    get_files,
    get_safe_relative_path,
    hash_file,
@@ -59,15 +60,18 @@
)
from renku.core.util.tabulate import tabulate
from renku.core.util.urls import get_slug
-from renku.core.util.util import NO_VALUE, NoValueType
+from renku.core.util.util import parallel_execute
+from renku.domain_model.constant import NO_VALUE, NON_EXISTING_ENTITY_CHECKSUM, NoValueType
from renku.domain_model.dataset import Dataset, DatasetDetailsJson, DatasetFile, RemoteEntity, is_dataset_name_valid
from renku.domain_model.entity import Entity
from renku.domain_model.enums import ConfigFilter
from renku.domain_model.project_context import project_context
from renku.domain_model.provenance.agent import Person
from renku.domain_model.provenance.annotation import Annotation
from renku.infrastructure.immutable import DynamicProxy

if TYPE_CHECKING:
+    from renku.core.interface.storage import IStorage
    from renku.infrastructure.repository import Repository


@@ -1249,75 +1253,90 @@ def should_include(filepath: Path) -> bool:
    return sorted(records, key=lambda r: r.date_added)


-@validate_arguments(config=dict(arbitrary_types_allowed=True))
-def pull_cloud_storage(name: str, location: Optional[Path] = None) -> None:
-    """Pull/copy data for a cloud storage to a dataset's data directory or a specified location.
+def download_file(file: DatasetFile, storage: "IStorage") -> List[DatasetFile]:
+    """Download a dataset file and retrieve its missing metadata (if any).

    Args:
-        name(str): Name of the dataset
-        location(Optional[Path]): A directory to copy data to (Default value = None).
-    """
-    datasets_provenance = DatasetsProvenance()
+        file(DatasetFile): Dataset file to download.
+        storage: Dataset's cloud storage (an instance of ``IStorage``).
-    dataset = datasets_provenance.get_by_name(name=name, strict=True)

-    if not dataset.storage:
-        communication.warn(f"Dataset '{name}' doesn't have a storage backend")
-        return
+    Returns:
+        List[DatasetFile]: A list with the updated file if its metadata was missing; an empty list otherwise.
-    # NOTE: Try to unmount the path in case it was mounted before
-    unmount_path(project_context.path / dataset.get_datadir())
+    """
+    if not file.based_on:
+        raise errors.DatasetImportError(f"Dataset file doesn't have a URI: {file.entity.path}")

-    create_symlinks = True
-    destination: Union[Path, str]
+    path = project_context.path / file.entity.path
+    path.parent.mkdir(parents=True, exist_ok=True)

-    if location:
-        destination = get_absolute_path(location)
-    else:
-        stored_location = read_dataset_data_location(dataset=dataset)
-        if stored_location:
-            destination = stored_location
-        else:
-            destination = project_context.path
-            create_symlinks = False
+    # NOTE: Don't check if destination file exists. ``IStorage.copy`` won't copy a file if it exists and is not
+    # modified.

-    provider = ProviderFactory.get_pull_provider(uri=dataset.storage)
-    storage = provider.get_storage()
+    communication.start_progress(name=file.entity.path, total=1)
+    try:
+        storage.download(file.based_on.url, path)
+        communication.update_progress(name=file.entity.path, amount=1)
+    finally:
+        communication.finalize_progress(name=file.entity.path)

-    updated_files = []
+    # NOTE: File has no missing information
+    if file.has_valid_checksum() and file.has_valid_size():
+        return []

-    for file in dataset.files:
-        path = Path(destination) / file.entity.path
-        path.parent.mkdir(parents=True, exist_ok=True)
-        # NOTE: Don't check if destination exists. ``IStorage.copy`` won't copy a file if it exists and is not modified.
+    if not file.has_valid_checksum():
+        md5_hash = hash_file(path, hash_type="md5") or NON_EXISTING_ENTITY_CHECKSUM
+        entity = Entity(path=file.entity.path, checksum=md5_hash)
+        remote_entity = RemoteEntity(checksum=md5_hash, url=file.based_on.url, path=file.based_on.path)
+    else:
+        entity = file.entity
+        remote_entity = file.based_on

+    size = file.size if file.has_valid_size() else get_file_size(path)

+    return [
+        DatasetFile(
+            entity=entity,
+            based_on=remote_entity,
+            size=size,
+            date_added=file.date_added,
+            date_removed=file.date_removed,
+            source=file.source,
+        )
+    ]

-        if not file.based_on:
-            raise errors.DatasetImportError(f"Dataset file doesn't have a URI: {file.entity.path}")

-        with communication.busy(f"Copying {file.entity.path} ..."):
-            storage.download(file.based_on.url, path)
+@validate_arguments(config=dict(arbitrary_types_allowed=True))
+def pull_cloud_storage(name: str, location: Optional[Path] = None) -> None:
+    """Pull/copy data for a cloud storage to a dataset's data directory or a specified location.
-        # NOTE: Make files read-only since we don't support pushing data to the remote storage
-        os.chmod(path, 0o400)
+    Args:
+        name(str): Name of the dataset
+        location(Optional[Path]): A directory to copy data to (Default value = None).
+    """
+    dataset, datadir = _get_dataset_with_cloud_storage(name=name)

-        if not file.based_on.checksum:
-            md5_hash = hash_file(path, hash_type="md5") or ""
-            file.based_on = RemoteEntity(checksum=md5_hash, url=file.based_on.url, path=file.based_on.path)
+    # NOTE: Try to unmount the path in case it was mounted before
+    unmount_path(datadir)

-        new_file = DynamicProxy(file)
-        new_file.dataset = dataset
-        updated_files.append(new_file)
+    if location:
+        if not is_path_empty(datadir):
+            communication.confirm(
+                f"Dataset's data directory will be removed: {dataset.get_datadir()}. Do you want to continue?",
+                abort=True,
+                warning=True,
+            )
+        create_symlink(target=location, symlink_path=datadir, overwrite=True)

-        if create_symlinks:
-            symlink_path = project_context.path / file.entity.path
-            symlink_path.parent.mkdir(parents=True, exist_ok=True)
-            create_symlink(path=path, symlink_path=symlink_path, overwrite=True)
+    provider = ProviderFactory.get_pull_provider(uri=dataset.storage)
+    storage = provider.get_storage()

-    # NOTE: Store location in metadata in case where we want to mount the external storage in the same location
-    store_dataset_data_location(dataset=dataset, location=location)
+    updated_files = parallel_execute(download_file, dataset.files, rate=5, storage=storage)

    if updated_files:
-        _update_datasets_files_metadata(updated_files=updated_files, deleted_files=[], delete=False)
+        dataset.add_or_update_files(updated_files)
+        DatasetsProvenance().add_or_update(dataset, creator=get_git_user(repository=project_context.repository))
+        project_context.database.commit()


def store_dataset_data_location(dataset: Dataset, location: Optional[Path]) -> None:
@@ -1358,7 +1377,7 @@ def mount_cloud_storage(name: str, existing: Optional[Path], yes: bool) -> None:
)

if existing:
-create_symlink(path=existing, symlink_path=datadir, overwrite=True)
+create_symlink(target=existing, symlink_path=datadir, overwrite=True)
return

delete_path(datadir)
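
Note: the key change above is that pull_cloud_storage no longer loops over files itself; it passes download_file to parallel_execute (imported from renku.core.util.util), which fans the per-file downloads out over several workers and collects the returned metadata. The helper's implementation is not part of this excerpt, so the following is only an illustrative sketch under the assumption that rate is used as the worker count:

    from concurrent.futures import ThreadPoolExecutor
    from typing import Callable, List, Sequence, TypeVar

    T = TypeVar("T")
    R = TypeVar("R")


    def parallel_execute_sketch(
        function: Callable[..., List[R]], values: Sequence[T], rate: int = 5, **kwargs
    ) -> List[R]:
        """Run ``function`` over ``values`` concurrently and flatten the per-item results."""
        results: List[R] = []
        with ThreadPoolExecutor(max_workers=rate) as executor:
            # Each call returns a (possibly empty) list, e.g. download_file's updated metadata.
            for partial in executor.map(lambda value: function(value, **kwargs), values):
                results.extend(partial)
        return results
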

