From e3affd7fcb286bd393336ae1c7574db944a1ef6d Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 19 Sep 2016 15:06:42 +0200 Subject: [PATCH] Improve HttpClientRequest reset support - fixes #1642 --- .../io/vertx/core/http/HttpClientRequest.java | 23 +- .../core/http/impl/ClientConnection.java | 29 +- .../core/http/impl/HttpClientRequestImpl.java | 14 +- .../impl/HttpClientRequestPushPromise.java | 3 +- .../java/io/vertx/test/core/Http1xTest.java | 270 ++++++++++++++++++ .../java/io/vertx/test/core/Http2Test.java | 27 ++ 6 files changed, 352 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/vertx/core/http/HttpClientRequest.java b/src/main/java/io/vertx/core/http/HttpClientRequest.java index cff6e3ed6fa..7c07bfbcd85 100644 --- a/src/main/java/io/vertx/core/http/HttpClientRequest.java +++ b/src/main/java/io/vertx/core/http/HttpClientRequest.java @@ -137,13 +137,13 @@ public interface HttpClientRequest extends WriteStream, ReadStream * - * For HTTP2 it sets the {@literal :authority} pseudo header otherwise it sets the {@literal Host} header + * For HTTP/2 it sets the {@literal :authority} pseudo header otherwise it sets the {@literal Host} header */ @Fluent HttpClientRequest setHost(String host); /** - * @return the request host. For HTTP2 it returns the {@literal :authority} pseudo header otherwise it returns the {@literal Host} header + * @return the request host. For HTTP/2 it returns the {@literal :authority} pseudo header otherwise it returns the {@literal Host} header */ String getHost(); @@ -308,17 +308,28 @@ public interface HttpClientRequest extends WriteStream, ReadStream + *
    + *
  • for HTTP/2, this performs send an HTTP/2 reset frame with the specified error {@code code}
  • + *
  • for HTTP/1.x, this closes the connection after the current in-flight requests are ended
  • + *
+ *

+ * When the request has not yet been sent, the request will be aborted and false is returned as indicator. + *

