From 66cdefac690252f7637ce7659e5d326787c68ad3 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 14 May 2026 12:54:21 +0800 Subject: [PATCH] [python] Fix DataBlobWriter KeyError for partial writes with blob columns DataBlobWriter._split_data selected full-table normal and blob-file column names, while TableWrite.with_write_type only supplies narrowed batches, so pa.RecordBatch.select raised KeyError on missing names. Pass write_cols from FileStoreWrite into DataBlobWriter; narrow normal and blob-file column lists and BlobWriter initialization accordingly. Add blob_table_test coverage for partial writes (normal+blob, normal-only, single blob of two). --- .../pypaimon/tests/blob_table_test.py | 127 ++++++++++++++++++ .../pypaimon/write/file_store_write.py | 3 +- .../pypaimon/write/writer/data_blob_writer.py | 34 +++-- 3 files changed, 155 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 7d0ad90ec961..56359c1d1fac 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -242,6 +242,133 @@ def test_data_blob_writer_multiple_blob_columns(self): result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits()) self.assertEqual(result.num_rows, 3) + def test_data_blob_writer_partial_write_with_write_type(self): + """Partial write (normal + blob subset) via with_write_type: split must match batch columns.""" + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('blob_data', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_partial_write_type', schema, False) + table = self.catalog.get_table('test_db.blob_partial_write_type') + + partial_schema = pa.schema([('id', pa.int32()), ('blob_data', pa.large_binary())]) + test_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'blob_data': [b'a', b'b'], + }, schema=partial_schema) + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write().with_write_type(['id', 'blob_data']) + writer.write_arrow(test_data) + commit_messages = writer.prepare_commit() + self.assertGreater(len(commit_messages), 0) + all_files = [f for msg in commit_messages for f in msg.new_files] + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertEqual(len(parquet_files), 1) + self.assertGreaterEqual(len(blob_files), 1) + self.assertEqual(parquet_files[0].write_cols, ['id']) + self.assertEqual(parquet_files[0].row_count, 2) + for bf in blob_files: + self.assertEqual(bf.write_cols, ['blob_data']) + self.assertEqual(bf.row_count, 2) + write_builder.new_commit().commit(commit_messages) + writer.close() + + read_builder = table.new_read_builder() + out = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits()) + self.assertEqual(out.num_rows, 2) + self.assertEqual(out.column('id').to_pylist(), [1, 2]) + self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b']) + self.assertEqual(out.column('name').to_pylist(), [None, None]) + + def test_data_blob_writer_partial_write_normal_only_with_write_type(self): + """Partial write without blob columns in write_cols must not touch blob split paths.""" + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('blob_data', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_partial_normal_only', schema, False) + table = self.catalog.get_table('test_db.blob_partial_normal_only') + + partial_schema = pa.schema([('id', pa.int32()), ('name', pa.string())]) + test_data = pa.Table.from_pydict({'id': [7], 'name': ['n']}, schema=partial_schema) + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write().with_write_type(['id', 'name']) + writer.write_arrow(test_data) + commit_messages = writer.prepare_commit() + all_files = [f for msg in commit_messages for f in msg.new_files] + self.assertFalse(any(f.file_name.endswith('.blob') for f in all_files)) + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + self.assertEqual(len(parquet_files), 1) + self.assertEqual(parquet_files[0].write_cols, ['id', 'name']) + self.assertEqual(parquet_files[0].row_count, 1) + write_builder.new_commit().commit(commit_messages) + writer.close() + + read_builder = table.new_read_builder() + out = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits()) + self.assertEqual(out.column('id').to_pylist(), [7]) + self.assertEqual(out.column('name').to_pylist(), ['n']) + self.assertEqual(out.column('blob_data').to_pylist(), [None]) + + def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self): + """with_write_type lists only one blob column: only that column gets .blob files.""" + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('blob1', pa.large_binary()), + ('blob2', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_partial_one_of_two', schema, False) + table = self.catalog.get_table('test_db.blob_partial_one_of_two') + + partial_schema = pa.schema([('id', pa.int32()), ('blob1', pa.large_binary())]) + test_data = pa.Table.from_pydict({ + 'id': [1], + 'blob1': [b'only_b1'], + }, schema=partial_schema) + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write().with_write_type(['id', 'blob1']) + writer.write_arrow(test_data) + commit_messages = writer.prepare_commit() + all_files = [f for msg in commit_messages for f in msg.new_files] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertEqual(len(blob_files), 1) + self.assertEqual(blob_files[0].write_cols, ['blob1']) + write_builder.new_commit().commit(commit_messages) + writer.close() + def test_data_blob_writer_write_operations(self): """Test DataBlobWriter write operations with real data.""" from pypaimon import Schema diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index a98383d9c956..c33fca3792eb 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -69,7 +69,8 @@ def max_seq_number(): partition=partition, bucket=bucket, max_seq_number=0, - options=options + options=options, + write_cols=self.write_cols, ) elif self.table.is_primary_key_table: return KeyValueDataWriter( diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index 1006ec8289b1..e7a28cddb7e3 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -48,6 +48,8 @@ class DataBlobWriter(DataWriter): - One normal data file may correspond to multiple blob data files - Blob data is written immediately to disk to prevent memory corruption - Blob file metadata is stored as separate DataFileMeta objects after normal file metadata + - When TableWrite.with_write_type narrows columns, incoming batches only carry that subset; + column lists are narrowed accordingly so splitting never selects missing columns. Rolling behavior: - Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata @@ -76,8 +78,9 @@ class DataBlobWriter(DataWriter): # Constant for checking rolling condition periodically CHECK_ROLLING_RECORD_CNT = 1000 - def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None): - super().__init__(table, partition, bucket, max_seq_number, options) + def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None, + write_cols: Optional[List[str]] = None): + super().__init__(table, partition, bucket, max_seq_number, options, write_cols=write_cols) # Determine blob columns from table schema self.blob_column_names = self._get_blob_columns_from_schema() @@ -93,16 +96,31 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op ) # Blob fields that should still be written to `.blob` files. - self.blob_file_column_names = [ + full_blob_file_column_names = [ col for col in self.blob_column_names if col not in self.blob_descriptor_fields ] - + full_blob_file_set = set(full_blob_file_column_names) all_column_names = self.table.field_names - self.normal_column_names = [ - col for col in all_column_names if col not in self.blob_file_column_names - ] + + # Narrow columns when TableWrite.with_write_type(...) supplies a partial column list. + # Incoming RecordBatches only contain those columns; selecting full normal/blob lists + # would raise KeyError. + if write_cols is not None: + write_col_set = set(write_cols) + self.blob_file_column_names = [ + col for col in full_blob_file_column_names if col in write_col_set + ] + self.normal_column_names = [ + col for col in write_cols if col not in full_blob_file_set + ] + else: + self.blob_file_column_names = list(full_blob_file_column_names) + self.normal_column_names = [ + col for col in all_column_names if col not in full_blob_file_set + ] + normal_name_set = set(self.normal_column_names) self.normal_columns = [ - field for field in self.table.table_schema.fields if field.name in self.normal_column_names + field for field in self.table.table_schema.fields if field.name in normal_name_set ] self.write_cols = self.normal_column_names