Skip to content

Commit

Permalink
More tests / improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 0d94a81 commit 58c2336
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/HttpConnection.java
Expand Up @@ -64,7 +64,7 @@ public interface HttpConnection {
*/
@Fluent
default HttpConnection goAway(long errorCode) {
return goAway(errorCode, 2^31 - 1);
return goAway(errorCode, Integer.MAX_VALUE);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -254,8 +254,8 @@ public void closeHandler(Handler<Void> handler) {
this.closeHandler = handler;
}

public boolean isClosed() {
return !channel.isOpen();
public boolean isValid() {
return channel.isOpen();
}

int getOutstandingRequestCount() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -362,7 +362,7 @@ static abstract class Pool {
* @param conn the connection
*/
void deliverStream(HttpClientConnection conn, Waiter waiter) {
if (conn.isClosed()) {
if (!conn.isValid()) {
// The connection has been closed - closed connections can be in the pool
// Get another connection - Note that we DO NOT call connectionClosed() on the pool at this point
// that is done asynchronously in the connection closeHandler()
Expand All @@ -386,7 +386,7 @@ public Http1xPool(ConnQueue queue) {

public boolean getConnection(Waiter waiter) {
ClientConnection conn = availableConnections.poll();
if (conn != null && !conn.isClosed()) {
if (conn != null && conn.isValid()) {
ContextImpl context = waiter.context;
if (context == null) {
context = conn.getContext();
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -91,6 +91,15 @@ void checkPending(VertxHttp2ClientHandler handler) {
}
}

void discard(VertxHttp2ClientHandler conn) {
synchronized (queue) {
if (clientHandler == conn) {
clientHandler = null;
queue.connectionClosed();
}
}
}

@Override
void recycle(HttpClientConnection conn) {
synchronized (queue) {
Expand Down
Expand Up @@ -32,5 +32,9 @@ interface HttpClientConnection {

NetSocket createNetSocket();

boolean isClosed();
/**
* Check if the connection is valid for creating streams. The connection might be closed or a {@literal GOAWAY}
* frame could have been sent or received.
*/
boolean isValid();
}
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -756,7 +756,7 @@ public synchronized WebSocketStream handler(Handler<WebSocket> handler) {
wsConnect = handler;
}
getConnection(port, host, conn -> {
if (!conn.isClosed()) {
if (conn.isValid()) {
conn.toWebSocket(requestURI, headers, version, subProtocols, options.getMaxWebsocketFrameSize(), wsConnect);
} else {
websocket(port, host, requestURI, headers, version, subProtocols, wsConnect);
Expand Down
Expand Up @@ -66,6 +66,10 @@ public VertxHttp2ClientHandler(Http2Pool http2Pool,
this.http2Pool = http2Pool;

connection().addListener(new Http2EventAdapter() {
@Override
public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
http2Pool.discard(VertxHttp2ClientHandler.this);
}
@Override
public void onStreamClosed(Http2Stream stream) {
streams.remove(stream.id());
Expand Down Expand Up @@ -125,9 +129,9 @@ public NetSocket createNetSocket() {
}

@Override
public boolean isClosed() {
// Todo
return false;
public boolean isValid() {
Http2Connection conn = connection();
return !conn.goAwaySent() && !conn.goAwayReceived();
}

@Override
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -619,4 +619,49 @@ public void testConnectionHandler() throws Exception {
req2.end();
await();
}

@Test
public void testConnectionShutdownInConnectionHandler() throws Exception {
AtomicInteger serverStatus = new AtomicInteger();
server.connectionHandler(conn -> {
if (serverStatus.getAndIncrement() == 0) {
conn.goAwayHandler(ga -> {
assertEquals(0, ga.getErrorCode());
assertEquals(1, serverStatus.getAndIncrement());
});
conn.shutdownHandler(v -> {
assertEquals(2, serverStatus.getAndIncrement());
});
conn.closeHandler(v -> {
assertEquals(3, serverStatus.getAndIncrement());
});
}
});
server.requestHandler(req -> {
assertEquals(5, serverStatus.getAndIncrement());
req.response().end();
});
startServer();
AtomicInteger clientStatus = new AtomicInteger();
HttpClientRequest req1 = client.get(4043, "localhost", "/somepath");
req1.connectionHandler(conn -> {
conn.shutdownHandler(v -> {
clientStatus.compareAndSet(1, 2);
});
if (clientStatus.getAndIncrement() == 0) {
conn.shutdown();
}
});
req1.exceptionHandler(err -> {
fail();
});
req1.handler(resp -> {
assertEquals(2, clientStatus.getAndIncrement());
resp.endHandler(v -> {
testComplete();
});
});
req1.end();
await();
}
}

0 comments on commit 58c2336

Please sign in to comment.