Skip to content
This repository was archived by the owner on Sep 26, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7208973
Add upload and download functions
tkilias Feb 16, 2021
5367987
Refactor out upload and download on fileobj and use NamedTemporaryFil…
tkilias Feb 16, 2021
6980350
Cleanup
tkilias Feb 16, 2021
4930106
Improve parameter names and add tests
tkilias Feb 17, 2021
c02a252
Add tests for bucketfs_utils.py and some fixes
tkilias Feb 17, 2021
5e625b4
Refactoring
tkilias Mar 2, 2021
0fb0e9d
Add docstrings to some classes and functions
tkilias Mar 2, 2021
342d1c5
Refactoring and more docstrings
tkilias Mar 3, 2021
4b92632
Harden url generation with url encoding and more tests
tkilias Mar 3, 2021
a8ea397
Harden path generation using pathlib and add more checks for Config i…
tkilias Mar 3, 2021
a7517f2
Hardening Config objects by runtime type checking and read only prope…
tkilias Mar 3, 2021
342af5d
Introduce runtime type checking to bucketfs_utils.py
tkilias Mar 3, 2021
76ef23b
Fix port type in test_upload_download.py
tkilias Mar 3, 2021
b9a2220
Add --cached to git diff of setup.py in check_setup_py.yaml
tkilias Mar 3, 2021
b1b4ce0
Update setup.py
tkilias Mar 3, 2021
9ee3f22
Extract BucketConfig and BucketFSConnectionConfig from bucket_config.py
tkilias Mar 3, 2021
2025491
Apply suggestions from code review
tkilias Mar 9, 2021
8eb56ba
Fix bucketfs archive file extensions
tkilias Mar 9, 2021
4f64bbe
Refactor tests
tkilias Mar 9, 2021
1e53862
Fix review suggestions in bucketfs_utils.py
tkilias Mar 9, 2021
3721e93
Correct typos in bucketfs_utils.py
tkilias Mar 9, 2021
7e4f754
Add docstrings and type annotations to return types to upload.py. upl…
tkilias Mar 19, 2021
5557261
Add docstrings and type annotations to return types to download.py. F…
tkilias Mar 19, 2021
afac65d
Apply suggestions from code review
tkilias Mar 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check_setup_py.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Show changes on working copy
run: git status --porcelain=v1 -uno
- name: Show diff on working copy
run: git diff
run: git diff --cached; cat setup.py
- name: Check if setup.py changed
run: |
[ -z "$(git status --porcelain=v1 -uno 2>/dev/null)" ]
Expand Down
25 changes: 25 additions & 0 deletions exasol_bucketfs_utils_python/bucket_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typeguard import typechecked

from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig


class BucketConfig:
    """
    Describes a single bucket inside a BucketFS.

    It combines the name of the bucket with the BucketFSConfig of the
    BucketFS the bucket belongs to. Both values are set once in the
    constructor and are afterwards only exposed through read-only properties.
    """

    @typechecked(always=True)
    def __init__(self, bucket_name: str, bucketfs_config: BucketFSConfig):
        """
        :param bucket_name: Name of the bucket, must not be an empty string
        :param bucketfs_config: Config of the BucketFS which contains the bucket
        :raises ValueError: If bucket_name is an empty string
        """
        # The runtime type check already guarantees a str here,
        # so emptiness is the only remaining invalid value.
        if not bucket_name:
            raise ValueError("Bucket name can't be an empty string")
        self._bucketfs_config = bucketfs_config
        self._bucket_name = bucket_name

    @property
    def bucket_name(self) -> str:
        """Name of the bucket as given to the constructor."""
        return self._bucket_name

    @property
    def bucketfs_config(self) -> BucketFSConfig:
        """Config of the BucketFS as given to the constructor."""
        return self._bucketfs_config
31 changes: 31 additions & 0 deletions exasol_bucketfs_utils_python/bucketfs_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Union

from typeguard import typechecked

from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig


