Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
# under the License.

import sys
from datetime import timedelta
from enum import Enum
from typing import Dict, Optional

from datetime import timedelta

from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
from pypaimon.common.options.config_options import ConfigOptions
from pypaimon.common.options.config_option import ConfigOption
from pypaimon.common.options.config_options import ConfigOptions


class ExternalPathStrategy(str, Enum):
Expand Down Expand Up @@ -220,6 +219,28 @@ class CoreOptions:
)
)

# Config option "blob-external-storage-path": string-typed, declared without a default value.
# Per its own description, it names the external location that receives the raw data of
# the BLOB fields listed in 'blob-external-storage-field'; orphan file cleanup skips it.
BLOB_EXTERNAL_STORAGE_PATH: ConfigOption[str] = (
ConfigOptions.key("blob-external-storage-path")
.string_type()
.no_default_value()
.with_description(
"The external storage path where raw BLOB data from fields configured "
"by 'blob-external-storage-field' is written at write time. "
"Orphan file cleanup is not applied to this path."
)
)

# Config option "blob-external-storage-field": string-typed, declared without a default value.
# Holds a comma-separated list of BLOB field names; the description requires them to be a
# subset of 'blob-descriptor-field' and pairs this option with 'blob-external-storage-path'.
BLOB_EXTERNAL_STORAGE_FIELD: ConfigOption[str] = (
ConfigOptions.key("blob-external-storage-field")
.string_type()
.no_default_value()
.with_description(
"Comma-separated BLOB field names (must be a subset of 'blob-descriptor-field') "
"whose raw data will be written to external storage at write time. "
"The external storage path is configured via 'blob-external-storage-path'."
)
)

TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("target-file-size")
.memory_type()
Expand Down Expand Up @@ -633,6 +654,19 @@ def blob_descriptor_fields(self, default=None):
return {str(field).strip() for field in value if str(field).strip()}
return set()

def blob_external_storage_fields(self, default=None):
    """Return the configured external-storage BLOB field names as a set.

    The raw option value is read from ``self.options`` under
    ``CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD`` with ``default`` passed as
    the fallback argument. A string is split on commas; a list, set or
    tuple has each element converted with ``str``. Names are stripped of
    surrounding whitespace and empty names are dropped. ``None`` and any
    other value type produce an empty set.
    """
    raw = self.options.get(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD, default)

    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, (list, set, tuple)):
        candidates = [str(item) for item in raw]
    else:
        # None and unsupported value types both mean "no fields configured".
        return set()

    stripped = (name.strip() for name in candidates)
    return {name for name in stripped if name}

def blob_external_storage_path(self, default=None):
    """Return the value of the 'blob-external-storage-path' option.

    The lookup is delegated to ``self.options.get`` with ``default`` passed
    through as its second argument.
    """
    path_option = CoreOptions.BLOB_EXTERNAL_STORAGE_PATH
    return self.options.get(path_option, default)

def target_file_size(self, has_primary_key, default=None):
return self.options.get(CoreOptions.TARGET_FILE_SIZE,
MemorySize.of_mebi_bytes(
Expand Down
64 changes: 54 additions & 10 deletions paimon-python/pypaimon/schema/schema_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional, List
from typing import List, Optional

from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException
from pypaimon.catalog.catalog_exception import (ColumnAlreadyExistException,
ColumnNotExistException)
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
from pypaimon.common.json_util import JSON
from pypaimon.common.options import Options, CoreOptions
from pypaimon.common.options import CoreOptions, Options
from pypaimon.schema.data_types import AtomicInteger, DataField
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import (
AddColumn, DropColumn, RemoveOption, RenameColumn,
SchemaChange, SetOption, UpdateColumnComment,
UpdateColumnNullability, UpdateColumnPosition,
UpdateColumnType, UpdateComment
)
from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption,
RenameColumn, SchemaChange,
SetOption, UpdateColumnComment,
UpdateColumnNullability,
UpdateColumnPosition,
UpdateColumnType, UpdateComment)
from pypaimon.schema.table_schema import TableSchema


Expand Down Expand Up @@ -148,6 +149,48 @@ def _assert_not_renaming_blob_column(
)


def _validate_blob_external_storage_fields(fields: List[DataField], options: dict):
    """Validate blob-external-storage-field configuration.

    Validation order aligned with Java's SchemaValidation.validateBlobExternalStorageFields():
    1. Field must be a BLOB type in the schema
    2. Field must be in blob-descriptor-field
    3. blob-external-storage-path must be configured

    Nothing is checked when no external-storage field is configured.

    Args:
        fields: The schema's data fields; each one's ``name`` and ``type`` are read.
        options: The raw table options, wrapped in ``Options``/``CoreOptions`` for lookup.

    Raises:
        ValueError: If any of the three checks above fails.
    """
    core_options = CoreOptions(Options(options))
    external_fields = core_options.blob_external_storage_fields()
    if not external_fields:
        return

    # 1. Configured fields must be BLOB type.
    # Iterate in sorted order: `external_fields` is a set, so without sorting the
    # field named in the error would depend on hash order when several are invalid.
    field_type_map = {f.name: f.type for f in fields}
    for field_name in sorted(external_fields):
        field_type = field_type_map.get(field_name)
        # A field missing from the schema is reported the same way as a non-BLOB one.
        if field_type is None or getattr(field_type, 'type', None) != 'BLOB':
            raise ValueError(
                f"Field '{field_name}' in "
                f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must be a BLOB type field."
            )

    # 2. Must be a subset of blob-descriptor-field
    descriptor_fields = core_options.blob_descriptor_fields()
    not_in_descriptor = external_fields - descriptor_fields
    if not_in_descriptor:
        raise ValueError(
            f"Fields {sorted(not_in_descriptor)} in "
            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must also be configured in "
            f"'{CoreOptions.BLOB_DESCRIPTOR_FIELD.key()}'."
        )

    # 3. Must configure external-storage-path (an empty string counts as not set)
    external_path = core_options.blob_external_storage_path()
    if not external_path:
        raise ValueError(
            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' is configured but "
            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key()}' is not set."
        )


def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
field_name = change.field_names[-1]
new_name = change.new_name
Expand Down Expand Up @@ -279,6 +322,7 @@ def create_table(self, schema: Schema) -> TableSchema:
if latest is not None:
raise RuntimeError("Schema in filesystem exists, creation is not allowed.")

_validate_blob_external_storage_fields(schema.fields, schema.options)
table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
success = self.commit(table_schema)
if success:
Expand Down
Loading
Loading