Skip to content
This repository was archived by the owner on Sep 26, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion doc/changes/changes_0.3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@


## Features / Enhancements
- /

- #72: Added `generate_bucket_udf_path` method to BucketFSLocation

## Bug Fixes

Expand Down
8 changes: 8 additions & 0 deletions exasol_bucketfs_utils_python/abstract_bucketfs_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, Tuple, IO, Iterable
from pathlib import PurePosixPath, Path
from urllib.parse import ParseResult
from typing import Union


class AbstractBucketFSLocation(ABC):
Expand All @@ -10,6 +11,13 @@ class AbstractBucketFSLocation(ABC):
fileobjects and joblib objects. Also able to read files from the BucketFS
directly, if called from inside a UDF.
"""

@abstractmethod
def generate_bucket_udf_path(
        self,
        path_in_bucket: Union[None, str, PurePosixPath]
) -> PurePosixPath:
    """
    Return the path under which ``path_in_bucket`` can be accessed
    from inside a UDF.

    Abstract: concrete locations provide the actual mapping.

    :param path_in_bucket: path of the file inside the bucket, or None
    :return: the corresponding path as a PurePosixPath
    """
    ...

@abstractmethod
def download_from_bucketfs_to_string(
self,
Expand Down
10 changes: 8 additions & 2 deletions exasol_bucketfs_utils_python/bucketfs_location.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Tuple, IO, Iterable
from typing import Any, Tuple, IO, Iterable, Union
from pathlib import PurePosixPath, Path
from urllib.parse import ParseResult
from exasol_bucketfs_utils_python import download, upload, list_files, \
delete
delete, bucketfs_utils
from exasol_bucketfs_utils_python import load_file_from_local_fs as from_BFS
from exasol_bucketfs_utils_python.bucket_config import BucketConfig

Expand All @@ -25,6 +25,12 @@ def __init__(self, bucket_config: BucketConfig, base_path: PurePosixPath):
self.base_path = base_path
self.bucket_config = bucket_config

def generate_bucket_udf_path(
        self,
        path_in_bucket: Union[None, str, PurePosixPath]
) -> PurePosixPath:
    """
    Return the path under which ``path_in_bucket`` can be accessed
    from inside a UDF.

    The work is delegated to ``bucketfs_utils.generate_bucket_udf_path``
    using this location's bucket configuration.

    :param path_in_bucket: path of the file inside the bucket, or None
    :return: the path produced by ``bucketfs_utils``
    """
    # NOTE(review): self.base_path is not part of the generated path;
    # confirm that this is intended.
    udf_path = bucketfs_utils.generate_bucket_udf_path(
        self.bucket_config, path_in_bucket)
    return udf_path

def get_complete_file_path_in_bucket(
self,
bucket_file_path: str) -> str:
Expand Down
10 changes: 5 additions & 5 deletions exasol_bucketfs_utils_python/bucketfs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _encode_url_part(part: str) -> str:
return urlencoded


def _correct_path_in_bucket_for_archives(path_in_bucket: PurePosixPath) \
def correct_path_in_bucket_for_archives(path_in_bucket: PurePosixPath) \
-> PurePosixPath:
for extension in ARCHIVE_EXTENSIONS:
if path_in_bucket.name.endswith(extension):
Expand All @@ -24,7 +24,7 @@ def _correct_path_in_bucket_for_archives(path_in_bucket: PurePosixPath) \
return path_in_bucket


def _make_path_relative(path_in_bucket: Union[None, str, PurePosixPath]) \
def make_path_relative(path_in_bucket: Union[None, str, PurePosixPath]) \
-> PurePosixPath:
path_in_bucket = PurePosixPath(path_in_bucket)
if path_in_bucket.is_absolute():
Expand Down Expand Up @@ -62,8 +62,8 @@ def generate_bucket_udf_path(
path = PurePosixPath(bucketfs_path, bucket_config.bucket_name)

if path_in_bucket is not None:
path_in_bucket = _make_path_relative(path_in_bucket)
path_in_bucket = _correct_path_in_bucket_for_archives(path_in_bucket)
path_in_bucket = make_path_relative(path_in_bucket)
path_in_bucket = correct_path_in_bucket_for_archives(path_in_bucket)
else:
path_in_bucket = ""
path = PurePosixPath(path, path_in_bucket)
Expand Down Expand Up @@ -120,7 +120,7 @@ def generate_bucket_http_url(
url = generate_bucketfs_http_url(bucket_config.bucketfs_config,
with_credentials)
if path_in_bucket is not None:
path_in_bucket = _make_path_relative(path_in_bucket)
path_in_bucket = make_path_relative(path_in_bucket)
else:
path_in_bucket = ""
encoded_bucket_and_path_in_bucket = \
Expand Down
16 changes: 15 additions & 1 deletion exasol_bucketfs_utils_python/localfs_mock_bucketfs_location.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Any, IO, List
from typing import Any, IO, List, Union
from pathlib import PurePosixPath, Path
from typing import Any
import joblib

from exasol_bucketfs_utils_python import bucketfs_utils
from exasol_bucketfs_utils_python.abstract_bucketfs_location import \
AbstractBucketFSLocation

Expand All @@ -20,6 +22,18 @@ def __init__(self, base_path: PurePosixPath):
def get_complete_file_path_in_bucket(self, bucket_file_path) -> str:
    """
    Join ``bucket_file_path`` onto this location's base path.

    :param bucket_file_path: path of the file relative to the base path
    :return: the joined POSIX path as a string
    """
    complete_path = PurePosixPath(self.base_path).joinpath(bucket_file_path)
    return str(complete_path)

def generate_bucket_udf_path(
        self,
        path_in_bucket: Union[None, str, PurePosixPath]
) -> PurePosixPath:
    """
    Return the path of ``path_in_bucket`` below this location's base path.

    A given path is first passed through
    ``bucketfs_utils.make_path_relative``; ``None`` yields the base
    path itself.

    :param path_in_bucket: path of the file inside the bucket, or None
    :return: base path joined with the relative path
    """
    if path_in_bucket is None:
        relative_part = ""
    else:
        relative_part = bucketfs_utils.make_path_relative(path_in_bucket)
    return PurePosixPath(self.base_path, relative_part)

def download_from_bucketfs_to_string(self, bucket_file_path: str) -> str:
with open(self.get_complete_file_path_in_bucket(
bucket_file_path), "rt") as f:
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/prepare_bucket_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import \
BucketFSConnectionConfig
from tests.test_load_fs_file_from_udf import delete_testfile_from_bucketfs
from tests.integration_tests.with_db.test_load_fs_file_from_udf import delete_testfile_from_bucketfs


@pytest.fixture(scope="module")
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -1,56 +1,76 @@
from pathlib import PurePosixPath

from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig

from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
import pytest
import textwrap
from tests.test_load_fs_file_from_udf import delete_testfile_from_bucketfs, upload_testfile_to_bucketfs
from tests.integration_tests.with_db.test_load_fs_file_from_udf import delete_testfile_from_bucketfs, upload_testfile_to_bucketfs
# TODO replace upload_testfile_to_BucketFS once missing funcs in BucketFSLocation are implemented


def test_upload_download_string_from_different_instance():
    """
    Upload a string through one BucketFSLocation and read it back through
    a second instance created with the same bucket config and base path.
    """
    connection_config = BucketFSConnectionConfig(
        host="localhost", port=6666, user="w", pwd="write", is_https=False)
    bucketfs_config = BucketFSConfig(
        "bfsdefault", connection_config=connection_config)
    bucket_config = BucketConfig(
        bucket_name="default", bucketfs_config=bucketfs_config)
    bucket_base_path = PurePosixPath("test_up_down_str")
    bucketfs_location_upload = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucketfs_location_download = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucket_file_path = "test_file.txt"
    test_string = "test_string"
    bucketfs_location_upload.upload_string_to_bucketfs(
        bucket_file_path, test_string)
    try:
        result = bucketfs_location_download.download_from_bucketfs_to_string(
            bucket_file_path)
        assert result == test_string
    finally:
        # Remove the uploaded file even if the download or assertion fails,
        # so a failing run does not leave the file behind in the bucket.
        delete_testfile_from_bucketfs(
            file_path=str(bucket_base_path / bucket_file_path),
            bucket_config=bucketfs_location_upload.bucket_config)


class TestValue:
    """Simple value holder used to check object round-trips in tests."""

    # Keep pytest from collecting this helper as a test class.
    __test__ = False

    def __init__(self, value: str):
        self.value = value

    def __eq__(self, other):
        # Compare against the other object's value; comparing self.value
        # with itself would make every comparison succeed.
        if not isinstance(other, TestValue):
            return NotImplemented
        return self.value == other.value

    def __repr__(self):
        return f"{type(self).__name__}({self.value!r})"
def test_generate_bucket_udf_path_with_db(
upload_language_container, pyexasol_connection):

connection_config = BucketFSConnectionConfig(
host="localhost", port=6666, user="w", pwd="write", is_https=False)
bucketfs_config = BucketFSConfig(
connection_config=connection_config, bucketfs_name="bfsdefault")
bucket_config = BucketConfig(
bucket_name="default", bucketfs_config=bucketfs_config)
bucketfs_location = BucketFSLocation(bucket_config, "")

def test_upload_download_obj_from_different_instance():
    """
    Upload an object via joblib through one BucketFSLocation and load it
    back through a second instance created with the same bucket config
    and base path.
    """
    connection_config = BucketFSConnectionConfig(
        host="localhost", port=6666, user="w", pwd="write", is_https=False)
    bucketfs_config = BucketFSConfig(
        "bfsdefault", connection_config=connection_config)
    bucket_config = BucketConfig(
        bucket_name="default", bucketfs_config=bucketfs_config)
    bucket_base_path = PurePosixPath("test_up_down_obj")
    bucketfs_location_upload = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucketfs_location_download = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucket_file_path = "test_file.txt"
    test_value = TestValue("test_string")
    bucketfs_location_upload.upload_object_to_bucketfs_via_joblib(
        test_value, bucket_file_path)
    try:
        result = bucketfs_location_download \
            .download_object_from_bucketfs_via_joblib(bucket_file_path)
        assert result == test_value
    finally:
        # Remove the uploaded file even if the download or assertion fails,
        # so a failing run does not leave the file behind in the bucket.
        delete_testfile_from_bucketfs(
            file_path=str(bucket_base_path / bucket_file_path),
            bucket_config=bucketfs_location_upload.bucket_config)
test_string = "test_string"
bucketfs_location.upload_string_to_bucketfs(bucket_file_path, test_string)

target_schema = "TARGET_SCHEMA"
try:
# access file from udf
udf_name = "AccessFileInBucketFSFromUDF"
pyexasol_connection.execute(
f"CREATE SCHEMA IF NOT EXISTS {target_schema};")
pyexasol_connection.execute(
f"OPEN SCHEMA {target_schema};")
udf_sql = textwrap.dedent(f"""
CREATE OR REPLACE PYTHON3_BFSUP SET SCRIPT {target_schema}."{udf_name}"(
"path_in_bucket" VARCHAR(20000))
RETURNS BOOLEAN
AS
from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
from pathlib import PurePosixPath, Path

