Skip to content

Commit

Permalink
Core: Add REST API for committing changes against multiple tables
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed May 10, 2023
1 parent 46fe807 commit fa5dcc7
Show file tree
Hide file tree
Showing 14 changed files with 833 additions and 2 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Expand Up @@ -107,6 +107,10 @@ public String tableName() {
}

public TableMetadata startMetadata() {
return base;
}

public TableMetadata currentMetadata() {
return current;
}

Expand Down
Expand Up @@ -62,6 +62,11 @@ public <T> T withContext(SessionContext context, Function<Catalog, T> task) {
return task.apply(asCatalog(context));
}

public void multiTableCommit(SessionContext context, List<TableCommit> commits) {
throw new UnsupportedOperationException(
"Multi-table commits are not supported by catalog " + name());
}

public class AsCatalog implements Catalog, SupportsNamespaces {
private final SessionContext context;

Expand Down Expand Up @@ -159,5 +164,9 @@ public boolean removeProperties(Namespace namespace, Set<String> removals) {
public boolean namespaceExists(Namespace namespace) {
return BaseSessionCatalog.this.namespaceExists(context, namespace);
}

public void multiTableCommit(List<TableCommit> commits) {
BaseSessionCatalog.this.multiTableCommit(context, commits);
}
}
}
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import org.apache.iceberg.TableMetadata;
import org.immutables.value.Value;

@Value.Immutable
public interface TableCommit {
TableIdentifier identifier();

TableMetadata base();

TableMetadata updated();
}
36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -49,8 +50,10 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
Expand Down Expand Up @@ -361,4 +364,37 @@ private static TableMetadata commit(TableOperations ops, UpdateTableRequest requ

return ops.current();
}

/**
* This is a very simplistic approach that only validates the requirements for each table and does
* not do any other conflict detection. Therefore, it does not guarantee true transactional
* atomicity, which is left to the implementation details of a REST server.
*/
public static void commitTransaction(Catalog catalog, CommitTransactionRequest request) {
List<Transaction> transactions = Lists.newArrayList();

for (CommitTransactionRequest.CommitTableRequest tableChange : request.tableChanges()) {
Table table = catalog.loadTable(tableChange.identifier());
if (table instanceof BaseTable) {
UpdateTableRequest updateTableRequest =
new UpdateTableRequest(tableChange.requirements(), tableChange.updates());

Transaction transaction =
Transactions.newTransaction(
tableChange.identifier().toString(), ((BaseTable) table).operations());
transactions.add(transaction);

BaseTransaction.TransactionTable txTable =
(BaseTransaction.TransactionTable) transaction.table();

// this performs validations and makes temporary commits that are in-memory
commit(txTable.operations(), updateTableRequest);
} else {
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}
}

// only commit if validations passed previously
transactions.forEach(Transaction::commitTransaction);
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Expand Up @@ -29,10 +29,12 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.BaseSessionCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
Expand Down Expand Up @@ -248,4 +250,8 @@ public void setConf(Object conf) {
public void close() throws IOException {
sessionCatalog.close();
}

public void multiTableCommit(List<TableCommit> commits) {
((BaseSessionCatalog.AsCatalog) delegate).multiTableCommit(commits);
}
}
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
Expand Up @@ -42,6 +42,9 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
import org.apache.iceberg.rest.requests.ImmutableCommitTransactionRequest;
import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;
Expand Down Expand Up @@ -83,7 +86,14 @@ public static void registerAll(ObjectMapper mapper) {
.addDeserializer(ReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(ImmutableReportMetricsRequest.class, new ReportMetricsRequestSerializer<>())
.addDeserializer(
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>());
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer<>())
.addSerializer(
ImmutableCommitTransactionRequest.class, new CommitTransactionRequestSerializer<>())
.addDeserializer(
CommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>())
.addDeserializer(
ImmutableCommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>());
mapper.registerModule(module);
}

Expand Down Expand Up @@ -280,4 +290,22 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce
return (T) ReportMetricsRequestParser.fromJson(jsonNode);
}
}

public static class CommitTransactionRequestSerializer<T extends CommitTransactionRequest>
extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
CommitTransactionRequestParser.toJson(request, gen);
}
}

public static class CommitTransactionRequestDeserializer<T extends CommitTransactionRequest>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) CommitTransactionRequestParser.fromJson(jsonNode);
}
}
}
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Expand Up @@ -51,6 +51,7 @@
import org.apache.iceberg.catalog.BaseSessionCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -66,8 +67,11 @@
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.ImmutableCommitTableRequest;
import org.apache.iceberg.rest.requests.ImmutableCommitTransactionRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
Expand Down Expand Up @@ -873,4 +877,31 @@ private static Cache<String, AuthSession> newSessionCache(Map<String, String> pr
(RemovalListener<String, AuthSession>) (id, auth, cause) -> auth.stopRefreshing())
.build();
}

@Override
public void multiTableCommit(SessionContext context, List<TableCommit> commits) {
ImmutableCommitTransactionRequest.Builder builder = ImmutableCommitTransactionRequest.builder();

for (TableCommit commit : commits) {
UpdateTableRequest.Builder updateTableBuilder = UpdateTableRequest.builderFor(commit.base());
commit.updated().changes().forEach(updateTableBuilder::update);
UpdateTableRequest updateTableRequest = updateTableBuilder.build();

ImmutableCommitTableRequest commitTableRequest =
ImmutableCommitTableRequest.builder()
.identifier(commit.identifier())
.requirements(updateTableRequest.requirements())
.updates(updateTableRequest.updates())
.build();

builder.addTableChanges(commitTableRequest);
}

client.post(
paths.commitTransaction(),
builder.build(),
null,
headers(context),
ErrorHandlers.tableCommitHandler());
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
Expand Up @@ -85,4 +85,8 @@ public String metrics(TableIdentifier identifier) {
RESTUtil.encodeString(identifier.name()),
"metrics");
}

public String commitTransaction() {
return SLASH.join("v1", prefix, "transactions", "commit");
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import java.util.List;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

@Value.Immutable
public interface CommitTransactionRequest extends RESTRequest {
List<CommitTableRequest> tableChanges();

@Override
default void validate() {
check();
}

@Value.Check
default void check() {
Preconditions.checkArgument(!tableChanges().isEmpty(), "Invalid table changes: empty");
}

@Value.Immutable
interface CommitTableRequest {
TableIdentifier identifier();

List<UpdateTableRequest.UpdateRequirement> requirements();

List<MetadataUpdate> updates();
}
}

0 comments on commit fa5dcc7

Please sign in to comment.