Permalink
Browse files

This will expose the incoming and outgoing queue depths as a metric i…

…n the StatsCollector.
  • Loading branch information...
1 parent 86b90ca commit a85415119a971d79904ec334c8bf2961c30cef7c Jim Carroll committed Nov 9, 2012
@@ -38,6 +38,16 @@
public <V> Future<V> schedule(Callable<V> r, long delay, TimeUnit timeUnit);
/**
+ * How many pending tasks are there.
+ */
+ public int getNumberPending();
+
+ /**
+ * How many pending limited tasks are there
+ */
+ public int getNumberLimitedPending();
+
+ /**
* Start up the executor
*/
public void start();
@@ -17,6 +17,11 @@
package com.nokia.dempsy.monitoring;
public interface StatsCollector {
+
+ public interface Gauge
+ {
+ long value();
+ }
/**
* A timer context is returned from start calls on the Stats collector
@@ -75,6 +80,20 @@
* ALSO be reflected in the messageDiscarded results.
*/
void messageCollision(Object message);
+
+ /**
+ * If the transport supports the queuing of incoming messages, then it
+ * can optionally supply a Gauge instance that provides this metric
+ * on demand.
+ */
+ void setMessagesPendingGauge(Gauge currentMessagesPendingGauge);
+
+ /**
+ * If the transport supports the queuing of outgoing messages, then it
+ * can optionally supply a Gauge instance that provides this metric
+ * on demand.
+ */
+ void setMessagesOutPendingGauge(Gauge currentMessagesOutPendingGauge);
/**
* The MP manager calls this method when it creates a message processor
@@ -2,19 +2,19 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class DefaultDempsyExecutor implements DempsyExecutor
{
private ScheduledExecutorService schedule = null;
- private ExecutorService executor = null;
+ private ThreadPoolExecutor executor = null;
private AtomicLong numLimited = null;
private long maxNumWaitingLimitedTasks = -1;
private int threadPoolSize = -1;
@@ -66,7 +66,7 @@ public void start()
// then use the other constructor
threadPoolSize = Math.max(cpuBasedThreadCount, minNumThreads);
}
- executor = Executors.newFixedThreadPool(threadPoolSize);
+ executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(threadPoolSize);
schedule = Executors.newSingleThreadScheduledExecutor();
numLimited = new AtomicLong(0);
@@ -78,8 +78,6 @@ public void start()
public void setMaxNumberOfQueuedLimitedTasks(int maxNumWaitingLimitedTasks) { this.maxNumWaitingLimitedTasks = maxNumWaitingLimitedTasks; }
- public int getCurrentQueuedLimitedTasks() { return (int)numLimited.get(); }
-
@Override
public int getNumThreads() { return threadPoolSize; }
@@ -92,6 +90,19 @@ public void shutdown()
if (schedule != null)
schedule.shutdown();
}
+
+ @Override
+ public int getNumberPending()
+ {
+ return executor.getQueue().size();
+ }
+
+ @Override
+ public int getNumberLimitedPending()
+ {
+ return numLimited.intValue();
+ }
+
public boolean isRunning() { return (schedule != null && executor != null) &&
!(schedule.isShutdown() || schedule.isTerminated()) &&
@@ -153,6 +153,18 @@ public void run()
iStartedIt = true;
executor.start();
}
+
+ if (statsCollector != null)
+ {
+ statsCollector.setMessagesPendingGauge(new StatsCollector.Gauge()
+ {
+ @Override
+ public long value()
+ {
+ return executor.getNumberPending();
+ }
+ });
+ }
serverThread.start();
}
@@ -65,6 +65,17 @@ protected TcpSender(TcpDestination destination, StatsCollector statsCollector,
this.timeoutMillis = socketWriteTimeoutMillis;
this.batchOutgoingMessages = batchOutgoingMessages;
this.maxNumberOfQueuedMessages = maxNumberOfQueuedOutgoing;
+ if (this.statsCollector != null)
+ {
+ this.statsCollector.setMessagesOutPendingGauge(new StatsCollector.Gauge()
+ {
+ @Override
+ public long value()
+ {
+ return sendingQueue.size();
+ }
+ });
+ }
this.start();
}
@@ -21,4 +21,7 @@
long getMessageBytesSent();
long getMessageBytesReceived();
+
+ long getMessagesPending();
+ long getMessagesOutPending();
}
@@ -24,7 +24,6 @@
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.monitoring.StatsCollector;
import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Timer;
@@ -56,6 +55,9 @@
public static final String MN_MP_DELETE = "message-processors-deleted";
public static final String MN_MSG_COLLISION = "messages-collisions";
public static final String GAGE_MPS_IN_PROCESS = "messages-in-process";
+ public static final String GAGE_MPS = "message-processors";
+ public static final String GAGE_MSG_PENDING = "messages-pending";
+ public static final String GAGE_MSG_OUT_PENDING = "messages-out-pending";
public static final String TM_MP_PREIN = "pre-instantiation-duration";
public static final String TM_MP_HANDLE = "mp-handle-message-duration";
public static final String TM_MP_OUTPUT = "outputInvoke-duration";
@@ -68,9 +70,11 @@
MN_BYTES_SENT, MN_MSG_UNSENT,
MN_MP_CREATE, MN_MP_DELETE,
MN_MSG_COLLISION,
- GAGE_MPS_IN_PROCESS,
+ GAGE_MPS_IN_PROCESS, GAGE_MPS,
+ GAGE_MSG_PENDING, GAGE_MSG_OUT_PENDING,
TM_MP_PREIN, TM_MP_HANDLE,
TM_MP_OUTPUT, TM_MP_EVIC
+
};
private Meter messagesReceived;
@@ -85,19 +89,32 @@
private Meter bytesSent;
private Meter messagesUnsent;
private AtomicInteger inProcessMessages;
- @SuppressWarnings("unused")
- private Gauge<Integer> messagesInProcess;
private AtomicLong numberOfMPs;
private Meter mpsCreated;
private Meter mpsDeleted;
- @SuppressWarnings("unused")
- private Gauge<Long> messageProcessors;
private String scope;
-
+
private Timer preInstantiationDuration;
private Timer mpHandleMessageDuration;
private Timer outputInvokeDuration;
private Timer evictionInvokeDuration;
+
+ private StatsCollector.Gauge currentMessagesPendingGauge;
+ private StatsCollector.Gauge currentMessagesOutPendingGauge;
+
+ {
+ StatsCollector.Gauge tmp = new Gauge()
+ {
+ @Override
+ public long value()
+ {
+ return 0;
+ }
+ };
+
+ currentMessagesPendingGauge = tmp;
+ currentMessagesOutPendingGauge = tmp;
+ }
public StatsCollectorCoda(ClusterId clusterId)
@@ -115,8 +132,7 @@ public StatsCollectorCoda(ClusterId clusterId)
bytesSent = Metrics.newMeter(Dempsy.class, MN_BYTES_SENT, scope, "bytes", TimeUnit.SECONDS);
messagesUnsent = Metrics.newMeter(Dempsy.class, MN_MSG_UNSENT, scope, "messsages", TimeUnit.SECONDS);
inProcessMessages = new AtomicInteger();
- messagesInProcess = Metrics.newGauge(Dempsy.class, GAGE_MPS_IN_PROCESS,
- scope, new Gauge<Integer>() {
+ Metrics.newGauge(Dempsy.class, GAGE_MPS_IN_PROCESS, scope, new com.yammer.metrics.core.Gauge<Integer>() {
@Override
public Integer value()
{
@@ -127,13 +143,28 @@ public Integer value()
numberOfMPs = new AtomicLong();
mpsCreated = Metrics.newMeter(Dempsy.class, MN_MP_CREATE, scope, "instances", TimeUnit.SECONDS);
mpsDeleted = Metrics.newMeter(Dempsy.class, MN_MP_DELETE, scope, "instances", TimeUnit.SECONDS);
- messageProcessors = Metrics.newGauge(Dempsy.class, "message-processors",
- scope, new Gauge<Long>() {
+ Metrics.newGauge(Dempsy.class, GAGE_MPS, scope, new com.yammer.metrics.core.Gauge<Long>() {
@Override
public Long value() {
return numberOfMPs.get();
}
});
+
+ Metrics.newGauge(Dempsy.class, GAGE_MSG_PENDING, scope, new com.yammer.metrics.core.Gauge<Long>() {
+ @Override
+ public Long value()
+ {
+ return StatsCollectorCoda.this.currentMessagesPendingGauge.value();
+ }
+ });
+
+ Metrics.newGauge(Dempsy.class, GAGE_MSG_OUT_PENDING, scope, new com.yammer.metrics.core.Gauge<Long>() {
+ @Override
+ public Long value()
+ {
+ return StatsCollectorCoda.this.currentMessagesOutPendingGauge.value();
+ }
+ });
preInstantiationDuration = Metrics.newTimer(Dempsy.class, TM_MP_PREIN,
scope, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
@@ -356,4 +387,29 @@ public long getMessageBytesReceived()
return bytesReceived.count();
}
+ @Override
+ public synchronized void setMessagesPendingGauge(StatsCollector.Gauge currentMessagesPendingGauge)
+ {
+ if (currentMessagesPendingGauge != null)
+ this.currentMessagesPendingGauge = currentMessagesPendingGauge;
+ }
+
+ @Override
+ public synchronized void setMessagesOutPendingGauge(StatsCollector.Gauge currentMessagesOutPendingGauge)
+ {
+ if (currentMessagesOutPendingGauge != null)
+ this.currentMessagesOutPendingGauge = currentMessagesOutPendingGauge;
+ }
+
+ @Override
+ public long getMessagesPending()
+ {
+ return currentMessagesPendingGauge.value();
+ }
+
+ @Override
+ public long getMessagesOutPending()
+ {
+ return currentMessagesOutPendingGauge.value();
+ }
}
Oops, something went wrong.

0 comments on commit a854151

Please sign in to comment.