Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep alive is now configurable per request #7122

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ public Http2ClientRequest property(String propertyName, String propertyValue) {
return this;
}

@Override
public Http2ClientRequest keepAlive(boolean keepAlive) {
//NOOP
return this;
}

@Override
public Http2ClientRequest priority(int priority) {
if (priority < 1 || priority > 256) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ void testConnectionQueueDequeue() {
connectionNow = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connectionNow);
Http1ClientResponse response = request.request();
// connection will be queued up
Expand All @@ -242,9 +243,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
connectionList.add(ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers()));
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive()));
request.connection(connectionList.get(i));
responseList.add(request.request());
}
Expand All @@ -261,9 +263,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
connection = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connection);
response = request.request();
if (i < connectionQueueSize) {
Expand All @@ -280,9 +283,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
ClientConnection connectionNow = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connectionNow);
Http1ClientResponse responseNow = request.request();
// Verify that the connection was dequeued
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ default <T> T request(Class<T> type) {
*/
B property(String propertyName, String propertyValue);

/**
* Whether to use keep alive with this request.
*
* @param keepAlive use keep alive
* @return updated client request
*/
B keepAlive(boolean keepAlive);

/**
* Handle output stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ClientRequestImpl implements Http1ClientRequest {
private ClientConnection connection;
private UriFragment fragment = UriFragment.empty();
private boolean skipUriEncoding = false;
private boolean keepAlive;

ClientRequestImpl(Http1ClientConfig clientConfig,
Http.Method method,
Expand All @@ -82,6 +83,7 @@ class ClientRequestImpl implements Http1ClientRequest {
this.maxRedirects = clientConfig.maxRedirects();
this.tls = clientConfig.tls().orElse(null);
this.query = query;
this.keepAlive = clientConfig.defaultKeepAlive();

this.requestId = "http1-client-" + COUNTER.getAndIncrement();
this.explicitHeaders = WritableHeaders.create(clientConfig.defaultHeaders());
Expand Down Expand Up @@ -199,7 +201,8 @@ public Http1ClientResponse outputStream(OutputStreamHandler streamHandler) {
rejectHeadWithEntity();
CompletableFuture<WebClientServiceRequest> whenSent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> whenComplete = new CompletableFuture<>();
WebClientService.Chain callChain = new HttpCallOutputStreamChain(clientConfig,
WebClientService.Chain callChain = new HttpCallOutputStreamChain(this,
clientConfig,
connection,
tls,
whenSent,
Expand Down Expand Up @@ -244,6 +247,12 @@ public Http1ClientRequest property(String propertyName, String propertyValue) {
return this;
}

@Override
public Http1ClientRequest keepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
return this;
}

Http1ClientConfig clientConfig() {
return clientConfig;
}
Expand All @@ -252,6 +261,10 @@ UriHelper uri() {
return uri;
}

boolean keepAlive() {
return keepAlive;
}

@Override
public ClientRequestHeaders headers() {
return ClientRequestHeaders.create(explicitHeaders);
Expand Down Expand Up @@ -306,7 +319,8 @@ private ClientResponseImpl invokeWithFollowRedirectsEntity(Object entity) {
private ClientResponseImpl invokeRequestWithEntity(Object entity) {
CompletableFuture<WebClientServiceRequest> whenSent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> whenComplete = new CompletableFuture<>();
WebClientService.Chain callChain = new HttpCallEntityChain(clientConfig,
WebClientService.Chain callChain = new HttpCallEntityChain(this,
clientConfig,
connection,
tls,
whenSent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ private ConnectionCache() {
static ClientConnection connection(Http1ClientConfig clientConfig,
Tls tls,
UriHelper uri,
ClientRequestHeaders headers) {
boolean keepAlive = handleKeepAlive(clientConfig.defaultKeepAlive(), headers);
ClientRequestHeaders headers,
boolean defaultKeepAlive) {
boolean keepAlive = handleKeepAlive(defaultKeepAlive, headers);
Tls effectiveTls = HTTPS.equals(uri.scheme()) ? tls : null;
if (keepAlive) {
return keepAliveConnection(clientConfig, effectiveTls, uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ abstract class HttpCallChainBase implements WebClientService.Chain {
private final Http1ClientConfig clientConfig;
private final ClientConnection connection;
private final Tls tls;
private final boolean keepAlive;

HttpCallChainBase(Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls) {
Tls tls,
boolean keepAlive) {
this.clientConfig = clientConfig;
this.connection = connection;
this.tls = tls;
this.keepAlive = keepAlive;
}

static void writeHeaders(Headers headers, BufferData bufferData, boolean validate) {
Expand Down Expand Up @@ -105,6 +108,7 @@ private ClientConnection obtainConnection(WebClientServiceRequest request) {
return ConnectionCache.connection(clientConfig,
tls,
request.uri(),
request.headers());
request.headers(),
keepAlive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ class HttpCallEntityChain extends HttpCallChainBase {
private final CompletableFuture<WebClientServiceResponse> whenComplete;
private final Object entity;

HttpCallEntityChain(Http1ClientConfig clientConfig,
HttpCallEntityChain(ClientRequestImpl request,
Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
Object entity) {
super(clientConfig, connection, tls);
super(clientConfig, connection, tls, request.keepAlive());
this.clientConfig = clientConfig;
this.whenSent = whenSent;
this.whenComplete = whenComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ class HttpCallOutputStreamChain extends HttpCallChainBase {
private final CompletableFuture<WebClientServiceResponse> whenComplete;
private final ClientRequest.OutputStreamHandler osHandler;

HttpCallOutputStreamChain(Http1ClientConfig clientConfig,
HttpCallOutputStreamChain(ClientRequestImpl clientRequest,
Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
ClientRequest.OutputStreamHandler osHandler) {
super(clientConfig, connection, tls);
super(clientConfig, connection, tls, clientRequest.keepAlive());
this.clientConfig = clientConfig;
this.whenSent = whenSent;
this.whenComplete = whenComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,10 @@ public FakeHttpClientRequest skipUriEncoding() {
public FakeHttpClientRequest property(String propertyName, String propertyValue) {
return null;
}

@Override
public FakeHttpClientRequest keepAlive(boolean keepAlive) {
return this;
}
}
}