Skip to content

fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path (rebased)#134

Merged
rampage644 merged 8 commits intomainfrom
claude/rebase-pr-126-i2t0U
Apr 23, 2026
Merged

fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path (rebased)#134
rampage644 merged 8 commits intomainfrom
claude/rebase-pr-126-i2t0U

Conversation

@rampage644
Copy link
Copy Markdown
Contributor

Rebase of #126 on top of latest main (through 7482768 fix(datediff)) with the CI failures fixed so stable / fmt, stable / clippy, and ubuntu / stable pass. Content of the four original MERGE-path commits is unchanged — see #126 for the full technical write-up, end-to-end validation, and closes-#128 context.

Rebase on top of main

The 4 original commits replay cleanly onto current main:

  • f24614f fix(custom_type_coercion): fall back to plan.schema() for leaf nodes
  • 16d54c2 feat(merge): route EXPLAIN / EXPLAIN ANALYZE into the MERGE planner
  • 99645d6 feat(merge): expose updated/inserted/deleted row counts as MetricsSet
  • 31c7bba fix(merge): preserve target rows when MERGE batch contains only target

CI fixes stacked on top

Three small follow-ups needed to turn the CI green — none of them change MERGE semantics.

  1. 67ee46e style: apply rustfmt after rebase on main — rustc 1.94 rustfmt formats the long #[instrument(name = "UserQuery::merge_to_logical_plan", level = "trace", skip(self), err)] attribute and its following pub async fn merge_to_logical_plan(&self, statement: Statement) signature differently than the PR was originally authored against. Mechanical cargo fmt application only.

  2. a6f2b3f style(merge): collapse nested if into let-chain for EXPLAIN MERGE routeclippy::collapsible_if (denied via clippy::all) flags the if let DFStatement::Statement(inner) = explain.statement.as_ref() { if matches!(inner.as_ref(), Statement::Merge { .. }) { ... } } guard in UserQuery::execute. Merged both conditions into a single let-chain; observable behaviour of the EXPLAIN-MERGE routing is unchanged. This was the hard error that broke stable / clippy.

  3. ac68f7a test(merge): normalize RoundRobinBatch fan-out in EXPLAIN snapshots — the DataFusion planner's RoundRobinBatch(N) partition target equals the host CPU count, so the query_merge_into_explain snapshot baked in RoundRobinBatch(10) from the PR author's 10-core dev box and failed on the 4-core ubuntu-latest runner. Added an insta filter in the shared test_query! macro that rewrites RoundRobinBatch(\d+)RoundRobinBatch([N]) (no other snapshots contain this token, verified by grep) and regenerated the snapshot. This was the failure that broke ubuntu / stable.

