fix: MERGE INTO on partitioned Iceberg tables (projection, TIMESTAMPTZ, pruning, manifest rewrite) #57
Merged
rampage644 merged 6 commits into embucket-sync-df50.0.0 on Apr 23, 2026
Conversation
v2 manifest-list field renames + TIMESTAMPTZ date_transform + identity partition column filter
Three independent fixes needed to read and scan real v2 Iceberg tables
written by current Apache Iceberg (>= 1.0) and partitioned by an
identity column or a day/hour transform on TIMESTAMPTZ.
1. `iceberg-rust-spec/src/spec/manifest_list.rs` - the v2 manifest_list
Avro schema uses `added_data_files_count` / `existing_data_files_count`
/ `deleted_data_files_count`, but the reader still used the older
`added_files_count` / `existing_files_count` / `deleted_files_count`
names. Any manifest list written by modern Apache Iceberg failed to
deserialize with "field not found" before the reader even reached
an entry. Declare each count with
`#[serde(rename = "added_data_files_count", alias = "added_files_count")]`
so both new and legacy field names resolve cleanly (see the first
sketch after this list), and update the
static reader Avro schema to emit the current names. New regression
test `test_manifest_list_v2_apache_field_names` simulates an Apache
Iceberg >= 1.0 writer and asserts the reader deserializes it.
2. `datafusion_iceberg/src/pruning_statistics.rs` - the internal
`DateTransform` UDF used a hardcoded
`OneOf(Exact([Utf8, Date32]), Exact([Utf8, Timestamp(us, None)]))`
signature, so any timezone-aware timestamp fell through with a
type-check error. Replace with `TypeSignature::UserDefined` plus a
`coerce_types` impl that accepts any `Timestamp(*, *)` and normalizes
to `Timestamp(Microsecond, None)` for the physical call. Partition
transforms operate on i64 microseconds-since-epoch and are timezone-
agnostic, so stripping the tz on input is safe (see the coercion
sketch after this list).
3. `datafusion_iceberg/src/table.rs::datafusion_partition_columns` -
skip partition fields whose transform is `Identity` and whose name
equals the source column name. For those, Iceberg materializes the
column both in the parquet file body and in the Hive-style directory
encoding; DataFusion's parquet reader then trips on an off-by-one
("expected N cols but got N+1") because it tries to derive the same
column from both places. A follow-up commit promotes this filter
out of `datafusion_partition_columns` so the manifest pruner sees
the same filtered list.
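Hedged sketches of the three fixes above, in order. Names and struct shapes are simplified illustrations, not the crates' exact code.

Fix 1, the serde rename-plus-alias pattern (reduced hypothetical struct; the counts are `int` in the v2 spec):

```rust
use serde::{Deserialize, Serialize};

// `rename` makes serde read/write the current Apache Iceberg field
// name; `alias` keeps the pre-rename spelling deserializable, so
// manifest lists from both writer generations resolve into one field.
#[derive(Serialize, Deserialize)]
struct ManifestFileCounts {
    #[serde(rename = "added_data_files_count", alias = "added_files_count")]
    added_data_files_count: i32,
    #[serde(rename = "existing_data_files_count", alias = "existing_files_count")]
    existing_data_files_count: i32,
    #[serde(rename = "deleted_data_files_count", alias = "deleted_files_count")]
    deleted_data_files_count: i32,
}
```

Fix 2, the coercion logic (a free function standing in for the UDF's `coerce_types` method; argument order assumed to match the old `OneOf` signature):

```rust
use arrow_schema::{DataType, TimeUnit};
use datafusion_common::{plan_err, Result};

// arg 0: the transform name (Utf8); arg 1: the partition source column.
// Any Timestamp(unit, tz) is accepted and normalized to
// Timestamp(Microsecond, None); the transform math runs on i64
// microseconds since the epoch, so dropping the tz metadata is safe.
fn coerce_types(arg_types: &[DataType]) -> Result<Vec<DataType>> {
    match arg_types {
        [DataType::Utf8, DataType::Date32] => {
            Ok(vec![DataType::Utf8, DataType::Date32])
        }
        [DataType::Utf8, DataType::Timestamp(_, _)] => Ok(vec![
            DataType::Utf8,
            DataType::Timestamp(TimeUnit::Microsecond, None),
        ]),
        other => plan_err!("date_transform: unsupported argument types {other:?}"),
    }
}
```

Fix 3, the identity-self-named filter (simplified stand-in for the partition-spec field type):

```rust
// Simplified stand-in for a partition-spec field.
struct PartitionField {
    name: String,        // partition field name, e.g. "event_name"
    source_name: String, // source column in the table schema
    is_identity: bool,   // true for the Identity transform
}

// Keep only fields that must be derived from the Hive-style path.
// Identity fields named after their source column already live in the
// parquet file body, so listing them again would double-count them.
fn keep_for_path_derivation(fields: &[PartitionField]) -> Vec<&PartitionField> {
    fields
        .iter()
        .filter(|pf| !(pf.is_identity && pf.name == pf.source_name))
        .collect()
}
```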
refactor(table_scan): promote identity-self-named partition filter to table_scan

Move the identity-self-named partition drop out of
`datafusion_partition_columns` and into `table_scan` itself, so the
physical scan column set and the manifest pruner's
`partition_column_names` set are computed from the same filtered list.
Previously they diverged: `datafusion_partition_columns` filtered out
identity-self-named fields but the pruner still built its column subset
from the unfiltered `partition_fields`, which meant filters on
identity-self-named columns were incorrectly routed through
`PruneManifests` (and then failed the subset test on the reduced
partition schema anyway).

Introduces `file_partition_fields` (kept) and `drop_partition_indices`
(dropped), constructed once at the top of `table_scan` from the
unfiltered `partition_fields`. Both are then threaded through every
downstream consumer:

- `datafusion_partition_columns` is called with the kept list.
- The manifest-level pruner's `partition_column_names` set is built
  from the kept list, and a comment documents that identity-self-named
  predicates are intentionally excluded here because they are pruned
  by per-file statistics in `PruneDataFiles` instead.
- `drop_partition_indices` is later consumed by
  `generate_partitioned_file` so callers that still need to see the
  unfiltered partition-field order can account for the gaps.

Prerequisite for follow-up commits that add the projection remap,
TIMESTAMPTZ transform acceptance, the `PruneDataFiles` arrow_schema
fix, and manifest nested-id resolution.
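Reusing the simplified `PartitionField` from the sketch above, the one-pass split could look like this (hypothetical shape, not the crate's code):

```rust
// One pass over the unfiltered partition fields yields both outputs:
// the kept list that drives the physical scan columns and the pruner's
// partition_column_names, and the dropped indices that let
// generate_partitioned_file account for gaps in the original order.
fn split_partition_fields(
    fields: &[PartitionField],
) -> (Vec<&PartitionField>, Vec<usize>) {
    let mut file_partition_fields = Vec::new();
    let mut drop_partition_indices = Vec::new();
    for (i, pf) in fields.iter().enumerate() {
        if pf.is_identity && pf.name == pf.source_name {
            drop_partition_indices.push(i); // pruned via per-file stats instead
        } else {
            file_partition_fields.push(pf);
        }
    }
    (file_partition_fields, drop_partition_indices)
}
```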
Projection remap to combined-schema space

`DataFusionTable::schema()` returns
`[file_schema, __data_file_path?, __manifest_file_path?]` but the
physical `FileScanConfig` output is
`[file_schema, kept_partition_transform_cols..., __data_file_path?,
__manifest_file_path?]`. Any partition spec with a non-identity
transform (day, hour, month, year, bucket, truncate) creates synthetic
columns (e.g. `ts_day` for `day(ts)`) that sit between the user
columns and the metadata columns in the physical schema but are absent
from the provider schema.

`table_scan()` was passing the caller's `projection` (indices into the
provider schema) straight through to
`FileScanConfig::with_projection`, which interprets indices against
the combined schema. With `enable_data_file_path_column=true` this
picked up `ts_day` at the slot where `__data_file_path` was expected
and silently truncated `__manifest_file_path`, which in turn made any
downstream `ProjectionExec` referencing those columns by name+index
fail with:

    Internal error: Input field name <col>_<transform> does not match
    with the projection expression __data_file_path.

Embucket's MERGE COW planner hits this on every partitioned target.

Fix: compute `combined_projection` once from the caller's projection,
remapping provider-schema indices for `__data_file_path` /
`__manifest_file_path` to their actual positions in
`[file_schema, kept_partition_cols, __data_file_path?,
__manifest_file_path?]`. Use `combined_projection` throughout
`table_scan` (no-delete path, equality-delete base, per-closure
clones).

Adds 7 regression tests (day, hour, month, year, bucket, truncate,
renamed-identity) alongside the existing unpartitioned
`test_datafusion_table_insert_with_data_file_path`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
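A sketch of the index remap under the stated layouts (function name and parameters hypothetical):

```rust
// Provider schema: [file cols..., __data_file_path?, __manifest_file_path?]
// Combined schema: [file cols..., kept partition cols...,
//                   __data_file_path?, __manifest_file_path?]
// Any caller index at or past the file-column count refers to a
// metadata column and must shift right past the synthetic partition
// columns (ts_day, id_bucket, ...).
fn remap_to_combined(
    projection: &[usize],
    n_file_cols: usize,
    n_kept_partition_cols: usize,
) -> Vec<usize> {
    projection
        .iter()
        .map(|&i| {
            if i < n_file_cols {
                i // plain file column: same slot in both schemas
            } else {
                i + n_kept_partition_cols // __data_file_path / __manifest_file_path
            }
        })
        .collect()
}
```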
Accept TIMESTAMPTZ for day/hour/month/year transforms
`transform_arrow()` only matched `DataType::Timestamp(TimeUnit::Microsecond, None)`
for the day/hour/month/year arms, so any `timestamptz` column fell through to
the catchall and raised `Compute error: Failed to perform transform for datatype`.
Embucket's MERGE write path on `events_hooli` — whose `collector_tstamp` is
`TIMESTAMP_TZ` partitioned by `day(collector_tstamp)` — tripped this every time.
Iceberg's day/hour/month/year transforms are defined on the absolute instant
(microseconds since the Unix epoch), so the Arrow timezone metadata is
irrelevant to the numeric result. Widen each arm to `Timestamp(Microsecond, _)`.
For month and year the existing `date_part` call used a named-timezone path
that requires `chrono-tz`; cast to `Timestamp(Microsecond, None)` first so we
run on a naive variant that works without that feature flag.
Adds 4 regression tests exercising all four transforms with a
`TimestampMicrosecondArray::with_timezone("UTC")` input to lock the fix in.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
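A sketch of the widened arm, assuming arrow's `cast` kernel (helper name hypothetical; in the real code this feeds the existing day/hour/month/year arms):

```rust
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::error::{ArrowError, Result};
use arrow_schema::{DataType, TimeUnit};

// `Timestamp(Microsecond, _)` binds any timezone (including None).
// Casting to the naive variant up front means the month/year
// `date_part` path never takes the named-timezone route that would
// require the chrono-tz feature.
fn normalize_timestamp(array: &ArrayRef) -> Result<ArrayRef> {
    match array.data_type() {
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            cast(array, &DataType::Timestamp(TimeUnit::Microsecond, None))
        }
        other => Err(ArrowError::ComputeError(format!(
            "Failed to perform transform for datatype {other}"
        ))),
    }
}
```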
The second-stage data-file pruner (`PruneDataFiles`) was constructed
with `partition_schema`, a subset schema holding only the Hive-style
partition columns. Its `min_values`/`max_values` implementation looks
up each column referenced by the pruning predicate via
`arrow_schema.field_with_name(..)` to fetch the datatype, so any
filter on a column absent from `partition_schema` silently returned
`None` and pruned nothing.

Identity-self-named partition columns (where `pf.name() ==
pf.source_name()`) are intentionally dropped from
`file_partition_fields` so the parquet reader doesn't duplicate them
between the path encoding and the file body, which also drops them
from `table_partition_cols` and therefore from `partition_schema`. The
result: a filter like `event_name = 'ad_start'` against a table
partitioned by `identity(event_name)` reached the second-stage pruner
but found no schema hit, so every partition file of the target was
scanned in full (`files_ranges_pruned_statistics=0`). This only
surfaced now because Embucket/embucket#126 unblocked the filter
reaching TableScan in the first place.

Fix: pass the full `arrow_schema` to `PruneDataFiles::new`. It has
every column the predicate might reference: identity-self-named
partition columns, non-partition columns with per-file statistics,
etc. Correctness is preserved because the first-stage `PruneManifests`
path still prunes transformed partition columns
(`collector_tstamp_day`, `id_bucket`, ...) via manifest-list partition
bounds, and synthetic partition-transform columns simply return `None`
from `PruneDataFiles` (no per-file stats exist for them), which is the
same behavior they had before.

Adds a regression test:
`test_identity_self_named_partition_filter_prunes_files` creates an
`identity(kind)`-partitioned table, inserts one row per partition
value to materialize 3 distinct parquet files, then scans with
`kind = 'a'` and asserts the resulting plan lists exactly 1 parquet
file instead of 3.

Refs: Embucket/embucket#127

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
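A tiny illustration of why the schema miss disabled pruning (hypothetical helper mirroring the `min_values` lookup path described above). In DataFusion's `PruningStatistics` contract, returning `None` means the bounds are unknown, and a container with unknown bounds is never pruned:

```rust
use arrow_schema::{DataType, Schema};

// If the predicate references a column the pruner's schema cannot
// resolve, field_with_name fails and the lookup yields None, so the
// pruning predicate has to treat every file as a possible match.
fn lookup_stats_type(schema: &Schema, column: &str) -> Option<DataType> {
    schema
        .field_with_name(column) // Err for absent columns
        .ok()
        .map(|f| f.data_type().clone())
}
```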
DataFile statistics maps (`lower_bounds`, `upper_bounds`,
`column_sizes`, `value_counts`, `null_value_counts`,
`nan_value_counts`) are keyed by global column id, and Iceberg assigns
those ids from the same pool at every depth: a struct field nested
inside a list<struct> or inside a context/unstruct top-level column is
just as valid a key as a top-level column. `AvroMap::into_value_map`
was looking keys up via `StructType::get`, which only consults the
*top-level* `lookup` table, so any nested id (e.g. Snowplow's
`contexts_com_snowplowanalytics_*.*` fields reaching into the 400-700
range) silently failed with `ColumnNotInSchema`. That error was then
`.unwrap()`'d inside `from_existing_with_filter`'s per-entry closure,
which panicked the tokio worker and aborted the whole Lambda
(`signal: aborted`) on any MERGE that touched a real Snowplow events
table.

Three fixes, smallest-to-largest:

1. `StructType::field_by_id(id)`: new recursive id lookup that walks
   nested `Struct`, `List`, and `Map` types. Independent from the
   existing top-level-only `get` so current callers of `get` are
   unaffected.
2. `AvroMap::into_value_map` now resolves ids via `field_by_id`.
   Unknown ids (entries pointing at fields that have been removed from
   the schema since the manifest was written) are now skipped rather
   than raised as `ColumnNotInSchema`. This matches Iceberg's
   schema-evolution semantics (old stats on removed fields are
   tolerated on read).
3. `iceberg-rust/src/table/manifest.rs::from_existing_with_filter`'s
   main rewrite loop is switched from `filter_map(...).unwrap()` to an
   explicit `for` loop that propagates per-entry errors via `?`. Any
   future deserialization edge case surfaces as a clean `Error`
   instead of a SIGABRT inside a tokio worker.

Two new regression tests:

- `types::tests::field_by_id_finds_nested_fields`: covers top-level,
  struct-of-struct, list<struct>, map<string, struct>, and unknown ids.
- `manifest::tests::into_value_map_accepts_nested_field_ids`: builds
  an `AvroMap<ByteBuf>` with a nested-field key (479 inside a
  list<struct>), a top-level key, and an unknown key, and asserts all
  three paths (decode nested, decode top-level, silently skip unknown).

Reproduced end-to-end: pre-fix, `MERGE INTO demo.atomic.events_hooli`
aborts the Lambda after ~21s with
`panicked at iceberg-rust/src/table/manifest.rs:549:18: ... Column 479
not in schema`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
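A hedged sketch of the recursive lookup over a simplified type tree (stand-in types, not the crate's `StructType`/`Type` definitions):

```rust
// Stand-in for Iceberg's nested types: ids are globally unique across
// all nesting depths, so a single recursive walk can resolve any id.
enum Ty {
    Primitive,
    Struct(Vec<Field>),
    List(Box<Field>),
    Map { key: Box<Field>, value: Box<Field> },
}

struct Field {
    id: i32,
    ty: Ty,
}

// Top-level entry point: try each field in declaration order.
fn field_by_id(fields: &[Field], id: i32) -> Option<&Field> {
    fields.iter().find_map(|f| find_in_field(f, id))
}

// Check the field itself, then recurse into whatever it nests.
fn find_in_field(f: &Field, id: i32) -> Option<&Field> {
    if f.id == id {
        return Some(f);
    }
    match &f.ty {
        Ty::Struct(children) => field_by_id(children, id),
        Ty::List(elem) => find_in_field(elem, id),
        Ty::Map { key, value } => {
            find_in_field(key, id).or_else(|| find_in_field(value, id))
        }
        Ty::Primitive => None,
    }
}
```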
Six commits that together let `MERGE INTO` work on real partitioned
Iceberg targets, verified end-to-end against a 161 GB / 143M-row
Snowplow `events_hooli` table on S3 Tables through a deployed Embucket
Lambda. Supersedes and folds in #56.

The fixes

1. v2 manifest-list field renames + TIMESTAMPTZ date_transform +
   identity partition column filter (fe678ad): three small
   wire-format / type-check fixes needed to even read real v2 Iceberg
   tables.
   - `manifest_list.rs`: declare count fields with
     `#[serde(rename = "added_data_files_count", alias = "added_files_count")]`
     so manifest lists written by current Apache Iceberg (>= 1.0,
     which renamed the v2 field names) deserialize cleanly. Regression
     test `test_manifest_list_v2_apache_field_names`.
   - `pruning_statistics.rs::DateTransform`: replace the hardcoded
     `OneOf(Exact(...))` signature with `TypeSignature::UserDefined` +
     `coerce_types` so the `date_transform` UDF accepts timezone-aware
     timestamps and normalizes them to `Timestamp(Microsecond, None)`
     for the transform call.
   - `table.rs::datafusion_partition_columns`: skip
     identity-self-named partition fields so DataFusion's parquet
     reader doesn't trip on the duplicate-column "expected N cols but
     got N+1" error.
2. refactor(table_scan): promote identity-self-named partition filter
   to table_scan (b940459): introduce `file_partition_fields` /
   `drop_partition_indices` at the top of `table_scan` and thread them
   through every downstream consumer. Ensures the physical scan column
   set and the manifest pruner's `partition_column_names` set agree
   (they used to diverge). Prerequisite for the rest of the stack.
3. Projection remap to combined-schema space (690b42b): `table_scan`
   was passing the caller's provider-schema projection straight
   through to `FileScanConfig::with_projection`, which interprets
   indices against `[file_schema, table_partition_cols]`. With
   synthetic transform partition columns (`ts_day`, `id_bucket`, ...)
   added by the physical layer, the two schemas diverged and the
   indices for `__data_file_path` / `__manifest_file_path` shifted.
   Fix: compute a `combined_projection` once, remapping caller indices
   to combined-schema positions, and use it in both the no-delete and
   equality-delete branches. 7 new regression tests in the `table.rs`
   mod tests (day / hour / month / year / bucket / truncate /
   identity-renamed partition transforms all stay green on the
   `__data_file_path` scan path).
4. Accept TIMESTAMPTZ for day/hour/month/year transforms (8d242d5):
   `transform_arrow` in `iceberg-rust/src/arrow/transform.rs` only
   matched `Timestamp(Microsecond, None)` for day/hour/month/year and
   fell through for timezone-aware timestamps with
   `Compute error: Failed to perform transform for datatype`. Widen
   the match arms to `Timestamp(Microsecond, _)`. Month/year
   additionally cast the array to `Timestamp(Microsecond, None)`
   before calling `date_part` to avoid a `chrono-tz` dependency. 4 new
   regression tests.
5. PruneDataFiles arrow_schema (cac01cd): pass the full `arrow_schema`
   to `PruneDataFiles::new` instead of the narrow `partition_schema`.
   The second-stage pruner looks up each filter column's datatype via
   `arrow_schema.field_with_name(...)` to read per-file lower/upper
   bounds from the manifest; with only `partition_schema` (which is
   built from the reduced `table_partition_cols`), any column not in
   the Hive-style partition-key set, including identity-self-named
   partition columns that were dropped from `table_partition_cols`
   because they're materialized in the parquet body, silently returned
   `None` and pruned nothing. With the full arrow schema, filters like
   `event_name = 'ad_start_event'` now actually prune files. Fixes
   "[iceberg-rust] identity-self-named partition columns are excluded
   from target-side filter pruning" Embucket/embucket#127. Regression
   test `test_identity_self_named_partition_filter_prunes_files`
   asserts a 3-partition identity-partitioned table drops to 1 file
   under a filter.
6. Manifest DataFile stats: resolve nested column ids + propagate
   deserialization errors (e7b9d06). `AvroMap::into_value_map` used to
   validate `lower_bounds` / `upper_bounds` keys against only the
   top-level `StructType::lookup`, so any nested field id (e.g. a
   Snowplow `contexts_com_snowplowanalytics_*.*` leaf at id 479)
   returned `ColumnNotInSchema`. The error was then `.unwrap()`'d
   inside `from_existing_with_filter`'s per-entry closure, panicking
   the tokio worker and SIGABRT-ing the whole Lambda on any MERGE that
   rewrote an existing manifest. Three changes:
   - `StructType::field_by_id(id)` that walks nested `Struct`, `List`,
     `Map` types recursively.
   - `into_value_map` now resolves ids via `field_by_id` and silently
     skips unknown ids, matching Iceberg's schema-evolution semantics
     (old stats on removed fields must not break readers).
   - `filter_map(...).unwrap()` in `from_existing_with_filter` is
     replaced by an explicit `for` loop that propagates per-entry
     errors via `?`, so any future deserialization edge case surfaces
     as a clean `Error` instead of a panic.
   New tests: `types::tests::field_by_id_finds_nested_fields`
   (top-level + struct-of-struct + list + map<string, struct> +
   unknown id) and
   `manifest::tests::into_value_map_accepts_nested_field_ids` (nested
   decode + top-level decode + silent-skip).
Tests

- `cargo test -p iceberg-rust --lib`: 89 / 89 green.
- `cargo test -p iceberg-rust-spec --lib`: 88 / 88 green (includes the
  new nested-id tests).
- `cargo test -p datafusion_iceberg --lib`: 19 new/changed tests pass.
  Eight pre-existing failures on `fix/v2-manifest-field-names`
  (tokio-runtime, `test_datafusion_table_insert_partitioned`,
  materialized view tests) are unrelated and pre-existing.
End-to-end proof on a patched Embucket Lambda

Deployed the combined build (this PR + Embucket/embucket#126) to
`embucket-demo-embucket-demo-ramp-1775514830` and ran, against the
real 143M-row / 161 GB Snowplow `events_hooli`:

- A MERGE INTO touching a single partition (the
  `event_name = 'ad_start_event'` partition).
- `events_hooli` partitioned by
  `day(collector_tstamp) + identity(event_name)`.
- Pre-fix failure ladder: (a) `Unsupported SQL statement: MERGE INTO`
  on EXPLAIN paths → (b) `Schema error: No field named event_name` in
  the analyzer → (c) `Input field name <col>_<transform> does not
  match with the projection expression __data_file_path` at plan time
  → (d) `Compute error: Failed to perform transform for datatype` on
  TIMESTAMPTZ → (e) scan-time "expected N cols but got N+1" from the
  identity-partition duplicate → (f) partition pruning silently
  dropping nothing → (g) `panicked at
  iceberg-rust/src/table/manifest.rs:549: Column 479 not in schema`
  during manifest rewrite, SIGABRT. Each commit in this PR addresses
  one of these failure modes.
- Post-fix: `MergeIntoSinkExec, metrics=[updated_rows=10,
  inserted_rows=0, deleted_rows=0]`; 10 rows carry the new tag, 0 rows
  bleed into other partitions.
- `EXPLAIN ANALYZE` confirms plan-time pruning: the target
  `DataSourceExec` shows `file_groups={6 groups}` but every path in it
  sits under `event_name=ad_start_event` (one ~644 MB partition file
  split into 6 byte-range groups for parallel scan). Every other
  partition file in the table was pruned at plan time by iceberg-rust's
  manifest pruner before the physical plan was built. Scan output =
  628,274 rows; HashJoin build = 10 rows with `build_mem_used=1212`
  bytes; sink rewrites the partition file with the 10 updates applied
  in-place via MERGE COW.

Covered by 14 new regression tests plus this end-to-end verification
against real Snowplow data on S3 Tables.