diff --git a/doc/changes/unreleased.md b/doc/changes/unreleased.md
index 2911aa26..21d95d1e 100644
--- a/doc/changes/unreleased.md
+++ b/doc/changes/unreleased.md
@@ -1,4 +1,4 @@
 # Unreleased
 
 ## Refactorings
- * #186: Integration test for correctness of UDF path generation
\ No newline at end of file
+ * #186: Integration tests for correctness of UDF path generation via `Bucket.udf_path` and `PathLike.as_udf_path()`
\ No newline at end of file
diff --git a/test/integration/test_service_onprem.py b/test/integration/test_service_onprem.py
index 93b3e383..214ef8af 100644
--- a/test/integration/test_service_onprem.py
+++ b/test/integration/test_service_onprem.py
@@ -324,93 +324,3 @@ def test_any_log_message_get_emitted(httpserver, caplog):
     ]
     # The log level DEBUG should emit at least one log message
     assert log_records
-
-
-def test_upload_and_udf_path(
-    backend_aware_bucketfs_params, backend_aware_database_params, backend
-):
-    # Upload file to BucketFS
-    file_name = "Uploaded-File-From-Integration-test.bin"
-
-    if backend == BACKEND_ONPREM:
-        bucket = Bucket(
-            name=backend_aware_bucketfs_params["bucket_name"],
-            service_name=backend_aware_bucketfs_params["service_name"],
-            password=backend_aware_bucketfs_params["password"],
-            username=backend_aware_bucketfs_params["username"],
-            verify=backend_aware_bucketfs_params["verify"],
-            service=backend_aware_bucketfs_params["url"],
-        )
-    elif backend == BACKEND_SAAS:
-        bucket = SaaSBucket(
-            url=backend_aware_bucketfs_params["url"],
-            account_id=backend_aware_bucketfs_params["account_id"],
-            database_id=backend_aware_bucketfs_params["database_id"],
-            pat=backend_aware_bucketfs_params["pat"],
-        )
-    content = "".join("1" for _ in range(0, 10))
-    try:
-        bucket.upload(file_name, content)
-        assert file_name in bucket.files, "File upload failed"
-
-        # Generate UDF path
-        udf_path = bucket.udf_path
-        assert udf_path is not None, "UDF path generation failed"
-
-        conn = pyexasol.connect(**backend_aware_database_params)
-
-        conn.execute("CREATE SCHEMA IF NOT EXISTS transact;")
-        conn.execute("open schema transact;")
-
-        # Create UDF SQL
-        create_udf_sql = dedent(
-            f"""
-            --/
-            CREATE OR REPLACE PYTHON3 SCALAR
-            SCRIPT CHECK_FILE_EXISTS_UDF(file_path VARCHAR(200000))
-            RETURNS BOOLEAN AS
-            import os
-            def run(ctx):
-                return os.path.exists(ctx.file_path)
-            /
-            """
-        )
-        conn.execute(create_udf_sql)
-        # Verify the path exists inside the UDF
-        result = conn.execute(f"SELECT CHECK_FILE_EXISTS_UDF('{udf_path}')").fetchone()[
-            0
-        ]
-        assert result == True
-
-        # return the content of the file
-        create_read_udf_sql = dedent(
-            f"""
-            --/
-            CREATE OR REPLACE PYTHON3 SCALAR
-            SCRIPT READ_FILE_CONTENT_UDF(file_path VARCHAR(200000))
-            RETURNS VARCHAR(200000) AS
-            def run(ctx):
-                with open(ctx.file_path, 'rb') as f:
-                    return f.read().decode('utf-8', errors='replace')
-            /
-            """
-        )
-        conn.execute(create_read_udf_sql)
-
-        file_content = conn.execute(
-            f"SELECT READ_FILE_CONTENT_UDF('{udf_path}/{file_name}')"
-        ).fetchone()[0]
-        assert file_content == content
-    except Exception as e:
-        print(e)
-
-    finally:
-        # cleanup
-        _, _ = delete_file(
-            bucket._service,
-            bucket.name,
-            bucket._username,
-            bucket._password,
-            file_name,
-        )
-        pass
diff --git a/test/integration/test_udf_path.py b/test/integration/test_udf_path.py
new file mode 100644
index 00000000..96dd45f5
--- /dev/null
+++ b/test/integration/test_udf_path.py
@@ -0,0 +1,183 @@
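+# Integration tests for UDF path generation.
+#
+# Background: inside an Exasol UDF, BucketFS content appears as a local
+# filesystem mount (for on-prem typically under /buckets/<service>/<bucket>/).
+# Both Bucket.udf_path and PathLike.as_udf_path() should therefore resolve
+# to plain filesystem paths, which the UDFs below verify with
+# os.path.exists() and by reading the file content back.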
+from textwrap import dedent
+
+import pyexasol
+import pytest
+from exasol.pytest_backend import (
+    BACKEND_ONPREM,
+    BACKEND_SAAS,
+)
+
+import exasol.bucketfs as bfs
+from exasol.bucketfs import (
+    Bucket,
+    SaaSBucket,
+)
+
+FILE_NAME = "Uploaded-File-From-Integration-test.bin"
+FILE_CONTENT = "1" * 10
+
+
+@pytest.fixture(scope="module")
+def exa_bucket(backend_aware_bucketfs_params, backend):
+    # Create a Bucket or SaaSBucket matching the backend under test.
+    params = backend_aware_bucketfs_params
+    if backend == BACKEND_ONPREM:
+        return Bucket(
+            name=params["bucket_name"],
+            service_name=params["service_name"],
+            password=params["password"],
+            username=params["username"],
+            verify=params["verify"],
+            service=params["url"],
+        )
+    if backend == BACKEND_SAAS:
+        return SaaSBucket(
+            url=params["url"],
+            account_id=params["account_id"],
+            database_id=params["database_id"],
+            pat=params["pat"],
+        )
+    pytest.fail(f"Unknown backend: {backend}")
+
+
+@pytest.fixture(scope="module")
+def exa_pathlike(backend_aware_bucketfs_params, backend):
+    # Build a PathLike pointing at the uploaded file.
+    params = backend_aware_bucketfs_params
+    if backend == BACKEND_ONPREM:
+        return bfs.path.build_path(
+            backend=bfs.path.StorageBackend.onprem,
+            url=params["url"],
+            bucket_name=params["bucket_name"],
+            service_name=params["service_name"],
+            path=FILE_NAME,
+            username=params["username"],
+            password=params["password"],
+            verify=params["verify"],
+        )
+    if backend == BACKEND_SAAS:
+        return bfs.path.build_path(
+            backend=bfs.path.StorageBackend.saas,
+            url=params["url"],
+            account_id=params["account_id"],
+            database_id=params["database_id"],
+            pat=params["pat"],
+            path=FILE_NAME,
+        )
+    pytest.fail(f"Unknown backend: {backend}")
+
+
+@pytest.fixture(scope="module")
+def uploaded_file(exa_bucket):
+    # Upload the test file once per module and remove it afterwards.
+    exa_bucket.upload(FILE_NAME, FILE_CONTENT)
+    assert FILE_NAME in exa_bucket.files, "File upload failed"
+    yield {"file_name": FILE_NAME, "content": FILE_CONTENT}
+    try:
+        exa_bucket.delete(FILE_NAME)
+    except Exception:
+        # Cleanup is best effort; a leftover file must not fail the run.
+        pass
+
+
+@pytest.fixture
+def setup_schema_and_udfs(backend_aware_database_params):
+    conn = pyexasol.connect(**backend_aware_database_params)
+    conn.execute("CREATE SCHEMA IF NOT EXISTS transact;")
+    conn.execute("OPEN SCHEMA transact;")
+    # UDF checking that a path exists on the UDF container's filesystem.
+    create_check_udf_sql = dedent(
+        """
+        --/
+        CREATE OR REPLACE PYTHON3 SCALAR
+        SCRIPT CHECK_FILE_EXISTS_UDF(file_path VARCHAR(200000))
+        RETURNS BOOLEAN AS
+        import os
+        def run(ctx):
+            return os.path.exists(ctx.file_path)
+        /
+        """
+    )
+    conn.execute(create_check_udf_sql)
+    # UDF returning the content of a file as a string.
+    create_read_udf_sql = dedent(
+        """
+        --/
+        CREATE OR REPLACE PYTHON3 SCALAR
+        SCRIPT READ_FILE_CONTENT_UDF(file_path VARCHAR(200000))
+        RETURNS VARCHAR(200000) AS
+        def run(ctx):
+            with open(ctx.file_path, 'rb') as f:
+                return f.read().decode('utf-8', errors='replace')
+        /
+        """
+    )
+    conn.execute(create_read_udf_sql)
+    yield conn
+    conn.close()
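+
+
+# Both tests exercise the same uploaded file through two APIs:
+#  * Bucket.udf_path points at the bucket's directory inside the UDF
+#    container, so the file name has to be appended manually;
+#  * PathLike.as_udf_path() already points at the file itself.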
+ """ + file_name = uploaded_file["file_name"] + content = uploaded_file["content"] + bucket_udf_path = exa_bucket.udf_path + + assert bucket_udf_path is not None, "UDF path generation failed" + conn = setup_schema_and_udfs + + # Verify existence in UDF + result = conn.execute( + f"SELECT CHECK_FILE_EXISTS_UDF('{bucket_udf_path}/{file_name}')" + ).fetchone()[0] + assert result is True + + # Verify content from UDF path + content_from_udf_path = conn.execute( + f"SELECT READ_FILE_CONTENT_UDF('{bucket_udf_path}/{file_name}')" + ).fetchone()[0] + print(content_from_udf_path) + assert content_from_udf_path == content + + +def test_upload_and_udf_pathlike(uploaded_file, setup_schema_and_udfs, exa_pathlike): + """ + Test that verifies upload and pathlike UDF path availability using the uploaded_file_and_paths fixture. + """ + content = uploaded_file["content"] + file_udf_path = exa_pathlike.as_udf_path() + + assert file_udf_path is not None, "Pathlike udf path generation failed" + conn = setup_schema_and_udfs + + # Verify file exists in UDF + exists = conn.execute( + f"SELECT CHECK_FILE_EXISTS_UDF('{file_udf_path}')" + ).fetchone()[0] + assert exists is True + + # Verify content from pathlike udf path + content_of_file_udf_path = conn.execute( + f"SELECT READ_FILE_CONTENT_UDF('{file_udf_path}')" + ).fetchone()[0] + print(content_of_file_udf_path) + assert content_of_file_udf_path == content