From 2c0c7a9283bb1e17fad68585dfb9d6f7fc9ee6ec Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 21 Aug 2023 16:58:03 +0200 Subject: [PATCH] Core: Add exponential retry strategy to REST client --- .../ExponentialHttpRequestRetryStrategy.java | 148 +++++++++++++ .../org/apache/iceberg/rest/HTTPClient.java | 32 ++- ...stExponentialHttpRequestRetryStrategy.java | 199 ++++++++++++++++++ 3 files changed, 377 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java create mode 100644 core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java diff --git a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java new file mode 100644 index 000000000000..b51f77a02638 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import javax.net.ssl.SSLException; +import org.apache.hc.client5.http.HttpRequestRetryStrategy; +import org.apache.hc.client5.http.utils.DateUtils; +import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.http.ConnectionClosedException; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.TimeValue; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +/** + * Defines an exponential HTTP request retry strategy and provides the same characteristics as the + * {@link org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy}, + * + *

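<p>using the following list of non-retriable I/O exception classes:
+ *
+ * <ul>
+ *   <li>InterruptedIOException
+ *   <li>UnknownHostException
+ *   <li>ConnectException
+ *   <li>ConnectionClosedException
+ *   <li>NoRouteToHostException
+ *   <li>SSLException
+ * </ul>
+ *
+ * and retriable HTTP status codes: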
+ *
+ * <ul>
+ *   <li>SC_TOO_MANY_REQUESTS (429)
+ *   <li>SC_SERVICE_UNAVAILABLE (503)
+ * </ul>
+ */
+class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy {
+  private final int maxRetries;
+  private final Set<Class<? extends IOException>> nonRetriableExceptions;
+  private final Set<Integer> retriableCodes;
+
+  ExponentialHttpRequestRetryStrategy(int maximumRetries) {
+    Preconditions.checkArgument(
+        maximumRetries > 0, "Cannot set retries to %s, the value must be positive", maximumRetries);
+    this.maxRetries = maximumRetries;
+    this.retriableCodes =
+        ImmutableSet.of(HttpStatus.SC_TOO_MANY_REQUESTS, HttpStatus.SC_SERVICE_UNAVAILABLE);
+    this.nonRetriableExceptions =
+        ImmutableSet.of(
+            InterruptedIOException.class,
+            UnknownHostException.class,
+            ConnectException.class,
+            ConnectionClosedException.class,
+            NoRouteToHostException.class,
+            SSLException.class);
+  }
+
+  @Override
+  public boolean retryRequest(
+      HttpRequest request, IOException exception, int execCount, HttpContext context) {
+    if (execCount > maxRetries) {
+      // Do not retry if over max retries
+      return false;
+    }
+
+    if (nonRetriableExceptions.contains(exception.getClass())) {
+      return false;
+    } else {
+      for (Class<? extends IOException> rejectException : nonRetriableExceptions) {
+        if (rejectException.isInstance(exception)) {
+          return false;
+        }
+      }
+    }
+
+    if (request instanceof CancellableDependency
+        && ((CancellableDependency) request).isCancelled()) {
+      return false;
+    }
+
+    // Retry if the request is considered idempotent
+    return Method.isIdempotent(request.getMethod());
+  }
+
+  @Override
+  public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
+    return execCount <= maxRetries && retriableCodes.contains(response.getCode());
+  }
+
+  @Override
+  public TimeValue getRetryInterval(HttpResponse response, int execCount, HttpContext context) {
+    // a server may send a 429 / 503 with a Retry-After header
+    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
+    Header header = response.getFirstHeader(HttpHeaders.RETRY_AFTER);
+    TimeValue retryAfter = null;
+    if (header != null) {
+      String value = header.getValue();
+      try {
+        retryAfter = TimeValue.ofSeconds(Long.parseLong(value));
+      } catch (NumberFormatException ignore) {
+        Instant retryAfterDate = DateUtils.parseStandardDate(value);
+        if (retryAfterDate != null) {
+          retryAfter =
+              TimeValue.ofMilliseconds(retryAfterDate.toEpochMilli() - System.currentTimeMillis());
+        }
+      }
+
+      if (TimeValue.isPositive(retryAfter)) {
+        return retryAfter;
+      }
+    }
+
+    int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1), 64.0);
+    int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1)));
+
+    return TimeValue.ofMilliseconds(delayMillis + jitter);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index b2b6fc8a7c59..0f26ae7a35dc 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -26,10 +26,12 @@
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
 import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
@@ -47,6 +49,7 @@
 import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.net.URIBuilder;
+import org.apache.hc.core5.util.TimeValue;
 import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
@@ -71,6 +74,12 @@ public class
HTTPClient implements RESTClient {
   @VisibleForTesting
   static final String CLIENT_GIT_COMMIT_SHORT_HEADER = "X-Client-Git-Commit-Short";
 
+  private static final String RETRY_STRATEGY = "rest.client.retry-strategy";
+  private static final String REST_MAX_RETRIES = "rest.client.max-retries";
+  private static final String REST_RETRY_INTERVAL_MILLIS = "rest.client.retry-interval-millis";
+  private static final String RETRY_STRATEGY_DEFAULT = "default";
+  private static final String RETRY_STRATEGY_EXPONENTIAL = "exponential";
+
   private final String uri;
   private final CloseableHttpClient httpClient;
   private final ObjectMapper mapper;
@@ -79,7 +88,8 @@ private HTTPClient(
       String uri,
       Map<String, String> baseHeaders,
       ObjectMapper objectMapper,
-      HttpRequestInterceptor requestInterceptor) {
+      HttpRequestInterceptor requestInterceptor,
+      Map<String, String> properties) {
     this.uri = uri;
     this.mapper = objectMapper;
 
@@ -96,6 +106,24 @@ private HTTPClient(
       clientBuilder.addRequestInterceptorLast(requestInterceptor);
     }
 
+    if (properties.containsKey(RETRY_STRATEGY)) {
+      String retryStrategy = properties.getOrDefault(RETRY_STRATEGY, RETRY_STRATEGY_DEFAULT);
+      if (RETRY_STRATEGY_DEFAULT.equalsIgnoreCase(retryStrategy)) {
+        // max retries = 1 and retry interval = 1000L are the defaults defined by
+        // DefaultHttpRequestRetryStrategy
+        int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 1);
+        long retryIntervalMillis =
+            PropertyUtil.propertyAsLong(properties, REST_RETRY_INTERVAL_MILLIS, 1000L);
+
+        clientBuilder.setRetryStrategy(
+            new DefaultHttpRequestRetryStrategy(
+                maxRetries, TimeValue.of(retryIntervalMillis, TimeUnit.MILLISECONDS)));
+      } else if (RETRY_STRATEGY_EXPONENTIAL.equalsIgnoreCase(retryStrategy)) {
+        int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5);
+        clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries));
+      }
+    }
+
     this.httpClient = clientBuilder.build();
   }
 
@@ -466,7 +494,7 @@ public HTTPClient build() {
         interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties);
       }
 
-      return new HTTPClient(uri, baseHeaders, mapper, interceptor);
+      return new HTTPClient(uri, baseHeaders, mapper, interceptor, properties);
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java
new file mode 100644
index 000000000000..e63bdfd06758
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import javax.net.ssl.SSLException;
+import org.apache.hc.client5.http.HttpRequestRetryStrategy;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.utils.DateUtils;
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestExponentialHttpRequestRetryStrategy {
+
+  private final HttpRequestRetryStrategy retryStrategy = new ExponentialHttpRequestRetryStrategy(5);
+
+  @ParameterizedTest
+  @ValueSource(ints = {-1, 0})
+  public void invalidRetries(int retries) {
+    assertThatThrownBy(() -> new ExponentialHttpRequestRetryStrategy(retries))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(String.format("Cannot set retries to %s, the value must be positive", retries));
+  }
+
+  @Test
+  public void exponentialRetry() {
+    HttpRequestRetryStrategy strategy = new ExponentialHttpRequestRetryStrategy(10);
+    BasicHttpResponse response = new BasicHttpResponse(503, "Oopsie");
+
+    // note that the upper limit includes ~10% variability
+    assertThat(strategy.getRetryInterval(response, 0, null).toMilliseconds()).isEqualTo(0);
+    assertThat(strategy.getRetryInterval(response, 1, null).toMilliseconds())
+        .isBetween(1000L, 2000L);
+    assertThat(strategy.getRetryInterval(response, 2, null).toMilliseconds())
+        .isBetween(2000L, 3000L);
+    assertThat(strategy.getRetryInterval(response, 3, null).toMilliseconds())
+        .isBetween(4000L, 5000L);
+    assertThat(strategy.getRetryInterval(response, 4, null).toMilliseconds())
+        .isBetween(8000L, 9000L);
+    assertThat(strategy.getRetryInterval(response, 5, null).toMilliseconds())
+        .isBetween(16000L, 18000L);
+    assertThat(strategy.getRetryInterval(response, 6, null).toMilliseconds())
+        .isBetween(32000L, 36000L);
+    assertThat(strategy.getRetryInterval(response, 7, null).toMilliseconds())
+        .isBetween(64000L, 72000L);
+    assertThat(strategy.getRetryInterval(response, 10, null).toMilliseconds())
+        .isBetween(64000L, 72000L);
+  }
+
+  @Test
+  public void basicRetry() {
+    BasicHttpResponse response503 = new BasicHttpResponse(503, "Oopsie");
+    assertThat(retryStrategy.retryRequest(response503, 3, null)).isTrue();
+
+    BasicHttpResponse response429 = new BasicHttpResponse(429, "Oopsie");
+    assertThat(retryStrategy.retryRequest(response429, 3, null)).isTrue();
+
+    BasicHttpResponse response404 = new BasicHttpResponse(404, "Oopsie");
+    assertThat(retryStrategy.retryRequest(response404, 3, null)).isFalse();
+  }
+
+  @Test
+  public void noRetryOnConnectTimeout() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null))
+        .isFalse();
+  }
+
+  @Test
+  public void noRetryOnConnect() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new ConnectException(), 1, null)).isFalse();
+  }
+
+  @Test
+  public void noRetryOnConnectionClosed() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new ConnectionClosedException(), 1, null))
+        .isFalse();
+  }
+
+  @Test
+  public void noRetryForNoRouteToHostException() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new NoRouteToHostException(), 1, null))
+        .isFalse();
+  }
+
+  @Test
+  public void noRetryOnSSLFailure() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new SSLException("encryption failed"), 1, null))
+        .isFalse();
+  }
+
+  @Test
+  public void noRetryOnUnknownHost() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new UnknownHostException(), 1, null)).isFalse();
+  }
+
+  @Test
+  public void noRetryOnInterruptedFailure() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new InterruptedIOException(), 1, null))
+        .isFalse();
+  }
+
+  @Test
+  public void noRetryOnAbortedRequests() {
+    HttpGet request = new HttpGet("/");
+    request.cancel();
+
+    assertThat(retryStrategy.retryRequest(request, new IOException(), 1, null)).isFalse();
+  }
+
+  @Test
+  public void retryOnNonAbortedRequests() {
+    HttpGet request = new HttpGet("/");
+
+    assertThat(retryStrategy.retryRequest(request, new IOException(), 1, null)).isTrue();
+  }
+
+  @Test
+  public void retryAfterHeaderAsLong() {
+    HttpResponse response = new BasicHttpResponse(503, "Oopsie");
+    response.setHeader(HttpHeaders.RETRY_AFTER, "321");
+
+    assertThat(retryStrategy.getRetryInterval(response, 3, null).toSeconds()).isEqualTo(321L);
+  }
+
+  @Test
+  public void retryAfterHeaderAsDate() {
+    HttpResponse response = new BasicHttpResponse(503, "Oopsie");
+    response.setHeader(
+        HttpHeaders.RETRY_AFTER,
+        DateUtils.formatStandardDate(Instant.now().plus(100, ChronoUnit.SECONDS)));
+
+    assertThat(retryStrategy.getRetryInterval(response, 3, null).toSeconds()).isBetween(90L, 100L);
+  }
+
+  @Test
+  public void retryAfterHeaderAsPastDate() {
+    HttpResponse response = new BasicHttpResponse(503, "Oopsie");
+    response.setHeader(
+        HttpHeaders.RETRY_AFTER,
+        DateUtils.formatStandardDate(Instant.now().minus(100, ChronoUnit.SECONDS)));
+
+    assertThat(retryStrategy.getRetryInterval(response, 3, null).toMilliseconds())
+        .isBetween(4000L, 5000L);
+  }
+
+  @Test
+  public void invalidRetryAfterHeader() {
+    HttpResponse response = new BasicHttpResponse(503, "Oopsie");
+    response.setHeader(HttpHeaders.RETRY_AFTER, "Stuff");
+
+    assertThat(retryStrategy.getRetryInterval(response, 3, null).toMilliseconds())
+        .isBetween(4000L, 5000L);
+  }
+}