Skip to content

Commit

Permalink
Check ThreadInfo[] for null element if thread are not alive.
Browse files Browse the repository at this point in the history
If a thread is not alive getting ThreadMXBean#getThreadInfo(long[], int)
places null elemnents in the returned array which are not repected
in the HotTheards API.

Closes #4775
  • Loading branch information
s1monw committed Jan 17, 2014
1 parent 73ebd4b commit 81e2950
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 13 deletions.
47 changes: 34 additions & 13 deletions src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java
Expand Up @@ -151,6 +151,9 @@ public int compare(MyThreadInfo o1, MyThreadInfo o2) {
}
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
for (int j = 0; j < threadElementsSnapshotCount; j++) {
// NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist,
// null will be set in the corresponding element in the returned array. A thread is alive if it has
// been started and has not yet died.
allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
Thread.sleep(threadElementsSnapshotDelay.millis());
}
Expand All @@ -163,8 +166,22 @@ public int compare(MyThreadInfo o1, MyThreadInfo o2) {
} else if ("block".equals(type)) {
time = hotties.get(t).blockedTime;
}
String threadName = null;
if (allInfos[0][t] == null) {
for (ThreadInfo[] info : allInfos) {
if (info != null && info[t] != null) {
threadName = info[t].getThreadName();
break;
}
}
if (threadName == null) {
continue; // thread is not alive yet or died before the first snapshot - ignore it!
}
} else {
threadName = allInfos[0][t].getThreadName();
}
double percent = (((double) time) / interval.nanos()) * 100;
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, allInfos[0][t].getThreadName()));
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, threadName));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
Expand All @@ -189,16 +206,18 @@ public int compare(MyThreadInfo o1, MyThreadInfo o2) {
count++;
}
}
StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(Locale.ROOT, " unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
}
} else {
sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
if (allInfos[i][t] != null) {
final StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(Locale.ROOT, " unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
}
} else {
sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
}
}
}
}
Expand All @@ -211,9 +230,11 @@ public int compare(MyThreadInfo o1, MyThreadInfo o2) {
}
}

private static final StackTraceElement[] EMPTY = new StackTraceElement[0];

private int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) {
StackTraceElement[] s1 = threadInfo.getStackTrace();
StackTraceElement[] s2 = threadInfo0.getStackTrace();
StackTraceElement[] s1 = threadInfo == null ? EMPTY : threadInfo.getStackTrace();
StackTraceElement[] s2 = threadInfo0 == null ? EMPTY : threadInfo0.getStackTrace();
int i = s1.length - 1;
int j = s2.length - 1;
int rslt = 0;
Expand Down
128 changes: 128 additions & 0 deletions src/test/java/org/elasticsearch/action/admin/HotThreadsTest.java
@@ -0,0 +1,128 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.index.query.FilterBuilders.andFilter;
import static org.elasticsearch.index.query.FilterBuilders.notFilter;
import static org.elasticsearch.index.query.FilterBuilders.queryFilter;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;

/**
*/
public class HotThreadsTest extends ElasticsearchIntegrationTest {

@Test
public void testHotThreadsDontFail() throws ExecutionException, InterruptedException {
/**
* This test just checks if nothing crashes or gets stuck etc.
*/
createIndex("test");
final int iters = atLeast(2);

for (int i = 0; i < iters; i++) {
final String type;
NodesHotThreadsRequestBuilder nodesHotThreadsRequestBuilder = client().admin().cluster().prepareNodesHotThreads();
if (randomBoolean()) {
TimeValue timeValue = new TimeValue(rarely() ? randomIntBetween(500, 5000) : randomIntBetween(20, 500));
nodesHotThreadsRequestBuilder.setInterval(timeValue);
}
if (randomBoolean()) {
nodesHotThreadsRequestBuilder.setThreads(randomIntBetween(1, 100));
}
if (randomBoolean()) {
switch (randomIntBetween(0, 2)) {
case 2:
type = "cpu";
break;
case 1:
type = "wait";
break;
default:
type = "block";
break;
}
assertThat(type, notNullValue());
nodesHotThreadsRequestBuilder.setType(type);
} else {
type = null;
}
final CountDownLatch latch = new CountDownLatch(1);
nodesHotThreadsRequestBuilder.execute(new ActionListener<NodesHotThreadsResponse>() {
@Override
public void onResponse(NodesHotThreadsResponse nodeHotThreads) {
try {
assertThat(nodeHotThreads, notNullValue());
Map<String,NodeHotThreads> nodesMap = nodeHotThreads.getNodesMap();
assertThat(nodesMap.size(), equalTo(cluster().size()));
for (NodeHotThreads ht : nodeHotThreads) {
assertNotNull(ht.getHotThreads());
//logger.info(ht.getHotThreads());
}
} finally {
latch.countDown();
}
}

@Override
public void onFailure(Throwable e) {
logger.error("FAILED", e);
latch.countDown();
fail();
}
});

indexRandom(true,
client().prepareIndex("test", "type1", "1").setSource("field1", "value1"),
client().prepareIndex("test", "type1", "2").setSource("field1", "value2"),
client().prepareIndex("test", "type1", "3").setSource("field1", "value3"));
ensureYellow();
if (randomBoolean()) {
optimize();
}
while(latch.getCount() > 0) {
assertHitCount(
client().prepareSearch()
.setQuery(matchAllQuery())
.setPostFilter(
andFilter(
queryFilter(matchAllQuery()),
notFilter(andFilter(queryFilter(termQuery("field1", "value1")),
queryFilter(termQuery("field1", "value2")))))).get(),
3l);
}
latch.await();
}
}
}

0 comments on commit 81e2950

Please sign in to comment.