Skip to content

Commit

Permalink
feat(dataset): dataset creation with s3 storage backend (#3047)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Aug 8, 2022
1 parent e313f3a commit 316f7a6
Show file tree
Hide file tree
Showing 37 changed files with 740 additions and 90 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_deploy.yml
Expand Up @@ -724,7 +724,7 @@ jobs:
- name: Install system packages
run: |
sudo apt-get update -y
sudo apt-get install -y libyaml-0-2 libyaml-dev
sudo apt-get install -y libyaml-0-2 libyaml-dev rclone
- uses: actions/cache@master
id: dependency-cache
with:
Expand Down Expand Up @@ -815,7 +815,7 @@ jobs:
- name: Install system packages
run: |
sudo apt-get update -y
sudo apt-get install -y libyaml-0-2 libyaml-dev
sudo apt-get install -y libyaml-0-2 libyaml-dev rclone
- uses: actions/cache@master
id: dependency-cache
with:
Expand Down Expand Up @@ -914,6 +914,7 @@ jobs:
python -m pip install .[all]
git config --global --add user.name "Renku Bot"
git config --global --add user.email "renku@datascience.ch"
brew install rclone
- name: Test with pytest
env:
POETRY_VIRTUALENVS_CREATE: false
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -165,6 +165,7 @@ preprocessing
programmatically
py
pyshacl
rclone
rdflib
readme
rebase
Expand Down
3 changes: 3 additions & 0 deletions renku/command/command_builder/database.py
Expand Up @@ -33,11 +33,13 @@
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.interface.plan_gateway import IPlanGateway
from renku.core.interface.project_gateway import IProjectGateway
from renku.core.interface.storage import IStorageFactory
from renku.infrastructure.gateway.activity_gateway import ActivityGateway
from renku.infrastructure.gateway.database_gateway import DatabaseGateway
from renku.infrastructure.gateway.dataset_gateway import DatasetGateway
from renku.infrastructure.gateway.plan_gateway import PlanGateway
from renku.infrastructure.gateway.project_gateway import ProjectGateway
from renku.infrastructure.storage.factory import StorageFactory

if TYPE_CHECKING:
from renku.domain_model.project import Project
Expand Down Expand Up @@ -77,6 +79,7 @@ def _injection_pre_hook(self, builder: Command, context: dict, *args, **kwargs)
context["constructor_bindings"][IDatabaseGateway] = lambda: DatabaseGateway()
context["constructor_bindings"][IDatasetGateway] = lambda: DatasetGateway()
context["constructor_bindings"][IProjectGateway] = lambda: ProjectGateway()
context["constructor_bindings"][IStorageFactory] = lambda: StorageFactory

if int(os.environ.get("RENKU_SKIP_MIN_VERSION_CHECK", "0")) == 1:
# NOTE: Used for unit tests
Expand Down
1 change: 1 addition & 0 deletions renku/command/format/datasets.py
Expand Up @@ -101,5 +101,6 @@ def json(datasets, **kwargs):
"title": ("title", "title"),
"keywords": ("keywords_csv", "keywords"),
"description": ("short_description", "description"),
"storage": ("storage", None),
"datadir": ("datadir_path", "datadir"),
}
2 changes: 1 addition & 1 deletion renku/core/dataset/context.py
Expand Up @@ -62,7 +62,7 @@ def __enter__(self):
# NOTE: Don't update provenance when creating here because it will be updated later
self.dataset = create_dataset(name=self.name, update_provenance=False, datadir=self.datadir)
elif self.create:
raise errors.DatasetExistsError('Dataset exists: "{}".'.format(self.name))
raise errors.DatasetExistsError(self.name)

return self.dataset

