Skip to content

Commit

Permalink
fix #4201: generalizing sendAsync support
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 19, 2022
1 parent 66ca429 commit 23ad882
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,11 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -253,35 +247,6 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
} else if (type == InputStream.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofInputStream();
} else if (type == String.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofString();
} else if (type == byte[].class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofByteArray();
} else {
bodyHandler = responseInfo -> {
BodySubscriber<InputStream> upstream = BodyHandlers.ofInputStream().apply(responseInfo);

BodySubscriber<Reader> downstream = BodySubscribers.mapping(
upstream,
(InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8));
return (BodySubscriber<T>) downstream;
};
}
return bodyHandler;
}

public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.websocket.client.WebSocketClient;
Expand All @@ -40,7 +38,6 @@

import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN;
import static org.eclipse.jetty.util.BufferUtil.toArray;

public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient {

Expand Down Expand Up @@ -74,39 +71,10 @@ public DerivedClientBuilder newBuilder() {
return builder.copy();
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest originalRequest, Class<T> type) {
final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type);
final var request = toStandardHttpRequest(originalRequest);
final CompletableFuture<HttpResponse<T>> future = new CompletableFuture<>();
newRequest(request).send(new BufferingResponseListener() {

// TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory.
// Find a way to stream the response body without completing the future
// We need two signals, one when the response is received, and one when the body is completely
// read.
// Should this method be completely replaced by consumeXxx()?
@Override
public void onComplete(Result result) {
future.complete(new JettyHttpResponse<>(
request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type)));
}
});
return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, BodyConsumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener<>(request, consumer) {

@Override
protected String process(Response response, ByteBuffer content) {
return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class);
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
throw new UnsupportedOperationException("Not supported by the Jetty client");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.utils.Utils;
import org.eclipse.jetty.client.api.Response;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class JettyHttpResponse<T> implements HttpResponse<T> {

Expand Down Expand Up @@ -82,45 +76,4 @@ public Optional<HttpResponse<?>> previousResponse() {
return Optional.empty();
}

enum SupportedResponse {

TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))),
INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)),
READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))),
BYTE_ARRAY(byte[].class, (r, bytes) -> bytes);

private final Class<?> type;
private final BiFunction<Response, byte[], Object> processor;

SupportedResponse(Class<?> type, BiFunction<Response, byte[], Object> processor) {
this.type = type;
this.processor = processor;
}

public <T> T process(Response response, byte[] bytes, Class<T> type) {
return type.cast(processor.apply(response, bytes));
}

static SupportedResponse from(Class<?> type) {
for (SupportedResponse sr : SupportedResponse.values()) {
if (type.isAssignableFrom(sr.type)) {
return sr;
}
}
throw new IllegalArgumentException("Unsupported response type: " + type.getName());
}

private static Charset responseCharset(Response response) {
var responseCharset = StandardCharsets.UTF_8;
final var responseEncoding = response.getHeaders().get("Content-Encoding");
if (Utils.isNotNullOrEmpty(responseEncoding)) {
try {
responseCharset = Charset.forName(responseEncoding);
} catch (Exception e) {
// ignored
}
}
return responseCharset;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void consumeLinesNotProcessedIfCancelled() throws Exception {
// consume lines not supported
}

@Override
public void consumeLinesProcessedAfterConsume() throws Exception {
// consume lines not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,6 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep
}
}

@Test
@DisplayName("sendAsync with unsupported type throws Exception")
void sendAsyncUnsupportedType() {
try (var jettyHttpClient = new JettyHttpClient(
null, httpClient, webSocketClient, Collections.emptyList(), null)) {
// When
final var result = assertThrows(IllegalArgumentException.class,
() -> jettyHttpClient.sendAsync(null, Integer.class));
// Then
assertThat(result).hasMessage("Unsupported response type: java.lang.Integer");
}
}

@Test
@DisplayName("sendAsync with unsupported HttpRequest throws Exception")
void sendAsyncUnsupportedHttpRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,13 @@

import org.eclipse.jetty.client.HttpResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

class JettyHttpResponseTest {

Expand All @@ -56,22 +47,4 @@ void headersHandlesJettyHttpFields() {
.containsEntry("Via", Arrays.asList("proxy-1", "proxy-2"));
}

@ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''")
@MethodSource("supportedResponsesInput")
void supportedResponses(Class<?> type, JettyHttpResponse.SupportedResponse supportedResponse) {
// When
final var result = JettyHttpResponse.SupportedResponse.from(type);
// Then
assertThat(result).isEqualTo(supportedResponse);
}

static Stream<Arguments> supportedResponsesInput() {
return Stream.of(
arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT),
arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM),
arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM),
arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER),
arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER),
arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ public class JettyInterceptorTest extends AbstractInterceptorTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// consume lines not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,30 +257,6 @@ public void onFailure(Call call, IOException e) {
return future;
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
CompletableFuture<HttpResponse<T>> future = new CompletableFuture<>();
Call call = httpClient.newCall(((OkHttpRequestImpl) request).getRequest());
call.enqueue(new Callback() {

@Override
public void onResponse(Call call, Response response) throws IOException {
future.complete(new OkHttpResponseImpl<>(response, type));
}

@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}
});
future.whenComplete((r, t) -> {
if (future.isCancelled()) {
call.cancel();
}
});
return future;
}

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
void testConsumeLines() throws Exception {
// line parsing not yet supported
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.io.InputStream;
import java.io.Reader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -178,6 +180,41 @@ void testAsyncBody() throws Exception {
assertTrue(consumed.get(5, TimeUnit.SECONDS));
}

@Test
void testConsumeLines() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always();

ArrayList<String> strings = new ArrayList<>();
CompletableFuture<Void> consumed = new CompletableFuture<>();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeLines(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
strings.add(value);
asyncBody.consume();
});

responseFuture.whenComplete((r, t) -> {
if (t != null) {
consumed.completeExceptionally(t);
}
if (r != null) {
r.body().consume();
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
} else {
consumed.complete(null);
}
});
}
});

consumed.get(5, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world", "lines"), strings);
}

@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
Expand Down

0 comments on commit 23ad882

Please sign in to comment.