Skip to content

Commit

Permalink
Return true for can_match on idle search shards (#55428) (#57158)
Browse files Browse the repository at this point in the history
With this change, we will always return true for can_match requests on
idle search shards; otherwise, some shards will never get refreshed if
all search requests perform the can_match phase (i.e., total shards >
pre_filter_shard_size).

Relates #27500
Relates #50043

Co-authored-by: Nhat Nguyen <nhat.nguyen@elastic.co>
  • Loading branch information
jimczi and dnhatn committed May 27, 2020
1 parent 1755b29 commit 9798f74
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collections;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class TransportSearchIT extends ESIntegTestCase {

Expand Down Expand Up @@ -69,4 +75,33 @@ public void testShardCountLimit() throws Exception {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null)));
}
}

public void testSearchIdle() throws Exception {
int numOfReplicas = randomIntBetween(0, 1);
internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1);
final Settings.Builder settings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500)));
assertAcked(prepareCreate("test").setSettings(settings)
.addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd"));
ensureGreen("test");
assertBusy(() -> {
for (String node : internalCluster().nodesInclude("test")) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
assertTrue(indexShard.isSearchIdle());
}
}
});
client().prepareIndex("test", "_doc").setId("1").setSource("created_date", "2020-01-01").get();
client().prepareIndex("test", "_doc").setId("2").setSource("created_date", "2020-01-02").get();
client().prepareIndex("test", "_doc").setId("3").setSource("created_date", "2020-01-03").get();
assertBusy(() -> {
SearchResponse resp = client().prepareSearch("test")
.setQuery(new RangeQueryBuilder("created_date").gte("2020-01-02").lte("2020-01-03"))
.setPreFilterShardSize(randomIntBetween(1, 3)).get();
assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
assertHitCount(client().prepareSearch().get(), 1);
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertFalse(shard.scheduledRefresh());
assertTrue(shard.hasRefreshPending());

// now disable background refresh and make sure the refresh happens
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
Expand All @@ -168,11 +169,13 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
// wait for both to ensure we don't have in-flight operations
updateSettingsLatch.await();
refreshLatch.await();
assertFalse(shard.hasRefreshPending());
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertTrue(shard.scheduledRefresh());
assertFalse(shard.hasRefreshPending());
assertTrue(shard.isSearchIdle());
assertHitCount(client().prepareSearch().get(), 3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private static final class CanMatchSearchPhaseResults extends SearchPhaseResults

@Override
void consumeResult(CanMatchResponse result) {
consumeResult(result.getShardIndex(), result.canMatch(), result.minAndMax());
consumeResult(result.getShardIndex(), result.canMatch(), result.estimatedMinAndMax());
}

@Override
Expand Down
55 changes: 46 additions & 9 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
Expand Down Expand Up @@ -271,6 +272,7 @@ Runnable getGlobalCheckpointSyncer() {

private final AtomicLong lastSearcherAccess = new AtomicLong();
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;

public IndexShard(
Expand Down Expand Up @@ -369,6 +371,7 @@ public boolean shouldCache(Query query) {
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -2742,7 +2745,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
Expand Down Expand Up @@ -3242,7 +3245,7 @@ && isSearchIdle()
/**
* Returns true if this shards is search idle
*/
final boolean isSearchIdle() {
public final boolean isSearchIdle() {
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
}

Expand All @@ -3253,15 +3256,49 @@ final long getLastSearcherAccess() {
return lastSearcherAccess.get();
}

/**
* Returns true if this shard has some scheduled refresh that is pending because of search-idle.
*/
public final boolean hasRefreshPending() {
return pendingRefreshLocation.get() != null;
}

private void setRefreshPending(Engine engine) {
Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
Translog.Location location;
do {
location = this.pendingRefreshLocation.get();
if (location != null && lastWriteLocation.compareTo(location) <= 0) {
break;
final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
pendingRefreshLocation.updateAndGet(curr -> {
if (curr == null || curr.compareTo(lastWriteLocation) <= 0) {
return lastWriteLocation;
} else {
return curr;
}
} while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
});
}

private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener {
Translog.Location lastWriteLocation;

@Override
public void beforeRefresh() {
try {
lastWriteLocation = getEngine().getTranslogLastWriteLocation();
} catch (AlreadyClosedException exc) {
// shard is closed - no location is fine
lastWriteLocation = null;
}
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh && lastWriteLocation != null) {
pendingRefreshLocation.updateAndGet(pendingLocation -> {
if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) {
return null;
} else {
return pendingLocation;
}
});
}
}
}

/**
Expand Down
26 changes: 15 additions & 11 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1115,18 +1115,22 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
IndexShard indexShard = indexService.getShard(request.shardId().getId());
// we don't want to use the reader wrapper since it could run costly operations
// and we can afford false positives.
final boolean hasRefreshPending = indexShard.hasRefreshPending();
try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) {
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, false);
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
final boolean canMatch;
if (canRewriteToMatchNone(request.source())) {
QueryBuilder queryBuilder = request.source().query();
return new CanMatchResponse(queryBuilder instanceof MatchNoneQueryBuilder == false, minMax);
canMatch = queryBuilder instanceof MatchNoneQueryBuilder == false;
} else {
// null query means match_all
canMatch = true;
}
// null query means match_all
return new CanMatchResponse(true, minMax);
return new CanMatchResponse(canMatch || hasRefreshPending, minMax);
}
}

Expand Down Expand Up @@ -1202,37 +1206,37 @@ private static PipelineTree requestToPipelineTree(SearchRequest request) {

public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final MinAndMax<?> minAndMax;
private final MinAndMax<?> estimatedMinAndMax;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
minAndMax = in.readOptionalWriteable(MinAndMax::new);
estimatedMinAndMax = in.readOptionalWriteable(MinAndMax::new);
} else {
minAndMax = null;
estimatedMinAndMax = null;
}
}

public CanMatchResponse(boolean canMatch, MinAndMax<?> minAndMax) {
public CanMatchResponse(boolean canMatch, MinAndMax<?> estimatedMinAndMax) {
this.canMatch = canMatch;
this.minAndMax = minAndMax;
this.estimatedMinAndMax = estimatedMinAndMax;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalWriteable(minAndMax);
out.writeOptionalWriteable(estimatedMinAndMax);
}
}

public boolean canMatch() {
return canMatch;
}

public MinAndMax<?> minAndMax() {
return minAndMax;
public MinAndMax<?> estimatedMinAndMax() {
return estimatedMinAndMax;
}
}

Expand Down

0 comments on commit 9798f74

Please sign in to comment.