@@ -825,7 +825,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
shardIt.getClusterAlias(),
shardIt.getSearchContextId(),
shardIt.getSearchContextKeepAlive(),
shardIt.getReshardSplitShardCountSummary()
shardIt.getSplitShardCountSummary()
);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
@@ -449,7 +449,7 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
shardIt.getSearchContextId(),
shardIt.getSearchContextKeepAlive(),
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
shardIt.getReshardSplitShardCountSummary()
shardIt.getSplitShardCountSummary()
);
}

@@ -498,7 +498,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
shardIndex,
routing.getShardId(),
shardRoutings.getSearchContextId(),
shardRoutings.getReshardSplitShardCountSummary()
shardRoutings.getSplitShardCountSummary()
)
);
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);
@@ -42,7 +42,7 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
/**
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
*/
private final SplitShardCountSummary reshardSplitShardCountSummary;
private final SplitShardCountSummary splitShardCountSummary;

/**
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
@@ -57,7 +57,7 @@ public SearchShardIterator(
ShardId shardId,
List<ShardRouting> shards,
OriginalIndices originalIndices,
SplitShardCountSummary reshardSplitShardCountSummary
SplitShardCountSummary splitShardCountSummary
) {
this(
clusterAlias,
@@ -68,23 +68,23 @@ public SearchShardIterator(
null,
false,
false,
reshardSplitShardCountSummary
splitShardCountSummary
);
}

/**
* Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard
* with provided <code>shardId</code>.
*
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param targetNodeIds the list of nodes hosting shard copies
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
* @param searchContextId the point-in-time specified for this group if exists
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
* @param prefiltered if true, then this group already executed the can_match phase
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
* @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param targetNodeIds the list of nodes hosting shard copies
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
* @param searchContextId the point-in-time specified for this group if exists
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
* @param prefiltered if true, then this group already executed the can_match phase
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
* @param splitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#splitShardCountSummary}
*/
public SearchShardIterator(
@Nullable String clusterAlias,
@@ -95,7 +95,7 @@ public SearchShardIterator(
TimeValue searchContextKeepAlive,
boolean prefiltered,
boolean skip,
SplitShardCountSummary reshardSplitShardCountSummary
SplitShardCountSummary splitShardCountSummary
) {
this.shardId = shardId;
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -107,7 +107,7 @@ public SearchShardIterator(
this.prefiltered = prefiltered;
this.skip = skip;
assert skip == false || prefiltered : "only prefiltered shards are skip-able";
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
this.splitShardCountSummary = splitShardCountSummary;
}

/**
@@ -195,8 +195,8 @@ ShardId shardId() {
return shardId;
}

public SplitShardCountSummary getReshardSplitShardCountSummary() {
return reshardSplitShardCountSummary;
public SplitShardCountSummary getSplitShardCountSummary() {
return splitShardCountSummary;
}

@Override
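
For orientation, a hedged caller-side sketch (not part of this diff) of the constructor and the renamed accessor above; the shard id, empty routing list, and OriginalIndices.NONE are illustrative placeholders, and SplitShardCountSummary.UNSET is assumed to be the neutral value when no resharding is in progress:

SearchShardIterator iterator = new SearchShardIterator(
    null,                                        // clusterAlias: local cluster
    new ShardId("my-index", "my-index-uuid", 0), // hypothetical shard
    List.of(),                                   // shard routings, resolved elsewhere
    OriginalIndices.NONE,                        // placeholder for the original request indices
    SplitShardCountSummary.UNSET                 // no reshard split in flight
);
SplitShardCountSummary summary = iterator.getSplitShardCountSummary();
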
@@ -211,12 +211,7 @@ private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardI
List<SearchShardsGroup> groups = new ArrayList<>(shardIts.size());
for (SearchShardIterator shardIt : shardIts) {
groups.add(
new SearchShardsGroup(
shardIt.shardId(),
shardIt.getTargetNodeIds(),
shardIt.skip(),
shardIt.getReshardSplitShardCountSummary()
)
new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip(), shardIt.getSplitShardCountSummary())
);
}
return groups;
33 changes: 28 additions & 5 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -45,6 +45,7 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
@@ -998,15 +999,23 @@ public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searche
// Called before a {@link Searcher} is created, to allow subclasses to perform any stats or logging operations.
protected void onSearcherCreation(String source, SearcherScope scope) {}

// Allows subclasses to wrap the DirectoryReader before it is used to create Searchers
protected DirectoryReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
// Allows subclasses to wrap the DirectoryReader before it is used to create external Searchers
protected DirectoryReader wrapExternalDirectoryReader(DirectoryReader reader, SplitShardCountSummary ignored) throws IOException {
return reader;
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
return acquireSearcherSupplier(wrapper, scope, SplitShardCountSummary.UNSET);
}

public SearcherSupplier acquireSearcherSupplier(
Function<Searcher, Searcher> wrapper,
SearcherScope scope,
SplitShardCountSummary splitShardCountSummary
) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
@@ -1017,15 +1026,20 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire);
final DirectoryReader maybeWrappedDirectoryReader;
if (scope == SearcherScope.EXTERNAL) {
maybeWrappedDirectoryReader = wrapExternalDirectoryReader(acquire, splitShardCountSummary);
} else {
maybeWrappedDirectoryReader = acquire;
}
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
onSearcherCreation(source, scope);
return new Searcher(
source,
wrappedDirectoryReader,
maybeWrappedDirectoryReader,
engineConfig.getSimilarity(),
engineConfig.getQueryCache(),
engineConfig.getQueryCachingPolicy(),
@@ -1070,9 +1084,18 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}

public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcher(source, scope, SplitShardCountSummary.UNSET, wrapper);
}

public Searcher acquireSearcher(
String source,
SearcherScope scope,
SplitShardCountSummary splitShardCountSummary,
Function<Searcher, Searcher> wrapper
) throws EngineException {
SearcherSupplier releasable = null;
try {
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, splitShardCountSummary);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
onSearcherCreation(source, scope);
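
A minimal caller-side sketch (an assumption, not part of this change) of the new acquireSearcher overload above. The source tag and identity wrapper are placeholders; passing SplitShardCountSummary.UNSET keeps the previous behaviour, and only SearcherScope.EXTERNAL readers go through wrapExternalDirectoryReader:

try (Engine.Searcher searcher = engine.acquireSearcher(
        "reshard_aware_search",          // hypothetical source tag
        Engine.SearcherScope.EXTERNAL,   // internal readers are never wrapped
        splitShardCountSummary,          // e.g. SplitShardCountSummary.UNSET
        Function.identity())) {          // java.util.function.Function: no extra Searcher wrapping
    // run the query against searcher.getIndexReader()
}
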
@@ -22,6 +22,7 @@
import org.apache.lucene.store.Lock;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -617,8 +618,12 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException {
}

@Override
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope);
public SearcherSupplier acquireSearcherSupplier(
Function<Searcher, Searcher> wrapper,
SearcherScope scope,
SplitShardCountSummary splitShardCountSummary
) throws EngineException {
final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope, splitShardCountSummary);
return new SearcherSupplier(wrapper) {
@Override
protected void doClose() {
@@ -44,6 +44,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
@@ -1712,14 +1713,29 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
* The supplier is aware of shard splits and will filter documents that have been moved to other shards
* according to the provided {@link SplitShardCountSummary}.
* @param splitShardCountSummary a summary of the shard routing state seen when the search request was created
* @return a searcher supplier
*/
public Engine.SearcherSupplier acquireExternalSearcherSupplier(SplitShardCountSummary splitShardCountSummary) {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL, splitShardCountSummary);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
return acquireSearcherSupplier(scope, SplitShardCountSummary.UNSET);
}

public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope, SplitShardCountSummary splitShardCountSummary) {
readAllowed();
markSearcherAccessed();
final Engine engine = getEngine();
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
return engine.acquireSearcherSupplier(this::wrapSearcher, scope, splitShardCountSummary);
}

public Engine.Searcher acquireSearcher(String source) {
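
As a hedged usage sketch that mirrors the SearchService call sites below (the request variable and source tag are placeholders), a shard-level caller threads the summary from the incoming ShardSearchRequest into the new supplier method:

Engine.SearcherSupplier supplier = indexShard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary());
try {
    try (Engine.Searcher searcher = supplier.acquireSearcher("search")) {
        // execute the shard-level query; with a non-UNSET summary the external reader
        // is expected to hide documents that now belong to other split target shards
    }
} finally {
    supplier.close(); // release the point-in-time once the search is finished
}
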
12 changes: 9 additions & 3 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -1272,7 +1272,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
// calculated from the ids of the underlying segments of an index commit
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
final Engine.SearcherSupplier searcherSupplier = shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary());
if (contextId.sameSearcherIdsAs(searcherSupplier.getSearcherId()) == false) {
searcherSupplier.close();
throw e;
@@ -1296,7 +1296,13 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
}
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());
return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis);
return createAndPutReaderContext(
request,
indexService,
shard,
shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary()),
keepAliveInMillis
);
}

final ReaderContext createAndPutReaderContext(
@@ -1448,7 +1454,7 @@ protected SearchContext createContext(
public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard indexShard = indexService.getShard(request.shardId().getId());
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
final Engine.SearcherSupplier reader = indexShard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary());
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet(), reader.getSearcherId());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
// Use ResultsType.QUERY so that the created search context can execute queries correctly.