Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,133 @@ def test_data_blob_writer_multiple_blob_columns(self):
result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
self.assertEqual(result.num_rows, 3)

def test_data_blob_writer_partial_write_with_write_type(self):
"""Partial write (normal + blob subset) via with_write_type: split must match batch columns."""
from pypaimon import Schema

pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('blob_data', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true'
}
)
self.catalog.create_table('test_db.blob_partial_write_type', schema, False)
table = self.catalog.get_table('test_db.blob_partial_write_type')

partial_schema = pa.schema([('id', pa.int32()), ('blob_data', pa.large_binary())])
test_data = pa.Table.from_pydict({
'id': [1, 2],
'blob_data': [b'a', b'b'],
}, schema=partial_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write().with_write_type(['id', 'blob_data'])
writer.write_arrow(test_data)
commit_messages = writer.prepare_commit()
self.assertGreater(len(commit_messages), 0)
all_files = [f for msg in commit_messages for f in msg.new_files]
parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')]
blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
self.assertEqual(len(parquet_files), 1)
self.assertGreaterEqual(len(blob_files), 1)
self.assertEqual(parquet_files[0].write_cols, ['id'])
self.assertEqual(parquet_files[0].row_count, 2)
for bf in blob_files:
self.assertEqual(bf.write_cols, ['blob_data'])
self.assertEqual(bf.row_count, 2)
write_builder.new_commit().commit(commit_messages)
writer.close()

read_builder = table.new_read_builder()
out = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
self.assertEqual(out.num_rows, 2)
self.assertEqual(out.column('id').to_pylist(), [1, 2])
self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
self.assertEqual(out.column('name').to_pylist(), [None, None])

def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
"""Partial write without blob columns in write_cols must not touch blob split paths."""
from pypaimon import Schema

pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('blob_data', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true'
}
)
self.catalog.create_table('test_db.blob_partial_normal_only', schema, False)
table = self.catalog.get_table('test_db.blob_partial_normal_only')

partial_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
test_data = pa.Table.from_pydict({'id': [7], 'name': ['n']}, schema=partial_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write().with_write_type(['id', 'name'])
writer.write_arrow(test_data)
commit_messages = writer.prepare_commit()
all_files = [f for msg in commit_messages for f in msg.new_files]
self.assertFalse(any(f.file_name.endswith('.blob') for f in all_files))
parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')]
self.assertEqual(len(parquet_files), 1)
self.assertEqual(parquet_files[0].write_cols, ['id', 'name'])
self.assertEqual(parquet_files[0].row_count, 1)
write_builder.new_commit().commit(commit_messages)
writer.close()

read_builder = table.new_read_builder()
out = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
self.assertEqual(out.column('id').to_pylist(), [7])
self.assertEqual(out.column('name').to_pylist(), ['n'])
self.assertEqual(out.column('blob_data').to_pylist(), [None])

def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
"""with_write_type lists only one blob column: only that column gets .blob files."""
from pypaimon import Schema

pa_schema = pa.schema([
('id', pa.int32()),
('blob1', pa.large_binary()),
('blob2', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true'
}
)
self.catalog.create_table('test_db.blob_partial_one_of_two', schema, False)
table = self.catalog.get_table('test_db.blob_partial_one_of_two')

partial_schema = pa.schema([('id', pa.int32()), ('blob1', pa.large_binary())])
test_data = pa.Table.from_pydict({
'id': [1],
'blob1': [b'only_b1'],
}, schema=partial_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write().with_write_type(['id', 'blob1'])
writer.write_arrow(test_data)
commit_messages = writer.prepare_commit()
all_files = [f for msg in commit_messages for f in msg.new_files]
blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
self.assertEqual(len(blob_files), 1)
self.assertEqual(blob_files[0].write_cols, ['blob1'])
write_builder.new_commit().commit(commit_messages)
writer.close()

def test_data_blob_writer_write_operations(self):
"""Test DataBlobWriter write operations with real data."""
from pypaimon import Schema
Expand Down
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/write/file_store_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def max_seq_number():
partition=partition,
bucket=bucket,
max_seq_number=0,
options=options
options=options,
write_cols=self.write_cols,
)
elif self.table.is_primary_key_table:
return KeyValueDataWriter(
Expand Down
34 changes: 26 additions & 8 deletions paimon-python/pypaimon/write/writer/data_blob_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class DataBlobWriter(DataWriter):
- One normal data file may correspond to multiple blob data files
- Blob data is written immediately to disk to prevent memory corruption
- Blob file metadata is stored as separate DataFileMeta objects after normal file metadata
- When TableWrite.with_write_type narrows columns, incoming batches only carry that subset;
column lists are narrowed accordingly so splitting never selects missing columns.

Rolling behavior:
- Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata
Expand Down Expand Up @@ -76,8 +78,9 @@ class DataBlobWriter(DataWriter):
# Constant for checking rolling condition periodically
CHECK_ROLLING_RECORD_CNT = 1000

def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None):
super().__init__(table, partition, bucket, max_seq_number, options)
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None,
write_cols: Optional[List[str]] = None):
super().__init__(table, partition, bucket, max_seq_number, options, write_cols=write_cols)

# Determine blob columns from table schema
self.blob_column_names = self._get_blob_columns_from_schema()
Expand All @@ -93,16 +96,31 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op
)

# Blob fields that should still be written to `.blob` files.
self.blob_file_column_names = [
full_blob_file_column_names = [
col for col in self.blob_column_names if col not in self.blob_descriptor_fields
]

full_blob_file_set = set(full_blob_file_column_names)
all_column_names = self.table.field_names
self.normal_column_names = [
col for col in all_column_names if col not in self.blob_file_column_names
]

# Narrow columns when TableWrite.with_write_type(...) supplies a partial column list.
# Incoming RecordBatches only contain those columns; selecting full normal/blob lists
# would raise KeyError.
if write_cols is not None:
write_col_set = set(write_cols)
self.blob_file_column_names = [
col for col in full_blob_file_column_names if col in write_col_set
]
self.normal_column_names = [
col for col in write_cols if col not in full_blob_file_set
]
else:
self.blob_file_column_names = list(full_blob_file_column_names)
self.normal_column_names = [
col for col in all_column_names if col not in full_blob_file_set
]
normal_name_set = set(self.normal_column_names)
self.normal_columns = [
field for field in self.table.table_schema.fields if field.name in self.normal_column_names
field for field in self.table.table_schema.fields if field.name in normal_name_set
]
self.write_cols = self.normal_column_names

Expand Down
Loading