feat(table): preserve row lineage through copy-on-write rewrites#1099
Conversation
|
@laskoviymishka can you please review it? |
laskoviymishka
left a comment
There was a problem hiding this comment.
Nice work getting _row_id preservation wired through both CoW deletes and ExecuteCompactionGroup. The explicit write + read-back path feels like the right shape for v3, and the fresh-table happy-path test is a good start.
I’d still hold this before merge though, a few things need tightening.
Biggest one: _last_updated_sequence_number. The comment in rewriteSingleFile says leaving it null makes CoW rewrites bump it to the new commit seq. I think the v3 spec says the opposite for surviving unmodified rows: copy the existing non-null value forward. That’s also what Java does in the CoW path by projecting both columns. With the current PR, every preserved row looks “last updated” by the rewrite commit to Java / PyIceberg / iceberg-rust, which breaks the cross-engine watermark semantics.
Second: SchemaWithRowID. It panics if the schema already has _row_id, which is legal in v3, and append(s.Fields(), RowID()) can alias the source schema’s backing array. The deleted TestProjectionV3SchemaWithRowIDOnly was the guardrail for the first case, so losing that test hides the issue.
Third: compaction gating. hasRowLineage returns true if any task has lineage. That means a mixed group can go through the preservation path with null and explicit _row_ids in one output file. At read time, synthesized IDs for the null rows can collide with explicit IDs from post-lineage rows — exactly what the uniqueness guarantee is supposed to avoid.
What I’d like before merge:
- fix
_last_updated_sequence_number, or clearly document that we intentionally diverge from v3 / Java - make
SchemaWithRowIDidempotent and copy the field slice - change compaction gating to “all tasks have lineage”, or split mixed groups
- gate
WithRowLineageon v3 inProjection() - bring back the still-relevant scanner tests:
Select("_row_id"), schema already has_row_id, v2 rejection - add at least one
ExecuteCompactionGrouptest - make the
next-row-idaccounting testEqual(5), notGreaterOrEqual(5) - hoist the per-batch bind/convert in
filterRecordBatchout of the iterator
Once these are covered, happy to take another look and approve.
- SchemaWithRowLineage is idempotent and projects both lineage columns
- Preserve _last_updated_sequence_number through CoW rewrites and
compaction (thread file_sequence_number from manifest entries)
- Gate WithRowLineage and WithPreserveRowLineage on v3 with explicit
errors instead of silent ignore
- Restore Select("_row_id") escape hatch for v3 tables
- Skip preserve path on mixed-lineage compaction groups instead of
producing per-file invariant violations
- Hoist filter bind/convert out of the per-batch loop; fix AlwaysFalse
empty-record to preserve schema and column arrays
- Add scanner tests for v2 rejection, schema-already-has-row-id,
Select escape hatch, and end-to-end compaction lineage preservation
1bf692f to
4fa6813
Compare
|
Thanks for the thorough pass, addressed all 12 inline comments. Going through them in order: 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. Newly added tests:
@laskoviymishka PTAL when you have a moment. |
|
Rebasing should fix the integration tests |
- SchemaWithRowLineage is idempotent and projects both lineage columns
- Preserve _last_updated_sequence_number through CoW rewrites and
compaction (thread file_sequence_number from manifest entries)
- Gate WithRowLineage and WithPreserveRowLineage on v3 with explicit
errors instead of silent ignore
- Restore Select("_row_id") escape hatch for v3 tables
- Skip preserve path on mixed-lineage compaction groups instead of
producing per-file invariant violations
- Hoist filter bind/convert out of the per-batch loop; fix AlwaysFalse
empty-record to preserve schema and column arrays
- Add scanner tests for v2 rejection, schema-already-has-row-id,
Select escape hatch, and end-to-end compaction lineage preservation
4fa6813 to
381c67c
Compare
- SchemaWithRowLineage is idempotent and projects both lineage columns
- Preserve _last_updated_sequence_number through CoW rewrites and
compaction (thread file_sequence_number from manifest entries)
- Gate WithRowLineage and WithPreserveRowLineage on v3 with explicit
errors instead of silent ignore
- Restore Select("_row_id") escape hatch for v3 tables
- Skip preserve path on mixed-lineage compaction groups instead of
producing per-file invariant violations
- Hoist filter bind/convert out of the per-batch loop; fix AlwaysFalse
empty-record to preserve schema and column arrays
- Add scanner tests for v2 rejection, schema-already-has-row-id,
Select escape hatch, and end-to-end compaction lineage preservation
The v3 first_row_id requirement in validateDataFilesToAdd was meant for externally-supplied data files. Library-internal rewrite paths get first_row_id assigned via manifest-list inheritance at write time, so the check now skips when withRewriteSemantics is set. Restores the ExecuteCompactionGroup row-lineage test.
3fe1884 to
6423310
Compare
laskoviymishka
left a comment
There was a problem hiding this comment.
Almost there. The big pieces are in now: SchemaWithRowLineage is extracted, synthesizeRowLineageColumns has its first unit test, the CoW path carries _row_id end-to-end, and TestExecuteCompactionGroupPreservesRowID covers the compaction path. Nice cleanup.
I’d still hold merge for two correctness fixes:
_last_updated_sequence_numberis synthesized fromfile_sequence_number, but it should usedata_sequence_number.synthesizeRowLineageColumnsonly has an all-null fixture right now, so we still don’t test the null-coalescing behavior this feature depends on.
Both look narrow, but they matter for cross-engine read parity.
A few smaller things are inline too: mixed-group compaction can still silently drop lineage, the WithRowLineage + selectedFields v1/v2 path needs tightening, and prepareBatchFilter captures ctx a bit oddly.
| // manifest entry so the rewrite path can synthesize | ||
| // _last_updated_sequence_number for source rows | ||
| // that have a null value (or no column) in the file. | ||
| if fseq := entry.FileSequenceNum(); fseq != nil { |
There was a problem hiding this comment.
I think this is reading the wrong manifest field. Spec §Row Lineage says when _last_updated_sequence_number is null on read, it gets assigned the manifest entry's sequence_number (field id 3, the data sequence number) — not file_sequence_number (field id 4, the snapshot that added the file). For back-dated files where they diverge, a Java or iceberg-rust reader would compute a different value for the same row.
I'd switch both this site and the matching one in scanner.go to entry.SequenceNum(). SequenceNum() returns int64 not *int64, so the nil guard collapses — drop the conditional or guard on >= 0. While we're here, I'd rename FileScanTask.DataSequenceNumber if that's where this plumbs through, so the field name matches what it actually holds.
There was a problem hiding this comment.
Good catch, switched both sites to entry.SequenceNum() and added a guard on >= 0 (since SequenceNum() returns -1 for unset). Left FileScanTask.DataSequenceNumber named as-is since the field name already matches what we're now putting in it (data sequence number); the bug was the source, not the destination.
| ) | ||
| // TestSynthesizeRowLineageColumns verifies that _row_id and _last_updated_sequence_number | ||
| // are filled from task constants when those columns are present and null. | ||
| func TestSynthesizeRowLineageColumns(t *testing.T) { |
There was a problem hiding this comment.
The test only constructs all-null inputs, so it doesn't actually prove the property the feature depends on. If synthesizeRowLineageColumns had a bug where it overwrote pre-existing non-null values, this test would still pass — and so would TestCoWRewritePreservesRowID, because fresh appends produce all-null source columns too.
The case that matters is the mixed one: some rows have explicit _row_id / _last_updated_sequence_number values from a prior write, the rest are null. That only arises after multiple rewrites of files that already carry explicit values, which is exactly the scenario the spec's null-coalescing rule exists for.
I'd add a fixture that builds a record with three rows — one explicit non-null, one null, one explicit non-null with a different value — and asserts the non-null ones survive untouched while the null one picks up firstRowID + position. Same shape for _last_updated_sequence_number.
There was a problem hiding this comment.
Added TestSynthesizeRowLineageColumnsPreservesExplicit with the 3-row mixed fixture (explicit / null / explicit-different) for both _row_id and _last_updated_sequence_number. Asserts the non-null rows survive untouched and the null row picks up firstRowID + position / task.DataSequenceNumber.
| // outputs is a larger refactor and is left as a follow-up; for now we | ||
| // degrade gracefully (the rewrite still succeeds, but lineage is not | ||
| // preserved for the surviving rows). | ||
| preserveLineage := tbl.metadata.Version() >= 3 && allTasksHaveRowLineage(group.Tasks) |
There was a problem hiding this comment.
Mixed-lineage groups silently fall through to the non-lineage path. Compaction succeeds, the output is valid, but _row_id is gone — no log, no metric, no flag in CompactionGroupResult. For tables migrating to v3, every compaction pass over a group that contains any legacy file will quietly drop lineage on the surviving rows. That's the common case during migration, not the edge case.
I'd at minimum add a slog.Warn matching the v3 pos-delete pattern, and ideally a RowLineagePreserved bool on CompactionGroupResult so callers can observe it.
There was a problem hiding this comment.
Added a slog.Warn matching the v3 pos-delete pattern in ExecuteCompactionGroup when a v3 table sees a mixed group with at least one lineage file. Holding off on the RowLineagePreserved bool on CompactionGroupResult for now — the log surfaces the silent loss, and adding to the result struct locks in API surface for what should be a transitional migration concern. Happy to add it as a follow-up if it shows up in practice.
| // versions don't have row lineage, so requesting these columns | ||
| // is an error. | ||
| userFields, lineageFields := splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive) | ||
| if len(lineageFields) > 0 && curVersion < minFormatVersionRowLineage { |
There was a problem hiding this comment.
The rejection here works only because Select happens to fail when _row_id isn't in the v1/v2 schema — the user sees ErrInvalidSchema rather than a clear "row lineage requires v3". And it's fragile: if a future v2 schema ever carries a column literally named _row_id for unrelated reasons, this short-circuits in the wrong direction.
I'd return an explicit error here for the curVersion < minFormatVersionRowLineage && len(lineageFields) > 0 case, naming the format version requirement. Same logic, but the contract is in the code rather than emergent from Select's error path.
There was a problem hiding this comment.
Done, explicit ErrInvalidOperation with the format version named, rather than relying on Select's ErrInvalidSchema. Updated TestProjectionRowLineageRejectedOnV1V2 to match.
| } | ||
| } | ||
|
|
||
| if scan.includeRowLineage { |
There was a problem hiding this comment.
There are now two code paths that select lineage columns into the projection: the includeRowLineage flag path (calls SchemaWithRowLineage) and the explicit-field-name path (calls appendMissingLineageFields). Both run when the user supplied lineage field names and set WithRowLineage, producing a redundant pass.
More concerning, they have subtly different error behavior — one rejects on v1/v2 with a clear error, the other relies on Select to fail. Anyone adding a third lineage column down the road has to update both paths and both test sets.
I'd canonicalize on one path: detect lineage fields in selectedFields up front, set scan.includeRowLineage = true if any are present, drop them from selectedFields, and from there only the includeRowLineage branch runs. wdyt?
There was a problem hiding this comment.
Did the smaller piece — skipping appendMissingLineageFields when scan.includeRowLineage is set so the redundant pass goes away. Pushed back on the full canonicalization (auto-setting includeRowLineage=true whenever any lineage name appears in selectedFields): selecting only _row_id today gives you only _row_id, but with the rewrite it would also pull in _last_updated_sequence_number. That's a behavior change for the legacy Select(_row_id) escape hatch. The maintainability concern (third column down the road) is mostly handled by splitLineageMetadataFields + SchemaWithRowLineage already centralizing the column list.
| return nil, fmt.Errorf("prepareBatchFilter: convert expression: %w", err) | ||
| } | ||
|
|
||
| ctx = exprs.WithExtensionIDSet(ctx, exprs.NewExtensionSetDefault(*extSet)) |
There was a problem hiding this comment.
The closure captures ctx at construction time with the extension-ID set baked in, but per-batch allocator and deadline values from the call-site ctx are then lost. If filterRecords resolves the allocator from ctx, intermediate arrays end up on the construction-time allocator and memory.NewCheckedAllocator accounting goes sideways under test.
I'd take the call-site ctx as a parameter on the returned closure, derive the extension-set context inside, and let allocator/deadline propagate.
While we're in here: the positional-binding assumption (lineage fields at indices N..N+1 in the record batch, filter bound against indices 0..N-1) is correct but undocumented. Worth a one-line comment so a future change to lineage column placement doesn't silently break filter evaluation.
There was a problem hiding this comment.
Changed the closure signature to func(context.Context, arrow.RecordBatch) — extSet is pre-computed once and reused, but the per-call ctx (with allocator/deadline) is attached inside the closure on each call. Also added a one-line comment on the positional-binding assumption (lineage fields at indices N..N+1, filter bound against 0..N-1).
| // rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data. | ||
| // fileSeqNum is the source file's file_sequence_number from its manifest entry; required to synthesize | ||
| // _last_updated_sequence_number for rows whose value is null in the source file. | ||
| func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, fileSeqNum *int64, filter iceberg.BooleanExpression, postFilter func(arrow.RecordBatch) (arrow.RecordBatch, error), caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) { |
There was a problem hiding this comment.
Nine positional parameters with several same-typed adjacents (commitUUID, concurrency, and now two same-shaped filter closures) — no compile-time order protection if anyone swaps a pair. I'd pull these into a rewriteSingleFileArgs struct.
Separate but related: BindExpr runs unconditionally in rewriteSingleFile, but when preserveRowLineage == true the bound filter is unused (scanFilter becomes AlwaysTrue{}) and the same expression is bound again inside prepareBatchFilter. With preserveRowLineage == false, prepareBatchFilter runs BindExpr + ConvertExpr but postFilter is never called. One of those two binds is always wasted.
There was a problem hiding this comment.
Bundled the params into rewriteSingleFileArgs. On the wasted bind: hoisted BindExpr into the !preserveRowLineage branch so the per-file bind is no longer wasted on the lineage path. Left the prepareBatchFilter bind alone — it runs once per rewriteFilesWithFilter call rather than per file, so the cost is amortized across all rewritten files in the operation.
| // The returned function takes ownership of the input batch (it releases it on | ||
| // the AlwaysFalse fast-path) and returns a possibly-new batch the caller is | ||
| // responsible for releasing. | ||
| func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) (arrow.RecordBatch, error), error) { |
There was a problem hiding this comment.
The AlwaysFalse fast-path does defer rec.Release(); the AlwaysTrue / bound-nil paths return rec unchanged with no Retain. So depending on which fast-path triggers, the caller either does or doesn't own a reference to the returned record. That's two contracts for one return type, and the next caller to add a Release() will double-free on the AlwaysFalse path.
I'd have every fast-path do a rec.Retain() so the caller always owns exactly one reference on the result, mirroring the bound-filter path's semantics.
There was a problem hiding this comment.
I think the current contract is consistent: input ownership transfers to output, caller releases once. Adding Retain() on the AlwaysTrue path would leak the input (caller would have to release input AND output, but the contract today is "after postFilter(rec), only release the result").
| // Idempotent: if a row-lineage column is already present (by reserved field ID), | ||
| // it is not appended again. The returned schema always allocates a fresh field | ||
| // slice so it cannot alias the input schema's backing array. | ||
| func SchemaWithRowLineage(s *Schema) *Schema { |
There was a problem hiding this comment.
Two things on the new helper.
The comment a few lines up reads as if the second reserved ID is literally 108. Should be "Integer.MAX_VALUE - 107 and Integer.MAX_VALUE - 108" — 2147483540 and 2147483539 respectively.
More substantively, SchemaWithRowLineage lives in the top-level iceberg package, which makes it part of the v1 public API surface. Java keeps the equivalent logic internal. I'd consider an unexported schemaWithRowLineage in table, or a method on *Schema, so we're not locked into the helper's exact shape if v4 lineage semantics shift. Same for WithPreserveRowLineage(schema) — making callers construct the schema feels backwards; could it take no arg and derive from the table internally?
Also worth a nil guard on s at the top here — it's exported, a nil receiver will panic on s.Fields().
There was a problem hiding this comment.
On unexporting: SchemaWithRowLineage is already used by callers outside the table package (compaction's ExecuteCompactionGroup builds the projected schema this way before passing it to WithPreserveRowLineage), so unexporting it now means duplicating the helper or carving out a different API. Worth doing as a follow-up if v4 lineage actually changes the shape, but I'd rather not preemptively redesign for a hypothetical. Same reasoning on WithPreserveRowLineage(schema) — having the caller construct the schema once keeps scan-side and write-side projections in lockstep, which matters for the compaction path where both have to match exactly.
| // ID space but doesn't violate uniqueness — actual row IDs come from | ||
| // the explicit Parquet column, not the global counter. | ||
| assert.Equal(t, int64(5), tbl.Metadata().NextRowID(), | ||
| "next-row-id should advance by original (3) + rewritten (2) = 5") |
There was a problem hiding this comment.
The comment is arithmetically misleading — the advance is 2 (the new manifest's added rows), and 5 = firstRowID(3) + 2. The "3" is the starting value, not an addend. Worth rewriting the comment to match the actual computation, and linking to the snapshot_producers.go site that mirrors ManifestListWriter.V3Writer.prepare() in Java.
Separately, I'd double-check snapshot.AddedRows here — Java's added-rows summary field is the actual added row count, not existing + added. If we're stamping the Java-equivalent of existing + added into the snapshot summary, downstream tools reading that field will see inflated numbers.
There was a problem hiding this comment.
On snapshot.AddedRows: the existing comment in snapshot_producers.go:920 explicitly says it matches Java's ManifestListWriter.V3Writer accounting (counts existing + added in new manifests). Worth a separate look since if Java's snapshot summary added-rows field actually means "newly added rows only," that's a pre-existing bug in the v3 writer rather than something this PR introduced. Want me to file that as a separate issue?
Per the v3 spec, when _last_updated_sequence_number is null on read it defaults to the manifest entry's data sequence number (field id 3), not file_sequence_number (field id 4). For back-dated EXISTING entries the two diverge and a Java/iceberg-rust reader would compute a different value for the same row. Fixed in the CoW rewrite path and PlanFiles. Also addresses inline review feedback: explicit v1/v2 rejection for lineage column names, slog.Warn on mixed-lineage compaction groups, mixed-fixture test for synthesizeRowLineageColumns, and minor cleanups (nil guard on SchemaWithRowLineage, ID comment clarification, ctx plumbing in prepareBatchFilter, rewriteSingleFileArgs struct, hoisted the wasted BindExpr off the lineage rewrite path).
|
@laskoviymishka PTAL when you have a moment. |
laskoviymishka
left a comment
There was a problem hiding this comment.
Thanks for the quick next round. Overall, this looks good to land.
The SequenceNum() fix and TestSynthesizeRowLineageColumnsPreservesExplicit cover the two main blockers. The slog.Warn on mixed-lineage compaction also surfaces the silent-drop case I was worried about.
The three inline pushbacks all look fine to me:
postFilterownership: fair. Re-reading the cases, the contract really is “one release per batch on every path”.- Keeping
SchemaWithRowLineageexported: agreed.ExecuteCompactionGroupis already a cross-package caller, and the nil guard plus the v4 follow-up plan is enough. snapshot.AddedRowsaccounting: agreed, this is pre-existingV3Writerparity, not this PR’s concern.
🚢 👍
Parent: #999
Preserves
_row_idthrough CoW rewrites and compaction on v3 tables. The scanner synthesizes row IDs from file metadata, the filter is applied post-synthesis to preserve correct positions, and the writer stores_row_idexplicitly in output Parquet._last_updated_sequence_numberis left null to inherit from the new file'sdata_sequence_number.Also documents that
next-row-idaccounting intentionally overcounts (matching Java'sManifestListWriter.V4Writer) and removes the oldProjection()synthesis helpers replaced by the explicitWithRowLineage()scan option.Test plan
TestCoWRewritePreservesRowID: verifies surviving rows keep original_row_idafter a CoW deleteTestCoWRewriteRowIDNextRowIDAccounting: validates next-row-id advances correctly