Skip to content

Commit

Permalink
Simplify http/1.1 client request reset and prevent a reset connection…
Browse files Browse the repository at this point in the history
… to be reused from the pool
  • Loading branch information
vietj committed Oct 7, 2017
1 parent 1ed7843 commit 59ec940
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 36 deletions.
16 changes: 6 additions & 10 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -254,7 +254,7 @@ private void handshakeComplete(ChannelHandlerContext ctx, FullHttpResponse respo
} }


public boolean isValid() { public boolean isValid() {
return chctx.channel().isOpen(); return !reset && chctx.channel().isOpen();
} }


int getOutstandingRequestCount() { int getOutstandingRequestCount() {
Expand Down Expand Up @@ -431,23 +431,19 @@ public ContextImpl getContext() {
return super.getContext(); return super.getContext();
} }


public void resetRequest(long code) { @Override
public void reset(long code) {
if (!reset) { if (!reset) {
reset = true; reset = true;
currentRequest = null; if (currentRequest != null) {
requests.removeLast(); requests.removeLast();
}
if (requests.size() == 0) { if (requests.size() == 0) {
pool.responseEnded(this, true); pool.responseEnded(this, true);
} }
} }
} }


@Override
public void resetResponse(long code) {
reset = true;
pool.responseEnded(this, true);
}

private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) { private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) {
DefaultHttpRequest request = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(version), HttpUtils.toNettyHttpMethod(method, rawMethod), uri, false); DefaultHttpRequest request = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(version), HttpUtils.toNettyHttpMethod(method, rawMethod), uri, false);
if (headers != null) { if (headers != null) {
Expand Down
Expand Up @@ -373,7 +373,7 @@ public void endRequest() {
} }


@Override @Override
public void resetRequest(long code) { public void reset(long code) {
if (!(requestEnded && responseEnded)) { if (!(requestEnded && responseEnded)) {
requestEnded = true; requestEnded = true;
responseEnded = true; responseEnded = true;
Expand All @@ -384,11 +384,6 @@ public void resetRequest(long code) {
} }
} }


@Override
public void resetResponse(long code) {
resetRequest(code);
}

@Override @Override
public HttpClientConnection connection() { public HttpClientConnection connection() {
return conn; return conn;
Expand Down
14 changes: 4 additions & 10 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -392,16 +392,12 @@ public boolean reset(long code) {
reset = code; reset = code;
if (!completed) { if (!completed) {
completed = true; completed = true;
if (stream != null) {
stream.resetRequest(code);
}
if (completionHandler != null) { if (completionHandler != null) {
completionHandler.handle(null); completionHandler.handle(null);
} }
} else { }
if (response != null) { if (stream != null) {
stream.resetResponse(code); stream.reset(code);
}
} }
return true; return true;
} }
Expand Down Expand Up @@ -510,9 +506,7 @@ else if (completed) {
} }


protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) { protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) {
if (reset != null) { if (reset == null) {
stream.resetResponse(reset);
} else {
response = resp; response = resp;
int statusCode = resp.statusCode(); int statusCode = resp.statusCode();
if (followRedirects > 0 && statusCode >= 300 && statusCode < 400) { if (followRedirects > 0 && statusCode >= 300 && statusCode < 400) {
Expand Down
Expand Up @@ -100,7 +100,7 @@ public HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> han
@Override @Override
public boolean reset(long code) { public boolean reset(long code) {
synchronized (conn) { synchronized (conn) {
stream.resetRequest(code); stream.reset(code);
return true; return true;
} }
} }
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/core/http/impl/HttpClientStream.java
Expand Up @@ -32,7 +32,7 @@ interface HttpClientStream {
* @return the stream id or -1 for HTTP/1.x * @return the stream id or -1 for HTTP/1.x
*/ */
int id(); int id();

/** /**
* @return the stream version or null if it's not yet determined * @return the stream version or null if it's not yet determined
*/ */
Expand All @@ -52,8 +52,7 @@ interface HttpClientStream {
void doPause(); void doPause();
void doResume(); void doResume();


void resetRequest(long code); void reset(long code);
void resetResponse(long code);


void beginRequest(HttpClientRequestImpl request); void beginRequest(HttpClientRequestImpl request);
void endRequest(); void endRequest();
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/io/vertx/test/core/Http1xTest.java
Expand Up @@ -3299,13 +3299,13 @@ private void testCloseTheConnectionAfterResetBeforeResponseReceived(boolean pipe
case 0: case 0:
so.handler(buff -> { so.handler(buff -> {
total.appendBuffer(buff); total.appendBuffer(buff);
if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + if (total.toString().equals("GET /1 HTTP/1.1\r\n" +
"Host: localhost:8080\r\n" + "Host: localhost:8080\r\n" +
"\r\n")) { "\r\n")) {
requestReceived.complete(null); requestReceived.complete(null);
so.write(Buffer.buffer( so.write(Buffer.buffer(
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-Length: 200\r\n" + "Content-Length: 11\r\n" +
"\r\n" + "\r\n" +
"Some-Buffer" "Some-Buffer"
)); ));
Expand All @@ -3318,7 +3318,7 @@ private void testCloseTheConnectionAfterResetBeforeResponseReceived(boolean pipe
case 1: case 1:
so.handler(buff -> { so.handler(buff -> {
total.appendBuffer(buff); total.appendBuffer(buff);
if (total.toString().equals("GET /somepath HTTP/1.1\r\n" + if (total.toString().equals("GET /2 HTTP/1.1\r\n" +
"Host: localhost:8080\r\n" + "Host: localhost:8080\r\n" +
"\r\n")) { "\r\n")) {
so.write( so.write(
Expand All @@ -3344,11 +3344,11 @@ private void testCloseTheConnectionAfterResetBeforeResponseReceived(boolean pipe
awaitLatch(listenLatch); awaitLatch(listenLatch);
client.close(); client.close();
client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(pipelined).setKeepAlive(true)); client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(pipelined).setKeepAlive(true));
HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath"); HttpClientRequest req1 = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/1");
if (pipelined) { if (pipelined) {
requestReceived.thenAccept(v -> { requestReceived.thenAccept(v -> {
req1.reset(); req1.reset();
client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/2", resp -> {
assertEquals(200, resp.statusCode()); assertEquals(200, resp.statusCode());
resp.bodyHandler(body -> { resp.bodyHandler(body -> {
assertEquals("Hello world", body.toString()); assertEquals("Hello world", body.toString());
Expand All @@ -3364,7 +3364,7 @@ private void testCloseTheConnectionAfterResetBeforeResponseReceived(boolean pipe
}); });
req1.handler(resp -> fail()); req1.handler(resp -> fail());
req1.end(); req1.end();
client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> { client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/2", resp -> {
assertEquals(200, resp.statusCode()); assertEquals(200, resp.statusCode());
resp.bodyHandler(body -> { resp.bodyHandler(body -> {
assertEquals("Hello world", body.toString()); assertEquals("Hello world", body.toString());
Expand Down

0 comments on commit 59ec940

Please sign in to comment.