Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ParseException;
Expand All @@ -42,8 +45,8 @@
import org.apache.hc.core5.io.CloseMode;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.ErrorResponseParser;
import org.slf4j.Logger;
Expand All @@ -60,12 +63,14 @@ public class HTTPClient implements RESTClient {
private final CloseableHttpClient httpClient;
private final ObjectMapper mapper;
private final Map<String, String> additionalHeaders;
private final Set<String> exchangeHeaders;

private HTTPClient(
String uri, CloseableHttpClient httpClient, Map<String, String> additionalHeaders) {
String uri, CloseableHttpClient httpClient, Map<String, String> additionalHeaders, Set<String> exchangeHeaders) {
this.uri = uri;
this.httpClient = httpClient != null ? httpClient : HttpClients.createDefault();
this.additionalHeaders = additionalHeaders != null ? additionalHeaders : ImmutableMap.of();
this.additionalHeaders = additionalHeaders;
this.exchangeHeaders = exchangeHeaders;
this.mapper = RESTObjectMapper.mapper();
}

Expand Down Expand Up @@ -163,6 +168,8 @@ private <T> T execute(
}

try (CloseableHttpResponse response = httpClient.execute(request)) {
// exchange response headers for future requests
exchangeHeaders(response);

// Skip parsing the response stream for any successful request not expecting a response body
if (response.getCode() == HttpStatus.SC_NO_CONTENT || (responseType == null && isSuccessful(response))) {
Expand Down Expand Up @@ -224,6 +231,17 @@ private void addRequestHeaders(HttpUriRequest request) {
additionalHeaders.forEach(request::setHeader);
}

private void exchangeHeaders(HttpResponse response) {
if (!exchangeHeaders.isEmpty()) {
exchangeHeaders.forEach(exchangeHeader -> {
Header responseHeader = response.getFirstHeader(exchangeHeader);
if (responseHeader != null) {
additionalHeaders.put(responseHeader.getName(), responseHeader.getValue());
}
});
}
}

@Override
public void close() throws IOException {
httpClient.close(CloseMode.GRACEFUL);
Expand All @@ -235,6 +253,7 @@ public static Builder builder() {

public static class Builder {
private final Map<String, String> additionalHeaders = Maps.newHashMap();
private final Set<String> exchangeHeaders = Sets.newHashSet();
private String uri;
private CloseableHttpClient httpClient;
private ObjectMapper mapper;
Expand All @@ -252,6 +271,12 @@ public Builder uri(String baseUri) {
return this;
}

public Builder withExchangeHeader(String header) {
Preconditions.checkNotNull(header, "Invalid exchange header for http client: null");
exchangeHeaders.add(header.trim());
return this;
}

public Builder withHeader(String key, String value) {
additionalHeaders.put(key, value);
return this;
Expand All @@ -269,7 +294,7 @@ public Builder withBearerAuth(String token) {
}

public HTTPClient build() {
return new HTTPClient(uri, httpClient, additionalHeaders);
return new HTTPClient(uri, httpClient, additionalHeaders, exchangeHeaders);
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/HTTPClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.rest;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -31,6 +32,7 @@
* RESTCatalog.
*/
public class HTTPClientFactory implements Function<Map<String, String>, RESTClient> {
private static final String EXCHANGE_HEADER_DELIM = ",";

@Override
public RESTClient apply(Map<String, String> properties) {
Expand All @@ -48,6 +50,14 @@ public RESTClient apply(Map<String, String> properties) {
builder.withBearerAuth(token.trim());
}

// Apply exchange headers if provided
String exchangeHeaders = properties.get(RESTCatalogProperties.EXCHANGE_HEADERS);
if (exchangeHeaders != null && !exchangeHeaders.trim().isEmpty()) {
Arrays.stream(exchangeHeaders.split(EXCHANGE_HEADER_DELIM))
.map(String::trim)
.forEach(builder::withExchangeHeader);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ private RESTCatalogProperties() {
* A Bearer authorization token which will be used to authenticate requests with the server.
*/
public static final String AUTH_TOKEN = "token";

/**
* Comma separated list of response headers that should be exchanged for future requests.
*/
public static final String EXCHANGE_HEADERS = "exchange.headers";
}
22 changes: 22 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.ErrorResponseParser;
Expand All @@ -48,6 +49,7 @@ public class TestHTTPClient {

private static final int PORT = 1080;
private static final String BEARER_AUTH_TOKEN = "auth_token";
private static final String EXCHANGE_HEADER = "Exchange_1";
private static final String URI = String.format("http://127.0.0.1:%d", PORT);
private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();

Expand All @@ -61,6 +63,7 @@ public static void beforeClass() {
.builder()
.uri(URI)
.withBearerAuth(BEARER_AUTH_TOKEN)
.withExchangeHeader(EXCHANGE_HEADER)
.build();
}

Expand Down Expand Up @@ -110,6 +113,25 @@ public void testHeadFailure() throws JsonProcessingException {
testHttpMethodOnFailure(HttpMethod.HEAD);
}

@Test
public void testHeaderExchange() throws JsonProcessingException {
String exchangeValue = "exchanged-value";
String path = "exchange";

HttpRequest request = request("/" + path).withMethod(HttpMethod.GET.name());

// Response with an exchange header after the first request
mockServer.when(request).respond(response().withStatusCode(HttpStatus.SC_NO_CONTENT)
.withHeader(EXCHANGE_HEADER, exchangeValue));

// First request without header
restClient.get(path, null, null);

// Following request should have the header requested to be exchanged by the server
restClient.get(path, null, null);
mockServer.verify(request().withHeader(EXCHANGE_HEADER, exchangeValue));
}

public static void testHttpMethodOnSuccess(HttpMethod method) throws JsonProcessingException {
Item body = new Item(0L, "hank");
int statusCode = 200;
Expand Down