feat(dataset): pull data from s3 storage (#3066)
m-alisafaee committed Aug 26, 2022
1 parent aa8772d commit 289b1af
Showing 34 changed files with 524 additions and 988 deletions.
922 changes: 125 additions & 797 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -123,7 +123,7 @@ redis = { version = ">=3.5.3,<4.2.0", optional = true }
renku-sphinx-theme = { version = ">=0.2.0", optional = true }
requests = ">=2.23.0,<2.28.2"
responses = { version = ">=0.7.0,<0.22.0", optional = true }
rich = ">=9.3.0,<12.3.0"
rich = ">=9.3.0,<12.6.0"
rq = { version = "==1.10.1", optional = true }
rq-scheduler = { version = "==0.11.0", optional = true }
sentry-sdk = { version = ">=1.5.11,<1.5.12", extras = ["flask"], optional = true }
@@ -140,6 +140,7 @@ types-PyYAML = { version="<6.1.0,>=5.4", optional = true }
types-redis = { version=">=3.5.3,<4.1.0", optional = true }
types-requests = { version = "<2.27.2,>=2.23.0", optional = true }
types-tabulate = { version = "<0.8.10,>=0.7.7", optional = true }
typing-extensions = { version = ">=4.3.0,<5.0.0", python = "<3.8.0" }
walrus = { version = ">=0.8.2,<0.10.0", optional = true }
werkzeug = ">=1.0.0,<2.1.2"
yagup = ">=0.1.1"
6 changes: 3 additions & 3 deletions renku/command/config.py
@@ -19,8 +19,8 @@
from renku.command.command_builder import inject
from renku.command.command_builder.command import Command
from renku.core import errors
from renku.core.constant import CONFIG_LOCAL_PATH
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.management.config import CONFIG_LOCAL_PATH
from renku.domain_model.enums import ConfigFilter


@@ -61,7 +61,7 @@ def update_multiple_config():
.command(_update_multiple_config)
.with_database()
.require_migration()
.with_commit(commit_if_empty=False, commit_only=CONFIG_LOCAL_PATH)
.with_commit(commit_if_empty=False, commit_only=[CONFIG_LOCAL_PATH])
)


@@ -100,7 +100,7 @@ def update_config():
Command()
.command(_update_config)
.require_migration()
.with_commit(commit_if_empty=False, commit_only=CONFIG_LOCAL_PATH, skip_staging=True)
.with_commit(commit_if_empty=False, commit_only=[CONFIG_LOCAL_PATH], skip_staging=True)
.with_database()
)

8 changes: 8 additions & 0 deletions renku/command/dataset.py
@@ -18,6 +18,7 @@
"""Repository datasets management."""

from renku.command.command_builder.command import Command
from renku.core.constant import CONFIG_LOCAL_PATH
from renku.core.dataset.constant import DATASET_METADATA_PATHS
from renku.core.dataset.dataset import (
create_dataset,
@@ -27,6 +28,7 @@
import_dataset,
list_dataset_files,
list_datasets,
pull_external_data,
remove_dataset,
search_datasets,
show_dataset,
@@ -123,3 +125,9 @@ def remove_dataset_tags_command():
def list_tags_command():
    """Command for listing a dataset's tags."""
    return Command().command(list_dataset_tags).with_database().require_migration()


def pull_external_data_command():
    """Command for pulling/copying data from an external storage."""
    command = Command().command(pull_external_data).lock_dataset().with_database(write=True)
    return command.require_migration().with_commit(commit_only=DATASET_METADATA_PATHS + [CONFIG_LOCAL_PATH])
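
For orientation (not part of this commit): other dataset commands are run through the command builder, so the new command would presumably be invoked the same way. A minimal sketch, assuming the usual .build().execute(...) invocation pattern; the dataset name and target directory are hypothetical.

# --- illustrative sketch (not part of the diff above) ---
from pathlib import Path

from renku.command.dataset import pull_external_data_command

# "my-dataset" and the location are made-up examples.
pull_external_data_command().build().execute(name="my-dataset", location=Path("/mnt/external-data"))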
4 changes: 2 additions & 2 deletions renku/command/remove.py
@@ -29,7 +29,7 @@
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.util import communication
from renku.core.util.git import get_git_user
from renku.core.util.os import delete_file
from renku.core.util.os import delete_dataset_file


@inject.autoparams()
@@ -77,7 +77,7 @@ def get_relative_path(path):
dataset = dataset.copy()
for key in remove:
dataset.unlink_file(key)
delete_file(client.path / key, follow_symlinks=True)
delete_dataset_file(client.path / key, follow_symlinks=True)

datasets_provenance = DatasetsProvenance()
datasets_provenance.add_or_update(dataset, creator=get_git_user(client.repository))
5 changes: 5 additions & 0 deletions renku/core/constant.py
@@ -17,10 +17,15 @@
# limitations under the License.
"""Renku core constants."""

import os

CACHE = "cache"
"""Directory to cache transient data."""

RENKU_HOME = ".renku"
"""Project directory name."""

RENKU_TMP = "tmp"

CONFIG_NAME = "renku.ini"
CONFIG_LOCAL_PATH = os.path.join(RENKU_HOME, CONFIG_NAME)
109 changes: 104 additions & 5 deletions renku/core/dataset/dataset.py
@@ -33,20 +33,30 @@
from renku.core.dataset.pointer_file import create_external_file, is_external_file_updated, update_external_file
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset
from renku.core.dataset.providers.s3 import S3Credentials
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.interface.storage import IStorageFactory
from renku.core.util import communication
from renku.core.util.datetime8601 import local_now
from renku.core.util.dispatcher import get_client, get_database
from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user
from renku.core.util.metadata import is_external_file, read_credentials, store_credentials
from renku.core.util.os import delete_file, get_safe_relative_path, is_subpath
from renku.core.util.metadata import is_external_file, prompt_for_credentials, read_credentials, store_credentials
from renku.core.util.os import (
create_symlink,
delete_dataset_file,
get_absolute_path,
get_safe_relative_path,
hash_file,
is_subpath,
)
from renku.core.util.tabulate import tabulate
from renku.core.util.urls import get_slug
from renku.core.util.util import NO_VALUE, NoValueType
from renku.domain_model.dataset import Dataset, DatasetDetailsJson, DatasetFile, RemoteEntity, is_dataset_name_valid
from renku.domain_model.enums import ConfigFilter
from renku.domain_model.provenance.agent import Person
from renku.domain_model.provenance.annotation import Annotation
from renku.infrastructure.immutable import DynamicProxy
@@ -498,7 +508,7 @@ def remove_files(dataset):
deleted_paths = previous_paths - current_paths

for path in deleted_paths:
delete_file(client.path / path, follow_symlinks=True)
delete_dataset_file(client.path / path, follow_symlinks=True)

provider = ProviderFactory.get_import_provider(uri)

@@ -1053,15 +1063,15 @@ def update_dataset_git_files(

if not found:
if not dry_run and delete:
delete_file(dst, follow_symlinks=True)
delete_dataset_file(dst, follow_symlinks=True)
client.repository.add(dst, force=True)
deleted_files.append(file)
elif changed:
if not dry_run:
# Fetch file if it is tracked by Git LFS
remote_client.pull_paths_from_storage(remote_client.path / based_on.path)
if is_external_file(path=src, client_path=remote_client.path):
delete_file(dst, follow_symlinks=True)
delete_dataset_file(dst, follow_symlinks=True)
create_external_file(client=client, target=src.resolve(), path=dst)
else:
shutil.copy(src, dst)
@@ -1216,3 +1226,92 @@ def should_include(filepath: Path) -> bool:
raise errors.ParameterError(f"These datasets don't exist: {unused_names_str}")

return sorted(records, key=lambda r: r.date_added)


@inject.autoparams("client_dispatcher", "storage_factory")
def pull_external_data(
    name: str, client_dispatcher: IClientDispatcher, storage_factory: IStorageFactory, location: Optional[Path] = None
) -> None:
    """Pull/copy data from an external storage to a dataset's data directory or a specified location.

    Args:
        name(str): Name of the dataset.
        location(Optional[Path]): A directory to copy data to (Default value = None).
    """
    client = client_dispatcher.current_client
    datasets_provenance = DatasetsProvenance()

    dataset = datasets_provenance.get_by_name(name=name, strict=True)

    if not dataset.storage:
        communication.warn(f"Dataset '{name}' doesn't have a storage backend")
        return

    create_symlinks = True
    destination: Union[Path, str]

    if location:
        destination = get_absolute_path(location)
    else:
        stored_location = read_dataset_data_location(dataset=dataset)
        if stored_location:
            destination = stored_location
        else:
            destination = client.path
            create_symlinks = False

    provider = ProviderFactory.get_pull_provider(uri=dataset.storage)

    credentials = S3Credentials(provider)
    prompt_for_credentials(credentials)

    storage = storage_factory.get_storage(provider=provider, credentials=credentials)
    updated_files = []

    for file in dataset.files:
        path = Path(destination) / file.entity.path
        path.parent.mkdir(parents=True, exist_ok=True)
        # NOTE: Don't check if destination exists. ``IStorage.copy`` won't copy a file if it exists and is not modified.

        if not file.source:
            raise errors.DatasetImportError(f"Dataset file doesn't have a URI: {file.entity.path}")

        with communication.busy(f"Copying {file.entity.path} ..."):
            storage.copy(file.source, path)

        if file.based_on and not file.based_on.checksum:
            md5_hash = hash_file(path, hash_type="md5") or ""
            file.based_on = RemoteEntity(checksum=md5_hash, url=file.based_on.url, path=file.based_on.path)

        new_file = DynamicProxy(file)
        new_file.dataset = dataset
        updated_files.append(new_file)

        if create_symlinks:
            symlink_path = client.path / file.entity.path
            symlink_path.parent.mkdir(parents=True, exist_ok=True)
            create_symlink(path=path, symlink_path=symlink_path, overwrite=True)

    # NOTE: Store location in metadata in case we want to mount the external storage in the same location
    store_dataset_data_location(dataset=dataset, location=location)

    if updated_files:
        _update_datasets_files_metadata(client, updated_files=updated_files, deleted_files=[], delete=False)


def store_dataset_data_location(dataset: Dataset, location: Optional[Path]) -> None:
    """Store data location for a dataset in the config file."""
    client = get_client()

    section = "dataset-locations"
    key = dataset.name

    if not location:
        client.remove_value(section=section, key=key)
    else:
        client.set_value(section=section, key=key, value=get_absolute_path(location))


def read_dataset_data_location(dataset: Dataset) -> Optional[str]:
    """Read data location for a dataset in the config file."""
    return get_client().get_value(section="dataset-locations", key=dataset.name, config_filter=ConfigFilter.LOCAL_ONLY)
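
To make the per-file loop above easier to follow, here is a self-contained sketch (plain standard library, not renku code) of the copy-hash-symlink pattern it applies to each file when a custom location is in use; the function name and the local copy standing in for ``IStorage.copy`` are hypothetical.

# --- illustrative sketch (not part of the commit) ---
import hashlib
import os
import shutil
from pathlib import Path


def pull_one_file(source: Path, destination_root: Path, project_root: Path, relative_path: str) -> str:
    """Copy ``source`` under ``destination_root``, link it into ``project_root``, and return its md5."""
    target = destination_root / relative_path
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, target)  # stands in for storage.copy(file.source, path)

    md5 = hashlib.md5(target.read_bytes()).hexdigest()  # stands in for hash_file(path, hash_type="md5")

    link = project_root / relative_path
    link.parent.mkdir(parents=True, exist_ok=True)
    if link.is_symlink() or link.exists():
        link.unlink()  # mirrors create_symlink(..., overwrite=True)
    os.symlink(target, link)
    return md5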
4 changes: 2 additions & 2 deletions renku/core/dataset/dataset_add.py
@@ -35,7 +35,7 @@
from renku.core.util.dataset import check_url
from renku.core.util.dispatcher import get_client, get_database
from renku.core.util.git import get_git_user
from renku.core.util.os import delete_file, get_relative_path
from renku.core.util.os import delete_dataset_file, get_relative_path
from renku.domain_model.dataset import Dataset, DatasetFile

if TYPE_CHECKING:
@@ -291,7 +291,7 @@ def move_files_to_dataset(client: "LocalClient", files: List["DatasetAddMetadata"
continue

# Remove existing file if any; required as a safety-net to avoid corrupting external files
delete_file(file.destination, follow_symlinks=True)
delete_dataset_file(file.destination, follow_symlinks=True)
file.destination.parent.mkdir(parents=True, exist_ok=True)

if file.action == DatasetAddAction.COPY:
19 changes: 17 additions & 2 deletions renku/core/dataset/datasets_provenance.py
@@ -18,14 +18,19 @@
"""Datasets Provenance."""

from datetime import datetime
from typing import TYPE_CHECKING, List, Literal, Optional, Union, overload
from typing import TYPE_CHECKING, List, Optional, Union, overload
from uuid import UUID

from renku.command.command_builder.command import inject
from renku.core import errors
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.util import communication

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal  # type: ignore

if TYPE_CHECKING:
from renku.domain_model.dataset import Dataset, DatasetTag
from renku.domain_model.provenance.agent import Person
@@ -70,7 +75,17 @@ def get_by_name(self, name: str, *, immutable: bool = False, strict: Literal[Tru
def get_by_name(
self, name: str, immutable: bool = False, strict: bool = False
) -> Union[Optional["Dataset"], "Dataset"]:
"""Return a dataset by its name."""
"""Return a dataset by its name.
Args:
name(str): Name of the dataset
immutable(bool): Whether the dataset will be used as an immutable instance or will be modified (Default
value = False).
strict(bool): Whether to raise an exception if the dataset doesn't exist or not (Default value = False)
Returns:
Optional[Dataset]: Dataset with the specified name if exists.
"""
dataset = self.dataset_gateway.get_by_name(name)
if not dataset:
if strict:
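
The try/except import above exists because ``typing.Literal`` only landed in Python 3.8, which is also why pyproject.toml now pulls in typing-extensions for older interpreters. Below is a standalone sketch of the overload/Literal pattern the new ``get_by_name`` signature relies on, so that ``strict=True`` gives a non-optional return type to type checkers; the names are illustrative, not renku's.

# --- illustrative sketch (not part of the commit) ---
from typing import Dict, Optional, Union, overload

try:
    from typing import Literal
except ImportError:  # Python < 3.8
    from typing_extensions import Literal  # type: ignore

_DATASETS: Dict[str, str] = {"my-dataset": "dataset-id-1"}


@overload
def get_by_name(name: str, strict: Literal[False] = ...) -> Optional[str]:
    ...


@overload
def get_by_name(name: str, strict: Literal[True]) -> str:
    ...


def get_by_name(name: str, strict: bool = False) -> Union[Optional[str], str]:
    """Return a dataset id by name; raise if ``strict`` and the dataset doesn't exist."""
    dataset = _DATASETS.get(name)
    if dataset is None and strict:
        raise KeyError(f"Dataset doesn't exist: {name}")
    return dataset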
26 changes: 17 additions & 9 deletions renku/core/dataset/providers/api.py
@@ -19,11 +19,13 @@
from collections import UserDict
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union

from renku.core import errors
from renku.core.plugin import hookimpl
from renku.core.util.metadata import get_canonical_key, read_credentials, store_credentials
from renku.core.util.util import NO_VALUE, NoValueType
from renku.domain_model.dataset_provider import IDatasetProviderPlugin

if TYPE_CHECKING:
from renku.core.dataset.providers.models import (
@@ -37,7 +39,10 @@


class ProviderPriority(IntEnum):
"""Defines the order in which a provider is checked to see if it supports a URI."""
"""Defines the order in which a provider is checked to see if it supports a URI.
Providers that support more specific URIs should have a higher priority so that they are checked first.
"""

HIGHEST = 1
HIGHER = 2
@@ -48,7 +53,7 @@ class ProviderPriority(IntEnum):
LOWEST = 7


class ProviderApi(abc.ABC):
class ProviderApi(IDatasetProviderPlugin):
"""Interface defining provider methods."""

priority: Optional[ProviderPriority] = None
@@ -65,6 +70,12 @@ def __init_subclass__(cls, **kwargs):
def __repr__(self):
return f"<DatasetProvider {self.name}>"

@classmethod
@hookimpl
def dataset_provider(cls) -> "Type[ProviderApi]":
"""The definition of the provider."""
return cls

@staticmethod
@abc.abstractmethod
def supports(uri: str) -> bool:
@@ -142,15 +153,15 @@ def __init__(self, uri: str, original_uri: str):
def provider_dataset(self) -> "ProviderDataset":
"""Return the remote dataset. This is only valid after a call to ``fetch_provider_dataset``."""
if self._provider_dataset is None:
raise errors.ImportError("Dataset is not fetched")
raise errors.DatasetImportError("Dataset is not fetched")

return self._provider_dataset

@property
def provider_dataset_files(self) -> List["ProviderDatasetFile"]:
"""Return list of dataset files. This is only valid after a call to ``fetch_provider_dataset``."""
if self._provider_dataset_files is None:
raise errors.ImportError("Dataset is not fetched")
raise errors.DatasetImportError("Dataset is not fetched")

return self._provider_dataset_files

@@ -282,10 +293,7 @@ def read(self) -> Dict[str, Union[str, NoValueType]]:

def read_and_convert_credentials(key) -> Union[str, NoValueType]:
value = read_credentials(section=section, key=key)
if value is None:
return NO_VALUE

return value
return NO_VALUE if value is None else value

data = {key: read_and_convert_credentials(key) for key in self.get_canonical_credentials_names()}
self.data.update(data)
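
With this change ProviderApi derives from IDatasetProviderPlugin and exposes ``dataset_provider`` as a hookimpl classmethod, so a concrete provider class effectively carries its own plugin registration. Below is a self-contained sketch of that shape; the classes are stand-ins, only ``priority``, ``name``, ``supports``, and ``dataset_provider`` are taken from the diff above, and the ``@hookimpl`` marker plus any other requirements of a real provider are left out.

# --- illustrative sketch (not part of the commit) ---
import abc
from enum import IntEnum
from typing import Optional, Type


class ProviderPrioritySketch(IntEnum):
    """Stand-in for ProviderPriority: lower values are checked first."""

    HIGHEST = 1
    LOWEST = 7


class ProviderApiSketch(abc.ABC):
    """Stand-in for ProviderApi: the class itself can serve as the plugin."""

    priority: Optional[ProviderPrioritySketch] = None
    name: Optional[str] = None

    @classmethod
    def dataset_provider(cls) -> "Type[ProviderApiSketch]":
        # In the real class this is additionally decorated with @hookimpl so the
        # plugin manager can discover the provider class.
        return cls

    @staticmethod
    @abc.abstractmethod
    def supports(uri: str) -> bool:
        """Whether this provider is responsible for the given URI."""


class S3LikeProviderSketch(ProviderApiSketch):
    """Toy provider claiming a hypothetical mys3:// URI scheme."""

    priority = ProviderPrioritySketch.HIGHEST
    name = "S3Like"

    @staticmethod
    def supports(uri: str) -> bool:
        return uri.lower().startswith("mys3://")


assert S3LikeProviderSketch.dataset_provider() is S3LikeProviderSketch
assert S3LikeProviderSketch.supports("mys3://bucket/data/file.csv")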
