diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 87a54deea1623..91d37a8676ef4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -825,7 +825,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), - shardIt.getReshardSplitShardCountSummary() + shardIt.getSplitShardCountSummary() ); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 630637a247f01..621541d02b026 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -449,7 +449,7 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex), - shardIt.getReshardSplitShardCountSummary() + shardIt.getSplitShardCountSummary() ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index af3aa18373c1e..7dbca8ceb679b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -498,7 +498,7 @@ protected void doRun(Map shardIndexMap) { shardIndex, routing.getShardId(), shardRoutings.getSearchContextId(), - shardRoutings.getReshardSplitShardCountSummary() + shardRoutings.getSplitShardCountSummary() ) ); var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 9eefd5b87c3db..446e7c11cbd27 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -42,7 +42,7 @@ public final class SearchShardIterator implements ComparableshardId. @@ -57,7 +57,7 @@ public SearchShardIterator( ShardId shardId, List shards, OriginalIndices originalIndices, - SplitShardCountSummary reshardSplitShardCountSummary + SplitShardCountSummary splitShardCountSummary ) { this( clusterAlias, @@ -68,7 +68,7 @@ public SearchShardIterator( null, false, false, - reshardSplitShardCountSummary + splitShardCountSummary ); } @@ -76,15 +76,15 @@ public SearchShardIterator( * Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard * with provided shardId. * - * @param clusterAlias the alias of the cluster where the shard is located - * @param shardId shard id of the group - * @param targetNodeIds the list of nodes hosting shard copies - * @param originalIndices the indices that the search request originally related to (before any rewriting happened) - * @param searchContextId the point-in-time specified for this group if exists - * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time - * @param prefiltered if true, then this group already executed the can_match phase - * @param skip if true, then this group won't have matches, and it can be safely skipped from the search - * @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary} + * @param clusterAlias the alias of the cluster where the shard is located + * @param shardId shard id of the group + * @param targetNodeIds the list of nodes hosting shard copies + * @param originalIndices the indices that the search request originally related to (before any rewriting happened) + * @param searchContextId the point-in-time specified for this group if exists + * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time + * @param prefiltered if true, then this group already executed the can_match phase + * @param skip if true, then this group won't have matches, and it can be safely skipped from the search + * @param splitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#splitShardCountSummary} */ public SearchShardIterator( @Nullable String clusterAlias, @@ -95,7 +95,7 @@ public SearchShardIterator( TimeValue searchContextKeepAlive, boolean prefiltered, boolean skip, - SplitShardCountSummary reshardSplitShardCountSummary + SplitShardCountSummary splitShardCountSummary ) { this.shardId = shardId; this.targetNodesIterator = new PlainIterator<>(targetNodeIds); @@ -107,7 +107,7 @@ public SearchShardIterator( this.prefiltered = prefiltered; this.skip = skip; assert skip == false || prefiltered : "only prefiltered shards are skip-able"; - this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; + this.splitShardCountSummary = splitShardCountSummary; } /** @@ -195,8 +195,8 @@ ShardId shardId() { return shardId; } - public SplitShardCountSummary getReshardSplitShardCountSummary() { - return reshardSplitShardCountSummary; + public SplitShardCountSummary getSplitShardCountSummary() { + return splitShardCountSummary; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java index 58192de86280b..1a98d1d91c5ca 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java @@ -211,12 +211,7 @@ private static List toGroups(List shardI List groups = new ArrayList<>(shardIts.size()); for (SearchShardIterator shardIt : shardIts) { groups.add( - new SearchShardsGroup( - shardIt.shardId(), - shardIt.getTargetNodeIds(), - shardIt.skip(), - shardIt.getReshardSplitShardCountSummary() - ) + new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip(), shardIt.getSplitShardCountSummary()) ); } return groups; diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index eaa24ab6c0e89..8d28250ce1919 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -45,6 +45,7 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; @@ -998,8 +999,8 @@ public final SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + return acquireSearcherSupplier(wrapper, scope, SplitShardCountSummary.UNSET); + } + + public SearcherSupplier acquireSearcherSupplier( + Function wrapper, + SearcherScope scope, + SplitShardCountSummary splitShardCountSummary + ) throws EngineException { /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before * the searcher is acquired. */ @@ -1017,7 +1026,12 @@ public SearcherSupplier acquireSearcherSupplier(Function wra try { ReferenceManager referenceManager = getReferenceManager(scope); ElasticsearchDirectoryReader acquire = referenceManager.acquire(); - DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire); + final DirectoryReader maybeWrappedDirectoryReader; + if (scope == SearcherScope.EXTERNAL) { + maybeWrappedDirectoryReader = wrapExternalDirectoryReader(acquire, splitShardCountSummary); + } else { + maybeWrappedDirectoryReader = acquire; + } SearcherSupplier reader = new SearcherSupplier(wrapper) { @Override public Searcher acquireSearcherInternal(String source) { @@ -1025,7 +1039,7 @@ public Searcher acquireSearcherInternal(String source) { onSearcherCreation(source, scope); return new Searcher( source, - wrappedDirectoryReader, + maybeWrappedDirectoryReader, engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(), @@ -1070,9 +1084,18 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } public Searcher acquireSearcher(String source, SearcherScope scope, Function wrapper) throws EngineException { + return acquireSearcher(source, scope, SplitShardCountSummary.UNSET, wrapper); + } + + public Searcher acquireSearcher( + String source, + SearcherScope scope, + SplitShardCountSummary splitShardCountSummary, + Function wrapper + ) throws EngineException { SearcherSupplier releasable = null; try { - SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope); + SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, splitShardCountSummary); Searcher searcher = reader.acquireSearcher(source); releasable = null; onSearcherCreation(source, scope); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 6106d1bbef31a..4459abe0a7fc1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.Lock; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -617,8 +618,12 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException { } @Override - public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { - final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope); + public SearcherSupplier acquireSearcherSupplier( + Function wrapper, + SearcherScope scope, + SplitShardCountSummary splitShardCountSummary + ) throws EngineException { + final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope, splitShardCountSummary); return new SearcherSupplier(wrapper) { @Override protected void doClose() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8d2c8d6e750e2..ef67dfd99d313 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; @@ -1712,14 +1713,29 @@ public Engine.SearcherSupplier acquireSearcherSupplier() { return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL); } + /** + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. + * The supplier is aware of shard splits and will filter documents that have been moved to other shards + * according to the provided {@link SplitShardCountSummary}. + * @param splitShardCountSummary a summary of the shard routing state seen when the search request was created + * @return a searcher supplier + */ + public Engine.SearcherSupplier acquireExternalSearcherSupplier(SplitShardCountSummary splitShardCountSummary) { + return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL, splitShardCountSummary); + } + /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) { + return acquireSearcherSupplier(scope, SplitShardCountSummary.UNSET); + } + + public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope, SplitShardCountSummary splitShardCountSummary) { readAllowed(); markSearcherAccessed(); final Engine engine = getEngine(); - return engine.acquireSearcherSupplier(this::wrapSearcher, scope); + return engine.acquireSearcherSupplier(this::wrapSearcher, scope, splitShardCountSummary); } public Engine.Searcher acquireSearcher(String source) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index e3c78e13c9f8f..27a06c2f662de 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1272,7 +1272,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { // calculated from the ids of the underlying segments of an index commit final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); - final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(); + final Engine.SearcherSupplier searcherSupplier = shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary()); if (contextId.sameSearcherIdsAs(searcherSupplier.getSearcherId()) == false) { searcherSupplier.close(); throw e; @@ -1296,7 +1296,13 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { } final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); - return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis); + return createAndPutReaderContext( + request, + indexService, + shard, + shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary()), + keepAliveInMillis + ); } final ReaderContext createAndPutReaderContext( @@ -1448,7 +1454,7 @@ protected SearchContext createContext( public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard indexShard = indexService.getShard(request.shardId().getId()); - final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); + final Engine.SearcherSupplier reader = indexShard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary()); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet(), reader.getSearcherId()); try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) { // Use ResultsType.QUERY so that the created search context can execute queries correctly. diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 542d1cd32542b..d44f46e15f958 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -105,7 +105,7 @@ public class ShardSearchRequest extends AbstractTransportRequest implements Indi /** * Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}. */ - private final SplitShardCountSummary reshardSplitShardCountSummary; + private final SplitShardCountSummary splitShardCountSummary; public static final TransportVersion SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY = TransportVersion.fromName( "shard_search_request_reshard_shard_count_summary" @@ -151,7 +151,7 @@ public ShardSearchRequest( @Nullable String clusterAlias, ShardSearchContextId readerId, TimeValue keepAlive, - SplitShardCountSummary reshardSplitShardCountSummary + SplitShardCountSummary splitShardCountSummary ) { this( originalIndices, @@ -172,7 +172,7 @@ public ShardSearchRequest( computeWaitForCheckpoint(searchRequest.getWaitForCheckpoints(), shardId, shardRequestIndex), searchRequest.getWaitForCheckpointsTimeout(), searchRequest.isForceSyntheticSource(), - reshardSplitShardCountSummary + splitShardCountSummary ); // If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted // at this stage. Any NPEs in the above are therefore an error in request preparation logic. @@ -206,7 +206,7 @@ public ShardSearchRequest( long nowInMillis, AliasFilter aliasFilter, String clusterAlias, - SplitShardCountSummary reshardSplitShardCountSummary + SplitShardCountSummary splitShardCountSummary ) { this( OriginalIndices.NONE, @@ -227,7 +227,7 @@ public ShardSearchRequest( SequenceNumbers.UNASSIGNED_SEQ_NO, SearchService.NO_TIMEOUT, false, - reshardSplitShardCountSummary + splitShardCountSummary ); } @@ -251,7 +251,7 @@ public ShardSearchRequest( long waitForCheckpoint, TimeValue waitForCheckpointsTimeout, boolean forceSyntheticSource, - SplitShardCountSummary reshardSplitShardCountSummary + SplitShardCountSummary splitShardCountSummary ) { this.shardId = shardId; this.shardRequestIndex = shardRequestIndex; @@ -273,7 +273,7 @@ public ShardSearchRequest( this.waitForCheckpoint = waitForCheckpoint; this.waitForCheckpointsTimeout = waitForCheckpointsTimeout; this.forceSyntheticSource = forceSyntheticSource; - this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; + this.splitShardCountSummary = splitShardCountSummary; } @SuppressWarnings("this-escape") @@ -299,7 +299,7 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.waitForCheckpoint = clone.waitForCheckpoint; this.waitForCheckpointsTimeout = clone.waitForCheckpointsTimeout; this.forceSyntheticSource = clone.forceSyntheticSource; - this.reshardSplitShardCountSummary = clone.reshardSplitShardCountSummary; + this.splitShardCountSummary = clone.splitShardCountSummary; } public ShardSearchRequest(StreamInput in) throws IOException { @@ -367,9 +367,9 @@ public ShardSearchRequest(StreamInput in) throws IOException { forceSyntheticSource = false; } if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - reshardSplitShardCountSummary = new SplitShardCountSummary(in); + splitShardCountSummary = new SplitShardCountSummary(in); } else { - reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + splitShardCountSummary = SplitShardCountSummary.UNSET; } originalIndices = OriginalIndices.readOriginalIndices(in); @@ -433,7 +433,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce } } if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - reshardSplitShardCountSummary.writeTo(out); + splitShardCountSummary.writeTo(out); } } @@ -592,6 +592,10 @@ public String getClusterAlias() { return clusterAlias; } + public SplitShardCountSummary getSplitShardCountSummary() { + return splitShardCountSummary; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java index 9fe31aeb8c5d6..9374f2aadbdd4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java @@ -125,7 +125,7 @@ public void testEqualsAndHashcode() { s.getSearchContextKeepAlive(), s.prefiltered(), s.skip(), - s.getReshardSplitShardCountSummary() + s.getSplitShardCountSummary() ), s -> { if (randomBoolean()) { @@ -144,7 +144,7 @@ public void testEqualsAndHashcode() { s.getSearchContextKeepAlive(), s.prefiltered(), s.skip(), - s.getReshardSplitShardCountSummary() + s.getSplitShardCountSummary() ); } else { ShardId shardId = new ShardId( @@ -161,7 +161,7 @@ public void testEqualsAndHashcode() { s.getSearchContextKeepAlive(), s.prefiltered(), s.skip(), - s.getReshardSplitShardCountSummary() + s.getSplitShardCountSummary() ); } } @@ -233,7 +233,7 @@ public void testCompareToEqualItems() { shardIterator1.getSearchContextKeepAlive(), shardIterator1.prefiltered(), shardIterator1.skip(), - shardIterator1.getReshardSplitShardCountSummary() + shardIterator1.getSplitShardCountSummary() ); assertEquals(shardIterator1, shardIterator2); assertEquals(0, shardIterator1.compareTo(shardIterator2)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/frozen/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/frozen/FrozenEngine.java index ece9ca080cd1d..965ec067a7ee5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/frozen/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/frozen/FrozenEngine.java @@ -15,6 +15,7 @@ import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.Directory; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.IOUtils; @@ -211,7 +212,11 @@ private ElasticsearchDirectoryReader getReader() { } @Override - public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + public SearcherSupplier acquireSearcherSupplier( + Function wrapper, + SearcherScope scope, + SplitShardCountSummary splitShardCountSummary + ) throws EngineException { final Store store = this.store; store.incRef(); return new SearcherSupplier(wrapper) {