diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 diff --git a/muted-tests.yml b/muted-tests.yml index 496b0727c3e0a..2ff0bac53fc11 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -361,9 +361,6 @@ tests: - class: org.elasticsearch.repositories.blobstore.testkit.analyze.GCSRepositoryAnalysisRestIT method: testRepositoryAnalysis issue: https://github.com/elastic/elasticsearch/issues/125668 - - class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT - method: testSearchWithRandomDisconnects - issue: https://github.com/elastic/elasticsearch/issues/122707 - class: org.elasticsearch.docker.test.DockerYmlTestSuiteIT method: test {p0=/11_nodes/Test cat nodes output} issue: https://github.com/elastic/elasticsearch/issues/125906 @@ -409,4 +406,3 @@ tests: - class: org.elasticsearch.xpack.inference.InferenceRestIT method: test {p0=inference/70_text_similarity_rank_retriever/Text similarity reranker with min_score zero includes all docs} issue: https://github.com/elastic/elasticsearch/issues/137732 - diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0437db6b9eb7c..59839150585a5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -80,6 +80,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } NodeQueryResponse( - QueryPhaseResultConsumer.MergeResult mergeResult, Object[] results, + Exception reductionFailure, + QueryPhaseResultConsumer.MergeResult mergeResult, SearchPhaseController.TopDocsStats topDocsStats ) { this.results = results; @@ -238,6 +250,7 @@ public static final class NodeQueryResponse extends TransportResponse { r.incRef(); } } + this.reductionFailure = reductionFailure; this.mergeResult = mergeResult; this.topDocsStats = topDocsStats; assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results); @@ -248,6 +261,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeArray((o, v) -> { @@ -260,8 +277,19 @@ public void writeTo(StreamOutput out) throws IOException { ((QuerySearchResult) v).writeTo(o); } }, results); - mergeResult.writeTo(out); - topDocsStats.writeTo(out); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + mergeResult.writeTo(out); + topDocsStats.writeTo(out); + } + } else { + mergeResult.writeTo(out); + topDocsStats.writeTo(out); + } } @Override @@ -495,7 +523,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -515,6 +548,21 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + onNodeQueryFailure(e, request, routing); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { @@ -786,11 +834,93 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } + var channelListener = new ChannelActionListener<>(channel); + NodeQueryResponse nodeQueryResponse; + try (queryPhaseResultConsumer) { + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + nodeQueryResponse = getSuccessfulResponse(); + } else { + nodeQueryResponse = getReductionFailureResponse(reductionFailure); + } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + ActionListener.respondAndRelease(channelListener, nodeQueryResponse); + } + + private NodeQueryResponse getSuccessfulResponse() throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + return getReductionFailureResponse(e); + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; + for (int i = 0; i < results.length; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + results[i] = failures.get(i); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + results[i] = result; + } + assert results[i] != null; + } + return new NodeQueryResponse(results, null, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + private NodeQueryResponse getReductionFailureResponse(Exception reductionFailure) throws IOException { + try { + final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; + for (int i = 0; i < results.length; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + results[i] = failures.get(i); + } else { + results[i] = result; + } + assert results[i] != null; + } + return new NodeQueryResponse(results, reductionFailure, null, null); + } finally { + releaseAllResultsContexts(); + } + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { var channelListener = new ChannelActionListener<>(channel); try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -800,7 +930,8 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -839,16 +970,30 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi ActionListener.respondAndRelease( channelListener, - new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats) + new NodeQueryResponse(results, null, mergeResult, queryPhaseResultConsumer.topDocsStats) ); } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, + private void maybeFreeContext( + SearchPhaseResult result, + BitSet relevantShardIndices, NamedWriteableRegistry namedWriteableRegistry ) { + if (result instanceof QuerySearchResult q + && q.getContextId() != null + && relevantShardIndices.get(q.getShardIndex()) == false + && q.hasSuggestHits() == false + && q.getRankShardResult() == null + && searchRequest.searchRequest.scroll() == null + && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) { + if (dependencies.searchService.freeReaderContext(q.getContextId())) { + q.clearContextId(); + } + } + } + + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -858,7 +1003,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f69ba14b2d685..5247e3bff2491 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -286,7 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); - private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); + public static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 698006f324dc1..e820ad9030f88 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -491,7 +491,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..1e5fcced5a988 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9213000,9185007,9112012,8841074 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index a2bfbb0094989..febec42efcb5b 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_8.19.8,8841073 +batched_response_might_include_reduction_failure,8841074