Export partition to apache iceberg #1618

Open
arthurpassos wants to merge 49 commits into antalya-26.1 from export_partition_iceberg

Conversation

@arthurpassos
Collaborator

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

...

Documentation entry for user-facing changes

...

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • S3 Export (2h)
  • Swarms (30m)
  • Tiered Storage (2h)

@github-actions

github-actions bot commented Apr 6, 2026

Workflow [PR], commit [f8bacec]


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5b0e833565

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

{
auto * object_storage = dynamic_cast<StorageObjectStorageCluster *>(dest_storage.get());

auto * iceberg_metadata = dynamic_cast<IcebergMetadata *>(object_storage->getExternalMetadata(query_context));

P1 Badge Guard IcebergMetadata casts with USE_AVRO

IcebergMetadata is defined only under #if USE_AVRO in IcebergMetadata.h, but this new dynamic_cast<IcebergMetadata *> is compiled unconditionally. In non-AVRO builds (USE_AVRO=0), this translation unit (and the same pattern added in StorageReplicatedMergeTree.cpp) cannot compile, so the change breaks optional-AVRO build configurations.


Comment on lines +1780 to +1781
const String sidecar_path = replaceFileExtensionWithAvro(
filename_generator.convertMetadataPathToStoragePath(path));

P1 Badge Use storage paths directly when reading export sidecars

The export path list is populated from filename.path_in_storage, but commit now treats each entry as a metadata path and calls convertMetadataPathToStoragePath before reading sidecars. With write_full_path_in_iceberg_metadata=1, table_dir is an URI prefix (for example s3://...) while these entries are plain storage paths (/...), so the conversion throws and EXPORT PARTITION cannot commit.


Comment thread src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@arthurpassos
Collaborator Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6c6194814d


Comment thread src/Storages/MergeTree/MergeTreeData.cpp
Comment thread src/Storages/StorageReplicatedMergeTree.cpp Outdated
@arthurpassos
Collaborator Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 763dec1c0a


Comment on lines +6535 to +6537
auto * object_storage = dynamic_cast<StorageObjectStorageCluster *>(dest_storage.get());

auto * iceberg_metadata = dynamic_cast<IcebergMetadata *>(object_storage->getExternalMetadata(query_context));

P1 Badge Handle non-cluster data lakes before dereferencing cast

When EXPORT PART targets a regular Iceberg table (engine backed by StorageObjectStorage, not StorageObjectStorageCluster), this dynamic_cast returns nullptr and object_storage->getExternalMetadata(...) dereferences it. That turns a user-facing validation path into a null-pointer crash instead of an error, and it is reachable because this branch runs whenever iceberg_metadata_json_ is not pre-supplied.



Comment on lines +8261 to +8263
auto * object_storage = dynamic_cast<StorageObjectStorageCluster *>(dest_storage.get());

auto * iceberg_metadata = dynamic_cast<IcebergMetadata *>(object_storage->getExternalMetadata(query_context));

P1 Badge Guard Iceberg cast in replicated partition export path

The replicated EXPORT PARTITION ... TO TABLE path unconditionally casts dest_storage to StorageObjectStorageCluster and immediately dereferences it. For local Iceberg destination tables, dest_storage is typically StorageObjectStorage, so this can null-deref before any graceful validation, causing a crash instead of returning a structured exception.



Comment on lines +676 to +678
std::lock_guard lock(status_changes_mutex);
std::swap(status_changes, local_status_changes);
}

P1 Badge Preserve unprocessed status events on handler failure

Swapping all pending status changes into a local queue means any exception during processing drops the remaining events. exportMergeTreePartitionStatusHandlingTask() catches and returns (especially on ZooKeeper/session errors), but does not requeue these keys, so some exports can miss terminal status transitions until another watch event happens (or forever if none arrives).


Collaborator Author

Fixed in 18eb4d0

Comment on lines +1695 to +1700
const auto latest_schema_id = metadata->getValue<Int64>(Iceberg::f_current_schema_id);
if (latest_schema_id != original_schema_id)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Table schema changed before export could commit (expected schema {}, got {}). "
"Restart the export operation.",
original_schema_id, latest_schema_id);

P2 Badge Check committed transaction id before schema/spec mismatch

This code throws on schema/spec drift before running the idempotency scan. If the first commit already succeeded but the node crashed before ZooKeeper was marked completed, and the Iceberg table schema/spec changes before retry, the retry will now fail permanently instead of recognizing the existing snapshot tagged with the same transaction id.


Collaborator Author

Fixed by d9564ac

@arthurpassos
Collaborator Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 675061716f



manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

P1 Badge Preserve backward compatibility for manifest parsing

Treating write_full_path_in_iceberg_metadata as mandatory breaks reading manifests that were written before this field existed. During rolling upgrades, existing ZooKeeper metadata.json entries for in-flight exports won't have this key, so getValue<bool> throws and status/polling paths that call fromJsonString cannot process those tasks. Please make this field optional with a default (false) when absent.


auto dot_pos = data_file_storage_path.rfind('.');
auto slash_pos = data_file_storage_path.rfind('/');
if (dot_pos != String::npos && (slash_pos == String::npos || dot_pos > slash_pos))
return data_file_storage_path.substr(0, dot_pos) + ".avro";

P1 Badge Avoid overwriting Avro data files with sidecars

sidecarStoragePath rewrites any extension to .avro; if the Iceberg table writes Avro data files, the data file already ends with .avro, so the sidecar path equals the data file path. writeDataFileSidecar then writes in Rewrite mode, replacing the actual data file with sidecar metadata during onFinish, which corrupts exported data for Avro-formatted Iceberg tables.


@arthurpassos changed the title from "[Draft] Export partition to apache iceberg" to "Export partition to apache iceberg" on Apr 14, 2026
@arthurpassos
Collaborator Author

I think there is a race condition in the commit procedure, and that is why I am observing failures that indicate more data was written than expected. The normal procedure for exporting partitions is that the replica that finishes the last part export will try to commit. At the same time, it is possible that some other replica is running the cleanup routine and finds a task in the pseudo "pending commit" state. In this case, it will try to commit as well, concurrently. On plain object storage that is not a problem because the commit filename guarantees idempotency. On Iceberg, we don't have that control. The only control we have is the clickhouse_export tag in the metadata files, but it has to be checked before trying to commit, causing a TOCTOU problem. If all replicas check it at the same time, nothing prevents them from committing the same set of files multiple times.

I think the simplest fix is to introduce a commit lock on zookeeper

status	name	duration
FAIL	test_storage_iceberg_with_spark/test_export_partition_iceberg.py::test_concurrent_exports_different_partitions_across_replicas retry_failed[cidb](https://play.clickhouse.com/play?user=play&run=1#V0lUSAogICAgOTAgQVMgaW50ZXJ2YWxfZGF5cwpTRUxFQ1QKICAgIHRvU3RhcnRPZkRheShjaGVja19zdGFydF90aW1lKSBBUyBkYXksCiAgICBjb3VudCgpIEFTIGZhaWx1cmVzLAogICAgZ3JvdXBVbmlxQXJyYXkocHVsbF9yZXF1ZXN0X251bWJlcikgQVMgcHJzLAogICAgYW55KHJlcG9ydF91cmwpIEFTIHJlcG9ydF91cmwKRlJPTSBjaGVja3MKV0hFUkUgKG5vdygpIC0gdG9JbnRlcnZhbERheShpbnRlcnZhbF9kYXlzKSkgPD0gY2hlY2tfc3RhcnRfdGltZQogICAgQU5EIHRlc3RfbmFtZSA9ICd0ZXN0X3N0b3JhZ2VfaWNlYmVyZ193aXRoX3NwYXJrL3Rlc3RfZXhwb3J0X3BhcnRpdGlvbl9pY2ViZXJnLnB5Ojp0ZXN0X2NvbmN1cnJlbnRfZXhwb3J0c19kaWZmZXJlbnRfcGFydGl0aW9uc19hY3Jvc3NfcmVwbGljYXMnCiAgICAtLSBBTkQgY2hlY2tfbmFtZSA9ICdJbnRlZ3JhdGlvbiB0ZXN0cyAoYW1kX2FzYW4sIGRiIGRpc2ssIG9sZCBhbmFseXplciwgNC82KScKICAgIEFORCB0ZXN0X3N0YXR1cyBJTiAoJ0ZBSUwnLCAnRVJST1InKQogICAgQU5EIChwdWxsX3JlcXVlc3RfbnVtYmVyID0gMCBPUiBiYXNlX3JlZiBJTiAoJ2FudGFseWEtMjYuMScsICdhbnRhbHlhJykpCiAgICBBTkQgdGVzdF9jb250ZXh0X3JhdyBMSUtFICclQXNzZXJ0aW9uRXJyb3IlJwpHUk9VUCBCWSBkYXkKT1JERVIgQlkgZGF5IERFU0MK)	15s
File: test_storage_iceberg_with_spark/test_export_partition_iceberg.py:832 - in test_concurrent_exports_different_partitions_across_replicas
    assert count == 9, f"Expected 9 rows total (3 per partition), got {count}"
E   AssertionError: Expected 9 rows total (3 per partition), got 15
E   assert 15 == 9
FAIL	test_storage_iceberg_with_spark/test_export_partition_iceberg.py::test_three_replica_concurrent_exports retry_ok[cidb](https://play.clickhouse.com/play?user=play&run=1#V0lUSAogICAgOTAgQVMgaW50ZXJ2YWxfZGF5cwpTRUxFQ1QKICAgIHRvU3RhcnRPZkRheShjaGVja19zdGFydF90aW1lKSBBUyBkYXksCiAgICBjb3VudCgpIEFTIGZhaWx1cmVzLAogICAgZ3JvdXBVbmlxQXJyYXkocHVsbF9yZXF1ZXN0X251bWJlcikgQVMgcHJzLAogICAgYW55KHJlcG9ydF91cmwpIEFTIHJlcG9ydF91cmwKRlJPTSBjaGVja3MKV0hFUkUgKG5vdygpIC0gdG9JbnRlcnZhbERheShpbnRlcnZhbF9kYXlzKSkgPD0gY2hlY2tfc3RhcnRfdGltZQogICAgQU5EIHRlc3RfbmFtZSA9ICd0ZXN0X3N0b3JhZ2VfaWNlYmVyZ193aXRoX3NwYXJrL3Rlc3RfZXhwb3J0X3BhcnRpdGlvbl9pY2ViZXJnLnB5Ojp0ZXN0X3RocmVlX3JlcGxpY2FfY29uY3VycmVudF9leHBvcnRzJwogICAgLS0gQU5EIGNoZWNrX25hbWUgPSAnSW50ZWdyYXRpb24gdGVzdHMgKGFtZF9hc2FuLCBkYiBkaXNrLCBvbGQgYW5hbHl6ZXIsIDQvNiknCiAgICBBTkQgdGVzdF9zdGF0dXMgSU4gKCdGQUlMJywgJ0VSUk9SJykKICAgIEFORCAocHVsbF9yZXF1ZXN0X251bWJlciA9IDAgT1IgYmFzZV9yZWYgSU4gKCdhbnRhbHlhLTI2LjEnLCAnYW50YWx5YScpKQogICAgQU5EIHRlc3RfY29udGV4dF9yYXcgTElLRSAnJUFzc2VydGlvbkVycm9yJScKR1JPVVAgQlkgZGF5Ck9SREVSIEJZIGRheSBERVNDCg==)	23s
File: test_storage_iceberg_with_spark/test_export_partition_iceberg.py:920 - in test_three_replica_concurrent_exports
    assert count == 9, f"Expected 9 rows total (3 per partition), got {count}"
E   AssertionError: Expected 9 rows total (3 per partition), got 18
E   assert 18 == 9

@arthurpassos
Collaborator Author

I think the simplest fix is to introduce a commit lock on zookeeper

Locking on zookeeper is not enough because the locks are ephemeral: there is a chance replica A briefly loses its connection to zookeeper, the lock is released, some other replica B acquires it, and replica A never hears about it. It still tries to commit.

The real fix is to re-check the clickhouse-export-transaction tag upon retry. It is the best solution because it needs no extra lock, and the contention point becomes the metadata file name on non-catalog storages and the metadata content on catalog-based storages.

@vzakaznikov
Collaborator

vzakaznikov commented Apr 17, 2026

Audit Report: PR #1618

AI audit note: This review comment was generated by AI (gpt-5.3-codex).

Scope: Export partition to Apache Iceberg.

Confirmed defects

High: Iceberg write path can report success after exhausting metadata commit retries

  • Impact: Iceberg write operation can finish without a committed metadata snapshot after repeated metadata-conflict retries, creating a success-path correctness failure (data files written but snapshot not advanced).
  • Anchor: src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp (IcebergStorageSink::finalizeBuffers, IcebergStorageSink::onFinish).
  • Trigger: initializeMetadata() returns false for all MAX_TRANSACTION_RETRIES attempts (e.g., sustained metadata conflicts/catalog contention).
  • Why this is a defect: Retry loop has no terminal failure check; code continues as if commit succeeded.
  • Smallest logical reproduction:
    1. Write into Iceberg table via sink path.
    2. Force initializeMetadata() to return false for every retry attempt.
    3. Observe onFinish() completes without throw.
  • Fix direction: After retry loop, throw when no attempt succeeded.
  • Regression test direction: Inject deterministic repeated metadata-conflict return path and assert write fails rather than succeeds silently.
  • Subsystem and blast radius: Iceberg write commit path; affects correctness/visibility under sustained write contention.
size_t i = 0;
while (i < MAX_TRANSACTION_RETRIES)
{
    if (initializeMetadata())
        break;
    ++i;
}

Medium: Export commit aggregates can overflow due to 64-bit to 32-bit narrowing

  • Impact: Large export commits can overflow total_rows / total_chunks_size, producing incorrect snapshot aggregate statistics.
  • Anchor: src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp (commitExportPartitionTransaction).
  • Trigger: Exported partition with aggregate rows/bytes above Int32 range.
  • Why this is a defect: Sidecar values are Int64, but aggregation narrows/casts into Int32.
  • Smallest logical reproduction:
    1. Export partition with >2^31 rows (or bytes aggregate >2^31-1 in the tracked field).
    2. Commit path accumulates into Int32.
    3. Observe wraparound/incorrect aggregate metadata values.
  • Fix direction: Keep aggregate counters as Int64 through commit/metadata generation path.
  • Regression test direction: Add boundary test around Int32 limits for sidecar-derived aggregate counters.
  • Subsystem and blast radius: Iceberg export-commit metadata correctness; impacts observability/planning stats on large exports.
Int32 total_rows = 0;
Int32 total_chunks_size = 0;
...
total_rows += static_cast<Int32>(sidecar.record_count);
total_chunks_size += static_cast<Int32>(sidecar.file_size_in_bytes);

Medium: Export commit depends on mutable source partition state instead of persisted manifest state

  • Impact: Partition export commit can fail indefinitely if source partition disappears or is unavailable on the committing replica after files are already exported, leaving task stuck in PENDING.
  • Anchor: src/Storages/MergeTree/ExportPartitionUtils.cpp (getPartitionValuesForIcebergCommit, commit).
  • Trigger: Source partition dropped/aged out/not locally present between part export completion and final Iceberg commit.
  • Why this is a defect: Commit derives Iceberg partition_values from live source parts at commit time, but this state is not immutable and is not persisted in manifest at export creation.
  • Smallest logical reproduction:
    1. Start replicated EXPORT PARTITION to Iceberg.
    2. Let parts export complete (processing becomes empty).
    3. Remove source partition (or commit runs where no active local part exists).
    4. Commit path throws on missing active part and keeps retrying.
  • Fix direction: Persist partition values (or equivalent immutable commit payload) in manifest at task creation and use that for commit.
  • Regression test direction: Remove source partition after export files are written but before commit; assert commit still succeeds using persisted values.
  • Subsystem and blast radius: Replicated partition-export commit reliability to Iceberg.
const auto parts = storage.getDataPartsVectorInPartitionForInternalUsage(
    MergeTreeDataPartState::Active, partition_id, lock);
if (parts.empty())
    throw Exception(ErrorCodes::BAD_ARGUMENTS,
        "Cannot find active part for partition_id '{}' to derive Iceberg partition values...",
        partition_id);
if (!manifest.iceberg_metadata_json.empty())
{
    iceberg_args.metadata_json_string = manifest.iceberg_metadata_json;
    if (source_storage.getInMemoryMetadataPtr()->hasPartitionKey())
        iceberg_args.partition_values =
            getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id);
}

Medium: Backward-incompatible manifest parsing breaks in-flight exports after upgrade

  • Impact: New binaries can fail to load older metadata.json export manifests that do not contain write_full_path_in_iceberg_metadata, causing status/polling/cleanup paths to fail for in-flight tasks.
  • Anchor: src/Storages/ExportReplicatedMergeTreePartitionManifest.h (ExportReplicatedMergeTreePartitionManifest::fromJsonString), read from src/Storages/StorageReplicatedMergeTree.cpp.
  • Trigger: Rolling upgrade while export manifests created by older code are still present.
  • Why this is a defect: New parser treats a newly introduced field as mandatory, but historical data does not guarantee its existence.
  • Smallest logical reproduction:
    1. Create export manifest on old binary (without write_full_path_in_iceberg_metadata).
    2. Upgrade node.
    3. Read existing export entry (poll/cleanup/system table path).
    4. Parse fails on missing field.
  • Fix direction: Parse field as optional and default to false when absent.
  • Regression test direction: Deserialize old-style manifest JSON without this field and assert parse succeeds with false.
  • Subsystem and blast radius: Replicated export partition control-plane; affects upgrade scenarios with in-flight exports.
manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");
manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

Medium: Commit-phase failures are swallowed without terminal state transition

  • Impact: Permanent commit failures can keep task status at PENDING indefinitely, causing repeated retries and operational noise without convergence.
  • Anchor: src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp (handlePartExportSuccess).
  • Trigger: All parts are processed, then ExportPartitionUtils::commit(...) throws persistently.
  • Why this is a defect: Exception info is logged/persisted, but no status transition to FAILED nor commit-specific retry budget is enforced in this path.
  • Smallest logical reproduction:
    1. Export all parts successfully.
    2. Force commit step to fail deterministically (e.g., persistent schema/spec mismatch or integration failure).
    3. Observe repeated scheduler attempts with export status remaining PENDING.
  • Fix direction: Add bounded commit retry accounting and set terminal FAILED status when limit is exceeded.
  • Regression test direction: Inject deterministic commit failure after all parts are processed and assert eventual FAILED status.
  • Subsystem and blast radius: Replicated partition-export scheduler and ZooKeeper state machine; reliability risk for stuck exports.
try
{
    auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest);
    ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage);
}
catch (const Exception & e)
{
    const auto error_requests = getErrorRequests(export_path, storage.replica_name, zk, storage.log.load(), part_name, e);
    // logs + error bookkeeping only, no terminal status transition here
}

Low: KILL can be silently overwritten by an in-flight commit (KILLED -> COMPLETED)

  • Impact: A successful KILL EXPORT PARTITION (CAS PENDING -> KILLED) can be silently reverted to COMPLETED by an already-running commit finishing after the kill. User-visible status machine is non-monotonic and the kill is not recorded at all. No data corruption, but a state-machine invariant violation.
  • Anchor: src/Storages/MergeTree/ExportPartitionUtils.cpp (commit, final trySet(... COMPLETED, -1)), interacting with src/Storages/StorageReplicatedMergeTree.cpp (killExportPartition).
  • Trigger:
    1. All parts processed; leader enters handlePartExportSuccess -> ExportPartitionUtils::commit(...).
    2. While commit is in flight (Iceberg manifest/metadata writes), user runs SYSTEM KILL EXPORT PARTITION <tid>.
    3. killExportPartition CAS PENDING -> KILLED on stat.version succeeds; returns CancelSent.
    4. Status-handling task calls storage.killExportPart(tid); cancels only in-flight ExportPartTask pipelines, not the commit path.
    5. Commit finishes; Iceberg snapshot published; then zk->trySet(status, COMPLETED, -1) unconditionally overwrites KILLED with COMPLETED.
  • Why this is a defect:
    • killExportPartition establishes KILLED as terminal (refuses to kill unless status == PENDING), so KILLED -> COMPLETED breaks the state-machine invariant.
    • The final trySet(..., -1) is a blind write with no version/state guard.
    • User who successfully issued KILL sees no evidence in task state that the kill occurred; monitoring systems observe non-monotonic transitions.
  • Smallest logical reproduction:
    1. Start replicated EXPORT PARTITION and let all parts finish; commit begins.
    2. Inject a slow Iceberg metadata write (or use iceberg_export_after_commit_before_zk_completed failpoint for adjacent timing).
    3. Issue KILL EXPORT PARTITION <tid> while commit is in flight.
    4. Observe status transitions PENDING -> KILLED -> COMPLETED.
  • Fix direction: Replace the blind trySet(..., -1) with a guarded transition (CAS against the expected PENDING znode version captured at commit start, or explicit check that current status is PENDING); if the guard fails, log a warning and leave the terminal state as-is (or transition to a distinct COMPLETED_AFTER_KILL state).
  • Regression test direction: With commit artificially slowed, issue KILL mid-commit and assert final status remains KILLED (or a dedicated COMPLETED_AFTER_KILL) and never flips back to COMPLETED.
  • Subsystem and blast radius: Replicated partition-export control-plane state machine; narrow race window (overlap with in-flight commit), user-visible inconsistency, no data corruption.
if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status",
        String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1))
{
    LOG_INFO(log, "ExportPartition: Marked export as completed");
}
if (status_from_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
    LOG_INFO(log, "Export partition task is {}, can not cancel it", ...);
    return CancellationCode::CancelCannotBeSent;
}

if (zk->trySet(status_path,
        String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED)),
        stat.version) != Coordination::Error::ZOK)
{
    ...
}

High: Ghost Iceberg metadata file after catalog update failure permanently blocks future commits (transactional catalogs)

  • Impact: When a metadata-file PUT succeeds but catalog->updateMetadata(...) returns failure (clean failure or ghost-ack after server actually committed), the orphan metadata file at version N is NOT deleted and blocks every subsequent commit attempt to that table (export-partition, INSERT, compaction, mutation) via the exists() pre-check inside writeMetadataFileAndVersionHint. Combined with the already-identified "commit-phase non-terminalization" defect, the replicated export task remains PENDING and re-triggers the same blocked commit indefinitely. Other writers to the table also fail after MAX_TRANSACTION_RETRIES.
  • Anchor:
    • src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp (commitImportPartitionTransactionImpl — catalog-based cleanup(true) branch and metadata-file write step).
    • src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp (writeMetadataFileAndVersionHint, whose exists() pre-check makes it return false when the target version already exists).
    • src/Databases/DataLake/RestCatalog.cpp (RestCatalog::updateMetadata — returns false on HTTPException).
  • Trigger (either of):
    1. object_storage->writeObject(... If-None-Match "*") succeeded server-side but client disconnected before ack; writeMessageToFile catches the exception and returns false. Metadata file at N is persisted on S3 with our transaction_id in summary.
    2. Metadata file PUT succeeded, then catalog->updateMetadata failed cleanly (REST HTTP error, concurrent update, auth, etc.).
  • Why this is a defect:
    • For transactional catalogs, cleanup(true) re-reads latest metadata via the catalog (still points to N-1), so isExportPartitionTransactionAlreadyCommitted(metadata_{N-1}, transaction_id) returns false on every retry.
    • filename_generator.setVersion(N-1 + 1) = N on each retry; exists(N) returns true deterministically; writeMetadataFileAndVersionHint returns false; cleanup(true) reruns; loop until MAX_TRANSACTION_RETRIES throws NOT_IMPLEMENTED.
    • The export task's commit path then catches and does no terminal transition (see the commit-phase non-terminalization defect), so the task keeps re-attempting the same blocked commit forever.
    • All other writes to this Iceberg table hit the same exists(N) blocker and fail.
  • Smallest logical reproduction:
    1. Run EXPORT PARTITION to a REST/Glue-backed Iceberg table; export reaches commit.
    2. Fail catalog->updateMetadata once (HTTP 5xx / drop connection) after the metadata-file PUT succeeded.
    3. Observe: metadata file at version N persists on S3, catalog stays at N-1. Export task stays PENDING. Next INSERT INTO iceberg_table eventually throws NOT_IMPLEMENTED after 100 retry attempts. Re-running anything for this table never unblocks without manual intervention.
  • Fix direction:
    • On cleanup(true) for transactional catalogs, also scan for a metadata version > latest_catalog_version on the filesystem; if one exists and its snapshot summary contains our transaction_id, treat as committed and attempt only the catalog-update (or idempotently short-circuit).
    • Alternatively, on catalog->updateMetadata failure, remove the just-written metadata file (best effort) before returning false, so the retry path is unblocked. Must still tolerate ghost-acks where the catalog actually advanced (re-read after delete; if catalog now at N, the delete would have orphaned the still-referenced file — so removal must be guarded by a second catalog check).
    • Long-term: on startup / periodic cleanup, detect and remove un-referenced metadata files (orphan sweeper with a safety margin).
  • Regression test direction:
    • Inject: metadata-file PUT succeeds; catalog->updateMetadata returns false. Assert table remains writable: a subsequent export/INSERT either succeeds or fails with a non-permanent error; repeated retries must not return the same blocker.
    • Inject ghost-ack on PUT (force writeMessageToFile exception after server commits); assert the retry path discovers the ghost file via transaction_id and self-heals.
  • Subsystem and blast radius: Iceberg write stack; permanent unwritability of affected Iceberg table from ClickHouse until the ghost metadata file is manually deleted. Amplified in the export path by non-terminalization and periodic replica fix-up retries.
// Utils.cpp
if (object_storage->exists(StoredObject(metadata_file_path)))
    return false;

Iceberg::writeMessageToFile(metadata_file_content, metadata_file_path, object_storage, context,
    /* write-if-none-match */ "*", "", compression_method);
// IcebergMetadata.cpp
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
{
    cleanup(true);          // does NOT delete the orphan metadata file
    return false;
}
// cleanup(true) branch, transactional catalogs
catalog->getTableMetadata(namespace_name, table_name, context, table_metadata);
// -> returns version N-1 path; never discovers the ghost file at N with our transaction_id
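The guarded-removal fix direction can be sketched with simplified stand-ins (Catalog, FileStore, and commitMetadata are illustrative names, not the PR's API): on updateMetadata failure, re-read the catalog before deleting the just-written file, so a ghost-ack never orphans a still-referenced version.

```cpp
#include <map>
#include <string>

// Hypothetical minimal stand-ins for the catalog and the object storage.
struct FileStore { std::map<std::string, std::string> files; };

struct Catalog
{
    int version = 0;          // latest committed metadata version
    bool fail_update = false; // inject an updateMetadata failure
    bool ghost_ack = false;   // server applied the update but reported failure

    bool updateMetadata(int new_version)
    {
        if (fail_update)
        {
            if (ghost_ack)
                version = new_version; // applied despite the reported error
            return false;
        }
        version = new_version;
        return true;
    }
};

// Sketch of the guarded-removal fix: on updateMetadata failure, delete the
// just-written metadata file only after a second catalog read proves the
// catalog did NOT advance to our version (ghost-ack guard).
bool commitMetadata(Catalog & catalog, FileStore & store, int new_version, const std::string & path)
{
    store.files[path] = "metadata-v" + std::to_string(new_version); // PUT succeeded
    if (!catalog.updateMetadata(new_version))
    {
        if (catalog.version >= new_version)
            return true;            // ghost-ack: catalog actually advanced; keep the file
        store.files.erase(path);    // clean failure: remove the orphan so retries are unblocked
        return false;
    }
    return true;
}
```

The second catalog read is what distinguishes a clean failure (safe to delete) from a ghost-ack (deleting would orphan a referenced file).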

High: Outer catch(...) in Iceberg commit deletes manifest files after metadata+catalog already published -> snapshot corruption

  • Impact: Any exception thrown after catalog->updateMetadata succeeds but before commitImportPartitionTransactionImpl returns true is caught by the outer catch(...) which runs cleanup(false), deleting storage_manifest_entry_name and storage_manifest_list_name from object storage. At this point the newly-written metadata file N and the catalog snapshot-ref already reference those manifest files, so the published current snapshot becomes dangling. All subsequent reads of the Iceberg table hit 404 / no-such-key when loading the current snapshot's manifest list / manifest entries.
  • Anchor: src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp::commitImportPartitionTransactionImpl (outer try { ... } catch (...) { cleanup(false); throw; }, wrapping post-catalog-success region).
  • Trigger: exception raised between catalog-update success (line ~1693) and the end of the try block (line ~1706). Known throw sources in this region:
    1. persistent_components.metadata_cache->remove(persistent_components.table_path) (line 1702) and metadata_cache->remove(*persistent_components.table_uuid) (line 1704) — cache infrastructure exceptions (e.g., std::bad_alloc in metric/housekeeping paths, custom cache policy failures).
    2. Any future code added to this region (logging, metrics, profile events) that can throw.
  • Why this is a defect:
    • cleanup(false) is designed for pre-commit failures ("intermediate artifacts must be removed"); scope is too wide and catches post-commit exceptions.
    • Commit-path atomicity invariant violated: once catalog->updateMetadata returns true, the snapshot is published and the referenced manifest files must outlive any further exception.
    • No committed=true guard short-circuits cleanup(false) for the post-commit region.
  • Smallest logical reproduction:
    1. Inject a throw (e.g., wrap metadata cache to throw on remove).
    2. Run any INSERT or EXPORT PARTITION commit.
    3. Observe: catalog points to metadata version N; metadata N references manifest list L and manifest entry E; both L and E are missing from object storage; reading the table fails with no-such-key.
  • Fix direction: introduce a bool committed = false local; set committed = true immediately after catalog->updateMetadata returns true (or, when catalog is null, immediately after the writeMetadataFileAndVersionHint success branch). In the outer catch, if (!committed) cleanup(false);. Alternatively, restructure so the post-commit region is outside the try block.
  • Regression test direction:
    1. Fault-injection: throw from cache invalidation path after catalog success; assert subsequent read of the Iceberg table succeeds (manifest list and manifest entries still exist).
    2. Verify behavior for both transactional-catalog and filesystem-only catalog code paths.
  • Subsystem and blast radius: Iceberg write stack — affects INSERT, EXPORT PARTITION, Compaction, and Mutations (any path through commitImportPartitionTransactionImpl). A single unhandled exception in the trailing region produces irrecoverable current-snapshot corruption.
// commitImportPartitionTransactionImpl
try
{
    // ... generate manifest entry, manifest list, write metadata file ...

    if (catalog)
    {
        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
        {
            cleanup(true);
            return false;
        }
        // <-- catalog already published; manifest files MUST outlive any further throw
    }

    if (persistent_components.metadata_cache)
    {
        persistent_components.metadata_cache->remove(persistent_components.table_path);       // can throw
        if (persistent_components.table_uuid)
            persistent_components.metadata_cache->remove(*persistent_components.table_uuid);  // can throw
    }
}
catch (...)
{
    LOG_ERROR(log, "Failed to commit import partition transaction: {}", getCurrentExceptionMessage(false));
    cleanup(false);   // <-- deletes manifest_entry + manifest_list AFTER snapshot is already published
    throw;
}
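A minimal sketch of the committed-flag fix direction, using std::function stand-ins for the catalog update and the post-commit cache invalidation (the names here are hypothetical, not the PR's code):

```cpp
#include <functional>
#include <stdexcept>

// Once the catalog update has published the snapshot, a throw from the
// trailing region must not trigger cleanup of the manifest files.
struct CommitResult { bool cleanup_ran = false; bool committed = false; };

CommitResult commitWithGuard(const std::function<bool()> & catalog_update,
                             const std::function<void()> & post_commit_work)
{
    CommitResult r;
    try
    {
        if (!catalog_update())
            return r;             // pre-commit failure path (cleanup(true) elsewhere)
        r.committed = true;       // snapshot is published from this point on
        post_commit_work();       // e.g. cache invalidation; may throw
    }
    catch (...)
    {
        if (!r.committed)
            r.cleanup_ran = true; // only pre-commit failures delete manifests
        // post-commit exceptions propagate without touching published files
    }
    return r;
}
```

With this guard, the cache-invalidation throw at lines 1702/1704 no longer reaches cleanup(false).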

Medium: force_export during an in-flight export can double-commit the partition to the Iceberg destination

  • Impact: If a user issues EXPORT PARTITION ... SETTINGS export_merge_tree_partition_force_export = 1 while another replica is already in the commit phase of the same export key (last part processed, ExportPartitionUtils::commit running), both the in-flight commit (with OLD transaction_id) and the fresh re-export (with NEW transaction_id) will independently succeed on Iceberg — producing two snapshots that reference duplicate rows for the same source partition. The destination Iceberg table ends up with 2× the data.
  • Anchor: src/Storages/StorageReplicatedMergeTree.cpp::exportPartitionToTable — the tryRemoveRecursive(partition_exports_path) + subsequent fresh tryMulti that creates a NEW manifest.transaction_id = generateSnowflakeIDString() (line ~8182 and line ~8244).
  • Transition / trigger:
    1. Replica A is in handlePartExportSuccess → has already read processed/* file paths into memory → is inside commitExportPartitionTransaction(manifest_OLD, ...) writing the Iceberg snapshot.
    2. Client issues EXPORT PARTITION ... force_export=1 against the same key. That request removes the ZK subtree (including A's ephemeral lock) and re-creates it with a fresh transaction_id_NEW.
    3. Replica A's in-flight Iceberg commit completes with transaction_id_OLD (no ZK check inside the Iceberg write path); trySet(status, COMPLETED, -1) afterwards fails with ZNONODE and is ignored per existing code path.
    4. The fresh task, with transaction_id_NEW, processes all parts again from scratch and commits a second Iceberg snapshot.
    5. isExportPartitionTransactionAlreadyCommitted is keyed on transaction_id; since the two IDs differ, neither commit is suppressed.
  • Why this is a defect:
    • Idempotency relies on a stable transaction_id across retries; force_export intentionally rotates it.
    • The existing code comment on the tryRemoveRecursive call ("Worst case scenario an on-going export is going to be killed and a new task won't be scheduled") underestimates the consequence — it does NOT cancel an in-flight Iceberg snapshot commit.
    • Nothing in the force path signals an in-flight committer to abort, and the committer does not re-validate ZK before landing the Iceberg snapshot.
  • Smallest logical reproduction:
    1. Issue EXPORT PARTITION p TO iceberg_table against a partition with N parts.
    2. Pause the committing replica just before commitExportPartitionTransaction returns (failpoint / debugger).
    3. Issue EXPORT PARTITION p TO iceberg_table SETTINGS export_merge_tree_partition_force_export = 1.
    4. Let the first committer proceed; let the new task run to completion.
    5. Observe two snapshots with different clickhouse.export-partition-transaction-id summaries but identical source rows.
  • Fix direction (one line): on force_export, take a short-lived global advisory lock (e.g., exports/<key>/force_lock ephemeral) and refuse to proceed while a live commit is in flight; or preserve the OLD transaction_id under force so idempotency suppresses one side; or mark exports/<key>/killed and have the committer re-check ZK before issuing the Iceberg commit.
  • Regression test direction: inject a delay between commitExportPartitionTransaction entry and writeMetadataFileAndVersionHint; trigger force_export; assert destination table ends up with exactly 1× the rows.
  • Subsystem and blast radius: control plane + Iceberg commit path. Silent data duplication in the destination table; user-visible only after downstream deduplication or reconciliation.
if (!has_expired && !query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export])
{
    throw Exception(...);
}

LOG_INFO(log, "Overwriting export with key {}", export_key);

/// Not putting in ops (same transaction) because we can't construct a "tryRemoveRecursive" request.
/// ...
/// It is ok for this to be non transactional. Worst case scenario an on-going export is going to be killed and a new task won't be scheduled.
zookeeper->tryRemoveRecursive(partition_exports_path);
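One of the fix directions (having the committer re-validate ZooKeeper before landing the Iceberg snapshot) can be sketched as follows; FakeZk and shouldCommit are illustrative stand-ins, not the real ZooKeeper client:

```cpp
#include <map>
#include <string>

// The map stands in for the exports/<key> subtree in ZooKeeper.
using FakeZk = std::map<std::string, std::string>;

// Commit only if our transaction_id is still the one registered for this
// export key; a rotated id means force_export superseded us, and a missing
// node means the export was killed or re-created mid-flight.
bool shouldCommit(const FakeZk & zk, const std::string & export_key, const std::string & my_transaction_id)
{
    auto it = zk.find("exports/" + export_key + "/transaction_id");
    if (it == zk.end())
        return false;
    return it->second == my_transaction_id;
}
```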

Low: Lost ACK on ephemeral per-part lock causes self-deadlock until ZooKeeper session expires

  • Impact: If tryCreate on the part lock gets ZCONNECTIONLOSS/ZOPERATIONTIMEOUT after the server actually created the ephemeral node, the replica treats it as "lock not held" and discards its local manifest. The ephemeral node persists while the ZooKeeper session is alive, so this replica (and any other replica reading locks/<part>) observes a lock held by storage.replica_name and skips the part. The part stalls until session expiry reclaims the ephemeral node.
  • Anchor: src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp (executeStep), src/Common/ZooKeeper/ZooKeeper.h (EphemeralNodeHolder::tryCreate binary success/failure semantics).
  • Trigger: Transient connection loss right after the client issued the lock tryCreate but before receiving the response, while the ZK session survives the blip.
  • Why this is a defect:
    • tryCreate is treated as a strict success/failure primitive; any error (including retryable hardware errors like ZCONNECTIONLOSS and ZOPERATIONTIMEOUT, see IKeeper.cpp::isHardwareError) falls into the "skip part" branch.
    • The code does not probe locks/<part> on retryable error to check whether the node exists with data == storage.replica_name (i.e., "we actually created it").
    • storage.export_manifests.erase(manifest) discards the local task; no re-enqueue mechanism until the next full scheduler cycle, and the next cycle still sees the ephemeral lock.
  • Smallest logical reproduction:
    1. Set up ZK with fault injection that returns ZCONNECTIONLOSS on responses but actually applies the write server-side.
    2. Run replicated EXPORT PARTITION that picks the part on this replica.
    3. Inject the connection loss at the tryCreate of locks/<part>.
    4. Observe: ephemeral znode persists with this replica as owner; no replica progresses the part until session timeout elapses.
  • Fix direction: On retryable error from tryCreate, probe the lock znode; if it exists and its data equals our replica_name, proceed as if we own the lock (and construct a corresponding EphemeralNodeHolder::existing / re-wrap for lifecycle).
  • Regression test direction: Inject ZCONNECTIONLOSS on tryCreate of locks/<part> with server-side-apply semantics; assert that part progresses on this replica or, on next cycle, self-heals without waiting for session timeout.
  • Subsystem and blast radius: Replicated per-part locking; liveness stall bounded by session timeout (typ. 10-30s), per part per replica.
if (Coordination::Error::ZOK == zk->tryCreate(
        fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name,
        storage.replica_name,
        zkutil::CreateMode::Ephemeral))
{
    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
    export_part_task->executeStep();
    return false;
}

std::lock_guard inner_lock(storage.export_manifests_mutex);
storage.export_manifests.erase(manifest);
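The probe-on-retryable-error fix direction can be sketched with a simplified client (FlakyZk models a server that applies the write but loses the ack; the real Coordination error set is wider):

```cpp
#include <map>
#include <string>

enum class ZkError { ZOK, ZNODEEXISTS, ZCONNECTIONLOSS };

struct FlakyZk
{
    std::map<std::string, std::string> nodes;
    ZkError injected_error = ZkError::ZOK;

    ZkError tryCreate(const std::string & path, const std::string & data)
    {
        if (injected_error == ZkError::ZCONNECTIONLOSS)
        {
            nodes[path] = data; // server applied the write, the ack was lost
            return ZkError::ZCONNECTIONLOSS;
        }
        if (nodes.count(path))
            return ZkError::ZNODEEXISTS;
        nodes[path] = data;
        return ZkError::ZOK;
    }
};

// On a retryable error, probe the lock znode: if it exists with our
// replica_name as data, we actually created it and own the lock.
bool acquirePartLock(FlakyZk & zk, const std::string & lock_path, const std::string & replica_name)
{
    ZkError err = zk.tryCreate(lock_path, replica_name);
    if (err == ZkError::ZOK)
        return true;
    if (err == ZkError::ZCONNECTIONLOSS)
    {
        auto it = zk.nodes.find(lock_path);
        return it != zk.nodes.end() && it->second == replica_name;
    }
    return false; // genuinely held by another replica
}
```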

Medium: Commit precondition no-op can leave export indefinitely PENDING without failure accounting

  • Impact: once all parts are processed and commit phase starts, the export can remain PENDING forever if commit preconditions fail (exported_paths empty or fewer than expected). The code logs warnings and returns without throw, so scheduler commit-failure accounting is never engaged; no retry budget is consumed and no terminal state is reached.
  • Anchor: src/Storages/MergeTree/ExportPartitionUtils.cpp (commit, precondition checks after getExportedPaths).
  • Trigger:
    1. processing becomes empty and scheduler enters commit path.
    2. getExportedPaths(...) returns {} (e.g., zk read/list failures) OR exported_paths.size() < manifest.parts.size().
    3. commit() returns early with warning instead of throwing.
  • Why this is a defect:
    • handlePartExportSuccess only enters failure-accounting path on exceptions.
    • Early return bypasses getErrorRequests, retry/terminalization logic, and status transition updates.
    • The task can loop in PENDING indefinitely with warning logs only.
  • Smallest logical reproduction:
    1. Run async export until processing is empty.
    2. Force getExportedPaths to return empty/incomplete set (for example, by inducing deterministic failures in tryGetChildren/tryGet for /processed paths).
    3. Observe repeated commit attempts and persistent status=PENDING.
  • Fix direction: Convert these two precondition branches to exceptions (or explicit error accounting updates) so the existing failure-budget and terminalization path can execute.
  • Regression test direction: Inject deterministic empty/incomplete exported-path response during commit and assert bounded transition to FAILED (or explicit retriable state with cap), not infinite PENDING.
  • Subsystem and blast radius: Export commit control-plane liveness; affects any export hit by ZK processed-path read inconsistencies or corruption at commit time.
const auto exported_paths = ExportPartitionUtils::getExportedPaths(log, zk, entry_path);

if (exported_paths.empty())
{
    LOG_WARNING(log, "ExportPartition: No exported paths found, will not commit export. This might be a bug");
    return;
}

if (exported_paths.size() < manifest.parts.size())
{
    LOG_WARNING(log, "ExportPartition: Reached the commit phase, but exported paths size is less than the number of parts, will not commit export. This might be a bug");
    return;
}
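A sketch of the fix direction: converting the two precondition branches to throws so the caller's exception-driven failure accounting runs. checkCommitPreconditions is a hypothetical helper, not the PR's code:

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Throwing (instead of silently returning) lets the existing retry-budget
// and terminalization path consume these failures.
void checkCommitPreconditions(const std::vector<std::string> & exported_paths, size_t expected_parts)
{
    if (exported_paths.empty())
        throw std::runtime_error("ExportPartition: no exported paths found at commit time");
    if (exported_paths.size() < expected_parts)
        throw std::runtime_error("ExportPartition: exported paths fewer than parts at commit time");
}
```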

Low: Per-part terminal FAILED state is not persisted in processing/<part> when retry budget is exceeded

  • Impact: control-plane inconsistency between export-level status and per-part status. When a part exceeds retry budget, export status can transition to FAILED, but the corresponding processing/<part> znode may still persist as PENDING (and without finished_by). This can mislead diagnostics and state reconstruction tooling that relies on per-part status details.
  • Anchor: src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp (handlePartExportFailure).
  • Trigger:
    1. Part export fails repeatedly.
    2. processing_part_entry.retry_count reaches/exceeds max_retries.
    3. Scheduler sets export status FAILED.
  • Why this is a defect:
    • The processing/<part> set request is created before mutating processing_part_entry.status = FAILED and finished_by.
    • No second set request is added after mutation, so persisted per-part state remains stale (PENDING).
  • Smallest logical reproduction:
    1. Start async export with small max_retries (for example, 1).
    2. Inject deterministic part-export failure for the same part until retry limit is exceeded.
    3. Observe mismatch: /exports/<key>/status = FAILED while /exports/<key>/processing/<part> still serializes status = PENDING.
  • Fix direction: Build the processing/<part> set request only after applying retry-limit branch mutations, or append a second set request with the updated serialized entry when terminalizing.
  • Regression test direction: Force repeat failures until retry exhaustion and assert both export-level status and processing/<part> payload reflect terminal failed state (status=FAILED, finished_by set).
  • Subsystem and blast radius: Export partition scheduler/ZooKeeper control plane; low direct correctness impact, but degrades observability and operator debugging fidelity.
processing_part_entry.retry_count++;

ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));
ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1));

if (processing_part_entry.retry_count >= max_retries)
{
    processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED;
    processing_part_entry.finished_by = storage.replica_name;
    ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1));
}
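The request-ordering fix can be sketched as follows; PartEntry and buildFailureOps are simplified stand-ins for the real serialization and zkutil request types:

```cpp
#include <string>
#include <vector>

struct PartEntry
{
    int retry_count = 0;
    std::string status = "PENDING";
    std::string finished_by;
};

// Mutate the entry FIRST (retry-limit branch), THEN serialize it, so the
// persisted processing/<part> payload matches the terminal state.
std::vector<std::string> buildFailureOps(PartEntry & entry, int max_retries, const std::string & replica)
{
    entry.retry_count++;
    if (entry.retry_count >= max_retries)
    {
        entry.status = "FAILED";
        entry.finished_by = replica;
    }
    std::vector<std::string> ops;
    ops.push_back("set processing/<part> = status:" + entry.status);
    if (entry.status == "FAILED")
        ops.push_back("set status = FAILED");
    return ops;
}
```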

Residual risks (not confirmed defects)

Risk: Transient ZooKeeper multi failures can stall part progression (processing -> processed)

  • Status: Not confirmed as a correctness defect; observed as a liveness/stall risk.
  • Anchor: src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp (tryToMovePartToProcessed, handlePartExportSuccess).
  • Scenario:
    1. Part export succeeds and callback enters success path.
    2. tryMulti in tryToMovePartToProcessed(...) fails transiently.
    3. Function returns false; success handler exits early and does not attempt commit in that cycle.
  • Why this can stall:
    • The part is not moved to processed, and commit is gated on processing becoming empty.
    • Under repeated infrastructure failures in this transition, export can remain pending for extended periods.
  • Suggested hardening:
    • Add explicit retry/backoff accounting for move-to-processed transition failures and alerting thresholds.
  • Regression test direction:
    • Inject deterministic tryMulti failure in tryToMovePartToProcessed(...) for N attempts and verify eventual progress/failure policy.
if (Coordination::Error::ZOK != zk->tryMulti(requests, responses))
{
    LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping");
    return false;
}
if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_paths_in_destination_storage, zk))
{
    LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to move part to processed, will not commit export partition");
    return;
}
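The suggested retry/backoff accounting could look like this minimal sketch (the constants and the shift cap are illustrative, not tuned values):

```cpp
#include <algorithm>
#include <cstdint>

// Bounded exponential backoff for repeated move-to-processed failures, so
// stalls become observable and capped rather than silent.
int64_t nextBackoffMs(int attempt, int64_t base_ms = 100, int64_t cap_ms = 30000)
{
    int64_t delay = base_ms << std::min(attempt, 10); // exponential growth, shift-capped
    return std::min(delay, cap_ms);
}
```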

Medium: PENDING export stalls indefinitely when an external dependency becomes unreachable (DETACH PART/PARTITION, RENAME/DROP destination table)

  • Impact: a PENDING export whose source part(s) or destination table become unreachable never progresses and never terminates. It stays in PENDING forever, consuming ZK state, until a manual KILL EXPORT PARTITION is issued. There is no retry budget, no timeout, no failure signal to the user (beyond the absent status transition). The TTL expiry branch and the commit fix-up branch are both gated to skip this case.
  • Anchors:
    • Scheduler, "part not found locally" branch (silent continue with no retry counter):
      • src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp:225-230.
    • Scheduler, "destination storage not found" branch (silent continue):
      • src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp:147-153.
    • tryCleanup expiry gate (skips PENDING):
      • src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp:60, the if (has_expired && !is_pending) gate.
    • tryCleanup commit fix-up gate (requires no ZK parts remaining):
      • src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp:86, the if (parts_in_processing_or_pending.empty()) gate.
  • Trigger (minimum repro — detach variant):
    1. Start an async EXPORT PARTITION P TO iceberg_tbl.
    2. While status = PENDING, run ALTER TABLE src DETACH PART '<part_being_processed>' (replicated DDL → applies on all replicas). Parts move from Active/Outdated to detached/.
    3. Scheduler: getPartIfExists(part_name, {Active, Outdated}) returns nullptr on every replica → "Part not found locally, skipping" (no retry counter increment).
    4. tryCleanup: has_expired && !is_pending is false (because PENDING), and parts_in_processing_or_pending contains the detached part name (ZK processing/<part> was not removed) → no commit fix-up attempt.
    5. Export stays PENDING indefinitely.
  • Trigger (destination variant — same stall class):
    • RENAME TABLE iceberg_tbl TO iceberg_tbl_renamed or DROP TABLE iceberg_tbl mid-export.
    • Both scheduler and tryCleanup do DatabaseCatalog::instance().tryGetTable(destination_storage_id, context); on failure they continue / return false with no retry accounting.
  • Why this is a defect: the system has no escape path for a PENDING export whose external dependency becomes unreachable. Normal failures (ExportPartTask exception, commit exception) transition to FAILED via getErrorRequests and the retry-exhaustion path; the "missing resource" branches bypass this budget entirely. Users get no signal short of inspecting system.replicated_partition_exports and noticing the task is frozen.
  • Reproduction:
    -- shell 1: start an async export
    EXPORT PARTITION 20240101 FROM src_mt TO iceberg_tbl SETTINGS export_merge_tree_partition_async = 1;
    -- shell 2: while shell 1's status is PENDING
    ALTER TABLE src_mt DETACH PART '20240101_1_1_0';
    -- observe indefinitely: SELECT status FROM system.replicated_partition_exports  -> 'PENDING'
  • Fix direction (one line):
    • In each "missing external resource" branch (scheduler continue + tryCleanup non-fix-up), route through getErrorRequests with a distinct error message so that the existing max_retries path terminalizes to FAILED, OR gate the TTL-expiry branch on the ORIGINAL has_expired without the !is_pending filter (with a longer PENDING grace period) to provide a last-resort safety net.
  • Regression test direction (one line):
    • Integration test: start async export, detach a part mid-flight, assert the export transitions to FAILED within max_retries * scheduling_period or within 2 * ttl_seconds (whichever fix is chosen).
  • Affected subsystem / blast radius:
    • ExportPartitionTaskScheduler + ExportPartitionManifestUpdatingTask across every replica; blocks any export that experiences DETACH/RENAME/DROP against its dependencies; operational burden (forced manual cleanup) rather than data loss; accumulates ZK state.
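The first fix direction (routing missing-resource scheduler cycles through a retry budget so they eventually terminalize) can be sketched as follows; MissingResourceBudget is a hypothetical helper, not the PR's code:

```cpp
#include <map>
#include <string>

// Count "missing external resource" scheduler cycles per export key instead
// of silently continuing, so a PENDING export with a detached part or a
// dropped destination table is terminalized to FAILED after max_misses.
struct MissingResourceBudget
{
    int max_misses;
    std::map<std::string, int> misses_per_export;

    // Returns true when the export should now be terminalized as FAILED.
    bool recordMiss(const std::string & export_key)
    {
        return ++misses_per_export[export_key] >= max_misses;
    }

    // Called when the resource reappears (e.g. part re-attached).
    void reset(const std::string & export_key) { misses_per_export.erase(export_key); }
};
```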

Risk: Non-monotonic Iceberg snapshot timestamp_ms under committing-replica wall-clock skew

  • Status: Not confirmed as a correctness defect for this PR's own invariants; noted as an interaction risk with downstream Iceberg readers that rely on snapshot time ordering.
  • Anchor: Iceberg MetadataGenerator writes the new snapshot's timestamp-ms from the committing replica's wall clock at commit time. The committer is whichever replica owned the last part (or the fix-up replica).
  • Scenario:
    1. Replica A with a wall clock skewed 5 minutes in the past commits snapshot N with timestamp-ms = T - 5min.
    2. A prior snapshot N-1 was committed by replica B with a correct clock at time T - 1min, so timestamp-ms(N) < timestamp-ms(N-1).
    3. Iceberg time-travel queries (FOR TIMESTAMP AS OF ...) and expiration policies that rely on snapshot timestamp can misorder or skip snapshot N.
  • Why this is not (quite) a defect here:
    • Snapshot lineage uses snapshot-id and parent-snapshot-id, which are monotonic even under skew.
    • The committed rows are the same regardless of timestamp, and the clickhouse.export-partition-transaction-id summary enforces idempotency.
  • Suggested hardening:
    • Bound committing replicas' clock skew via operator guidance, OR clamp timestamp-ms to max(local_wall_clock, parent_snapshot.timestamp_ms + 1) on commit.
  • Regression test direction:
    • Set the committing replica's clock N minutes in the past; commit; verify subsequent time-travel queries still land on the intended snapshot.
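The clamp suggested above is one line of logic; a self-contained sketch:

```cpp
#include <algorithm>
#include <cstdint>

// Keep snapshot timestamps non-decreasing even when the committing
// replica's wall clock lags the previous committer's.
int64_t clampSnapshotTimestampMs(int64_t local_wall_clock_ms, int64_t parent_snapshot_timestamp_ms)
{
    return std::max(local_wall_clock_ms, parent_snapshot_timestamp_ms + 1);
}
```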

Coverage summary

  • Scope reviewed: PR 1618 export-partition-to-Iceberg flow across MergeTree scheduler/manifest/control-plane and ObjectStorage/Iceberg commit/idempotency/sidecar paths.
  • Categories failed:
    • Iceberg write retry terminalization.
    • Iceberg export aggregate counter width.
    • Iceberg commit dependence on mutable source partition state.
    • Backward compatibility in manifest deserialization.
    • Commit failure terminalization and retry boundedness.
    • Terminal-status CAS vs blind trySet (KILL race).
    • Orphan metadata file after catalog-update failure (ghost-ack / clean failure) permanently blocks future writes on transactional catalogs.
    • Retryable ZK error handling on ephemeral per-part tryCreate (self-deadlock until session expires).
    • Outer catch(...) cleanup scope too wide — deletes just-referenced manifest files after metadata + catalog are already published (current-snapshot corruption).
    • force_export during an in-flight commit produces two independent Iceberg snapshots with different transaction_ids — silent data duplication in destination.
    • PENDING exports stall indefinitely when an external dependency becomes unreachable (DETACH PART/PARTITION, RENAME/DROP destination table) — no retry budget on "missing resource" paths, and TTL expiry is gated to skip PENDING exports.
    • Per-part failure terminalization metadata is inconsistent: export-level status can become FAILED while processing/<part> remains serialized as PENDING due to request construction order.
    • Commit precondition checks (exported_paths empty/incomplete) can no-op without exception, bypassing error accounting and leaving exports indefinitely PENDING.
  • Categories passed:
    • Null-cast crash guards in Iceberg destination path.
    • Sidecar overwrite naming-collision fix.
    • Retry idempotency check placement in commit path.
    • Status-change requeue-on-exception handling.
    • Concurrent EXPORT PARTITION on same export_key without force_export (atomic tryMulti + deterministic key).
    • Concurrent EXPORT PARTITION on different export_keys (disjoint ZK subtrees; Iceberg commits serialized by retry loop + use_previous_snapshots=true manifest-list inheritance).
    • Idempotent recovery if two replicas concurrently commit the same transaction_id (summary-field idempotency check).
    • Source-side MERGE / mutations / REPLACE PARTITION mid-export (scheduler accepts Outdated parts; part_references pins data; partition_id → partition.value is stable).
  • Assumptions/limits: Static audit only in this session; no runtime fault-injection execution here. Storage-layer faults (bit rot, torn write, misdirected read/write, lost write) treated as out of scope and delegated to the object-storage provider's guarantees. Byzantine / memory-corruption faults not in threat model. Downstream Iceberg reader behavior (time-travel, snapshot expiration) treated as out of scope.

@vzakaznikov
Collaborator

AI audit note: This review comment was generated by AI (gpt-5.3-codex).

Follow-up audit delta for PR #1618 — only newly found issues from the latest state-model-driven fault-injection pass:

  1. Medium: commit precondition no-op can leave export indefinitely PENDING
  • Anchor: src/Storages/MergeTree/ExportPartitionUtils.cpp (ExportPartitionUtils::commit).
  • What happens: when exported_paths is empty or fewer than manifest.parts.size(), commit() logs warning and returns without throw.
  • Why this is a defect: caller failure-accounting is exception-driven, so this path can bypass retry-budget/terminalization and loop in PENDING.
  • Fix direction: convert these precondition branches to exception or explicit error-accounting updates that consume retry budget and terminalize on exhaustion.
  2. Low: per-part FAILED state is not persisted in processing/<part> at retry exhaustion
  • Anchor: src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp (handlePartExportFailure).
  • What happens: set request for processing/<part> payload is queued before status mutation to FAILED; persisted payload can remain PENDING while export status becomes FAILED.
  • Why this is a defect: control-plane inconsistency for per-part diagnostics and state reconstruction.
  • Fix direction: serialize/set processing/<part> after applying retry-limit mutation (status=FAILED, finished_by), or append a second set request with updated payload.

@arthurpassos
Collaborator Author

arthurpassos commented Apr 17, 2026

  1. Iceberg sink may report success after metadata-init retries are exhausted - Not an issue of export partition - this is related to the Iceberg writes implementation
  2. High: Post-publish exception-safety bug in Iceberg commit cleanup scope - True. Fixed by 8143c67
  3. Medium: Int64 -> Int32 narrowing in export aggregate counters - unlikely but true, fixed by f8bacec
  4. Medium: Export commit depends on mutable source partition state - not an issue, this can happen only in a tiny case and it is ok to fail
  5. Medium: Backward-incompatible manifest parsing for in-flight upgrades - not an issue, we don't need to be backwards compatible at this point - no users so far
  6. Medium: Commit-phase failures are swallowed without terminal state transition - this is true, I need to think how to solve this. Maybe by introducing a commit_attempts counter
  7. Low: KILL can be silently overwritten by an in-flight commit (KILLED -> COMPLETED) - true, but not an issue - we already discussed it
  8. High: Ghost Iceberg metadata file after catalog update failure permanently blocks future commits (transactional catalogs) - this does not seem right to me, looks like a hallucination. Ran it through Opus 4.7 and it agrees with me
  9. High: Outer catch(...) in Iceberg commit deletes manifest files after metadata+catalog already published -> snapshot corruption - this is the same as n2, already fixed
  10. Medium: force_export during an in-flight export can double-commit the partition to the Iceberg destination - this might be true, but the user is acknowledging to force an operation. The best we can do is to document such corner case
  11. Low: Lost ACK on ephemeral per-part lock causes self-deadlock until ZooKeeper session expires - go f yourself lol. I can't prepare for such cases, it is the same as me sending a PUT to S3 and not trusting the 200 status
  12. Medium: Commit precondition no-op can leave export indefinitely PENDING without failure accounting - it is the same as n6
  13. Low: Per-part terminal FAILED state is not persisted in processing/ when retry budget is exceeded - very low, does not have to be addressed

@vzakaznikov
Collaborator

Confirmed defects:

High: Iceberg sink may report success after metadata-init retries are exhausted

Impact: write path can finish without advancing Iceberg snapshot metadata.
Anchor: src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp (IcebergStorageSink::finalizeBuffers).
Trigger: initializeMetadata() returns false for all retry attempts.
Why defect: loop exits without terminal throw/failure.

Not an issue of export partition - this is related to Iceberg writes implementation

But given that we rely on the Iceberg write path, do you think it does not make sense to fix it?
