Skip to content

Commit

Permalink
Core: Add REST spec and request for committing changes against multip…
Browse files Browse the repository at this point in the history
…le tables
  • Loading branch information
nastra committed May 30, 2023
1 parent 601c5af commit 67bef79
Show file tree
Hide file tree
Showing 7 changed files with 590 additions and 2 deletions.
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
import org.apache.iceberg.rest.requests.ImmutableCommitTransactionRequest;
import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;
Expand Down Expand Up @@ -83,7 +86,14 @@ public static void registerAll(ObjectMapper mapper) {
.addDeserializer(ReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(ImmutableReportMetricsRequest.class, new ReportMetricsRequestSerializer<>())
.addDeserializer(
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>());
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer<>())
.addSerializer(
ImmutableCommitTransactionRequest.class, new CommitTransactionRequestSerializer<>())
.addDeserializer(
CommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>())
.addDeserializer(
ImmutableCommitTransactionRequest.class, new CommitTransactionRequestDeserializer<>());
mapper.registerModule(module);
}

Expand Down Expand Up @@ -280,4 +290,22 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce
return (T) ReportMetricsRequestParser.fromJson(jsonNode);
}
}

public static class CommitTransactionRequestSerializer<T extends CommitTransactionRequest>
extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
CommitTransactionRequestParser.toJson(request, gen);
}
}

public static class CommitTransactionRequestDeserializer<T extends CommitTransactionRequest>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) CommitTransactionRequestParser.fromJson(jsonNode);
}
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ public String metrics(TableIdentifier identifier) {
RESTUtil.encodeString(identifier.name()),
"metrics");
}

public String commitTransaction() {
return SLASH.join("v1", prefix, "transactions", "commit");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

@Value.Immutable
public interface CommitTransactionRequest extends RESTRequest {
List<UpdateTableRequest> tableChanges();

@Override
default void validate() {
check();
}

@Value.Check
default void check() {
Preconditions.checkArgument(!tableChanges().isEmpty(), "Invalid table changes: empty");
for (UpdateTableRequest tableChange : tableChanges()) {
Preconditions.checkArgument(
null != tableChange.identifier(), "Invalid table changes: table identifier required");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.MetadataUpdateParser;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;

public class CommitTransactionRequestParser {
private static final String TABLE_CHANGES = "table-changes";
private static final String IDENTIFIER = "identifier";
private static final String REQUIREMENTS = "requirements";
private static final String UPDATES = "updates";

private CommitTransactionRequestParser() {}

public static String toJson(CommitTransactionRequest request) {
return toJson(request, false);
}

public static String toJson(CommitTransactionRequest request, boolean pretty) {
return JsonUtil.generate(gen -> toJson(request, gen), pretty);
}

public static void toJson(CommitTransactionRequest request, JsonGenerator gen)
throws IOException {
Preconditions.checkArgument(null != request, "Invalid commit tx request: null");

gen.writeStartObject();
gen.writeFieldName(TABLE_CHANGES);
gen.writeStartArray();

for (UpdateTableRequest tableChange : request.tableChanges()) {
gen.writeStartObject();

gen.writeFieldName(IDENTIFIER);
TableIdentifierParser.toJson(tableChange.identifier(), gen);

gen.writeArrayFieldStart(REQUIREMENTS);
for (UpdateTableRequest.UpdateRequirement updateRequirement : tableChange.requirements()) {
UpdateRequirementParser.toJson(updateRequirement, gen);
}
gen.writeEndArray();

gen.writeArrayFieldStart(UPDATES);
for (MetadataUpdate metadataUpdate : tableChange.updates()) {
MetadataUpdateParser.toJson(metadataUpdate, gen);
}
gen.writeEndArray();

gen.writeEndObject();
}

gen.writeEndArray();
gen.writeEndObject();
}

public static CommitTransactionRequest fromJson(String json) {
return JsonUtil.parse(json, CommitTransactionRequestParser::fromJson);
}

public static CommitTransactionRequest fromJson(JsonNode json) {
Preconditions.checkArgument(null != json, "Cannot parse commit tx request from null object");

ImmutableCommitTransactionRequest.Builder builder = ImmutableCommitTransactionRequest.builder();
JsonNode changes = JsonUtil.get(TABLE_CHANGES, json);

Preconditions.checkArgument(
changes.isArray(), "Cannot parse commit tx request from non-array: %s", changes);

for (JsonNode node : changes) {
TableIdentifier identifier = TableIdentifierParser.fromJson(JsonUtil.get(IDENTIFIER, node));

JsonNode requirementsNode = JsonUtil.get(REQUIREMENTS, node);
List<UpdateTableRequest.UpdateRequirement> requirements = Lists.newArrayList();
Preconditions.checkArgument(
requirementsNode.isArray(),
"Cannot parse requirements from non-array: %s",
requirementsNode);
requirementsNode.forEach(req -> requirements.add(UpdateRequirementParser.fromJson(req)));

JsonNode updatesNode = JsonUtil.get(UPDATES, node);
List<MetadataUpdate> updates = Lists.newArrayList();
Preconditions.checkArgument(
updatesNode.isArray(), "Cannot parse metadata updates from non-array: %s", updatesNode);

updatesNode.forEach(update -> updates.add(MetadataUpdateParser.fromJson(update)));
builder.addTableChanges(new UpdateTableRequest(identifier, requirements, updates));
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -33,6 +34,7 @@

public class UpdateTableRequest implements RESTRequest {

private TableIdentifier identifier;
private List<UpdateRequirement> requirements;
private List<MetadataUpdate> updates;

Expand All @@ -45,6 +47,14 @@ public UpdateTableRequest(List<UpdateRequirement> requirements, List<MetadataUpd
this.updates = updates;
}

public UpdateTableRequest(
TableIdentifier identifier,
List<UpdateRequirement> requirements,
List<MetadataUpdate> updates) {
this(requirements, updates);
this.identifier = identifier;
}

@Override
public void validate() {}

Expand All @@ -56,6 +66,10 @@ public List<MetadataUpdate> updates() {
return updates != null ? updates : ImmutableList.of();
}

public TableIdentifier identifier() {
return identifier;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down Expand Up @@ -84,6 +98,7 @@ public static class Builder {
private final List<MetadataUpdate> updates = Lists.newArrayList();
private final Set<String> changedRefs = Sets.newHashSet();
private final boolean isReplace;
private TableIdentifier identifier = null;
private boolean addedSchema = false;
private boolean setSchemaId = false;
private boolean addedSpec = false;
Expand All @@ -95,6 +110,12 @@ public Builder(TableMetadata base, boolean isReplace) {
this.isReplace = isReplace;
}

public Builder forTable(TableIdentifier ident) {
Preconditions.checkArgument(null != ident, "Invalid table identifier: null");
this.identifier = ident;
return this;
}

private Builder require(UpdateRequirement requirement) {
Preconditions.checkArgument(requirement != null, "Invalid requirement: null");
requirements.add(requirement);
Expand Down Expand Up @@ -217,7 +238,8 @@ private void update(MetadataUpdate.SetDefaultSortOrder update) {
}

public UpdateTableRequest build() {
return new UpdateTableRequest(requirements.build(), ImmutableList.copyOf(updates));
return new UpdateTableRequest(
identifier, requirements.build(), ImmutableList.copyOf(updates));
}
}

Expand Down

0 comments on commit 67bef79

Please sign in to comment.