Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
abbbaee
first commit
mengw15 Jan 26, 2026
330dbd0
for python, need test
mengw15 Jan 26, 2026
603b373
for python, 2
mengw15 Jan 28, 2026
8efbb73
update storage.conf
mengw15 Jan 28, 2026
720f5e1
script
mengw15 Feb 3, 2026
2050396
script
mengw15 Feb 3, 2026
227395c
1
mengw15 Feb 3, 2026
ec9d4be
update script, will refine later
mengw15 Feb 3, 2026
713b65a
single node
mengw15 Feb 15, 2026
4234285
maybe merge to s3 (region, and s3-bucket)
mengw15 Feb 15, 2026
955e8b4
single node fix
mengw15 Feb 15, 2026
c67ba85
IcebergUtil.scala namespace init, needed
mengw15 Feb 15, 2026
2c31f4c
k8s set up, will test again, able to run in pods, but didnt test with…
mengw15 Feb 17, 2026
14e884b
expose lakekeeper to the texera-computing-unit-pool
mengw15 Feb 17, 2026
55a99a2
expose lakekeeper to the texera-computing-unit-pool, update
mengw15 Feb 17, 2026
8d2711e
expose lakekeeper to the texera-computing-unit-pool, update
mengw15 Feb 17, 2026
34d6363
fix python dependency issue
mengw15 Feb 25, 2026
ea2a04c
update
mengw15 Feb 25, 2026
1f14d74
update
mengw15 Feb 25, 2026
7ed8b9a
update
mengw15 Feb 25, 2026
ba038bd
update
mengw15 Feb 25, 2026
dd37045
clean
mengw15 Feb 26, 2026
cdc2f01
clean
mengw15 Feb 26, 2026
b96436c
clean
mengw15 Feb 26, 2026
da271de
clean
mengw15 Feb 26, 2026
906b3af
clean
mengw15 Feb 26, 2026
3ac34c1
clean
mengw15 Feb 26, 2026
069d313
clean
mengw15 Feb 26, 2026
5d125fc
clean
mengw15 Feb 26, 2026
b58e3cb
clean
mengw15 Feb 26, 2026
c95541a
clean
mengw15 Feb 26, 2026
2658c34
clean code
mengw15 Feb 27, 2026
38561e6
clean code
mengw15 Feb 27, 2026
1f91b50
clean code
mengw15 Feb 27, 2026
cfd2001
Merge branch 'apache:main' into Restful-Catalog4
mengw15 Feb 27, 2026
d6eb2cc
Merge branch 'main' into Restful-Catalog4
mengw15 Feb 27, 2026
932f8d3
Merge branch 'main' into Restful-Catalog4
mengw15 Feb 27, 2026
4f7b4ad
fmt fix
mengw15 Feb 27, 2026
6bed117
dependency fix
mengw15 Feb 27, 2026
401973e
fmt fix
mengw15 Feb 27, 2026
bec831c
update
mengw15 Feb 28, 2026
d65756b
fmt fix
mengw15 Feb 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ bidict==0.22.0
cached_property==1.5.2
psutil==5.9.0
tzlocal==2.1
pyiceberg==0.8.1
pyiceberg==0.9.0
s3fs==2025.9.0
aiobotocore==2.25.1
botocore==1.40.53
readerwriterlock==1.0.9
tenacity==8.5.0
SQLAlchemy==2.0.37
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
from pyiceberg.catalog import Catalog
from typing import Optional

from core.storage.iceberg.iceberg_utils import create_postgres_catalog
from core.storage.iceberg.iceberg_utils import (
create_postgres_catalog,
create_rest_catalog,
)
from core.storage.storage_config import StorageConfig


class IcebergCatalogInstance:
"""
IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
Currently only postgres SQL catalog is supported.
Supports postgres SQL catalog and REST catalog.
- Provides a single shared catalog for all Iceberg table-related operations.
- Lazily initializes the catalog on first access.
- Supports replacing the catalog instance for testing or reconfiguration.
Expand All @@ -39,16 +42,31 @@ def get_instance(cls):
Retrieves the singleton Iceberg catalog instance.
- If the catalog is not initialized, it is lazily created using the configured
properties.
- Supports "postgres" and "rest" catalog types.
:return: the Iceberg catalog instance.
"""
if cls._instance is None:
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE
if catalog_type == "postgres":
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
elif catalog_type == "rest":
cls._instance = create_rest_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME,
StorageConfig.ICEBERG_REST_CATALOG_URI,
StorageConfig.S3_ENDPOINT,
StorageConfig.S3_REGION,
StorageConfig.S3_AUTH_USERNAME,
StorageConfig.S3_AUTH_PASSWORD,
)
else:
raise ValueError(f"Unsupported catalog type: {catalog_type}")
return cls._instance

