diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4ba3d8250d35..31c42fb5a8eb 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1369,7 +1369,20 @@ acceptedBreaks: old: "class org.apache.iceberg.encryption.EncryptingFileIO" new: "class org.apache.iceberg.encryption.EncryptingFileIO" justification: "New method for Manifest List reading" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.UpdateSchema org.apache.iceberg.UpdateSchema::undeleteColumn(java.lang.String,\ + \ boolean)" + justification: "Adds an expected new spark function for undeleting a column.\ + \ This is backwards-compatible." org.apache.iceberg:iceberg-core: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.avro.SupportsIndexProjection" + new: "class org.apache.iceberg.avro.SupportsIndexProjection" + justification: "Serialization across versions is not guaranteed" + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.hadoop.SerializableConfiguration" + new: "class org.apache.iceberg.hadoop.SerializableConfiguration" + justification: "Serialization across versions is not guaranteed" - code: "java.class.noLongerInheritsFromClass" old: "class org.apache.iceberg.rest.auth.OAuth2Manager" new: "class org.apache.iceberg.rest.auth.OAuth2Manager" diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java index cbcaa0ee2365..90165bf389d7 100644 --- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java +++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java @@ -564,6 +564,25 @@ default UpdateSchema updateColumnDefault(String name, Literal newDefault) { */ UpdateSchema deleteColumn(String name); + /** + * Restore a previously deleted column from the schema history. + * + *

