Skip to content

Commit

Permalink
Websocket metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 2, 2015
1 parent 72bbc74 commit e27d659
Show file tree
Hide file tree
Showing 17 changed files with 471 additions and 52 deletions.
Expand Up @@ -80,8 +80,8 @@ class ClientConnection extends ConnectionBase {
// Requests can be pipelined so we need a queue to keep track of requests
private final Queue<HttpClientRequestImpl> requests = new ArrayDeque<>();
private final Handler<Throwable> exceptionHandler;
private final HttpClientMetrics metrics;
private final Object metric;
final HttpClientMetrics metrics;

private WebSocketClientHandshaker handshaker;
private HttpClientRequestImpl currentRequest;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -637,6 +637,7 @@ private void handshake(FullHttpRequest request, Channel ch, ChannelHandlerContex
ServerWebSocketImpl ws = new ServerWebSocketImpl(vertx, theURI.toString(), theURI.getPath(),
theURI.getQuery(), new HeadersAdaptor(request.headers()), wsConn, shake.version() != WebSocketVersion.V00,
connectRunnable, options.getMaxWebsocketFrameSize());
ws.metric = metrics.connected(wsConn.metric(), ws);
wsConn.handleWebsocketConnect(ws);
if (!ws.isRejected()) {
ChannelHandler handler = ctx.pipeline().get(HttpChunkContentCompressor.class);
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ServerConnection.java
Expand Up @@ -321,6 +321,10 @@ synchronized private void handleWsFrame(WebSocketFrameInternal frame) {
}

synchronized protected void handleClosed() {
if (ws != null) {
metrics.disconnected(ws.metric);
ws.metric = null;
}
super.handleClosed();
if (ws != null) {
ws.handleClosed();
Expand Down
Expand Up @@ -40,6 +40,7 @@ public class ServerWebSocketImpl extends WebSocketImplBase implements ServerWebS
private final String query;
private final Runnable connectRunnable;
private final MultiMap headers;
Object metric;

private boolean connected;
private boolean rejected;
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/io/vertx/core/http/impl/WebSocketImpl.java
Expand Up @@ -21,7 +21,7 @@
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.spi.metrics.HttpClientMetrics;

/**
* This class is optimised for performance when used on the same event loop. However it can be used safely from other threads.
Expand All @@ -34,9 +34,13 @@
*/
public class WebSocketImpl extends WebSocketImplBase implements WebSocket {

public WebSocketImpl(VertxInternal vertx, ConnectionBase conn, boolean supportsContinuation,
final Object metric;

public WebSocketImpl(VertxInternal vertx,
ClientConnection conn, boolean supportsContinuation,
int maxWebSocketFrameSize) {
super(vertx, conn, supportsContinuation, maxWebSocketFrameSize);
metric = conn.metrics.connected(conn.metric(), this);
}

@Override
Expand Down Expand Up @@ -125,4 +129,10 @@ public synchronized WebSocket drainHandler(Handler<Void> handler) {
this.drainHandler = handler;
return this;
}

@Override
synchronized void handleClosed() {
((ClientConnection) conn).metrics.disconnected(metric);
super.handleClosed();
}
}
24 changes: 22 additions & 2 deletions src/main/java/io/vertx/core/metrics/impl/DummyVertxMetrics.java
Expand Up @@ -29,6 +29,8 @@
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocket;
import io.vertx.core.spi.metrics.DatagramSocketMetrics;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
Expand Down Expand Up @@ -155,7 +157,7 @@ public void close() {
}
}

class DummyHttpServerMetrics implements HttpServerMetrics<Void, Void> {
class DummyHttpServerMetrics implements HttpServerMetrics<Void, Void, Void> {

@Override
public Void requestBegin(Void socketMetric, HttpServerRequest request) {
Expand Down Expand Up @@ -195,9 +197,18 @@ public void close() {
public boolean isEnabled() {
return false;
}

@Override
public Void connected(Void socketMetric, ServerWebSocket serverWebSocket) {
return null;
}

@Override
public void disconnected(Void serverWebSocketMetric) {
}
}

class DummyHttpClientMetrics implements HttpClientMetrics<Void, Void> {
class DummyHttpClientMetrics implements HttpClientMetrics<Void, Void, Void> {

@Override
public Void requestBegin(Void socketMetric, SocketAddress localAddress, SocketAddress remoteAddress, HttpClientRequest request) {
Expand Down Expand Up @@ -237,6 +248,15 @@ public void close() {
public boolean isEnabled() {
return false;
}

@Override
public Void connected(Void socketMetric, WebSocket webSocket) {
return null;
}

@Override
public void disconnected(Void webSocketMetric) {
}
}

class DummyTCPMetrics implements TCPMetrics<Void> {
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java
Expand Up @@ -18,14 +18,15 @@

import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.WebSocket;
import io.vertx.core.net.SocketAddress;

/**
* The http client metrics SPI that Vert.x will use to call when http client events occur.
*
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
*/
public interface HttpClientMetrics<R, S> extends TCPMetrics<S> {
public interface HttpClientMetrics<R, W, S> extends TCPMetrics<S> {

/**
* Called when an http client request begins
Expand All @@ -45,4 +46,20 @@ public interface HttpClientMetrics<R, S> extends TCPMetrics<S> {
* @param response the {@link io.vertx.core.http.HttpClientResponse}
*/
void responseEnd(R requestMetric, HttpClientResponse response);

/**
* Called when a web socket connects.
*
* @param socketMetric the socket metric
* @param webSocket the server web socket
* @return the web socket metric
*/
W connected(S socketMetric, WebSocket webSocket);

/**
* Called when the web socket has disconnected.
*
* @param webSocketMetric the web socket metric
*/
void disconnected(W webSocketMetric);
}
19 changes: 18 additions & 1 deletion src/main/java/io/vertx/core/spi/metrics/HttpServerMetrics.java
Expand Up @@ -18,13 +18,14 @@

import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;

/**
* The http server metrics SPI that Vert.x will use to call when each http server event occurs.
*
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
*/
public interface HttpServerMetrics<R, S> extends TCPMetrics<S> {
public interface HttpServerMetrics<R, W, S> extends TCPMetrics<S> {

/**
* Called when an http server request begins
Expand All @@ -42,4 +43,20 @@ public interface HttpServerMetrics<R, S> extends TCPMetrics<S> {
* @param response the {@link io.vertx.core.http.HttpServerResponse}
*/
void responseEnd(R requestMetric, HttpServerResponse response);

/**
* Called when a server web socket connects.
*
* @param socketMetric the socket metric
* @param serverWebSocket the server web socket
* @return the server web socket metric
*/
W connected(S socketMetric, ServerWebSocket serverWebSocket);

/**
* Called when the server web socket has disconnected.
*
* @param serverWebSocketMetric the server web socket metric
*/
void disconnected(W serverWebSocketMetric);
}
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/spi/metrics/VertxMetrics.java
Expand Up @@ -85,7 +85,7 @@ public interface VertxMetrics extends Metrics, Measured {
* @param options the options used to create the {@link io.vertx.core.http.HttpServer}
* @return the http server metrics SPI
*/
HttpServerMetrics<?, ?> createMetrics(HttpServer server, SocketAddress localAddress, HttpServerOptions options);
HttpServerMetrics<?, ?, ?> createMetrics(HttpServer server, SocketAddress localAddress, HttpServerOptions options);

/**
* Provides the http client metrics SPI when an http client has been created
Expand All @@ -94,7 +94,7 @@ public interface VertxMetrics extends Metrics, Measured {
* @param options the options used to create the {@link io.vertx.core.http.HttpClient}
* @return the http client metrics SPI
*/
HttpClientMetrics<?, ?> createMetrics(HttpClient client, HttpClientOptions options);
HttpClientMetrics<?, ?, ?> createMetrics(HttpClient client, HttpClientOptions options);

/**
* Provides the net server metrics SPI when a net server is created
Expand Down
86 changes: 71 additions & 15 deletions src/test/java/io/vertx/test/core/MetricsTest.java
Expand Up @@ -20,12 +20,18 @@
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.test.fakemetrics.FakeEventBusMetrics;
import io.vertx.test.fakemetrics.FakeHttpClientMetrics;
import io.vertx.test.fakemetrics.FakeHttpServerMetrics;
import io.vertx.test.fakemetrics.FakeMetricsBase;
import io.vertx.test.fakemetrics.FakeVertxMetrics;
import io.vertx.test.fakemetrics.HandlerMetric;
import io.vertx.test.fakemetrics.ReceivedMessage;
import io.vertx.test.fakemetrics.SentMessage;
import io.vertx.test.fakemetrics.WebSocketMetric;
import org.junit.Test;

import java.util.Arrays;
Expand Down Expand Up @@ -75,8 +81,7 @@ public void testPublishMessageToCluster() {
}

private void testBroadcastMessage(Vertx from, Vertx[] to, boolean publish, boolean expectedLocal, boolean expectedRemote) {
FakeVertxMetrics metrics = FakeVertxMetrics.getMetrics(from);
FakeEventBusMetrics eventBusMetrics = metrics.getEventBusMetrics();
FakeEventBusMetrics eventBusMetrics = FakeMetricsBase.getMetrics(from.eventBus());
AtomicInteger broadcastCount = new AtomicInteger();
AtomicInteger receiveCount = new AtomicInteger();
for (Vertx vertx : to) {
Expand Down Expand Up @@ -114,8 +119,7 @@ public void testReceiveMessageSentFromRemote() {
}

private void testReceiveMessageSent(Vertx from, Vertx to, boolean expectedLocal, int expectedHandlers) {
FakeVertxMetrics metrics = FakeVertxMetrics.getMetrics(to);
FakeEventBusMetrics eventBusMetrics = metrics.getEventBusMetrics();
FakeEventBusMetrics eventBusMetrics = FakeMetricsBase.getMetrics(to.eventBus());
MessageConsumer<Object> consumer = to.eventBus().consumer(ADDRESS1);
consumer.completionHandler(done -> {
assertTrue(done.succeeded());
Expand All @@ -141,8 +145,7 @@ public void testReceiveMessagePublishedFromRemote() {
}

private void testReceiveMessagePublished(Vertx from, Vertx to, boolean expectedLocal, int expectedHandlers) {
FakeVertxMetrics metrics = FakeVertxMetrics.getMetrics(to);
FakeEventBusMetrics eventBusMetrics = metrics.getEventBusMetrics();
FakeEventBusMetrics eventBusMetrics = FakeMetricsBase.getMetrics(to.eventBus());
AtomicInteger count = new AtomicInteger();
for (int i = 0;i < expectedHandlers;i++) {
MessageConsumer<Object> consumer = to.eventBus().consumer(ADDRESS1);
Expand Down Expand Up @@ -176,8 +179,8 @@ public void testReplyMessageFromRemote() {
}

private void testReply(Vertx from, Vertx to, boolean expectedLocal, boolean expectedRemote) {
FakeEventBusMetrics fromMetrics = FakeVertxMetrics.getMetrics(from).getEventBusMetrics();
FakeEventBusMetrics toMetrics = FakeVertxMetrics.getMetrics(to).getEventBusMetrics();
FakeEventBusMetrics fromMetrics = FakeMetricsBase.getMetrics(from.eventBus());
FakeEventBusMetrics toMetrics = FakeMetricsBase.getMetrics(to.eventBus());
MessageConsumer<Object> consumer = to.eventBus().consumer(ADDRESS1);
consumer.completionHandler(done -> {
assertTrue(done.succeeded());
Expand Down Expand Up @@ -207,8 +210,9 @@ private void testReply(Vertx from, Vertx to, boolean expectedLocal, boolean expe

@Test
public void testHandlerRegistration() {
FakeEventBusMetrics metrics = FakeVertxMetrics.getMetrics(vertx).getEventBusMetrics();
MessageConsumer<Object> consumer = vertx.eventBus().consumer(ADDRESS1, msg -> {});
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(vertx.eventBus());
MessageConsumer<Object> consumer = vertx.eventBus().consumer(ADDRESS1, msg -> {
});
assertEquals(1, metrics.getRegistrations().size());
HandlerMetric registration = metrics.getRegistrations().get(0);
assertEquals(ADDRESS1, registration.address);
Expand All @@ -235,7 +239,7 @@ private HandlerMetric assertRegistration(FakeEventBusMetrics metrics) {
}

private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCoult) {
FakeEventBusMetrics metrics = FakeVertxMetrics.getMetrics(to).getEventBusMetrics();
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(to.eventBus());
to.eventBus().consumer(ADDRESS1, msg -> {
HandlerMetric registration = assertRegistration(metrics);
assertEquals(ADDRESS1, registration.address);
Expand All @@ -261,7 +265,7 @@ private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCo

@Test
public void testHandlerProcessMessageFailure() throws Exception {
FakeEventBusMetrics metrics = FakeVertxMetrics.getMetrics(vertx).getEventBusMetrics();
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(vertx.eventBus());
vertx.eventBus().consumer(ADDRESS1, msg -> {
assertEquals(1, metrics.getReceivedMessages().size());
HandlerMetric registration = metrics.getRegistrations().get(0);
Expand All @@ -284,7 +288,7 @@ public void testHandlerProcessMessageFailure() throws Exception {

@Test
public void testHandlerMetricReply() throws Exception {
FakeEventBusMetrics metrics = FakeVertxMetrics.getMetrics(vertx).getEventBusMetrics();
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(vertx.eventBus());
vertx.eventBus().consumer(ADDRESS1, msg -> {
assertEquals(2, metrics.getRegistrations().size());
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
Expand Down Expand Up @@ -319,8 +323,8 @@ public void testHandlerMetricReply() throws Exception {
@Test
public void testBytesCodec() throws Exception {
startNodes(2);
FakeEventBusMetrics fromMetrics = FakeVertxMetrics.getMetrics(vertices[0]).getEventBusMetrics();
FakeEventBusMetrics toMetrics = FakeVertxMetrics.getMetrics(vertices[1]).getEventBusMetrics();
FakeEventBusMetrics fromMetrics = FakeMetricsBase.getMetrics(vertices[0].eventBus());
FakeEventBusMetrics toMetrics = FakeMetricsBase.getMetrics(vertices[1].eventBus());
vertices[1].eventBus().consumer(ADDRESS1, msg -> {
int encoded = fromMetrics.getEncodedBytes(ADDRESS1);
int decoded = toMetrics.getDecodedBytes(ADDRESS1);
Expand All @@ -335,4 +339,56 @@ public void testBytesCodec() throws Exception {
});
await();
}

@Test
public void testServerWebSocket() throws Exception {
HttpServer server = vertx.createHttpServer();
server.websocketHandler(ws -> {
FakeHttpServerMetrics metrics = FakeMetricsBase.getMetrics(server);
WebSocketMetric metric = metrics.getMetric(ws);
assertNotNull(metric);
assertNotNull(metric.soMetric);
ws.handler(buffer -> {
ws.close();
});
ws.closeHandler(closed -> {
assertNull(metrics.getMetric(ws));
testComplete();
});
});
server.listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, ar -> {
assertTrue(ar.succeeded());
HttpClient client = vertx.createHttpClient();
client.websocket(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
ws.write(Buffer.buffer("wibble"));
});
});
await();
}

@Test
public void testWebSocket() throws Exception {
HttpServer server = vertx.createHttpServer();
server.websocketHandler(ws -> {
ws.write(Buffer.buffer("wibble"));
});
server.listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, ar -> {
assertTrue(ar.succeeded());
HttpClient client = vertx.createHttpClient();
client.websocket(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
FakeHttpClientMetrics metrics = FakeMetricsBase.getMetrics(client);
WebSocketMetric metric = metrics.getMetric(ws);
assertNotNull(metric);
assertNotNull(metric.soMetric);
ws.closeHandler(closed -> {
assertNull(metrics.getMetric(ws));
testComplete();
});
ws.handler(buffer -> {
ws.close();
});
});
});
await();
}
}

0 comments on commit e27d659

Please sign in to comment.