class BucketFSConfig:
    """
    The BucketFSConfig holds the name of a BucketFS and, optionally,
    the BucketFSConnectionConfig needed to reach it via HTTP[S].
    The connection config is optional, because inside of UDFs
    the BucketFS is sometimes accessed without HTTP[S].
    """

    @typechecked(always=True)
    def __init__(self, bucketfs_name: str, connection_config: Union[BucketFSConnectionConfig, None] = None):
        """
        :param bucketfs_name: Name of the BucketFS, must not be an empty string
        :param connection_config: Connection information for HTTP[S] access, default None
        :raises ValueError: If bucketfs_name is an empty string
        """
        # The runtime type check already guarantees a str here,
        # so emptiness is the only remaining invalid value.
        if not bucketfs_name:
            raise ValueError("BucketFS name can't be an empty string")
        self._bucketfs_name = bucketfs_name
        self._connection_config = connection_config

    @property
    def bucketfs_name(self) -> str:
        """Name of the BucketFS as given to the constructor."""
        return self._bucketfs_name

    @property
    def connection_config(self) -> Union[BucketFSConnectionConfig, None]:
        """Connection config as given to the constructor, may be None."""
        return self._connection_config


42 changes: 42 additions & 0 deletions exasol_bucketfs_utils_python/bucketfs_connection_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typeguard import typechecked


class BucketFSConnectionConfig:
    """
    The BucketFSConnectionConfig contains all necessary information
    to connect to the BucketFS Server via HTTP[s].

    All values are validated in the constructor and are afterwards
    only exposed through read-only properties.
    """

    @typechecked(always=True)
    def __init__(self, host: str, port: int, user: str, pwd: str, is_https: bool = False):
        """
        :param host: Host name or address of the BucketFS server, must not be empty
        :param port: Port of the BucketFS server
        :param user: Either 'w' (read-write access) or 'r' (read-only access)
        :param pwd: Password of the user, must not be empty
        :param is_https: If True, HTTPS is used instead of HTTP, default False
        :raises ValueError: If host or pwd is an empty string or user is neither 'w' nor 'r'
        """
        # is_https is annotated as bool so that typeguard checks it at runtime
        # like every other parameter; without the annotation any object was accepted.
        if host == "":
            raise ValueError("Host can't be an empty string")
        if user not in ("w", "r"):  # The BucketFs currently supports only these two users
            raise ValueError(f"User can only be, 'w' (read-write access) or 'r' (read-only access), but got {user}")
        if pwd == "":
            raise ValueError("Password can't be an empty string")
        self._is_https = is_https
        self._host = host
        self._port = port
        self._user = user
        self._pwd = pwd

    @property
    def is_https(self) -> bool:
        """True if HTTPS is used for the connection, False for plain HTTP."""
        return self._is_https

    @property
    def host(self) -> str:
        """Host as given to the constructor."""
        return self._host

    @property
    def port(self) -> int:
        """Port as given to the constructor."""
        return self._port

    @property
    def user(self) -> str:
        """User as given to the constructor, either 'w' or 'r'."""
        return self._user

    @property
    def pwd(self) -> str:
        """Password as given to the constructor."""
        return self._pwd
130 changes: 130 additions & 0 deletions exasol_bucketfs_utils_python/bucketfs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import urllib.parse
from pathlib import PurePosixPath
from typing import Union

from requests.auth import HTTPBasicAuth
from typeguard import typechecked

from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig

# File name suffixes that _correct_path_in_bucket_for_archives strips from the
# last path component when a path inside the UDF file system is generated.
ARCHIVE_EXTENSIONS = [".tar.gz", ".tgz", ".zip", ".tar"]


def _encode_url_part(part: str) -> str:
urlencoded = urllib.parse.quote(part)
return urlencoded


def _correct_path_in_bucket_for_archives(path_in_bucket: PurePosixPath) -> PurePosixPath:
    """
    Remove a known archive extension from the last component of the given path
    :param path_in_bucket: Path whose file name is checked against ARCHIVE_EXTENSIONS
    :return: The path without the first matching archive extension,
             or the unchanged path if no extension matches
    """
    file_name = path_in_bucket.name
    # Only the first extension that matches is removed.
    matching_extension = next(
        (extension for extension in ARCHIVE_EXTENSIONS if file_name.endswith(extension)),
        None)
    if matching_extension is None:
        return path_in_bucket
    return PurePosixPath(path_in_bucket.parent, file_name[:-len(matching_extension)])


def _make_path_relative(path_in_bucket: Union[None, str, PurePosixPath]) -> PurePosixPath:
path_in_bucket = PurePosixPath(path_in_bucket)
if path_in_bucket.is_absolute():
path_in_bucket = path_in_bucket.relative_to(PurePosixPath("/"))
return path_in_bucket