@classmethod
Expand Down
40 changes: 39 additions & 1 deletion amber/src/main/python/core/storage/iceberg/iceberg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pyarrow as pa
import pyiceberg.table
from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan
Expand Down Expand Up @@ -153,6 +153,44 @@ def create_postgres_catalog(
)


def create_rest_catalog(
catalog_name: str,
warehouse_name: str,
rest_uri: str,
s3_endpoint: str,
s3_region: str,
s3_username: str,
s3_password: str,
) -> Catalog:
"""
Creates a REST catalog instance by connecting to a REST endpoint.
- Configures the catalog to interact with a REST endpoint.
- The warehouse_name parameter specifies the warehouse identifier (name for Lakekeeper).
- Configures S3FileIO for MinIO/S3 storage backend.
:param catalog_name: the name of the catalog.
:param warehouse_name: the warehouse identifier (name for Lakekeeper).
:param rest_uri: the URI of the REST catalog endpoint.
:param s3_endpoint: the S3 endpoint URL.
:param s3_region: the S3 region.
:param s3_username: the S3 access key ID.
:param s3_password: the S3 secret access key.
:return: a Catalog instance (REST catalog).
"""
return load_catalog(
catalog_name,
**{
"type": "rest",
"uri": rest_uri,
"warehouse": warehouse_name,
"s3.endpoint": s3_endpoint,
"s3.access-key-id": s3_username,
"s3.secret-access-key": s3_password,
"s3.region": s3_region,
"s3.path-style-access": "true",
},
)


def create_table(
catalog: Catalog,
table_namespace: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@

# Hardcoded storage config only for test purposes.
StorageConfig.initialize(
catalog_type="rest",
postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog",
postgres_username="texera",
postgres_password="password",
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="operator-port-result",
directory_path="../../../../../../amber/user-resources/workflow-results",
commit_batch_size=4096,
s3_endpoint="http://localhost:9000",
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
s3_region="us-west-2",
s3_auth_username="texera_minio",
s3_auth_password="password",
)


Expand Down
12 changes: 11 additions & 1 deletion amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ class StorageConfig:

_initialized = False

ICEBERG_CATALOG_TYPE = None
ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
ICEBERG_POSTGRES_CATALOG_USERNAME = None
ICEBERG_POSTGRES_CATALOG_PASSWORD = None
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None

# S3 configs (for large_binary_manager module)
# S3 configs
S3_ENDPOINT = None
S3_REGION = None
S3_AUTH_USERNAME = None
Expand All @@ -41,9 +44,12 @@ class StorageConfig:
@classmethod
def initialize(
cls,
catalog_type,
postgres_uri_without_scheme,
postgres_username,
postgres_password,
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
directory_path,
commit_batch_size,
Expand All @@ -57,9 +63,13 @@ def initialize(
"Storage config has already been initialized and cannot be modified."
)

cls.ICEBERG_CATALOG_TYPE = catalog_type
cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name

cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ def setup_storage_config(self):
"""Initialize StorageConfig for tests."""
if not StorageConfig._initialized:
StorageConfig.initialize(
catalog_type="rest",
postgres_uri_without_scheme="localhost:5432/test",
postgres_username="test",
postgres_password="test",
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="test",
directory_path="/tmp/test",
commit_batch_size=1000,
s3_endpoint="http://localhost:9000",
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
s3_region="us-west-2",
s3_auth_username="texera_minio",
s3_auth_password="password",
)

def test_get_s3_client_initializes_once(self):
Expand Down
6 changes: 6 additions & 0 deletions amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ def init_loguru_logger(stream_log_level) -> None:
output_port,
logger_level,
r_path,
iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
Expand All @@ -58,9 +61,12 @@ def init_loguru_logger(stream_log_level) -> None:
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,12 @@ class PythonWorkflowWorker(
Integer.toString(pythonProxyServer.getPortNumber.get()),
UdfConfig.pythonLogStreamHandlerLevel,
RENVPath,
StorageConfig.icebergCatalogType,
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
StorageConfig.icebergPostgresCatalogUsername,
StorageConfig.icebergPostgresCatalogPassword,
StorageConfig.icebergRESTCatalogUri,
StorageConfig.icebergRESTCatalogWarehouseName,
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
Expand Down
Loading
Loading