Local verification (rustc 1.94, 4-core box)

  • cargo fmt --check: clean
  • cargo clippy --all-targets --workspace: exit 0 (remaining output is clippy::pedantic / clippy::nursery warnings on pre-existing code, not denied by the workspace config)
  • cargo test --profile=ci -p executor --all-targets: 364 passed, 0 failed, 1 ignored (the original PR reports 362; the extra 2 are the new merge_into_mixed_unsorted_multi_row_no_data_loss SQL snapshot and the new merge_into_explain SQL snapshot added by this PR's own tests)

Supersedes #126. Closes #128.


Generated by Claude Code

rampage644 and others added 8 commits April 23, 2026 16:08
`CustomTypeCoercionRewriter::analyze_internal` built its lookup schema
from `merge_schema(&plan.inputs())`. For leaf nodes like `LogicalPlan::TableScan`,
`plan.inputs()` is empty and the merged schema has no fields, so any
binary-op expression attached directly to the leaf — e.g. via
`LogicalPlanBuilder::scan_with_filters` — would fail coercion with
"Schema error: No field named <col>" during the analyzer pass.

This broke the target-side partition pruning hint path that
`UserQuery::merge_query` wires up when a MERGE source is a partitioned
`DataFusionTable`: `target_filter_expression()` builds a per-partition
`col(source) >= min AND col(source) <= max` predicate and pushes it into
the target `TableScan`'s filters via `scan_with_filters`, expecting
Iceberg's file pruner to use it at manifest level. The filter never made
it past the analyzer.

Fix: when `plan.inputs().is_empty()`, use `plan.schema()` directly for
type resolution, mirroring the pattern DataFusion's built-in
`TypeCoercion` analyzer uses. All existing `custom_type_coercion`
snapshot tests still pass, and the full `merge_into` suite (22 tests)
stays green.

Verified end-to-end against a deployed Embucket Lambda:
`MERGE INTO demo.atomic.events_hooli_tiny USING demo.atomic.events_hooli_ident`
where the source is partitioned by `identity(event_name)` — previously
failed with `custom_type_coercion / Schema error: No field named event_name`,
now returns 100 matched rows and the update lands on disk.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Embucket has its own MERGE planner (`UserQuery::merge_query`) because
DataFusion's SQL path doesn't produce a usable plan for `MERGE INTO`.
The side effect was that `EXPLAIN MERGE INTO …` and
`EXPLAIN ANALYZE MERGE INTO …` both fell through `execute()` to
`execute_sql`, which hands the statement to DataFusion's planner and
bounces back with:

    SQL compilation error: unsupported feature: Unsupported SQL statement:
    MERGE INTO …

No observability for MERGE plans or for per-scan metrics — which made
it impossible to verify partition-pruning behaviour on partitioned
Iceberg targets (files scanned, bytes scanned, manifest-level pruning
counters).

Changes:

1. Split `merge_query` into a pure plan-builder `merge_to_logical_plan`
   and a thin wrapper that calls `execute_logical_plan`.
2. In `execute()`, when the parsed statement is
   `DFStatement::Explain(..)` whose inner statement is
   `Statement::Merge { .. }`, build the MERGE logical plan via
   `merge_to_logical_plan`, then wrap it in the same
   `LogicalPlan::Explain` / `LogicalPlan::Analyze` shape DataFusion's
   own `explain_to_plan` constructs. Everything downstream (physical
   planning, execution, output formatting) is unchanged.
3. Add a snapshot test `merge_into_explain` over a minimal
   unpartitioned target + source — asserts the logical and physical
   plans render. `EXPLAIN ANALYZE` is exercised end-to-end through the
   deployed Lambda rather than via snapshot because the formatted-table
   column widths depend on the pre-redaction metric value widths and
   aren't stable across runs.

After this change:

- `EXPLAIN MERGE INTO t USING s ON ... WHEN MATCHED THEN UPDATE ...`
  returns the logical plan + physical plan (including
  `MergeIntoSinkExec`, `HashJoinExec`, `DataSourceExec { file_groups,
  projection, file_type }` for each side).
- `EXPLAIN ANALYZE` of the same statement executes the MERGE and
  additionally reports per-node runtime metrics. The
  `DataSourceExec` rows now surface the DataFusion/Parquet scan
  counters that were previously invisible: `bytes_scanned`,
  `files_ranges_pruned_statistics`, `row_groups_pruned_statistics`,
  `pushdown_rows_pruned`, `page_index_rows_pruned`. That's the signal
  you need to verify source-side partition-hint pruning actually prunes.

All 23 `merge_into` tests pass (22 existing + 1 new). Full
`cargo test -p executor --lib` is 359/0.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
`MergeIntoCOWSinkExec` tracked per-clause row counts in `AtomicI64`
purely to populate the final result batch. After the EXPLAIN / EXPLAIN
ANALYZE routing fix on this branch, `EXPLAIN ANALYZE MERGE INTO …`
reports rich per-scan metrics on every `DataSourceExec` in the plan,
but the sink line was still rendering as `MergeIntoSinkExec, metrics=[]`
because this node didn't own an `ExecutionPlanMetricsSet`.

Wire one up: register `Count` metrics `updated_rows`, `inserted_rows`,
and `deleted_rows` via `MetricBuilder::new(&self.metrics).counter(..)`
at the start of `execute()`, clone them into the async write closure,
and `add()` the final `AtomicI64` values after the transaction commits.
Implement `ExecutionPlan::metrics()` to return
`Some(self.metrics.clone_inner())` so DataFusion's plan formatter picks
them up. Row counts that exceed `usize::MAX` saturate via `try_from`
rather than panicking.

After this change, `EXPLAIN ANALYZE MERGE` shows the sink counters
alongside the child scan counters, so an operator can read updated /
inserted / deleted counts directly off the plan output instead of only
from the result row.

All 23 `merge_into` tests stay green.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The MergeCOWFilterStream "no matches in this batch" fast path
short-circuited on `matching_data_and_manifest_files.is_empty()`
without checking the cumulative `all_matching_data_files` set. If a
target file had been seen as matching in an earlier batch and a later
batch contained only target rows for that file, the rows in the later
batch were silently dropped. The downstream COW commit then overwrote
the original file with the partial result, permanently losing the
unmatched target rows whose batch hit the dead path.

The fix tightens the guard to also require `all_matching_data_files`
to be empty before taking the fast path. When a batch belongs to a
file already in the overwrite set, it falls through to the main
filter path which correctly emits target rows via
`file_predicate OR source_exists`.

Adds three unit tests against MergeCOWFilterStream covering the
matching-then-target patterns, plus a SQL snapshot test that exercises
the same shape end-to-end.

Fixes #128

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rustfmt on Rust 1.94 formats the long #[instrument(...)] attribute and
the following fn signature differently than the PR was originally
authored against. No semantic change.
Rust 1.94 clippy (clippy::collapsible_if, denied via clippy::all) flags
the nested `if let ... { if matches!(...) { ... } }` guard in execute().
Merge both conditions into a single let-chain so clippy is happy without
changing the observable behaviour of the MERGE EXPLAIN routing.
The DataFusion planner uses the host CPU count as the RoundRobinBatch
partition target, so the EXPLAIN snapshot literal differed between the
PR author's dev box (10 cores) and the 4-core ubuntu-latest GitHub
runner. Add an insta filter to the shared test_query! macro that rewrites
`RoundRobinBatch(N)` to `RoundRobinBatch([N])`, and regenerate the
`query_merge_into_explain` snapshot to use the normalized token so the
test is stable across core counts.
@rampage644 rampage644 merged commit 63669ca into main Apr 23, 2026
3 checks passed
@rampage644 rampage644 deleted the claude/rebase-pr-126-i2t0U branch April 23, 2026 19:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

MERGE INTO silently drops unmatched target rows when target is unsorted by the ON-key

1 participant