Skip to content

Commit

Permalink
Fix synchronization issues in connection manager ConnQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 10, 2017
1 parent 5541bf3 commit fad9c7f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -400,7 +400,7 @@ void handleResponseEnd(LastHttpContent trailer) {
connection().close();
}
} else {
responseEnded(this);
responseEnded();
}
}
}
Expand Down Expand Up @@ -469,12 +469,12 @@ private void requestEnded() {
});
}

private void responseEnded(ClientConnection conn) {
private void responseEnded() {
if (!keepAlive) {
conn.close();
close();
} else {
context.runOnContext(v -> {
if (conn.currentRequest == null) {
if (currentRequest == null) {
lifecycleHandler.handle(true);
}
});
Expand Down
23 changes: 14 additions & 9 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -257,15 +257,7 @@ private void createNewConnection(Waiter waiter) {
}
createNewConnection(context, ar -> {
if (ar.succeeded()) {
HttpClientConnection conn = ar.result();
conn.lifecycleHandler(reuse -> {
if (reuse) {
recycle(conn);
} else {
pool.discardConnection(conn);
}
});
deliver(conn, waiter);
newConnection(waiter, ar.result());
} else {

// If no specific exception handler is provided, fall back to the HttpClient's exception handler.
Expand All @@ -281,6 +273,19 @@ private void createNewConnection(Waiter waiter) {
});
}

private synchronized void newConnection(Waiter waiter, HttpClientConnection conn) {
conn.lifecycleHandler(reuse -> recycleConnection(conn, reuse));
deliver(conn, waiter);
}

private synchronized void recycleConnection(HttpClientConnection conn, boolean reuse) {
if (reuse) {
recycle(conn);
} else {
pool.discardConnection(conn);
}
}

private void createNewConnection(ContextImpl context, Handler<AsyncResult<HttpClientConnection>> handler) {
connCount++;
sslHelper.validate(vertx);
Expand Down
16 changes: 8 additions & 8 deletions src/test/java/io/vertx/test/core/Http1xTest.java
Expand Up @@ -2530,11 +2530,11 @@ public void testConnectionCloseHttp_1_1_NoClose() throws Exception {
AtomicBoolean firstRequest = new AtomicBoolean(true);
socket.handler(RecordParser.newDelimited("\r\n\r\n", buffer -> {
if (firstRequest.getAndSet(false)) {
socket.write("HTTP/1.1 200 OK\n" + "Content-Type: text/plain\n" + "Content-Length: 4\n"
+ "\n" + "xxx\n");
socket.write("HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 4\r\n"
+ "\r\n" + "xxx\n");
} else {
socket.write("HTTP/1.1 200 OK\n" + "Content-Type: text/plain\n" + "Content-Length: 1\n"
+ "Connection: close\n" + "\n" + "\n");
socket.write("HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 1\r\n"
+ "Connection: close\r\n" + "\r\n" + "\r\n");
}
}));
});
Expand All @@ -2546,11 +2546,11 @@ public void testConnectionCloseHttp_1_1_Close() throws Exception {
AtomicBoolean firstRequest = new AtomicBoolean(true);
socket.handler(RecordParser.newDelimited("\r\n\r\n", buffer -> {
if (firstRequest.getAndSet(false)) {
socket.write("HTTP/1.1 200 OK\n" + "Content-Type: text/plain\n" + "Content-Length: 4\n"
+ "\n" + "xxx\n");
socket.write("HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 3\r\n"
+ "\r\n" + "xxx");
} else {
socket.write("HTTP/1.1 200 OK\n" + "Content-Type: text/plain\n" + "Content-Length: 1\n"
+ "Connection: close\n" + "\n" + "\n");
socket.write("HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 0\r\n"
+ "Connection: close\r\n" + "\r\n");
socket.close();
}
}));
Expand Down

0 comments on commit fad9c7f

Please sign in to comment.