Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUn
this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
}

public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
allowCoreThreadTimeOut(true);
this.setRejectedExecutionHandler(blockingExecutionHandler);
}

public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.cassandra.concurrent;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServer;
import javax.management.ObjectName;

Expand All @@ -38,6 +41,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
{
private final String mbeanName;
public final ThreadPoolMetrics metrics;
public final List<Long> threadIds;

public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
Expand Down Expand Up @@ -77,7 +81,13 @@ public JMXEnabledThreadPoolExecutor(int corePoolSize,
NamedThreadFactory threadFactory,
String jmxPath)
{
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
threadIds = Collections.synchronizedList(new ArrayList<>(corePoolSize));
super.setThreadFactory(r-> {
Thread t = threadFactory.newThread(r);
threadIds.add(t.getId());
return t;
});
super.prestartAllCoreThreads();
metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);

Expand Down Expand Up @@ -127,6 +137,12 @@ private void unregisterMBean()
metrics.release();
}

@Override
public void terminated()
{
metrics.updateThreadStats(Thread.currentThread().getId());
}

@Override
public synchronized void shutdown()
{
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/concurrent/SEPExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
public final int maxWorkers;
public final String name;
public final int maxTasksQueued;
private final SEPMetrics metrics;
public final SEPMetrics metrics;

// stores both a set of work permits and task permits:
// bottom 32 bits are number of queued tasks, in the range [0..maxTasksQueued] (initially 0)
Expand Down
28 changes: 26 additions & 2 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.concurrent;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
Expand All @@ -25,12 +26,13 @@
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.metrics.ThreadMetricState;
import org.apache.cassandra.utils.JVMStabilityInspector;

final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class);
private static final boolean SET_THREAD_NAME = Boolean.parseBoolean(System.getProperty("cassandra.set_sep_thread_name", "true"));
private static final boolean TRACK_SEP = Boolean.parseBoolean(System.getProperty("cassandra.track_sep_threads", "true"));

final Long workerId;
final Thread thread;
Expand All @@ -42,6 +44,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
// strategy can only work when there are multiple threads spinning (as more sleep time must elapse than real time)
long prevStopCheck = 0;
long soleSpinnerSpinTime = 0;
ThreadMetricState state;

SEPWorker(Long workerId, Work initialState, SharedExecutorPool pool)
{
Expand All @@ -51,6 +54,18 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
thread.setDaemon(true);
set(initialState);
thread.start();
ScheduledFuture f = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() ->
{
if (state != null) {
try
{
state.update();
} catch (NullPointerException e) {
// state can become null after check, ok to just ignore that race and do nothing - metrics would
// be updated from the update at end of assignment
}
}
}, 30, 30, TimeUnit.SECONDS);
}

public void run()
Expand Down Expand Up @@ -90,8 +105,11 @@ public void run()
assigned = get().assigned;
if (assigned == null)
continue;
if (SET_THREAD_NAME)
if (TRACK_SEP)
{
Thread.currentThread().setName(assigned.name + "-" + workerId);
state = new ThreadMetricState(Thread.currentThread().getId(), assigned.metrics.cpu, assigned.metrics.alloc);
}
task = assigned.tasks.poll();

// if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
Expand Down Expand Up @@ -119,6 +137,12 @@ public void run()
assigned.returnWorkPermit();
if (shutdown && assigned.getActiveCount() == 0)
assigned.shutdown.signalAll();

if (TRACK_SEP)
{
state.update();
state = null;
}
assigned = null;

// try to immediately reassign ourselves some work; if we fail, start spinning
Expand Down
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/metrics/SEPMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;

import org.apache.cassandra.concurrent.SEPExecutor;

Expand All @@ -44,6 +45,11 @@ public class SEPMetrics
/** Maximum number of tasks queued before a task get blocked */
public final Gauge<Integer> maxTasksQueued;

/** Approximate amount of cpu time (ns/s) spent on threads in this stage */
public final Meter cpu;
/** Approximate amount of allocations in bytes by threads in this stage */
public final Meter alloc;

private MetricNameFactory factory;

/**
Expand All @@ -56,6 +62,8 @@ public class SEPMetrics
public SEPMetrics(final SEPExecutor executor, String path, String poolName)
{
this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
cpu = Metrics.meter(factory.createMetricName("CPU"));
alloc = Metrics.meter(factory.createMetricName("Allocations"));
activeTasks = Metrics.register(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
{
public Integer getValue()
Expand Down
143 changes: 143 additions & 0 deletions src/java/org/apache/cassandra/metrics/ThreadMetricState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.apache.cassandra.metrics;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.AtomicLongArray;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;

public class ThreadMetricState
{
private final AtomicLongArray state = new AtomicLongArray(2);
static final ThreadMXBean BEAN = ManagementFactory.getThreadMXBean();
private static final boolean SUN;

static {
boolean success = false;
try
{
Class.forName("com.sun.management.ThreadMXBean");
success = true;
}
catch (ClassNotFoundException ex)
{
success = false;
}
SUN = success;
}

Meter cpu;
Meter allocs;
long threadId;

public ThreadMetricState(long threadId, Meter cpu, Meter allocs)
{
this.threadId = threadId;
this.cpu = cpu;
this.allocs = allocs;
baseline();
}

public long getCPU()
{
return state.get(0);
}

public long getAllocations()
{
return state.get(1);
}

boolean setCPU(long expect, long cpu)
{
return state.compareAndSet(0, expect, cpu);
}

boolean setAllocations(long expect,long allocs)
{
return state.compareAndSet(1, expect, allocs);
}

@VisibleForTesting
long fetchTotalCPU(long threadId)
{
return BEAN.getThreadCpuTime(threadId);
}

@VisibleForTesting
long fetchTotalAlloc(long threadId)
{
final com.sun.management.ThreadMXBean beanX = (com.sun.management.ThreadMXBean) BEAN;
return beanX.getThreadAllocatedBytes(threadId);
}

public void baseline()
{
if (BEAN.isCurrentThreadCpuTimeSupported())
{
long ns = fetchTotalCPU(threadId);
if (ns > 0)
state.set(0, ns);
}

if (SUN)
{
long bytes = fetchTotalAlloc(threadId);
if(bytes > 0)
{
state.set(1, bytes);
}
}
}

public void update()
{
updateCPU();
updateAllocs();
}

public void updateCPU()
{
boolean success = false;
long delta = 0;
while(!success && BEAN.isCurrentThreadCpuTimeSupported())
{
long pCpu = getCPU();
long cpu = fetchTotalCPU(threadId);
if(cpu > 0)
{
delta = cpu - pCpu;
success = setCPU(pCpu, cpu);
}
else
success = true; // skip setting
}
if (delta > 0)
cpu.mark(delta);
}

public void updateAllocs()
{
long delta = 0;
if (SUN)
{
final com.sun.management.ThreadMXBean beanX = (com.sun.management.ThreadMXBean) BEAN;
boolean success = false;
while(!success && beanX.isThreadAllocatedMemoryEnabled())
{
long pAllocs = getAllocations();
long allocs = fetchTotalAlloc(threadId);
if(allocs > 0)
{
delta = allocs - pAllocs;
success = setAllocations(pAllocs, allocs);
}
else
success = true; // skip setting
}
}
if (delta > 0)
allocs.mark(delta);
}
}
Loading