Permalink
Browse files

Refactor spectator metrics

__Problem__

Currently the listener does not expose metrics to specific events but instead does internal calculations and expose custom gauges. This is limiting as it gives less information.

__Modification__

Modified the metrics to represent the actual events.

__Result__

More flexible insights.
  • Loading branch information...
NiteshKant committed Feb 6, 2017
1 parent be12144 commit c0b3387cc5772d314544b9f97670cfc378672cfc

This file was deleted.

Oops, something went wrong.
@@ -23,5 +23,4 @@ dependencies {
compile project(':rxnetty-http')
compile project(':rxnetty-spectator-tcp')
compile project(':rxnetty-common')
compile 'com.netflix.spectator:spectator-api:0.40.0'
}
@@ -17,44 +17,32 @@
package io.reactivex.netty.spectator.http;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import io.reactivex.netty.protocol.http.client.events.HttpClientEventsListener;
import io.reactivex.netty.spectator.http.internal.ResponseCodesHolder;
import io.reactivex.netty.spectator.internal.LatencyMetrics;
import io.reactivex.netty.spectator.internal.EventMetric;
import io.reactivex.netty.spectator.tcp.TcpClientListener;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static io.reactivex.netty.spectator.internal.SpectatorUtils.*;
/**
* HttpClientListener.
*/
public class HttpClientListener extends HttpClientEventsListener {
private final AtomicInteger requestBacklog;
private final AtomicInteger inflightRequests;
private final Counter processedRequests;
private final Counter requestWriteFailed;
private final Counter failedResponses;
private final EventMetric requestWrite;
private final EventMetric requestProcessing;
private final EventMetric response;
private final ResponseCodesHolder responseCodesHolder;
private final LatencyMetrics requestWriteTimes;
private final LatencyMetrics responseReadTimes;
private final LatencyMetrics requestProcessingTimes;
private final TcpClientListener tcpDelegate;
public HttpClientListener(Registry registry, String monitorId) {
requestBacklog = newGauge(registry, "requestBacklog", monitorId, new AtomicInteger());
inflightRequests = newGauge(registry, "inflightRequests", monitorId, new AtomicInteger());
requestWriteTimes = new LatencyMetrics("requestWriteTimes", monitorId, registry);
responseReadTimes = new LatencyMetrics("responseReadTimes", monitorId, registry);
processedRequests = newCounter(registry, "processedRequests", monitorId);
requestWriteFailed = newCounter(registry, "requestWriteFailed", monitorId);
failedResponses = newCounter(registry, "failedResponses", monitorId);
requestProcessingTimes = new LatencyMetrics("requestProcessingTimes", monitorId, registry);
requestWrite = new EventMetric(registry, "request", monitorId, "action", "write");
requestProcessing = new EventMetric(registry, "request", monitorId, "action", "processing");
response = new EventMetric(registry, "response", monitorId, "action", "read");
responseCodesHolder = new ResponseCodesHolder(registry, monitorId);
tcpDelegate = new TcpClientListener(registry, monitorId);
}
@@ -63,49 +51,9 @@ public HttpClientListener(String monitorId) {
this(Spectator.globalRegistry(), monitorId);
}
public long getRequestBacklog() {
return requestBacklog.get();
}
public long getInflightRequests() {
return inflightRequests.get();
}
public long getProcessedRequests() {
return processedRequests.count();
}
public long getRequestWriteFailed() {
return requestWriteFailed.count();
}
public long getFailedResponses() {
return failedResponses.count();
}
public long getResponse1xx() {
return responseCodesHolder.getResponse1xx();
}
public long getResponse2xx() {
return responseCodesHolder.getResponse2xx();
}
public long getResponse3xx() {
return responseCodesHolder.getResponse3xx();
}
public long getResponse4xx() {
return responseCodesHolder.getResponse4xx();
}
public long getResponse5xx() {
return responseCodesHolder.getResponse5xx();
}
@Override
public void onRequestProcessingComplete(long duration, TimeUnit timeUnit) {
requestProcessingTimes.record(duration, timeUnit);
requestProcessing.success(duration, timeUnit);
}
@Override
@@ -115,38 +63,32 @@ public void onResponseHeadersReceived(int responseCode, long duration, TimeUnit
@Override
public void onResponseReceiveComplete(long duration, TimeUnit timeUnit) {
inflightRequests.decrementAndGet();
processedRequests.increment();
responseReadTimes.record(duration, timeUnit);
response.success(duration, timeUnit);
}
@Override
public void onRequestWriteStart() {
requestBacklog.decrementAndGet();
requestWrite.start();
}
@Override
public void onResponseFailed(Throwable throwable) {
inflightRequests.decrementAndGet();
processedRequests.increment();
failedResponses.increment();
response.failure();
}
@Override
public void onRequestWriteComplete(long duration, TimeUnit timeUnit) {
requestWriteTimes.record(duration, timeUnit);
requestWrite.success(duration, timeUnit);
}
@Override
public void onRequestWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) {
inflightRequests.decrementAndGet();
requestWriteFailed.increment();
requestWrite.failure(duration, timeUnit);
}
@Override
public void onRequestSubmitted() {
requestBacklog.incrementAndGet();
inflightRequests.incrementAndGet();
requestProcessing.start();
}
@Override
@@ -256,84 +198,4 @@ public void onConnectSuccess(long duration, TimeUnit timeUnit) {
public void onConnectStart() {
tcpDelegate.onConnectStart();
}
public long getLiveConnections() {
return tcpDelegate.getLiveConnections();
}
public long getConnectionCount() {
return tcpDelegate.getConnectionCount();
}
public long getPendingConnects() {
return tcpDelegate.getPendingConnects();
}
public long getFailedConnects() {
return tcpDelegate.getFailedConnects();
}
public long getPendingConnectionClose() {
return tcpDelegate.getPendingConnectionClose();
}
public long getFailedConnectionClose() {
return tcpDelegate.getFailedConnectionClose();
}
public long getPendingPoolAcquires() {
return tcpDelegate.getPendingPoolAcquires();
}
public long getFailedPoolAcquires() {
return tcpDelegate.getFailedPoolAcquires();
}
public long getPendingPoolReleases() {
return tcpDelegate.getPendingPoolReleases();
}
public long getFailedPoolReleases() {
return tcpDelegate.getFailedPoolReleases();
}
public long getPoolEvictions() {
return tcpDelegate.getPoolEvictions();
}
public long getPoolReuse() {
return tcpDelegate.getPoolReuse();
}
public long getPendingWrites() {
return tcpDelegate.getPendingWrites();
}
public long getPendingFlushes() {
return tcpDelegate.getPendingFlushes();
}
public long getBytesWritten() {
return tcpDelegate.getBytesWritten();
}
public long getBytesRead() {
return tcpDelegate.getBytesRead();
}
public long getFailedWrites() {
return tcpDelegate.getFailedWrites();
}
public long getFailedFlushes() {
return tcpDelegate.getFailedFlushes();
}
public long getPoolAcquires() {
return tcpDelegate.getPoolAcquires();
}
public long getPoolReleases() {
return tcpDelegate.getPoolReleases();
}
}
Oops, something went wrong.

0 comments on commit c0b3387

Please sign in to comment.