feat(dataset): support for azure blob storage (#3257)
m-alisafaee committed Jan 13, 2023
1 parent bd063cc commit 47fa194
Showing 17 changed files with 675 additions and 198 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/test_deploy.yml
@@ -754,6 +754,9 @@ jobs:
ZENODO_ACCESS_TOKEN: ${{ secrets.ZENODO_ACCESS_TOKEN }}
OLOS_ACCESS_TOKEN: ${{ secrets.OLOS_ACCESS_TOKEN }}
RENKU_REQUESTS_TIMEOUT_SECONDS: 120
CLOUD_STORAGE_AZURE_KEY: ${{ secrets.CLOUD_STORAGE_AZURE_KEY }}
CLOUD_STORAGE_S3_ACCESS_KEY_ID: ${{ secrets.CLOUD_STORAGE_S3_ACCESS_KEY_ID }}
CLOUD_STORAGE_S3_SECRET_ACCESS_KEY: ${{ secrets.CLOUD_STORAGE_S3_SECRET_ACCESS_KEY }}
run: pytest -m "integration and not service and not serial" -v --timeout=600 -n auto
- name: Start Redis
uses: supercharge/redis-github-action@1.4.0
@@ -912,6 +915,9 @@ jobs:
ZENODO_ACCESS_TOKEN: ${{ secrets.ZENODO_ACCESS_TOKEN }}
OLOS_ACCESS_TOKEN: ${{ secrets.OLOS_ACCESS_TOKEN }}
RENKU_REQUESTS_TIMEOUT_SECONDS: 120
CLOUD_STORAGE_AZURE_KEY: ${{ secrets.CLOUD_STORAGE_AZURE_KEY }}
CLOUD_STORAGE_S3_ACCESS_KEY_ID: ${{ secrets.CLOUD_STORAGE_S3_ACCESS_KEY_ID }}
CLOUD_STORAGE_S3_SECRET_ACCESS_KEY: ${{ secrets.CLOUD_STORAGE_S3_SECRET_ACCESS_KEY }}
run: pytest -m "integration and not serial" -v
- name: Start Redis
uses: supercharge/redis-github-action@1.4.0
5 changes: 5 additions & 0 deletions renku/command/dataset.py
@@ -138,3 +138,8 @@ def mount_external_storage_command(unmount: bool):
"""Command for mounting an external storage."""
command = unmount_external_storage if unmount else mount_external_storage
return Command().command(command).lock_dataset().with_database(write=False).require_migration()


def unmount_external_storage_command():
"""Command for unmounting an external storage."""
return Command().command(unmount_external_storage).lock_dataset().with_database(write=False).require_migration()
9 changes: 3 additions & 6 deletions renku/core/dataset/dataset.py
@@ -34,7 +34,6 @@
from renku.core.dataset.pointer_file import create_external_file, is_external_file_updated, update_external_file
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset
from renku.core.dataset.providers.s3 import S3Credentials
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.dataset_gateway import IDatasetGateway
@@ -882,10 +881,8 @@ def update_dataset_custom_metadata(
if custom_metadata is not None and custom_metadata_source is not None:
if isinstance(custom_metadata, dict):
custom_metadata = [custom_metadata]
for icustom_metadata in custom_metadata:
existing_metadata.append(
Annotation(id=Annotation.generate_id(), body=icustom_metadata, source=custom_metadata_source)
)
for cm in custom_metadata:
existing_metadata.append(Annotation(id=Annotation.generate_id(), body=cm, source=custom_metadata_source))

dataset.annotations = existing_metadata

@@ -1346,7 +1343,7 @@ def mount_external_storage(name: str, existing: Optional[Path], yes: bool) -> No
datadir.mkdir(parents=True, exist_ok=True)

provider = ProviderFactory.get_mount_provider(uri=dataset.storage)
credentials = S3Credentials(provider)
credentials = provider.get_credentials()
prompt_for_credentials(credentials)
storage = provider.get_storage(credentials=credentials)

48 changes: 29 additions & 19 deletions renku/core/dataset/dataset_add.py
@@ -322,23 +322,33 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
else:
file.action = DatasetAddAction.DOWNLOAD

if file.action == DatasetAddAction.COPY:
shutil.copy(file.source, file.destination)
elif file.action == DatasetAddAction.MOVE:
shutil.move(file.source, file.destination, copy_function=shutil.copy) # type: ignore
elif file.action == DatasetAddAction.SYMLINK:
create_external_file(target=file.source, path=file.destination)
# NOTE: Don't track symlinks to external files in LFS
track_in_lfs = False
elif file.action == DatasetAddAction.DOWNLOAD:
assert file.provider, f"Storage provider isn't set for {file} with DOWNLOAD action"
storage = file.provider.get_storage()
storage.download(file.url, file.destination)
elif file.metadata_only:
# NOTE: Nothing to do when adding file to a dataset with a parent remote storage
pass
else:
raise errors.OperationError(f"Invalid action {file.action}")
file_to_upload = file.source.resolve()

try:
if file.action == DatasetAddAction.COPY:
shutil.copy(file.source, file.destination)
elif file.action == DatasetAddAction.MOVE:
shutil.move(file.source, file.destination, copy_function=shutil.copy) # type: ignore
elif file.action == DatasetAddAction.SYMLINK:
create_external_file(target=file.source, path=file.destination)
# NOTE: Don't track symlinks to external files in LFS
track_in_lfs = False
elif file.action == DatasetAddAction.DOWNLOAD:
assert file.provider, f"Storage provider isn't set for {file} with DOWNLOAD action"
download_storage = file.provider.get_storage()
download_storage.download(file.url, file.destination)
file_to_upload = file.destination
elif file.metadata_only:
# NOTE: Nothing to do when adding file to a dataset with a parent remote storage
pass
else:
raise errors.OperationError(f"Invalid action {file.action}")
except OSError as e:
# NOTE: It's ok if copying data to a read-only mounted cloud storage fails
if "Read-only file system" in str(e) and storage:
pass
else:
raise

if track_in_lfs and not dataset.storage:
track_paths_in_storage(file.destination)
@@ -352,8 +362,8 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
md5_hash = file.based_on.checksum
else:
file_uri = get_upload_uri(dataset=dataset, entity_path=file.entity_path)
storage.upload(source=file.destination, uri=file_uri)
md5_hash = hash_file(file.destination, hash_type="md5") or ""
storage.upload(source=file_to_upload, uri=file_uri)
md5_hash = hash_file(file_to_upload, hash_type="md5") or ""

file.based_on = RemoteEntity(url=file_uri, path=file.entity_path, checksum=md5_hash)

15 changes: 14 additions & 1 deletion renku/core/dataset/providers/api.py
@@ -132,6 +132,11 @@ def get_importer(self, **kwargs) -> "ImporterApi":
class StorageProviderInterface(abc.ABC):
"""Interface defining backend storage providers."""

@abc.abstractmethod
def get_credentials(self) -> "ProviderCredentials":
"""Return an instance of provider's credential class."""
raise NotImplementedError

@abc.abstractmethod
def get_storage(self, credentials: Optional["ProviderCredentials"] = None) -> "IStorage":
"""Return the storage manager for the provider."""
@@ -285,10 +290,18 @@ def get_canonical_credentials_names(self) -> Tuple[str, ...]:

return tuple(get_canonical_key(key) for key in self.get_credentials_names())

def get_canonical_credentials_names_with_no_value(self) -> Tuple[str, ...]:
"""Return canonical credentials names that can be used as config keys for keys with no valid value."""
from renku.core.util.metadata import get_canonical_key

return tuple(get_canonical_key(key) for key in self.get_credentials_names_with_no_value())

def get_credentials_section_name(self) -> str:
"""Get section name for storing credentials.
NOTE: This method should be overridden by subclasses to allow multiple credentials per provider if needed.
NOTE: Values used in this method shouldn't depend on ProviderCredentials attributes since we don't have those
attributes when reading credentials. It's OK to use ProviderApi attributes.
"""
return self.provider.name.lower() # type: ignore

@@ -302,7 +315,7 @@ def read_and_convert_credentials(key) -> Union[str, NoValueType]:
value = read_credentials(section=section, key=key)
return NO_VALUE if value is None else value

data = {key: read_and_convert_credentials(key) for key in self.get_canonical_credentials_names()}
data = {key: read_and_convert_credentials(key) for key in self.get_canonical_credentials_names_with_no_value()}
self.data.update(data)

return self.data
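
For context, a hypothetical sketch of why stored credentials are now read only for keys that still have no value: a provider such as Azure pre-fills part of the credentials (the account name) from the URI, so re-reading the credentials file must not overwrite those pre-set entries. The lowercased canonical key names and the exact shape of the data dictionary below are assumptions for illustration, not taken from this commit.

provider = AzureProvider(uri="azure://myaccount/mycontainer/path")  # AzureProvider is added later in this commit
credentials = provider.get_credentials()  # AzureCredentials, with "Account" already set from the URI
# Assumed state after __init__ (canonical keys assumed lowercased):
#   credentials.data == {"account": "myaccount", "key": NO_VALUE}
# When stored credentials are read, only the names returned by
# get_canonical_credentials_names_with_no_value() -- here just "key" -- are looked
# up via read_credentials(), so the pre-filled account value is never overwritten.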
181 changes: 181 additions & 0 deletions renku/core/dataset/providers/azure.py
@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Azure dataset provider."""

import urllib
from typing import TYPE_CHECKING, List, Optional, Tuple, cast

from renku.command.command_builder import inject
from renku.core import errors
from renku.core.dataset.providers.api import ProviderApi, ProviderCredentials, ProviderPriority
from renku.core.dataset.providers.cloud import CloudStorageAddProvider
from renku.core.dataset.providers.models import ProviderParameter
from renku.core.interface.storage import IStorage, IStorageFactory
from renku.core.util.metadata import get_canonical_key, prompt_for_credentials
from renku.core.util.urls import get_scheme
from renku.domain_model.project_context import project_context

if TYPE_CHECKING:
from renku.domain_model.dataset import Dataset


class AzureProvider(ProviderApi, CloudStorageAddProvider):
"""Azure provider."""

priority = ProviderPriority.HIGHEST
name = "Azure"

def __init__(self, uri: Optional[str]):
super().__init__(uri=uri)

account, endpoint, container, _ = parse_azure_uri(uri=self.uri)

self._account: str = account
self._endpoint: str = endpoint
self._container = container

@staticmethod
def supports(uri: str) -> bool:
"""Whether or not this provider supports a given URI."""
return get_scheme(uri) == "azure"

@staticmethod
def get_add_parameters() -> List["ProviderParameter"]:
"""Returns parameters that can be set for add."""
from renku.core.dataset.providers.models import ProviderParameter

return [
ProviderParameter(
"storage",
flags=["storage"],
default=None,
help="Uri for the Azure container when creating the dataset at the same time when running 'add'",
multiple=False,
type=str,
),
]

def get_credentials(self) -> "AzureCredentials":
"""Return an instance of provider's credential class."""
return AzureCredentials(provider=self)

@inject.autoparams("storage_factory")
def get_storage(
self, storage_factory: "IStorageFactory", credentials: Optional["ProviderCredentials"] = None
) -> "IStorage":
"""Return the storage manager for the provider."""
azure_configuration = {
"type": "azureblob",
"endpoint": self.endpoint,
}

def create_renku_storage_azure_uri(uri: str) -> str:
"""Create an Azure URI to work with the Renku storage handler."""
_, _, container, path = parse_azure_uri(uri=uri)

return f"azure://{container}/{path}"

if not credentials:
credentials = self.get_credentials()
prompt_for_credentials(credentials)

return storage_factory.get_storage(
storage_scheme="azure",
provider=self,
credentials=credentials,
configuration=azure_configuration,
uri_convertor=create_renku_storage_azure_uri,
)

@property
def account(self) -> str:
"""Azure account name."""
return self._account

@property
def endpoint(self) -> str:
"""Return Azure container endpoint."""
return self._endpoint

@property
def container(self) -> str:
"""Return Azure container name."""
return self._container

def on_create(self, dataset: "Dataset") -> None:
"""Hook to perform provider-specific actions on a newly-created dataset."""
credentials = self.get_credentials()
prompt_for_credentials(credentials)
storage = self.get_storage(credentials=credentials)

# NOTE: The underlying rclone tool cannot tell if a directory within an Azure container exists or not
if not storage.exists(self.uri):
raise errors.ParameterError(f"Azure container '{self.container}' doesn't exists.")

project_context.repository.add_ignored_pattern(pattern=str(dataset.get_datadir()))


class AzureCredentials(ProviderCredentials):
"""Azure-specific credentials."""

def __init__(self, provider: AzureProvider):
super().__init__(provider=provider)

# NOTE: Set account name so that users don't need to re-enter it
self.data[get_canonical_key("Account")] = self.provider.account

@staticmethod
def get_credentials_names() -> Tuple[str, ...]:
"""Return a tuple of the required credentials for a provider."""
return "Account", "Key"

@property
def provider(self) -> AzureProvider:
"""Return the associated provider instance."""
return cast(AzureProvider, self._provider)

def get_credentials_section_name(self) -> str:
"""Get section name for storing credentials.
NOTE: This method should be overridden by subclasses to allow multiple credentials per provider if needed.
"""
return f"{self.provider.account}.{self.provider.endpoint}"


def parse_azure_uri(uri: str) -> Tuple[str, str, str, str]:
"""Extract account, endpoint, container, and path within the container from a given URI.
NOTE: We support azure://<account-name>.<endpoint>/<container-name>/<path> or
azure://<account-name>/<container-name>/<path>.
"""
parsed_uri = urllib.parse.urlparse(uri)

account, _, endpoint = parsed_uri.netloc.partition(".")

if parsed_uri.scheme.lower() != "azure" or not account:
raise errors.ParameterError(
f"Invalid Azure URI: {uri}. Valid format is 'azure://<account-name>.<endpoint>/<container-name>/<path>' or "
"azure://<account-name>/<container-name>/<path>"
)

endpoint = endpoint.lower() or "blob.core.windows.net"

path = parsed_uri.path.strip("/")
container, _, path = path.partition("/")

return account, endpoint, container, path.strip("/")
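
For reference, a brief sketch of how the parse_azure_uri helper above resolves both supported URI forms, and of the resulting credentials section name; the account, container, and path names are invented for illustration.

from renku.core.dataset.providers.azure import AzureProvider, parse_azure_uri

# Fully qualified form: the endpoint is taken from the URI.
parse_azure_uri("azure://myaccount.blob.core.windows.net/mycontainer/data/file.csv")
# -> ("myaccount", "blob.core.windows.net", "mycontainer", "data/file.csv")

# Short form: the endpoint defaults to "blob.core.windows.net".
parse_azure_uri("azure://myaccount/mycontainer/data/file.csv")
# -> ("myaccount", "blob.core.windows.net", "mycontainer", "data/file.csv")

# Both forms map to the same credentials section, keyed by account and endpoint
# (see AzureCredentials.get_credentials_section_name above).
provider = AzureProvider(uri="azure://myaccount/mycontainer/data/file.csv")
provider.get_credentials().get_credentials_section_name()
# -> "myaccount.blob.core.windows.net"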
54 changes: 54 additions & 0 deletions renku/core/dataset/providers/cloud.py
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common functionality for cloud storage providers."""

import re
from pathlib import Path
from typing import List

from renku.core import errors
from renku.core.dataset.providers.api import AddProviderInterface, StorageProviderInterface
from renku.core.dataset.providers.models import DatasetAddAction, DatasetAddMetadata
from renku.domain_model.dataset import RemoteEntity
from renku.domain_model.project_context import project_context


class CloudStorageAddProvider(AddProviderInterface, StorageProviderInterface):
"""Common AddProviderInterface for cloud providers."""

def add(self, uri: str, destination: Path, **kwargs) -> List["DatasetAddMetadata"]:
"""Add files from a URI to a dataset."""
if re.search(r"[*?]", uri):
raise errors.ParameterError("Wildcards like '*' or '?' are not supported for cloud storage URIs.")

storage = self.get_storage()

destination_path_in_repo = Path(destination).relative_to(project_context.repository.path)
hashes = storage.get_hashes(uri=uri)
return [
DatasetAddMetadata(
entity_path=destination_path_in_repo / hash.path,
url=hash.base_uri,
action=DatasetAddAction.REMOTE_STORAGE,
based_on=RemoteEntity(checksum=hash.hash if hash.hash else "", url=hash.base_uri, path=hash.path),
source=Path(hash.base_uri),
destination=destination_path_in_repo,
provider=self,
)
for hash in hashes
]
