From fa5dcc77c0111bca41a4e2eb8a9404caba6dc902 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 2 May 2023 16:05:56 +0200 Subject: [PATCH] Core: Add REST API for committing changes against multiple tables --- .../org/apache/iceberg/BaseTransaction.java | 4 + .../iceberg/catalog/BaseSessionCatalog.java | 9 + .../apache/iceberg/catalog/TableCommit.java | 31 +++ .../apache/iceberg/rest/CatalogHandlers.java | 36 +++ .../org/apache/iceberg/rest/RESTCatalog.java | 6 + .../apache/iceberg/rest/RESTSerializers.java | 30 ++- .../iceberg/rest/RESTSessionCatalog.java | 31 +++ .../apache/iceberg/rest/ResourcePaths.java | 4 + .../requests/CommitTransactionRequest.java | 50 ++++ .../CommitTransactionRequestParser.java | 114 +++++++++ .../iceberg/rest/RESTCatalogAdapter.java | 12 +- .../apache/iceberg/rest/TestRESTCatalog.java | 137 +++++++++++ .../TestCommitTransactionRequestParser.java | 232 ++++++++++++++++++ open-api/rest-catalog-open-api.yaml | 139 +++++++++++ 14 files changed, 833 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/catalog/TableCommit.java create mode 100644 core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequest.java create mode 100644 core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequestParser.java create mode 100644 core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index cef487931b0e..61da776f4c44 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -107,6 +107,10 @@ public String tableName() { } public TableMetadata startMetadata() { + return base; + } + + public TableMetadata currentMetadata() { return current; } diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java index d6ee4d345cfa..b7ff55651782 100644 --- a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java @@ -62,6 +62,11 @@ public T withContext(SessionContext context, Function task) { return task.apply(asCatalog(context)); } + public void multiTableCommit(SessionContext context, List commits) { + throw new UnsupportedOperationException( + "Multi-table commits are not supported by catalog " + name()); + } + public class AsCatalog implements Catalog, SupportsNamespaces { private final SessionContext context; @@ -159,5 +164,9 @@ public boolean removeProperties(Namespace namespace, Set removals) { public boolean namespaceExists(Namespace namespace) { return BaseSessionCatalog.this.namespaceExists(context, namespace); } + + public void multiTableCommit(List commits) { + BaseSessionCatalog.this.multiTableCommit(context, commits); + } } } diff --git a/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java new file mode 100644 index 000000000000..372b0a3ecaac --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import org.apache.iceberg.TableMetadata; +import org.immutables.value.Value; + +@Value.Immutable +public interface TableCommit { + TableIdentifier identifier(); + + TableMetadata base(); + + TableMetadata updated(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index d297fc738317..ff607c4cadf0 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -40,6 +40,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -49,8 +50,10 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import 
org.apache.iceberg.rest.requests.RenameTableRequest; @@ -361,4 +364,37 @@ private static TableMetadata commit(TableOperations ops, UpdateTableRequest requ return ops.current(); } + + /** + * This is a very simplistic approach that only validates the requirements for each table and does + * not do any other conflict detection. Therefore, it does not guarantee true transactional + * atomicity, which is left to the implementation details of a REST server. + */ + public static void commitTransaction(Catalog catalog, CommitTransactionRequest request) { + List transactions = Lists.newArrayList(); + + for (CommitTransactionRequest.CommitTableRequest tableChange : request.tableChanges()) { + Table table = catalog.loadTable(tableChange.identifier()); + if (table instanceof BaseTable) { + UpdateTableRequest updateTableRequest = + new UpdateTableRequest(tableChange.requirements(), tableChange.updates()); + + Transaction transaction = + Transactions.newTransaction( + tableChange.identifier().toString(), ((BaseTable) table).operations()); + transactions.add(transaction); + + BaseTransaction.TransactionTable txTable = + (BaseTransaction.TransactionTable) transaction.table(); + + // this performs validations and makes temporary commits that are in-memory + commit(txTable.operations(), updateTableRequest); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + } + + // only commit if validations passed previously + transactions.forEach(Transaction::commitTransaction); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 71195b9585ef..e8e0494ec83f 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -29,10 +29,12 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; +import 
org.apache.iceberg.catalog.BaseSessionCatalog; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; @@ -248,4 +250,8 @@ public void setConf(Object conf) { public void close() throws IOException { sessionCatalog.close(); } + + public void multiTableCommit(List commits) { + ((BaseSessionCatalog.AsCatalog) delegate).multiTableCommit(commits); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 9e17f50c530a..1d6ad7d14b73 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -42,6 +42,9 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CommitTransactionRequestParser; +import org.apache.iceberg.rest.requests.ImmutableCommitTransactionRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequestParser; @@ -83,7 +86,14 @@ public static void registerAll(ObjectMapper mapper) { .addDeserializer(ReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()) .addSerializer(ImmutableReportMetricsRequest.class, new ReportMetricsRequestSerializer<>()) .addDeserializer( - ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()); + 
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()) + .addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer<>()) + .addSerializer( + ImmutableCommitTransactionRequest.class, new CommitTransactionRequestSerializer<>()) + .addDeserializer( + CommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>()) + .addDeserializer( + ImmutableCommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>()); mapper.registerModule(module); } @@ -280,4 +290,22 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) ReportMetricsRequestParser.fromJson(jsonNode); } } + + public static class CommitTransactionRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + CommitTransactionRequestParser.toJson(request, gen); + } + } + + public static class CommitTransactionRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) CommitTransactionRequestParser.fromJson(jsonNode); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 5814677be88e..718fc85b52db 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -51,6 +51,7 @@ import org.apache.iceberg.catalog.BaseSessionCatalog; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -66,8 +67,11 
@@ import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.ImmutableCommitTableRequest; +import org.apache.iceberg.rest.requests.ImmutableCommitTransactionRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; @@ -873,4 +877,31 @@ private static Cache newSessionCache(Map pr (RemovalListener) (id, auth, cause) -> auth.stopRefreshing()) .build(); } + + @Override + public void multiTableCommit(SessionContext context, List commits) { + ImmutableCommitTransactionRequest.Builder builder = ImmutableCommitTransactionRequest.builder(); + + for (TableCommit commit : commits) { + UpdateTableRequest.Builder updateTableBuilder = UpdateTableRequest.builderFor(commit.base()); + commit.updated().changes().forEach(updateTableBuilder::update); + UpdateTableRequest updateTableRequest = updateTableBuilder.build(); + + ImmutableCommitTableRequest commitTableRequest = + ImmutableCommitTableRequest.builder() + .identifier(commit.identifier()) + .requirements(updateTableRequest.requirements()) + .updates(updateTableRequest.updates()) + .build(); + + builder.addTableChanges(commitTableRequest); + } + + client.post( + paths.commitTransaction(), + builder.build(), + null, + headers(context), + ErrorHandlers.tableCommitHandler()); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java index 6fa09f33d2ba..7e4f9e0c9888 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java +++ 
b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java @@ -85,4 +85,8 @@ public String metrics(TableIdentifier identifier) { RESTUtil.encodeString(identifier.name()), "metrics"); } + + public String commitTransaction() { + return SLASH.join("v1", prefix, "transactions", "commit"); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequest.java new file mode 100644 index 000000000000..0a7f91b18d80 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import java.util.List; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; +import org.immutables.value.Value; + +@Value.Immutable +public interface CommitTransactionRequest extends RESTRequest { + List tableChanges(); + + @Override + default void validate() { + check(); + } + + @Value.Check + default void check() { + Preconditions.checkArgument(!tableChanges().isEmpty(), "Invalid table changes: empty"); + } + + @Value.Immutable + interface CommitTableRequest { + TableIdentifier identifier(); + + List requirements(); + + List updates(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequestParser.java new file mode 100644 index 000000000000..bc8202df45d2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTransactionRequestParser.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.MetadataUpdateParser; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class CommitTransactionRequestParser { + private static final String IDENTIFIER = "identifier"; + private static final String REQUIREMENTS = "requirements"; + private static final String UPDATES = "updates"; + + private CommitTransactionRequestParser() {} + + public static String toJson(CommitTransactionRequest request) { + return toJson(request, false); + } + + public static String toJson(CommitTransactionRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(CommitTransactionRequest request, JsonGenerator gen) + throws IOException { + Preconditions.checkArgument(null != request, "Invalid commit tx request: null"); + + gen.writeStartObject(); + + for (CommitTransactionRequest.CommitTableRequest tableChange : request.tableChanges()) { + gen.writeObjectFieldStart(tableChange.identifier().toString()); + + gen.writeFieldName(IDENTIFIER); + TableIdentifierParser.toJson(tableChange.identifier(), gen); + + gen.writeArrayFieldStart(REQUIREMENTS); + for (UpdateTableRequest.UpdateRequirement updateRequirement : tableChange.requirements()) { + UpdateRequirementParser.toJson(updateRequirement, gen); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart(UPDATES); + for (MetadataUpdate metadataUpdate : tableChange.updates()) { + MetadataUpdateParser.toJson(metadataUpdate, gen); + } + gen.writeEndArray(); + + gen.writeEndObject(); + } + + gen.writeEndObject(); + } + + public static 
CommitTransactionRequest fromJson(String json) { + return JsonUtil.parse(json, CommitTransactionRequestParser::fromJson); + } + + public static CommitTransactionRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse commit tx request from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse commit tx request from non-object: %s", json); + + ImmutableCommitTransactionRequest.Builder builder = ImmutableCommitTransactionRequest.builder(); + json.fields() + .forEachRemaining( + node -> { + ImmutableCommitTableRequest.Builder commitTableBuilder = + ImmutableCommitTableRequest.builder(); + TableIdentifier identifier = + TableIdentifierParser.fromJson(JsonUtil.get(IDENTIFIER, node.getValue())); + commitTableBuilder.identifier(identifier); + + JsonNode requirements = JsonUtil.get(REQUIREMENTS, node.getValue()); + Preconditions.checkArgument( + requirements.isArray(), + "Cannot parse requirements from non-array: %s", + requirements); + requirements.forEach( + req -> commitTableBuilder.addRequirements(UpdateRequirementParser.fromJson(req))); + + JsonNode updates = JsonUtil.get(UPDATES, node.getValue()); + Preconditions.checkArgument( + updates.isArray(), "Cannot parse metadata updates from non-array: %s", updates); + + updates.forEach( + update -> commitTableBuilder.addUpdates(MetadataUpdateParser.fromJson(update))); + builder.addTableChanges(commitTableBuilder.build()); + }); + + return builder.build(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index c6d41818441c..d9ef37618033 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -40,6 +40,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; @@ -130,7 +131,9 @@ enum Route { HTTPMethod.POST, "v1/namespaces/{namespace}/tables/{table}/metrics", ReportMetricsRequest.class, - null); + null), + COMMIT_TRANSACTION( + HTTPMethod.POST, "v1/transactions/commit", CommitTransactionRequest.class, null); private final HTTPMethod method; private final int requiredLength; @@ -357,6 +360,13 @@ public T handleRequest( return null; } + case COMMIT_TRANSACTION: + { + CommitTransactionRequest request = castRequest(CommitTransactionRequest.class, body); + CatalogHandlers.commitTransaction(catalog, request); + return null; + } + default: } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index c4b277312b5f..9a108cca68c3 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; @@ -39,6 +40,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileScanTask; @@ -47,13 +49,22 @@ import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import 
org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.CatalogTests; +import org.apache.iceberg.catalog.ImmutableTableCommit; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod; @@ -1746,4 +1757,130 @@ public void testCatalogTokenRefreshDisabledWithCredential() { eq(catalogHeaders), any()); } + + @Test + public void multipleDiffsAgainstSingleTable() { + Namespace namespace = Namespace.of("namespace"); + TableIdentifier identifier = TableIdentifier.of(namespace, "multipleDiffsAgainstSingleTable"); + Table table = restCatalog.buildTable(identifier, SCHEMA).create(); + + Transaction transaction = table.newTransaction(); + + UpdateSchema updateSchema = + transaction.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdatePartitionSpec updateSpec = + transaction.updateSpec().addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expectedSpec = updateSpec.apply(); + updateSpec.commit(); + + TableCommit tableCommit = + ImmutableTableCommit.builder() + .identifier(identifier) + .base(((BaseTransaction) 
transaction).startMetadata()) + .updated(((BaseTransaction) transaction).currentMetadata()) + .build(); + + restCatalog.multiTableCommit(ImmutableList.of(tableCommit)); + + Table loaded = catalog().loadTable(identifier); + assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct()); + assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields()); + } + + @Test + public void multipleDiffsAgainstMultipleTables() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + Transaction transaction1 = catalog().buildTable(identifier1, SCHEMA).create().newTransaction(); + Transaction transaction2 = catalog().buildTable(identifier2, SCHEMA).create().newTransaction(); + + UpdateSchema updateSchema = + transaction1.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdateSchema updateSchema2 = + transaction2.updateSchema().addColumn("new_col2", Types.LongType.get()); + Schema expectedSchema2 = updateSchema2.apply(); + updateSchema2.commit(); + + TableCommit tableCommit1 = + ImmutableTableCommit.builder() + .identifier(identifier1) + .base(((BaseTransaction) transaction1).startMetadata()) + .updated(((BaseTransaction) transaction1).currentMetadata()) + .build(); + + TableCommit tableCommit2 = + ImmutableTableCommit.builder() + .identifier(identifier2) + .base(((BaseTransaction) transaction2).startMetadata()) + .updated(((BaseTransaction) transaction2).currentMetadata()) + .build(); + + restCatalog.multiTableCommit(ImmutableList.of(tableCommit1, tableCommit2)); + + assertThat(catalog().loadTable(identifier1).schema().asStruct()) + .isEqualTo(expectedSchema.asStruct()); + + assertThat(catalog().loadTable(identifier2).schema().asStruct()) + .isEqualTo(expectedSchema2.asStruct()); + } + + @Test + public void 
multipleDiffsAgainstMultipleTablesLastFails() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + catalog().createTable(identifier1, SCHEMA); + catalog().createTable(identifier2, SCHEMA); + + Table one = catalog().loadTable(identifier1); + Table two = catalog().loadTable(identifier2); + Schema originalSchemaOne = one.schema(); + + Transaction transaction1 = catalog().loadTable(identifier1).newTransaction(); + transaction1.updateSchema().addColumn("new_col1", Types.LongType.get()).commit(); + + Transaction transaction2 = catalog().loadTable(identifier2).newTransaction(); + transaction2.updateSchema().renameColumn("data", "new-column").commit(); + + // delete the column that is being renamed in the above TX to cause a conflict + two.updateSchema().deleteColumn("data").commit(); + Schema updatedSchemaTwo = two.schema(); + + TableCommit tableCommit1 = + ImmutableTableCommit.builder() + .identifier(identifier1) + .base(((BaseTransaction) transaction1).startMetadata()) + .updated(((BaseTransaction) transaction1).currentMetadata()) + .build(); + + TableCommit tableCommit2 = + ImmutableTableCommit.builder() + .identifier(identifier2) + .base(((BaseTransaction) transaction2).startMetadata()) + .updated(((BaseTransaction) transaction2).currentMetadata()) + .build(); + + assertThatThrownBy( + () -> restCatalog.multiTableCommit(ImmutableList.of(tableCommit1, tableCommit2))) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1"); + + Schema schema1 = catalog().loadTable(identifier1).schema(); + assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct()); + + Schema schema2 = catalog().loadTable(identifier2).schema(); + assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct()); +
assertThat(schema2.findField("data")).isNull(); + assertThat(schema2.findField("new-column")).isNull(); + assertThat(schema2.columns()).hasSize(1); + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java new file mode 100644 index 000000000000..7734c56736d3 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.UpdateTableRequest.UpdateRequirement; +import org.junit.jupiter.api.Test; + +public class TestCommitTransactionRequestParser { + + @Test + public void nullAndEmptyCheck() { + assertThatThrownBy(() -> CommitTransactionRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid commit tx request: null"); + + assertThatThrownBy(() -> CommitTransactionRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse commit tx request from null object"); + + assertThatThrownBy(() -> CommitTransactionRequestParser.fromJson("{}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid table changes: empty"); + } + + @Test + public void invalidTableIdentifier() { + assertThatThrownBy( + () -> CommitTransactionRequestParser.fromJson("{\"ns1.table1\" : \"ns1.table1\"}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing field: identifier"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson("{\"ns1.table1\" : {\"identifier\" : {}}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: name"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\" : {\"identifier\" : { \"name\": 23}}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a string value: name: 23"); + } + + @Test + public void invalidRequirements() { + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + 
"{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing field: requirements"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," + + "\"requirements\":[23],\"updates\":[]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse update requirement from non-object value: 23"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," + + "\"requirements\":[{}],\"updates\":[]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse update requirement. Missing field: type"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," + + "\"requirements\":[{\"type\":\"assert-table-uuid\"}],\"updates\":[]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: uuid"); + } + + @Test + public void invalidMetadataUpdates() { + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"},\"requirements\":[]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing field: updates"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," + + "\"requirements\":[],\"updates\":[23]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse metadata update from non-object value: 23"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," 
+ + "\"requirements\":[],\"updates\":[{}]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse metadata update. Missing field: action"); + + assertThatThrownBy( + () -> + CommitTransactionRequestParser.fromJson( + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"}," + + "\"requirements\":[],\"updates\":[{\"action\":\"assign-uuid\"}]}}")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse missing string: uuid"); + } + + @Test + public void roundTripSerde() { + String uuid = "2cc52516-5e73-41f2-b139-545d41a4e151"; + CommitTransactionRequest.CommitTableRequest commitTableRequestOne = + ImmutableCommitTableRequest.builder() + .identifier(TableIdentifier.of("ns1", "table1")) + .addRequirements(new UpdateRequirement.AssertTableUUID(uuid)) + .addRequirements(new UpdateRequirement.AssertTableDoesNotExist()) + .addUpdates(new MetadataUpdate.AssignUUID(uuid)) + .addUpdates(new MetadataUpdate.SetCurrentSchema(23)) + .build(); + + CommitTransactionRequest.CommitTableRequest commitTableRequestTwo = + ImmutableCommitTableRequest.builder() + .identifier(TableIdentifier.of("ns1", "table2")) + .addRequirements(new UpdateRequirement.AssertDefaultSpecID(4)) + .addRequirements(new UpdateRequirement.AssertCurrentSchemaID(24)) + .addUpdates(new MetadataUpdate.RemoveSnapshot(101L)) + .addUpdates(new MetadataUpdate.SetCurrentSchema(25)) + .build(); + + CommitTransactionRequest request = + ImmutableCommitTransactionRequest.builder() + .addTableChanges(commitTableRequestOne, commitTableRequestTwo) + .build(); + + String expectedJson = + "{\n" + + " \"ns1.table1\" : {\n" + + " \"identifier\" : {\n" + + " \"namespace\" : [ \"ns1\" ],\n" + + " \"name\" : \"table1\"\n" + + " },\n" + + " \"requirements\" : [ {\n" + + " \"type\" : \"assert-table-uuid\",\n" + + " \"uuid\" : \"2cc52516-5e73-41f2-b139-545d41a4e151\"\n" + + " }, {\n" + + " \"type\" : \"assert-create\"\n" + + " } ],\n" + + " \"updates\" : [ {\n" + + " 
\"action\" : \"assign-uuid\",\n" + + " \"uuid\" : \"2cc52516-5e73-41f2-b139-545d41a4e151\"\n" + + " }, {\n" + + " \"action\" : \"set-current-schema\",\n" + + " \"schema-id\" : 23\n" + + " } ]\n" + + " },\n" + + " \"ns1.table2\" : {\n" + + " \"identifier\" : {\n" + + " \"namespace\" : [ \"ns1\" ],\n" + + " \"name\" : \"table2\"\n" + + " },\n" + + " \"requirements\" : [ {\n" + + " \"type\" : \"assert-default-spec-id\",\n" + + " \"default-spec-id\" : 4\n" + + " }, {\n" + + " \"type\" : \"assert-current-schema-id\",\n" + + " \"current-schema-id\" : 24\n" + + " } ],\n" + + " \"updates\" : [ {\n" + + " \"action\" : \"remove-snapshots\",\n" + + " \"snapshot-ids\" : [ 101 ]\n" + + " }, {\n" + + " \"action\" : \"set-current-schema\",\n" + + " \"schema-id\" : 25\n" + + " } ]\n" + + " }\n" + + "}"; + + String json = CommitTransactionRequestParser.toJson(request, true); + assertThat(json).isEqualTo(expectedJson); + + // can't do an equality comparison on CommitTransactionRequest because updates/requirements + // don't implement equals/hashcode + assertThat( + CommitTransactionRequestParser.toJson( + CommitTransactionRequestParser.fromJson(json), true)) + .isEqualTo(expectedJson); + } + + @Test + public void emptyRequirementsAndUpdates() { + CommitTransactionRequest commitTxRequest = + ImmutableCommitTransactionRequest.builder() + .addTableChanges( + ImmutableCommitTableRequest.builder() + .identifier(TableIdentifier.of("ns1", "table1")) + .build()) + .build(); + + String json = + "{\"ns1.table1\":{\"identifier\":{\"namespace\":[\"ns1\"],\"name\":\"table1\"},\"requirements\":[],\"updates\":[]}}"; + + assertThat(CommitTransactionRequestParser.toJson(commitTxRequest)).isEqualTo(json); + assertThat(CommitTransactionRequestParser.fromJson(json)).isEqualTo(commitTxRequest); + } +} diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index a2473488e9f4..8c284c03606d 100644 --- a/open-api/rest-catalog-open-api.yaml +++ 
b/open-api/rest-catalog-open-api.yaml @@ -846,6 +846,121 @@ paths: 5XX: $ref: '#/components/responses/ServerErrorResponse' + /v1/{prefix}/transactions/commit: + parameters: + - $ref: '#/components/parameters/prefix' + + post: + tags: + - Catalog API + summary: Commit updates to multiple tables + operationId: commitTransaction + requestBody: + description: + Commit updates to multiple tables + + + Given a table identifier, commits consist of two parts, requirements and updates. + Requirements are assertions that will be validated before attempting to make and commit changes. + For example, `assert-ref-snapshot-id` will check that a named ref's snapshot ID has a certain value. + + + Updates are changes to make to table metadata. For example, after asserting that the current main ref + is at the expected snapshot, a commit may add a new child snapshot and set the ref to the new + snapshot ID. + content: + application/json: + schema: + $ref: '#/components/schemas/CommitTransactionRequest' + required: true + responses: + 204: + description: Success, no content + 400: + $ref: '#/components/responses/BadRequestErrorResponse' + 401: + $ref: '#/components/responses/UnauthorizedResponse' + 403: + $ref: '#/components/responses/ForbiddenResponse' + 404: + description: + Not Found - NoSuchTableException, table to update does not exist + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + examples: + TableToUpdateDoesNotExist: + $ref: '#/components/examples/NoSuchTableError' + 409: + description: + Conflict - CommitFailedException, one or more requirements failed. The client may retry. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + 419: + $ref: '#/components/responses/AuthenticationTimeoutResponse' + 500: + description: + An unknown server-side problem occurred; the commit state is unknown.
+ content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + example: { + "error": { + "message": "Internal Server Error", + "type": "CommitStateUnknownException", + "code": 500 + } + } + 503: + $ref: '#/components/responses/ServiceUnavailableResponse' + 502: + description: + A gateway or proxy received an invalid response from the upstream server; the commit state is unknown. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + example: { + "error": { + "message": "Invalid response from the upstream server", + "type": "CommitStateUnknownException", + "code": 502 + } + } + 504: + description: + A server-side gateway timeout occurred; the commit state is unknown. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + example: { + "error": { + "message": "Gateway timed out during commit", + "type": "CommitStateUnknownException", + "code": 504 + } + } + 5XX: + description: + A server-side problem that might not be addressable on the client. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorModel' + example: { + "error": { + "message": "Bad Gateway", + "type": "InternalServerError", + "code": 502 + } + } + + components: ####################################################### # Common Parameter Definitions Used In Several Routes # @@ -1756,6 +1871,30 @@ components: items: $ref: '#/components/schemas/TableUpdate' + CommitTransactionTableRequest: + type: object + required: + - identifier + - requirements + - updates + properties: + identifier: + $ref: '#/components/schemas/TableIdentifier' + requirements: + type: array + items: + $ref: '#/components/schemas/TableRequirement' + updates: + type: array + items: + $ref: '#/components/schemas/TableUpdate' + + CommitTransactionRequest: + type: array + uniqueItems: true + items: + $ref: '#/components/schemas/CommitTransactionTableRequest' + CreateTableRequest: type: object required: