diff --git a/src/main/java/io/vertx/core/http/HttpClientRequest.java b/src/main/java/io/vertx/core/http/HttpClientRequest.java
index 5f6d3135c0e..24f77176c1b 100644
--- a/src/main/java/io/vertx/core/http/HttpClientRequest.java
+++ b/src/main/java/io/vertx/core/http/HttpClientRequest.java
@@ -328,7 +328,7 @@ default boolean reset() {
*
*
* - for HTTP/2, this performs send an HTTP/2 reset frame with the specified error {@code code}
- * - for HTTP/1.x, this closes the connection after the current in-flight requests are ended
+ * - for HTTP/1.x, this closes the connection when the current request is inflight
*
*
* When the request has not yet been sent, the request will be aborted and false is returned as indicator.
diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
index 41cd7aa759f..4e7891b1fd6 100644
--- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
@@ -22,6 +22,7 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.ReferenceCountUtil;
+import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.VertxException;
@@ -63,7 +64,9 @@
*
* @author Tim Fox
*/
-class Http1xClientConnection extends Http1xConnectionBase implements HttpClientConnection, HttpClientStream {
+class Http1xClientConnection extends Http1xConnectionBase implements HttpClientConnection {
+
+ private static final int ST_NOOP = 0, ST_CLOSE = 1, ST_RECYCLE = 2;
private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);
@@ -77,17 +80,15 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
private final boolean keepAlive;
private final int pipeliningLimit;
// Requests can be pipelined so we need a queue to keep track of requests
- private final Deque requests = new ArrayDeque<>();
+ private final Deque inflight = new ArrayDeque<>();
private final HttpClientMetrics metrics;
private final HttpVersion version;
private WebSocketClientHandshaker handshaker;
- private HttpClientRequestImpl currentRequest;
- private HttpClientResponseImpl currentResponse;
- private HttpClientRequestImpl requestForResponse;
+ private StreamImpl currentRequest;
+ private StreamImpl currentResponse;
private WebSocketImpl ws;
- private boolean reset;
private boolean paused;
private Buffer pausedChunk;
@@ -115,6 +116,315 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
this.pipeliningLimit = client.getOptions().getPipeliningLimit();
}
+ private class StreamImpl implements HttpClientStream {
+
+ private HttpClientRequestImpl request;
+ private HttpClientResponseImpl response;
+ private boolean reset;
+ private int status = ST_NOOP;
+ private boolean requestEnded;
+ private boolean responseEnded;
+
+ @Override
+ public int id() {
+ return -1;
+ }
+
+ @Override
+ public HttpVersion version() {
+ return version;
+ }
+
+ @Override
+ public HttpClientConnection connection() {
+ return Http1xClientConnection.this;
+ }
+
+ @Override
+ public Context getContext() {
+ return context;
+ }
+
+ private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) {
+ DefaultHttpRequest request = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(version), HttpUtils.toNettyHttpMethod(method, rawMethod), uri, false);
+ if (headers != null) {
+ for (Map.Entry header : headers) {
+ // Todo : multi valued headers
+ request.headers().add(header.getKey(), header.getValue());
+ }
+ }
+ return request;
+ }
+
+ private void prepareHeaders(HttpRequest request, String hostHeader, boolean chunked) {
+ HttpHeaders headers = request.headers();
+ headers.remove(TRANSFER_ENCODING);
+ if (!headers.contains(HOST)) {
+ request.headers().set(HOST, hostHeader);
+ }
+ if (chunked) {
+ HttpHeaders.setTransferEncodingChunked(request);
+ }
+ if (client.getOptions().isTryUseCompression() && request.headers().get(ACCEPT_ENCODING) == null) {
+ // if compression should be used but nothing is specified by the user support deflate and gzip.
+ request.headers().set(ACCEPT_ENCODING, DEFLATE_GZIP);
+ }
+ if (!client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_1) {
+ request.headers().set(CONNECTION, CLOSE);
+ } else if (client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_0) {
+ request.headers().set(CONNECTION, KEEP_ALIVE);
+ }
+ }
+
+ public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked) {
+ HttpRequest request = createRequest(version, method, rawMethod, uri, headers);
+ prepareHeaders(request, hostHeader, chunked);
+ writeToChannel(request);
+ }
+
+ public void writeHeadWithContent(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end) {
+ HttpRequest request = createRequest(version, method, rawMethod, uri, headers);
+ prepareHeaders(request, hostHeader, chunked);
+ if (end) {
+ if (buf != null) {
+ writeToChannel(new AssembledFullHttpRequest(request, buf));
+ } else {
+ writeToChannel(new AssembledFullHttpRequest(request));
+ }
+ } else {
+ writeToChannel(new AssembledHttpRequest(request, buf));
+ }
+ }
+
+ @Override
+ public void writeBuffer(ByteBuf buff, boolean end) {
+ if (end) {
+ if (buff != null && buff.isReadable()) {
+ writeToChannel(new DefaultLastHttpContent(buff, false));
+ } else {
+ writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ } else if (buff != null) {
+ writeToChannel(new DefaultHttpContent(buff));
+ }
+ }
+
+ @Override
+ public void writeFrame(int type, int flags, ByteBuf payload) {
+ throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
+ }
+
+ @Override
+ public void doSetWriteQueueMaxSize(int size) {
+ Http1xClientConnection.this.doSetWriteQueueMaxSize(size);
+ }
+
+ @Override
+ public boolean isNotWritable() {
+ return Http1xClientConnection.this.isNotWritable();
+ }
+
+ @Override
+ public void checkDrained() {
+ handleInterestedOpsChanged();
+ }
+
+ @Override
+ public void doPause() {
+ Http1xClientConnection.this.doPause();
+ }
+
+ @Override
+ public void doResume() {
+ Http1xClientConnection.this.doResume();
+ }
+
+ @Override
+ public void reset(long code) {
+ if (request == null) {
+ throw new IllegalStateException("Sanity check");
+ }
+ if (!reset) {
+ reset = true;
+ if (!requestEnded || !responseEnded) {
+ close();
+ }
+ }
+ }
+
+ public synchronized void beginRequest(HttpClientRequestImpl req) {
+ if (request != null) {
+ throw new IllegalStateException("Request already in progress");
+ }
+ if (currentRequest != this) {
+ throw new IllegalStateException("Connection is already writing another request");
+ }
+ if (metrics != null) {
+ Object reqMetric = metrics.requestBegin(endpointMetric, metric(), localAddress(), remoteAddress(), req);
+ req.metric(reqMetric);
+ }
+ request = req;
+ currentRequest = this;
+ inflight.add(currentRequest);
+ }
+
+ public synchronized void endRequest() {
+ if (currentRequest != this) {
+ throw new IllegalStateException("No write in progress");
+ }
+ if (metrics != null) {
+ metrics.requestEnd(currentRequest.request.metric());
+ }
+ currentRequest = null;
+ requestEnded = true;
+ int st;
+ if (responseEnded) {
+ st = status;
+ } else {
+ if (pipelining && inflight.size() < pipeliningLimit) {
+ st = ST_RECYCLE;
+ } else {
+ st = ST_NOOP;
+ }
+ status = st;
+ }
+ switch (st) {
+ case ST_CLOSE:
+ close();
+ break;
+ case ST_RECYCLE:
+ listener.onRecycle(Http1xClientConnection.this);
+ break;
+ }
+ }
+
+ @Override
+ public NetSocket createNetSocket() {
+ return Http1xClientConnection.this.createNetSocket();
+ }
+
+ void handleResponse(HttpResponse resp) {
+ io.netty.handler.codec.http.HttpVersion nettyVersion = resp.protocolVersion();
+ HttpVersion vertxVersion;
+ if (nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_0) {
+ vertxVersion = HttpVersion.HTTP_1_0;
+ } else if (nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_1) {
+ vertxVersion = HttpVersion.HTTP_1_1;
+ } else {
+ vertxVersion = null;
+ }
+ response = new HttpClientResponseImpl(request, vertxVersion, this, resp.status().code(), resp.status().reasonPhrase(), new HeadersAdaptor(resp.headers()));
+ if (metrics != null) {
+ metrics.responseBegin(request.metric(), response);
+ }
+ if (vertxVersion != null) {
+ request.handleResponse(response);
+ } else {
+ request.handleException(new IllegalStateException("Unsupported HTTP version: " + nettyVersion));
+ }
+ }
+
+ void handleResponseEnd(LastHttpContent trailer) {
+ if (metrics != null) {
+ HttpClientRequestBase req = request;
+ Object reqMetric = req.metric();
+ if (req.exceptionOccurred != null) {
+ metrics.requestReset(reqMetric);
+ } else {
+ metrics.responseEnd(reqMetric, response);
+ }
+ }
+ Buffer last = pausedChunk;
+ pausedChunk = null;
+ response.handleEnd(last, new HeadersAdaptor(trailer.trailingHeaders()));
+
+ int st;
+ /* if (reset && inflight.isEmpty()) {
+ st = ST_CLOSE;
+ } else */ if (keepAlive) {
+ if (status == ST_RECYCLE) {
+ st = ST_NOOP;
+ } else {
+ st = ST_RECYCLE;
+ }
+ } else {
+ st = ST_CLOSE;
+ }
+
+ // We don't signal response end for a 100-continue response as a real response will follow
+ // Also we keep the connection open for an HTTP CONNECT
+ if (response.statusCode() != 100 && request.method() != io.vertx.core.http.HttpMethod.CONNECT) {
+
+ // See https://tools.ietf.org/html/rfc7230#section-6.3
+ String responseConnectionHeader = response.getHeader(HttpHeaders.Names.CONNECTION);
+ io.vertx.core.http.HttpVersion protocolVersion = client.getOptions().getProtocolVersion();
+ String requestConnectionHeader = request.headers().get(HttpHeaders.Names.CONNECTION);
+ // We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
+ if (HttpHeaders.Values.CLOSE.equalsIgnoreCase(responseConnectionHeader) || HttpHeaders.Values.CLOSE.equalsIgnoreCase(requestConnectionHeader)) {
+ // In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
+ st = ST_CLOSE;
+ } else if (protocolVersion == io.vertx.core.http.HttpVersion.HTTP_1_0 && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(responseConnectionHeader)) {
+ // In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
+ // currently Vertx forces the Connection header if keepalive is enabled for 1.0
+ st = ST_CLOSE;
+ }
+ }
+
+ responseEnded = true;
+ if (requestEnded) {
+ switch (st) {
+ case ST_CLOSE:
+ close();
+ break;
+ case ST_RECYCLE:
+ listener.onRecycle(Http1xClientConnection.this);
+ break;
+ }
+ } else {
+ status = st;
+ }
+ }
+ }
+
+ void handleResponse(HttpResponse resp) {
+ StreamImpl requestForResponse;
+ if (resp.status().code() == 100) {
+ //If we get a 100 continue it will be followed by the real response later, so we don't remove it yet
+ requestForResponse = inflight.peek();
+ } else {
+ requestForResponse = inflight.poll();
+ }
+ if (requestForResponse == null) {
+ throw new IllegalStateException("No response handler");
+ }
+ currentResponse = requestForResponse;
+ currentResponse.handleResponse(resp);
+ }
+
+ void handleResponseChunk(Buffer buff) {
+ if (paused) {
+ if (pausedChunk == null) {
+ pausedChunk = buff.copy();
+ } else {
+ pausedChunk.appendBuffer(buff);
+ }
+ } else {
+ if (pausedChunk != null) {
+ buff = pausedChunk.appendBuffer(buff);
+ pausedChunk = null;
+ }
+ currentResponse.response.handleChunk(buff);
+ }
+ }
+
+ void handleResponseEnd(LastHttpContent trailer) {
+ StreamImpl resp = currentResponse;
+ if (resp.response.statusCode() != 100) {
+ currentResponse = null;
+ resp.handleResponseEnd(trailer);
+ }
+ }
+
@Override
public Channel channel() {
return chctx.channel();
@@ -270,56 +580,20 @@ private void handshakeComplete(ChannelHandlerContext ctx, FullHttpResponse respo
}
public synchronized boolean isValid() {
- return !reset && chctx.channel().isOpen();
- }
-
- @Override
- public void checkDrained() {
- handleInterestedOpsChanged();
+ return /*!reset &&*/ chctx.channel().isOpen();
}
@Override
public synchronized void handleInterestedOpsChanged() {
if (!isNotWritable()) {
if (currentRequest != null) {
- currentRequest.handleDrained();
+ currentRequest.request.handleDrained();
} else if (ws != null) {
ws.writable();
}
}
}
- void handleResponse(HttpResponse resp) {
- if (resp.status().code() == 100) {
- //If we get a 100 continue it will be followed by the real response later, so we don't remove it yet
- requestForResponse = requests.peek();
- } else {
- requestForResponse = requests.poll();
- }
- if (requestForResponse == null) {
- throw new IllegalStateException("No response handler");
- }
- io.netty.handler.codec.http.HttpVersion nettyVersion = resp.protocolVersion();
- HttpVersion vertxVersion;
- if (nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_0) {
- vertxVersion = HttpVersion.HTTP_1_0;
- } else if (nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_1) {
- vertxVersion = HttpVersion.HTTP_1_1;
- } else {
- vertxVersion = null;
- }
- HttpClientResponseImpl nResp = new HttpClientResponseImpl(requestForResponse, vertxVersion, this, resp.status().code(), resp.status().reasonPhrase(), new HeadersAdaptor(resp.headers()));
- currentResponse = nResp;
- if (metrics != null) {
- metrics.responseBegin(requestForResponse.metric(), nResp);
- }
- if (vertxVersion != null) {
- requestForResponse.handleResponse(nResp);
- } else {
- requestForResponse.handleException(new IllegalStateException("Unsupported HTTP version: " + nettyVersion));
- }
- }
-
public void doPause() {
super.doPause();
paused = true;
@@ -333,77 +607,12 @@ public void doResume() {
if (pausedChunk != null) {
Buffer chunk = pausedChunk;
pausedChunk = null;
- currentResponse.handleChunk(chunk);
+ currentResponse.response.handleChunk(chunk);
}
});
}
}
- void handleResponseChunk(Buffer buff) {
- if (paused) {
- if (pausedChunk == null) {
- pausedChunk = buff.copy();
- } else {
- pausedChunk.appendBuffer(buff);
- }
- } else {
- if (pausedChunk != null) {
- buff = pausedChunk.appendBuffer(buff);
- pausedChunk = null;
- }
- currentResponse.handleChunk(buff);
- }
- }
-
- void handleResponseEnd(LastHttpContent trailer) {
- if (metrics != null) {
- HttpClientRequestBase req = currentResponse.request();
- Object reqMetric = req.metric();
- if (req.exceptionOccurred != null) {
- metrics.requestReset(reqMetric);
- } else {
- metrics.responseEnd(reqMetric, currentResponse);
- }
- }
- Buffer last = pausedChunk;
- pausedChunk = null;
- currentResponse.handleEnd(last, new HeadersAdaptor(trailer.trailingHeaders()));
-
- // We don't signal response end for a 100-continue response as a real response will follow
- // Also we keep the connection open for an HTTP CONNECT
- if (currentResponse.statusCode() != 100 && requestForResponse.method() != io.vertx.core.http.HttpMethod.CONNECT) {
-
- boolean close = false;
- // See https://tools.ietf.org/html/rfc7230#section-6.3
- String responseConnectionHeader = currentResponse.getHeader(HttpHeaders.Names.CONNECTION);
- io.vertx.core.http.HttpVersion protocolVersion = client.getOptions().getProtocolVersion();
- String requestConnectionHeader = requestForResponse.headers().get(HttpHeaders.Names.CONNECTION);
- // We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
- if (HttpHeaders.Values.CLOSE.equalsIgnoreCase(responseConnectionHeader) || HttpHeaders.Values.CLOSE.equalsIgnoreCase(requestConnectionHeader)) {
- // In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
- close = true;
- } else if (protocolVersion == io.vertx.core.http.HttpVersion.HTTP_1_0 && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(responseConnectionHeader)) {
- // In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
- // currently Vertx forces the Connection header if keepalive is enabled for 1.0
- close = true;
- }
-
- if (close) {
- connection().close();
- } else {
- if (reset) {
- if (requests.isEmpty()) {
- connection().close();
- }
- } else {
- responseEnded();
- }
- }
- }
- requestForResponse = null;
- currentResponse = null;
- }
-
synchronized void handleWsFrame(WebSocketFrameInternal frame) {
if (ws != null) {
ws.handleFrame(frame);
@@ -419,24 +628,24 @@ protected synchronized void handleClosed() {
// Signal requests failed
if (metrics != null) {
- for (HttpClientRequestImpl req: requests) {
- metrics.requestReset(req.metric());
+ for (StreamImpl req: inflight) {
+ metrics.requestReset(req.request.metric());
}
if (currentResponse != null) {
- metrics.requestReset(currentResponse.request().metric());
+ metrics.requestReset(currentResponse.request.metric());
}
}
// Connection was closed - call exception handlers for any requests in the pipeline or one being currently written
- for (HttpClientRequestImpl req: requests) {
+ for (StreamImpl req: inflight) {
if (req != currentRequest) {
- req.handleException(e);
+ req.request.handleException(e);
}
}
if (currentRequest != null) {
- currentRequest.handleException(e);
+ currentRequest.request.handleException(e);
} else if (currentResponse != null) {
- currentResponse.handleException(e);
+ currentResponse.response.handleException(e);
}
}
@@ -444,146 +653,21 @@ public ContextImpl getContext() {
return super.getContext();
}
- @Override
- public void reset(long code) {
- if (!reset) {
- reset = true;
- if (currentRequest != null) {
- requests.removeLast();
- }
- if (requests.size() == 0) {
- connection().close();
- }
- }
- }
-
- private void requestEnded() {
- context.runOnContext(v -> {
- if (pipelining && requests.size() < pipeliningLimit) {
- listener.onRecycle(this);
- }
- });
- }
-
- private void responseEnded() {
- if (!keepAlive) {
- close();
- } else {
- context.runOnContext(v -> {
- if (currentRequest == null) {
- listener.onRecycle(this);
- }
- });
- }
- }
-
- private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) {
- DefaultHttpRequest request = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(version), HttpUtils.toNettyHttpMethod(method, rawMethod), uri, false);
- if (headers != null) {
- for (Map.Entry header : headers) {
- // Todo : multi valued headers
- request.headers().add(header.getKey(), header.getValue());
- }
- }
- return request;
- }
-
- private void prepareHeaders(HttpRequest request, String hostHeader, boolean chunked) {
- HttpHeaders headers = request.headers();
- headers.remove(TRANSFER_ENCODING);
- if (!headers.contains(HOST)) {
- request.headers().set(HOST, hostHeader);
- }
- if (chunked) {
- HttpHeaders.setTransferEncodingChunked(request);
- }
- if (client.getOptions().isTryUseCompression() && request.headers().get(ACCEPT_ENCODING) == null) {
- // if compression should be used but nothing is specified by the user support deflate and gzip.
- request.headers().set(ACCEPT_ENCODING, DEFLATE_GZIP);
- }
- if (!client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_1) {
- request.headers().set(CONNECTION, CLOSE);
- } else if (client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_0) {
- request.headers().set(CONNECTION, KEEP_ALIVE);
- }
- }
-
- public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked) {
- HttpRequest request = createRequest(version, method, rawMethod, uri, headers);
- prepareHeaders(request, hostHeader, chunked);
- writeToChannel(request);
- }
-
- public void writeHeadWithContent(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end) {
- HttpRequest request = createRequest(version, method, rawMethod, uri, headers);
- prepareHeaders(request, hostHeader, chunked);
- if (end) {
- if (buf != null) {
- writeToChannel(new AssembledFullHttpRequest(request, buf));
- } else {
- writeToChannel(new AssembledFullHttpRequest(request));
- }
- } else {
- writeToChannel(new AssembledHttpRequest(request, buf));
- }
- }
-
- @Override
- public void writeBuffer(ByteBuf buff, boolean end) {
- if (end) {
- if (buff != null && buff.isReadable()) {
- writeToChannel(new DefaultLastHttpContent(buff, false));
- } else {
- writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT);
- }
- } else if (buff != null) {
- writeToChannel(new DefaultHttpContent(buff));
- }
- }
-
- @Override
- public void writeFrame(int type, int flags, ByteBuf payload) {
- throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
- }
-
@Override
protected synchronized void handleException(Throwable e) {
super.handleException(e);
if (currentRequest != null) {
- currentRequest.handleException(e);
+ currentRequest.request.handleException(e);
} else {
- HttpClientRequestImpl req = requests.poll();
+ StreamImpl req = inflight.poll();
if (req != null) {
- req.handleException(e);
+ req.request.handleException(e);
} else if (currentResponse != null) {
- currentResponse.handleException(e);
+ currentResponse.response.handleException(e);
}
}
}
- public synchronized void beginRequest(HttpClientRequestImpl req) {
- if (currentRequest != null) {
- throw new IllegalStateException("Connection is already writing a request");
- }
- if (metrics != null) {
- Object reqMetric = metrics.requestBegin(endpointMetric, metric(), localAddress(), remoteAddress(), req);
- req.metric(reqMetric);
- }
- this.currentRequest = req;
- this.requests.add(req);
- }
-
- public synchronized void endRequest() {
- if (currentRequest == null) {
- throw new IllegalStateException("No write in progress");
- }
- if (metrics != null) {
- metrics.requestEnd(currentRequest.metric());
- }
- currentRequest = null;
- requestEnded();
- }
-
@Override
public synchronized void close() {
if (handshaker == null) {
@@ -636,21 +720,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
@Override
public HttpClientStream createStream() throws Exception {
- return this;
- }
-
- @Override
- public HttpClientConnection connection() {
- return this;
- }
-
- @Override
- public HttpVersion version() {
- return version;
- }
-
- @Override
- public int id() {
- return -1;
+ currentRequest = new StreamImpl();
+ return currentRequest;
}
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
index caa4dd10ac0..0df6480df05 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
@@ -81,7 +81,6 @@ public HttpClientMetrics metrics() {
@Override
void onStreamClosed(Http2Stream nettyStream) {
super.onStreamClosed(nettyStream);
- listener.onRecycle(this);
}
public synchronized HttpClientStream createStream() throws Http2Exception {
@@ -197,6 +196,9 @@ void handleReset(long errorCode) {
@Override
void handleClose() {
+ if (request instanceof HttpClientRequestImpl) {
+ conn.listener.onRecycle(conn);
+ }
if (!responseEnded) {
responseEnded = true;
if (conn.metrics != null) {
diff --git a/src/main/java/io/vertx/core/http/impl/pool/Pool.java b/src/main/java/io/vertx/core/http/impl/pool/Pool.java
index a91dc3a9dc1..c5e4c8de6b2 100644
--- a/src/main/java/io/vertx/core/http/impl/pool/Pool.java
+++ b/src/main/java/io/vertx/core/http/impl/pool/Pool.java
@@ -19,6 +19,8 @@
import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.impl.ContextImpl;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import java.util.*;
@@ -31,6 +33,8 @@
*/
public class Pool {
+ private static final Logger log = LoggerFactory.getLogger(Pool.class);
+
private final HttpClientMetrics metrics;
private final String peerHost;
private final boolean ssl;
@@ -200,9 +204,10 @@ private synchronized void recycle(ConnectionHolder conn) {
}
private void recycleConnection(ConnectionHolder conn) {
- // if (conn.inflight == 0) {
- // throw new IllegalStateException("Attempt to recycle a connection more than permitted");
- //}
+ if (conn.inflight == 0) {
+ log.debug("Attempt to recycle a connection more than permitted");
+ return;
+ }
capacity++;
if (conn.inflight == conn.concurrency) {
available.add(conn);
diff --git a/src/test/java/io/vertx/test/core/Http1xTest.java b/src/test/java/io/vertx/test/core/Http1xTest.java
index 0e9c78bf6e0..6f22baf8e80 100644
--- a/src/test/java/io/vertx/test/core/Http1xTest.java
+++ b/src/test/java/io/vertx/test/core/Http1xTest.java
@@ -3017,7 +3017,7 @@ public void testResetKeepAliveClientRequest() throws Exception {
@Test
public void testResetPipelinedClientRequest() throws Exception {
waitFor(2);
- CompletableFuture reset = new CompletableFuture<>();
+ CompletableFuture doReset = new CompletableFuture<>();
server.close();
NetServer server = vertx.createNetServer();
AtomicInteger count = new AtomicInteger();
@@ -3032,7 +3032,7 @@ public void testResetPipelinedClientRequest() throws Exception {
"POST /somepath HTTP/1.1\r\n" +
"Host: localhost:8080\r\n" +
"\r\n")) {
- reset.complete(null);
+ doReset.complete(null);
so.write(
"HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain\r\n" +
@@ -3053,13 +3053,11 @@ public void testResetPipelinedClientRequest() throws Exception {
awaitLatch(listenLatch);
client.close();
client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true));
- AtomicInteger status = new AtomicInteger();
HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> {
- assertEquals(0, status.getAndIncrement());
+ fail();
});
req1.connectionHandler(conn -> {
conn.closeHandler(v -> {
- assertEquals(1, status.getAndIncrement());
complete();
});
});
@@ -3068,7 +3066,7 @@ public void testResetPipelinedClientRequest() throws Exception {
fail();
});
req2.sendHead();
- reset.thenAccept(v -> {
+ doReset.thenAccept(v -> {
assertTrue(req2.reset());
});
await();