Skip to content

Commit

Permalink
Move the stream creation from pool to HttpClientConnection as it's sp…
Browse files Browse the repository at this point in the history
…ecific to HTTP connections
  • Loading branch information
vietj committed Nov 10, 2017
1 parent 49bd5b7 commit 1a7c182
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 34 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -633,6 +633,11 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
return socket;
}

@Override
public HttpClientStream createStream() throws Exception {
return this;
}

@Override
public HttpClientConnection connection() {
return this;
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -257,7 +257,7 @@ private synchronized void initConnection(Waiter waiter, C conn) {
mgr.connectionMap.put(connector.channel(conn), conn);
pool.initConnection(conn);
pool.getContext(conn).executeFromIO(() -> {
waiter.handleConnection((HttpClientConnection) conn);
waiter.initConnection((HttpClientConnection) conn);
});
if (waiter.isCancelled()) {
pool.recycleConnection(conn);
Expand All @@ -271,14 +271,11 @@ private void deliverInternal(C conn, Waiter waiter) {
ContextImpl ctx = pool.getContext(conn);
if (ctx.nettyEventLoop().inEventLoop()) {
ctx.executeFromIO(() -> {
HttpClientStream stream;
try {
stream = pool.createStream(conn);
waiter.handleConnection((HttpClientConnection) conn);
} catch (Exception e) {
getConnection(waiter);
return;
}
waiter.handleStream(stream);
});
} else {
ctx.runOnContext(v -> {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ConnectionPool.java
Expand Up @@ -42,8 +42,6 @@ interface ConnectionPool<C> {

void recycleConnection(C conn);

HttpClientStream createStream(C conn) throws Exception;

void evictConnection(C conn);

boolean isValid(C conn);
Expand Down
Expand Up @@ -85,7 +85,7 @@ void onStreamClosed(Http2Stream nettyStream) {
listener.onRecycle(this);
}

synchronized HttpClientStream createStream() throws Http2Exception {
public synchronized HttpClientStream createStream() throws Http2Exception {
Http2Connection conn = handler.connection();
Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
boolean writable = handler.encoder().flowController().isWritable(stream);
Expand Down
Expand Up @@ -40,6 +40,8 @@ interface HttpClientConnection extends HttpConnection {
*/
boolean isValid();

HttpClientStream createStream() throws Exception;

ContextImpl getContext();

}
7 changes: 3 additions & 4 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -949,12 +949,11 @@ private void getConnectionForWebsocket(boolean ssl,
ContextImpl context) {
wsCM.getConnection(host, ssl, port, host, new Waiter(context) {
@Override
void handleConnection(HttpClientConnection conn) {
void initConnection(HttpClientConnection conn) {
}
@Override
void handleStream(HttpClientStream stream) {
// Use some variance for this
handler.handle((ClientConnection) stream);
void handleConnection(HttpClientConnection conn) {
handler.handle((ClientConnection) conn);
}
@Override
void handleFailure(Throwable failure) {
Expand Down
15 changes: 0 additions & 15 deletions src/main/java/io/vertx/core/http/impl/HttpClientPool.java
Expand Up @@ -112,11 +112,6 @@ public void recycleConnection(HttpClientConnection conn) {
((ConnectionPool<HttpClientConnection>) current).recycleConnection(conn);
}

@Override
public HttpClientStream createStream(HttpClientConnection conn) throws Exception {
return ((ConnectionPool<HttpClientConnection>) current).createStream(conn);
}

@Override
public void evictConnection(HttpClientConnection conn) {
((ConnectionPool<HttpClientConnection>) current).evictConnection(conn);
Expand Down Expand Up @@ -188,11 +183,6 @@ public void recycleConnection(Http2ClientConnection conn) {
conn.streamCount--;
}

@Override
public HttpClientStream createStream(Http2ClientConnection conn) throws Exception {
return conn.createStream();
}

@Override
public void closeAllConnections() {
List<Http2ClientConnection> toClose = toClose = new ArrayList<>(allConnections);
Expand Down Expand Up @@ -244,11 +234,6 @@ public boolean canCreateConnection(int connCount) {
return connCount < maxSockets;
}

@Override
public HttpClientStream createStream(ClientConnection conn) {
return conn;
}

@Override
public void initConnection(ClientConnection conn) {
}
Expand Down
Expand Up @@ -648,7 +648,7 @@ void handleFailure(Throwable failure) {
}

@Override
void handleConnection(HttpClientConnection conn) {
void initConnection(HttpClientConnection conn) {
synchronized (HttpClientRequestImpl.this) {
if (connectionHandler != null) {
connectionHandler.handle(conn);
Expand All @@ -657,7 +657,9 @@ void handleConnection(HttpClientConnection conn) {
}

@Override
void handleStream(HttpClientStream stream) {
void handleConnection(HttpClientConnection conn) throws Exception {
HttpClientStream stream;
stream = conn.createStream();
connected(stream, headersCompletionHandler);
}

Expand Down
8 changes: 3 additions & 5 deletions src/main/java/io/vertx/core/http/impl/Waiter.java
Expand Up @@ -38,18 +38,16 @@ public Waiter(ContextImpl context) {
abstract void handleFailure(Throwable failure);

/**
* Handle connection success.
* Init connection.
*
* @param conn the connection
*/
abstract void handleConnection(HttpClientConnection conn);
abstract void initConnection(HttpClientConnection conn);

/**
* Handle connection success.
*
* @param stream the stream
*/
abstract void handleStream(HttpClientStream stream);
abstract void handleConnection(HttpClientConnection conn) throws Exception;

/**
* @return true if the waiter has been cancelled
Expand Down

0 comments on commit 1a7c182

Please sign in to comment.