@typechecked(always=True)
def generate_bucketfs_udf_path(bucketfs_config: BucketFSConfig) -> PurePosixPath:
    """
    Build the path under which the content of a BucketFS is visible in the file system of UDFs
    :param bucketfs_config: Config of the BucketFS, only its name is used,
                            so the BucketFSConnectionConfig in it can be None
    :return: "/buckets/" joined with the name of the BucketFS
    """
    return PurePosixPath("/buckets/", bucketfs_config.bucketfs_name)


@typechecked(always=True)
def generate_bucket_udf_path(bucket_config: BucketConfig,
                             path_in_bucket: Union[None, str, PurePosixPath]) -> PurePosixPath:
    """
    Build the path under which UDFs can access a bucket, or a path inside of the bucket,
    in their file system
    :param bucket_config: Config of the Bucket, the BucketFSConnectionConfig in the BucketFSConfig can be None
    :param path_in_bucket: If not None, it is made relative, a known archive extension is removed
                           from its file name and the result is appended to the path of the bucket
    :return: Path of the bucket or of the path in the bucket in the file system of UDFs
    """
    bucket_path = PurePosixPath(
        generate_bucketfs_udf_path(bucket_config.bucketfs_config),
        bucket_config.bucket_name)

    if path_in_bucket is None:
        # Without a path in the bucket only the bucket itself is addressed.
        return PurePosixPath(bucket_path, "")

    relative_path = _make_path_relative(path_in_bucket)
    corrected_path = _correct_path_in_bucket_for_archives(relative_path)
    return PurePosixPath(bucket_path, corrected_path)


@typechecked(always=True)
def generate_bucketfs_http_url(bucketfs_config: BucketFSConfig,
                               with_credentials: bool = False) -> urllib.parse.ParseResult:
    """
    Build the HTTP[S] url of a BucketFS from its BucketFSConfig,
    following the template http[s]://[user:password@]host:port
    :param bucketfs_config: A BucketFSConfig with a non None BucketFSConnectionConfig
    :param with_credentials: If True, user and password are embedded into the url
                             for basic authentication, default False
    :return: Parsed HTTP[S] URL of the BucketFS
    :raises ValueError: If the BucketFSConfig has no BucketFSConnectionConfig
    """
    connection_config = bucketfs_config.connection_config
    if connection_config is None:
        raise ValueError("bucket_config.bucketfs_config.connection_config can't be None for this operation")

    credentials = ""
    if with_credentials:
        # User and password are encoded separately, the separators ":" and "@" stay literal.
        encoded_user = _encode_url_part(connection_config.user)
        encoded_password = _encode_url_part(connection_config.pwd)
        credentials = f"{encoded_user}:{encoded_password}@"

    protocol = "https" if connection_config.is_https else "http"
    encoded_host = _encode_url_part(connection_config.host)
    return urllib.parse.urlparse(
        f"{protocol}://{credentials}{encoded_host}:{connection_config.port}")


@typechecked(always=True)
def generate_bucket_http_url(bucket_config: BucketConfig, path_in_bucket: Union[None, str, PurePosixPath],
                             with_credentials: bool = False) -> urllib.parse.ParseResult:
    """
    Build the HTTP[S] url of a bucket, or of a path inside of the bucket,
    following the template http[s]://[user:password@]host:port/bucket[/path]
    :param bucket_config: Config of the Bucket, the BucketFSConnectionConfig in the BucketFSConfig must be not None
    :param path_in_bucket: If not None, it is made relative and appended to the path of the bucket
    :param with_credentials: If True, user and password are embedded into the url
                             for basic authentication, default False
    :return: Parsed HTTP[S] URL of the bucket or of the path in the bucket
    """
    bucketfs_url = generate_bucketfs_http_url(bucket_config.bucketfs_config, with_credentials)
    relative_path = "" if path_in_bucket is None else _make_path_relative(path_in_bucket)

    # Every path component is encoded on its own, the "/" separators are added afterwards.
    # NOTE(review): PurePosixPath keeps ".." components and urljoin resolves them,
    # verify whether callers can pass such paths.
    bucket_and_path = PurePosixPath(bucket_config.bucket_name, relative_path)
    encoded_parts = [_encode_url_part(part) for part in bucket_and_path.parts]
    joined_url = urllib.parse.urljoin(bucketfs_url.geturl(), "/".join(encoded_parts))
    return urllib.parse.urlparse(joined_url)