Expand Down
28 changes: 17 additions & 11 deletions renku/core/dataset/dataset.py
Expand Up @@ -41,7 +41,7 @@
from renku.core.util.datetime8601 import local_now
from renku.core.util.dispatcher import get_client, get_database
from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user
from renku.core.util.metadata import is_external_file
from renku.core.util.metadata import is_external_file, read_credentials, store_credentials
from renku.core.util.os import delete_file, get_safe_relative_path
from renku.core.util.tabulate import tabulate
from renku.core.util.urls import get_slug
Expand Down Expand Up @@ -94,6 +94,7 @@ def create_dataset(
images: Optional[List[ImageRequestModel]] = None,
update_provenance: bool = True,
custom_metadata: Optional[Dict[str, Any]] = None,
storage: Optional[str] = None,
datadir: Optional[Path] = None,
):
"""Create a dataset.
Expand All @@ -108,6 +109,7 @@ def create_dataset(
update_provenance(bool, optional): Whether to add this dataset to dataset provenance
(Default value = True).
custom_metadata(Optional[Dict[str, Any]], optional): Custom JSON-LD metadata (Default value = None).
storage(Optional[str], optional): Backend storage's URI (Default value = None).
datadir(Optional[Path]): Dataset's data directory (Default value = None).
Returns:
Expand All @@ -124,22 +126,21 @@ def create_dataset(

if not is_dataset_name_valid(name):
valid_name = get_slug(name, lowercase=False)
raise errors.ParameterError(f'Dataset name "{name}" is not valid (Hint: "{valid_name}" is valid).')
raise errors.ParameterError(f"Dataset name '{name}' is not valid (Hint: '{valid_name}' is valid).")

datasets_provenance = DatasetsProvenance()

if datasets_provenance.get_by_name(name=name):
raise errors.DatasetExistsError(f"Dataset exists: '{name}'")
raise errors.DatasetExistsError(name)

if not title:
title = name

keywords = keywords or []

annotations = None

if custom_metadata:
annotations = [Annotation(id=Annotation.generate_id(), source="renku", body=custom_metadata)]
annotations = (
[Annotation(id=Annotation.generate_id(), source="renku", body=custom_metadata)] if custom_metadata else None
)

if datadir:
try:
Expand All @@ -156,12 +157,17 @@ def create_dataset(
keywords=keywords,
project_id=client.project.id,
annotations=annotations,
storage=storage,
datadir=datadir,
)

if images:
set_dataset_images(client, dataset, images)

if storage:
provider = ProviderFactory.get_create_provider(uri=storage)
provider.on_create(dataset=dataset)

if update_provenance:
datasets_provenance.add_or_update(dataset)

Expand Down Expand Up @@ -414,15 +420,15 @@ def export_dataset(name, provider_name, tag, client_dispatcher: IClientDispatche
exporter = provider.get_exporter(dataset=dataset, tag=selected_tag, **kwargs)

if exporter.requires_access_token():
access_token = client.get_value(provider_name, config_key_secret)
access_token = read_credentials(section=provider_name, key=config_key_secret)

if access_token is None:
access_token = prompt_access_token(exporter)

if access_token is None or len(access_token) == 0:
raise errors.InvalidAccessToken()

client.set_value(provider_name, config_key_secret, access_token, global_only=True)
store_credentials(section=provider_name, key=config_key_secret, value=access_token)

exporter.set_access_token(access_token)

Expand Down Expand Up @@ -497,7 +503,7 @@ def remove_files(dataset):
provider = ProviderFactory.get_import_provider(uri)

try:
importer = provider.get_importer(uri, gitlab_token=gitlab_token, **kwargs)
importer = provider.get_importer(gitlab_token=gitlab_token, **kwargs)
provider_dataset: ProviderDataset = importer.fetch_provider_dataset()
except KeyError as e:
raise errors.ParameterError(f"Could not process '{uri}'.\nUnable to fetch metadata: {e}")
Expand Down Expand Up @@ -614,7 +620,7 @@ def update_datasets(
except errors.DatasetProviderNotFound:
continue

record = provider.get_importer(uri)
record = provider.get_importer()

if isinstance(provider, RenkuProvider) and dataset.version is not None:
tags = dataset_gateway.get_all_tags(dataset=dataset)
Expand Down
2 changes: 1 addition & 1 deletion renku/core/dataset/providers/__init__.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 - Swiss Data Science Center (SDSC)
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
Expand Down
104 changes: 94 additions & 10 deletions renku/core/dataset/providers/api.py
@@ -1,4 +1,4 @@
# Copyright 2020 - Swiss Data Science Center (SDSC)
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
Expand All @@ -16,11 +16,14 @@
"""API for providers."""

import abc
from collections import UserDict
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

from renku.core import errors
from renku.core.util.metadata import get_canonical_key, read_credentials, store_credentials
from renku.core.util.util import NO_VALUE, NoValueType

if TYPE_CHECKING:
from renku.core.dataset.providers.models import (
Expand Down Expand Up @@ -51,6 +54,9 @@ class ProviderApi(abc.ABC):
priority: Optional[ProviderPriority] = None
name: Optional[str] = None

def __init__(self, uri: Optional[str], **kwargs):
self._uri: str = uri or ""

def __init_subclass__(cls, **kwargs):
for required_property in ("priority", "name"):
if getattr(cls, required_property, None) is None:
Expand All @@ -70,6 +76,11 @@ def supports_add() -> bool:
"""Whether this provider supports adding data to datasets."""
return False

    @staticmethod
    def supports_create() -> bool:
        """Whether this provider supports creating a dataset.

        Returns:
            bool: ``False`` by default; subclasses that can create datasets override this.
        """
        return False

@staticmethod
def supports_export() -> bool:
"""Whether this provider supports dataset export."""
Expand All @@ -85,14 +96,6 @@ def add(client: "LocalClient", uri: str, destination: Path, **kwargs) -> List["D
"""Add files from a URI to a dataset."""
raise NotImplementedError

def get_exporter(self, dataset: "Dataset", *, tag: Optional["DatasetTag"], **kwargs) -> "ExporterApi":
"""Get export manager."""
raise NotImplementedError

def get_importer(self, uri, **kwargs) -> "ImporterApi":
"""Get import manager."""
raise NotImplementedError

@staticmethod
def get_add_parameters() -> List["ProviderParameter"]:
"""Returns parameters that can be set for add."""
Expand All @@ -108,6 +111,23 @@ def get_import_parameters() -> List["ProviderParameter"]:
"""Returns parameters that can be set for import."""
return []

    @property
    def uri(self) -> str:
        """Return provider's URI (the empty string when the provider was created without one)."""
        return self._uri

    def get_exporter(self, dataset: "Dataset", *, tag: Optional["DatasetTag"], **kwargs) -> "ExporterApi":
        """Get export manager.

        Raises:
            NotImplementedError: Always; providers that support export override this.
        """
        raise NotImplementedError

    def get_importer(self, **kwargs) -> "ImporterApi":
        """Get import manager.

        Raises:
            NotImplementedError: Always; providers that support import override this.
        """
        raise NotImplementedError

    def on_create(self, dataset: "Dataset") -> None:
        """Hook to perform provider-specific actions on a newly-created dataset.

        Raises:
            NotImplementedError: Always; providers that support dataset creation override this.
        """
        raise NotImplementedError


