Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4788 moving retry logic into the standard layer #4825

Merged
merged 2 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* Fix #4739: honor optimistic concurrency control semantics in the mock server for `PUT` and `PATCH` requests.
* Fix #4644: generate CRDs in parallel and optimize code
* Fix #4795: don't print warning message when service account token property is unset
* Fix #4788: moved retry logic into the standard client so that it applies to all requests, including websockets

#### Dependency Upgrade
* Fix #4655: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.26.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,28 @@

import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class StandardHttpClient<C extends HttpClient, F extends HttpClient.Factory, T extends StandardHttpClientBuilder<C, F, ?>>
implements HttpClient {

private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

protected StandardHttpClientBuilder<C, F, T> builder;

protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder) {
Expand All @@ -50,15 +61,26 @@ public DerivedClientBuilder newBuilder() {
@Override
public <V> CompletableFuture<HttpResponse<V>> sendAsync(HttpRequest request, Class<V> type) {
CompletableFuture<HttpResponse<V>> upstream = HttpResponse.SupportedResponses.from(type).sendAsync(request, this);
return withUpstreamCancellation(upstream, b -> {
if (b instanceof Closeable) {
Utils.closeQuietly((Closeable) b);
final CompletableFuture<HttpResponse<V>> result = new CompletableFuture<>();
upstream.whenComplete(completeOrCancel(r -> {
if (r.body() instanceof Closeable) {
Utils.closeQuietly((Closeable) r.body());
}
});
}, result));
return result;
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, Consumer<List<ByteBuffer>> consumer) {
CompletableFuture<HttpResponse<AsyncBody>> result = new CompletableFuture<>();

retryWithExponentialBackoff(result, () -> consumeBytesOnce(request, consumer), request.uri(), HttpResponse::code,
r -> r.body().cancel());
return result;
}

private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(HttpRequest request,
Consumer<List<ByteBuffer>> consumer) {
StandardHttpRequest standardHttpRequest = (StandardHttpRequest) request;
StandardHttpRequest.Builder copy = standardHttpRequest.newBuilder();
for (Interceptor interceptor : builder.getInterceptors().values()) {
Expand All @@ -85,24 +107,63 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
return CompletableFuture.completedFuture(response);
});
}

return withUpstreamCancellation(cf, AsyncBody::cancel);
return cf;
}

static <V> CompletableFuture<HttpResponse<V>> withUpstreamCancellation(CompletableFuture<HttpResponse<V>> cf,
java.util.function.Consumer<V> cancel) {
final CompletableFuture<HttpResponse<V>> result = new CompletableFuture<>();
cf.whenComplete((r, t) -> {
private static <V> BiConsumer<? super V, ? super Throwable> completeOrCancel(java.util.function.Consumer<V> cancel,
final CompletableFuture<V> result) {
return (r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
// if already completed, take responsibility to proactively close
if (!result.complete(r)) {
cancel.accept(r.body());
cancel.accept(r);
}
}
});
return result;
};
}

/**
* Will retry the action if needed based upon the retry settings provided by the ExponentialBackoffIntervalCalculator.
*/
protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator) {

action.get()
.whenComplete((response, throwable) -> {
if (retryIntervalCalculator.shouldRetry() && !result.isDone()) {
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null) {
Integer code = codeExtractor.apply(response);
if (code != null && code >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
retry = true;
}
} else if (throwable instanceof IOException) {
LOG.debug(String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval), throwable);
retry = true;
}
if (retry) {
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, retryIntervalCalculator),
retryInterval,
TimeUnit.MILLISECONDS);
return;
}
}
completeOrCancel(cancel, result).accept(response, throwable);
});
}

protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel) {
retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel,
ExponentialBackoffIntervalCalculator.from(builder.getRequestConfig()));
}

@Override
Expand All @@ -119,6 +180,28 @@ public HttpRequest.Builder newHttpRequestBuilder() {
final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {

CompletableFuture<WebSocketResponse> intermediate = new CompletableFuture<>();

retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener),
standardWebSocketBuilder.asHttpRequest().uri(),
r -> Optional.ofNullable(r.wshse).map(WebSocketHandshakeException::getResponse).map(HttpResponse::code).orElse(null),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)));

CompletableFuture<WebSocket> result = new CompletableFuture<>();

// map to a websocket
intermediate.whenComplete((r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
completeOrCancel(w -> w.sendClose(1000, null), result).accept(r.webSocket, r.wshse);
}
});
return result;
}

private CompletableFuture<WebSocketResponse> buildWebSocketOnce(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {
final StandardWebSocketBuilder copy = standardWebSocketBuilder.newBuilder();
builder.getInterceptors().values().stream().map(Interceptor.useConfig(builder.requestConfig))
.forEach(i -> i.before(copy, copy.asHttpRequest()));
Expand All @@ -138,32 +221,7 @@ final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder stand
return CompletableFuture.completedFuture(response);
});
}

final CompletableFuture<WebSocket> result = new CompletableFuture<>();
// map back to the expected convention with the future completed by the response exception
cf.whenComplete(onWebSocketComplete(result));
return result;

}

private static BiConsumer<WebSocketResponse, Throwable> onWebSocketComplete(CompletableFuture<WebSocket> result) {
return (r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else if (r != null) {
if (r.wshse != null) {
result.completeExceptionally(r.wshse);
} else {
// if already completed, take responsibility to proactively close
if (!result.complete(r.webSocket)) {
r.webSocket.sendClose(1000, null);
}
}
} else {
// shouldn't happen
result.complete(null);
}
};
return cf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils.internal;
package io.fabric8.kubernetes.client.utils;

import io.fabric8.kubernetes.client.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class ExponentialBackoffIntervalCalculator {

private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class);

private static final int MAX_RETRY_INTERVAL_EXPONENT = 5;

public static final int UNLIMITED_RETRIES = -1;

private final int initialInterval;
// we were using the same default in multiple places, so it has been moved here for now
// other calculators express this as max wait
private final int maxRetryIntervalExponent;
private final int maxRetries;
final AtomicInteger currentReconnectAttempt = new AtomicInteger();

public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) {
public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetryIntervalExponent = maxRetryIntervalExponent;
this.maxRetries = maxRetries;
this.maxRetryIntervalExponent = MAX_RETRY_INTERVAL_EXPONENT;
}

public long getInterval(int retryIndex) {
Expand All @@ -56,4 +66,17 @@ public int getCurrentReconnectAttempt() {
return currentReconnectAttempt.get();
}

public boolean shouldRetry() {
return maxRetries < 0 || currentReconnectAttempt.get() < maxRetries;
}

public static ExponentialBackoffIntervalCalculator from(Config requestConfig) {
final int requestRetryBackoffInterval = Optional.ofNullable(requestConfig)
.map(Config::getRequestRetryBackoffInterval)
.orElse(Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL);
final int requestRetryBackoffLimit = Optional.ofNullable(requestConfig)
.map(Config::getRequestRetryBackoffLimit)
.orElse(Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT);
return new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, requestRetryBackoffLimit);
}
}
Loading