Skip to content
This repository was archived by the owner on Sep 26, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,6 @@ poetry.lock
# Sphinx
doc/_build
doc/api

# Language container
.build_output
4 changes: 4 additions & 0 deletions doc/changes/changes_0.2.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ Code name: t.b.d

## Bug Fixes

- #54: Removed PosixPath conversion from alter session string

## Documentation

## Refactoring

- #58: Added Python type hints

## Security

- #51: Added a fixed NumPy version, built from source, because of a buffer overflow vulnerability in NumPy
Expand Down
47 changes: 31 additions & 16 deletions exasol_bucketfs_utils_python/abstract_bucketfs_location.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,67 @@
import typing
from abc import ABC, abstractmethod
from typing import Any
from pathlib import Path
from typing import Any, Tuple, IO
from pathlib import PurePosixPath, Path
from urllib.parse import ParseResult


class AbstractBucketFSLocation(ABC):
"""
Abstract class for a BucketFSLocation for uploading and downloading strings, fileobjects and joblib objects.
Also able to read files from the BucketFS directly, if called from inside a UDF.
Abstract class for a BucketFSLocation for uploading and downloading strings,
fileobjects and joblib objects. Also able to read files from the BucketFS
directly, if called from inside a UDF.
"""
@abstractmethod
def download_from_bucketfs_to_string(self, bucket_file_path: str) -> str:
def download_from_bucketfs_to_string(self,
bucket_file_path: str) -> str:
pass

@abstractmethod
def download_object_from_bucketfs_via_joblib(self, bucket_file_path: str) -> Any:
def download_object_from_bucketfs_via_joblib(self,
bucket_file_path: str) -> Any:
pass

@abstractmethod
def upload_string_to_bucketfs(self, bucket_file_path: str, string: str):
def upload_string_to_bucketfs(self,
bucket_file_path: str,
string: str) -> \
Tuple[ParseResult, PurePosixPath]:
pass

@abstractmethod
def upload_object_to_bucketfs_via_joblib(self, object: Any,
def upload_object_to_bucketfs_via_joblib(self,
object: Any,
bucket_file_path: str,
**kwargs):
**kwargs) -> \
Tuple[ParseResult, PurePosixPath]:
pass

@abstractmethod
def upload_fileobj_to_bucketfs(self,
fileobj: typing.IO,
bucket_file_path: str):
fileobj: IO,
bucket_file_path: str) -> \
Tuple[ParseResult, PurePosixPath]:
pass

# TODO add missing upload/download functions

@abstractmethod
def read_file_from_bucketfs_to_string(self, bucket_file_path: str) -> str:
def read_file_from_bucketfs_to_string(self,
bucket_file_path: str) -> str:
pass

@abstractmethod
def read_file_from_bucketfs_to_file(self, bucket_file_path: str, local_file_path: Path):
def read_file_from_bucketfs_to_file(self,
bucket_file_path: str,
local_file_path: Path) -> None:
pass

@abstractmethod
def read_file_from_bucketfs_to_fileobj(self, bucket_file_path: str, fileobj: typing.IO):
def read_file_from_bucketfs_to_fileobj(self,
bucket_file_path: str,
fileobj: IO) -> None:
pass

@abstractmethod
def read_file_from_bucketfs_via_joblib(self, bucket_file_path: str) -> typing.Any:
def read_file_from_bucketfs_via_joblib(self,
bucket_file_path: str) -> Any:
pass
10 changes: 6 additions & 4 deletions exasol_bucketfs_utils_python/bucketfs_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Union

from typeguard import typechecked

from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import \
BucketFSConnectionConfig


class BucketFSConfig:
Expand All @@ -14,7 +13,10 @@ class BucketFSConfig:
"""

@typechecked(always=True)
def __init__(self, bucketfs_name: str, connection_config: Union[BucketFSConnectionConfig, None] = None):
def __init__(
self,
bucketfs_name: str,
connection_config: Union[BucketFSConnectionConfig, None] = None):
self._connection_config = connection_config
if bucketfs_name == "":
raise ValueError("BucketFS name can't be an empty string")
Expand Down
6 changes: 4 additions & 2 deletions exasol_bucketfs_utils_python/bucketfs_connection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ class BucketFSConnectionConfig:
"""

