Skip to content

Commit

Permalink
Multi-get requests should wait for search active (#46283)
Browse files Browse the repository at this point in the history
When a shard has fallen search idle, and a non-realtime multi-get
request is executed, today such requests do not wait for the shard to
become search active and therefore such requests do not wait for a
refresh to see the latest changes to the index. This also prevents such
requests from triggering the shard as non-search idle, influencing the
behavior of scheduled refreshes. This commit addresses this by attaching
a listener to the shard search active state for multi-get requests. In
this way, when the next scheduled refresh is executed, the multi-get
request will then proceed.
  • Loading branch information
jasontedor committed Sep 3, 2019
1 parent 2460389 commit eea16bd
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.get;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

public class TransportShardMultiGetAction extends TransportSingleShardAction<MultiGetShardRequest, MultiGetShardResponse> {

private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]";
Expand Down Expand Up @@ -73,6 +76,24 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
.getShards(state, request.request().index(), request.request().shardId(), request.request().preference());
}

@Override
protected void asyncShardOperation(
MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
super.asyncShardOperation(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}

@Override
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -107,6 +108,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand Down Expand Up @@ -686,7 +688,23 @@ private static ShardRouting getInitializingShardRouting(ShardRouting existingSha
return shardRouting;
}

public void testAutomaticRefresh() throws InterruptedException {
public void testAutomaticRefreshSearch() throws InterruptedException {
runTestAutomaticRefresh(numDocs -> client().prepareSearch("test").get().getHits().getTotalHits().value);
}

public void testAutomaticRefreshMultiGet() throws InterruptedException {
runTestAutomaticRefresh(
numDocs -> {
final MultiGetRequest request = new MultiGetRequest();
request.realtime(false);
for (int i = 0; i < numDocs; i++) {
request.add("test", "" + i);
}
return Arrays.stream(client().multiGet(request).actionGet().getResponses()).filter(r -> r.getResponse().isExists()).count();
});
}

private void runTestAutomaticRefresh(final IntToLongFunction count) throws InterruptedException {
TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
Settings.Builder builder = Settings.builder();
if (randomTimeValue != null) {
Expand Down Expand Up @@ -719,31 +737,31 @@ public void testAutomaticRefresh() throws InterruptedException {
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
}
}

CountDownLatch started = new CountDownLatch(1);
Thread t = new Thread(() -> {
SearchResponse searchResponse;
started.countDown();
do {
searchResponse = client().prepareSearch().get();
} while (searchResponse.getHits().getTotalHits().value != totalNumDocs.get());

} while (count.applyAsLong(totalNumDocs.get()) != totalNumDocs.get());
});
t.start();
started.await();
assertHitCount(client().prepareSearch().get(), 1);
assertThat(count.applyAsLong(totalNumDocs.get()), equalTo(1L));
for (int i = 1; i < numDocs; i++) {
client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
indexingDone.countDown();
}

@Override
public void onFailure(Exception e) {
indexingDone.countDown();
throw new AssertionError(e);
}
});
@Override
public void onResponse(IndexResponse indexResponse) {
indexingDone.countDown();
}

@Override
public void onFailure(Exception e) {
indexingDone.countDown();
throw new AssertionError(e);
}
});
}
indexingDone.await();
t.join();
Expand All @@ -755,7 +773,6 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
assertNoSearchHits(client().prepareSearch().get());
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertFalse(shard.scheduledRefresh());
Expand Down

0 comments on commit eea16bd

Please sign in to comment.