feat(dataset): add data from s3 (#3063)
Co-authored-by: Mohammad Alisafaee <mohammad.alisafaee@epfl.ch>
olevski and m-alisafaee committed Aug 24, 2022
1 parent efc735b commit b3735e6
Showing 16 changed files with 375 additions and 348 deletions.
1 change: 1 addition & 0 deletions conftest.py
@@ -21,6 +21,7 @@

CLI_FIXTURE_LOCATIONS = [
"tests.cli.fixtures.cli_gateway",
"tests.cli.fixtures.cli_integration_datasets",
"tests.cli.fixtures.cli_kg",
"tests.cli.fixtures.cli_old_projects",
"tests.cli.fixtures.cli_projects",
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
@@ -88,6 +88,7 @@ gitattributes
githooks
github
gitignore
gitignored
gitkeep
gitlab
gitlabClientSecret
@@ -268,6 +269,7 @@ untracked
untracked
updatable
url
uri
urls
username
validator
383 changes: 49 additions & 334 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions renku/core/dataset/constant.py
@@ -50,4 +50,5 @@ def renku_pointers_path(client):
Path(RENKU_HOME) / POINTERS,
Path(RENKU_HOME) / REFS,
".gitattributes",
".gitignore",
]
6 changes: 5 additions & 1 deletion renku/core/dataset/context.py
@@ -43,6 +43,7 @@ def __init__(
commit_database: Optional[bool] = False,
creator: Optional[Person] = None,
datadir: Optional[Path] = None,
storage: Optional[str] = None,
) -> None:
self.name = name
self.create = create
@@ -51,6 +52,7 @@ def __init__(
self.dataset_provenance = DatasetsProvenance()
self.dataset: Optional[Dataset] = None
self.datadir: Optional[Path] = datadir
self.storage = storage

def __enter__(self):
"""Enter context."""
@@ -60,7 +62,9 @@ def __enter__(self):
raise errors.DatasetNotFound(name=self.name)

# NOTE: Don't update provenance when creating here because it will be updated later
self.dataset = create_dataset(name=self.name, update_provenance=False, datadir=self.datadir)
self.dataset = create_dataset(
name=self.name, update_provenance=False, datadir=self.datadir, storage=self.storage
)
elif self.create:
raise errors.DatasetExistsError(self.name)

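A condensed sketch of how the extended context manager is driven with the new parameter (mirroring the call added in dataset_add.py below; values are illustrative):

    with DatasetContext(name="my-dataset", create=True, datadir=Path("data/my-dataset"), storage="s3://bucket/data") as dataset:
        # dataset.storage is now recorded, so providers can validate added URIs against it
        ...
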
26 changes: 22 additions & 4 deletions renku/core/dataset/dataset_add.py
@@ -21,6 +21,7 @@
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Set, Union, cast
from urllib.parse import urlparse

from renku.core import errors
from renku.core.dataset.constant import renku_pointers_path
@@ -57,6 +58,7 @@ def add_to_dataset(
clear_files_before: bool = False,
total_size: Optional[int] = None,
datadir: Optional[Path] = None,
storage: Optional[str] = None,
**kwargs,
) -> Dataset:
"""Import the data into the data directory."""
@@ -65,8 +67,18 @@

_check_available_space(client, urls, total_size=total_size)

    if not create and storage:
        raise errors.ParameterError(
            "The '--storage' parameter can only be used together with '--create', i.e. when the dataset is "
            "created at the same time as data is added to it"
        )
    if create and not storage and any(url.lower().startswith("s3://") for url in urls):
        raise errors.ParameterError(
            "Creating an S3 dataset at the same time as adding data requires the '--storage' parameter to be set"
        )

try:
with DatasetContext(name=dataset_name, create=create, datadir=datadir) as dataset:
with DatasetContext(name=dataset_name, create=create, datadir=datadir, storage=storage) as dataset:
destination_path = _create_destination_directory(client, dataset, destination)

client.check_external_storage() # TODO: This is not required for external storages
@@ -92,7 +104,7 @@
"Ignored adding paths under a .git directory:\n\t" + "\n\t".join(str(p) for p in paths_to_avoid)
)

files_to_commit = {f.get_absolute_commit_path(client.path) for f in files}
files_to_commit = {f.get_absolute_commit_path(client.path) for f in files if not f.gitignored}

if not force:
files, files_to_commit = _check_ignored_files(client, files_to_commit, files)
@@ -109,7 +121,8 @@
client.track_paths_in_storage(*files_to_commit)

# Force-add to include possible ignored files
client.repository.add(*files_to_commit, renku_pointers_path(client), force=True)
if len(files_to_commit) > 0:
client.repository.add(*files_to_commit, renku_pointers_path(client), force=True)

n_staged_changes = len(client.repository.staged_changes)
if n_staged_changes == 0:
@@ -155,6 +168,11 @@ def _download_files(
**kwargs,
) -> List["DatasetAddMetadata"]:
"""Process file URLs for adding to a dataset."""
    if dataset.storage and any(urlparse(dataset.storage).scheme != urlparse(url).scheme for url in urls):
        raise errors.ParameterError(
            f"The scheme of some of the URLs {urls} does not match the scheme of the dataset's storage URI {dataset.storage}."
        )

if importer:
return importer.download_files(client=client, destination=destination, extract=extract)

@@ -230,7 +248,7 @@ def _check_ignored_files(client: "LocalClient", files_to_commit: Set[str], files
if ignored_files:
ignored_sources = []
for file in files:
if file.get_absolute_commit_path(client.path) in ignored_files:
if not file.gitignored and file.get_absolute_commit_path(client.path) in ignored_files:
ignored_sources.append(file.source)

communication.warn(
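Taken together, the new checks in add_to_dataset and _download_files enforce two rules: '--storage' is only valid alongside '--create', and every URL added to a storage-backed dataset must share the scheme of the storage URI. A standalone sketch of the scheme rule (hypothetical helper, not part of this commit):

    from typing import List
    from urllib.parse import urlparse

    def check_urls_match_storage(storage: str, urls: List[str]) -> None:
        """Raise if any URL's scheme differs from the scheme of the dataset's storage URI."""
        storage_scheme = urlparse(storage).scheme
        mismatched = [url for url in urls if urlparse(url).scheme != storage_scheme]
        if mismatched:
            raise ValueError(f"URL scheme mismatch with storage {storage!r}: {mismatched}")

    check_urls_match_storage("s3://bucket/data", ["s3://bucket/data/a.csv"])  # passes silently
    # check_urls_match_storage("s3://bucket/data", ["https://example.com/a.csv"])  # would raise
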
2 changes: 1 addition & 1 deletion renku/core/dataset/providers/factory.py
@@ -36,7 +36,7 @@ class ProviderFactory:
def get_providers():
"""Return a list of providers sorted based on their priorities (higher priority providers come first)."""
providers = get_supported_dataset_providers()
return sorted(providers, key=lambda p: p.priority)
return sorted(providers, key=lambda p: p.priority.value)

@staticmethod
def get_add_provider(uri) -> "ProviderApi":
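The sort key changes from the enum member itself to its numeric value: members of a plain Enum do not define ordering, so sorting them directly raises a TypeError. A minimal illustration, assuming ProviderPriority is a standard Enum whose lower values mean higher priority:

    from enum import Enum

    class ProviderPriority(Enum):
        HIGHEST = 1
        NORMAL = 5

    providers = [ProviderPriority.NORMAL, ProviderPriority.HIGHEST]
    # sorted(providers) would raise TypeError: '<' not supported between instances
    print(sorted(providers, key=lambda p: p.value))  # [HIGHEST, NORMAL]
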
1 change: 1 addition & 0 deletions renku/core/dataset/providers/models.py
@@ -44,6 +44,7 @@ class DatasetAddMetadata(NamedTuple):
source: Path
destination: Path
based_on: Optional["RemoteEntity"] = None
gitignored: bool = False

@property
def has_action(self) -> bool:
69 changes: 66 additions & 3 deletions renku/core/dataset/providers/s3.py
@@ -17,25 +17,30 @@
# limitations under the License.
"""S3 dataset provider."""

import re
import urllib
from typing import TYPE_CHECKING, Optional, Tuple, Type
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple, Type

from renku.core import errors
from renku.core.dataset.providers.api import ProviderApi, ProviderCredentials, ProviderPriority
from renku.core.dataset.providers.models import DatasetAddAction, DatasetAddMetadata, ProviderParameter
from renku.core.plugin import hookimpl
from renku.core.util.dispatcher import get_repository, get_storage
from renku.core.util.metadata import prompt_for_credentials
from renku.core.util.urls import get_scheme
from renku.core.util.urls import get_scheme, is_uri_subfolder
from renku.domain_model.dataset import RemoteEntity
from renku.domain_model.dataset_provider import IDatasetProviderPlugin

if TYPE_CHECKING:
from renku.core.management.client import LocalClient
from renku.domain_model.dataset import Dataset


class S3Provider(ProviderApi, IDatasetProviderPlugin):
"""S3 provider."""

priority = ProviderPriority.NORMAL
priority = ProviderPriority.HIGHEST
name = "S3"

def __init__(self, uri: Optional[str]):
@@ -59,6 +64,64 @@ def supports_create() -> bool:
"""Whether this provider supports creating a dataset."""
return True

@staticmethod
def supports_add() -> bool:
"""Whether this provider supports adding data to datasets."""
return True

@staticmethod
def get_add_parameters() -> List["ProviderParameter"]:
"""Returns parameters that can be set for add."""
from renku.core.dataset.providers.models import ProviderParameter

return [
ProviderParameter(
"storage",
flags=["storage"],
default=None,
                help="URI of the S3 bucket, used when creating the dataset at the same time as running 'add'",
multiple=False,
type=str,
),
]

@staticmethod
def add(client: "LocalClient", uri: str, destination: Path, **kwargs) -> List["DatasetAddMetadata"]:
"""Add files from a URI to a dataset."""
dataset = kwargs.get("dataset")
if dataset and dataset.storage and not dataset.storage.lower().startswith("s3://"):
raise errors.ParameterError(
"Files from S3 buckets can only be added to datasets with S3 storage, "
f"the dataset {dataset.name} has non-S3 storage {dataset.storage}."
)
if re.search(r"[\*\?]", uri):
raise errors.ParameterError("Wildcards like '*' or '?' are not supported in the uri for S3 datasets.")
provider = S3Provider(uri=uri)
credentials = S3Credentials(provider=provider)
prompt_for_credentials(credentials)

storage = get_storage(provider=provider, credentials=credentials)
if dataset and dataset.storage and not is_uri_subfolder(dataset.storage, uri):
raise errors.ParameterError(
f"S3 uri {uri} should be located within or at the storage uri {dataset.storage}."
)
if not storage.exists(uri):
raise errors.ParameterError(f"S3 bucket '{uri}' doesn't exists.")

hashes = storage.get_hashes(uri=uri)
return [
DatasetAddMetadata(
entity_path=Path(destination).relative_to(client.repository.path) / hash.path,
url=hash.base_uri,
action=DatasetAddAction.NONE,
based_on=RemoteEntity(checksum=hash.hash if hash.hash else "", url=hash.base_uri, path=hash.path),
source=Path(hash.full_uri),
destination=Path(destination).relative_to(client.repository.path),
gitignored=True,
)
for hash in hashes
]

@property
def bucket(self) -> str:
"""Return S3 bucket name."""
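For a bucket s3://bucket containing an object data/a.csv and a destination of data/my-dataset inside the repository, the add method above would yield metadata along these lines (illustrative values; the checksum comes from the storage backend):

    [
        DatasetAddMetadata(
            entity_path=Path("data/my-dataset/data/a.csv"),
            url="s3://bucket",
            action=DatasetAddAction.NONE,  # nothing is copied or downloaded locally
            based_on=RemoteEntity(checksum="d41d8cd9...", url="s3://bucket", path="data/a.csv"),
            source=Path("s3://bucket/data/a.csv"),
            destination=Path("data/my-dataset"),
            gitignored=True,  # the object stays in S3 and out of git staging
        )
    ]
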
7 changes: 7 additions & 0 deletions renku/core/dataset/providers/web.py
@@ -22,6 +22,7 @@
import urllib
from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple, Type
from urllib.parse import urlparse

from renku.core import errors
from renku.core.constant import CACHE
@@ -67,6 +68,12 @@ def add(
**kwargs,
) -> List["DatasetAddMetadata"]:
"""Add files from a URI to a dataset."""
dataset = kwargs.get("dataset")
if dataset and dataset.storage and urlparse(dataset.storage).scheme != urlparse(uri).scheme:
raise errors.ParameterError(
f"The scheme of the url {uri} does not match the defined storage url {dataset.storage}."
)

return download_file(
client=client, uri=uri, destination=destination, extract=extract, filename=filename, multiple=multiple
)
25 changes: 24 additions & 1 deletion renku/core/interface/storage.py
@@ -18,12 +18,30 @@
"""External storage interface."""

import abc
from typing import TYPE_CHECKING
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
from renku.core.dataset.providers.api import ProviderApi, ProviderCredentials


@dataclass
class FileHash:
"""The has for a file at a specific location."""

base_uri: str
path: str
hash: Optional[str] = None
hash_type: Optional[str] = None
modified_datetime: Optional[str] = None

    @property
    def full_uri(self) -> str:
        """Return the full uri to the file."""
        # NOTE: Joined with plain string formatting; pathlib would collapse the '//' after the scheme.
        return f"{self.base_uri.rstrip('/')}/{self.path.lstrip('/')}"
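
An illustrative instance with hypothetical values, showing how full_uri composes the file's location:

    h = FileHash(base_uri="s3://bucket", path="data/file.csv", hash="d41d8cd98f00b204e9800998ecf8427e", hash_type="md5")
    assert h.full_uri == "s3://bucket/data/file.csv"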


class IStorageFactory(abc.ABC):
"""Interface to get an external storage."""

@@ -60,3 +78,8 @@ def set_configurations(self):
def exists(self, uri: str) -> bool:
"""Checks if a remote storage URI exists."""
raise NotImplementedError

@abc.abstractmethod
def get_hashes(self, uri: str) -> List[FileHash]:
"""Get the hashes of all files at the uri."""
raise NotImplementedError
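
For illustration, a fake in-memory implementation of the two concrete methods (the real S3 backend is not part of this diff; FakeStorage stands in for a concrete subclass of the abstract storage class above):

    from typing import Dict, List

    class FakeStorage:
        def __init__(self, files: Dict[str, str]):
            self._files = files  # maps full uri -> hash

        def exists(self, uri: str) -> bool:
            return any(f == uri or f.startswith(uri.rstrip("/") + "/") for f in self._files)

        def get_hashes(self, uri: str) -> List[FileHash]:
            base = uri.rstrip("/")
            return [
                FileHash(base_uri=base, path=f[len(base):].lstrip("/"), hash=h, hash_type="md5")
                for f, h in self._files.items()
                if f.startswith(base + "/")
            ]
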
26 changes: 25 additions & 1 deletion renku/core/util/urls.py
@@ -22,12 +22,13 @@
import unicodedata
import urllib
from typing import List, Optional
from urllib.parse import ParseResult
from urllib.parse import ParseResult, urlparse

from renku.command.command_builder.command import inject
from renku.core import errors
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.util.git import get_remote, parse_git_url
from renku.core.util.os import is_subpath


def url_to_string(url):
@@ -122,3 +123,26 @@ def get_slug(name: str, invalid_chars: Optional[List[str]] = None, lowercase: bo
valid_end = re.sub(r"[._-]$", "", valid_start)
no_dot_lock_at_end = re.sub(r"\.lock$", "_lock", valid_end)
return no_dot_lock_at_end


def is_uri_subfolder(uri: str, subfolder_uri: str) -> bool:
"""Check if one uri is a 'subfolder' of another."""
parsed_uri = urlparse(uri)
parsed_subfolder_uri = urlparse(subfolder_uri)
parsed_uri_path = parsed_uri.path
parsed_subfolder_uri_path = parsed_subfolder_uri.path
if parsed_uri_path in ["", "."]:
        # NOTE: s3://test has a path that equals "" and Path("") gets interpreted as Path(".").
        # This becomes a problem when s3://test/1 has an "absolute-like" path of Path("/1"),
        # and Path(".") is not considered a subpath of Path("/1") even though the uris show
        # that this is indeed a subpath; normalizing "" to "/" avoids the mismatch.
parsed_uri_path = "/"
if parsed_subfolder_uri_path in ["", "."]:
parsed_subfolder_uri_path = "/"
if parsed_uri.scheme != parsed_subfolder_uri.scheme:
# INFO: catch s3://test vs http://test
return False
if parsed_uri.netloc != parsed_subfolder_uri.netloc:
# INFO: catch s3://test1 vs s3://test2
return False
return is_subpath(parsed_subfolder_uri_path, parsed_uri_path)
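
A few illustrative calls (assuming is_subpath(path, base) checks that path lies under base):

    assert is_uri_subfolder("s3://bucket", "s3://bucket/data/file.csv")     # deeper path, same bucket
    assert not is_uri_subfolder("s3://bucket/data", "s3://bucket/other")    # sibling folder
    assert not is_uri_subfolder("s3://bucket-a", "s3://bucket-b/file.csv")  # different bucket (netloc)
    assert not is_uri_subfolder("s3://bucket", "https://bucket/file.csv")   # different scheme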