class ImporterApi(abc.ABC):
"""Interface defining importer methods."""
Expand Down Expand Up @@ -215,3 +235,67 @@ def get_access_token_url(self) -> str:
def export(self, **kwargs) -> str:
"""Execute export process."""
raise NotImplementedError


class ProviderCredentials(abc.ABC, UserDict):
    """Credentials of a provider.

    NOTE: An empty string, "", is a valid value. ``NO_VALUE`` means that the value for a key is not set.
    """

    def __init__(self, provider: ProviderApi):
        """Initialize credentials for ``provider`` with every required key unset."""
        super().__init__()
        self._provider: ProviderApi = provider
        # Seed every required key with the NO_VALUE sentinel so callers can
        # distinguish "never set" from a legitimately-empty credential ("").
        self.data: Dict[str, Union[str, NoValueType]] = {
            key: NO_VALUE for key in self.get_canonical_credentials_names()
        }

    @staticmethod
    @abc.abstractmethod
    def get_credentials_names() -> Tuple[str, ...]:
        """Return a tuple of the required credentials for a provider."""
        raise NotImplementedError

    @property
    def provider(self):
        """Return the associated provider instance."""
        return self._provider

    def get_credentials_names_with_no_value(self) -> Tuple[str, ...]:
        """Return a tuple of credential keys that don't have a valid value."""
        return tuple(key for key, value in self.items() if value is NO_VALUE)

    def get_canonical_credentials_names(self) -> Tuple[str, ...]:
        """Return canonical credentials names that can be used as config keys."""
        return tuple(get_canonical_key(key) for key in self.get_credentials_names())

    def get_credentials_section_name(self) -> str:
        """Get section name for storing credentials.

        NOTE: This method should be overridden by subclasses to allow multiple credentials per provider if needed.
        """
        return self.provider.name.lower()  # type: ignore

    def read(self) -> Dict[str, Union[str, NoValueType]]:
        """Read credentials from the config and return them. Set non-existing values to ``NO_VALUE``."""
        section = self.get_credentials_section_name()

        def read_and_convert_credentials(key) -> Union[str, NoValueType]:
            # read_credentials returns None for a missing key; map that to NO_VALUE
            # so the dict's values stay within ``Union[str, NoValueType]``.
            value = read_credentials(section=section, key=key)
            return NO_VALUE if value is None else value

        data = {key: read_and_convert_credentials(key) for key in self.get_canonical_credentials_names()}
        self.data.update(data)

        return self.data

    def store(self) -> None:
        """Store credentials globally, skipping keys that were never set."""
        section = self.get_credentials_section_name()

        for key, value in self.items():
            # BUGFIX: the original guard was ``value is not None``, which is always
            # true here (values are only ``str`` or ``NO_VALUE``, never ``None`` per
            # this class's own typing), so the NO_VALUE sentinel object itself was
            # persisted. Skip unset entries instead.
            if value is not NO_VALUE:
                store_credentials(section=section, key=key, value=value)

0 comments on commit 316f7a6

Please sign in to comment.