@typechecked(always=True)
def __init__(self, host: str, port: int, user: str, pwd: str, is_https=False):
def __init__(self, host: str, port: int,
user: str, pwd: str, is_https: bool = False):
self._is_https = is_https
if host == "":
raise ValueError("Host can't be an empty string")
self._host = host
self._port = port
if user not in ["w", "r"]: # The BucketFs currently supports only these two users
raise ValueError(f"User can only be, 'w' (read-write access) or 'r' (read-only access), but got {user}")
raise ValueError(f"User can only be, 'w' (read-write access) or "
f"'r' (read-only access), but got {user}")
self._user = user
if pwd == "":
raise ValueError("Password can't be an empty string")
Expand Down
54 changes: 34 additions & 20 deletions exasol_bucketfs_utils_python/bucketfs_factory.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
import urllib.parse
from pathlib import PurePosixPath
from typing import Optional

from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig

from exasol_bucketfs_utils_python.bucketfs_connection_config import \
BucketFSConnectionConfig
from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import LocalFSMockBucketFSLocation
from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import \
LocalFSMockBucketFSLocation


class BucketFSFactory:
"""
Creates a BucketFSLocation given an url.
"""
def create_bucketfs_location(self, url: str, user: str, pwd: str, base_path: Optional[PurePosixPath] = None):
def create_bucketfs_location(self, url: str, user: str, pwd: str,
base_path: Optional[PurePosixPath] = None) -> \
BucketFSLocation:
"""
Create BucketFSLocation from the the url given. If the url has the schema http:// or https://,
this function creates a real BucketFSLocation for a url scheme file:/// we create a LocalFSMockBucketFSLocation.
For url with http:// or https:// schema you also need to provide the bucketfs-name via a url parameter.
A url would look like the following: http[s]://<host>:<port>/<bucket_name>/<path_in_bucket>;<bucketfs_name>
Create a BucketFSLocation from the given url.
If the url has the scheme http:// or https://, this function creates a
real BucketFSLocation; for the url scheme file:/// it creates a
LocalFSMockBucketFSLocation. For urls with the http:// or https:// scheme
you also need to provide the bucketfs-name via a url parameter. A url
would look like the following:
http[s]://<host>:<port>/<bucket_name>/<path_in_bucket>;<bucketfs_name>
:param url:
:param user:
:param pwd:
Expand All @@ -29,26 +34,35 @@ def create_bucketfs_location(self, url: str, user: str, pwd: str, base_path: Opt
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme == "http" or parsed_url.scheme == "https":
is_https = parsed_url.scheme == "https"
connection_config = BucketFSConnectionConfig(host=parsed_url.hostname,
port=parsed_url.port,
user=user, pwd=pwd,
is_https=is_https)
connection_config = BucketFSConnectionConfig(
host=parsed_url.hostname,
port=parsed_url.port,
user=user,
pwd=pwd,
is_https=is_https)
url_path = PurePosixPath(parsed_url.path)
bucket_name = url_path.parts[1]
base_path_in_bucket = PurePosixPath(url_path.parts[2]).joinpath(*url_path.parts[3:])
base_path_in_bucket = PurePosixPath(
url_path.parts[2]).joinpath(*url_path.parts[3:])
if base_path is not None:
base_path_in_bucket = PurePosixPath(base_path_in_bucket, base_path)
base_path_in_bucket = PurePosixPath(
base_path_in_bucket, base_path)
bucketfs_name = parsed_url.params
bucketfs_config = BucketFSConfig(bucketfs_name, connection_config=connection_config)
bucket_config = BucketConfig(bucket_name=bucket_name, bucketfs_config=bucketfs_config)
bucketfs_location = BucketFSLocation(bucket_config, base_path_in_bucket)
bucketfs_config = BucketFSConfig(
bucketfs_name, connection_config=connection_config)
bucket_config = BucketConfig(
bucket_name=bucket_name, bucketfs_config=bucketfs_config)
bucketfs_location = BucketFSLocation(
bucket_config, base_path_in_bucket)
return bucketfs_location
elif parsed_url.scheme == "file":
if parsed_url.netloc != '':
raise ValueError(f"URL '{url}' with file:// schema and netloc not support.")
raise ValueError(f"URL '{url}' with file:// schema "
f"and netloc not support.")
base_path_in_bucket = PurePosixPath(parsed_url.path)
if base_path is not None:
base_path_in_bucket = PurePosixPath(base_path_in_bucket, base_path)
base_path_in_bucket = PurePosixPath(
base_path_in_bucket, base_path)
bucketfs_location = LocalFSMockBucketFSLocation(base_path_in_bucket)
return bucketfs_location
else:
Expand Down
81 changes: 51 additions & 30 deletions exasol_bucketfs_utils_python/bucketfs_location.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,113 @@
import typing
from typing import Any, Tuple, IO
from pathlib import PurePosixPath, Path
from typing import Any

from urllib.parse import ParseResult
from exasol_bucketfs_utils_python import download, upload
from exasol_bucketfs_utils_python import load_file_from_local_fs as from_BFS
from exasol_bucketfs_utils_python.bucket_config import BucketConfig

from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
from exasol_bucketfs_utils_python.abstract_bucketfs_location import \
AbstractBucketFSLocation


class BucketFSLocation(AbstractBucketFSLocation):
"""
BucketFSLocation implements AbstractBucketFSLocation.
BucketFSLocation is used to upload fileobjects, strings or joblib objects to the BucketFS given a path and the object,
or to download objects into strings, fileobjects or joblib objects from the BucketFS given a file path.
Also able to read files from the BucketFS directly, if called from inside of an UDF.
If reading an object via joblib inside of an UDF, make sure the object type is known inside the UDF.
BucketFSLocation is used to upload fileobjects, strings or joblib objects to
the BucketFS given a path and the object, or to download objects into
strings, fileobjects or joblib objects from the BucketFS given a file path.
Also able to read files from the BucketFS directly, if called from inside
a UDF. If reading an object via joblib inside a UDF, make sure the
object type is known inside the UDF.
"""

def __init__(self, bucket_config: BucketConfig, base_path: PurePosixPath):
self.base_path = base_path
self.bucket_config = bucket_config

def get_complete_file_path_in_bucket(self, bucket_file_path: str) -> str:
def get_complete_file_path_in_bucket(self,
bucket_file_path: str) -> str:
return str(PurePosixPath(self.base_path, bucket_file_path))

def download_from_bucketfs_to_string(self, bucket_file_path: str) -> str:
def download_from_bucketfs_to_string(self,
bucket_file_path: str) -> str:
result = download.download_from_bucketfs_to_string(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path))
self.get_complete_file_path_in_bucket(bucket_file_path)
)
return result

def download_object_from_bucketfs_via_joblib(self, bucket_file_path: str) -> Any:
def download_object_from_bucketfs_via_joblib(self,
bucket_file_path: str) -> Any:
result = download.download_object_from_bucketfs_via_joblib(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path))
self.get_complete_file_path_in_bucket(bucket_file_path)
)
return result

def upload_string_to_bucketfs(self, bucket_file_path: str, string: str):
def upload_string_to_bucketfs(self,
bucket_file_path: str,
string: str) -> \
Tuple[ParseResult, PurePosixPath]:
result = upload.upload_string_to_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
string)
string
)
return result

def upload_object_to_bucketfs_via_joblib(self, object: Any,
bucket_file_path: str,
**kwargs):
**kwargs) -> \
Tuple[ParseResult, PurePosixPath]:
result = upload.upload_object_to_bucketfs_via_joblib(
object,
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
**kwargs)
**kwargs
)
return result

def upload_fileobj_to_bucketfs(self,
fileobj: typing.IO,
bucket_file_path: str):
fileobj: IO,
bucket_file_path: str) -> \
Tuple[ParseResult, PurePosixPath]:
result = upload.upload_fileobj_to_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
fileobj)
return result

def read_file_from_bucketfs_to_string(self, bucket_file_path: str) -> str:
result = from_BFS.read_file_from_bucketfs_to_string(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config
fileobj
)
return result

def read_file_from_bucketfs_via_joblib(self, bucket_file_path: str) -> typing.Any:
result = from_BFS.read_file_from_bucketfs_via_joblib(
def read_file_from_bucketfs_to_string(self,
bucket_file_path: str) -> str:
result = from_BFS.read_file_from_bucketfs_to_string(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config
)
return result

def read_file_from_bucketfs_to_file(self, bucket_file_path: str, local_file_path: Path) -> None:
def read_file_from_bucketfs_to_file(self,
bucket_file_path: str,
local_file_path: Path) -> None:
from_BFS.read_file_from_bucketfs_to_file(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config,
local_file_path
)

def read_file_from_bucketfs_to_fileobj(self, bucket_file_path: str, fileobj: typing.IO) -> None:
def read_file_from_bucketfs_to_fileobj(self,
bucket_file_path: str,
fileobj: IO) -> None:
from_BFS.read_file_from_bucketfs_to_fileobj(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config,
fileobj
)

def read_file_from_bucketfs_via_joblib(self,
bucket_file_path: str) -> Any:
result = from_BFS.read_file_from_bucketfs_via_joblib(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config
)
return result
Loading