Skip to content

Commit

Permalink
[TEST] Add a more restrictive thread leaks filter
Browse files Browse the repository at this point in the history
Today all threads are allowed to leak from a test suite. This is tricky since
it essentially allows resource leaks by default: for instance,
test-private TransportClients will never get closed and will consume
resources, influencing other tests. It also hides threads that
are not fully under Elasticsearch's control, like the Lucene
TimeLimitingCollector thread. This commit restricts the threads
that can leak from a suite to the threads spawned from test clusters
and fixes several places that leaked threads.

Closes #7833
  • Loading branch information
s1monw committed Sep 23, 2014
1 parent 046a3a0 commit 30acba6
Show file tree
Hide file tree
Showing 32 changed files with 167 additions and 43 deletions.
4 changes: 4 additions & 0 deletions core-signatures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ com.ning.compress.lzf.LZFOutputStream#<init>(java.io.OutputStream)
com.ning.compress.lzf.LZFOutputStream#<init>(java.io.OutputStream, com.ning.compress.BufferRecycler)
com.ning.compress.lzf.LZFUncompressor#<init>(com.ning.compress.DataHandler)
com.ning.compress.lzf.LZFUncompressor#<init>(com.ning.compress.DataHandler, com.ning.compress.BufferRecycler)

@defaultMessage Spawns a new thread which is solely under lucenes control use ThreadPool#estimatedTimeInMillisCounter instead
org.apache.lucene.search.TimeLimitingCollector#getGlobalTimerThread()
org.apache.lucene.search.TimeLimitingCollector#getGlobalCounter()
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
new ShardSearchRequest(request).types(request.types()).nowInMillis(request.nowInMillis())
.filteringAliases(request.filteringAliases()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()
);
SearchContext.setCurrent(searchContext);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected ShardCountResponse shardOperation(ShardCountRequest request) throws El
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest.request).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API), indexService, indexShard, scriptService,
pageCacheRecycler, bigArrays));
pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
Expand All @@ -129,7 +129,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API, IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
pageCacheRecycler, bigArrays));
pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected ShardExistsResponse shardOperation(ShardExistsRequest request) throws
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
.nowInMillis(request.nowInMillis),
null, result.searcher(), indexService, indexShard,
scriptService, pageCacheRecycler,
bigArrays
bigArrays, threadPool.estimatedTimeInMillisCounter()
);
SearchContext.setCurrent(context);

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
Expand Down Expand Up @@ -149,8 +150,8 @@ public final static EarlyTerminatingCollector wrapCountBasedEarlyTerminatingColl
/**
* Wraps <code>delegate</code> with a time limited collector with a timeout of <code>timeoutInMillis</code>
*/
public final static TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, TimeLimitingCollector.getGlobalCounter(), timeoutInMillis);
public final static TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, final Counter counter, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, counter, timeoutInMillis);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
Expand Down Expand Up @@ -687,4 +688,9 @@ public boolean useSlowScroll() {
public SearchContext useSlowScroll(boolean useSlowScroll) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this search context implementation.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Counter timeEstimateCounter() {
throw new UnsupportedOperationException();
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());

Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays);
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -609,4 +610,9 @@ public boolean useSlowScroll() {
public SearchContext useSlowScroll(boolean useSlowScroll) {
throw new UnsupportedOperationException("Not supported");
}

/**
 * Not supported by this search context implementation.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always, with the message "Not supported"
 */
@Override
public Counter timeEstimateCounter() {
throw new UnsupportedOperationException("Not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void search(List<AtomicReaderContext> leaves, Weight weight, Collector co
if (timeoutSet) {
// TODO: change to use our own counter that uses the scheduler in ThreadPool
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeoutInMillis());
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis());
}
if (terminateAfterSet) {
// throws Lucene.EarlyTerminationException when given count is reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;

private final SearchShardTarget shardTarget;
private final Counter timeEstimateCounter;

private SearchType searchType;

Expand Down Expand Up @@ -179,7 +181,7 @@ public class DefaultSearchContext extends SearchContext {
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays) {
BigArrays bigArrays, Counter timeEstimateCounter) {
this.id = id;
this.request = request;
this.searchType = request.searchType();
Expand All @@ -199,6 +201,7 @@ public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarg

// initialize the filtering alias based on the provided filters
aliasFilter = indexService.aliasesService().aliasFilter(request.filteringAliases());
this.timeEstimateCounter = timeEstimateCounter;
}

@Override
Expand Down Expand Up @@ -705,4 +708,9 @@ public DefaultSearchContext useSlowScroll(boolean useSlowScroll) {
this.useSlowScroll = useSlowScroll;
return this;
}

/**
 * Returns the time-estimate counter that was handed to this context's constructor.
 *
 * @return the {@link Counter} stored in the {@code timeEstimateCounter} field
 */
@Override
public Counter timeEstimateCounter() {
return timeEstimateCounter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -354,6 +355,8 @@ public void clearReleasables(Lifetime lifetime) {

public abstract SearchContext useSlowScroll(boolean useSlowScroll);

public abstract Counter timeEstimateCounter();

/**
* The life time of an object that is used during search execution.
*/
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
Expand Down Expand Up @@ -150,6 +151,10 @@ public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
}

/**
 * Returns the {@link Counter} owned by the estimated-time thread.
 *
 * @return the {@code counter} field of {@code estimatedTimeThread}
 */
public Counter estimatedTimeInMillisCounter() {
return estimatedTimeThread.counter;
}

public ThreadPoolInfo info() {
List<Info> infos = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
Expand Down Expand Up @@ -264,6 +269,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
while (!retiredExecutors.isEmpty()) {
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit);
}
estimatedTimeThread.join(unit.toMillis(timeout));
return result;
}

Expand Down Expand Up @@ -506,15 +512,15 @@ public String toString() {
static class EstimatedTimeThread extends Thread {

final long interval;

final TimeCounter counter;
volatile boolean running = true;

volatile long estimatedTimeInMillis;

EstimatedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.estimatedTimeInMillis = System.currentTimeMillis();
this.counter = new TimeCounter();
setDaemon(true);
}

Expand All @@ -534,6 +540,19 @@ public void run() {
}
}
}

/**
 * Read-only {@link Counter} view of the enclosing thread's {@code estimatedTimeInMillis} field.
 * This is a non-static inner class, so it reads that field from its enclosing instance.
 */
private class TimeCounter extends Counter {

/**
 * Modification is not supported; the value can only be read via {@link #get()}.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public long addAndGet(long delta) {
throw new UnsupportedOperationException();
}

/**
 * @return the current value of the enclosing thread's {@code estimatedTimeInMillis} field
 */
@Override
public long get() {
return estimatedTimeInMillis;
}
}
}

