[SPARK-56568][SQL] Add id() to DSv2 Column to detect drop-and-re-add columns at refresh time #55376
longvu-db wants to merge 98 commits into apache:master
Conversation
Please get an official JIRA ID and use it in the PR title.
| col("ID", StringType, nullable = true), // duplicate with different case | ||
| col("name", StringType, nullable = true)) | ||
| val table = TestTableWithMetadataSupport("test", currentCols) | ||
|
|
Note: I feel like this suite is kinda redundant because it tests the implementation (all these internal helper functions), whereas we should just focus on testing the behavior (DSv2DFSuite.scala), which should also sufficiently test the implementation, if the behavior is well-tested.
andreaschat-db left a comment
Thanks @longvu-db. I left a couple of comments. I did not review the two suites yet.
```scala
    metadataInJSON: String,
    override val id: String = null) extends Column {

  // [[id]] is excluded from [[equals]] and [[hashCode]].
```
This needs some more explanation about why it is correct. In principle, shouldn't two columns with different column IDs produce different hashcodes? I guess including it in the hashcode breaks existing code, so we extract this validation outside equals/hashCode?
Potentially this is also something we need to mention in the Javadoc of id?
ColumnImpl.equals defines schema equality (name, type, nullability, etc.), not identity equality. Column ID is a catalog tracking concern validated separately by V2TableUtil.validateColumnIds, so it doesn't participate in schema comparison.
For example, CatalogSuite does assert(table.columns === columns) where the input columns have id = null but InMemoryBaseTable assigns IDs on creation.
Including id in equals would break this and every similar assertion, requiring tests to predict exact IDs assigned by the connector.
=> Added more comments in ColumnImpl
ColumnImpl is internal (o.a.s.sql.internal.connector), so this is an implementation detail. I think we can leave it out of the public java doc.
| .map(currentCol => normalize(currentCol.name()) -> currentCol).toMap | ||
| val errors = new mutable.ArrayBuffer[String]() | ||
| for (originalCapturedCol <- originalCapturedCols) { | ||
| if (originalCapturedCol.id() != null) { |
You could do this functionally with something like:
```diff
-      if (originalCapturedCol.id() != null) {
+      originalCapturedCols
+        .filter(_.id() != null)
+        .flatMap { originalCol =>
+          currentColsByNormalizedName
+            .get(normalize(originalCol.name()))
+            .filter(current => current.id() != null && current.id() != originalCol.id())
+            .map(current => s"`${originalCol.name()}` column ID has changed from " +
+              s"${originalCol.id()} to ${current.id()}")
```
You can also move the content of flatMap to a different method to clean it up more. So it can become .flatMap(col).
I tried it out:

```scala
originalCapturedCols
  .filter(_.id() != null)
  .flatMap { originalCol =>
    currentColsByNormalizedName
      .get(normalize(originalCol.name()))
      .filter(current => current.id() != null && current.id() != originalCol.id())
      .map(current => s"`${originalCol.name()}` column ID has changed from " +
        s"${originalCol.id()} to ${current.id()}")
  }
```
I think the current imperative style with pattern matching is more readable, since the chained functional transformation is quite complicated, and each case clearly reflects a simple transition state.
Incorporated your other comment on the "1.", "2."
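For reference, a rough sketch of that imperative pattern-matching shape (hypothetical — it assumes the names from the snippets above, not the PR's actual code):

```scala
// Hypothetical sketch: one case per ID-transition state, appended to the errors buffer.
for (originalCol <- originalCapturedCols if originalCol.id() != null) {
  currentColsByNormalizedName.get(normalize(originalCol.name())) match {
    case Some(current) if current.id() != null && current.id() != originalCol.id() =>
      errors += s"`${originalCol.name()}` column ID has changed from " +
        s"${originalCol.id()} to ${current.id()}"
    case _ => // absent by name, ID missing on either side, or ID unchanged: nothing to report
  }
}
```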
```scala
    newColumns.map { newCol =>
      oldColumns.find(c => normalize(c.name()) == normalize(newCol.name())) match {
        case Some(oldCol) if oldCol.id() != null &&
            oldCol.dataType() == newCol.dataType() =>
```
Why do we check the datatype here? Is it just the datatype we need to check? What about nullability for example?
This is test infrastructure simulating a connector's ID assignment policy. The rationale: a type change (e.g., INT to STRING) is a fundamental change to the column's nature, so we treat it as a new column and assign a new ID. A nullability change (e.g., NOT NULL to NULLABLE) is a constraint change on the same column, so we preserve the ID.
TypeChangePreservesColIdTableCatalog exists to test the alternative policy where even type changes preserve IDs, showing this is a connector-specific decision.
```scala
   *
   * This method reloads table metadata from the catalog and validates:
   * - Table identity: Ensures table ID has not changed
   * - Column IDs: Validates captured column IDs match the current table's column IDs
```
What about? Same below.

```diff
- * - Column IDs: Validates captured column IDs match the current table's column IDs
+ * - Column IDs: Verifies column IDs match
```
Would "Verifies column IDs have not changed" work? Cause if we mention "match", we need to mention who matches with who, so "have not changed" is more succint
@longvu-db you added a lot of testing infra with a lot of different catalogs. Could you add a verbose description in the testing section of this PR describing the overall concept of what they are all doing and testing, for easier digestion?
Review: SPARK-56568 — Add id() to DSv2 Column to detect drop-and-re-add columns at refresh time
| Dimension | Delta | Iceberg | This PR |
|---|---|---|---|
| ID type | `Long` stored, `Int` exposed (`DeltaColumnMapping.scala:217-221`) | `int` (`Types.NestedField.id`) | `String` |
| Granularity | Every `StructField` (incl. nested struct fields); array/map element/key/value IDs in side-channel `delta.columnMapping.nested.ids` (only with IcebergCompatV2) | Every nesting level — struct field, list element, map key, map value | Top-level only |
| Historical uniqueness | Table prop `delta.columnMapping.maxColumnId`, monotonic, never reused (`DeltaColumnMapping.scala:436-483`) | `last-column-id` in `TableMetadata`, `Preconditions.checkArgument(newLastColumnId >= lastColumnId)` (`TableMetadata.java:1598-1621`) | Catalog's responsibility (contract: "drop+re-add must yield a different ID") |
| Rename | Preserves ID + physical name; only logical name changes (`DeltaColumnMapping.scala:709-728`) | `withName(newName)` keeps `fieldId` (`SchemaUpdate.java:205-219`) | Same expectation, but the PR's check uses name lookup, so a rename surfaces as `COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS`, not as a "rename" signal |
| Drop+re-add | New ID, new physical name | New ID via `assignNewColumnId()` (`SchemaUpdate.java:478`) | Detected — exactly the motivating case |
| `equals` includes ID? | N/A (no `Column` equality contract) | Yes (`Types.java:996-998`) — Iceberg's read path is id-keyed | No — excluded; Spark's structural-identity comparisons must keep working |
| Read-time resolution | Physical name (rename-stable) | ID (Parquet `field_id`); name only as fallback | Still by name; ID is purely a change-detection signal |
| Existing analyze-vs-execute check | Streaming only, via `DeltaSourceMetadataEvolutionSupport` snapshot diff using physical names | Implicit — `expectedSchema` carries IDs, Parquet reader resolves by ID per file | New, generic, batch+stream at the DSv2 boundary |
The contract this PR asks of connectors ("stable across renames, fresh on drop+re-add, never reused") is exactly the invariant both Delta (maxColumnId) and Iceberg (last-column-id) already enforce table-wide. The naive adoption is a one-liner each — Integer.toString(nestedField.fieldId()) for Iceberg, getColumnId(field).toString for Delta — but that under-uses the API. See Why top-level only, and what that means for connectors below for the full picture.
Why top-level only, and what that means for connectors
Why the PR limits IDs to top-level columns
Three forces stack up:
1. The DSv2 surface doesn't natively carry IDs below Column. Nested fields live inside DataType (StructType.fields, ArrayType.elementType, MapType.keyType/valueType). Putting IDs at every nesting level requires either growing StructField.metadata to carry an ID slot under a well-known key (the way parquet.field.id is propagated today) or threading a parallel ID tree through DataType. Both are invasive — every analyzer rule, every schema serializer, every equals site has to be audited. One default method on Column is the smallest change that ships.
2. The motivating bug is top-level. This is refresh-time validation: relation cache rehydrates, captured schema vs. live schema. The dangerous, silently-wrong case is "captured top-level column c:int and current table also has c:int, but c was dropped and recreated between analyze and execute." That's a top-level identity question. Nested same-name same-type drop+re-add inside a struct does happen, but it's rarer and almost always accompanied by other changes that the existing schema validator catches.
3. Layered defense is sufficient for the v1 use case. The PR description spells out the layering:
   - Top-level drop+re-add (same name, same type) → caught by `Column.id()` mismatch.
   - Any change that alters `DataType` (added/removed nested field, type widening, etc.) → caught by the existing `validateDataColumns` (`COLUMNS_MISMATCH`).
   - Nested same-name same-type drop+re-add → falls through both, unless the connector chose to bump the top-level ID.
That last gap is the cost. The PR pushes the responsibility for closing it onto the connector — and, importantly, leaves the door open for connectors to do so without further API changes.
The escape hatch: String IDs can encode arbitrary subtree structure
The contract on Column.id() is just string equality. There is no constraint on what the string contains — only that it must differ when the column's identity differs and match when it doesn't. That means a connector can encode the entire subtree's IDs into the top-level string and Spark will detect any change at any depth, even though Spark itself never traverses below the top level.
This turns the top-level-only API from a sharp limitation into a soft one: the protocol is top-level, but the information the connector chooses to fold into that string can be as deep as the connector's own ID model goes. Iceberg has IDs everywhere → full nested coverage. Delta has IDs on every StructField (and on array/map nested elements when IcebergCompatV2 is on) → matching coverage. A simpler catalog with only top-level IDs gets only top-level coverage; that's an honest reflection of what the catalog itself tracks.
Iceberg adapter
Naive — top-level only, misses nested same-name same-type drop+re-add:
```java
@Override
public String id() {
  return Integer.toString(nestedField.fieldId());
}
```

Recommended — encodes the whole subtree, recovers full nested fidelity:
```java
@Override
public String id() {
  StringBuilder sb = new StringBuilder();
  sb.append(nestedField.fieldId());
  appendSubtreeIds(nestedField.type(), sb); // visits struct fields, list element, map key/value
  return sb.toString();
}

private static void appendSubtreeIds(Type type, StringBuilder sb) {
  if (type.isStructType()) {
    for (Types.NestedField f : type.asStructType().fields()) {
      sb.append(',').append(f.fieldId());
      appendSubtreeIds(f.type(), sb);
    }
  } else if (type.isListType()) {
    Types.NestedField e = type.asListType().fields().get(0);
    sb.append(",L").append(e.fieldId());
    appendSubtreeIds(e.type(), sb);
  } else if (type.isMapType()) {
    Types.NestedField k = type.asMapType().fields().get(0);
    Types.NestedField v = type.asMapType().fields().get(1);
    sb.append(",K").append(k.fieldId()).append("V").append(v.fieldId());
    appendSubtreeIds(k.type(), sb);
    appendSubtreeIds(v.type(), sb);
  }
}
```

Iceberg's last-column-id invariant (`TableMetadata.java:1598-1621` — never decreases) guarantees every drop+re-add at any depth produces a fresh int, so any subtree change yields a different string.
The natural plug-in point is the wrapper Column returned from SparkTable.columns(). Iceberg's existing TypeToSparkType adapter (spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java:60-92,159-164) doesn't expose IDs to Spark today — fieldMetadata only sets the __metadata_col flag. Iceberg either grows that path to put IDs in StructField.metadata under a Spark-side well-known key, or wraps Column directly to override id(). The latter is a smaller change and fits the PR's API exactly.
Delta adapter
Top-level only — sufficient for name/id mapping mode, no nested coverage:
```scala
override def id(): String = DeltaColumnMapping.getColumnId(field).toString
```

With nested coverage — Delta has IDs on every nested `StructField` via `delta.columnMapping.id`, and (when IcebergCompatV2 is enabled) on array/map elements via `delta.columnMapping.nested.ids`:
```scala
override def id(): String = {
  val sb = new StringBuilder
  sb.append(DeltaColumnMapping.getColumnId(field))
  appendNestedIds(field, sb) // recurse into struct fields + nested.ids map
  sb.toString
}
```

Without IcebergCompatV2, Delta has IDs on nested struct fields but not on array elements or map keys/values. Coverage there is limited to nested-struct depth; a same-name same-type drop+re-add of an array element or map value won't change the encoded string. In practice this matches Delta's own internal model — Delta itself doesn't track those identities without IcebergCompatV2 — so it's an honest limitation, not a regression.
Renames stay rename-stable through any encoding
For both adapters above: a logical rename in Delta or Iceberg keeps every fieldId/columnMapping.id in the subtree unchanged. Only the StructField.name flips. So the encoded id() string is identical pre- and post-rename, and Spark sees "same column ID, different name." Combined with this PR's name-keyed lookup, that means a rename surfaces today as COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS (the original name disappeared) — see observation 3 below for why this is OK in v1 and what a v2 could do.
What stays unsolved
If a connector has no nested IDs at all (e.g. a JDBC catalog with only top-level column IDs in its system catalog), nested same-name same-type drop+re-add is undetectable through this PR. That's an inherent limitation of the connector, not of the API; no encoding trick rescues information the catalog never had.
Spark-side improvement worth a follow-up
Spark could provide a default helper on Column (or V2TableUtil) that does the subtree-id encoding for connectors that annotate nested IDs in StructField.metadata under a well-known key, mirroring how parquet.field.id works in the read path. That would let connectors annotate metadata in the natural place and have Spark do the encoding consistently, instead of every connector reimplementing the same recursion. The current PR doesn't need this — and shouldn't take it on — but its String-typed return value already leaves the door open for it as a future optimization.
Test infrastructure: what's there and what each piece is for
The PR adds ~1,500 lines of test scaffolding split across 5 new test catalogs, modifications to 2 existing test fixtures, and 2 test suites (one extended, one new). It looks like a lot, but each piece exists to exercise a distinct cell in the connector-behavior × schema-change × query-shape × session-topology matrix. Walking through it:
The base fixture: InMemoryBaseTable and InMemoryTableCatalog (modified)
Before this PR, the in-memory test table didn't track column IDs at all. The PR retrofits ID assignment so any test that uses these fixtures gets ID-aware behavior for free.
- `InMemoryBaseTable.scala:74-85` — at table construction, walks `initialColumns` and assigns a fresh string ID (from a global `AtomicLong`-backed counter, `nextColumnId`) to any column with `id == null`. Existing IDs are preserved.
- `InMemoryBaseTable.assignMissingIds` (782-825) — the merge function called on schema changes. Matches by lowercased name; preserves the old ID iff `oldCol.dataType == newCol.dataType`, else mints a fresh one. This is what makes the canonical "drop+re-add gets new ID" semantics work in tests.
- `BasicInMemoryTableCatalog.alterTable` (`InMemoryTableCatalog.scala:181-200`) — every `alterTable` now routes through `assignMissingIds`, so IDs survive the lossy `Column → StructType → Column` round-trip that `alterTable` had been performing on the underlying schema. This is the PR's commit `b4900531858 Fix V2Filter alterTable column ID loss`.
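A minimal sketch of the construction-time assignment in the first bullet (illustrative shape using the names above — `nextColumnId`, `ColumnImpl` — not the fixture's exact code):

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.connector.catalog.Column

private val columnIdCounter = new AtomicLong(0L)
private def nextColumnId(): Long = columnIdCounter.incrementAndGet()

// Assign fresh string IDs to any column that arrives without one; keep existing IDs.
private def withAssignedIds(initialColumns: Array[Column]): Array[Column] =
  initialColumns.map { col =>
    if (col.id() == null) {
      // ColumnImpl is the internal case class carrying the new id field.
      col.asInstanceOf[ColumnImpl].copy(id = nextColumnId().toString)
    } else {
      col
    }
  }
```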
Point: provides the "fully-featured connector that supports column IDs" baseline. Every other test catalog is a deviation from this baseline along one specific axis.
The five new test catalogs — one per connector behavior axis
Each subclass extends InMemoryTableCatalog and overrides exactly one thing, modeling a real-world connector configuration:
| Catalog | Override | Real-world analog | What it proves |
|---|---|---|---|
| `NullTableIdInMemoryTableCatalog` (50 LOC) | `createTable` returns a table with `id = null`, but columns still have IDs | Catalog that doesn't track table identity (e.g. some legacy v2 connectors) but has column-level identity | Column-ID validation alone catches drop+recreate when table-ID validation is unavailable — proves the two checks are independent |
| `NullColumnIdInMemoryTableCatalog` (105 LOC) | `columns()` overridden to `.copy(id = null)` for every column | Pre-PR connectors with no ID support at all | Validation is skipped gracefully — drop+re-add is not detected, no false positives, no NPEs. This is the "do no harm" guarantee for the opt-in default |
| `TypeChangePreservesColIdTableCatalog` (68 LOC) | `alterTable` overridden to preserve old IDs by name even across type changes | Delta and Iceberg, both of which preserve `fieldId` across allowed type widening | Layered defense: ID check passes (preserved), schema validation fires for the type mismatch instead. Proves the two defenses compose correctly |
| `MixedColumnIdTableCatalog` (125 LOC) | `columns()` selectively strips IDs based on a mutable name-set, snapshotted per-table at create/alter | Catalog with partial ID support (some tables instrumented, others not), or a connector mid-migration where ID support is rolling out | The four ID-transition cells — null→null, null→id, id→null, id→id-different — are each their own test case. Only the last fires; the others all skip safely |
| `SharedInMemoryTableCatalog` (47 LOC) | Backs tables and namespaces with static `ConcurrentHashMap`s; extends `NullTableIdInMemoryTableCatalog` | A real shared metastore (Hive Metastore, Unity Catalog, Glue) accessed by multiple sessions | Two SparkSessions see the same catalog state; one mutates, the other has a stale DataFrame. Used only by the cross-session suite |
The choice to extend NullTableIdInMemoryTableCatalog for the shared-catalog case is deliberate: by zeroing the table ID, the only line of defense left is column IDs, so the cross-session tests are forced to exercise the new code path rather than falling back to the existing validateTableIdentity.
The matrix the catalogs span: full support / no table ID / no column ID / partial column ID / type-preserving. This covers the realistic combinations a connector could be in, and gives the suite a way to assert that each behavior handles the new check correctly without coupling test outcomes to incidental fixture details.
DataSourceV2DataFrameSuite (extended, +838 lines)
Houses the bulk of behavioral tests, ~38 cases organized into 14 commented sections. The grouping is the spec — reading the section comments tells you what behavior the PR is committing to:
| Section | Tests verify… |
|---|---|
| Mismatch detection | Drop+re-add detected: same type, different type, different case, multiple columns. Same-case-different-name (case-sensitive mode) caught by schema check, not ID check. Mixed-case type. Column addition is not falsely flagged. |
| Complex types | Drop+re-add of array/map columns flagged. Nested struct field drop+re-add caught (because top-level type changes → InMemory mints new top-level ID). Adding nested field tolerated when connector preserves IDs and read mode allows new fields. Adding a field inside an array element struct or map value struct flagged. Pure data inserts on complex columns don't trigger. |
| Join detection | A stale DataFrame joined with a fresh one still trips the check on the stale side. |
| DataFrame operation types | Filter, aggregate, sort, project-subset all reach the validation — transformWithSubqueries walks every DataSourceV2Relation, and the check fires regardless of operator above the relation. |
| Subquery | Subquery referencing the stale relation also gets validated (proves transformWithSubqueries traversal). |
| Rename column interaction | Rename routes to COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS (column-by-old-name disappeared), not to COLUMN_ID_MISMATCH. Documents the current behavior — see observation 3 for the v2 follow-up. |
| Sequential schema changes | Multiple alters between analyze and execute don't confuse the check. |
| Type change in standard catalog | With the standard InMemoryTableCatalog, type widening produces COLUMN_ID_MISMATCH (because IDs are re-minted on type change). Pairs with the type-preserving catalog test, which produces COLUMNS_MISMATCH instead. |
| Assignment verification | IDs are unique and monotonically increasing across schema changes. Existing column IDs preserved on adds/drops of unrelated columns. |
| Temp view behavior | Temp views resolve by name on each access — they don't capture IDs. Same-type drop+re-add tolerated by temp view; different-type caught by the view's own type check. Documents that temp views are outside the column-ID world and rely on the existing view consistency check. |
| Write operations | writeTo().append() does not apply column-ID validation to the source DataFrame (the source is just data flowing in, not a refreshed relation). insertInto does validate the source if it's a stale catalog DataFrame. Documents the read-vs-write asymmetry in V2TableRefreshUtil.refresh. |
| Null table ID connector | Uses NullTableIdInMemoryTableCatalog. Drop+recreate caught by column ID alone (table-ID check skipped). |
| Null column ID connector | Uses NullColumnIdInMemoryTableCatalog. Drop+re-add not detected (validation skipped). Stale DataFrame after column addition still works. Negative test: graceful no-op when IDs aren't supported. |
| Mixed null/non-null | Uses MixedColumnIdTableCatalog. Each of the four transition cells (null→null, null→id, id→null, id→id-different) is its own test. Only the last fires; others skip. |
The pattern in every test is the same: capture a DataFrame with spark.table(t), mutate the table via sql(ALTER TABLE …), then df.collect() and assert either success or checkError(condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.…"). This locks in the contract end-to-end at the analyzer/executor seam — not just the unit-level V2TableUtil.validateColumnIds function, but the full "captured relation, mutated catalog, refreshed plan, error surface" path.
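A condensed sketch of that pattern (hedged — the table name, SQL, and assertion detail are illustrative; the suite's real tests also pin the error-message parameters):

```scala
test("drop+re-add with same name and type rejects stale DataFrame") {
  val t = "testcat.ns1.ns2.tbl" // illustrative identifier
  withTable(t) {
    sql(s"CREATE TABLE $t (id INT, data STRING) USING foo")
    val df = spark.table(t) // capture: analyzer snapshots columns, including IDs
    sql(s"ALTER TABLE $t DROP COLUMN data")
    sql(s"ALTER TABLE $t ADD COLUMN data STRING") // same name and type, fresh ID
    checkError(
      exception = intercept[AnalysisException] { df.collect() }, // refresh runs here
      condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH")
  }
}
```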
DataSourceV2ExtSessionColumnIdSuite (new, 231 lines, 6 tests)
Cross-session tests. The trick at lines 79-95: clear active/default SparkSession, build a fresh one sharing the same SparkContext (so catalog configs carry over) but with its own SharedState and CacheManager. Same JVM, but functionally two independent sessions for relation-cache purposes. SharedInMemoryTableCatalog's static map gives them shared catalog state.
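A sketch of that setup (assuming standard `SparkSession` APIs and an existing active session `spark`; the suite's exact config handling may differ):

```scala
import org.apache.spark.sql.SparkSession

// Drop the registered sessions so the builder cannot hand back the existing one...
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

// ...then build anew: the live SparkContext (and its confs, including catalog
// registrations) is reused, but the session gets its own SharedState/CacheManager.
val session2 = SparkSession.builder().getOrCreate()
assert(session2 ne spark) // two sessions, one JVM; catalog state shared via static maps
```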
The six tests are the cross-session reduction of the in-suite cases:
- Sanity: external write visible via fresh query — proves the test harness works.
- Drop+re-add column: caught via column ID across sessions.
- Drop+recreate table: caught via column ID even though table ID is null on this catalog.
- External add column: not falsely flagged (negative test).
- Drop+re-add multiple columns: error message references all changed columns.
- External type widening: caught via column ID (this catalog mints new IDs on type change).
Point: the same-JVM transformWithSubqueries path is exercised in DataSourceV2DataFrameSuite. This suite proves the check still works when the staleness arises from a different session's CacheManager not knowing about the mutation, which is the realistic shape of the bug in production catalogs without table IDs.
What the testing reveals about the design
A few things stand out from the structure:
1. The test catalog matrix is the spec. The ID-transition table (`null→null`, `null→id`, `id→null`, `id→id-same`, `id→id-different`) in `V2TableUtil.scala:140-150` is not an abstract enumeration — every cell has a corresponding test. That's the right way to write this kind of opt-in feature: every "we skip in case X" comment in the production code is paired with a test that exercises X.
2. Layered defense is treated as a first-class invariant. `TypeChangePreservesColIdTableCatalog` exists only to prove that when the column-ID check passes by design, the existing schema check still fires for incompatible changes. That's a tight test of the layering, not just the new check in isolation.
3. The cross-session suite is small but load-bearing. It's the only place that exercises the realistic shape of the bug (separate sessions, shared catalog) — if you removed only this suite, the validation would still pass its unit-level tests but would have no end-to-end evidence that it catches the production scenario it's meant to catch.
4. Nested coverage works through composition, not through Spark traversal. The "Complex types" section tests nested struct field drop+re-add by relying on `InMemoryBaseTable.assignMissingIds` minting a new top-level ID whenever the top-level `dataType` differs. That's a coarse form of the composition pattern — using the entire `DataType` as a proxy for "anything in the subtree changed." A real Delta or Iceberg adapter would do finer composition (encoding actual nested field IDs into the top-level string) but the shape is the same: any nested change perturbs the top-level identity. There is no test for "connector has nested IDs but doesn't compose them" — and that's correct, because such a connector is misimplementing the API, not exercising a legitimate configuration. There's likewise no test for "connector has no nested IDs at all" detecting nested changes, because such a connector inherently can't (the catalog doesn't track the information; Spark can't surface what the connector doesn't observe). The test matrix correctly stops at the boundary of what each connector class can observably distinguish.
Substantive observations
1. The Javadoc on Column.id() is too thin for a contract this opinionated
Column.java:170-181 says "Returns the ID of this table column" and "Spark skips column identity validation for null column IDs" — generic. The PR description is explicit about three load-bearing things that the API surface elides:
- Top-level only. Both Delta and Iceberg run with IDs at every nesting level; a connector author reading just the Javadoc may assume the same. Nested struct field dropped + re-added with same name and same type is undetected by the naive top-level-ID adapter; the connector either bumps the top-level ID on any nested change, or folds a subtree encoding into the top-level string (see escape hatch above).
- Null is policy, not just capability. The current Javadoc frames null as "the connector does not support the notion of column ID." But null is also the policy opt-out — a connector that has IDs but wants lenient drop+re-add semantics (treat same-name same-type drop+re-add as the same column, e.g. matching the user's name-based mental model on a JDBC-style catalog) returns null and Spark accepts the read silently. These are two different reasons for the same return value, and a connector author should know both routes are legitimate.
- Per-column null is supported. The PR ships `MixedColumnIdTableCatalog` to test it, but the contract doesn't authorize it. A connector that wants strict semantics on most columns and lenient on specific ones (or that's mid-rollout of ID support) has no obvious signal that this works.
Suggested Javadoc rewrite that names all three:
```java
/**
 * Returns the ID of this top-level column, or null. The ID is an opt-in signal that the
 * connector tracks column identity beyond column name and type.
 * <p>
 * When a non-null ID is returned, the connector commits to the following contract:
 * <ul>
 *   <li>The ID is stable across renames (logical name changes preserve the ID).</li>
 *   <li>The ID changes when a column is dropped and re-added, even with the same name
 *   and type.</li>
 *   <li>IDs are not reused within a table's history.</li>
 * </ul>
 * <p>
 * When null is returned, Spark skips identity validation for that column. Connectors should
 * return null when:
 * <ul>
 *   <li>The catalog has no notion of column identity beyond name and type, OR</li>
 *   <li>The connector chooses to treat same-name drop+re-add as the same column (lenient
 *   semantics — the cached DataFrame will be permitted to read the new column's data).</li>
 * </ul>
 * Returning null is per-column: a connector may return IDs for some columns and null for
 * others.
 * <p>
 * Nested struct fields, array elements, and map keys/values do not have separate IDs
 * through this API. Connectors with nested ID information can encode it into the returned
 * string — Spark only compares string equality, so the contents are at the connector's
 * discretion.
 */
@Nullable
default String id() {
  return null;
}
```

This lifts everything currently buried in the PR description and the test catalog set into the contract, where connector implementors will actually see it.
2. The default test catalog should match Delta/Iceberg semantics, not invert them
InMemoryBaseTable.scala:818-835 (snippet):
```scala
case Some(oldCol) if oldCol.id() != null && oldCol.dataType() == newCol.dataType() =>
  newCol.asInstanceOf[ColumnImpl].copy(id = oldCol.id())
case _ if newCol.id() == null =>
  newCol.asInstanceOf[ColumnImpl].copy(id = nextColumnId().toString)
```

The default in-memory catalog re-mints the ID on any type change. Real connectors do the opposite:
- Delta: ID preserved across allowed type widening; the column has a single identity through its evolution chain.
- Iceberg: type promotions (`int → long`, etc.) preserve `fieldId`. `SchemaUpdate` only allocates a fresh ID via `assignNewColumnId` for `add`, never for `updateColumn`.
The InMemoryBaseTable default is the inverse of canonical connector behavior. The variant TypeChangePreservesColIdTableCatalog exists to model the canonical case as an opt-in deviation — that's backwards. The defaults should reflect what real connectors do, with deviations modeled as variants.
Recommendation: flip the default. Make `InMemoryBaseTable.assignMissingIds` preserve IDs across type changes (match by name only, not by name + dataType — a sketch follows this list). The current `TypeChangePreservesColIdTableCatalog` becomes the standard behavior of `InMemoryTableCatalog` — delete that variant, no longer needed. Add a new variant `TypeChangeRemintsColIdTableCatalog` (or similar) for tests that specifically need the "new type → new ID" behavior. Update the existing tests accordingly:

- The "Type change in standard catalog" tests in `DataSourceV2DataFrameSuite` currently expect `COLUMN_ID_MISMATCH` on type widening. They should move to the new variant catalog.
- The "Type widening in standard catalog triggers column ID mismatch" test similarly moves.
- The "type-preserving" tests (currently against `preserveidcat`) become tests against the standard `testcat`.
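A minimal sketch of the flipped merge, reusing the fixture names above (`assignMissingIds`, `nextColumnId`, `ColumnImpl`) — illustrative, not the PR's code:

```scala
import org.apache.spark.sql.connector.catalog.Column

// Hypothetical: preserve the ID whenever a column with the same (normalized)
// name existed before, regardless of type change — matching Delta/Iceberg.
private def assignMissingIds(
    oldColumns: Array[Column],
    newColumns: Array[Column]): Array[Column] = {
  def normalize(name: String): String = name.toLowerCase(java.util.Locale.ROOT)
  newColumns.map { newCol =>
    oldColumns.find(c => normalize(c.name()) == normalize(newCol.name())) match {
      case Some(oldCol) if oldCol.id() != null =>
        newCol.asInstanceOf[ColumnImpl].copy(id = oldCol.id()) // name match is enough
      case _ if newCol.id() == null =>
        newCol.asInstanceOf[ColumnImpl].copy(id = nextColumnId().toString)
      case _ => newCol
    }
  }
}
```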
This matters more than it sounds. The InMemory catalog is what every other DSv2 test in Spark builds on. If a future test author models a new schema-change scenario after the existing InMemoryTableCatalog behavior, they'll bake in the wrong semantics — and downstream connector authors who study the test suite as a reference for "how does this work in practice" will see the wrong picture. Aligning the default with Delta/Iceberg makes the test suite a faithful spec.
3. Renames trigger the wrong error class
In Delta and Iceberg, a rename keeps the ID. With this PR's name-keyed lookup at V2TableUtil.scala:166-185, a rename means the captured column's name is missing in the current schema, so it falls into validateDataColumns → COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS. The user sees "column dropped" instead of "column renamed."
For the PR's stated goal (catch drop+re-add), this is acceptable. But once IDs are flowing, a follow-up could swap the lookup to id-keyed-with-name-fallback and emit a distinct COLUMN_RENAMED_AFTER_ANALYSIS. Discussed in the Future discussion section.
4. String return type is fine
The PR mirrors Table.version(), which is also String, so the API is internally consistent. Delta and Iceberg both use integer IDs internally and would have to stringify on every id() call, but the conversion is one Integer.toString per column at refresh time — vanishing cost.
More importantly, the String return type is what unlocks the composition escape hatch: a connector with nested ID information can encode an arbitrary subtree into a single string, which a long or int couldn't carry. What looked like a minor type choice turns out to be load-bearing for the API's expressiveness. Keep it as String.
No action needed.
5. equals/hashCode exclusion is correct, but the inline comment is evasive
Iceberg includes id in NestedField.equals (Types.java:996-998) because Iceberg's read path resolves columns by ID everywhere. Spark's resolution is name-based, and existing structural-equality call sites (e.g. CatalogSuite.assertColumnsEqual, see commit 7689a7f5145) pre-date IDs. Including id would have caused widespread spurious mismatches; excluding it is the right call.
The inline comment at ColumnImpl.scala:36-40 is fine in spirit but evasive in detail. It says:
"would cause spurious mismatches when the same logical column is constructed with different IDs"
If the contract on id() is that IDs are stable per logical column, "the same logical column constructed with different IDs" sounds contradictory with the contract. The justification under-specifies why this hypothetical is real.
The actual cause: Column → StructType → Column is a lossy round-trip. This PR keeps the ID only on Column, not on StructField.metadata. Any code path that goes through StructType (and there are many — Table.schema(), CatalogV2Implicits.asSchema, DataTypeUtils.toAttributes, every createTable/stageCreate overload that takes a StructType, CatalogV2Util.structTypeToV2Columns itself) drops the ID. Same logical column instantiated from a fresh round-trip has id=null; same logical column read directly from the catalog has the assigned ID. They compare as the same column structurally but diverge on id.
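To make the round-trip loss concrete, a hedged sketch (`catalog` and `ident` are assumed in scope, and the `id` field on `Column` is this PR's addition; `CatalogV2Util`'s converters are existing Spark internals):

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}

// Hypothetical illustration: the ID lives only on Column, so converting to
// StructType and back discards it.
val withId: Array[Column] = catalog.loadTable(ident).columns()   // e.g. head has id = "5"
val schema = CatalogV2Util.v2ColumnsToStructType(withId)         // StructField has no ID slot
val roundTripped = CatalogV2Util.structTypeToV2Columns(schema)   // IDs are gone
assert(roundTripped.head.id() == null) // same logical column, different witness
```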
The PR's commit history makes this concrete:
- `b4900531858 Fix V2Filter alterTable column ID loss` — `InMemoryTableWithV2Filter.alterTable` was rebuilding columns via raw `structTypeToV2Columns`, silently zeroing IDs on every ALTER.
- `7689a7f5145 Fix assertColumnsEqual to strip IDs from both sides` — even when both sides came from the catalog, paths existed where one had IDs and the other didn't.
- `ccbeee24ad7 Fix remaining sameElements column comparisons in CatalogSuite`, `8fc7a740223 Fix last columns() comparison`, `f2216ed0eac Fix CatalogSuite.stripIds to preserve all column fields` — a long tail of equality fixes provoked by ID divergence in routine comparisons.
The categories of "same logical column, different IDs" that show up in practice:
- Round-trip loss. Catalog returns `Column(id=5)`; some code calls `v2ColumnsToStructType` and then `structTypeToV2Columns` to re-derive columns. Result: `Column(id=null)`. Same name/type/nullable, different ID.
- Test construction. `Column.create(name, type, ...)` produces `id=null`. Compared against a catalog-loaded column that has `id="7"`.
- Catalog rebuilds. `BasicInMemoryTableCatalog.alterTable` recomputes the column array via `structTypeToV2Columns(schema)` and then re-runs `assignMissingIds` to graft IDs back on. If the recompute happens out of sync with the assign step (as it did pre-`b4900531858`), the post-alter column has a fresh ID even though it's the same logical column.
- Mixed-source comparisons. A `Column` from one catalog session compared with one from another, where the ID-allocation runs are independent.
In every case the ID divergence is an artifact of the storage model, not a violation of the contract — "same logical column, different witnesses of that column."
Suggested rewording of the inline comment. A more honest version names the cause:
"Excluded because IDs only live on
Column, not onStructField.metadata. Any round-trip throughStructTypeloses the ID, so structural equality must ignore it for compatibility with code paths that rebuild columns from aStructType. Column ID validation is performed separately byV2TableUtil.validateColumnIds."
That phrasing names the actual cause and makes the design tension explicit: the PR is pragmatic — it adds id() without touching StructField — and the cost of that pragmatism is paid here, in equals.
The deeper fix. If Spark stored the column ID in StructField.metadata under a well-known key (mirroring how parquet.field.id is propagated), round-trips would preserve IDs and equals could legitimately include the ID. That's a much bigger change and correctly out of scope for this PR. The same metadata-on-StructField mechanism is what would unlock the Spark-side helper for nested IDs suggested earlier — both threads are picked up in the Future discussion section.
Suggested PR comment to the author (concrete, points at the load-bearing constraint):
> Could you make this comment concrete? My read is that IDs diverge because they live only on `Column`, not on `StructField.metadata`, so any `Column → StructType → Column` round-trip drops the ID. That's what made the `assertColumnsEqual` and V2Filter `alterTable` fixes necessary, right? If so, this is a load-bearing constraint and naming it explicitly helps future maintainers understand why excluding `id` from `equals` is the right call.
6. The error-conditions placement is awkward in alphabetical order
COLUMN_ID_MISMATCH lands before METADATA_COLUMNS_MISMATCH in the INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.subClass (error-conditions.json:2785-2796), which is alphabetically correct (COLUMN_ID_MISMATCH < COLUMNS_MISMATCH < METADATA_COLUMNS_MISMATCH) — good. Just confirming.
7. Method rename is contained
columnsChangedAfterAnalysis → columnsMissingOrAddedAfterAnalysis: only 3 in-PR references (grep confirmed), no external callers. Safe rename. Worth keeping the new name — it's more accurate now that there's a sibling columnIdMismatchAfterAnalysis.
Recommended order of asks before merging
1. Rewrite `Column.id()` Javadoc to explicitly cover (a) top-level only, with the composition escape hatch for nested IDs, (b) null as both capability and policy opt-out, (c) per-column null is supported. Suggested text in observation #1.
2. Flip the InMemory default to preserve column IDs across type changes (match Delta/Iceberg). Move the existing "type change re-mints ID" behavior into a variant catalog. Re-anchor the affected tests. Suggested approach in observation #2.
3. Rewrite the `ColumnImpl.equals` exclusion comment to name the real cause — `Column → StructType → Column` round-trips drop the ID because the ID isn't on `StructField.metadata`. The current "same logical column constructed with different IDs" wording obscures the load-bearing constraint, and the author's revision in `b4b1683` picks the secondary case (drop+re-add) over the primary one (round-trip loss). See observation #5.
Resolved (no action needed):

- `String` return type is correct — it's what makes subtree-ID composition possible. See observation #4.
- `equals`/`hashCode` exclusion is correct (the what, not the why — the inline comment still needs the rewrite in #3 above).
Future discussion (out of scope for this PR)
These are not blockers and shouldn't gate this PR. They're follow-ups worth thinking about — the rationale below covers what they'd buy and why this PR shouldn't try to land them.
A. Column ID in StructField.metadata under a Spark-side well-known key
The proposal. Define a Spark-side metadata key (e.g. __SPARK_COLUMN_ID__, namespacing pattern TBD) that lives in StructField.metadata. Make CatalogV2Util.v2ColumnsToStructType write the ID into this key, and structTypeToV2Columns read it back. The round-trip Column → StructType → Column becomes ID-preserving.
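A hedged sketch of the round-trip-preserving shape (the key name and helpers are hypothetical design placeholders; `MetadataBuilder` and `StructField` are Spark's existing APIs):

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StructField}

// Hypothetical well-known key — naming is an open design question below.
val COLUMN_ID_KEY = "__SPARK_COLUMN_ID__"

// Writing: v2ColumnsToStructType would stamp the ID into field metadata.
def withColumnId(field: StructField, id: String): StructField =
  field.copy(metadata = new MetadataBuilder()
    .withMetadata(field.metadata)
    .putString(COLUMN_ID_KEY, id)
    .build())

// Reading: structTypeToV2Columns would recover it, making the round-trip lossless.
def columnIdOf(field: StructField): Option[String] =
  if (field.metadata.contains(COLUMN_ID_KEY)) Some(field.metadata.getString(COLUMN_ID_KEY))
  else None
```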
What it unlocks.

- `Column.equals` could include `id` cleanly. Today the equality contract excludes `id` (observation #5) only because round-trips lose it. With round-trip preservation, equality could include `id`, and the workarounds in `assertColumnsEqual`, `sameElements`-on-arrays, schema-diff helpers, and the `b4900531858` V2Filter `alterTable` fix all become unnecessary. Two `Column` instances compare equal iff they're the same column in every sense, structural and identity. The PR's commit history (long tail of equality fixes) is exactly the cost of not having this — every code path that goes through `StructType` had to be patched individually.
- Spark-side helper for subtree-ID composition. Connectors with nested IDs (Delta has them on every `StructField`; Iceberg has them at every nesting level including list element / map key/value) need to recurse into `StructType.fields`, `ArrayType.elementType`, `MapType.keyType/valueType` to encode subtree IDs into the top-level string. That recursion is identical across connectors. With nested IDs annotated in `StructField.metadata`, Spark could ship `V2TableUtil.encodeSubtreeId(field): String` that walks the metadata tree and produces the canonical encoded string. Connectors then annotate IDs at every nesting level and let Spark do the encoding consistently — no per-connector recursion to maintain or get wrong.
Why not in this PR.
- Blast radius. `StructField.metadata` is read and written by every analyzer rule, every schema serializer (JSON, Avro, Parquet, Hive), every Catalyst rule that synthesizes a `StructField`. Auditing all of those is its own project.
- Forward-compatibility design. The metadata key needs a stable name, a documented serialization (does it round-trip through Avro? Hive? Parquet writer/reader?), and a story for what happens when an older Spark reads schemas written by a newer Spark.
- This is genuinely a substantial design change: it's a new contract on `StructField`, not a new method on a single interface.
Migration story (the load-bearing reason it's safe to defer).
The Column.id() API in this PR is forward-compatible with the metadata path. When StructField.metadata carries IDs:
- Existing connectors that adopted `Column.id()` keep working unchanged — `ColumnImpl` can choose to populate the metadata key when its `id` is non-null, or not.
- New connectors get a choice: use `Column.id()`, annotate `StructField.metadata`, or both. The two are complementary, not exclusive.
- The `Column.equals` policy can be flipped from "exclude `id`" to "include `id`" once round-trips are lossless, with no API break — `id` returning the same value on both sides means the equality outcome is the same.
So the right shape is: ship Column.id() now, address the metadata key separately. This PR doesn't paint into a corner.
Open design questions:
- Key naming: `__SPARK_COLUMN_ID__`, `spark.column.id`, mirror `parquet.field.id` exactly, or define a Spark-namespaced family of column metadata keys.
- Should the metadata be visible to connectors (read-only or read-write)? Read-only is safer; read-write lets connectors round-trip their own IDs through Spark's schema API.
- Should there be two equality forms on `Column` — structural-only and structural-plus-identity — for code that genuinely wants the "ignore identity" comparison (schema diff helpers, plan-equality up to identity)? Probably yes, with `equals` becoming the strict form.
The current PR works without this; addressing it later would clean up the model and enable nested coverage natively rather than via the connector-side composition pattern.
B. Distinguish rename from drop via ID-keyed lookup
The proposal. Change V2TableUtil.validateColumnIds from name-keyed lookup to id-keyed-with-name-fallback. The current code (V2TableUtil.scala:166-185) iterates captured columns, looks each one up in the current table by normalized name, and compares IDs. Reverse the indexing: build currentColsById = currentTable.columns.filter(_.id != null).map(c => c.id -> c).toMap, then for each captured column with a non-null ID:
- ID found → check name. Same name + same ID → no error. Different name + same ID → `COLUMN_RENAMED_AFTER_ANALYSIS(oldName, newName)`.
- ID not found → captured column was dropped. Falls through to the existing `COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS` path.

A sketch of the reversed lookup follows this list.
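Hedged sketch (`capturedCols`, `currentTable`, and the `errors` buffer are assumed in scope; `COLUMN_RENAMED_AFTER_ANALYSIS` is the proposed, not-yet-existing error class, so error raising is reduced to a message):

```scala
import org.apache.spark.sql.connector.catalog.Column

// Hypothetical: index the current table by column ID instead of by name.
val currentColsById: Map[String, Column] =
  currentTable.columns().filter(_.id() != null).map(c => c.id() -> c).toMap

capturedCols.filter(_.id() != null).foreach { captured =>
  currentColsById.get(captured.id()) match {
    case Some(current) if current.name() != captured.name() =>
      // Proposed new diagnostic: same identity, new name.
      errors += s"column `${captured.name()}` was renamed to `${current.name()}`"
    case Some(_) => // same ID, same name: nothing to report
    case None =>
      // ID gone: dropped column; existing COLUMNS_MISSING_OR_ADDED path handles it.
  }
}
```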
What it unlocks.
A specific error class for renames. Today:
1. User runs `ALTER TABLE foo RENAME COLUMN salary TO compensation`.
2. The captured DataFrame has `salary`. Refresh logic looks up `salary` by name in the current table. Not found.
3. Falls through to schema validation, which sees `salary` removed and `compensation` added.
4. Error: `COLUMNS_MISSING_OR_ADDED_AFTER_ANALYSIS` with message "`salary` column was removed; `compensation` column was added".
After the change:
1. ID-keyed lookup finds `salary`'s ID still present in the current table, mapped to a column named `compensation`.
2. Error: `COLUMN_RENAMED_AFTER_ANALYSIS` with message "`salary` was renamed to `compensation`".
The query still fails, which is correct — a stale DataFrame against a renamed column is still an analyze/execute mismatch. Only the diagnostic improves.
Why not in this PR.
- The PR's stated scope is drop+re-add detection, not schema-evolution diagnostics. Renames are a different problem (different fix, different error class, different test surface).
- Adds a new error class and rewrites tests. Modest churn but not zero.
- Renames are rare relative to drops in production usage. The current behavior already correctly fails the query; the message just isn't optimal.
Open design questions:
- Rename + type change in one alter. Iceberg supports `updateColumn` combining rename and type widening; the error message could distinguish "renamed only" vs. "renamed and widened to T." Probably best as sub-error classes of `COLUMN_RENAMED_AFTER_ANALYSIS`.
- Multi-column rename swap. `ALTER TABLE foo RENAME COLUMN a TO tmp; RENAME COLUMN b TO a; RENAME COLUMN tmp TO b;` swaps `a` and `b`. ID-keyed lookup handles it correctly (each captured column finds its ID at the new name), but the error-message design needs thought — collect all renames and report together, vs. report the first one.
- Per-column vs. table-level error. The current `COLUMNS_MISSING_OR_ADDED` is collected (one error with a list of changes). `COLUMN_RENAMED_AFTER_ANALYSIS` should probably do the same.
- Interaction with name-keyed lookup for backward compat. If a captured column has a null ID, name-keyed lookup is the only option — no rename detection possible. The fallback path needs to be preserved.
The current behavior is correct in outcome, suboptimal in messaging — this is UX polish, not correctness, so it's a fine candidate to defer.
Why these are deferred (the meta-rationale)
Both items expand the scope of "what Spark validates between analyze and execute" beyond the PR's stated goal of drop+re-add detection. They share a common shape: take the foundation this PR lays (the Column.id() contract, the validateColumnIds checkpoint in V2TableRefreshUtil) and build richer diagnostics or cleaner abstractions on top.
Landing them here would:
- Enlarge a focused 17-LOC core change to span schema-wide infrastructure work (item A) or new error-class design (item B).
- Couple the PR's review surface to design decisions that should get their own attention (metadata-key naming, rename diagnostics).
- Risk losing the PR to bikeshedding over the follow-ups when the core mechanism is clean and ready to ship.
The right shape: land this PR with the asks above, and let the follow-ups proceed at their own pace with their own reviews.
Bottom line
The core design is sound. The contract this PR demands of connectors — stable across renames, fresh on drop+re-add, never reused within a table's history — is exactly the invariant both Delta (maxColumnId) and Iceberg (last-column-id) already enforce table-wide, so the fundamental work is already done in those backends. The top-level-only restriction is a reasonable v1: it covers the cache-invalidation problem the PR motivates without growing a new nested-ID surface in DSv2, and the String return type lets connectors with a richer ID model encode subtree information at their discretion. The observable gap (nested same-name same-type drop+re-add) is closable by Iceberg and Delta adapters today by encoding nested IDs into the top-level string, with no further Spark API change required.
Source references
Delta (delta-io/delta, master)
- `spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala` — constants (54-73), `getColumnId`/`hasColumnId` (217-221), nested IDs comment (822-849), parquet ID emission (265-278), modes (944-967), assignment loop (436-483), `isRenameColumnOperation` (709-728), `collectDroppedColumns` (686-700)
- `spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala` — `COLUMN_MAPPING_MAX_ID` (749-756)
- `spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala` — initialization check (227-236), mid-stream non-additive change detection (603-621)
Iceberg (apache/iceberg, main)
- `api/src/main/java/org/apache/iceberg/types/Types.java` — `NestedField` (855-931), `equals`/`hashCode` (996-998), list/map ID requirements (1148-1310)
- `api/src/main/java/org/apache/iceberg/Schema.java` — `highestFieldId` (72-160)
- `core/src/main/java/org/apache/iceberg/SchemaUpdate.java` — add (166-186), delete (190-202), rename (205-219), `assignNewColumnId` (478)
- `core/src/main/java/org/apache/iceberg/TableMetadata.java` — `lastColumnId` monotonicity check (1598-1621)
- `core/src/main/java/org/apache/iceberg/TableMetadataParser.java` — `last-column-id` JSON (93, 175, 357)
- `parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java` — id emission (107, 113, 146, 162)
- `parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java` — id-based resolution (199-211)
- `spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java` — schema exposure (127, 216-218)
- `spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java` — `expectedSchema` capture (117-138)
- `spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java` — `fieldMetadata` adapter point (60-92, 159-164)
Addendum: nested-field test coverageThe original review touched on nested-field gaps in the design discussion but didn't audit the test suite specifically through that lens. Re-reading the suite with "what nested schema changes does this actually exercise, and which slip through" as the only question, here's the picture. What the suite covers for nested fieldsThere are three groups of nested-related tests in Group 1 — top-level drop+re-add of complex columns, against test("drop+re-add array column rejects stale DataFrame")
test("drop+re-add map column rejects stale DataFrame")
test("drop+re-add nested struct field rejects stale DataFrame") // person.age, against testcatThe third test is the only one that touches a nested field per se. It works because the InMemory catalog mints a new top-level ID whenever the parent column's
Captured DataFrame holds Group 2 — adding fields inside container types, against test("add field to array element struct rejects stale DataFrame") // items.element.price
test("add field to map value struct rejects stale DataFrame") // props.value.labelSame mechanism: the parent column's Group 3 — additive changes against an ID-preserving connector, against test("same column ID but expanded struct type: read tolerates nested field addition")This is the only test where a nested change happens without the top-level ID being re-minted. The test asserts the read succeeds because:
This proves the layered defense works correctly when nested changes are additive and the read is tolerant. What the suite does NOT cover for nested fieldsTwo categories of nested behavior are absent. The first is the load-bearing gap; the second is a set of smaller callouts. 1. No test catalog demonstrates fine-grained subtree-ID composition. The InMemory catalog uses coarse composition — any subtree change perturbs the top-level A real Delta or Iceberg adapter following the escape hatch recommendation would correctly catch nested drop+re-add — because the inner field's ID changes ( The missing test catalog and tests: class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
// Assigns IDs at every nesting level (struct fields, list element,
// map key/value) and encodes the full subtree into each top-level
// Column.id() string. Models what a Delta or Iceberg adapter would
// do per the recommended adoption pattern.
}
test("composed nested IDs detect drop+re-add of nested field") {
val t = "composedcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id INT, person STRUCT<name: STRING, age: INT>) USING foo")
val df = spark.table(t)
sql(s"ALTER TABLE $t DROP COLUMN person.age")
sql(s"ALTER TABLE $t ADD COLUMN person.age INT")
// Captured: person.id encodes [parent, name, age=10]
// Current: person.id encodes [parent, name, age=11] (inner ID re-minted)
// Strings differ → COLUMN_ID_MISMATCH fires.
checkError(
exception = intercept[AnalysisException] { df.collect() },
condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
...)
}
}
test("composed nested IDs detect rename within struct")
test("composed nested IDs tolerate same data inserted into nested column")
// etc.

This set of tests would prove that the recommended composition pattern actually catches nested drop+re-add and rename through the standard validation flow, while tolerating benign operations such as re-inserting the same data.

This is purely a test-side addition — a new test catalog and a handful of tests. No changes to production code.

2. Smaller missing nested cells. Independent of the composition gap, a few specific cells are uncovered: nested field reorder, drop+re-add of a struct inside a map key, and a nested type change against an ID-preserving catalog.
What this means for the PR

The current suite proves the mechanism works for catalogs that use coarse subtree-change detection (re-mint the top-level ID whenever the parent's data type changes). This isn't a correctness gap (Spark compares the ID strings opaquely, so a finer-grained composed encoding works without any core changes), but the recommended composition pattern itself goes untested. If that test catalog isn't added in this PR, the design doc should flag the boundary so connector authors looking at this suite as a reference for "how does this work for my Delta/Iceberg-shaped catalog" don't assume incidental InMemory behavior is the contract.
Summary of the second pass

New observation: the `ColumnImpl` equals/hashCode comment was expanded; see the discussion below.
@@ -38,6 +38,11 @@ case class ColumnImpl(
Discussed with @andreaschat-db and changing back to excluding `id` from `equals`/`hashCode`, with an expanded comment:
// [[id]] is excluded from [[equals]] and [[hashCode]] because it is an auxiliary tracking
// identifier, not part of the column's structural identity. Including it would break existing
// equality comparisons that rely on schema properties (name, type, nullability, etc.) and
// would cause spurious mismatches when the same logical column is constructed with different IDs.
// The same logical column can be constructed with different IDs: when a column is dropped and
// re-added with the same name, the original column data is already gone, so some connectors may
// treat this as a different column by assigning a new ID.
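To see why the exclusion keeps existing assertions working, here is a minimal, hypothetical sketch. It is not the real `ColumnImpl` (which has more fields and excludes `id` explicitly); it uses a second parameter list, which Scala case classes leave out of the generated `equals`/`hashCode`:

// Hypothetical simplified column shape, for illustration only.
case class Col(name: String, dataType: String, nullable: Boolean)(val id: String)

val expected = Col("salary", "int", nullable = true)(null) // tests build columns without IDs
val actual = Col("salary", "int", nullable = true)("42")   // connector assigned an ID at create time
assert(expected == actual) // schema equality still holds: id is not part of structural identity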
juliuszsompolski
left a comment
Thank you! LGTM. Only one more nit to restore one test.
Reorder coverage no longer exercises the validation flow end-to-end
The reorder test went through three iterations:
- Commit `c27dda8ad3e` added "composed nested IDs tolerate nested field reorder" — created a table, inserted a row, captured a DataFrame, ran `ALTER COLUMN ... FIRST`, then `checkAnswer(df, Seq(Row(1, Row("Alice", 30))))`. The `df.collect()` inside `checkAnswer` runs `V2TableRefreshUtil` and both ID and schema validators; the assertion proves the read returns the original row through the reordered struct.
- Commit `7ad3659d6b6` deleted that test as "trivial".
- Commit `a6377a58018` added "composed nested IDs: reorder preserves composed column ID" — creates a table (no data), captures `cat.loadTable(ident)` columns before, runs the alter, captures columns after, and asserts `typeBefore != typeAfter` and `idBefore == idAfter`.
The current test asserts the catalog's encoding invariant (position-based composed string is unchanged across reorder). What is no longer exercised:

- The validation flow under reorder. There is no test that captures a DataFrame, reorders, then reads. A regression where `SchemaUtils.validateSchemaCompatibility` becomes position-sensitive on `StructType`, or where ID validation fires on a reorder, would not be caught by the suite.
- Read-path tolerance under reorder against `composedidcat`. The dropped test asserted that the captured row reads correctly through the reordered struct.
Both Delta and Iceberg support reorder as a non-breaking schema operation, so "captured DataFrame survives a reorder" is a real user scenario.
Suggested addition — restore the end-to-end test alongside the existing catalog-state one. The two are complementary: the catalog-state test pins the encoding invariant, the end-to-end test pins the user-visible outcome.
test("composed nested IDs tolerate nested field reorder end-to-end") {
val t = "composedidcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id INT, person STRUCT<name: STRING, age: INT>) USING foo")
sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))")
val df = spark.table(t)
sql(s"ALTER TABLE $t ALTER COLUMN person.age FIRST")
checkAnswer(df, Seq(Row(1, Row("Alice", 30))))
}
}|
Hey @aokolnychyi, could you please review and help us merge this? Thanks!

@gengliangwang, can you check this one?
gengliangwang
left a comment
Re-review summary: the prior `ComposedColumnIdTableCatalog` issues (name-keyed nested paths surfacing renames as `COLUMN_ID_MISMATCH`; double-encoding in `alterTable`) are resolved by the position-keyed paths plus the separate `rootIds` map, and the previously uncovered nested cells (nested reorder, map-key struct drop+re-add, nested type change against an ID-preserving catalog) now have tests. One small comment-completeness nit inline.
…composed ID tests

- Rewrite `Column.id()` Javadoc: document top-level only, null as capability and policy opt-out, per-column null, nested composition escape hatch
- Rewrite `ColumnImpl.equals` comment: name the real cause (Column to StructType round-trip drops the ID)
- Flip InMemory default: `assignMissingIds` now preserves IDs across type changes, matching Delta/Iceberg behavior
- Delete `TypeChangePreservesColIdTableCatalog` (no longer needed)
- Add `TypeChangeResetsColIdTableCatalog` (inverse: fresh IDs on type change)
- Add `ComposedColumnIdTableCatalog`: encodes nested field IDs into top-level `Column.id()` string, demonstrating the recommended pattern for connectors with nested IDs
- Add tests: composed IDs for struct/array/map nested drop+re-add, depth-3 nesting, nested type change, nested rename, parent/sibling ID preservation on nested drop/add
- Migrate existing tests to match flipped default

Co-authored-by: Isaac
…test names

- Remove all Delta/Iceberg references from test comments and Scaladocs
- Rewrite `ComposedColumnIdTableCatalog` with clear variable names and concrete examples in every Scaladoc
- Rename `oldByName` to `oldColsByName`, `c` to `oldCol` in `TypeChangeResets`
- Replace "schema validation" with "data columns validation" in ext suite
- Rename test names to clarify standard (non-composed) vs composed catalog
- Delete redundant drop+re-add same name test (already covered)
- Remove word "mint" from all comments

Co-authored-by: Isaac
… preservation

Fix `alterTableWithData` to physically strip dropped column values from rows so they do not survive through ALTER chains (e.g. DROP COLUMN then ADD COLUMN with the same name). Previously, `InMemoryBaseTable` preserved old data by name matching, which was unrealistic.

Add a new test demonstrating that temp views with stored plans are not affected by column ID changes: after dropping and re-adding a column, the `salary < 999` filter returns empty (NULLs) while the `IS NULL` filter matches all rows.

Co-authored-by: Isaac
…e-encoding

Position-based ordinal keys (`Seq[Int]`) replace name-based keys (`Seq[String]`) so that rename preserves nested IDs (matching Delta/Iceberg semantics). A separate `rootIds` map prevents double-encoding on successive `alterTable` calls. Add tests for nested field reorder tolerance and map key struct drop+re-add.

Co-authored-by: Isaac
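A toy illustration of the keying difference described in this commit (the maps, path values, and rename are hypothetical):

// Path Seq(1, 0) means: second top-level column, its first nested field.
val idsByOrdinal: Map[Seq[Int], Long] = Map(Seq(1, 0) -> 10L)
val idsByName: Map[Seq[String], Long] = Map(Seq("person", "age") -> 10L)

// After ALTER TABLE t RENAME COLUMN person.age TO years:
idsByOrdinal.get(Seq(1, 0))           // Some(10): position unchanged, ID preserved
idsByName.get(Seq("person", "years")) // None: the name key misses, the field looks new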
…LOW_NEW_FIELDS tests

Co-authored-by: Isaac
…d, add nested field tests

Co-authored-by: Isaac
class SharedInMemoryTableCatalog extends NullTableIdInMemoryTableCatalog {
-  override protected val tables: util.Map[Identifier, Table] =
-    SharedInMemoryTableCatalog.sharedTables
+  tables = SharedInMemoryTableCatalog.sharedTables
Why is this needed?
@andreaschat-db After the txn PR was merged, `override protected val` / `override protected var` no longer compile here. Given that it's a compile-level matter in the testing infra and the functionality remains the same, I haven't looked closely into it.
From Claude:
The issue is that `BasicInMemoryTableCatalog` declares `tables` as a protected `var`. In Scala, you can't override a `var` with either a `val` or a `var`. The only way to point `tables` at the shared instance is to assign it in the constructor body. The `override protected val` that was there before only worked because `tables` used to be a `val` — the txn PR changed it to a `var`, which broke the override.

So `tables = SharedInMemoryTableCatalog.sharedTables` is just a constructor-body assignment to the inherited mutable field, replacing the default `ConcurrentHashMap` with the shared one. Same effect as the old `override val`, just the only way Scala allows it for a `var`.
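A minimal sketch of the Scala rule in play, with hypothetical names:

class Base {
  protected var tables: Int = 0 // a mutable member cannot be overridden in subclasses
}

class Child extends Base {
  // override protected var tables: Int = 1 // does not compile: variables cannot be overridden
  tables = 1 // but a constructor-body assignment to the inherited var works
}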
…rough alters

Add `validateColumnIds` to `validateLoadedTableInTransaction` to detect columns that were dropped and re-added with the same name but a different column ID during a transaction. Also update the call site from SPARK-55855 to use the renamed error method `columnsMissingOrAddedAfterAnalysis`.

Fix `InMemoryRowLevelOperationTableCatalog.alterTable` to preserve column IDs using `assignMissingIds` (matching `InMemoryTableCatalog` behavior). Fix `TxnTable` to propagate the delegate's column IDs instead of regenerating them from `StructType`. Add a self-merge test verifying that a drop-and-re-add of a column fails with `COLUMN_ID_MISMATCH` in the transaction path.

Co-authored-by: Isaac
Thanks, merging to master
What changes were proposed in this pull request?
This PR adds column ID support to the DSv2 Column interface so that Spark can detect when a column has been
dropped and re-added with the same name during table refresh.
Connectors that support column IDs (e.g., Delta, Iceberg) can now return a unique identifier via `Column.id()`. Spark validates these IDs at refresh time and fails with a clear error if a column has been replaced.

Column IDs are assigned at the top-level column granularity. Nested struct fields, array elements, and map keys/values do not have separate IDs through this API.
A top-level-only ID will not detect same-name same-type drop+re-add of a nested field. This
matches the current behavior of connectors like Delta and Iceberg, where the top-level column ID
is unchanged when a nested field is dropped and re-added.
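For illustration, the sequence below is the case a top-level-only ID cannot catch against such an ID-preserving connector; the table name `t` is hypothetical:

sql("ALTER TABLE t DROP COLUMN person.age")
sql("ALTER TABLE t ADD COLUMN person.age INT")
// person's top-level ID is unchanged and its type round-trips back to
// STRUCT<name, age>, so neither ID validation nor schema validation fires.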
Connectors that want to detect nested drop+re-add can do so by encoding their nested field IDs into the top-level `Column.id()` string. For example, a column `person STRUCT<name, age>` with root ID 5 and nested IDs 10, 11 could return `"5[10,11]"` — if `age` is dropped and re-added with new nested ID 12, the string becomes `"5[10,12]"`, and Spark detects the mismatch.

Changes to nested fields that alter the parent's data type (e.g., dropping `person.age` changes the struct from `STRUCT<name, age>` to `STRUCT<name>`) are caught by schema validation regardless of ID support.
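A minimal sketch of the nested-ID encoding described above, assuming the connector already knows its root and nested field IDs (the helper name and ID values are illustrative, not part of the API):

def composeColumnId(rootId: Int, nestedIds: Seq[Int]): String =
  if (nestedIds.isEmpty) rootId.toString
  else s"$rootId[${nestedIds.mkString(",")}]"

composeColumnId(5, Seq(10, 11)) // "5[10,11]"
composeColumnId(5, Seq(10, 12)) // "5[10,12]" after age is dropped and re-added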
The default `InMemoryTableCatalog` preserves column IDs across type changes, matching the behavior of real connectors like Delta and Iceberg. A new `ComposedColumnIdTableCatalog` demonstrates the recommended adoption pattern for connectors with nested IDs: encoding the full subtree into the top-level `Column.id()` string so that any nested change is detected.

Why are the changes needed?
Spark's existing refresh validation only checks column names and types, so it cannot detect when a column has been replaced with a same-named column. This PR exposes column IDs through the DSv2 API so Spark can catch these scenarios at refresh time.
Does this PR introduce any user-facing change?
No. Column IDs are a new opt-in API for DSv2 connectors. Existing connectors are unaffected because `Column.id()` defaults to `null`, and null IDs are skipped during validation. Only connectors that explicitly implement `Column.id()` will see the new `COLUMN_ID_MISMATCH` error when a column is dropped and re-added between analysis and execution.
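The opt-in rule reduces to a three-way check. A minimal sketch (the helper name is illustrative, not the real validation API):

def idMismatch(capturedId: String, currentId: String): Boolean =
  capturedId != null && currentId != null && capturedId != currentId

idMismatch(null, "7") // false: the connector reported no ID at capture time
idMismatch("7", "7")  // false: same column, the refresh proceeds
idMismatch("7", "9")  // true: dropped and re-added, COLUMN_ID_MISMATCH is raised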
How was this patch tested?

New tests added in `DataSourceV2DataFrameSuite` and `DataSourceV2ExtSessionColumnIdSuite`.

Testing concept
Real connectors (Delta, Iceberg, etc.) differ in how much identity tracking they support: some assign unique IDs to every column, some only to certain columns, some support table-level IDs but not column IDs, and some support none at all. The test infrastructure needs to cover this full connector support matrix so we can verify that column ID validation works correctly when the connector opts in, and is safely skipped when it does not.
To achieve this, we introduce custom test catalogs that each simulate a different real-world connector behavior. They all extend `InMemoryTableCatalog` and share the same `InMemoryBaseTable` base, which auto-assigns unique column IDs via a global counter. Each catalog then selectively overrides or strips certain identity fields to model a specific connector scenario; one such stripping pattern is sketched below.
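As an example of the stripping pattern, a sketch of how a catalog could model a connector without column-ID support (the wrapper name is hypothetical; the actual suites may implement this differently):

import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
import org.apache.spark.sql.types.DataType

// Delegates every accessor but reports null for id(), opting the column
// out of ID validation entirely.
class NullIdColumn(delegate: Column) extends Column {
  override def name(): String = delegate.name()
  override def dataType(): DataType = delegate.dataType()
  override def nullable(): Boolean = delegate.nullable()
  override def comment(): String = delegate.comment()
  override def defaultValue(): ColumnDefaultValue = delegate.defaultValue()
  override def metadataInJSON(): String = delegate.metadataInJSON()
  override def id(): String = null
}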
Test catalog hierarchy

| Catalog | Class | What is tested |
| --- | --- | --- |
| `testcat` | `InMemoryTableCatalog` | Default behavior: unique column IDs auto-assigned and preserved across type changes |
| `nullidcat` | `NullTableIdInMemoryTableCatalog` | Connector with no table-level ID |
| `nullcolidcat` | `NullColumnIdInMemoryTableCatalog` | Connector with no column IDs; all columns report `null`, so ID validation is skipped |
| `mixedcolidcat` | `MixedColumnIdTableCatalog` | Connector where only some columns carry IDs (per-column `null`) |
| `resetidcat` | `TypeChangeResetsColIdTableCatalog` | Connector that assigns fresh column IDs on type change |
| `composedidcat` | `ComposedColumnIdTableCatalog` | Connector that encodes nested field IDs into the top-level `Column.id()` string |
| `sharedcat` | `SharedInMemoryTableCatalog` | Catalog instances sharing one table map, for cross-session scenarios |

Was this patch authored or co-authored using generative AI tooling?
Yes