Skip to content

Commit

Permalink
Decouple channel connector from pool and connection queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 10, 2017
1 parent f31991d commit b0e4798
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 484 deletions.
39 changes: 16 additions & 23 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -66,6 +66,7 @@ class ClientConnection extends Http1xConnectionBase implements HttpClientConnect

private static final Logger log = LoggerFactory.getLogger(ClientConnection.class);

private final ClientConnectionListener<HttpClientConnection> listener;
private final HttpClientImpl client;
private final boolean ssl;
private final String host;
Expand All @@ -79,9 +80,6 @@ class ClientConnection extends Http1xConnectionBase implements HttpClientConnect
private final HttpClientMetrics metrics;
private final HttpVersion version;

private Handler<Void> concurrencyUpdateHandler;
private Handler<Void> evictionHandler;

private WebSocketClientHandshaker handshaker;
private HttpClientRequestImpl currentRequest;
private HttpClientResponseImpl currentResponse;
Expand All @@ -92,9 +90,18 @@ class ClientConnection extends Http1xConnectionBase implements HttpClientConnect
private boolean paused;
private Buffer pausedChunk;

ClientConnection(HttpVersion version, HttpClientImpl client, Object endpointMetric, ChannelHandlerContext channel, boolean ssl, String host,
int port, ContextImpl context, HttpClientMetrics metrics) {
ClientConnection(ClientConnectionListener<HttpClientConnection> listener,
HttpVersion version,
HttpClientImpl client,
Object endpointMetric,
ChannelHandlerContext channel,
boolean ssl,
String host,
int port,
ContextImpl context,
HttpClientMetrics metrics) {
super(client.getVertx(), channel, context);
this.listener = listener;
this.client = client;
this.ssl = ssl;
this.host = host;
Expand All @@ -112,18 +119,6 @@ public Channel channel() {
return chctx.channel();
}

@Override
public HttpClientConnection concurrencyUpdateHandler(Handler<Void> handler) {
concurrencyUpdateHandler = handler;
return this;
}

@Override
public HttpClientConnection evictionHandler(Handler<Void> handler) {
evictionHandler = handler;
return this;
}

public HttpClientMetrics metrics() {
return metrics;
}
Expand Down Expand Up @@ -464,7 +459,7 @@ public void reset(long code) {
private void requestEnded() {
context.runOnContext(v -> {
if (pipelining && requests.size() < pipeliningLimit) {
concurrencyUpdateHandler.handle(null);
listener.onRecycle(this);
}
});
}
Expand All @@ -475,7 +470,7 @@ private void responseEnded() {
} else {
context.runOnContext(v -> {
if (currentRequest == null) {
concurrencyUpdateHandler.handle(null);
listener.onRecycle(this);
}
});
}
Expand Down Expand Up @@ -634,7 +629,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf);
}
}.removeHandler(sock -> evictionHandler.handle(null)));
}.removeHandler(sock -> listener.onClose(this, chctx.channel())));
return socket;
}

Expand All @@ -645,9 +640,7 @@ public HttpClientConnection connection() {

@Override
public HttpVersion version() {
// Used to determine the http version in the HttpClientRequest#sendHead handler , for HTTP/1.1 it will
// not yet know but it will for HTTP/2
return null;
return version;
}

@Override
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.core.http.impl;

import io.netty.channel.Channel;

public interface ClientConnectionListener<C> {

void onRecycle(C conn);

void onClose(C conn, Channel channel);

}
47 changes: 32 additions & 15 deletions src/main/java/io/vertx/core/http/impl/ClientHandler.java
Expand Up @@ -25,7 +25,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.spi.metrics.HttpClientMetrics;
Expand All @@ -37,34 +37,43 @@ class ClientHandler extends VertxHttpHandler<ClientConnection> {
private boolean closeFrameSent;
private ContextImpl context;
private ChannelHandlerContext chctx;
private Http1xPool pool;
private HttpClientImpl client;
private Object endpointMetric;
private HttpClientMetrics metrics;
private final HttpVersion version;
private final String host;
private final int port;
private final boolean ssl;
private final HttpClientImpl client;
private final HttpClientMetrics metrics;
private final ClientConnectionListener<HttpClientConnection> listener;
private final Object endpointMetric;

public ClientHandler(ContextImpl context,
Http1xPool pool,
public ClientHandler(ClientConnectionListener<HttpClientConnection> listener,
ContextImpl context,
HttpVersion version,
String host,
int port,
boolean ssl,
HttpClientImpl client,
Object endpointMetric,
HttpClientMetrics metrics) {
this.context = context;
this.pool = pool;
this.version = version;
this.client = client;
this.host = host;
this.port = port;
this.ssl = ssl;
this.endpointMetric = endpointMetric;
this.metrics = metrics;
this.listener = listener;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
chctx = ctx;
ClientConnection conn = new ClientConnection(pool.version(), client, endpointMetric, ctx,
pool.ssl(), pool.host(), pool.port(), context, metrics);
ClientConnection conn = new ClientConnection(listener, version, client, endpointMetric, ctx, ssl, host, port, context, metrics);
if (metrics != null) {
context.executeFromIO(() -> {
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(metric);
metrics.endpointConnected(endpointMetric, metric);
});
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(metric);
metrics.endpointConnected(endpointMetric, metric);
}
setConnection(conn);
}
Expand All @@ -73,6 +82,14 @@ public ChannelHandlerContext context() {
return chctx;
}

@Override
public void channelInactive(ChannelHandlerContext chctx) throws Exception {
if (metrics != null) {
metrics.endpointDisconnected(endpointMetric, getConnection().metric());
}
super.channelInactive(chctx);
}

@Override
protected void handleMessage(ClientConnection conn, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
if (msg instanceof HttpObject) {
Expand Down

0 comments on commit b0e4798

Please sign in to comment.