static class ExecutorHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public void init() {
}

@After
public void cleanup() {
threadPool.shutdownNow();
public void cleanup() throws InterruptedException {
terminate(threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ public void testThatTransportClientSettingIsSet() {

@Test
public void testThatTransportClientSettingCannotBeChanged() {
TransportClient client = new TransportClient(settingsBuilder().put(Client.CLIENT_TYPE_SETTING, "anything"));
Settings settings = client.injector.getInstance(Settings.class);
assertThat(settings.get(Client.CLIENT_TYPE_SETTING), is("transport"));
try (TransportClient client = new TransportClient(settingsBuilder().put(Client.CLIENT_TYPE_SETTING, "anything"))) {
Settings settings = client.injector.getInstance(Settings.class);
assertThat(settings.get(Client.CLIENT_TYPE_SETTING), is("transport"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testPriorityQueue() throws Exception {

@Test
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
Expand All @@ -85,11 +85,12 @@ public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}

@Test
public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
Expand All @@ -114,11 +115,12 @@ public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}

@Test
public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
Expand All @@ -143,11 +145,12 @@ public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}

@Test
public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
Expand All @@ -172,12 +175,13 @@ public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}

@Test
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName()));
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
Expand Down Expand Up @@ -233,9 +237,7 @@ public void run() {
block.countDown();
Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));

timer.shutdownNow();
executor.shutdownNow();
assertTrue(terminate(timer, executor));
}

static class AwaitingJob extends PrioritizedRunnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,7 @@ public void tearDown() throws Exception {

engine.close();
store.close();

if (threadPool != null) {
threadPool.shutdownNow();
}
terminate(threadPool);
}

private Document testDocumentWithTextField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -97,6 +99,12 @@ protected void configure() {
context = new QueryParseContext(index, queryParserService);
}

/**
 * Runs the inherited tear-down and then terminates the {@link ThreadPool} obtained from the injector.
 * Termination happens in a {@code finally} block so that a failure in {@code super.tearDown()}
 * cannot leave the pool's threads running after the test.
 *
 * @throws Exception if the inherited tear-down or the thread pool termination fails
 */
@After
public void tearDown() throws Exception {
    try {
        super.tearDown();
    } finally {
        // Always stop the pool, even when the inherited tear-down throws.
        terminate(injector.getInstance(ThreadPool.class));
    }
}

@Test
public void testParser() throws IOException {
String templateString = "{\"template\": {"
Expand Down
Loading

1 comment on commit 30acba6

@DaddyMoe
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @s1monw,

  1. Does this mean that I will always have to annotate my failing test classes, as a result of the leaks detected by Lucene's <RandomizedRunner>, with:

<@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // Which is "Highly discouraged" >?
2. Or is there another workaround?

PS: I am currently using <@ThreadLeakScope(ThreadLeakScope.Scope.NONE)> on staging, and I would like to find an alternative before going live.

Please sign in to comment.