<p>The name is used to search for the column in historical schemas. The column is restored with + * its original field ID and nullability, preserving data file compatibility. + * + *
<p>
If the original column was required, this method verifies that no data has been written to + * the table since the column was deleted. If data was written, the undelete fails because the + * restored required column cannot satisfy its non-null constraint for rows added after deletion. + * Pass {@code setNullable=true} to bypass this check and restore the column as optional instead. + * + * @param name name of the column to restore (supports dot notation for nested fields) + * @param setNullable if true, restore as optional even if the original was required + * @return this for method chaining + * @throws IllegalArgumentException if name already exists, was never deleted, parent struct does + * not exist, or the column was originally required and data was written after deletion + */ + UpdateSchema undeleteColumn(String name, boolean setNullable); + /** * Move a column from its current position to the start of the schema or its parent struct. * diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index e42df2fe5ed3..35269898f29c 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.Collection; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -121,17 +122,7 @@ private void internalAddColumn( if (parent != null) { Types.NestedField parentField = findField(parent); Preconditions.checkArgument(parentField != null, "Cannot find parent struct: %s", parent); - Type parentType = parentField.type(); - if (parentType.isNestedType()) { - Type.NestedType nested = parentType.asNestedType(); - if (nested.isMapType()) { - // fields are added to the map value type - parentField = nested.asMapType().fields().get(1); - } else if (nested.isListType()) { - // fields are added to the element type - parentField = 
nested.asListType().fields().get(0); - } - } + parentField = getNestedParentField(parentField); Preconditions.checkArgument( parentField.type().isNestedType() && parentField.type().asNestedType().isStructType(), "Cannot add to non-struct column: %s: %s", @@ -200,6 +191,140 @@ public UpdateSchema deleteColumn(String name) { return this; } + @Override + public UpdateSchema undeleteColumn(String name, boolean setNullable) { + Types.NestedField existingField = findField(name); + Preconditions.checkArgument( + existingField == null, + "Cannot undelete column '%s': a column with this name already exists in the current schema", + name); + + Preconditions.checkArgument( + base != null, + "Cannot undelete column: table metadata is required to access historical schemas"); + + DeletedColumnInfo deletedInfo = findDeletedColumn(name); + Preconditions.checkArgument( + deletedInfo != null, + "Cannot undelete column '%s': column not found in any historical schema", + name); + + int parentId = deletedInfo.parentId(); + Types.NestedField originalField = deletedInfo.field(); + + Types.NestedField field; + if (setNullable || originalField.isOptional()) { + field = originalField.asOptional(); + } else { + Preconditions.checkArgument( + !dataWrittenSinceDeletion(originalField.fieldId()), + "Cannot undelete required column '%s': data was written after the column was deleted. " + + "Pass setNullable=true to restore the column as optional.", + name); + field = originalField; + } + + if (parentId != TABLE_ROOT_ID) { + idToParent.put(field.fieldId(), parentId); + } + + updates.put(field.fieldId(), field); + parentToAddedIds.put(parentId, field.fieldId()); + addedNameToId.put(name, field.fieldId()); + + return this; + } + + /** + * Returns true if any snapshot after the column's last presence added data files. Walks snapshots + * newest-first: once we hit a snapshot whose schema contains the field, no later snapshot wrote + * data without it. 
+ */ + private boolean dataWrittenSinceDeletion(int fieldId) { + List<Snapshot> snapshots = Lists.newArrayList(base.snapshots()); + snapshots.sort(Comparator.comparingLong(Snapshot::sequenceNumber).reversed()); + Map<Integer, Schema> schemasById = base.schemasById(); + + for (Snapshot snapshot : snapshots) { + Integer schemaId = snapshot.schemaId(); + if (schemaId != null && schemasById.get(schemaId).findField(fieldId) != null) { + return false; + } + Map<String, String> summary = snapshot.summary(); + String added = summary != null ? summary.get(SnapshotSummary.ADDED_FILES_PROP) : null; + if (added != null && !"0".equals(added)) { + return true; + } + } + return false; + } + + private record DeletedColumnInfo(int parentId, Types.NestedField field) {} + + /** Find the first instance of the deleted column, from most recent to oldest. */ + private DeletedColumnInfo findDeletedColumn(String name) { + List<Schema> schemas = base.schemas(); + + String[] parts = name.split("\\."); + String parentPath = + parts.length > 1 + ? String.join(".", java.util.Arrays.copyOf(parts, parts.length - 1)) + : null; + + if (parentPath != null) { + Types.NestedField currentParent = findField(parentPath); + Preconditions.checkArgument( + currentParent != null, + "Cannot undelete nested column '%s': parent struct '%s' does not exist in current schema. " + + "Undelete the parent first.", + name, + parentPath); + } + + for (int i = schemas.size() - 1; i >= 0; i--) { + Schema historicalSchema = schemas.get(i); + + Types.NestedField field = + caseSensitive + ? historicalSchema.findField(name) + : historicalSchema.caseInsensitiveFindField(name); + + if (field != null) { + int parentId; + if (parentPath != null) { + Types.NestedField parentField = + caseSensitive + ?
historicalSchema.findField(parentPath) + : historicalSchema.caseInsensitiveFindField(parentPath); + + parentField = getNestedParentField(parentField); + parentId = parentField.fieldId(); + } else { + parentId = TABLE_ROOT_ID; + } + + return new DeletedColumnInfo(parentId, field); + } + } + + return null; + } + + // Properly pull out the type of the parent field from a map or array + private Types.NestedField getNestedParentField(Types.NestedField parentField) { + Types.NestedField nestedParentField = parentField; + Type parentType = parentField.type(); + if (parentType.isNestedType()) { + Type.NestedType nested = parentType.asNestedType(); + if (nested.isMapType()) { + nestedParentField = nested.asMapType().fields().get(1); + } else if (nested.isListType()) { + nestedParentField = nested.asListType().fields().get(0); + } + } + return nestedParentField; + } + @Override public UpdateSchema renameColumn(String name, String newName) { Types.NestedField field = findField(name); @@ -561,7 +686,6 @@ private static Schema applyChanges( } } - // apply schema changes Types.StructType struct = TypeUtil.visit(schema, new ApplyChanges(deletes, updates, parentToAddedIds, moves)) .asNestedType() diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestSchemaUndelete.java b/core/src/test/java/org/apache/iceberg/hadoop/TestSchemaUndelete.java new file mode 100644 index 000000000000..af431758e065 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestSchemaUndelete.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotParser; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestSchemaUndelete extends HadoopTableTestBase { + + @Test + public void testUndeleteTopLevelColumn() { + // Add a column, then delete it, then undelete it + table.updateSchema().addColumn("count", Types.LongType.get(), "a count column").commit(); + + int originalFieldId = table.schema().findField("count").fieldId(); + assertThat(table.schema().findField("count")).isNotNull(); + + // Delete the column + table.updateSchema().deleteColumn("count").commit(); + assertThat(table.schema().findField("count")).isNull(); + + // Undelete the column + table.updateSchema().undeleteColumn("count", false).commit(); + + Types.NestedField restoredField = table.schema().findField("count"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(originalFieldId); + assertThat(restoredField.type()).isEqualTo(Types.LongType.get()); + assertThat(restoredField.doc()).isEqualTo("a count column"); + } + + @Test + public void testUndeleteNestedField() { + // Add a struct with nested fields + table + .updateSchema() + .addColumn( + 
"location", + Types.StructType.of( + Types.NestedField.optional(100, "lat", Types.DoubleType.get()), + Types.NestedField.optional(101, "long", Types.DoubleType.get()))) + .commit(); + + int latFieldId = table.schema().findField("location.lat").fieldId(); + assertThat(table.schema().findField("location.lat")).isNotNull(); + + // Delete the nested field + table.updateSchema().deleteColumn("location.lat").commit(); + assertThat(table.schema().findField("location.lat")).isNull(); + + // Undelete the nested field + table.updateSchema().undeleteColumn("location.lat", false).commit(); + + Types.NestedField restoredField = table.schema().findField("location.lat"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(latFieldId); + assertThat(restoredField.type()).isEqualTo(Types.DoubleType.get()); + } + + @Test + public void testUndeleteColumnAlreadyExists() { + // Try to undelete a column that already exists (id is part of SCHEMA) + assertThatThrownBy(() -> table.updateSchema().undeleteColumn("id", false).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("already exists in the current schema"); + } + + @Test + public void testUndeleteColumnNotFound() { + // Try to undelete a column that was never in the schema + assertThatThrownBy( + () -> table.updateSchema().undeleteColumn("nonexistent_column", false).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("not found in any historical schema"); + } + + @Test + public void testUndeletePreservesFieldId() { + // This test explicitly verifies that the field ID is preserved (not a new ID) + table.updateSchema().addColumn("temp_col", Types.StringType.get()).commit(); + + int originalId = table.schema().findField("temp_col").fieldId(); + + // Add another column to increment the lastColumnId + table.updateSchema().addColumn("another_col", Types.IntegerType.get()).commit(); + int lastIdAfterAdd = 
table.schema().findField("another_col").fieldId(); + + // Delete temp_col + table.updateSchema().deleteColumn("temp_col").commit(); + + // Undelete temp_col + table.updateSchema().undeleteColumn("temp_col", false).commit(); + + Types.NestedField restored = table.schema().findField("temp_col"); + assertThat(restored.fieldId()) + .as("Restored field should have original ID, not a new one") + .isEqualTo(originalId); + assertThat(restored.fieldId()) + .as("Restored field ID should be less than the last assigned ID") + .isLessThan(lastIdAfterAdd); + } + + @Test + public void testUndeleteNestedFieldParentMissing() { + // Add a struct, delete the whole struct, then try to undelete a nested field + table + .updateSchema() + .addColumn( + "prefs", + Types.StructType.of( + Types.NestedField.optional(200, "setting1", Types.BooleanType.get()), + Types.NestedField.optional(201, "setting2", Types.BooleanType.get()))) + .commit(); + + // Delete the entire parent struct + table.updateSchema().deleteColumn("prefs").commit(); + assertThat(table.schema().findField("prefs")).isNull(); + + // Try to undelete nested field when parent doesn't exist + assertThatThrownBy(() -> table.updateSchema().undeleteColumn("prefs.setting1", false).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("parent struct") + .hasMessageContaining("does not exist") + .hasMessageContaining("Undelete the parent first"); + } + + @Test + public void testUndeleteParentThenNestedField() { + // Add a struct, delete the whole struct, then undelete parent, then undelete nested field + table + .updateSchema() + .addColumn( + "config", + Types.StructType.of( + Types.NestedField.optional(300, "enabled", Types.BooleanType.get()), + Types.NestedField.optional(301, "value", Types.StringType.get()))) + .commit(); + + int enabledId = table.schema().findField("config.enabled").fieldId(); + int configId = table.schema().findField("config").fieldId(); + + // Delete both nested fields to empty the 
struct, then delete the struct + table.updateSchema().deleteColumn("config.enabled").deleteColumn("config.value").commit(); + table.updateSchema().deleteColumn("config").commit(); + + assertThat(table.schema().findField("config")).isNull(); + + // Undelete the parent struct first + table.updateSchema().undeleteColumn("config", false).commit(); + + Types.NestedField restoredConfig = table.schema().findField("config"); + assertThat(restoredConfig).isNotNull(); + assertThat(restoredConfig.fieldId()).isEqualTo(configId); + + // Now undelete the nested field + table.updateSchema().undeleteColumn("config.enabled", false).commit(); + + Types.NestedField restoredEnabled = table.schema().findField("config.enabled"); + assertThat(restoredEnabled).isNotNull(); + assertThat(restoredEnabled.fieldId()).isEqualTo(enabledId); + } + + @Test + public void testUndeleteRequiredColumnPreservedWhenNoData() { + // Add a required column, delete it, then undelete with setNullable=false. + // No data has been written, so the column should be restored as required. 
+ table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("required_col", Types.StringType.get()) + .commit(); + + Types.NestedField originalField = table.schema().findField("required_col"); + assertThat(originalField.isRequired()).isTrue(); + int originalFieldId = originalField.fieldId(); + + table.updateSchema().deleteColumn("required_col").commit(); + assertThat(table.schema().findField("required_col")).isNull(); + + table.updateSchema().undeleteColumn("required_col", false).commit(); + + Types.NestedField restoredField = table.schema().findField("required_col"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(originalFieldId); + assertThat(restoredField.isRequired()) + .as("Required column must stay required when no data was written since deletion") + .isTrue(); + } + + @Test + public void testUndeleteRequiredColumnPreservedAcrossAppends() { + // Data appended *while the column is present* is fine. After deletion, no more data is + // written, so undelete should succeed as required. + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("required_col", Types.StringType.get()) + .commit(); + int originalFieldId = table.schema().findField("required_col").fieldId(); + + // Append data while the column exists — not a blocker. 
+ table.newFastAppend().appendFile(FILE_A).commit(); + + table.updateSchema().deleteColumn("required_col").commit(); + assertThat(table.schema().findField("required_col")).isNull(); + + table.updateSchema().undeleteColumn("required_col", false).commit(); + + Types.NestedField restoredField = table.schema().findField("required_col"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(originalFieldId); + assertThat(restoredField.isRequired()).isTrue(); + } + + @Test + public void testUndeleteRequiredColumnFailsWhenDataWritten() { + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("required_col", Types.StringType.get()) + .commit(); + + table.newFastAppend().appendFile(FILE_A).commit(); + table.updateSchema().deleteColumn("required_col").commit(); + // Data written while the column is absent — blocks the undelete. + table.newFastAppend().appendFile(FILE_B).commit(); + + assertThatThrownBy(() -> table.updateSchema().undeleteColumn("required_col", false).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("data was written after the column was deleted") + .hasMessageContaining("setNullable=true"); + } + + @Test + public void testUndeleteRequiredColumnAsNullableWithDataWritten() { + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("required_col", Types.StringType.get()) + .commit(); + int originalFieldId = table.schema().findField("required_col").fieldId(); + + table.newFastAppend().appendFile(FILE_A).commit(); + table.updateSchema().deleteColumn("required_col").commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + + // setNullable=true bypasses the safety check and restores the column as optional. 
+ table.updateSchema().undeleteColumn("required_col", true).commit(); + + Types.NestedField restoredField = table.schema().findField("required_col"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(originalFieldId); + assertThat(restoredField.isOptional()).isTrue(); + } + + @Test + public void testUndeleteOptionalColumnWithDataWritten() { + // Optional columns skip the data-since-deletion check entirely. + table.updateSchema().addColumn("opt_col", Types.StringType.get()).commit(); + int originalFieldId = table.schema().findField("opt_col").fieldId(); + + table.newFastAppend().appendFile(FILE_A).commit(); + table.updateSchema().deleteColumn("opt_col").commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + + table.updateSchema().undeleteColumn("opt_col", false).commit(); + + Types.NestedField restoredField = table.schema().findField("opt_col"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.fieldId()).isEqualTo(originalFieldId); + assertThat(restoredField.isOptional()).isTrue(); + } + + @Test + public void testUndeleteRequiredColumnFailsWithLegacySnapshotNoSchemaId() { + table + .updateSchema() + .allowIncompatibleChanges() + .addRequiredColumn("required_col", Types.StringType.get()) + .commit(); + + table.updateSchema().deleteColumn("required_col").commit(); + + // Inject a synthetic snapshot with null schemaId (simulating a pre-schema-id Iceberg + // snapshot) that claims to have added data. Constructed via SnapshotParser.fromJson so + // we don't need package-private BaseSnapshot access. 
+ BaseTable baseTable = (BaseTable) table; + TableMetadata current = baseTable.operations().current(); + long nextSeq = current.lastSequenceNumber() + 1; + long snapshotId = nextSeq + 1000L; + String legacySnapshotJson = + String.format( + "{\"snapshot-id\":%d,\"sequence-number\":%d,\"timestamp-ms\":%d," + + "\"summary\":{\"operation\":\"append\",\"added-data-files\":\"1\"}," + + "\"manifest-list\":\"no-such-manifest-list.avro\"}", + snapshotId, nextSeq, System.currentTimeMillis()); + Snapshot legacySnapshot = SnapshotParser.fromJson(legacySnapshotJson); + TableMetadata withLegacySnapshot = + TableMetadata.buildFrom(current).addSnapshot(legacySnapshot).build(); + baseTable.operations().commit(current, withLegacySnapshot); + table.refresh(); + + assertThatThrownBy(() -> table.updateSchema().undeleteColumn("required_col", false).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("data was written after the column was deleted"); + + // setNullable=true should bypass the check even with a legacy snapshot present. 
+ table.updateSchema().undeleteColumn("required_col", true).commit(); + assertThat(table.schema().findField("required_col")).isNotNull(); + assertThat(table.schema().findField("required_col").isOptional()).isTrue(); + } + + @Test + public void testUndeleteCaseInsensitive() { + // Add and delete a column + table.updateSchema().addColumn("MixedCase", Types.StringType.get()).commit(); + int originalId = table.schema().findField("MixedCase").fieldId(); + table.updateSchema().deleteColumn("MixedCase").commit(); + + // Undelete with different case (case insensitive mode) + table.updateSchema().caseSensitive(false).undeleteColumn("mixedcase", false).commit(); + + Types.NestedField restored = table.schema().findField("MixedCase"); + assertThat(restored).isNotNull(); + assertThat(restored.fieldId()).isEqualTo(originalId); + } + + @Test + public void testUndeletePreservesDefaults() { + // Upgrade to v3 to support non-null defaults + table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + + // Add a column with initialDefault and writeDefault + table + .updateSchema() + .addColumn("count", Types.IntegerType.get(), "a count column", Literal.of(42)) + .commit(); + + Types.NestedField originalField = table.schema().findField("count"); + assertThat(originalField.initialDefault()).isEqualTo(42); + assertThat(originalField.writeDefault()).isEqualTo(42); + + // Delete the column + table.updateSchema().deleteColumn("count").commit(); + assertThat(table.schema().findField("count")).isNull(); + + // Undelete the column + table.updateSchema().undeleteColumn("count", false).commit(); + + Types.NestedField restoredField = table.schema().findField("count"); + assertThat(restoredField).isNotNull(); + assertThat(restoredField.initialDefault()) + .as("initialDefault should be preserved after undelete") + .isEqualTo(42); + assertThat(restoredField.writeDefault()) + .as("writeDefault should be preserved after undelete") + .isEqualTo(42); + } + + @Test + public void 
testUndeleteRestoresMostRecentlyDeletedField() { + // Add a column, delete it, add it again (new ID), delete it again + // Undelete should restore the most recently deleted field (second one) + table.updateSchema().addColumn("reused_name", Types.StringType.get()).commit(); + int firstFieldId = table.schema().findField("reused_name").fieldId(); + + // Delete the first field + table.updateSchema().deleteColumn("reused_name").commit(); + assertThat(table.schema().findField("reused_name")).isNull(); + + // Add a new field with the same name (will get a new ID) + table.updateSchema().addColumn("reused_name", Types.IntegerType.get()).commit(); + int secondFieldId = table.schema().findField("reused_name").fieldId(); + assertThat(secondFieldId) + .as("Second field should have a different ID than the first") + .isNotEqualTo(firstFieldId); + + // Delete the second field + table.updateSchema().deleteColumn("reused_name").commit(); + assertThat(table.schema().findField("reused_name")).isNull(); + + // Undelete - should restore the most recently deleted field (the second one) + table.updateSchema().undeleteColumn("reused_name", false).commit(); + + Types.NestedField restored = table.schema().findField("reused_name"); + assertThat(restored).isNotNull(); + assertThat(restored.fieldId()) + .as("Undelete should restore the most recently deleted field, not the first") + .isEqualTo(secondFieldId); + assertThat(restored.type()) + .as("Restored field should have the type of the second field") + .isEqualTo(Types.IntegerType.get()); + } +} diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 7f211d9f260b..92620f02a4f1 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -245,6 +245,58 @@ Fast-forward the main branch to the head of `audit-branch` CALL catalog_name.system.fast_forward('my_table', 'main', 'audit-branch'); ``` +## Schema management + +### `undelete_column` + +Restores a previously deleted column from the table's schema 
history. + +The column is restored with its original field ID and nullability, preserving data file compatibility. This allows you to recover columns that were accidentally deleted without losing access to existing data. + +!!! info + If the original column was required (non-nullable), the procedure verifies that no data has been written to the table since the column was deleted. If data was written after deletion, the procedure fails — restoring the column as required would violate the non-null constraint for rows added after deletion. Pass `set_nullable => true` to restore the column as optional instead, bypassing the check. + +#### Usage + +| Argument Name | Required? | Type | Description | +|-----------------|-----------|---------|-------------| +| `table` | ✔️ | string | Name of the table to update | +| `column` | ✔️ | string | Name of the column to restore (use dotted notation for nested fields, e.g., `struct.field`) | +| `set_nullable` | | boolean | If `true`, restore the column as optional even if the original was required. Default: `false`. 
| + +#### Output + +| Output Name | Type | Description | +| ------------|------|-------------| +| `column_name` | string | The name of the restored column | +| `field_id` | int | The field ID of the restored column | +| `type` | string | The type of the restored column | + +#### Examples + +Restore a deleted top-level column `count` in table `db.sample`: +```sql +CALL catalog_name.system.undelete_column('db.sample', 'count'); +``` + +Restore a deleted nested field `location.lat` in table `db.sample`: +```sql +CALL catalog_name.system.undelete_column('db.sample', 'location.lat'); +``` + +Restore a column using named arguments: +```sql +CALL catalog_name.system.undelete_column(table => 'db.sample', column => 'deleted_col'); +``` + +Restore a required column as optional because data was written since deletion: +```sql +CALL catalog_name.system.undelete_column(table => 'db.sample', column => 'required_col', set_nullable => true); +``` + +!!! warning + If you want to undelete a nested field whose parent struct was also deleted, you must first undelete the parent struct, then undelete the nested field. + ## Metadata management Many [maintenance actions](maintenance.md) can be performed using Iceberg stored procedures. 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java index 82f44996c8e1..af7bad32569b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java @@ -64,6 +64,7 @@ private static Map> initProcedureBuilders() { mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder); mapBuilder.put("compute_partition_stats", ComputePartitionStatsProcedure::builder); mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder); + mapBuilder.put("undelete_column", UndeleteColumnProcedure::builder); return mapBuilder.build(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java new file mode 100644 index 000000000000..b4094ad020f7 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A procedure that restores a previously deleted column from the schema history. + * + *

<p>The column is restored with its original field ID, preserving data file compatibility. + * + * @see org.apache.iceberg.UpdateSchema#undeleteColumn(String, boolean) + */ +class UndeleteColumnProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("column", DataTypes.StringType), + ProcedureParameter.optional("set_nullable", DataTypes.BooleanType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("column_name", DataTypes.StringType, false, Metadata.empty()), + new StructField("field_id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("type", DataTypes.StringType, false, Metadata.empty()) + }); + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder<UndeleteColumnProcedure>() { + @Override + protected UndeleteColumnProcedure doBuild() { + return new UndeleteColumnProcedure(tableCatalog()); + } + }; + } + + private UndeleteColumnProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String columnName = args.getString(1); + boolean setNullable = !args.isNullAt(2) && args.getBoolean(2); + + return modifyIcebergTable( + tableIdent, + table -> { + table.updateSchema().undeleteColumn(columnName, setNullable).commit(); + + // Get the restored field info + Types.NestedField restoredField = table.schema().findField(columnName); + + return new InternalRow[] { + newInternalRow( + UTF8String.fromString(columnName), + restoredField.fieldId(), + UTF8String.fromString(restoredField.type().toString())) + }; + }); + } + + @Override + public String
description() { + return "UndeleteColumnProcedure"; + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java index 6b42a04421dc..58c54d46a194 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java @@ -60,6 +60,7 @@ private static Map> initProcedureBuilders() { mapBuilder.put(ComputeTableStatsProcedure.NAME, ComputeTableStatsProcedure::builder); mapBuilder.put(ComputePartitionStatsProcedure.NAME, ComputePartitionStatsProcedure::builder); mapBuilder.put(RewriteTablePathProcedure.NAME, RewriteTablePathProcedure::builder); + mapBuilder.put(UndeleteColumnProcedure.NAME, UndeleteColumnProcedure::builder); return mapBuilder.build(); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java new file mode 100644 index 000000000000..e98bff758467 --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Iterator; +import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure; +import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A procedure that restores a previously deleted column from the schema history. + * + *

The column is restored with its original field ID, preserving data file compatibility. + * + * @see org.apache.iceberg.UpdateSchema#undeleteColumn(String) + */ +class UndeleteColumnProcedure extends BaseProcedure { + + static final String NAME = "undelete_column"; + + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter COLUMN_PARAM = + requiredInParameter("column", DataTypes.StringType); + private static final ProcedureParameter SET_NULLABLE_PARAM = + optionalInParameter("set_nullable", DataTypes.BooleanType); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] {TABLE_PARAM, COLUMN_PARAM, SET_NULLABLE_PARAM}; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("column_name", DataTypes.StringType, false, Metadata.empty()), + new StructField("field_id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("type", DataTypes.StringType, false, Metadata.empty()) + }); + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + protected UndeleteColumnProcedure doBuild() { + return new UndeleteColumnProcedure(tableCatalog()); + } + }; + } + + private UndeleteColumnProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public BoundProcedure bind(StructType inputType) { + return this; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public Iterator call(InternalRow args) { + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + String columnName = input.asString(COLUMN_PARAM, null); + boolean setNullable = input.asBoolean(SET_NULLABLE_PARAM, false); + + return modifyIcebergTable( + tableIdent, + table -> { + table.updateSchema().undeleteColumn(columnName, 
setNullable).commit(); + + // Get the restored field info + Types.NestedField restoredField = table.schema().findField(columnName); + + InternalRow outputRow = + newInternalRow( + UTF8String.fromString(columnName), + restoredField.fieldId(), + UTF8String.fromString(restoredField.type().toString())); + return asScanIterator(OUTPUT_TYPE, outputRow); + }); + } + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "UndeleteColumnProcedure"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java index bad31a12c19a..c7d0b3f36fd3 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java @@ -65,6 +65,7 @@ private static Map> initProcedureBuilders() { mapBuilder.put(ComputeTableStatsProcedure.NAME, ComputeTableStatsProcedure::builder); mapBuilder.put(ComputePartitionStatsProcedure.NAME, ComputePartitionStatsProcedure::builder); mapBuilder.put(RewriteTablePathProcedure.NAME, RewriteTablePathProcedure::builder); + mapBuilder.put(UndeleteColumnProcedure.NAME, UndeleteColumnProcedure::builder); return mapBuilder.build(); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java new file mode 100644 index 000000000000..e98bff758467 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/UndeleteColumnProcedure.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Iterator; +import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure; +import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A procedure that restores a previously deleted column from the schema history. + * + *

The column is restored with its original field ID, preserving data file compatibility. + * + * @see org.apache.iceberg.UpdateSchema#undeleteColumn(String) + */ +class UndeleteColumnProcedure extends BaseProcedure { + + static final String NAME = "undelete_column"; + + private static final ProcedureParameter TABLE_PARAM = + requiredInParameter("table", DataTypes.StringType); + private static final ProcedureParameter COLUMN_PARAM = + requiredInParameter("column", DataTypes.StringType); + private static final ProcedureParameter SET_NULLABLE_PARAM = + optionalInParameter("set_nullable", DataTypes.BooleanType); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] {TABLE_PARAM, COLUMN_PARAM, SET_NULLABLE_PARAM}; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("column_name", DataTypes.StringType, false, Metadata.empty()), + new StructField("field_id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("type", DataTypes.StringType, false, Metadata.empty()) + }); + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + protected UndeleteColumnProcedure doBuild() { + return new UndeleteColumnProcedure(tableCatalog()); + } + }; + } + + private UndeleteColumnProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public BoundProcedure bind(StructType inputType) { + return this; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public Iterator call(InternalRow args) { + ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args); + Identifier tableIdent = input.ident(TABLE_PARAM); + String columnName = input.asString(COLUMN_PARAM, null); + boolean setNullable = input.asBoolean(SET_NULLABLE_PARAM, false); + + return modifyIcebergTable( + tableIdent, + table -> { + table.updateSchema().undeleteColumn(columnName, 
setNullable).commit(); + + // Get the restored field info + Types.NestedField restoredField = table.schema().findField(columnName); + + InternalRow outputRow = + newInternalRow( + UTF8String.fromString(columnName), + restoredField.fieldId(), + UTF8String.fromString(restoredField.type().toString())); + return asScanIterator(OUTPUT_TYPE, outputRow); + }); + } + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "UndeleteColumnProcedure"; + } +}