Skip to content

Commit

Permalink
refactor: remove code smells and improve coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Nuri <marc@marcnuri.com>
  • Loading branch information
manusa committed Feb 10, 2023
1 parent cf761d2 commit 85588d6
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.fabric8.kubernetes.client.http;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
Expand Down Expand Up @@ -75,9 +74,8 @@ public <V> CompletableFuture<HttpResponse<V>> sendAsync(HttpRequest request, Cla
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, Consumer<List<ByteBuffer>> consumer) {
CompletableFuture<HttpResponse<AsyncBody>> result = new CompletableFuture<>();

retryWithExponentialBackoff(result, () -> consumeBytesOnce(request, consumer), request.uri(), r -> r.code(),
r -> r.body().cancel(),
null);
retryWithExponentialBackoff(result, () -> consumeBytesOnce(request, consumer), request.uri(), HttpResponse::code,
r -> r.body().cancel());
return result;
}

Expand Down Expand Up @@ -133,22 +131,10 @@ protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator) {

if (retryIntervalCalculator == null) {
Config requestConfig = this.builder.getRequestConfig();
int requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
int requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
if (requestConfig != null) {
requestRetryBackoffInterval = requestConfig.getRequestRetryBackoffInterval();
requestRetryBackoffLimit = requestConfig.getRequestRetryBackoffLimit();
}
retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, requestRetryBackoffLimit);
}

final ExponentialBackoffIntervalCalculator backoff = retryIntervalCalculator;
action.get()
.whenComplete((response, throwable) -> {
if (backoff.shouldRetry() && !result.isDone()) {
long retryInterval = backoff.nextReconnectInterval();
if (retryIntervalCalculator.shouldRetry() && !result.isDone()) {
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null) {
Integer code = codeExtractor.apply(response);
Expand All @@ -164,7 +150,7 @@ protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
}
if (retry) {
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, backoff),
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, retryIntervalCalculator),
retryInterval,
TimeUnit.MILLISECONDS);
return;
Expand All @@ -174,6 +160,13 @@ protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
});
}

protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel) {
retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel,
ExponentialBackoffIntervalCalculator.from(builder.getRequestConfig()));
}

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new StandardWebSocketBuilder(this);
Expand All @@ -193,8 +186,7 @@ final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder stand
retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener),
standardWebSocketBuilder.asHttpRequest().uri(),
r -> Optional.ofNullable(r.wshse).map(WebSocketHandshakeException::getResponse).map(HttpResponse::code).orElse(null),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)),
null);
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)));

CompletableFuture<WebSocket> result = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.fabric8.kubernetes.client.utils;

