diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 92cbbf4518e3..56c5e19be994 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -83,6 +83,13 @@ public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUn this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory); } + public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + allowCoreThreadTimeOut(true); + this.setRejectedExecutionHandler(blockingExecutionHandler); + } + public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java index 2dafb4f16cf0..8e5228c20a4d 100644 --- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java @@ -18,11 +18,14 @@ package org.apache.cassandra.concurrent; import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.TimeUnit; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -38,6 +41,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i { private final String mbeanName; public final ThreadPoolMetrics metrics; + public final List threadIds; public JMXEnabledThreadPoolExecutor(String threadPoolName) { @@ -77,7 +81,13 @@ public JMXEnabledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath) { - super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory); + super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue); + threadIds = Collections.synchronizedList(new ArrayList<>(corePoolSize)); + super.setThreadFactory(r-> { + Thread t = threadFactory.newThread(r); + threadIds.add(t.getId()); + return t; + }); super.prestartAllCoreThreads(); metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id); @@ -127,6 +137,12 @@ private void unregisterMBean() metrics.release(); } + @Override + public void terminated() + { + metrics.updateThreadStats(Thread.currentThread().getId()); + } + @Override public synchronized void shutdown() { diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java index add850afe22d..3368dc29912a 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java @@ -36,7 +36,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService public final int maxWorkers; public final String name; public final int maxTasksQueued; - private final SEPMetrics metrics; + public final SEPMetrics metrics; // stores both a set of work permits and task permits: // bottom 32 bits are number of queued tasks, in the range [0..maxTasksQueued] (initially 0) diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index d3c87c6aeef2..c9fdb7232be5 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.concurrent; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -25,12 +26,13 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocalThread; +import org.apache.cassandra.metrics.ThreadMetricState; import org.apache.cassandra.utils.JVMStabilityInspector; final class SEPWorker extends AtomicReference implements Runnable { private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class); - private static final boolean SET_THREAD_NAME = Boolean.parseBoolean(System.getProperty("cassandra.set_sep_thread_name", "true")); + private static final boolean TRACK_SEP = Boolean.parseBoolean(System.getProperty("cassandra.track_sep_threads", "true")); final Long workerId; final Thread thread; @@ -42,6 +44,7 @@ final class SEPWorker extends AtomicReference implements Runnabl // strategy can only work when there are multiple threads spinning (as more sleep time must elapse than real time) long prevStopCheck = 0; long soleSpinnerSpinTime = 0; + ThreadMetricState state; SEPWorker(Long workerId, Work initialState, SharedExecutorPool pool) { @@ -51,6 +54,18 @@ final class SEPWorker extends AtomicReference implements Runnabl thread.setDaemon(true); set(initialState); thread.start(); + ScheduledFuture f = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> + { + if (state != null) { + try + { + state.update(); + } catch (NullPointerException e) { + // state can become null after check, ok to just ignore that race and do nothing - metrics would + // be updated from the update at end of assignment + } + } + }, 30, 30, TimeUnit.SECONDS); } public void run() @@ -90,8 +105,11 @@ public void run() assigned = get().assigned; if (assigned == null) continue; - if (SET_THREAD_NAME) + if (TRACK_SEP) + { Thread.currentThread().setName(assigned.name + "-" + workerId); + state = new ThreadMetricState(Thread.currentThread().getId(), assigned.metrics.cpu, assigned.metrics.alloc); + } task = assigned.tasks.poll(); // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING @@ -119,6 +137,12 @@ public void run() assigned.returnWorkPermit(); if (shutdown && assigned.getActiveCount() == 0) assigned.shutdown.signalAll(); + + if (TRACK_SEP) + { + state.update(); + state = null; + } assigned = null; // try to immediately reassign ourselves some work; if we fail, start spinning diff --git a/src/java/org/apache/cassandra/metrics/SEPMetrics.java b/src/java/org/apache/cassandra/metrics/SEPMetrics.java index dd1d2d6587e6..66511e5d873c 100644 --- a/src/java/org/apache/cassandra/metrics/SEPMetrics.java +++ b/src/java/org/apache/cassandra/metrics/SEPMetrics.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; import org.apache.cassandra.concurrent.SEPExecutor; @@ -44,6 +45,11 @@ public class SEPMetrics /** Maximum number of tasks queued before a task get blocked */ public final Gauge maxTasksQueued; + /** Approximate amount of cpu time (ns/s) spent on threads in this stage */ + public final Meter cpu; + /** Approximate amount of allocations in bytes by threads in this stage */ + public final Meter alloc; + private MetricNameFactory factory; /** @@ -56,6 +62,8 @@ public class SEPMetrics public SEPMetrics(final SEPExecutor executor, String path, String poolName) { this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName); + cpu = Metrics.meter(factory.createMetricName("CPU")); + alloc = Metrics.meter(factory.createMetricName("Allocations")); activeTasks = Metrics.register(factory.createMetricName("ActiveTasks"), new Gauge() { public Integer getValue() diff --git a/src/java/org/apache/cassandra/metrics/ThreadMetricState.java b/src/java/org/apache/cassandra/metrics/ThreadMetricState.java new file mode 100644 index 000000000000..a9300b4f6c16 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadMetricState.java @@ -0,0 +1,143 @@ +package org.apache.cassandra.metrics; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.atomic.AtomicLongArray; + +import com.codahale.metrics.Meter; +import com.google.common.annotations.VisibleForTesting; + +public class ThreadMetricState +{ + private final AtomicLongArray state = new AtomicLongArray(2); + static final ThreadMXBean BEAN = ManagementFactory.getThreadMXBean(); + private static final boolean SUN; + + static { + boolean success = false; + try + { + Class.forName("com.sun.management.ThreadMXBean"); + success = true; + } + catch (ClassNotFoundException ex) + { + success = false; + } + SUN = success; + } + + Meter cpu; + Meter allocs; + long threadId; + + public ThreadMetricState(long threadId, Meter cpu, Meter allocs) + { + this.threadId = threadId; + this.cpu = cpu; + this.allocs = allocs; + baseline(); + } + + public long getCPU() + { + return state.get(0); + } + + public long getAllocations() + { + return state.get(1); + } + + boolean setCPU(long expect, long cpu) + { + return state.compareAndSet(0, expect, cpu); + } + + boolean setAllocations(long expect,long allocs) + { + return state.compareAndSet(1, expect, allocs); + } + + @VisibleForTesting + long fetchTotalCPU(long threadId) + { + return BEAN.getThreadCpuTime(threadId); + } + + @VisibleForTesting + long fetchTotalAlloc(long threadId) + { + final com.sun.management.ThreadMXBean beanX = (com.sun.management.ThreadMXBean) BEAN; + return beanX.getThreadAllocatedBytes(threadId); + } + + public void baseline() + { + if (BEAN.isCurrentThreadCpuTimeSupported()) + { + long ns = fetchTotalCPU(threadId); + if (ns > 0) + state.set(0, ns); + } + + if (SUN) + { + long bytes = fetchTotalAlloc(threadId); + if(bytes > 0) + { + state.set(1, bytes); + } + } + } + + public void update() + { + updateCPU(); + updateAllocs(); + } + + public void updateCPU() + { + boolean success = false; + long delta = 0; + while(!success && BEAN.isCurrentThreadCpuTimeSupported()) + { + long pCpu = getCPU(); + long cpu = fetchTotalCPU(threadId); + if(cpu > 0) + { + delta = cpu - pCpu; + success = setCPU(pCpu, cpu); + } + else + success = true; // skip setting + } + if (delta > 0) + cpu.mark(delta); + } + + public void updateAllocs() + { + long delta = 0; + if (SUN) + { + final com.sun.management.ThreadMXBean beanX = (com.sun.management.ThreadMXBean) BEAN; + boolean success = false; + while(!success && beanX.isThreadAllocatedMemoryEnabled()) + { + long pAllocs = getAllocations(); + long allocs = fetchTotalAlloc(threadId); + if(allocs > 0) + { + delta = allocs - pAllocs; + success = setAllocations(pAllocs, allocs); + } + else + success = true; // skip setting + } + } + if (delta > 0) + allocs.mark(delta); + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java index 268e8780dc2a..b7d6cb065c49 100644 --- a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java @@ -18,24 +18,34 @@ package org.apache.cassandra.metrics; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadPoolExecutor; import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.JmxReporter; +import java.util.concurrent.TimeUnit; import javax.management.JMX; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; + +import com.codahale.metrics.Meter; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; - /** * Metrics for {@link ThreadPoolExecutor}. */ @@ -57,7 +67,17 @@ public class ThreadPoolMetrics /** Maximum number of threads before it will start queuing tasks */ public final Gauge maxPoolSize; + private ConcurrentMap lastStatsByThreadId = new ConcurrentHashMap<>(); + + /** Approximate amount of cpu time (ns/s) spent on threads in this executor */ + public final Meter cpu; + /** Approximate amount of allocations in bytes by threads in this executor */ + public final Meter alloc; + private MetricNameFactory factory; + private final JMXEnabledThreadPoolExecutor executor; + + private static final ThreadMXBean bean = ManagementFactory.getThreadMXBean(); /** * Create metrics for given ThreadPoolExecutor. @@ -66,9 +86,13 @@ public class ThreadPoolMetrics * @param path Type of thread pool * @param poolName Name of thread pool to identify metrics */ - public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName) + public ThreadPoolMetrics(final JMXEnabledThreadPoolExecutor executor, String path, String poolName) { this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName); + this.executor = executor; + + cpu = Metrics.meter(factory.createMetricName("CPU")); + alloc = Metrics.meter(factory.createMetricName("Allocations")); activeTasks = Metrics.register(factory.createMetricName("ActiveTasks"), new Gauge() { @@ -100,6 +124,7 @@ public Integer getValue() return executor.getMaximumPoolSize(); } }); + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(this::updateCpuAlloc, 30, 30, TimeUnit.SECONDS); } public void release() @@ -126,6 +151,11 @@ public static Object getJmxMetric(MBeanServerConnection mbeanServerConn, String switch (metricName) { + case "CPU": + case "Allocations": + double rate = JMX.newMBeanProxy(mbeanServerConn, oName, JmxReporter.JmxMeterMBean.class).getOneMinuteRate(); + rate = rate / 1_000_000; // bytes to mb, and ns to ms + return Integer.toString((int) rate); case "ActiveTasks": case "PendingTasks": case "CompletedTasks": @@ -143,6 +173,29 @@ public static Object getJmxMetric(MBeanServerConnection mbeanServerConn, String } } + public synchronized void updateCpuAlloc() + { + for (long tid : ImmutableList.copyOf(executor.threadIds)) + { + updateThreadStats(tid); + } + } + + public synchronized void updateThreadStats(long threadId) + { + ThreadMetricState state = lastStatsByThreadId.computeIfAbsent(threadId, t-> new ThreadMetricState(threadId, cpu, alloc)); + + ThreadInfo info = bean.getThreadInfo(threadId, 0); + if (info == null) + { + executor.threadIds.remove(threadId); + lastStatsByThreadId.remove(threadId); + return; + } + + state.update(); + } + public static Multimap getJmxThreadPools(MBeanServerConnection mbeanServerConn) { try diff --git a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java index a56e52eb35c5..cf99f8502c7c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java @@ -20,6 +20,7 @@ import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.annotation.Nonnull; @@ -83,6 +84,12 @@ public void add(@Nonnull String... row) rows.add(row); } + public void add(@Nonnull Object... row) + { + Objects.requireNonNull(row); + add(Arrays.stream(row).map(o -> o.toString()).toArray(String[]::new)); + } + public void printTo(PrintStream out) { if (rows.isEmpty()) diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java index f3e91dcc94f4..74657d25bc26 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java @@ -20,7 +20,9 @@ import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; +import org.apache.cassandra.metrics.ThreadPoolMetrics; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.nodetool.stats.StatsHolder; @@ -37,18 +39,20 @@ public TpStatsHolder(NodeProbe probe) public Map convert2Map() { HashMap result = new HashMap<>(); - HashMap> threadPools = new HashMap<>(); + TreeMap> threadPools = new TreeMap<>(); HashMap droppedMessage = new HashMap<>(); HashMap waitLatencies = new HashMap<>(); for (Map.Entry tp : probe.getThreadPools().entries()) { - HashMap threadPool = new HashMap<>(); + Map threadPool = new HashMap<>(); threadPool.put("ActiveTasks", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "ActiveTasks")); threadPool.put("PendingTasks", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "PendingTasks")); threadPool.put("CompletedTasks", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "CompletedTasks")); threadPool.put("CurrentlyBlockedTasks", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "CurrentlyBlockedTasks")); threadPool.put("TotalBlockedTasks", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "TotalBlockedTasks")); + threadPool.put("CPU", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "CPU")); // ms/sec + threadPool.put("Allocations", probe.getThreadPoolMetric(tp.getKey(), tp.getValue(), "Allocations")); // mb/sec threadPools.put(tp.getValue(), threadPool); } result.put("ThreadPools", threadPools); diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java index 86bdf28205a1..7f0065659253 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java @@ -22,6 +22,8 @@ import java.util.Collections; import java.util.Map; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; + public class TpStatsPrinter { public static StatsPrinter from(String format) @@ -45,20 +47,23 @@ public void print(TpStatsHolder data, PrintStream out) { Map convertData = data.convert2Map(); - out.printf("%-30s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked"); + TableBuilder tb = new TableBuilder(); + tb.add("Pool Name", "Active", "Pending", "Completed", "Blocked", "AllTimeBlocked", "CPU[ms/s]", "Allocations[mb/s]"); - Map threadPools = convertData.get("ThreadPools") instanceof Map ? (Map)convertData.get("ThreadPools") : Collections.emptyMap(); + Map threadPools = convertData.get("ThreadPools") instanceof Map ? (Map) convertData.get("ThreadPools") : Collections.emptyMap(); for (Map.Entry entry : threadPools.entrySet()) { Map values = entry.getValue() instanceof Map ? (Map)entry.getValue() : Collections.emptyMap(); - out.printf("%-30s%10s%10s%15s%10s%18s%n", - entry.getKey(), - values.get("ActiveTasks"), - values.get("PendingTasks"), - values.get("CompletedTasks"), - values.get("CurrentlyBlockedTasks"), - values.get("TotalBlockedTasks")); + tb.add(entry.getKey(), + values.get("ActiveTasks"), + values.get("PendingTasks"), + values.get("CompletedTasks"), + values.get("CurrentlyBlockedTasks"), + values.get("TotalBlockedTasks"), + values.get("CPU"), + values.get("Allocations")); } + tb.printTo(out); out.printf("%n%-20s%10s%18s%18s%18s%18s%n", "Message type", "Dropped", "", "Latency waiting in queue (micros)", "", ""); out.printf("%-20s%10s%18s%18s%18s%18s%n", "", "", "50%", "95%", "99%", "Max"); diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java index 9f9d86960aea..4fe042f3bc01 100644 --- a/src/java/org/apache/cassandra/utils/StatusLogger.java +++ b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -19,11 +19,12 @@ import java.lang.management.ManagementFactory; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import javax.management.*; import org.apache.cassandra.cache.*; - +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.metrics.ThreadPoolMetrics; import org.slf4j.Logger; @@ -67,17 +68,19 @@ private static void logStatus() MBeanServer server = ManagementFactory.getPlatformMBeanServer(); // everything from o.a.c.concurrent - logger.info(String.format("%-25s%10s%10s%15s%10s%18s", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All Time Blocked")); + logger.info(String.format("%-25s%10s%10s%15s%10s%18s%10s%12s", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All Time Blocked", "CPU", "Allocations")); for (Map.Entry tpool : ThreadPoolMetrics.getJmxThreadPools(server).entries()) { - logger.info(String.format("%-25s%10s%10s%15s%10s%18s%n", + logger.info(String.format("%-25s%10s%10s%15s%10s%18s%10s%12s", tpool.getValue(), ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "ActiveTasks"), ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "PendingTasks"), ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "CompletedTasks"), ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "CurrentlyBlockedTasks"), - ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "TotalBlockedTasks"))); + ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "TotalBlockedTasks"), + ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "CPU"), // ms/second + ThreadPoolMetrics.getJmxMetric(server, tpool.getKey(), tpool.getValue(), "Allocations"))); // mb/sec } // one offs diff --git a/test/unit/org/apache/cassandra/metrics/ThreadMetricStateTest.java b/test/unit/org/apache/cassandra/metrics/ThreadMetricStateTest.java new file mode 100644 index 000000000000..e3116ac605fc --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/ThreadMetricStateTest.java @@ -0,0 +1,113 @@ +package org.apache.cassandra.metrics; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.*; + +import com.codahale.metrics.Meter; + +public class ThreadMetricStateTest +{ + public static long allocRaw = 0; + public static long cpuRaw = 0; + Meter testCpu; + Meter testAlloc; + ThreadMetricState state; + + class UnderTest extends ThreadMetricState + { + + public UnderTest(long threadId, Meter cpu, Meter allocs) + { + super(threadId, cpu, allocs); + } + + @Override + long fetchTotalCPU(long threadId) + { + return cpuRaw; + } + + @Override + long fetchTotalAlloc(long threadId) + { + return allocRaw; + } + } + + @Before + public void setup() + { + cpuRaw = 0; + allocRaw = 0; + testCpu = new Meter(); + testAlloc = new Meter(); + state = new UnderTest(1, testCpu, testAlloc); + } + + @Test + public void testThreadDied() + { + Assert.assertEquals(0, testCpu.getCount()); + cpuRaw += 10; + state.updateCPU(); + Assert.assertEquals(10, testCpu.getCount()); + cpuRaw = -1; + state.update(); + Assert.assertEquals(10, testCpu.getCount()); + } + + @Test + public void testCPUDelta() + { + Assert.assertEquals(0, testCpu.getCount()); + cpuRaw += 10; + state.updateCPU(); + Assert.assertEquals(10, testCpu.getCount()); + cpuRaw += 10; + state.update(); + Assert.assertEquals(20, testCpu.getCount()); + } + + @Test + public void testCPUDeltaBaselined() + { + cpuRaw = 1230; + state = new UnderTest(1, testCpu, testAlloc); + Assert.assertEquals(0, testCpu.getCount()); + cpuRaw += 10; + state.updateCPU(); + Assert.assertEquals(10, testCpu.getCount()); + cpuRaw += 10; + state.update(); + Assert.assertEquals(20, testCpu.getCount()); + } + + @Test + public void testAllocDelta() + { + Assert.assertEquals(0, testAlloc.getCount()); + allocRaw += 10; + state.updateAllocs(); + Assert.assertEquals(10, testAlloc.getCount()); + allocRaw += 10; + state.update(); + state.update(); + Assert.assertEquals(20, testAlloc.getCount()); + } + + @Test + public void testAllocDeltaBaselined() + { + allocRaw = 1230; + state = new UnderTest(1, testCpu, testAlloc); + Assert.assertEquals(0, testAlloc.getCount()); + allocRaw += 10; + state.updateAllocs(); + Assert.assertEquals(10, testAlloc.getCount()); + allocRaw += 10; + state.update(); + state.update(); + Assert.assertEquals(20, testAlloc.getCount()); + } +}