
fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path#126

Closed
rampage644 wants to merge 4 commits into main from rampage644/fix-merge-partition-transform

Conversation

Contributor

@rampage644 rampage644 commented Apr 15, 2026

Four related Embucket MERGE-path fixes, all needed to make MERGE INTO correct, observable, and tunable on partitioned Iceberg targets.

Changes

  1. fix(custom_type_coercion): CustomTypeCoercionRewriter::analyze_internal now falls back to plan.schema() when plan.inputs() is empty (the leaf TableScan case). Without this, the target filter that merge_query injects via LogicalPlanBuilder::scan_with_filters against a partitioned DataFusionTable failed the analyzer with Schema error: No field named <col>. Unblocks source-side partition-hint pruning.

  2. feat(merge): EXPLAIN routing — Embucket has its own MERGE planner, so DataFusion rejected EXPLAIN MERGE INTO … with Unsupported SQL statement. Split merge_query into a pure plan-builder + a thin executing wrapper; in execute(), when the parsed statement is Explain(MergeInto), build the plan and wrap it in LogicalPlan::Explain / LogicalPlan::Analyze to match DataFusion's explain_to_plan. Adds an EXPLAIN MERGE snapshot test (22 existing merge_into tests stay green).

  3. feat(merge): MetricsSet — surface updated_rows / inserted_rows / deleted_rows as per-clause counters on MergeIntoCOWSinkExec so EXPLAIN ANALYZE MERGE INTO … now shows them alongside child DataSourceExec scan metrics (bytes_scanned, files_ranges_pruned_statistics, row_groups_pruned_statistics, pushdown_rows_pruned, output_rows, elapsed_compute).

  4. fix(merge): preserve target rows. Fixes #128: MERGE INTO silently drops unmatched target rows when the target is unsorted by the ON-key. A stale fast-path guard in MergeCOWFilterStream::poll_next short-circuited on matching_data_and_manifest_files.is_empty() alone, without checking all_matching_data_files. So when a target file had been seen as matching in an earlier batch and a later batch contained only target rows for that file, the rows in the later batch were silently dropped (the COW commit then overwrote the original file with the partial result). Six-line guard tightening: also require all_matching_data_files to be empty before short-circuiting. Counters were always correct; only the committed parquet was short. Sorting either input by the join key masked the bug, which is why it went undetected for so long.

Why all four together

(1) and (2)+(3) work in tandem: with (1) alone the source-side filter is pushed into the target TableScan as a partial_filter and the MERGE commits the right rows, but the prune is invisible from outside. (2)+(3) make EXPLAIN MERGE and EXPLAIN ANALYZE MERGE work end-to-end and surface the prune metrics, so an operator can verify and tune the pruning. (4) is the silent-data-loss bug those tools exposed once the bigger MERGE workloads started running on real partitioned Iceberg tables.

Tests

  • 3 new unit tests on MergeCOWFilterStream covering the matching-then-target-only patterns (fail on main, pass here)
  • 1 new SQL snapshot test merge_into_mixed_unsorted_multi_row_no_data_loss (10-row target × 4-row source, asserts no row loss)
  • 1 new SQL snapshot test for EXPLAIN MERGE (EXPLAIN ANALYZE is exercised end-to-end against a Lambda but intentionally not snapshot-tested — its formatted-table widths vary per run)
  • cargo test -p executor: 362 passed, 0 failed

End-to-end validation against deployed Embucket Lambda

#128 isolated repro — 5 consecutive runs against a freshly deployed Lambda built from this branch, all deterministic with loss=0:

target=2101659 source=2103638 merge=[1052088, 1051550] post=3153747 expected=3153747 loss=0
target=2101659 source=2103638 merge=[1052088, 1051550] post=3153747 expected=3153747 loss=0
target=2101659 source=2103638 merge=[1052088, 1051550] post=3153747 expected=3153747 loss=0
target=2101659 source=2103638 merge=[1052088, 1051550] post=3153747 expected=3153747 loss=0
target=2101659 source=2103638 merge=[1052088, 1051550] post=3153747 expected=3153747 loss=0

12.6 M target × 6.3 M source: post=14697755 expected=14697755 loss=0.

dbt-snowplow-web stretch validation — fresh seed plus rounds R1–R5 of loop_dbt.py, run to completion. The user_mapping Δ is the strongest MERGE-correctness signal (its source CTE feeds straight from the buggy MERGE on lifecycle_manifest):

Round           dbt      Δ user_mapping (this PR)   Δ user_mapping (pre-fix)
1               280.9 s  +1,702,223                 +1,702,223
2               381.1 s  +1,699,998                 +1,699,998
3               386.0 s  +1,700,804 ✓               +690,664 ❌
4               390.3 s  +1,699,815 ✓               +186,950 ❌
5               385.5 s  +1,699,577 ✓               +611,373 ❌
Total after R5           10,204,627                 6,593,418

R3 is where the bug first manifested in the original buggy run. With the fix, R3, R4, and R5 each deliver the full ~+1.7 M rows that match the day-partitioned baseline. The fix recovered 3,611,209 rows of user_mapping data (≈55% of the post-fix total) that the buggy MERGE was silently dropping over 5 rounds.

Source-side partition pruning end-to-end: MERGE INTO atomic.events_hooli_tiny USING atomic.events_hooli_ident ON t.event_id = s.event_id WHEN MATCHED THEN UPDATE … — previously failed with custom_type_coercion / Schema error: No field named event_name, now returns 100 matched rows and commits. The companion EXPLAIN ANALYZE MERGE INTO … emits the target TableScan's partial_filters=[event_name = Utf8("ad_start_event")] and full DataSourceExec scan-metrics block.

Out of scope (separate follow-ups)

  • MergeCOWFilterStream::not_matched_buffer LRU eviction — capacity is 2 on Lambda; doesn't fire in single-target-file repros but will silently evict any third-and-later concurrently-buffered file's rows. Worth its own issue with a 3+ data file repro.
  • Round-6 dbt SELECT-phase OOM — independent of this PR. With the data-loss fix in place, lifecycle_manifest will now grow correctly and the model's previous_sessions LEFT JOIN OOMs the 9 GB Lambda pool around R6. Needs planner-level predicate pushdown or non-Lambda compute.

Companion to Embucket/iceberg-rust#57. Closes #128.

rampage644 and others added 2 commits April 15, 2026 10:54
`CustomTypeCoercionRewriter::analyze_internal` built its lookup schema
from `merge_schema(&plan.inputs())`. For leaf nodes like `LogicalPlan::TableScan`,
`plan.inputs()` is empty and the merged schema has no fields, so any
binary-op expression attached directly to the leaf — e.g. via
`LogicalPlanBuilder::scan_with_filters` — would fail coercion with
"Schema error: No field named <col>" during the analyzer pass.

This broke the target-side partition pruning hint path that
`UserQuery::merge_query` wires up when a MERGE source is a partitioned
`DataFusionTable`: `target_filter_expression()` builds a per-partition
`col(source) >= min AND col(source) <= max` predicate and pushes it into
the target `TableScan`'s filters via `scan_with_filters`, expecting
Iceberg's file pruner to use it at manifest level. The filter never made
it past the analyzer.

Fix: when `plan.inputs().is_empty()`, use `plan.schema()` directly for
type resolution, mirroring the pattern DataFusion's built-in
`TypeCoercion` analyzer uses. All existing `custom_type_coercion`
snapshot tests still pass, and the full `merge_into` suite (22 tests)
stays green.

Verified end-to-end against a deployed Embucket Lambda:
`MERGE INTO demo.atomic.events_hooli_tiny USING demo.atomic.events_hooli_ident`
where the source is partitioned by `identity(event_name)` — previously
failed with `custom_type_coercion / Schema error: No field named event_name`,
now returns 100 matched rows and the update lands on disk.
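The fallback can be sketched with mock types (a hypothetical `Plan`/`Schema` pair standing in for DataFusion's `LogicalPlan` and `DFSchema`; only the leaf-node fallback logic is the point):

```rust
// Hypothetical mock types; the real code operates on DataFusion's
// LogicalPlan and DFSchema.
#[derive(Clone, PartialEq, Debug)]
struct Schema {
    fields: Vec<String>,
}

struct Plan {
    schema: Schema,
    inputs: Vec<Plan>,
}

impl Plan {
    // Old behaviour: merge the input schemas. Empty for a leaf TableScan.
    fn merged_input_schema(&self) -> Schema {
        let mut fields = Vec::new();
        for input in &self.inputs {
            fields.extend(input.schema.fields.iter().cloned());
        }
        Schema { fields }
    }

    // The fix: fall back to the node's own schema when it has no inputs.
    fn resolution_schema(&self) -> Schema {
        if self.inputs.is_empty() {
            self.schema.clone()
        } else {
            self.merged_input_schema()
        }
    }
}

fn main() {
    let leaf = Plan {
        schema: Schema { fields: vec!["event_name".to_string()] },
        inputs: Vec::new(),
    };
    // Pre-fix, the lookup schema is empty, so a filter on event_name cannot coerce.
    assert!(leaf.merged_input_schema().fields.is_empty());
    // Post-fix, the leaf's own schema resolves the column.
    assert_eq!(leaf.resolution_schema().fields, vec!["event_name".to_string()]);
}
```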

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Embucket has its own MERGE planner (`UserQuery::merge_query`) because
DataFusion's SQL path doesn't produce a usable plan for `MERGE INTO`.
The side effect was that `EXPLAIN MERGE INTO …` and
`EXPLAIN ANALYZE MERGE INTO …` both fell through `execute()` to
`execute_sql`, which hands the statement to DataFusion's planner and
bounces back with:

    SQL compilation error: unsupported feature: Unsupported SQL statement:
    MERGE INTO …

No observability for MERGE plans or for per-scan metrics — which made
it impossible to verify partition-pruning behaviour on partitioned
Iceberg targets (files scanned, bytes scanned, manifest-level pruning
counters).

Changes:

1. Split `merge_query` into a pure plan-builder `merge_to_logical_plan`
   and a thin wrapper that calls `execute_logical_plan`.
2. In `execute()`, when the parsed statement is
   `DFStatement::Explain(..)` whose inner statement is
   `Statement::Merge { .. }`, build the MERGE logical plan via
   `merge_to_logical_plan`, then wrap it in the same
   `LogicalPlan::Explain` / `LogicalPlan::Analyze` shape DataFusion's
   own `explain_to_plan` constructs. Everything downstream (physical
   planning, execution, output formatting) is unchanged.
3. Add a snapshot test `merge_into_explain` over a minimal
   unpartitioned target + source — asserts the logical and physical
   plans render. `EXPLAIN ANALYZE` is exercised end-to-end through the
   deployed Lambda rather than via snapshot because the formatted-table
   column widths depend on the pre-redaction metric value widths and
   aren't stable across runs.

After this change:

- `EXPLAIN MERGE INTO t USING s ON ... WHEN MATCHED THEN UPDATE ...`
  returns the logical plan + physical plan (including
  `MergeIntoSinkExec`, `HashJoinExec`, `DataSourceExec { file_groups,
  projection, file_type }` for each side).
- `EXPLAIN ANALYZE` of the same statement executes the MERGE and
  additionally reports per-node runtime metrics. The
  `DataSourceExec` rows now surface the DataFusion/Parquet scan
  counters that were previously invisible: `bytes_scanned`,
  `files_ranges_pruned_statistics`, `row_groups_pruned_statistics`,
  `pushdown_rows_pruned`, `page_index_rows_pruned`. That's the signal
  you need to verify source-side partition-hint pruning actually prunes.

All 23 `merge_into` tests pass (22 existing + 1 new). Full
`cargo test -p executor --lib` is 359/0.
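The routing decision can be sketched with mock enums (illustrative only; the real code matches DataFusion's `DFStatement::Explain` wrapping `Statement::Merge` and builds real `LogicalPlan::Explain` / `LogicalPlan::Analyze` nodes):

```rust
// Mock statement and plan enums standing in for DataFusion's types.
enum Statement {
    Merge,
    Select,
}

enum DfStatement {
    Explain { analyze: bool, inner: Statement },
    Plain(Statement),
}

#[derive(Debug)]
enum LogicalPlan {
    Merge,
    Explain(Box<LogicalPlan>),
    Analyze(Box<LogicalPlan>),
}

// Pure plan builder, split out of the old executing merge_query.
fn merge_to_logical_plan() -> LogicalPlan {
    LogicalPlan::Merge
}

// Some(plan) when we intercept EXPLAIN [ANALYZE] MERGE; None means the
// statement keeps flowing down the pre-existing execute paths.
fn route(stmt: &DfStatement) -> Option<LogicalPlan> {
    match stmt {
        DfStatement::Explain { analyze, inner: Statement::Merge } => {
            let plan = merge_to_logical_plan();
            Some(if *analyze {
                LogicalPlan::Analyze(Box::new(plan))
            } else {
                LogicalPlan::Explain(Box::new(plan))
            })
        }
        _ => None,
    }
}

fn main() {
    let explain = DfStatement::Explain { analyze: false, inner: Statement::Merge };
    assert!(matches!(route(&explain), Some(LogicalPlan::Explain(_))));
    let analyze = DfStatement::Explain { analyze: true, inner: Statement::Merge };
    assert!(matches!(route(&analyze), Some(LogicalPlan::Analyze(_))));
    // Non-MERGE statements are untouched.
    assert!(route(&DfStatement::Plain(Statement::Select)).is_none());
}
```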

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rampage644 rampage644 changed the title from "fix: custom_type_coercion falls back to plan.schema() for leaf nodes" to "fix: enable EXPLAIN [ANALYZE] on MERGE + custom_type_coercion leaf fallback" on Apr 15, 2026
`MergeIntoCOWSinkExec` tracked per-clause row counts in `AtomicI64`
purely to populate the final result batch. After the EXPLAIN / EXPLAIN
ANALYZE routing fix on this branch, `EXPLAIN ANALYZE MERGE INTO …`
reports rich per-scan metrics on every `DataSourceExec` in the plan,
but the sink line was still rendering as `MergeIntoSinkExec, metrics=[]`
because this node didn't own an `ExecutionPlanMetricsSet`.

Wire one up: register `Count` metrics `updated_rows`, `inserted_rows`,
and `deleted_rows` via `MetricBuilder::new(&self.metrics).counter(..)`
at the start of `execute()`, clone them into the async write closure,
and `add()` the final `AtomicI64` values after the transaction commits.
Implement `ExecutionPlan::metrics()` to return
`Some(self.metrics.clone_inner())` so DataFusion's plan formatter picks
them up. Row counts that exceed `usize::MAX` saturate via `try_from`
rather than panicking.

After this change, `EXPLAIN ANALYZE MERGE` shows the sink counters
alongside the child scan counters, so an operator can read updated /
inserted / deleted counts directly off the plan output instead of only
from the result row.

All 23 `merge_into` tests stay green.
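The counter publication can be sketched as follows (a mock map keyed by counter name; the real code registers `Count` metrics on an `ExecutionPlanMetricsSet` via DataFusion's `MetricBuilder`):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};

// Saturating conversion from the sink's AtomicI64 row counts into
// usize-based Count metrics: clamp instead of panicking.
fn saturating_count(v: i64) -> usize {
    usize::try_from(v.max(0)).unwrap_or(usize::MAX)
}

// Mock metrics set; counter names mirror the PR.
fn publish_metrics(
    updated: &AtomicI64,
    inserted: &AtomicI64,
    deleted: &AtomicI64,
) -> HashMap<&'static str, usize> {
    let mut m = HashMap::new();
    m.insert("updated_rows", saturating_count(updated.load(Ordering::Relaxed)));
    m.insert("inserted_rows", saturating_count(inserted.load(Ordering::Relaxed)));
    m.insert("deleted_rows", saturating_count(deleted.load(Ordering::Relaxed)));
    m
}

fn main() {
    let (u, i, d) = (AtomicI64::new(3), AtomicI64::new(5), AtomicI64::new(0));
    let metrics = publish_metrics(&u, &i, &d);
    assert_eq!(metrics["updated_rows"], 3);
    assert_eq!(metrics["inserted_rows"], 5);
    assert_eq!(metrics["deleted_rows"], 0);
    // A bogus negative count clamps to zero rather than panicking.
    assert_eq!(saturating_count(-1), 0);
}
```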

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
rampage644 added a commit to Embucket/iceberg-rust that referenced this pull request Apr 15, 2026
The second-stage data-file pruner (PruneDataFiles) was constructed with
`partition_schema` — a subset schema holding only the Hive-style partition
columns. Its `min_values`/`max_values` implementation looks up each column
referenced by the pruning predicate via `arrow_schema.field_with_name(..)`
to fetch the datatype, so any filter on a column absent from
`partition_schema` silently returned `None` and pruned nothing.

Identity-self-named partition columns (where `pf.name() == pf.source_name()`)
are intentionally dropped from `file_partition_fields` so the parquet
reader doesn't duplicate them between the path encoding and the file body,
which also drops them from `table_partition_cols` and therefore from
`partition_schema`. The result: a filter like `event_name = 'ad_start'`
against a table partitioned by `identity(event_name)` reached the second-
stage pruner but found no schema hit, so every partition file of the
target was scanned in full (`files_ranges_pruned_statistics=0`). This
only surfaced now because Embucket/embucket#126 unblocked the filter
reaching TableScan in the first place.

Fix: pass the full `arrow_schema` to `PruneDataFiles::new`. It has every
column the predicate might reference — identity-self-named partition
columns, non-partition columns with per-file statistics, etc. Correctness
is preserved because the first-stage `PruneManifests` path still prunes
transformed partition columns (`collector_tstamp_day`, `id_bucket`, ...)
via manifest-list partition bounds, and synthetic partition-transform
columns simply return `None` from `PruneDataFiles` (no per-file stats
exist for them), which is the same behavior they had before.

Adds a regression test: `test_identity_self_named_partition_filter_prunes_files`
creates an `identity(kind)`-partitioned table, inserts one row per
partition value to materialize 3 distinct parquet files, then scans with
`kind = 'a'` and asserts the resulting plan lists exactly 1 parquet file
instead of 3.

Refs: Embucket/embucket#127

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
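The failure mode can be sketched with a mock schema (a name-to-datatype map standing in for the arrow `Schema` the pruner queries via `field_with_name`):

```rust
use std::collections::HashMap;

// Mock schema: column name -> datatype name.
type MockSchema = HashMap<&'static str, &'static str>;

// Sketch of the min/max lookup: a column absent from the schema yields
// None, which the pruner treats as "no statistics, cannot prune".
fn min_value(schema: &MockSchema, column: &str) -> Option<&'static str> {
    schema.get(column).map(|_dtype| "min-stat")
}

fn main() {
    // partition_schema held only transformed partition columns; the
    // identity-self-named event_name had been dropped from it.
    let partition_schema: MockSchema =
        [("collector_tstamp_day", "Int32")].into_iter().collect();
    let full_arrow_schema: MockSchema =
        [("collector_tstamp_day", "Int32"), ("event_name", "Utf8")]
            .into_iter()
            .collect();

    // Before: a filter on event_name finds no field, so nothing is pruned.
    assert!(min_value(&partition_schema, "event_name").is_none());
    // After: the full schema resolves the column, so stats can prune files.
    assert!(min_value(&full_arrow_schema, "event_name").is_some());
}
```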
The MergeCOWFilterStream "no matches in this batch" fast path
short-circuited on `matching_data_and_manifest_files.is_empty()`
without checking the cumulative `all_matching_data_files` set. If a
target file had been seen as matching in an earlier batch and a later
batch contained only target rows for that file, the rows in the later
batch were silently dropped. The downstream COW commit then overwrote
the original file with the partial result, permanently losing the
unmatched target rows whose batch hit the dead path.

The fix tightens the guard to also require `all_matching_data_files`
to be empty before taking the fast path. When a batch belongs to a
file already in the overwrite set, it falls through to the main
filter path which correctly emits target rows via
`file_predicate OR source_exists`.

Adds three unit tests against MergeCOWFilterStream covering the
matching-then-target patterns, plus a SQL snapshot test that exercises
the same shape end-to-end.
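The guard tightening can be sketched with a mock of the stream state (only the two sets involved; the real stream is `MergeCOWFilterStream`):

```rust
use std::collections::HashSet;

// Mock of the stream state involved in the fast-path guard.
struct FilterState {
    matching_data_and_manifest_files: HashSet<&'static str>, // matches in *this* batch
    all_matching_data_files: HashSet<&'static str>,          // cumulative across batches
}

impl FilterState {
    // Buggy guard: consulted the per-batch set only.
    fn fast_path_buggy(&self) -> bool {
        self.matching_data_and_manifest_files.is_empty()
    }

    // Fixed guard: the cumulative set must be empty too.
    fn fast_path_fixed(&self) -> bool {
        self.matching_data_and_manifest_files.is_empty()
            && self.all_matching_data_files.is_empty()
    }
}

fn main() {
    // File f1 matched in an earlier batch; the current batch holds only
    // target rows for f1, so it has no matches of its own.
    let state = FilterState {
        matching_data_and_manifest_files: HashSet::new(),
        all_matching_data_files: ["f1"].into_iter().collect(),
    };
    // The old guard short-circuits: the batch's target rows are dropped.
    assert!(state.fast_path_buggy());
    // The tightened guard falls through to the filter path that emits them.
    assert!(!state.fast_path_fixed());
}
```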

Fixes #128

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rampage644 rampage644 changed the title from "fix: enable EXPLAIN [ANALYZE] on MERGE + custom_type_coercion leaf fallback" to "fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path" on Apr 15, 2026
@rampage644 rampage644 closed this Apr 23, 2026
rampage644 added a commit to Embucket/iceberg-rust that referenced this pull request Apr 23, 2026
…Z, pruning, manifest rewrite) (#57)

* fix: v2 manifest-list field renames + TIMESTAMPTZ date_transform + identity partition column filter

Three independent fixes needed to read and scan real v2 Iceberg tables
written by current Apache Iceberg (>= 1.0) and partitioned by an
identity column or a day/hour transform on TIMESTAMPTZ.

1. `iceberg-rust-spec/src/spec/manifest_list.rs` - the v2 manifest_list
   Avro schema uses `added_data_files_count` / `existing_data_files_count`
   / `deleted_data_files_count`, but the reader still used the older
   `added_files_count` / `existing_files_count` / `deleted_files_count`
   names. Any manifest list written by modern Apache Iceberg failed to
   deserialize with "field not found" before the reader even reached
   an entry. Declare each count with
   `#[serde(rename = "added_data_files_count", alias = "added_files_count")]`
   so both new and legacy field names resolve cleanly, and update the
   static reader Avro schema to emit the current names. New regression
   test `test_manifest_list_v2_apache_field_names` simulates an Apache
   Iceberg >= 1.0 writer and asserts the reader deserializes it.

2. `datafusion_iceberg/src/pruning_statistics.rs` - the internal
   `DateTransform` UDF used a hardcoded
   `OneOf(Exact([Utf8, Date32]), Exact([Utf8, Timestamp(us, None)]))`
   signature, so any timezone-aware timestamp fell through with a
   type-check error. Replace with `TypeSignature::UserDefined` plus a
   `coerce_types` impl that accepts any `Timestamp(*, *)` and normalizes
   to `Timestamp(Microsecond, None)` for the physical call. Partition
   transforms operate on i64 microseconds-since-epoch and are timezone-
   agnostic, so stripping the tz on input is safe.

3. `datafusion_iceberg/src/table.rs::datafusion_partition_columns` -
   skip partition fields whose transform is `Identity` and whose name
   equals the source column name. For those, Iceberg materializes the
   column both in the parquet file body and in the Hive-style directory
   encoding; DataFusion's parquet reader then trips on an off-by-one
   ("expected N cols but got N+1") because it tries to derive the same
   column from both places. A follow-up commit promotes this filter
   out of `datafusion_partition_columns` so the manifest pruner sees
   the same filtered list.

* refactor(table_scan): promote identity-self-named partition filter to table_scan

Move the identity-self-named partition drop out of
`datafusion_partition_columns` and into `table_scan` itself, so the
physical scan column set and the manifest pruner's
`partition_column_names` set are computed from the same filtered list.
Previously they diverged: `datafusion_partition_columns` filtered out
identity-self-named fields but the pruner still built its column
subset from the unfiltered `partition_fields`, which meant filters on
identity-self-named columns were incorrectly routed through
`PruneManifests` (and then failed the subset test on the reduced
partition schema anyway).

Introduces `file_partition_fields` (kept) and `drop_partition_indices`
(dropped), constructed once at the top of `table_scan` from the
unfiltered `partition_fields`. Both are then threaded through every
downstream consumer:
- `datafusion_partition_columns` is called with the kept list.
- The manifest-level pruner's `partition_column_names` set is built
  from the kept list and a comment documents that identity-self-named
  predicates are intentionally excluded here because they are pruned
  by per-file statistics in `PruneDataFiles` instead.
- `drop_partition_indices` is later consumed by
  `generate_partitioned_file` so callers that still need to see the
  unfiltered partition-field order can account for the gaps.

Prerequisite for follow-up commits that add the projection remap,
TIMESTAMPTZ transform acceptance, PruneDataFiles arrow_schema fix,
and manifest nested-id resolution.
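The kept/dropped split can be sketched with a mock partition-field spec (illustrative names; the real code inspects iceberg-rust's `PartitionField` name, source, and transform):

```rust
// Mock partition-field spec.
#[derive(PartialEq)]
enum Transform {
    Identity,
    Day,
}

struct PartitionField {
    name: &'static str,
    source_name: &'static str,
    transform: Transform,
}

// Split once, up front: kept indices drive both the physical scan column
// set and the manifest pruner's column set; dropped indices let downstream
// consumers account for the gaps in the unfiltered order.
fn split(fields: &[PartitionField]) -> (Vec<usize>, Vec<usize>) {
    let mut kept = Vec::new();
    let mut dropped = Vec::new();
    for (i, f) in fields.iter().enumerate() {
        if f.transform == Transform::Identity && f.name == f.source_name {
            dropped.push(i); // identity-self-named: already in the file body
        } else {
            kept.push(i);
        }
    }
    (kept, dropped)
}

fn main() {
    let fields = [
        PartitionField { name: "event_name", source_name: "event_name", transform: Transform::Identity },
        PartitionField { name: "collector_tstamp_day", source_name: "collector_tstamp", transform: Transform::Day },
    ];
    let (kept, dropped) = split(&fields);
    assert_eq!(kept, vec![1]);    // day-transformed column stays
    assert_eq!(dropped, vec![0]); // identity-self-named column is dropped
}
```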

* fix(datafusion_iceberg): remap caller projection to combined-schema space

DataFusionTable::schema() returns [file_schema, __data_file_path?,
__manifest_file_path?] but the physical FileScanConfig output is
[file_schema, kept_partition_transform_cols..., __data_file_path?,
__manifest_file_path?]. Any partition spec with a non-identity transform
(day, hour, month, year, bucket, truncate) creates synthetic columns
(e.g. ts_day for day(ts)) that sit between the user columns and the
metadata columns in the physical schema but are absent from the provider
schema.

table_scan() was passing the caller's `projection` (indices into the
provider schema) straight through to FileScanConfig::with_projection,
which interprets indices against the combined schema. With
enable_data_file_path_column=true this picked up `ts_day` at the slot
where `__data_file_path` was expected and silently truncated
`__manifest_file_path`, which in turn made any downstream ProjectionExec
referencing those columns by name+index fail with:

  Internal error: Input field name <col>_<transform> does not match
  with the projection expression __data_file_path.

Embucket's MERGE COW planner hits this on every partitioned target.

Fix: compute combined_projection once from the caller's projection,
remapping provider-schema indices for __data_file_path /
__manifest_file_path to their actual positions in
[file_schema, kept_partition_cols, __data_file_path?, __manifest_file_path?].
Use combined_projection throughout table_scan (no-delete path,
equality-delete base, per-closure clones).

Adds 7 regression tests (day, hour, month, year, bucket, truncate,
renamed-identity) alongside the existing unpartitioned
test_datafusion_table_insert_with_data_file_path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
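The remap amounts to index arithmetic over the two schema layouts (illustrative only; the real code works over `FileScanConfig`'s combined schema):

```rust
// Provider schema:  [user cols..., __data_file_path, __manifest_file_path]
// Combined schema:  [user cols..., kept partition cols..., __data_file_path,
//                    __manifest_file_path]
fn remap_projection(projection: &[usize], n_user_cols: usize, n_kept_partition_cols: usize) -> Vec<usize> {
    projection
        .iter()
        .map(|&i| {
            if i < n_user_cols {
                i // user columns keep their positions
            } else {
                i + n_kept_partition_cols // metadata columns shift right
            }
        })
        .collect()
}

fn main() {
    // 2 user columns; the provider schema puts __data_file_path at index 2
    // and __manifest_file_path at index 3. One synthetic partition column
    // (e.g. ts_day) sits between them in the physical combined schema.
    let remapped = remap_projection(&[0, 2, 3], 2, 1);
    // Without the remap, index 2 would pick up ts_day instead of
    // __data_file_path; with it, the metadata columns land at 3 and 4.
    assert_eq!(remapped, vec![0, 3, 4]);
}
```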

* fix(arrow/transform): accept TIMESTAMP_TZ for day/hour/month/year transforms

`transform_arrow()` only matched `DataType::Timestamp(TimeUnit::Microsecond, None)`
for the day/hour/month/year arms, so any `timestamptz` column fell through to
the catchall and raised `Compute error: Failed to perform transform for datatype`.
Embucket's MERGE write path on `events_hooli` — whose `collector_tstamp` is
`TIMESTAMP_TZ` partitioned by `day(collector_tstamp)` — tripped this every time.

Iceberg's day/hour/month/year transforms are defined on the absolute instant
(microseconds since the Unix epoch), so the Arrow timezone metadata is
irrelevant to the numeric result. Widen each arm to `Timestamp(Microsecond, _)`.
For month and year the existing `date_part` call used a named-timezone path
that requires `chrono-tz`; cast to `Timestamp(Microsecond, None)` first so we
run on a naive variant that works without that feature flag.

Adds 4 regression tests exercising all four transforms with a
`TimestampMicrosecondArray::with_timezone("UTC")` input to lock the fix in.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
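The widened arm can be sketched with a mock datatype (standing in for arrow's `DataType::Timestamp(TimeUnit::Microsecond, tz)`):

```rust
// Mock of the timestamp datatype; the payload is the optional timezone.
enum DataType {
    TimestampMicrosecond(Option<&'static str>),
    Utf8,
}

// Before: only the naive microsecond timestamp matched.
fn transform_supported_before(dt: &DataType) -> bool {
    matches!(dt, DataType::TimestampMicrosecond(None))
}

// After: any timezone is accepted. The transforms are defined on the
// absolute instant (microseconds since epoch), so the zone is irrelevant.
fn transform_supported_after(dt: &DataType) -> bool {
    matches!(dt, DataType::TimestampMicrosecond(_))
}

fn main() {
    let tz = DataType::TimestampMicrosecond(Some("UTC"));
    // Used to fall through to the catchall and raise the compute error.
    assert!(!transform_supported_before(&tz));
    assert!(transform_supported_after(&tz));
    // Non-timestamp types are still rejected.
    assert!(!transform_supported_after(&DataType::Utf8));
}
```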

* fix(datafusion_iceberg): route full arrow_schema to PruneDataFiles

The second-stage data-file pruner (PruneDataFiles) was constructed with
`partition_schema` — a subset schema holding only the Hive-style partition
columns. Its `min_values`/`max_values` implementation looks up each column
referenced by the pruning predicate via `arrow_schema.field_with_name(..)`
to fetch the datatype, so any filter on a column absent from
`partition_schema` silently returned `None` and pruned nothing.

Identity-self-named partition columns (where `pf.name() == pf.source_name()`)
are intentionally dropped from `file_partition_fields` so the parquet
reader doesn't duplicate them between the path encoding and the file body,
which also drops them from `table_partition_cols` and therefore from
`partition_schema`. The result: a filter like `event_name = 'ad_start'`
against a table partitioned by `identity(event_name)` reached the second-
stage pruner but found no schema hit, so every partition file of the
target was scanned in full (`files_ranges_pruned_statistics=0`). This
only surfaced now because Embucket/embucket#126 unblocked the filter
reaching TableScan in the first place.

Fix: pass the full `arrow_schema` to `PruneDataFiles::new`. It has every
column the predicate might reference — identity-self-named partition
columns, non-partition columns with per-file statistics, etc. Correctness
is preserved because the first-stage `PruneManifests` path still prunes
transformed partition columns (`collector_tstamp_day`, `id_bucket`, ...)
via manifest-list partition bounds, and synthetic partition-transform
columns simply return `None` from `PruneDataFiles` (no per-file stats
exist for them), which is the same behavior they had before.

Adds a regression test: `test_identity_self_named_partition_filter_prunes_files`
creates an `identity(kind)`-partitioned table, inserts one row per
partition value to materialize 3 distinct parquet files, then scans with
`kind = 'a'` and asserts the resulting plan lists exactly 1 parquet file
instead of 3.

Refs: Embucket/embucket#127

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(manifest): resolve nested column ids in DataFile statistics

DataFile statistics maps (`lower_bounds`, `upper_bounds`, `column_sizes`,
`value_counts`, `null_value_counts`, `nan_value_counts`) are keyed by
global column id and Iceberg assigns those ids from the same pool at
every depth — a struct field nested inside a list<struct> or inside a
context/unstruct top-level column is just as valid a key as a top-level
column. `AvroMap::into_value_map` was looking keys up via
`StructType::get`, which only consults the *top-level* `lookup` table, so
any nested id (e.g. Snowplow's `contexts_com_snowplowanalytics_*.*` fields
reaching into the 400-700 range) silently failed with
`ColumnNotInSchema`. That error was then `.unwrap()`'d inside
`from_existing_with_filter`'s per-entry closure, which panicked the
tokio worker and aborted the whole Lambda (`signal: aborted`) on any
MERGE that touched a real Snowplow events table.

Three fixes, smallest-to-largest:

1. `StructType::field_by_id(id)` — new recursive id lookup that walks
   nested `Struct`, `List`, and `Map` types. Independent from the
   existing top-level-only `get` so current callers of `get` are
   unaffected.

2. `AvroMap::into_value_map` now resolves ids via `field_by_id`. Unknown
   ids — entries pointing at fields that have been removed from the
   schema since the manifest was written — are now skipped rather than
   raised as `ColumnNotInSchema`. This matches Iceberg's schema-evolution
   semantics (old stats on removed fields are tolerated on read).

3. `iceberg-rust/src/table/manifest.rs::from_existing_with_filter`'s
   main rewrite loop is switched from `filter_map(...).unwrap()` to an
   explicit `for` loop that propagates per-entry errors via `?`. Any
   future deserialization edge case surfaces as a clean `Error` instead
   of a SIGABRT inside a tokio worker.

Two new regression tests:

- `types::tests::field_by_id_finds_nested_fields` — covers top-level,
  struct-of-struct, list<struct>, map<string, struct>, and unknown ids.
- `manifest::tests::into_value_map_accepts_nested_field_ids` — builds an
  `AvroMap<ByteBuf>` with a nested-field key (479 inside a
  list<struct>), a top-level key, and an unknown key, and asserts all
  three paths (decode nested, decode top-level, silently skip unknown).

Reproduced end-to-end: pre-fix, `MERGE INTO demo.atomic.events_hooli`
aborts the Lambda after ~21s with
`panicked at iceberg-rust/src/table/manifest.rs:549:18: ... Column 479
not in schema`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
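The recursive lookup can be sketched with a mock type tree (the real code walks iceberg-rust's `StructType`, `List`, and `Map` spec types):

```rust
// Mock type tree; a list element is itself an id-carrying field.
enum Type {
    Primitive,
    Struct(Vec<Field>),
    List(Box<Field>),
}

struct Field {
    id: i32,
    field_type: Type,
}

// Recursive id lookup: consult every depth, not just the top level.
fn field_by_id<'a>(fields: &'a [Field], id: i32) -> Option<&'a Field> {
    for f in fields {
        if f.id == id {
            return Some(f);
        }
        let nested = match &f.field_type {
            Type::Struct(children) => field_by_id(children, id),
            Type::List(elem) => field_by_id(std::slice::from_ref(&**elem), id),
            Type::Primitive => None,
        };
        if nested.is_some() {
            return nested;
        }
    }
    None
}

fn main() {
    // A contexts_* column: a list<struct> whose inner field carries id 479.
    let schema = vec![
        Field { id: 1, field_type: Type::Primitive },
        Field {
            id: 400,
            field_type: Type::List(Box::new(Field {
                id: 401,
                field_type: Type::Struct(vec![Field { id: 479, field_type: Type::Primitive }]),
            })),
        },
    ];
    // Nested ids now resolve; a top-level-only lookup would miss 479.
    assert_eq!(field_by_id(&schema, 479).map(|f| f.id), Some(479));
    // Unknown ids resolve to None: skipped on read, not a panic.
    assert!(field_by_id(&schema, 999).is_none());
}
```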

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>