bucket_name = "default"
bucketfs_name = "bfsdefault"
def get_bucket_config():
connection_config = BucketFSConnectionConfig(host="localhost",
port=6666,
user="r", pwd="read",
is_https=False)
bucketfs_config = BucketFSConfig(bucketfs_name, connection_config=connection_config)
return BucketConfig(bucket_name, bucketfs_config)

def run(ctx):
path_in_bucket = ctx.path_in_bucket
bucket_config = get_bucket_config()
bucketfs_location = BucketFSLocation(bucket_config, "")
file_path = bucketfs_location.generate_bucket_udf_path(path_in_bucket)

return Path(file_path).exists()
""")
pyexasol_connection.execute(udf_sql)
result = pyexasol_connection.execute(
f"""select {target_schema}."{udf_name}"('{bucket_file_path}')""").fetchall()
print(result)
assert result[0][0]
finally:
delete_testfile_from_bucketfs(file_path=bucket_file_path,
bucket_config=bucketfs_location.bucket_config)
pyexasol_connection.execute(f"DROP SCHEMA IF EXISTS {target_schema} CASCADE;")


@pytest.mark.usefixtures("upload_language_container",
Expand Down Expand Up @@ -228,6 +248,7 @@ def run(ctx):
bucket_config=bucketfs_location_read.bucket_config)
pyexasol_connection.execute(f"DROP SCHEMA IF EXISTS {target_schema} CASCADE;")


def test_read_files_to_fileobj_from_bucketfs_inside_udf(upload_language_container, pyexasol_connection):
connection_config = BucketFSConnectionConfig(host="localhost", port=6666, user="w", pwd="write", is_https=False)
bucketfs_config = BucketFSConfig("bfsdefault", connection_config=connection_config)
Expand Down
Empty file.
75 changes: 75 additions & 0 deletions tests/integration_tests/without_db/test_bucketfs_location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from pathlib import PurePosixPath
from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig
from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
import pytest
from tests.integration_tests.with_db.test_load_fs_file_from_udf import delete_testfile_from_bucketfs


# TODO replace upload_testfile_to_BucketFS once missing funcs in BucketFSLocation are implemented


@pytest.mark.parametrize(
    "path_in_bucket",
    [
        "/path/in/bucket/file.txt",
        "path/in/bucket/file.txt",
        "path/in/bucket/file.txt.tar.gz",
        "path/in/bucket/file.txt.zip",
        "path/in/bucket/file.txt.tgz",
        "path/in/bucket/file.txt.tar",
    ],
)
def test_generate_bucket_udf_path(path_in_bucket):
    """
    Every parametrized input (absolute, relative, or with an archive
    suffix) is expected to produce the same UDF path.
    """
    expected_udf_path = "/buckets/bfsdefault/default/path/in/bucket/file.txt"

    conn_cfg = BucketFSConnectionConfig(
        host="localhost", port=6666, user="w", pwd="write", is_https=False)
    bfs_cfg = BucketFSConfig(
        connection_config=conn_cfg, bucketfs_name="bfsdefault")
    bucket_cfg = BucketConfig(
        bucket_name="default", bucketfs_config=bfs_cfg)
    location = BucketFSLocation(bucket_cfg, "")

    generated = location.generate_bucket_udf_path(
        path_in_bucket=path_in_bucket)

    assert str(generated) == expected_udf_path


def test_upload_download_string_from_different_instance():
    """
    Upload a string through one BucketFSLocation and read it back through
    a second instance created with the same bucket config and base path.
    """
    connection_config = BucketFSConnectionConfig(
        host="localhost", port=6666, user="w", pwd="write", is_https=False)
    bucketfs_config = BucketFSConfig(
        "bfsdefault", connection_config=connection_config)
    bucket_config = BucketConfig(
        bucket_name="default", bucketfs_config=bucketfs_config)
    bucket_base_path = PurePosixPath("test_up_down_str")
    bucketfs_location_upload = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucketfs_location_download = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucket_file_path = "test_file.txt"
    test_string = "test_string"
    bucketfs_location_upload.upload_string_to_bucketfs(
        bucket_file_path, test_string)
    try:
        result = bucketfs_location_download.download_from_bucketfs_to_string(
            bucket_file_path)
        assert result == test_string
    finally:
        # Remove the uploaded file even if the download or assertion fails,
        # so a failing run does not leave the file behind in the bucket.
        delete_testfile_from_bucketfs(
            file_path=str(bucket_base_path / bucket_file_path),
            bucket_config=bucketfs_location_upload.bucket_config)


class TestValue:
    """Simple value holder used to check object round-trips in tests."""

    # Keep pytest from collecting this helper as a test class.
    __test__ = False

    def __init__(self, value: str):
        self.value = value

    def __eq__(self, other):
        # Compare against the other object's value; comparing self.value
        # with itself would make every comparison succeed.
        if not isinstance(other, TestValue):
            return NotImplemented
        return self.value == other.value

    def __repr__(self):
        return f"{type(self).__name__}({self.value!r})"


def test_upload_download_obj_from_different_instance():
    """
    Upload an object via joblib through one BucketFSLocation and load it
    back through a second instance created with the same bucket config
    and base path.
    """
    connection_config = BucketFSConnectionConfig(
        host="localhost", port=6666, user="w", pwd="write", is_https=False)
    bucketfs_config = BucketFSConfig(
        "bfsdefault", connection_config=connection_config)
    bucket_config = BucketConfig(
        bucket_name="default", bucketfs_config=bucketfs_config)
    bucket_base_path = PurePosixPath("test_up_down_obj")
    bucketfs_location_upload = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucketfs_location_download = BucketFSLocation(
        bucket_config, bucket_base_path)
    bucket_file_path = "test_file.txt"
    test_value = TestValue("test_string")
    bucketfs_location_upload.upload_object_to_bucketfs_via_joblib(
        test_value, bucket_file_path)
    try:
        result = bucketfs_location_download \
            .download_object_from_bucketfs_via_joblib(bucket_file_path)
        assert result == test_value
    finally:
        # Remove the uploaded file even if the download or assertion fails,
        # so a failing run does not leave the file behind in the bucket.
        delete_testfile_from_bucketfs(
            file_path=str(bucket_base_path / bucket_file_path),
            bucket_config=bucketfs_location_upload.bucket_config)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.bucketfs_config import BucketFSConfig
from exasol_bucketfs_utils_python.bucketfs_connection_config import BucketFSConnectionConfig
from tests.test_load_fs_file_from_udf import delete_testfile_from_bucketfs
from tests.integration_tests.with_db.test_load_fs_file_from_udf import delete_testfile_from_bucketfs


def test_delete_files():
Expand Down
Empty file added tests/unit_tests/__init__.py
Empty file.
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import tempfile
from tempfile import TemporaryDirectory, NamedTemporaryFile
from pathlib import Path, PurePosixPath

import pytest
from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import \
LocalFSMockBucketFSLocation


@pytest.mark.parametrize(
    "path_in_bucket",
    [
        "/path/in/bucket/file.txt",
        "path/in/bucket/file.txt",
    ],
)
def test_generate_bucket_udf_path(path_in_bucket):
    """
    Absolute and relative bucket paths are both expected to resolve to
    the same location below the mock's base directory.
    """
    with tempfile.TemporaryDirectory() as tmpdir_name:
        expected = PurePosixPath(tmpdir_name, "path/in/bucket/file.txt")
        mock_location = LocalFSMockBucketFSLocation(tmpdir_name)

        generated = mock_location.generate_bucket_udf_path(path_in_bucket)

        assert generated == expected

from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import LocalFSMockBucketFSLocation

def test_upload_download_string_from_different_instance():
with TemporaryDirectory() as path:
Expand Down