From 955adbfccbdfe052d255b8f9f1fddb41e96ac938 Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Wed, 29 Apr 2026 03:40:24 -0700 Subject: [PATCH 1/4] Implementing input changelog generation support in python lib --- .../tests/write/changelog_producer_test.py | 271 ++++++++++++++++++ .../pypaimon/write/commit_message.py | 5 +- .../pypaimon/write/file_store_commit.py | 78 ++++- .../pypaimon/write/file_store_write.py | 13 +- .../pypaimon/write/writer/data_writer.py | 76 ++++- 5 files changed, 419 insertions(+), 24 deletions(-) create mode 100644 paimon-python/pypaimon/tests/write/changelog_producer_test.py diff --git a/paimon-python/pypaimon/tests/write/changelog_producer_test.py b/paimon-python/pypaimon/tests/write/changelog_producer_test.py new file mode 100644 index 000000000000..9bf331ca039f --- /dev/null +++ b/paimon-python/pypaimon/tests/write/changelog_producer_test.py @@ -0,0 +1,271 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import glob +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +import json + +from pypaimon import CatalogFactory, Schema +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.write.commit_message import CommitMessage + + +class ChangelogProducerTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', True) + cls.pk_schema = pa.schema([ + pa.field('user_id', pa.int32(), nullable=False), + ('item_id', pa.int64()), + ('behavior', pa.string()), + pa.field('dt', pa.string(), nullable=False) + ]) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_table(self, table_name, options=None): + schema = Schema.from_pyarrow_schema( + self.pk_schema, + partition_keys=['dt'], + primary_keys=['user_id', 'dt'], + options=options or {} + ) + self.catalog.create_table(f'default.{table_name}', schema, False) + return self.catalog.get_table(f'default.{table_name}') + + def _sample_data(self): + return pa.Table.from_pydict({ + 'user_id': [1, 2, 3], + 'item_id': [101, 102, 103], + 'behavior': ['click', 'buy', 'view'], + 'dt': ['p1', 'p1', 'p1'] + }, schema=self.pk_schema) + + def test_commit_message_with_changelog(self): + msg = CommitMessage(partition=('p1',), bucket=0, new_files=[], changelog_files=[]) + self.assertTrue(msg.is_empty()) + + msg2 = CommitMessage(partition=('p1',), bucket=0, new_files=['fake']) + self.assertFalse(msg2.is_empty()) + self.assertEqual(msg2.changelog_files, []) + + def test_full_compaction_and_lookup_no_changelog_from_writer(self): + """FULL_COMPACTION and LOOKUP rely on dedicated compaction for changelog, + so the Python writer should not produce changelog files for these modes.""" + for mode in ['full-compaction', 'lookup']: + table = self._create_table( + f'test_no_changelog_{mode.replace("-", "_")}', + options={'changelog-producer': mode, 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + bucket_dir = os.path.join( + self.warehouse, 'default.db', + f'test_no_changelog_{mode.replace("-", "_")}', 'dt=p1', 'bucket-0') + changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertEqual(len(changelog_files), 0, + f"Writer should not produce changelog files for {mode}") + + table_write.close() + table_commit.close() + + def test_none_mode_no_changelog(self): + table = self._create_table( + 'test_none_mode', + options={'changelog-producer': 'none', 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + snapshot_path = glob.glob( + os.path.join(self.warehouse, 'default.db', 'test_none_mode', 'snapshot', 'snapshot-*')) + self.assertTrue(len(snapshot_path) > 0) + + snapshot_json = open(snapshot_path[0]).read() + snapshot = json.loads(snapshot_json) + self.assertNotIn('changelogManifestList', snapshot) + + table_write.close() + table_commit.close() + + def test_input_mode_produces_changelog_files(self): + table = self._create_table( + 'test_input_files', + options={'changelog-producer': 'input', 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + bucket_dir = os.path.join( + self.warehouse, 'default.db', 'test_input_files', 'dt=p1', 'bucket-0') + data_files = glob.glob(os.path.join(bucket_dir, 'data-*')) + changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertTrue(len(data_files) > 0, "Should have data files") + self.assertTrue(len(changelog_files) > 0, "Should have changelog files") + + table_write.close() + table_commit.close() + + def test_input_mode_snapshot_has_changelog_manifest(self): + table = self._create_table( + 'test_input_snapshot', + options={'changelog-producer': 'input', 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + snapshot_path = glob.glob( + os.path.join(self.warehouse, 'default.db', 'test_input_snapshot', + 'snapshot', 'snapshot-*')) + self.assertTrue(len(snapshot_path) > 0) + + snapshot_json = open(snapshot_path[0]).read() + snapshot = json.loads(snapshot_json) + self.assertIn('changelogManifestList', snapshot) + self.assertIsNotNone(snapshot['changelogManifestList']) + self.assertIn('changelogRecordCount', snapshot) + self.assertEqual(snapshot['changelogRecordCount'], 3) + self.assertIn('changelogManifestListSize', snapshot) + + table_write.close() + table_commit.close() + + def test_input_mode_changelog_manifest_readable(self): + table = self._create_table( + 'test_input_readable', + options={'changelog-producer': 'input', 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + from pypaimon.snapshot.snapshot_manager import SnapshotManager + snapshot_manager = SnapshotManager(table) + snapshot = snapshot_manager.get_latest_snapshot() + + self.assertIsNotNone(snapshot.changelog_manifest_list) + + manifest_list_manager = ManifestListManager(table) + changelog_manifests = manifest_list_manager.read_changelog(snapshot) + self.assertTrue(len(changelog_manifests) > 0) + + total_changelog_added = sum(m.num_added_files for m in changelog_manifests) + self.assertTrue(total_changelog_added > 0) + + table_write.close() + table_commit.close() + + def test_input_mode_multiple_commits(self): + table = self._create_table( + 'test_input_multi', + options={'changelog-producer': 'input', 'bucket': '1'} + ) + + # First commit + write_builder1 = table.new_batch_write_builder() + table_write1 = write_builder1.new_write() + table_commit1 = write_builder1.new_commit() + table_write1.write_arrow(self._sample_data()) + table_commit1.commit(table_write1.prepare_commit()) + table_write1.close() + table_commit1.close() + + # Second commit with different data + write_builder2 = table.new_batch_write_builder() + table_write2 = write_builder2.new_write() + table_commit2 = write_builder2.new_commit() + data2 = pa.Table.from_pydict({ + 'user_id': [4, 5], + 'item_id': [104, 105], + 'behavior': ['click', 'buy'], + 'dt': ['p1', 'p1'] + }, schema=self.pk_schema) + table_write2.write_arrow(data2) + table_commit2.commit(table_write2.prepare_commit()) + table_write2.close() + table_commit2.close() + + from pypaimon.snapshot.snapshot_manager import SnapshotManager + snapshot_manager = SnapshotManager(table) + snapshot = snapshot_manager.get_latest_snapshot() + self.assertEqual(snapshot.id, 2) + self.assertIsNotNone(snapshot.changelog_manifest_list) + self.assertEqual(snapshot.changelog_record_count, 2) + + def test_abort_cleans_up_changelog_files(self): + table = self._create_table( + 'test_input_abort', + options={'changelog-producer': 'input', 'bucket': '1'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + commit_messages = table_write.prepare_commit() + + bucket_dir = os.path.join( + self.warehouse, 'default.db', 'test_input_abort', 'dt=p1', 'bucket-0') + changelog_files_before = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertTrue(len(changelog_files_before) > 0) + + table_commit.abort(commit_messages) + + data_files_after = glob.glob(os.path.join(bucket_dir, 'data-*')) + changelog_files_after = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertEqual(len(data_files_after), 0, "Data files should be cleaned up after abort") + self.assertEqual(len(changelog_files_after), 0, "Changelog files should be cleaned up after abort") + + table_write.close() + table_commit.close() + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index d560c5a2479f..98d972012733 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import List, Tuple, Optional from pypaimon.manifest.schema.data_file_meta import DataFileMeta @@ -27,7 +27,8 @@ class CommitMessage: partition: Tuple bucket: int new_files: List[DataFileMeta] + changelog_files: List[DataFileMeta] = field(default_factory=list) check_from_snapshot: Optional[int] = -1 def is_empty(self): - return not self.new_files + return not self.new_files and not self.changelog_files diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 832a39ba6887..05964d56a27a 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -127,6 +127,7 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): len(commit_messages), ) commit_entries = [] + changelog_entries = [] for msg in commit_messages: partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) for file in msg.new_files: @@ -137,8 +138,17 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): total_buckets=self.table.total_buckets, file=file )) + for file in msg.changelog_files: + changelog_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) - logger.info("Finished collecting changes, including: %d entries", len(commit_entries)) + logger.info("Finished collecting changes, including: %d entries, %d changelog entries", + len(commit_entries), len(changelog_entries)) commit_kind = "APPEND" detect_conflicts = False @@ -154,6 +164,7 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): self._try_commit(commit_kind=commit_kind, commit_identifier=commit_identifier, commit_entries_plan=lambda snapshot: commit_entries, + changelog_entries=changelog_entries, detect_conflicts=detect_conflicts, allow_rollback=allow_rollback) @@ -182,11 +193,24 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " f"in {msg.partition} does not belong to this partition") + changelog_entries = [] + for msg in commit_messages: + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) + for file in msg.changelog_files: + changelog_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) + self._try_commit( commit_kind="OVERWRITE", commit_identifier=commit_identifier, commit_entries_plan=lambda snapshot: self._generate_overwrite_entries( snapshot, partition_filter, commit_messages), + changelog_entries=changelog_entries, detect_conflicts=True, allow_rollback=False, ) @@ -241,7 +265,7 @@ def truncate_table(self, commit_identifier: int) -> None: ) def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan, - detect_conflicts=False, allow_rollback=False): + changelog_entries=None, detect_conflicts=False, allow_rollback=False): retry_count = 0 retry_result = None @@ -259,6 +283,7 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan, retry_result=retry_result, commit_kind=commit_kind, commit_entries=commit_entries, + changelog_entries=changelog_entries or [], commit_identifier=commit_identifier, latest_snapshot=latest_snapshot, detect_conflicts=detect_conflicts, @@ -311,7 +336,9 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan, retry_count += 1 def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, - commit_entries: List[ManifestEntry], commit_identifier: int, + commit_entries: List[ManifestEntry], + changelog_entries: List[ManifestEntry], + commit_identifier: int, latest_snapshot: Optional[Snapshot], detect_conflicts: bool = False, allow_rollback: bool = False) -> CommitResult: @@ -356,10 +383,27 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str return RetryResult(latest_snapshot, conflict_exception) raise conflict_exception + changelog_manifest_list_name = None + changelog_manifest_list_size = None + changelog_record_count = None try: new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + # Write changelog manifest if changelog entries exist + if changelog_entries: + changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog-0" + changelog_manifest_file_meta = self._write_manifest_file( + changelog_entries, changelog_manifest_file) + changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" + self.manifest_list_manager.write( + changelog_manifest_list_name, [changelog_manifest_file_meta]) + manifest_path = self.manifest_list_manager.manifest_path + changelog_manifest_list_size = self.table.file_io.get_file_size( + f"{manifest_path}/{changelog_manifest_list_name}") + changelog_record_count = sum( + entry.file.row_count for entry in changelog_entries if entry.kind == 0) + # process existing_manifest total_record_count = 0 if latest_snapshot: @@ -389,6 +433,9 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str schema_id=self.table.table_schema.id, base_manifest_list=base_manifest_list, delta_manifest_list=delta_manifest_list, + changelog_manifest_list=changelog_manifest_list_name, + changelog_manifest_list_size=changelog_manifest_list_size, + changelog_record_count=changelog_record_count, total_record_count=total_record_count, delta_record_count=delta_record_count, commit_user=self.commit_user, @@ -401,7 +448,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list) + self._cleanup_preparation_failure( + delta_manifest_list, base_manifest_list, changelog_manifest_list_name) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") @@ -421,7 +469,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str commit_kind, commit_time_s, ) - self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list) + self._cleanup_preparation_failure( + delta_manifest_list, base_manifest_list, changelog_manifest_list_name) return RetryResult(latest_snapshot, None) except Exception as e: # Commit exception, not sure about the situation and should not clean up the files @@ -553,7 +602,8 @@ def _commit_retry_wait(self, retry_count: int): def _cleanup_preparation_failure(self, delta_manifest_list: Optional[str], - base_manifest_list: Optional[str]): + base_manifest_list: Optional[str], + changelog_manifest_list: Optional[str] = None): try: manifest_path = self.manifest_list_manager.manifest_path @@ -568,21 +618,31 @@ def _cleanup_preparation_failure(self, if base_manifest_list: base_path = f"{manifest_path}/{base_manifest_list}" self.table.file_io.delete_quietly(base_path) + + if changelog_manifest_list: + try: + changelog_manifests = self.manifest_list_manager.read(changelog_manifest_list) + for manifest_meta in changelog_manifests: + manifest_file_path = ( + f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}") + self.table.file_io.delete_quietly(manifest_file_path) + except Exception: + pass + changelog_path = f"{manifest_path}/{changelog_manifest_list}" + self.table.file_io.delete_quietly(changelog_path) except Exception as e: logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. Uses external_path if available to ensure proper scheme handling.""" for message in commit_messages: - for file in message.new_files: + for file in list(message.new_files) + list(message.changelog_files): try: path_to_delete = file.external_path if file.external_path else file.file_path if path_to_delete: path_str = str(path_to_delete) self.table.file_io.delete_quietly(path_str) except Exception as e: - import logging - logger = logging.getLogger(__name__) path_to_delete = file.external_path if file.external_path else file.file_path logger.warning(f"Failed to clean up file {path_to_delete} during abort: {e}") diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 75b1d3a7d708..573997ca3aef 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -41,6 +41,7 @@ def __init__(self, table, commit_user): self.write_cols = None self.commit_identifier = 0 self.options = CoreOptions.copy(table.options) + self.changelog_producer = self.options.changelog_producer() if self.table.bucket_mode() == BucketMode.POSTPONE_MODE: self.options.set(CoreOptions.DATA_FILE_PREFIX, (f"{self.options.data_file_prefix()}-u-{commit_user}" @@ -77,7 +78,8 @@ def max_seq_number(): partition=partition, bucket=bucket, max_seq_number=max_seq_number(), - options=options) + options=options, + changelog_producer=self.changelog_producer) else: seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number() return AppendOnlyDataWriter( @@ -86,7 +88,8 @@ def max_seq_number(): bucket=bucket, max_seq_number=seq_number, options=options, - write_cols=self.write_cols + write_cols=self.write_cols, + changelog_producer=self.changelog_producer ) def _has_blob_columns(self) -> bool: @@ -105,11 +108,13 @@ def prepare_commit(self, commit_identifier) -> List[CommitMessage]: commit_messages = [] for (partition, bucket), writer in self.data_writers.items(): committed_files = writer.prepare_commit() - if committed_files: + changelog_files = writer.prepare_changelog_commit() + if committed_files or changelog_files: commit_message = CommitMessage( partition=partition, bucket=bucket, - new_files=committed_files + new_files=committed_files, + changelog_files=changelog_files, ) commit_messages.append(commit_message) return commit_messages diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index f4802ae74e5e..32d27433d25e 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -21,7 +21,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional, Tuple -from pypaimon.common.options.core_options import CoreOptions +from pypaimon.common.options.core_options import CoreOptions, ChangelogProducer from pypaimon.common.external_path_provider import ExternalPathProvider from pypaimon.data.timestamp import Timestamp from pypaimon.manifest.schema.data_file_meta import DataFileMeta @@ -35,7 +35,8 @@ class DataWriter(ABC): """Base class for data writers that handle PyArrow tables directly.""" def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None, - write_cols: Optional[List[str]] = None): + write_cols: Optional[List[str]] = None, + changelog_producer: ChangelogProducer = ChangelogProducer.NONE): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -61,6 +62,8 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op self.pending_data: Optional[pa.Table] = None self.committed_files: List[DataFileMeta] = [] + self.committed_changelog_files: List[DataFileMeta] = [] + self.changelog_producer = changelog_producer self.write_cols = write_cols self.blob_as_descriptor = self.options.blob_as_descriptor() @@ -111,6 +114,9 @@ def prepare_commit(self) -> List[DataFileMeta]: return self.committed_files.copy() + def prepare_changelog_commit(self) -> List[DataFileMeta]: + return self.committed_changelog_files.copy() + def close(self): try: if self.pending_data is not None and self.pending_data.num_rows > 0: @@ -130,16 +136,14 @@ def abort(self): Abort all writers and clean up resources. This method should be called when an error occurs during writing. It deletes any files that were written and cleans up resources. """ - # Delete any files that were written - for file_meta in self.committed_files: + # Delete any files that were written (data + changelog) + for file_meta in self.committed_files + self.committed_changelog_files: try: - # Use external_path if available (contains full URL scheme), otherwise use file_path path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path if path_to_delete: path_str = str(path_to_delete) self.file_io.delete_quietly(path_str) except Exception as e: - # Log but don't raise - we want to clean up as much as possible import logging logger = logging.getLogger(__name__) path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path @@ -148,6 +152,7 @@ def abort(self): # Clean up resources self.pending_data = None self.committed_files.clear() + self.committed_changelog_files.clear() @abstractmethod def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: @@ -232,6 +237,7 @@ def _write_data_to_file(self, data: pa.Table): min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current self.sequence_generator.start = self.sequence_generator.current + creation_time = Timestamp.now() self.committed_files.append(DataFileMeta.create( file_name=file_name, file_size=self.file_io.get_file_size(file_path), @@ -245,17 +251,23 @@ def _write_data_to_file(self, data: pa.Table): schema_id=self.table.table_schema.id, level=0, extra_files=[], - creation_time=Timestamp.now(), + creation_time=creation_time, delete_row_count=0, file_source=0, value_stats_cols=None if value_stats_enabled else [], - external_path=external_path_str, # Set external path if using external paths + external_path=external_path_str, first_row_id=None, write_cols=self.write_cols, - # None means all columns in the table have been written file_path=file_path, )) + if self.changelog_producer == ChangelogProducer.INPUT: + self._write_changelog_file( + data, min_key, max_key, key_stats, value_stats, + min_seq, max_seq, creation_time, + value_stats_enabled, external_path_str is not None, + ) + def _apply_variant_shredding(self, data: pa.Table) -> pa.Table: """Transform VARIANT columns into shredded Parquet format. @@ -281,6 +293,52 @@ def _apply_variant_shredding(self, data: pa.Table) -> pa.Table: return data return pa.Table.from_arrays(columns, schema=pa.schema(fields)) + def _write_changelog_file(self, data, min_key, max_key, key_stats, value_stats, + min_seq, max_seq, creation_time, + value_stats_enabled, is_external): + changelog_file_name = f"changelog-{uuid.uuid4()}-0.{self.file_format}" + changelog_file_path = self._generate_file_path(changelog_file_name) + + changelog_external_path = None + if is_external and self._current_external_path: + changelog_external_path = self._current_external_path + + if self.file_format == CoreOptions.FILE_FORMAT_PARQUET: + self.file_io.write_parquet(changelog_file_path, data, compression=self.compression, + zstd_level=self.zstd_level) + elif self.file_format == CoreOptions.FILE_FORMAT_ORC: + self.file_io.write_orc(changelog_file_path, data, compression=self.compression, + zstd_level=self.zstd_level) + elif self.file_format == CoreOptions.FILE_FORMAT_AVRO: + self.file_io.write_avro(changelog_file_path, data, compression=self.compression, + zstd_level=self.zstd_level) + else: + self.file_io.write_parquet(changelog_file_path, data, compression=self.compression, + zstd_level=self.zstd_level) + + self.committed_changelog_files.append(DataFileMeta.create( + file_name=changelog_file_name, + file_size=self.file_io.get_file_size(changelog_file_path), + row_count=data.num_rows, + min_key=GenericRow(min_key, self.trimmed_primary_keys_fields), + max_key=GenericRow(max_key, self.trimmed_primary_keys_fields), + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=min_seq, + max_sequence_number=max_seq, + schema_id=self.table.table_schema.id, + level=0, + extra_files=[], + creation_time=creation_time, + delete_row_count=0, + file_source=0, + value_stats_cols=None if value_stats_enabled else [], + external_path=changelog_external_path, + first_row_id=None, + write_cols=self.write_cols, + file_path=changelog_file_path, + )) + def _generate_file_path(self, file_name: str) -> str: if self.external_path_provider: external_path = self.external_path_provider.get_next_external_data_path(file_name) From c56b4b7f69eedf6acd1d9ce9c1334b1cbf6022b8 Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Wed, 29 Apr 2026 04:48:09 -0700 Subject: [PATCH 2/4] Reject changelog-producer on non-PK tables and decouple changelog file format from data file format --- .../pypaimon/common/options/core_options.py | 11 +++ paimon-python/pypaimon/schema/schema.py | 7 ++ .../tests/write/changelog_producer_test.py | 88 +++++++++++++++++++ .../pypaimon/write/file_store_commit.py | 38 ++++---- .../pypaimon/write/writer/data_writer.py | 21 +++-- 5 files changed, 136 insertions(+), 29 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 6a4b51c8c33a..93b531b46b33 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -292,6 +292,14 @@ class CoreOptions: "Options: none, input, full-compaction, lookup.") ) + CHANGELOG_FILE_FORMAT: ConfigOption[str] = ( + ConfigOptions.key("changelog-file.format") + .string_type() + .no_default_value() + .with_description("Specify the file format of changelog files. " + "Currently parquet, avro and orc are supported.") + ) + MERGE_ENGINE: ConfigOption[MergeEngine] = ( ConfigOptions.key("merge-engine") .enum_type(MergeEngine) @@ -579,6 +587,9 @@ def deletion_vectors_enabled(self, default=None): def changelog_producer(self, default=None): return self.options.get(CoreOptions.CHANGELOG_PRODUCER, default) + def changelog_file_format(self, default=None): + return self.options.get(CoreOptions.CHANGELOG_FILE_FORMAT, default) + def merge_engine(self, default=None): return self.options.get(CoreOptions.MERGE_ENGINE, default) diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py index f09b6c62744d..a87295bbd8c2 100644 --- a/paimon-python/pypaimon/schema/schema.py +++ b/paimon-python/pypaimon/schema/schema.py @@ -92,4 +92,11 @@ def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str] if primary_keys is not None: raise ValueError("Blob type is not supported with primary key.") + changelog_producer = (options or {}).get(CoreOptions.CHANGELOG_PRODUCER.key(), 'none') + if changelog_producer != 'none' and not primary_keys: + raise ValueError( + f"Cannot set 'changelog-producer' to '{changelog_producer}' on a table without primary keys. " + f"Changelog producer requires primary keys to be defined." + ) + return Schema(fields, partition_keys, primary_keys, options, comment) diff --git a/paimon-python/pypaimon/tests/write/changelog_producer_test.py b/paimon-python/pypaimon/tests/write/changelog_producer_test.py index 9bf331ca039f..7fc96642e11b 100644 --- a/paimon-python/pypaimon/tests/write/changelog_producer_test.py +++ b/paimon-python/pypaimon/tests/write/changelog_producer_test.py @@ -266,6 +266,94 @@ def test_abort_cleans_up_changelog_files(self): table_write.close() table_commit.close() + def test_reject_changelog_producer_on_append_only_table(self): + append_schema = pa.schema([ + ('user_id', pa.int32()), + ('item_id', pa.int64()), + ('behavior', pa.string()), + ('dt', pa.string()) + ]) + for mode in ['input', 'full-compaction', 'lookup']: + with self.assertRaises(ValueError, msg=f"Should reject changelog-producer={mode} without PKs"): + Schema.from_pyarrow_schema( + append_schema, + partition_keys=['dt'], + options={'changelog-producer': mode, 'bucket': '1'} + ) + + def test_changelog_producer_none_allowed_on_append_only_table(self): + append_schema = pa.schema([ + ('user_id', pa.int32()), + ('item_id', pa.int64()), + ('behavior', pa.string()), + ('dt', pa.string()) + ]) + schema = Schema.from_pyarrow_schema( + append_schema, + partition_keys=['dt'], + options={'changelog-producer': 'none', 'bucket': '1'} + ) + self.assertIsNotNone(schema) + + def test_input_mode_changelog_uses_parquet_regardless_of_data_format(self): + table = self._create_table( + 'test_input_changelog_format', + options={'changelog-producer': 'input', 'bucket': '1', 'file.format': 'orc'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + bucket_dir = os.path.join( + self.warehouse, 'default.db', 'test_input_changelog_format', 'dt=p1', 'bucket-0') + changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertTrue(len(changelog_files) > 0, "Should have changelog files") + for f in changelog_files: + self.assertTrue(f.endswith('.parquet'), + f"Changelog file should use parquet format by default, got {f}") + + data_files = glob.glob(os.path.join(bucket_dir, 'data-*')) + self.assertTrue(len(data_files) > 0, "Should have data files") + for f in data_files: + self.assertTrue(f.endswith('.orc'), + f"Data file should use orc format, got {f}") + + table_write.close() + table_commit.close() + + def test_input_mode_changelog_respects_changelog_file_format(self): + table = self._create_table( + 'test_input_cl_file_fmt', + options={'changelog-producer': 'input', 'bucket': '1', + 'file.format': 'parquet', 'changelog-file.format': 'orc'} + ) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_arrow(self._sample_data()) + table_commit.commit(table_write.prepare_commit()) + + bucket_dir = os.path.join( + self.warehouse, 'default.db', 'test_input_cl_file_fmt', 'dt=p1', 'bucket-0') + changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*')) + self.assertTrue(len(changelog_files) > 0, "Should have changelog files") + for f in changelog_files: + self.assertTrue(f.endswith('.orc'), + f"Changelog file should use orc format, got {f}") + + data_files = glob.glob(os.path.join(bucket_dir, 'data-*')) + self.assertTrue(len(data_files) > 0, "Should have data files") + for f in data_files: + self.assertTrue(f.endswith('.parquet'), + f"Data file should use parquet format, got {f}") + + table_write.close() + table_commit.close() + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 05964d56a27a..226cb4b9298f 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -127,7 +127,6 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): len(commit_messages), ) commit_entries = [] - changelog_entries = [] for msg in commit_messages: partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) for file in msg.new_files: @@ -138,17 +137,10 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): total_buckets=self.table.total_buckets, file=file )) - for file in msg.changelog_files: - changelog_entries.append(ManifestEntry( - kind=0, - partition=partition, - bucket=msg.bucket, - total_buckets=self.table.total_buckets, - file=file - )) + changelog_entries = self._collect_changelog_entries(commit_messages) logger.info("Finished collecting changes, including: %d entries, %d changelog entries", - len(commit_entries), len(changelog_entries)) + len(commit_entries), len(changelog_entries)) commit_kind = "APPEND" detect_conflicts = False @@ -193,17 +185,7 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " f"in {msg.partition} does not belong to this partition") - changelog_entries = [] - for msg in commit_messages: - partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) - for file in msg.changelog_files: - changelog_entries.append(ManifestEntry( - kind=0, - partition=partition, - bucket=msg.bucket, - total_buckets=self.table.total_buckets, - file=file - )) + changelog_entries = self._collect_changelog_entries(commit_messages) self._try_commit( commit_kind="OVERWRITE", @@ -600,6 +582,20 @@ def _commit_retry_wait(self, retry_count: int): time.sleep(total_wait_ms / 1000.0) + def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> List[ManifestEntry]: + changelog_entries = [] + for msg in commit_messages: + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) + for file in msg.changelog_files: + changelog_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) + return changelog_entries + def _cleanup_preparation_failure(self, delta_manifest_list: Optional[str], base_manifest_list: Optional[str], diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 32d27433d25e..b6bf912bce8a 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -64,6 +64,10 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op self.committed_files: List[DataFileMeta] = [] self.committed_changelog_files: List[DataFileMeta] = [] self.changelog_producer = changelog_producer + self.changelog_file_format = ( + self.options.changelog_file_format() + or CoreOptions.FILE_FORMAT_PARQUET + ) self.write_cols = write_cols self.blob_as_descriptor = self.options.blob_as_descriptor() @@ -296,25 +300,26 @@ def _apply_variant_shredding(self, data: pa.Table) -> pa.Table: def _write_changelog_file(self, data, min_key, max_key, key_stats, value_stats, min_seq, max_seq, creation_time, value_stats_enabled, is_external): - changelog_file_name = f"changelog-{uuid.uuid4()}-0.{self.file_format}" + cl_fmt = self.changelog_file_format + changelog_file_name = f"changelog-{uuid.uuid4()}-0.{cl_fmt}" changelog_file_path = self._generate_file_path(changelog_file_name) changelog_external_path = None if is_external and self._current_external_path: changelog_external_path = self._current_external_path - if self.file_format == CoreOptions.FILE_FORMAT_PARQUET: + if cl_fmt == CoreOptions.FILE_FORMAT_PARQUET: self.file_io.write_parquet(changelog_file_path, data, compression=self.compression, zstd_level=self.zstd_level) - elif self.file_format == CoreOptions.FILE_FORMAT_ORC: + elif cl_fmt == CoreOptions.FILE_FORMAT_ORC: self.file_io.write_orc(changelog_file_path, data, compression=self.compression, - zstd_level=self.zstd_level) - elif self.file_format == CoreOptions.FILE_FORMAT_AVRO: + zstd_level=self.zstd_level) + elif cl_fmt == CoreOptions.FILE_FORMAT_AVRO: self.file_io.write_avro(changelog_file_path, data, compression=self.compression, - zstd_level=self.zstd_level) + zstd_level=self.zstd_level) else: - self.file_io.write_parquet(changelog_file_path, data, compression=self.compression, - zstd_level=self.zstd_level) + raise ValueError(f"Unsupported changelog file format: {cl_fmt}. " + f"Supported formats: parquet, orc, avro.") self.committed_changelog_files.append(DataFileMeta.create( file_name=changelog_file_name, From f6324f62232620ab54db428c400ab066401b5a47 Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:30:42 -0700 Subject: [PATCH 3/4] Making the behavior consistent with Java implementation --- .../pypaimon/tests/write/changelog_producer_test.py | 6 +++--- paimon-python/pypaimon/write/writer/data_writer.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/write/changelog_producer_test.py b/paimon-python/pypaimon/tests/write/changelog_producer_test.py index 7fc96642e11b..ae8039ad4239 100644 --- a/paimon-python/pypaimon/tests/write/changelog_producer_test.py +++ b/paimon-python/pypaimon/tests/write/changelog_producer_test.py @@ -295,7 +295,7 @@ def test_changelog_producer_none_allowed_on_append_only_table(self): ) self.assertIsNotNone(schema) - def test_input_mode_changelog_uses_parquet_regardless_of_data_format(self): + def test_input_mode_changelog_inherits_data_file_format(self): table = self._create_table( 'test_input_changelog_format', options={'changelog-producer': 'input', 'bucket': '1', 'file.format': 'orc'} @@ -312,8 +312,8 @@ def test_input_mode_changelog_uses_parquet_regardless_of_data_format(self): changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*')) self.assertTrue(len(changelog_files) > 0, "Should have changelog files") for f in changelog_files: - self.assertTrue(f.endswith('.parquet'), - f"Changelog file should use parquet format by default, got {f}") + self.assertTrue(f.endswith('.orc'), + f"Changelog file should inherit data file format (orc), got {f}") data_files = glob.glob(os.path.join(bucket_dir, 'data-*')) self.assertTrue(len(data_files) > 0, "Should have data files") diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index b6bf912bce8a..f3237a809caa 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -66,7 +66,7 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op self.changelog_producer = changelog_producer self.changelog_file_format = ( self.options.changelog_file_format() - or CoreOptions.FILE_FORMAT_PARQUET + or self.file_format ) self.write_cols = write_cols self.blob_as_descriptor = self.options.blob_as_descriptor() From 01bc7fbd547b0b262d3605cba4b88ab0dfb284a2 Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:54:07 -0700 Subject: [PATCH 4/4] Changing position for backward compatibility --- paimon-python/pypaimon/tests/file_store_commit_test.py | 1 + paimon-python/pypaimon/tests/reader_append_only_test.py | 1 + paimon-python/pypaimon/write/commit_message.py | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index 958ea85a6b7e..8f4bfe1e10e8 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -437,6 +437,7 @@ def test_append_commit_inherits_index_manifest( retry_result=None, commit_kind="APPEND", commit_entries=[commit_entry], + changelog_entries=[], commit_identifier=11, latest_snapshot=latest_snapshot ) diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index 00b2f803f7ec..d5a077737983 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -496,6 +496,7 @@ def test_commit_retry_filter(self): RetryResult(None), "APPEND", commit_entries, + [], BATCH_COMMIT_IDENTIFIER, latest_snapshot) self.assertTrue(success.is_success()) diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index 98d972012733..bfac6d33c0c3 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -27,8 +27,8 @@ class CommitMessage: partition: Tuple bucket: int new_files: List[DataFileMeta] - changelog_files: List[DataFileMeta] = field(default_factory=list) check_from_snapshot: Optional[int] = -1 + changelog_files: List[DataFileMeta] = field(default_factory=list) def is_empty(self): return not self.new_files and not self.changelog_files