Skip to content

Commit

Permalink
Merge pull request #84 from benzenittini/DropwizardStatsAndReporters
Browse files Browse the repository at this point in the history
Dropwizard stats collectors and reporters
  • Loading branch information
jimfcarroll committed May 3, 2017
2 parents 4a547e5 + 300687a commit 5ebfb6f
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@

public interface NodeStatsCollector extends StatsCollector {

/**
* Sets a unique identifier for this node, used by certain stats collectors when publishing.
*/
default void setNodeId(final String nodeId) {
// Does nothing by default. Must override.
}

/**
* The dispatcher calls this method in its <code>onMessage</code> handler.
*/
Expand Down
13 changes: 13 additions & 0 deletions dempsy-framework.impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public NodeManager collaborator(final ClusterInfoSession session) {
}

private static Container makeContainer(final String containerTypeId) {
return new Manager<Container>(Container.class).getAssociatedInstance(containerTypeId);
return new Manager<>(Container.class).getAssociatedInstance(containerTypeId);
}

public NodeManager start() throws DempsyException {
Expand All @@ -105,12 +105,12 @@ public NodeManager start() throws DempsyException {
nodeStatsCollector = tr.track((NodeStatsCollector) node.getNodeStatsCollector());

// TODO: cleaner?
statsCollectorFactory = tr.track(new Manager<ClusterStatsCollectorFactory>(ClusterStatsCollectorFactory.class)
statsCollectorFactory = tr.track(new Manager<>(ClusterStatsCollectorFactory.class)
.getAssociatedInstance(node.getClusterStatsCollectorFactoryId()));

// =====================================
// set the dispatcher on adaptors and create containers for mp clusters
final AtomicReference<String> firstAdaptorClusterName = new AtomicReference<String>(null);
final AtomicReference<String> firstAdaptorClusterName = new AtomicReference<>(null);
node.getClusters().forEach(c -> {
if (c.isAdaptor()) {
if (firstAdaptorClusterName.get() == null)
Expand All @@ -122,7 +122,7 @@ public NodeManager start() throws DempsyException {
.setClusterId(c.getClusterId());

// TODO: This is a hack for now.
final Manager<RoutingStrategy.Inbound> inboundManager = new Manager<RoutingStrategy.Inbound>(RoutingStrategy.Inbound.class);
final Manager<RoutingStrategy.Inbound> inboundManager = new Manager<>(RoutingStrategy.Inbound.class);
final RoutingStrategy.Inbound is = inboundManager.getAssociatedInstance(c.getRoutingStrategyId());
containers.add(new PerContainer(con, is, c));
}
Expand All @@ -148,6 +148,7 @@ else if (firstAdaptorClusterName.get() == null)
}

nodeId = Optional.ofNullable(nodeAddress).map(n -> n.getGuid()).orElse(firstAdaptorClusterName.get());
nodeStatsCollector.setNodeId(nodeId);

if (nodeAddress == null && node.getReceiver() != null)
LOGGER.warn("The node at " + nodeId + " contains no message processors but has a Reciever set. The receiver will never be started.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package net.dempsy.monitoring.dropwizard;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import net.dempsy.config.ClusterId;
import net.dempsy.monitoring.ClusterStatsCollector;
import net.dempsy.monitoring.StatsCollector;
import net.dempsy.utils.MetricUtils;

public class DropwizardClusterStatsCollector implements ClusterStatsCollector {

public static final String MESSAGES_DISPATCHED = "messages-dispatched";
public static final String MESSAGES_PROCESSED = "messages-processed";
public static final String MESSAGES_FAILED = "messages-failed";
public static final String MESSAGES_COLLISION = "messages-collision";
public static final String MESSAGES_PROCESSOR_CREATED = "messages-processor-created";
public static final String MESSAGES_PROCESSOR_DELETED = "messages-processor-deleted";
public static final String OUTPUT_INVOKE_STARTED_TIMER = "output-invoke-started-timer";
public static final String EVICTION_PASS_STARTED_TIMER = "eviction-pass-started-timer";
public static final String PRE_INSTANTIATION_STARTED_TIMER = "pre-instantiation-started-timer";

private static class DropwizardTimerContext implements StatsCollector.TimerContext {
private final Timer timer;
private final Timer.Context context;

private DropwizardTimerContext(final MetricRegistry registry, final String timerName) {
timer = registry.timer(timerName);
context = timer.time();
}

@Override
public void stop() {
context.stop();
}
}

private final ClusterId clusterId;

private final MetricRegistry registry;
private final Meter messageDispatched;
private final Meter messageProcessed;
private final Meter messageFailed;
private final Meter messageCollision;
private final Meter messageProcessorCreated;
private final Meter messageProcessorDeleted;

public DropwizardClusterStatsCollector(final ClusterId clusterId) {
super();

this.clusterId = clusterId;

registry = MetricUtils.getMetricsRegistry();
messageDispatched = registry.meter(getName(MESSAGES_DISPATCHED));
messageProcessed = registry.meter(getName(MESSAGES_PROCESSED));
messageFailed = registry.meter(getName(MESSAGES_FAILED));
messageCollision = registry.meter(getName(MESSAGES_COLLISION));
messageProcessorCreated = registry.meter(getName(MESSAGES_PROCESSOR_CREATED));
messageProcessorDeleted = registry.meter(getName(MESSAGES_PROCESSOR_DELETED));
}

@Override
public void messageDispatched(final Object message) {
messageDispatched.mark();
}

@Override
public void messageProcessed(final Object message) {
messageProcessed.mark();
}

@Override
public void messageFailed(final boolean mpFailure) {
messageFailed.mark();
}

@Override
public void messageCollision(final Object message) {
messageCollision.mark();
}

@Override
public void messageProcessorCreated(final Object key) {
messageProcessorCreated.mark();
}

@Override
public void messageProcessorDeleted(final Object key) {
messageProcessorDeleted.mark();
}

@Override
public void stop() {
// Don't need to do anything to stop this collector.
}

@Override
public TimerContext preInstantiationStarted() {
return new DropwizardTimerContext(registry, getName(PRE_INSTANTIATION_STARTED_TIMER));
}

@Override
public TimerContext outputInvokeStarted() {
return new DropwizardTimerContext(registry, getName(OUTPUT_INVOKE_STARTED_TIMER));
}

@Override
public TimerContext evictionPassStarted() {
return new DropwizardTimerContext(registry, getName(EVICTION_PASS_STARTED_TIMER));
}

// protected access for testing purposes
protected String getName(final String key) {
return MetricRegistry.name(DropwizardClusterStatsCollector.class, "cluster", clusterId.applicationName, clusterId.clusterName, key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package net.dempsy.monitoring.dropwizard;

import java.util.ArrayList;
import java.util.List;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

import net.dempsy.monitoring.NodeStatsCollector;
import net.dempsy.utils.MetricUtils;

public class DropwizardNodeStatsCollector implements NodeStatsCollector {

public static final String MESSAGE_RECEIVED = "message-received";
public static final String MESSAGE_DISCARDED = "message-discarded";
public static final String MESSAGE_SENT = "message-sent";
public static final String MESSAGE_NOT_SENT = "message-not-sent";
public static final String MESSAGES_PENDING_GAUGE = "messages-pending-gauge";
public static final String MESSAGES_OUT_PENDING_GAUGE = "messages-out-pending-gauge";

private List<DropwizardReporterSpec> reporters = new ArrayList<>();

private DropwizardStatsReporter reporter;

private String nodeId;
private final MetricRegistry registry;
private final Meter messageReceived;
private final Meter messageDiscarded;
private final Meter messageSent;
private final Meter messageNotSent;

public DropwizardNodeStatsCollector() {
registry = MetricUtils.getMetricsRegistry();
messageReceived = registry.meter(getName(MESSAGE_RECEIVED));
messageDiscarded = registry.meter(getName(MESSAGE_DISCARDED));
messageSent = registry.meter(getName(MESSAGE_SENT));
messageNotSent = registry.meter(getName(MESSAGE_NOT_SENT));
}

@Override
public void setNodeId(final String nodeId) {
this.nodeId = nodeId;
reporter = new DropwizardStatsReporter(this.nodeId, reporters);
}

public void setReporters(final List<DropwizardReporterSpec> reporters) {
this.reporters = reporters;
}

@Override
public void stop() {
// Stop the reporters
reporter.stopReporters();
}

@Override
public void messageReceived(final Object message) {
messageReceived.mark();
}

@Override
public void messageDiscarded(final Object message) {
messageDiscarded.mark();
}

@Override
public void messageSent(final Object message) {
messageSent.mark();
}

@Override
public void messageNotSent() {
messageNotSent.mark();
}

@Override
public void setMessagesPendingGauge(final Gauge currentMessagesPendingGauge) {
final String gaugeName = getName(MESSAGES_PENDING_GAUGE);
// If the registry doesn't already have this gauge, then add it.
if (!registry.getGauges().containsKey(gaugeName)) {
registry.register(gaugeName, new com.codahale.metrics.Gauge<Long>() {
@Override
public Long getValue() {
return currentMessagesPendingGauge.value();
}
});
}
}

@Override
public void setMessagesOutPendingGauge(final Gauge currentMessagesOutPendingGauge) {
final String gaugeName = getName(MESSAGES_OUT_PENDING_GAUGE);
// If the registry doesn't already have this gauge, then add it.
if (!registry.getGauges().containsKey(gaugeName)) {
registry.register(gaugeName, new com.codahale.metrics.Gauge<Long>() {
@Override
public Long getValue() {
return currentMessagesOutPendingGauge.value();
}
});
}
}

// protected access for testing purposes
protected String getName(final String key) {
return MetricRegistry.name(DropwizardNodeStatsCollector.class, "node", key);
}

}
Loading

0 comments on commit 5ebfb6f

Please sign in to comment.