Skip to content

Commit

Permalink
[7.x] Backport HotThreads memory report type (#72850) (#78483)
Browse files Browse the repository at this point in the history
Backport of #72850

Add new HotThreads report type to capture allocated memory
per Elasticsearch thread.

Co-authored-by: zhangchao <80152403@qq.com>
  • Loading branch information
grcevski and easyice committed Sep 29, 2021
1 parent 9bf8c73 commit bddc632
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public void testHotThreadsDontFail() throws ExecutionException, InterruptedExcep
}
nodesHotThreadsRequestBuilder.setIgnoreIdleThreads(randomBoolean());
if (randomBoolean()) {
switch (randomIntBetween(0, 2)) {
switch (randomIntBetween(0, 3)) {
case 3:
type = "mem";
break;
case 2:
type = "cpu";
break;
Expand Down
48 changes: 35 additions & 13 deletions server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;

import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -63,7 +64,8 @@ public enum ReportType {

CPU("cpu"),
WAIT("wait"),
BLOCK("block");
BLOCK("block"),
MEM("mem");

private final String type;

Expand Down Expand Up @@ -117,7 +119,7 @@ public HotThreads type(ReportType type) {

public String detect() throws Exception {
synchronized (mutex) {
return innerDetect(ManagementFactory.getThreadMXBean(), Thread.currentThread().getId());
return innerDetect(ManagementFactory.getThreadMXBean(), SunThreadInfo.INSTANCE, Thread.currentThread().getId());
}
}

Expand Down Expand Up @@ -145,7 +147,7 @@ static boolean isIdleThread(ThreadInfo threadInfo) {
return false;
}

Map<Long, ThreadTimeAccumulator> getAllValidThreadInfos(ThreadMXBean threadBean, long currentThreadId) {
Map<Long, ThreadTimeAccumulator> getAllValidThreadInfos(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long currentThreadId) {
long[] threadIds = threadBean.getAllThreadIds();
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds);
Map<Long, ThreadTimeAccumulator> result = new HashMap<>(threadIds.length);
Expand All @@ -158,7 +160,8 @@ Map<Long, ThreadTimeAccumulator> getAllValidThreadInfos(ThreadMXBean threadBean,
if (cpuTime == INVALID_TIMING) {
continue;
}
result.put(threadIds[i], new ThreadTimeAccumulator(threadInfos[i], cpuTime));
long allocatedBytes = type == ReportType.MEM ? sunThreadInfo.getThreadAllocatedBytes(threadIds[i]) : 0;
result.put(threadIds[i], new ThreadTimeAccumulator(threadInfos[i], cpuTime, allocatedBytes));
}

return result;
Expand Down Expand Up @@ -186,11 +189,15 @@ private void setThreadWaitBlockTimeMonitoringEnabled(ThreadMXBean threadBean, bo
}
}

String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Exception {
String innerDetect(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long currentThreadId) throws Exception {
if (threadBean.isThreadCpuTimeSupported() == false) {
throw new ElasticsearchException("thread CPU time is not supported on this JDK");
}

if (type == ReportType.MEM && sunThreadInfo.isThreadAllocatedMemorySupported() == false) {
throw new ElasticsearchException("thread allocated memory is not supported on this JDK");
}

StringBuilder sb = new StringBuilder()
.append("Hot threads at ")
.append(DATE_TIME_FORMATTER.format(LocalDateTime.now(Clock.systemUTC())))
Expand All @@ -207,9 +214,9 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti

try {
// Capture before and after thread state with timings
Map<Long, ThreadTimeAccumulator> previousThreadInfos = getAllValidThreadInfos(threadBean, currentThreadId);
Map<Long, ThreadTimeAccumulator> previousThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId);
Thread.sleep(interval.millis());
Map<Long, ThreadTimeAccumulator> latestThreadInfos = getAllValidThreadInfos(threadBean, currentThreadId);
Map<Long, ThreadTimeAccumulator> latestThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId);

latestThreadInfos.forEach((threadId, accumulator) -> accumulator.subtractPrevious(previousThreadInfos.get(threadId)));

Expand All @@ -225,7 +232,7 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
ThreadInfo[][] allInfos = captureThreadStacks(threadBean, topThreadIds);

for (int t = 0; t < topThreads.size(); t++) {
long time = getter.applyAsLong(topThreads.get(t));
long timeOrBytes = getter.applyAsLong(topThreads.get(t));
String threadName = null;
for (ThreadInfo[] info : allInfos) {
if (info != null && info[t] != null) {
Expand All @@ -240,9 +247,15 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
if (threadName == null) {
continue; // thread is not alive yet or died before the first snapshot - ignore it!
}
double percent = (((double) time) / interval.nanos()) * 100;
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n",
percent, TimeValue.timeValueNanos(time), interval, type.getTypeValue(), threadName));

if (type == ReportType.MEM) {
sb.append(String.format(Locale.ROOT, "%n%s memory allocated by thread '%s'%n",
new ByteSizeValue(timeOrBytes), threadName));
} else {
double percent = (((double) timeOrBytes) / interval.nanos()) * 100;
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n",
percent, TimeValue.timeValueNanos(timeOrBytes), interval, type.getTypeValue(), threadName));
}
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
Expand Down Expand Up @@ -311,11 +324,13 @@ static class ThreadTimeAccumulator {
private long cpuTime;
private long blockedTime;
private long waitedTime;
private long allocatedBytes;

ThreadTimeAccumulator(ThreadInfo info, long cpuTime) {
ThreadTimeAccumulator(ThreadInfo info, long cpuTime, long allocatedBytes) {
this.blockedTime = info.getBlockedTime();
this.waitedTime = info.getWaitedTime();
this.cpuTime = cpuTime;
this.allocatedBytes = allocatedBytes;
this.threadId = info.getThreadId();
}

Expand All @@ -329,6 +344,7 @@ void subtractPrevious(ThreadTimeAccumulator previous) {
this.blockedTime -= previous.blockedTime;
this.waitedTime -= previous.waitedTime;
this.cpuTime -= previous.cpuTime;
this.allocatedBytes -= previous.allocatedBytes;
}
}

Expand All @@ -346,6 +362,10 @@ public long getWaitedTime() {
return Math.max(waitedTime, 0);
}

/**
 * Returns the {@code allocatedBytes} value of this accumulator, with any
 * negative value reported as zero.
 */
public long getAllocatedBytes() {
    if (allocatedBytes < 0) {
        return 0;
    }
    return allocatedBytes;
}

/**
 * Returns the {@code threadId} field as stored; unlike the neighbouring
 * getters, no clamping is applied to the value.
 */
public long getThreadId() {
return threadId;
}
Expand All @@ -358,8 +378,10 @@ static ToLongFunction<ThreadTimeAccumulator> valueGetterForReportType(ReportType
return ThreadTimeAccumulator::getWaitedTime;
case BLOCK:
return ThreadTimeAccumulator::getBlockedTime;
case MEM:
return ThreadTimeAccumulator::getAllocatedBytes;
}
throw new IllegalArgumentException("expected thread type to be either 'cpu', 'wait', or 'block', but was " + type);
throw new IllegalArgumentException("expected thread type to be either 'cpu', 'wait', 'mem', or 'block', but was " + type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.monitor.jvm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.management.ThreadMXBean;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;

/**
 * Reflective accessor for the per-thread memory allocation counters declared on
 * {@code com.sun.management.ThreadMXBean}.
 *
 * <p>The three extension methods are looked up by name once, when the class is
 * initialised. If a lookup fails, the corresponding {@link Method} handle is
 * {@code null} and the public methods of this class fall back to {@code false}
 * or {@code 0} instead of throwing.
 */
public class SunThreadInfo {

    private static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    private static final Method getThreadAllocatedBytes = getMethod("getThreadAllocatedBytes", long.class);
    private static final Method isThreadAllocatedMemorySupported = getMethod("isThreadAllocatedMemorySupported");
    private static final Method isThreadAllocatedMemoryEnabled = getMethod("isThreadAllocatedMemoryEnabled");

    private static final Logger logger = LogManager.getLogger(SunThreadInfo.class);
    public static final SunThreadInfo INSTANCE = new SunThreadInfo();

    /**
     * Reports whether the platform thread bean supports allocation measurement.
     *
     * @return the value reported by the bean, or {@code false} when the method
     *         could not be looked up or the reflective call failed
     */
    public boolean isThreadAllocatedMemorySupported() {
        if (isThreadAllocatedMemorySupported == null) {
            return false;
        }

        try {
            return (boolean) isThreadAllocatedMemorySupported.invoke(threadMXBean);
        } catch (Exception e) {
            logger.warn("exception while invoke isThreadAllocatedMemorySupported", e);
            return false;
        }
    }

    /**
     * Reports whether allocation measurement is currently switched on.
     *
     * @return the value reported by the bean, or {@code false} when the method
     *         could not be looked up or the reflective call failed
     */
    public boolean isThreadAllocatedMemoryEnabled() {
        if (isThreadAllocatedMemoryEnabled == null) {
            return false;
        }

        try {
            return (boolean) isThreadAllocatedMemoryEnabled.invoke(threadMXBean);
        } catch (Exception e) {
            logger.warn("exception while invoke isThreadAllocatedMemoryEnabled", e);
            return false;
        }
    }

    /**
     * Returns the number of bytes the bean reports as allocated by the given thread.
     *
     * @param id the thread id to query; non-positive ids are answered with {@code 0}
     *           without consulting the bean
     * @return the reported byte count, or {@code 0} when measurement is unavailable,
     *         unsupported or disabled, when the bean reports a negative value, or
     *         when the reflective call fails
     */
    public long getThreadAllocatedBytes(long id) {
        if (getThreadAllocatedBytes == null) {
            return 0;
        }

        if (isThreadAllocatedMemorySupported() == false || isThreadAllocatedMemoryEnabled() == false) {
            return 0;
        }

        if (id <= 0) {
            return 0;
        }

        try {
            long bytes = (long) getThreadAllocatedBytes.invoke(threadMXBean, id);
            // The JDK documents -1 as the answer for a thread that is no longer alive (or when
            // measurement is disabled), so a negative value is an expected outcome whenever a
            // thread exits between being enumerated and being queried. Report it as zero rather
            // than asserting: an AssertionError would escape the catch below.
            return Math.max(0, bytes);
        } catch (Exception e) {
            logger.warn("exception retrieving thread allocated memory", e);
            return 0;
        }
    }

    /**
     * Looks up a public method on {@code com.sun.management.ThreadMXBean} by name.
     *
     * @param methodName     name of the method to find
     * @param parameterTypes formal parameter types of the method
     * @return the method handle, or {@code null} when the class or method is not available
     */
    private static Method getMethod(String methodName, Class<?>... parameterTypes) {
        try {
            return Class.forName("com.sun.management.ThreadMXBean").getMethod(methodName, parameterTypes);
        } catch (Exception ignored) {
            // Deliberate best effort: callers treat a null handle as "feature not available".
            return null;
        }
    }
}

0 comments on commit bddc632

Please sign in to comment.