import io.fabric8.kubernetes.client.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class ExponentialBackoffIntervalCalculator {
Expand All @@ -31,13 +33,14 @@ public class ExponentialBackoffIntervalCalculator {
private final int initialInterval;
// we were using the same default in multiple places, so it has been moved here for now
// other calculators express this as max wait
private final int maxRetryIntervalExponent = MAX_RETRY_INTERVAL_EXPONENT;
private final int maxRetryIntervalExponent;
private final int maxRetries;
final AtomicInteger currentReconnectAttempt = new AtomicInteger();

public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetries = maxRetries;
this.maxRetryIntervalExponent = MAX_RETRY_INTERVAL_EXPONENT;
}

public long getInterval(int retryIndex) {
Expand Down Expand Up @@ -67,4 +70,13 @@ public boolean shouldRetry() {
return maxRetries < 0 || currentReconnectAttempt.get() < maxRetries;
}

public static ExponentialBackoffIntervalCalculator from(Config requestConfig) {
final int requestRetryBackoffInterval = Optional.ofNullable(requestConfig)
.map(Config::getRequestRetryBackoffInterval)
.orElse(Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL);
final int requestRetryBackoffLimit = Optional.ofNullable(requestConfig)
.map(Config::getRequestRetryBackoffLimit)
.orElse(Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT);
return new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, requestRetryBackoffLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/
package io.fabric8.kubernetes.client.http;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.HttpClient.Factory;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -26,59 +25,22 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class StandardHttpClientTest {

private static final class TestableStandardHttpClient
extends StandardHttpClient<HttpClient, Factory, StandardHttpClientBuilder<HttpClient, Factory, ?>> {
List<CompletableFuture<WebSocketResponse>> wsFutures = new ArrayList<>();
int wsIndex;
List<CompletableFuture<HttpResponse<AsyncBody>>> respFutures = new ArrayList<>();
int respIndex;

private TestableStandardHttpClient() {
super(Mockito.mock(StandardHttpClientBuilder.class));
}

@Override
public void close() {
throw new UnsupportedOperationException();
}

@Override
public synchronized CompletableFuture<WebSocketResponse> buildWebSocketDirect(
StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {
if (wsFutures.size() <= wsIndex) {
wsFutures.add(new CompletableFuture<>());
}
return wsFutures.get(wsIndex++);
}

@Override
public synchronized CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHttpRequest request,
Consumer<List<ByteBuffer>> consumer) {
if (respFutures.size() <= respIndex) {
respFutures.add(new CompletableFuture<>());
}
return respFutures.get(respIndex++);
}
}

private TestableStandardHttpClient client;
private TestStandardHttpClient client;

@BeforeEach
void setup() {
client = new TestableStandardHttpClient();
client = new TestStandardHttpClientFactory().newBuilder().build();
}

@Test
Expand All @@ -87,47 +49,43 @@ void webSocketFutureCancel() {
.buildAsync(new Listener() {
});

WebSocket ws = Mockito.mock(WebSocket.class);
WebSocket ws = mock(WebSocket.class);

// cancel the future before the websocket response
future.cancel(true);
client.wsFutures.get(0).complete(new WebSocketResponse(ws, null));
client.getWsFutures().get(0).complete(new WebSocketResponse(ws, null));

// ensure that the ws has been closed
Mockito.verify(ws).sendClose(1000, null);
}

@Test
void consumeBytesFutureCancel() {
HttpResponse<AsyncBody> asyncResp = Mockito.mock(HttpResponse.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(asyncResp.body()).thenReturn(Mockito.mock(AsyncBody.class));
final HttpResponse<AsyncBody> asyncResp = new TestHttpResponse<AsyncBody>().withBody(mock(AsyncBody.class));

CompletableFuture<?> consumeFuture = client.consumeBytes(client.newHttpRequestBuilder().uri("http://localhost").build(),
new Consumer<List<ByteBuffer>>() {
@Override
public void consume(List<ByteBuffer> value, AsyncBody asyncBody) throws Exception {
CompletableFuture<HttpResponse<AsyncBody>> consumeFuture = client.consumeBytes(
client.newHttpRequestBuilder().uri("http://localhost").build(),
(value, asyncBody) -> {

}
});

// cancel the future before the response
consumeFuture.cancel(true);
client.respFutures.get(0).complete(asyncResp);
client.getRespFutures().get(0).complete(asyncResp);
Mockito.verify(asyncResp.body()).cancel();
}

@Test
void sendAsyncFutureCancel() {
HttpResponse<AsyncBody> asyncResp = Mockito.mock(HttpResponse.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(asyncResp.body()).thenReturn(Mockito.mock(AsyncBody.class));
Mockito.when(asyncResp.body().done()).thenReturn(new CompletableFuture<>());
final HttpResponse<AsyncBody> asyncResp = new TestHttpResponse<AsyncBody>().withBody(mock(AsyncBody.class));
when(asyncResp.body().done()).thenReturn(new CompletableFuture<>());

CompletableFuture<?> sendAsyncFuture = client.sendAsync(client.newHttpRequestBuilder().uri("http://localhost").build(),
InputStream.class);

// cancel the future before the response
sendAsyncFuture.cancel(true);
client.respFutures.get(0).complete(asyncResp);
client.getRespFutures().get(0).complete(asyncResp);
Mockito.verify(asyncResp.body()).cancel();
}

Expand All @@ -136,7 +94,7 @@ void testNoHttpRetryWithDefaultConfig() throws InterruptedException {
CompletableFuture<?> sendAsyncFuture = client.sendAsync(client.newHttpRequestBuilder().uri("http://localhost").build(),
InputStream.class);

client.respFutures.get(0).completeExceptionally(new IOException());
client.getRespFutures().get(0).completeExceptionally(new IOException());

try {
sendAsyncFuture.get();
Expand All @@ -148,26 +106,22 @@ void testNoHttpRetryWithDefaultConfig() throws InterruptedException {

@Test
void testHttpRetryWithMoreFailuresThanRetries() throws Exception {
Mockito.when(client.builder.getRequestConfig())
.thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default")
.withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build());
client = client.newBuilder().requestConfig(new ConfigBuilder(Config.empty())
.withRequestRetryBackoffLimit(3)
.withRequestRetryBackoffInterval(50).build())
.build();

CompletableFuture<HttpResponse<AsyncBody>> consumeFuture = client.consumeBytes(
client.newHttpRequestBuilder().uri("http://localhost").build(),
new Consumer<List<ByteBuffer>>() {
@Override
public void consume(List<ByteBuffer> value, AsyncBody asyncBody) throws Exception {
(value, asyncBody) -> {

}
});

HttpResponse<AsyncBody> error = Mockito.mock(HttpResponse.class);
Mockito.when(error.code()).thenReturn(500);
long start = System.currentTimeMillis();
client.respFutures.get(0).completeExceptionally(new IOException());
client.respFutures.add(client.respFutures.get(0));
client.respFutures.add(client.respFutures.get(0));
client.respFutures.add(CompletableFuture.completedFuture(error));
client.getRespFutures().get(0).completeExceptionally(new IOException());
client.getRespFutures().add(client.getRespFutures().get(0));
client.getRespFutures().add(client.getRespFutures().get(0));
client.getRespFutures().add(CompletableFuture.completedFuture(new TestHttpResponse<AsyncBody>().withCode(500)));

// should ultimately error with the final 500
assertEquals(500, consumeFuture.get().code());
Expand All @@ -177,61 +131,55 @@ public void consume(List<ByteBuffer> value, AsyncBody asyncBody) throws Exceptio
assertTrue(stop - start >= 350); //50+100+200

// only 4 requests issued
assertEquals(4, client.respFutures.size());
assertEquals(4, client.getRespFutures().size());
}

@Test
void testHttpRetryWithLessFailuresThanRetries() throws Exception {
Mockito.when(client.builder.getRequestConfig())
.thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default")
.withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build());

HttpResponse<AsyncBody> error = Mockito.mock(HttpResponse.class);
Mockito.when(error.code()).thenReturn(500);
client.respFutures.add(CompletableFuture.completedFuture(error));
client.respFutures.add(CompletableFuture.completedFuture(error));
client.respFutures.add(CompletableFuture.completedFuture(error));
HttpResponse<AsyncBody> success = Mockito.mock(HttpResponse.class);
Mockito.when(error.code()).thenReturn(200);
client.respFutures.add(CompletableFuture.completedFuture(success));
client = client.newBuilder().requestConfig(new ConfigBuilder(Config.empty())
.withRequestRetryBackoffLimit(3)
.withRequestRetryBackoffInterval(50).build())
.build();

final HttpResponse<AsyncBody> error = new TestHttpResponse<AsyncBody>().withCode(500);
client.getRespFutures().add(CompletableFuture.completedFuture(error));
client.getRespFutures().add(CompletableFuture.completedFuture(error));
client.getRespFutures().add(CompletableFuture.completedFuture(error));
client.getRespFutures().add(CompletableFuture.completedFuture(new TestHttpResponse<AsyncBody>().withCode(200)));

CompletableFuture<HttpResponse<AsyncBody>> consumeFuture = client.consumeBytes(
client.newHttpRequestBuilder().uri("http://localhost").build(),
new Consumer<List<ByteBuffer>>() {
@Override
public void consume(List<ByteBuffer> value, AsyncBody asyncBody) throws Exception {

}
(value, asyncBody) -> {
});

// should ultimately succeed with the final 500
// should ultimately succeed with the final 200
assertEquals(200, consumeFuture.get().code());

// only 4 requests issued
assertEquals(4, client.respFutures.size());
assertEquals(4, client.getRespFutures().size());
}

@Test
void testWebSocketWithLessFailuresThanRetries() throws Exception {
Mockito.when(client.builder.getRequestConfig())
.thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default")
.withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build());
client = client.newBuilder().requestConfig(new ConfigBuilder(Config.empty())
.withRequestRetryBackoffLimit(3)
.withRequestRetryBackoffInterval(50).build())
.build();

WebSocket ws = Mockito.mock(WebSocket.class);
WebSocket ws = mock(WebSocket.class);

CompletableFuture<WebSocket> future = client.newWebSocketBuilder().uri(URI.create("ws://localhost"))
.buildAsync(new Listener() {
});

HttpResponse<AsyncBody> error = Mockito.mock(HttpResponse.class);
Mockito.when(error.code()).thenReturn(500);
client.wsFutures.get(0).completeExceptionally(new WebSocketHandshakeException(error));
client.wsFutures.add(client.wsFutures.get(0));
client.wsFutures.add(CompletableFuture.completedFuture((new WebSocketResponse(ws, null))));
client.getWsFutures().get(0)
.completeExceptionally(new WebSocketHandshakeException(new TestHttpResponse<AsyncBody>().withCode(500)));
client.getWsFutures().add(client.getWsFutures().get(0));
client.getWsFutures().add(CompletableFuture.completedFuture((new WebSocketResponse(ws, null))));

future.get();

assertEquals(3, client.wsFutures.size());
assertEquals(3, client.getWsFutures().size());
}

}
Loading

0 comments on commit 85588d6

Please sign in to comment.