Skip to content

Commit

Permalink
Add HttpMethod and uri in pushed response metrics callback
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 6, 2016
1 parent a2902d2 commit 80c399d
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 24 deletions.
31 changes: 25 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
Expand All @@ -30,6 +31,7 @@
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -144,16 +146,25 @@ public synchronized void onSettingsRead(ChannelHandlerContext ctx, Http2Settings
super.onSettingsRead(ctx, settings);
}

synchronized void sendPush(int streamId, Http2Headers headers, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
handler.writePushPromise(streamId, headers, new Handler<AsyncResult<Integer>>() {
synchronized void sendPush(int streamId, String host, HttpMethod method, MultiMap headers, String path, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
Http2Headers headers_ = new DefaultHttp2Headers();
headers_.method(method.name());
headers_.path(path);
if (host != null) {
headers_.authority(host);
}
if (headers != null) {
headers.forEach(header -> headers_.add(header.getKey(), header.getValue()));
}
handler.writePushPromise(streamId, headers_, new Handler<AsyncResult<Integer>>() {
@Override
public void handle(AsyncResult<Integer> ar) {
if (ar.succeeded()) {
synchronized (Http2ServerConnection.this) {
int promisedStreamId = ar.result();
String contentEncoding = HttpUtils.determineContentEncoding(headers);
String contentEncoding = HttpUtils.determineContentEncoding(headers_);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
Push push = new Push(promisedStream, contentEncoding, completionHandler);
Push push = new Push(promisedStream, contentEncoding, method, path, completionHandler);
streams.put(promisedStreamId, push);
if (maxConcurrentStreams == null || concurrentStreams < maxConcurrentStreams) {
concurrentStreams++;
Expand Down Expand Up @@ -183,12 +194,20 @@ protected Object metric() {

private class Push extends VertxHttp2Stream<Http2ServerConnection> {

private final HttpMethod method;
private final String uri;
private final String contentEncoding;
private Http2ServerResponseImpl response;
private final Future<HttpServerResponse> completionHandler;

public Push(Http2Stream stream, String contentEncoding, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
public Push(Http2Stream stream,
String contentEncoding,
HttpMethod method,
String uri,
Handler<AsyncResult<HttpServerResponse>> completionHandler) {
super(Http2ServerConnection.this, stream);
this.method = method;
this.uri = uri;
this.contentEncoding = contentEncoding;
this.completionHandler = Future.<HttpServerResponse>future().setHandler(completionHandler);
}
Expand Down Expand Up @@ -243,7 +262,7 @@ void handleClose() {

void complete() {
synchronized (Http2ServerConnection.this) {
response = new Http2ServerResponseImpl(Http2ServerConnection.this, this, true, contentEncoding);
response = new Http2ServerResponseImpl(Http2ServerConnection.this, this, method, uri, true, contentEncoding);
completionHandler.complete(response);
}
}
Expand Down
21 changes: 9 additions & 12 deletions src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java
Expand Up @@ -85,7 +85,13 @@ public Http2ServerResponseImpl(Http2ServerConnection conn, VertxHttp2Stream stre
}
}

public Http2ServerResponseImpl(Http2ServerConnection conn, VertxHttp2Stream stream, boolean push, String contentEncoding) {
public Http2ServerResponseImpl(
Http2ServerConnection conn,
VertxHttp2Stream stream,
HttpMethod method,
String path,
boolean push,
String contentEncoding) {
this.stream = stream;
this.ctx = conn.handlerContext;
this.conn = conn;
Expand All @@ -96,7 +102,7 @@ public Http2ServerResponseImpl(Http2ServerConnection conn, VertxHttp2Stream stre
putHeader(HttpHeaderNames.CONTENT_ENCODING, contentEncoding);
}

this.metric = conn.metrics().responsePushed(conn.metric(), this);
this.metric = conn.metrics().responsePushed(conn.metric(), method, path, this);
}

void callReset(long code) {
Expand Down Expand Up @@ -606,16 +612,7 @@ public HttpServerResponse push(HttpMethod method, String host, String path, Mult
throw new IllegalStateException("A push response cannot promise another push");
}
checkEnded();
Http2Headers headers_ = new DefaultHttp2Headers();
headers_.method(method.name());
headers_.path(path);
if (host != null) {
headers_.authority(host);
}
if (headers != null) {
headers.forEach(header -> headers_.add(header.getKey(), header.getValue()));
}
conn.sendPush(stream.id(), headers_, handler);
conn.sendPush(stream.id(), host, method, headers, path, handler);
return this;
}
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
Expand Down Expand Up @@ -169,7 +170,7 @@ public void requestReset(Void requestMetric) {
}

@Override
public Void responsePushed(Void socketMetric, HttpServerResponse response) {
public Void responsePushed(Void socketMetric, HttpMethod method, String uri, HttpServerResponse response) {
return null;
}

Expand Down
Expand Up @@ -16,6 +16,7 @@

package io.vertx.core.spi.metrics;

import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;
Expand Down Expand Up @@ -63,10 +64,11 @@ public interface HttpServerMetrics<R, W, S> extends TCPMetrics<S> {
* Called when an http server response is pushed.
*
* @param socketMetric the socket metric
* @param response the http server response
* @return the request metric
* @param method the pushed response method
* @param uri the pushed response uri
* @param response the http server response @return the request metric
*/
R responsePushed(S socketMetric, HttpServerResponse response);
R responsePushed(S socketMetric, HttpMethod method, String uri, HttpServerResponse response);

/**
* Called when an http server response has ended.
Expand Down
Expand Up @@ -16,6 +16,7 @@

package io.vertx.test.fakemetrics;

import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -62,7 +63,7 @@ public HttpServerMetric requestBegin(SocketMetric socketMetric, HttpServerReques
}

@Override
public HttpServerMetric responsePushed(SocketMetric socketMetric, HttpServerResponse response) {
public HttpServerMetric responsePushed(SocketMetric socketMetric, HttpMethod method, String uri, HttpServerResponse response) {
HttpServerMetric requestMetric = new HttpServerMetric(null, socketMetric);
requestMetric.response.set(response);
requests.add(requestMetric);
Expand Down
Expand Up @@ -16,7 +16,6 @@

package io.vertx.test.fakemetrics;

import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;

Expand Down

0 comments on commit 80c399d

Please sign in to comment.