Skip to content

Commit

Permalink
Rework to handle http metrics outside of the pool
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 17, 2017
1 parent 537d2b7 commit 005c4ca
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 114 deletions.
70 changes: 58 additions & 12 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -17,12 +17,18 @@
package io.vertx.core.http.impl;

import io.netty.channel.Channel;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.impl.pool.Pool;
import io.vertx.core.http.impl.pool.Waiter;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.HttpClientMetrics;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/**
* The connection manager associates remote hosts with pools, it also tracks all connections so they can be closed
Expand All @@ -36,7 +42,7 @@ class ConnectionManager {
private final HttpClientMetrics metrics; // Shall be removed later combining the PoolMetrics with HttpClientMetrics
private final HttpClientImpl client;
private final Map<Channel, HttpClientConnection> connectionMap = new ConcurrentHashMap<>();
private final Map<EndpointKey, Pool<HttpClientConnection>> endpointMap = new ConcurrentHashMap<>();
private final Map<EndpointKey, Endpoint> endpointMap = new ConcurrentHashMap<>();
private final long maxSize;

ConnectionManager(HttpClientImpl client,
Expand Down Expand Up @@ -85,18 +91,28 @@ public int hashCode() {
}
}

void getConnection(String peerHost, boolean ssl, int port, String host, Waiter<HttpClientConnection> waiter) {
class Endpoint {

private final Pool<HttpClientConnection> pool;
private final Object metric;

public Endpoint(Pool<HttpClientConnection> pool, Object metric) {
this.pool = pool;
this.metric = metric;
}
}

void getConnection(String peerHost, boolean ssl, int port, String host,
Handler<HttpConnection> connectionHandler,
BiFunction<ContextInternal, HttpClientConnection, Boolean> onSuccess,
BiConsumer<ContextInternal, Throwable> onFailure) {
EndpointKey key = new EndpointKey(ssl, port, peerHost);
while (true) {
EndpointKey key = new EndpointKey(ssl, port, peerHost);
Pool<HttpClientConnection> pool = endpointMap.computeIfAbsent(key, targetAddress -> {
Object metric = metrics != null ? metrics.createEndpoint(host, port, 10 /* todo: fix when reworking pool metrics */) : null;
Endpoint endpoint = endpointMap.computeIfAbsent(key, targetAddress -> {
int maxPoolSize = Math.max(client.getOptions().getMaxPoolSize(), client.getOptions().getHttp2MaxPoolSize());
Object metric = metrics != null ? metrics.createEndpoint(host, port, maxPoolSize) : null;
HttpChannelConnector connector = new HttpChannelConnector(client, metric, ssl, peerHost, host, port);
return new Pool<>(
connector,
metrics,
metric,
maxWaitQueueSize,
maxSize,
Pool<HttpClientConnection> pool = new Pool<>(connector, maxWaitQueueSize, maxSize,
v -> {
if (metrics != null) {
metrics.closeEndpoint(host, port, metric);
Expand All @@ -105,8 +121,38 @@ void getConnection(String peerHost, boolean ssl, int port, String host, Waiter<H
},
connectionMap::put,
connectionMap::remove);
return new Endpoint(pool, metric);
});
if (pool.getConnection(waiter)) {
Object metric;
if (metrics != null) {
metric = metrics.enqueueRequest(endpoint.metric);
} else {
metric = null;
}
if (endpoint.pool.getConnection(new Waiter<HttpClientConnection>(client.getVertx().getOrCreateContext()) {
@Override
public void initConnection(ContextInternal ctx, HttpClientConnection conn) {
if (connectionHandler != null) {
ctx.executeFromIO(() -> {
connectionHandler.handle(conn);
});
}
}
@Override
public void handleFailure(ContextInternal ctx, Throwable failure) {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
onFailure.accept(ctx, failure);
}
@Override
public boolean handleConnection(ContextInternal ctx, HttpClientConnection conn) throws Exception {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
return onSuccess.apply(ctx, conn);
}
})) {
break;
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
Expand Up @@ -61,16 +61,16 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
private final String peerHost;
private final String host;
private final int port;
private final Object endpointMetric;
private final Object metric;

HttpChannelConnector(HttpClientImpl client,
Object endpointMetric,
Object metric,
boolean ssl,
String peerHost,
String host,
int port) {
this.client = client;
this.endpointMetric = endpointMetric;
this.metric = metric;
this.options = client.getOptions();
this.metrics = client.metrics();
this.sslHelper = client.getSslHelper();
Expand Down Expand Up @@ -276,7 +276,7 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
port,
ssl,
client,
endpointMetric,
metric,
client.metrics());
clientHandler.addHandler(conn -> {
listener.onConnectSuccess(conn, http1MaxConcurrency, ch, context, weight, http1Weight);
Expand All @@ -298,7 +298,7 @@ private void http2Connected(ConnectionListener<HttpClientConnection> listener,
.clientUpgrade(upgrade)
.useCompression(client.getOptions().isTryUseCompression())
.initialSettings(client.getOptions().getInitialSettings())
.connectionFactory(connHandler -> new Http2ClientConnection(listener, endpointMetric, client, context, connHandler, metrics))
.connectionFactory(connHandler -> new Http2ClientConnection(listener, metric, client, context, connHandler, metrics))
.logEnabled(options.getLogActivity())
.build();
handler.addHandler(conn -> {
Expand All @@ -317,7 +317,7 @@ private void http2Connected(ConnectionListener<HttpClientConnection> listener,
});
handler.removeHandler(conn -> {
if (metrics != null) {
metrics.endpointDisconnected(endpointMetric, conn.metric());
metrics.endpointDisconnected(metric, conn.metric());
}
listener.onClose();
});
Expand Down
46 changes: 17 additions & 29 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -17,16 +17,7 @@
package io.vertx.core.http.impl;

import io.vertx.core.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.pool.ConnectionProvider;
import io.vertx.core.http.impl.pool.Waiter;
import io.vertx.core.impl.ContextImpl;
Expand All @@ -51,6 +42,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -944,28 +937,23 @@ private void getConnectionForWebsocket(boolean ssl,
Handler<Http1xClientConnection> handler,
Handler<Throwable> connectionExceptionHandler,
ContextImpl context) {
websocketCM.getConnection(host, ssl, port, host, new Waiter<HttpClientConnection>(context) {
@Override
public void initConnection(ContextInternal ctx, HttpClientConnection conn) {
}
@Override
public boolean handleConnection(ContextInternal ctx, HttpClientConnection conn) {
ctx.executeFromIO(() -> {
handler.handle((Http1xClientConnection) conn);
});
return true;
}
@Override
public void handleFailure(ContextInternal ctx, Throwable failure) {
ctx.executeFromIO(() -> {
connectionExceptionHandler.handle(failure);
});
}
websocketCM.getConnection(host, ssl, port, host, null, (ctx, conn) -> {
ctx.executeFromIO(() -> {
handler.handle((Http1xClientConnection) conn);
});
return true;
}, (ctx, failure) -> {
ctx.executeFromIO(() -> {
connectionExceptionHandler.handle(failure);
});
});
}

void getConnectionForRequest(String peerHost, boolean ssl, int port, String host, Waiter<HttpClientConnection> waiter) {
httpCM.getConnection(peerHost, ssl, port, host, waiter);
void getConnectionForRequest(String peerHost, boolean ssl, int port, String host,
Handler<HttpConnection> connectionHandler,
BiFunction<ContextInternal, HttpClientConnection, Boolean> onSuccess,
BiConsumer<ContextInternal, Throwable> onFailure) {
httpCM.getConnection(peerHost, ssl, port, host, connectionHandler, onSuccess, onFailure);
}

/**
Expand Down
75 changes: 24 additions & 51 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -29,9 +29,7 @@
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.Waiter;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
Expand Down Expand Up @@ -647,54 +645,6 @@ private synchronized void connect() {
throw new IllegalStateException("You must provide a rawMethod when using an HttpMethod.OTHER method");
}

ContextImpl ctx = vertx.getOrCreateContext();
Waiter<HttpClientConnection> waiter = new Waiter<HttpClientConnection>(ctx) {

private void checkContext(ContextInternal current) {
}

@Override
public void handleFailure(ContextInternal ctx, Throwable failure) {
checkContext(ctx);
ctx.executeFromIO(() -> {
handleException(failure);
});
}

@Override
public void initConnection(ContextInternal ctx, HttpClientConnection conn) {
checkContext(ctx);
synchronized (HttpClientRequestImpl.this) {
if (connectionHandler != null) {
ctx.executeFromIO(() -> {
connectionHandler.handle(conn);
});
}
}
}

@Override
public boolean handleConnection(ContextInternal ctx, HttpClientConnection conn) throws Exception {
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
if (exceptionOccurred != null || reset != null) {
return false;
}
checkContext(ctx);
conn.createStream(HttpClientRequestImpl.this, ar -> {
if (ar.succeeded()) {
HttpClientStream stream = ar.result();
ctx.executeFromIO(() -> {
connected(stream, HttpClientRequestImpl.this.headersCompletionHandler);
});
} else {
throw new RuntimeException(ar.cause());
}
});
return true;
}
};

String peerHost;
if (hostHeader != null) {
int idx = hostHeader.lastIndexOf(':');
Expand All @@ -710,7 +660,30 @@ public boolean handleConnection(ContextInternal ctx, HttpClientConnection conn)
// We defer actual connection until the first part of body is written or end is called
// This gives the user an opportunity to set an exception handler before connecting so
// they can capture any exceptions on connection
client.getConnectionForRequest(peerHost, ssl, port, host, waiter);
client.getConnectionForRequest(peerHost, ssl, port, host, connectionHandler, (ctx, conn) -> {
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
if (exceptionOccurred != null || reset != null) {
return false;
}
// checkContext(ctx);
conn.createStream(HttpClientRequestImpl.this, ar -> {
if (ar.succeeded()) {
HttpClientStream stream = ar.result();
ctx.executeFromIO(() -> {
connected(stream, HttpClientRequestImpl.this.headersCompletionHandler);
});
} else {
throw new RuntimeException(ar.cause());
}
});
return true;
}, (ctx, failure) -> {
ctx.executeFromIO(() -> {
handleException(failure);
});

});
connecting = true;
}
}
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -74,8 +74,6 @@ public static class Holder<C> {
}
private static final Logger log = LoggerFactory.getLogger(Pool.class);

private final HttpClientMetrics metrics; // todo: later switch to PoolMetrics
private final Object metric;
private final ConnectionProvider<C> connector;
private final long maxWeight;
private final int queueMaxSize;
Expand All @@ -91,18 +89,14 @@ public static class Holder<C> {
private final BiConsumer<Channel, C> connectionRemoved;

public Pool(ConnectionProvider<C> connector,
HttpClientMetrics metrics,
Object metric,
int queueMaxSize,
long maxWeight,
Handler<Void> poolClosed,
BiConsumer<Channel, C> connectionAdded,
BiConsumer<Channel, C> connectionRemoved) {
this.maxWeight = maxWeight;
this.connector = connector;
this.metric = metric;
this.queueMaxSize = queueMaxSize;
this.metrics = metrics;
this.poolClosed = poolClosed;
this.all = new HashSet<>();
this.available = new ArrayDeque<>();
Expand All @@ -116,9 +110,6 @@ public synchronized boolean getConnection(Waiter<C> waiter) {
}
// Enqueue
if (capacity > 0 || weight < maxWeight || (queueMaxSize < 0 || waiters.size() < queueMaxSize)) {
if (metrics != null) {
waiter.metric = metrics.enqueueRequest(metric);
}
waiters.add(waiter);
checkPending();
} else {
Expand All @@ -133,9 +124,6 @@ private void checkPending() {
if (waiter == null) {
break;
}
if (metric != null) {
metrics.dequeueRequest(metric, waiter.metric);
}
if (capacity > 0) {
capacity--;
Holder<C> conn = available.peek();
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/vertx/core/http/impl/pool/Waiter.java
Expand Up @@ -24,8 +24,7 @@
*/
public abstract class Waiter<C> {

protected final ContextImpl context;
Object metric;
public final ContextImpl context;

protected Waiter(ContextImpl context) {
this.context = context;
Expand Down
Expand Up @@ -81,8 +81,6 @@ void getConnection(Waiter<FakeConnection> waiter) {
closed = false;
pool = new Pool<>(
connector,
null,
null,
queueMaxSize,
maxPoolSize,
v -> {
Expand Down

0 comments on commit 005c4ca

Please sign in to comment.