Skip to content

Commit

Permalink
Add test and randomization for elastic#5165
Browse files Browse the repository at this point in the history
  • Loading branch information
s1monw committed Feb 19, 2014
1 parent a804f93 commit 7ce3579
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 11 deletions.
23 changes: 12 additions & 11 deletions src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -86,6 +86,11 @@
public class SearchService extends AbstractLifecycleComponent<SearchService> {

public static final String NORMS_LOADING_KEY = "index.norms.loading";
private static final String DEFAUTL_KEEPALIVE_COMPONENENT_KEY ="default_keep_alive";
public static final String DEFAUTL_KEEPALIVE_KEY ="search."+DEFAUTL_KEEPALIVE_COMPONENENT_KEY;
private static final String KEEPALIVE_INTERVAL_COMPONENENT_KEY ="keep_alive_interval";
public static final String KEEPALIVE_INTERVAL_KEY ="search."+KEEPALIVE_INTERVAL_COMPONENENT_KEY;


private final ThreadPool threadPool;

Expand Down Expand Up @@ -129,9 +134,9 @@ public SearchService(Settings settings, ClusterService clusterService, IndicesSe
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;

TimeValue keepAliveInterval = componentSettings.getAsTime("keep_alive_interval", timeValueMinutes(1));
TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1));
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(5)).millis();
this.defaultKeepAlive = componentSettings.getAsTime(DEFAUTL_KEEPALIVE_COMPONENENT_KEY, timeValueMinutes(5)).millis();

Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>();
elementParsers.putAll(dfsPhase.parseElements());
Expand Down Expand Up @@ -458,8 +463,8 @@ private SearchContext findContext(long id) throws SearchContextMissingException
return context;
}

SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
final SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request, null);
boolean success = false;
try {
activeContexts.put(context.id(), context);
Expand All @@ -473,11 +478,7 @@ SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticSear
}
}

SearchContext createContext(ShardSearchRequest request) throws ElasticSearchException {
return createContext(request, null);
}

SearchContext createContext(ShardSearchRequest request, @Nullable Engine.Searcher searcher) throws ElasticSearchException {
final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.Searcher searcher) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());

Expand Down Expand Up @@ -839,11 +840,11 @@ public void awaitTermination() throws InterruptedException {
class Reaper implements Runnable {
@Override
public void run() {
long time = threadPool.estimatedTimeInMillis();
final long time = threadPool.estimatedTimeInMillis();
for (SearchContext context : activeContexts.values()) {
// Use the same value for both checks since lastAccessTime can
// be modified by another thread between checks!
long lastAccessTime = context.lastAccessTime();
final long lastAccessTime = context.lastAccessTime();
if (lastAccessTime == -1l) { // its being processed or timeout is disabled
continue;
}
Expand Down
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;

import org.apache.lucene.util.English;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;

/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
public class StressSearchServiceReaperTest extends ElasticsearchIntegrationTest {


@Override
protected Settings nodeSettings(int nodeOrdinal) {
// very frequent checks
return ImmutableSettings.builder().put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueMillis(1)).build();
}

@Slow
@Test // see issue #5165 - this test fails each time without the fix in pull #5170
public void testStressReaper() throws ExecutionException, InterruptedException {
int num = atLeast(100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[num];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type", "" + i).setSource("f", English.intToEnglish(i));
}
prepareCreate("test").setSettings("number_of_shards", randomIntBetween(1,5), "number_of_replicas", randomIntBetween(0,1)).setSettings();
indexRandom(true, builders);
ensureYellow();
final int iterations = atLeast(500);
for (int i = 0; i < iterations; i++) {
SearchResponse searchResponse = client().prepareSearch("test").setQuery(matchAllQuery()).setSize(num).get();
assertNoFailures(searchResponse);
assertHitCount(searchResponse, num);
}
}
}
10 changes: 10 additions & 0 deletions src/test/java/org/elasticsearch/test/TestCluster.java
Expand Up @@ -45,10 +45,12 @@
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
Expand Down Expand Up @@ -245,6 +247,14 @@ private static Settings getRandomNodeSettings(long seed) {
builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, rarely(random));
}
builder.put("type", RandomPicks.randomFrom(random, CacheRecycler.Type.values()));
if (random.nextInt(10) == 0) { // 10% of the nodes have a very frequent check interval
builder.put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueMillis(10 + random.nextInt(2000)));
} else if (random.nextInt(10) != 0) { // 90% of the time - 10% of the time we don't set anything
builder.put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueSeconds(10 + random.nextInt(5 * 60)));
}
if (random.nextBoolean()) { // sometimes set a
builder.put(SearchService.DEFAUTL_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5*60)));
}
return builder.build();
}

Expand Down

0 comments on commit 7ce3579

Please sign in to comment.