From 56c1993ea9e557e7f1a25614765cadc15a74497e Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Wed, 6 Jul 2022 17:25:52 -0400 Subject: [PATCH] Spark: Add procedure to publish WAP changes using wap.id (#4715) --- .../TestPublishChangesProcedure.java | 176 ++++++++++++++++++ .../procedures/PublishChangesProcedure.java | 108 +++++++++++ .../spark/procedures/SparkProcedures.java | 1 + .../TestPublishChangesProcedure.java | 176 ++++++++++++++++++ .../procedures/PublishChangesProcedure.java | 108 +++++++++++ .../spark/procedures/SparkProcedures.java | 1 + .../TestPublishChangesProcedure.java | 176 ++++++++++++++++++ .../procedures/PublishChangesProcedure.java | 108 +++++++++++ .../spark/procedures/SparkProcedures.java | 1 + 9 files changed, 855 insertions(+) create mode 100644 spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java create mode 100644 spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java create mode 100644 spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java create mode 100644 spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java create mode 100644 spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java new file mode 100644 index 000000000000..f8080818a1e3 --- /dev/null +++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000000..a7d8b344a8db
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.spark.procedures; + +import java.util.Optional; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; +import org.apache.iceberg.util.WapUtil; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and + * creates a new snapshot which will be set as the current snapshot in a table. + *
+ * Note: this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+      if (!wapSnapshot.isPresent()) {
+        throw new ValidationException(String.format("Cannot apply unknown WAP ID '%s'", wapId));
+      }
+
+      long wapSnapshotId = wapSnapshot.get().snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b90ce..d481c19d59a1 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
 
     return mapBuilder.build();
   }
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000000..f8080818a1e3
--- /dev/null
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000000..a7d8b344a8db
--- /dev/null
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ *
+ * Note: this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+      if (!wapSnapshot.isPresent()) {
+        throw new ValidationException(String.format("Cannot apply unknown WAP ID '%s'", wapId));
+      }
+
+      long wapSnapshotId = wapSnapshot.get().snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b90ce..d481c19d59a1 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
 
     return mapBuilder.build();
   }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000000..f8080818a1e3
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000000..e86d698e1646
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ *
+ * Note: this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+      if (!wapSnapshot.isPresent()) {
+        throw new ValidationException(String.format("Cannot apply unknown WAP ID '%s'", wapId));
+      }
+
+      long wapSnapshotId = wapSnapshot.get().snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 1e944fcf0651..a7c036e1c6ec 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
 
     return mapBuilder.build();
   }
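
For reference, a minimal end-to-end SQL sketch of the write-audit-publish flow this
procedure completes, mirroring the tests above. The catalog (spark_catalog), table
(db.tbl), and WAP ID (wap_1) are illustrative placeholders; 'write.wap.enabled' is
the value behind TableProperties.WRITE_AUDIT_PUBLISH_ENABLED:

    -- stage writes under a WAP ID instead of publishing them directly
    ALTER TABLE spark_catalog.db.tbl SET TBLPROPERTIES ('write.wap.enabled' 'true');
    SET spark.wap.id = wap_1;

    -- creates a snapshot stamped with wap.id=wap_1 that is staged, not current;
    -- a SELECT against the table does not see the new rows yet
    INSERT INTO spark_catalog.db.tbl VALUES (1, 'a');

    -- after auditing, publish the staged snapshot (positional arguments)
    CALL spark_catalog.system.publish_changes('db.tbl', 'wap_1');

    -- equivalent call with named arguments
    CALL spark_catalog.system.publish_changes(table => 'db.tbl', wap_id => 'wap_1');

Because publish_changes delegates to ManageSnapshots.cherrypick, the published state
is either the staged snapshot itself (a fast-forward) or a newly created snapshot;
the procedure's two output columns, source_snapshot_id and current_snapshot_id,
report the staged and resulting snapshot IDs respectively.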