@typechecked(always=True)
def create_auth_object(bucket_config: BucketConfig) -> HTTPBasicAuth:
    """
    Create the basic authentication object for requests from the user and password
    stored in the BucketFSConnectionConfig of the given bucket
    :param bucket_config: Config of the Bucket, the BucketFSConnectionConfig in the BucketFSConfig must be not None
    :return: HTTPBasicAuth built from the configured user and password
    :raises TypeError: If the BucketFSConfig has no BucketFSConnectionConfig
    """
    connection_config = bucket_config.bucketfs_config.connection_config
    if connection_config is None:
        raise TypeError("bucket_config.bucketfs_config.connection_config can't be None for this operation")
    return HTTPBasicAuth(connection_config.user, connection_config.pwd)
73 changes: 73 additions & 0 deletions exasol_bucketfs_utils_python/download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import typing
from pathlib import Path
from tempfile import NamedTemporaryFile

import joblib
import requests

from exasol_bucketfs_utils_python import bucketfs_utils
from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_utils import generate_bucket_http_url


def download_from_bucketfs_to_file(bucket_config: BucketConfig, bucket_file_path: str, local_file_path: Path):
    """
    Download the file at the given path in the bucket and store its content in a local file.
    The local file is opened for binary writing before the download starts.
    :param bucket_config: BucketConfig for the bucket to download from
    :param bucket_file_path: Path in the bucket to download the file from
    :param local_file_path: Path of the local file which receives the downloaded data
    :return: None
    """
    with local_file_path.open("wb") as local_file:
        download_from_bucketfs_to_fileobj(
            bucket_config,
            bucket_file_path,
            local_file)


def download_from_bucketfs_to_fileobj(bucket_config: BucketConfig, bucket_file_path: str, fileobj: typing.IO):
    """
    Download the file at the given path in the bucket and write its content into a given
    `file object <https://docs.python.org/3/glossary.html#term-file-object>`_
    :param bucket_config: BucketConfig for the bucket to download from
    :param bucket_file_path: Path in the bucket to download the file from
    :param fileobj: File object which receives the downloaded data chunk by chunk
    :return: None
    :raises ValueError: If bucket_file_path is None
    """
    if bucket_file_path is None:
        raise ValueError("bucket_file_path can't be None")
    download_url = generate_bucket_http_url(bucket_config, bucket_file_path).geturl()
    basic_auth = bucketfs_utils.create_auth_object(bucket_config)
    # stream=True together with iter_content hands the body over in chunks.
    with requests.get(download_url, stream=True, auth=basic_auth) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=8192):
            fileobj.write(chunk)


def download_from_bucketfs_to_string(bucket_config: BucketConfig, bucket_file_path: str) -> str:
    """
    Download the file at the given path in the bucket and return its content as string
    :param bucket_config: BucketConfig for the bucket to download from
    :param bucket_file_path: Path in the bucket to download the file from
    :return: The text of the HTTP response for the file in the BucketFS
    :raises ValueError: If bucket_file_path is None
    """
    if bucket_file_path is None:
        raise ValueError("bucket_file_path can't be None")
    download_url = generate_bucket_http_url(bucket_config, bucket_file_path).geturl()
    basic_auth = bucketfs_utils.create_auth_object(bucket_config)
    response = requests.get(download_url, auth=basic_auth)
    # Fail on HTTP error status codes instead of returning the error body.
    response.raise_for_status()
    return response.text


def download_object_from_bucketfs_via_joblib(bucket_config: BucketConfig, bucket_file_path: str) -> typing.Any:
    """
    Download the file at the given path in the bucket into a temporary file and deserialize it via
    `joblib.load <https://joblib.readthedocs.io/en/latest/generated/joblib.load.html#>`_
    :param bucket_config: BucketConfig for the bucket to download from
    :param bucket_file_path: Path in the bucket to download the file from
    :return: The deserialized object which was downloaded from the BucketFS
    """
    # NOTE(review): joblib.load is pickle-based and can execute code while loading;
    # confirm that only trusted bucket content is loaded this way.
    with NamedTemporaryFile() as downloaded_file:
        download_from_bucketfs_to_fileobj(bucket_config, bucket_file_path, downloaded_file)
        # Make sure all data is written and read again from the start of the file.
        downloaded_file.flush()
        downloaded_file.seek(0)
        return joblib.load(downloaded_file)
Loading