Skip to content

Commit

Permalink
Http2 metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 20, 2016
1 parent f96a255 commit a53c62c
Show file tree
Hide file tree
Showing 28 changed files with 678 additions and 215 deletions.
4 changes: 1 addition & 3 deletions src/main/java/io/vertx/core/http/HttpConnection.java
Expand Up @@ -34,7 +34,6 @@
* - test netsocket client reset
* - add rawMethod
* - byte distribution algorithm configurability (options ? connection ?)
* - metrics
* - netSocket() interaction are not exactly the same than with http/1.x : see if we can make is the same
* - netSocket() sendFile
* - OpenSSL integration
Expand All @@ -44,11 +43,10 @@
* - examples
*
* HttpServer:
* - server synchronization + executeFromIO
* - server synchronization + executeFromIO + test worker server
*
* HttpClient:
* - test executeFromIO
* - metrics
* - HttpConnection test
*
* not yet in scope:
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -327,6 +327,15 @@ void handleResponseChunk(Buffer buff) {
}

void handleResponseEnd(LastHttpContent trailer) {
if (metrics.isEnabled()) {
HttpClientRequestBase req = currentResponse.request();
Object reqMetric = req.metric();
if (req.exceptionOccurred) {
metrics.requestReset(reqMetric);
} else {
metrics.responseEnd(reqMetric, currentResponse);
}
}
currentResponse.handleEnd(pausedChunk, new HeadersAdaptor(trailer.trailingHeaders()));

// We don't signal response end for a 100-continue response as a real response will follow
Expand Down Expand Up @@ -363,8 +372,19 @@ protected synchronized void handleClosed() {
if (ws != null) {
ws.handleClosed();
}
// Connection was closed - call exception handlers for any requests in the pipeline or one being currently written
Exception e = new VertxException("Connection was closed");

// Signal requests failed
if (metrics.isEnabled()) {
for (HttpClientRequestImpl req: requests) {
metrics.requestReset(req.metric());
}
if (requestForResponse != null) {
metrics.requestReset(requestForResponse.metric());
}
}

// Connection was closed - call exception handlers for any requests in the pipeline or one being currently written
for (HttpClientRequestImpl req: requests) {
req.handleException(e);
}
Expand Down Expand Up @@ -462,6 +482,10 @@ public synchronized void beginRequest(HttpClientRequestImpl req) {
if (currentRequest != null) {
throw new IllegalStateException("Connection is already writing a request");
}
if (metrics.isEnabled()) {
Object reqMetric = client.httpClientMetrics().requestBegin(metric, localAddress(), remoteAddress(), req);
req.metric(reqMetric);
}
this.currentRequest = req;
this.requests.add(req);
}
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/vertx/core/http/impl/FileStreamChannel.java
Expand Up @@ -58,14 +58,15 @@ class FileStreamChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
private boolean active;
private boolean closed;
private long bytesWritten;
private final VertxHttp2Stream stream;
private final Handler<Void> endHandler;
private final Handler<Long> endHandler;

FileStreamChannel(
Context resultCtx,
Handler<AsyncResult<Void>> resultHandler,
VertxHttp2Stream stream,
Handler<Void> endHandler,
Handler<Long> endHandler,
long length) {
super(null, Id.INSTANCE);

Expand Down Expand Up @@ -156,12 +157,13 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
ByteBuf chunk;
while (!stream.isNotWritable() && (chunk = (ByteBuf) in.current()) != null && length > 0) {
length -= chunk.readableBytes();
bytesWritten += chunk.readableBytes();
boolean end = length == 0;
stream.writeData(chunk.retain(), end);
stream.writeData(chunk.retain(), false);
stream.handlerContext.flush();
in.remove();
if (end) {
endHandler.handle(null);
endHandler.handle(bytesWritten);
}
}
}
Expand Down
87 changes: 54 additions & 33 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -37,6 +37,7 @@
import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.net.NetSocket;
import io.vertx.core.spi.metrics.HttpClientMetrics;

import java.util.Map;

Expand All @@ -48,14 +49,19 @@
class Http2ClientConnection extends Http2ConnectionBase implements HttpClientConnection {

final Http2Pool http2Pool;
final HttpClientMetrics metrics;
final Object metric;
long streamCount;

public Http2ClientConnection(Http2Pool http2Pool,
ContextImpl context,
Channel channel,
VertxHttp2ConnectionHandler connHandler) {
super(channel, context, connHandler);
VertxHttp2ConnectionHandler connHandler,
HttpClientMetrics metrics) {
super(channel, context, connHandler, metrics);
this.http2Pool = http2Pool;
this.metrics = metrics;
this.metric = metrics.connected(remoteAddress(), remoteName());
}

@Override
Expand All @@ -77,7 +83,7 @@ public void onStreamClosed(Http2Stream nettyStream) {

HttpClientStream createStream() {
try {
Http2Connection conn = connHandler.connection();
Http2Connection conn = handler.connection();
Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
Http2ClientStream clientStream = new Http2ClientStream(this, stream);
streams.put(clientStream.stream.id(), clientStream);
Expand All @@ -88,16 +94,13 @@ HttpClientStream createStream() {
}

@Override
public void reportBytesWritten(long numberOfBytes) {
}

@Override
public void reportBytesRead(long s) {
protected Object metric() {
return metric;
}

@Override
public boolean isValid() {
Http2Connection conn = connHandler.connection();
Http2Connection conn = handler.connection();
return !conn.goAwaySent() && !conn.goAwayReceived();
}

Expand All @@ -123,37 +126,32 @@ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promi
String uri = headers.path().toString();
String host = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers);
Http2Stream promisedStream = connHandler.connection().stream(promisedStreamId);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
HttpClientRequestPushPromise promisedReq = new HttpClientRequestPushPromise(this, promisedStream, http2Pool.client, method, uri, host, headersMap);
if (metrics.isEnabled()) {
promisedReq.metric(metrics.responsePushed(metric, localAddress(), remoteAddress(), promisedReq));
}
streams.put(promisedStreamId, promisedReq.getStream());
stream.handlePushPromise(promisedReq);
}

static class Http2ClientStream extends VertxHttp2Stream implements HttpClientStream {

private final Http2ClientConnection handler;
private final ContextImpl context;
private final Channel channel;
private final ChannelHandlerContext handlerContext;
private final Http2Connection conn;
private final Http2Stream stream;
private final Http2ClientConnection conn;
private final Http2Connection connection;
private HttpClientRequestBase req;
private HttpClientResponseImpl resp;
private boolean ended;

public Http2ClientStream(Http2ClientConnection handler, Http2Stream stream) throws Http2Exception {
this(handler, null, stream);
public Http2ClientStream(Http2ClientConnection conn, Http2Stream stream) throws Http2Exception {
this(conn, null, stream);
}

public Http2ClientStream(Http2ClientConnection handler, HttpClientRequestBase req, Http2Stream stream) throws Http2Exception {
super(handler.http2Pool.client.getVertx(), handler.context, handler.handlerContext, handler.connHandler.encoder(), handler.connHandler.decoder(), stream);
this.handler = handler;
this.context = handler.context;
this.handlerContext = handler.handlerContext;
this.conn = handler.connHandler.connection();
this.stream = stream;
public Http2ClientStream(Http2ClientConnection conn, HttpClientRequestBase req, Http2Stream stream) throws Http2Exception {
super(conn, stream);
this.conn = conn;
this.connection = conn.handler.connection();
this.req = req;
this.channel = handler.channel;
}

@Override
Expand All @@ -163,8 +161,16 @@ public HttpVersion version() {

@Override
void callEnd() {
// Should use an shared immutable object ?
if (conn.metrics.isEnabled()) {
HttpClientRequestBase req = this.req;
if (req.exceptionOccurred) {
conn.metrics.requestReset(req.metric());
} else {
conn.metrics.responseEnd(req.metric(), resp);
}
}
ended = true;
// Should use a shared immutable object for CaseInsensitiveHeaders ?
resp.handleEnd(null, new CaseInsensitiveHeaders());
}

Expand All @@ -175,13 +181,24 @@ void callHandler(Buffer buf) {

@Override
void callReset(long errorCode) {
ended = true;
handleException(new StreamResetException(errorCode));
if (!ended) {
ended = true;
if (conn.metrics.isEnabled()) {
HttpClientRequestBase req = this.req;
conn.metrics.requestReset(req.metric());
}
handleException(new StreamResetException(errorCode));
}
}

@Override
void handleClose() {
if (!ended) {
ended = true;
if (conn.metrics.isEnabled()) {
HttpClientRequestBase req = this.req;
conn.metrics.requestReset(req.metric());
}
handleException(new VertxException("Connection was closed")); // Put that in utility class
}
}
Expand Down Expand Up @@ -274,16 +291,20 @@ public void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers
h.add(Http2HeadersAdaptor.toLowerCase(header.getKey()), header.getValue());
}
}
if (handler.http2Pool.client.getOptions().isTryUseCompression() && h.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
if (conn.http2Pool.client.getOptions().isTryUseCompression() && h.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
h.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP);
}
if (conn.metrics.isEnabled()) {
((HttpClientRequestImpl)req).metric(conn.metrics.requestBegin(conn.metric, conn.localAddress(), conn.remoteAddress(), req));
}
encoder.writeHeaders(handlerContext, stream.id(), h, 0, end && content == null, handlerContext.newPromise());
if (content != null) {
writeBuffer(content, end);
} else {
channel.flush();
}
}

@Override
public void writeBuffer(ByteBuf buf, boolean end) {
writeData(buf, end);
Expand All @@ -308,7 +329,7 @@ public void doSetWriteQueueMaxSize(int size) {
}
@Override
public boolean isNotWritable() {
return !conn.remote().flowController().isWritable(stream);
return !connection.remote().flowController().isWritable(stream);
}
@Override
public void beginRequest(HttpClientRequestImpl request) {
Expand All @@ -325,12 +346,12 @@ public void reset(long code) {

@Override
public HttpClientConnection connection() {
return handler;
return conn;
}

@Override
public NetSocket createNetSocket() {
return handler.toNetSocket(this);
return conn.toNetSocket(this);
}
}
}

0 comments on commit a53c62c

Please sign in to comment.