* * @param code the error code + * @return true when reset has been performed */ - void reset(long code); + boolean reset(long code); /** * @return the {@link HttpConnection} associated with this request diff --git a/src/main/java/io/vertx/core/http/impl/ClientConnection.java b/src/main/java/io/vertx/core/http/impl/ClientConnection.java index d2af9445ed5..bfb15ca5286 100644 --- a/src/main/java/io/vertx/core/http/impl/ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/ClientConnection.java @@ -44,6 +44,7 @@ import java.net.URI; import java.util.ArrayDeque; +import java.util.Deque; import java.util.HashMap; import java.util.Map; import java.util.Queue; @@ -76,7 +77,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H private final Http1xPool pool; private final Object endpointMetric; // Requests can be pipelined so we need a queue to keep track of requests - private final Queue requests = new ArrayDeque<>(); + private final Deque requests = new ArrayDeque<>(); private final Object metric; private final HttpClientMetrics metrics; private final HttpVersion version; @@ -87,6 +88,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H private HttpClientRequestImpl requestForResponse; private WebSocketImpl ws; + private boolean reset; private boolean paused; private Buffer pausedChunk; @@ -377,7 +379,18 @@ void handleResponseEnd(LastHttpContent trailer) { // currently Vertx forces the Connection header if keepalive is enabled for 1.0 close = true; } - pool.responseEnded(this, close); + + if (close) { + pool.responseEnded(this, true); + } else { + if (reset) { + if (requests.isEmpty()) { + pool.responseEnded(this, true); + } + } else { + pool.responseEnded(this, false); + } + } } currentResponse = null; } @@ -426,7 +439,17 @@ public ContextImpl getContext() { } public void reset(long code) { - throw new UnsupportedOperationException("HTTP/1.x request cannot be reset"); + if (currentRequest == null) { + throw new IllegalStateException(); + } + if (!reset) { + currentRequest = null; + reset = true; + requests.removeLast(); + if (requests.size() == 0) { + pool.responseEnded(this, true); + } + } } private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) { diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index 4e6bcbb9d57..f6d7ddb098f 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -362,13 +362,19 @@ public HttpClientRequest pushHandler(Handler handler) { } @Override - public void reset(long code) { + public boolean reset(long code) { synchronized (getLock()) { - if (stream == null) { - throw new IllegalStateException("Cannot reset the request that is not yet connected"); + if (completed) { + throw new IllegalStateException("Request already completed"); } + exceptionOccurred = true; completed = true; - stream.reset(code); + if (stream != null) { + stream.reset(code); + return true; + } else { + return false; + } } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java index 8bed7c471d6..200d5c72c5a 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java @@ -103,9 +103,10 @@ public HttpClientRequest connectionHandler(@Nullable Handler han } @Override - public void reset(long code) { + public boolean reset(long code) { synchronized (conn) { stream.reset(code); + return true; } } diff --git a/src/test/java/io/vertx/test/core/Http1xTest.java b/src/test/java/io/vertx/test/core/Http1xTest.java index 8896d9339a0..7072013790c 100644 --- a/src/test/java/io/vertx/test/core/Http1xTest.java +++ b/src/test/java/io/vertx/test/core/Http1xTest.java @@ -2576,4 +2576,274 @@ public void testRandomPorts() throws Exception { } await(); } + + @Test + public void testResetClientRequestNotYetSent() throws Exception { + testResetClientRequestNotYetSent(false, false); + } + + @Test + public void testResetKeepAliveClientRequestNotYetSent() throws Exception { + testResetClientRequestNotYetSent(true, false); + } + + @Test + public void testResetPipelinedClientRequestNotYetSent() throws Exception { + testResetClientRequestNotYetSent(true, true); + } + + private void testResetClientRequestNotYetSent(boolean keepAlive, boolean pipelined) throws Exception { + waitFor(2); + server.close(); + NetServer server = vertx.createNetServer(); + try { + AtomicInteger numReq = new AtomicInteger(); + server.connectHandler(conn -> { + assertEquals(0, numReq.getAndIncrement()); + StringBuilder sb = new StringBuilder(); + conn.handler(buff -> { + sb.append(buff); + String content = sb.toString(); + if (content.startsWith("GET some-uri HTTP/1.1\r\n") && content.endsWith("\r\n\r\n")) { + conn.write("HTTP/1.1 200 OK\r\n" + + "Content-Length: 0\r\n" + + "\r\n"); + complete(); + } + }); + }); + CountDownLatch latch = new CountDownLatch(1); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v -> { + latch.countDown(); + })); + awaitLatch(latch); + client.close(); + client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setKeepAlive(keepAlive).setPipelining(pipelined)); + HttpClientRequest post = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + fail(); + }); + post.setChunked(true).write(TestUtils.randomBuffer(1024)); + assertFalse(post.reset()); + client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + assertEquals(1, numReq.get()); + complete(); + }); + await(); + } finally { + server.close(); + } + } + + @Test + public void testResetKeepAliveClientRequest() throws Exception { + waitFor(2); + server.close(); + NetServer server = vertx.createNetServer(); + try { + AtomicInteger count = new AtomicInteger(); + server.connectHandler(so -> { + assertEquals(0, count.getAndIncrement()); + Buffer total = Buffer.buffer(); + so.handler(buff -> { + total.appendBuffer(buff); + if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + + "Host: localhost:8080\r\n" + + "\r\n")) { + so.write( + "HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: 11\r\n" + + "\r\n" + + "Hello world"); + so.closeHandler(v -> { + complete(); + }); + } + }); + }); + CountDownLatch listenLatch = new CountDownLatch(1); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v -> { + listenLatch.countDown(); + })); + awaitLatch(listenLatch); + client.close(); + client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(false).setKeepAlive(true)); + AtomicInteger status = new AtomicInteger(); + HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + assertEquals(0, status.getAndIncrement()); + }); + req1.connectionHandler(conn -> { + conn.closeHandler(v -> { + assertEquals(1, status.getAndIncrement()); + complete(); + }); + }); + req1.end(); + HttpClientRequest req2 = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + fail(); + }); + req2.sendHead(v -> { + assertTrue(req2.reset()); + }); + await(); + } finally { + server.close(); + } + } + + @Test + public void testResetPipelinedClientRequest() throws Exception { + waitFor(2); + CompletableFuture reset = new CompletableFuture<>(); + server.close(); + NetServer server = vertx.createNetServer(); + AtomicInteger count = new AtomicInteger(); + server.connectHandler(so -> { + assertEquals(0, count.getAndIncrement()); + Buffer total = Buffer.buffer(); + so.handler(buff -> { + total.appendBuffer(buff); + if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + + "Host: localhost:8080\r\n" + + "\r\n" + + "POST /somepath HTTP/1.1\r\n" + + "Host: localhost:8080\r\n" + + "\r\n")) { + reset.complete(null); + so.write( + "HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: 11\r\n" + + "\r\n" + + "Hello world"); + so.closeHandler(v -> { + complete(); + }); + } + }); + }); + try { + CountDownLatch listenLatch = new CountDownLatch(1); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v -> { + listenLatch.countDown(); + })); + awaitLatch(listenLatch); + client.close(); + client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true)); + AtomicInteger status = new AtomicInteger(); + HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + assertEquals(0, status.getAndIncrement()); + }); + req1.connectionHandler(conn -> { + conn.closeHandler(v -> { + assertEquals(1, status.getAndIncrement()); + complete(); + }); + }); + req1.end(); + HttpClientRequest req2 = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + fail(); + }); + reset.thenAccept(v -> { + assertTrue(req2.reset()); + }); + req2.sendHead(); + await(); + } finally { + server.close(); + } + } + + @Test + public void testCloseTheConnectionAfterResetKeepAliveClientRequest() throws Exception { + testCloseTheConnectionAfterResetPersistentClientRequest(false); + } + + @Test + public void testCloseTheConnectionAfterResetPipelinedClientRequest() throws Exception { + testCloseTheConnectionAfterResetPersistentClientRequest(true); + } + + private void testCloseTheConnectionAfterResetPersistentClientRequest(boolean pipelined) throws Exception { + waitFor(2); + server.close(); + NetServer server = vertx.createNetServer(); + try { + AtomicInteger count = new AtomicInteger(); + AtomicBoolean closed = new AtomicBoolean(); + server.connectHandler(so -> { + Buffer total = Buffer.buffer(); + switch (count.getAndIncrement()) { + case 0: + so.handler(buff -> { + total.appendBuffer(buff); + if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + + "Host: localhost:8080\r\n" + + "\r\n")) { + so.closeHandler(v -> { + closed.set(true); + }); + } + }); + break; + case 1: + so.handler(buff -> { + total.appendBuffer(buff); + if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + + "Host: localhost:8080\r\n" + + "\r\n")) { + so.write( + "HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: 11\r\n" + + "\r\n" + + "Hello world"); + complete(); + } + }); + break; + default: + fail("Invalid count"); + break; + + } + }); + CountDownLatch listenLatch = new CountDownLatch(1); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v -> { + listenLatch.countDown(); + })); + awaitLatch(listenLatch); + client.close(); + client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(pipelined).setKeepAlive(true)); + HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + fail(); + }); + if (pipelined) { + req1.sendHead(v -> { + assertTrue(req1.reset()); + client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + assertEquals(200, resp.statusCode()); + resp.bodyHandler(body -> { + assertEquals("Hello world", body.toString()); + complete(); + }); + }); + }); + } else { + req1.sendHead(v -> { + assertTrue(req1.reset()); + }); + client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { + assertEquals(200, resp.statusCode()); + resp.bodyHandler(body -> { + assertEquals("Hello world", body.toString()); + complete(); + }); + }); + } + await(); + } finally { + server.close(); + } + } } diff --git a/src/test/java/io/vertx/test/core/Http2Test.java b/src/test/java/io/vertx/test/core/Http2Test.java index 8d52d9c34c2..243c48421f7 100644 --- a/src/test/java/io/vertx/test/core/Http2Test.java +++ b/src/test/java/io/vertx/test/core/Http2Test.java @@ -18,11 +18,13 @@ import io.vertx.core.Context; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.Http2Settings; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.http.StreamResetException; +import io.vertx.core.net.NetServer; import io.vertx.core.net.OpenSSLEngineOptions; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.test.core.tls.Cert; @@ -31,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * @author Julien Viet @@ -254,4 +257,28 @@ public void testClientStreamPausedWhenConnectionIsPaused() throws Exception { req2.end(buffer); await(); } + + @Test + public void testResetClientRequestNotYetSent() throws Exception { + waitFor(2); + server.close(); + server = vertx.createHttpServer(createBaseServerOptions().setInitialSettings(new Http2Settings().setMaxConcurrentStreams(1))); + AtomicInteger numReq = new AtomicInteger(); + server.requestHandler(req -> { + assertEquals(0, numReq.getAndIncrement()); + req.response().end(); + complete(); + }); + startServer(); + HttpClientRequest post = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + fail(); + }); + post.setChunked(true).write(TestUtils.randomBuffer(1024)); + assertFalse(post.reset()); + client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + assertEquals(1, numReq.get()); + complete(); + }); + await(); + } }