Skip to content

Commit

Permalink
Backport of async search changes (#53976)
Browse files Browse the repository at this point in the history
* Get Async Search: omit _clusters section when empty (#53907)

The _clusters section is omitted by the search API whenever no remote clusters are searched. Async search should do the same, but Get Async Search returns a deserialized response, hence a weird `_clusters` section with all values set to `0` gets returned instead. In fact the recreated Clusters object is not the same object as the EMPTY constant, yet it has the same content.

This commit addresses this by changing the comparison in the `toXContent` method to not print out the section if the number of total clusters is `0`.

* Async search: remove version from response (#53960)

The goal of the version field was to quickly show when you can expect to find something new in the search response, compared to when nothing has changed. This can also be done by looking at the `_shards` section and `num_reduce_phases` returned with the search response. In fact when there has been one or more additional reduction of the results, you can expect new results in the search response. Otherwise, the `_shards` section could notify of additional failures of shards that have completed the query, but that is not a guarantee that their results will be exposed (only when the following partial reduction is performed their results will be available).

That said this commit clarifies this in the docs and removes the version field from the async search response

* Async Search: replicas to auto expand from 0 to 1 (#53964)

This way single node clusters that are green don't go yellow once async search is used, while
all the others still have one replica.

* [DOCS] address timing issue in async search docs tests (#53910)

The docs snippets for submit async search have proven difficult to test as it is not possible to guarantee that you get a response that is not final, even when providing `wait_for_completion=0`. In the docs we want to show though a proper long-running query, and its first response should be partial rather than final.

With this commit we adapt the docs snippets to show a partial response, and replace under the hood all that's needed to make the snippets tests succeed when we get a final response. Also, increased the timeout so we always get a final response.

Closes #53887
Closes #53891
  • Loading branch information
javanna committed Mar 23, 2020
1 parent 965af3a commit 932a7e3
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
public class AsyncSearchResponse implements ToXContentObject {
@Nullable
private final String id;
private final int version;
@Nullable
private final SearchResponse searchResponse;
@Nullable
Expand All @@ -55,15 +54,13 @@ public class AsyncSearchResponse implements ToXContentObject {
/**
* Creates an {@link AsyncSearchResponse} with the arguments that are always present in the server response
*/
AsyncSearchResponse(int version,
boolean isPartial,
boolean isRunning,
long startTimeMillis,
long expirationTimeMillis,
@Nullable String id,
@Nullable SearchResponse searchResponse,
@Nullable ElasticsearchException error) {
this.version = version;
AsyncSearchResponse(boolean isPartial,
boolean isRunning,
long startTimeMillis,
long expirationTimeMillis,
@Nullable String id,
@Nullable SearchResponse searchResponse,
@Nullable ElasticsearchException error) {
this.isPartial = isPartial;
this.isRunning = isRunning;
this.startTimeMillis = startTimeMillis;
Expand All @@ -81,13 +78,6 @@ public String getId() {
return id;
}

/**
* Returns the version of this response.
*/
public int getVersion() {
return version;
}

/**
* Returns the current {@link SearchResponse} or <code>null</code> if not available.
*
Expand Down Expand Up @@ -145,7 +135,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (id != null) {
builder.field("id", id);
}
builder.field("version", version);
builder.field("is_partial", isPartial);
builder.field("is_running", isRunning);
builder.field("start_time_in_millis", startTimeMillis);
Expand All @@ -165,7 +154,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

public static final ParseField ID_FIELD = new ParseField("id");
public static final ParseField VERSION_FIELD = new ParseField("version");
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");
public static final ParseField IS_RUNNING_FIELD = new ParseField("is_running");
public static final ParseField START_TIME_FIELD = new ParseField("start_time_in_millis");
Expand All @@ -176,16 +164,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static final ConstructingObjectParser<AsyncSearchResponse, Void> PARSER = new ConstructingObjectParser<>(
"submit_async_search_response", true,
args -> new AsyncSearchResponse(
(int) args[0],
(boolean) args[0],
(boolean) args[1],
(boolean) args[2],
(long) args[2],
(long) args[3],
(long) args[4],
(String) args[5],
(SearchResponse) args[6],
(ElasticsearchException) args[7]));
(String) args[4],
(SearchResponse) args[5],
(ElasticsearchException) args[6]));
static {
PARSER.declareInt(constructorArg(), VERSION_FIELD);
PARSER.declareBoolean(constructorArg(), IS_PARTIAL_FIELD);
PARSER.declareBoolean(constructorArg(), IS_RUNNING_FIELD);
PARSER.declareLong(constructorArg(), START_TIME_FIELD);
Expand All @@ -203,7 +189,7 @@ private static SearchResponse parseSearchResponse(XContentParser p) throws IOExc
return SearchResponse.innerFromXContent(p);
}

public static AsyncSearchResponse fromXContent(XContentParser parser) throws IOException {
public static AsyncSearchResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public void testSubmitAsyncSearchRequest() throws IOException {
// 15 sec should be enough to make sure we always complete right away
request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS));
AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT);
assertTrue(response.getVersion() >= 0);
assertFalse(response.isPartial());
assertTrue(response.getStartTime() > 0);
assertTrue(response.getExpirationTime() > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class AsyncSearchResponseTests

@Override
protected org.elasticsearch.xpack.core.search.action.AsyncSearchResponse createServerTestInstance(XContentType xContentType) {
int version = randomIntBetween(0, Integer.MAX_VALUE);
boolean isPartial = randomBoolean();
boolean isRunning = randomBoolean();
long startTimeMillis = randomLongBetween(0, Long.MAX_VALUE);
Expand All @@ -48,7 +47,7 @@ protected org.elasticsearch.xpack.core.search.action.AsyncSearchResponse createS
: new SearchResponse(InternalSearchResponse.empty(), randomAlphaOfLength(10), 1, 1, 0, randomIntBetween(0, 10000),
ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY);
org.elasticsearch.xpack.core.search.action.AsyncSearchResponse testResponse =
new org.elasticsearch.xpack.core.search.action.AsyncSearchResponse(id, version, searchResponse, error, isPartial, isRunning,
new org.elasticsearch.xpack.core.search.action.AsyncSearchResponse(id, searchResponse, error, isPartial, isRunning,
startTimeMillis, expirationTimeMillis);
return testResponse;
}
Expand All @@ -62,7 +61,6 @@ protected AsyncSearchResponse doParseToClientInstance(XContentParser parser) thr
protected void assertInstances(org.elasticsearch.xpack.core.search.action.AsyncSearchResponse expected, AsyncSearchResponse parsed) {
assertNotSame(parsed, expected);
assertEquals(expected.getId(), parsed.getId());
assertEquals(expected.getVersion(), parsed.getVersion());
assertEquals(expected.isRunning(), parsed.isRunning());
assertEquals(expected.isPartial(), parsed.isPartial());
assertEquals(expected.getStartTime(), parsed.getStartTime());
Expand Down
45 changes: 24 additions & 21 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ POST /sales*/_async_search?size=0
}
}
--------------------------------------------------
// TEST[skip:"AwaitsFix https://github.com/elastic/elasticsearch/issues/53891"]
// TEST[setup:sales]
// TEST[s/size=0/size=0&wait_for_completion=0/]
// TEST[s/size=0/size=0&wait_for_completion=10s&clean_on_completion=false/]

The response contains an identifier of the search being executed.
You can use this ID to later retrieve the search's final results.
Expand All @@ -43,7 +42,6 @@ results are returned as part of the <<search-api-response-body,`response`>> obje
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=", <1>
"version" : 0,
"is_partial" : true, <2>
"is_running" : true, <3>
"start_time_in_millis" : 1583945890986,
Expand All @@ -70,12 +68,17 @@ results are returned as part of the <<search-api-response-body,`response`>> obje
}
--------------------------------------------------
// TESTRESPONSE[s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/$body.id/]
// TESTRESPONSE[s/"is_partial" : true/"is_partial": $body.is_partial/]
// TESTRESPONSE[s/"is_running" : true/"is_running": $body.is_running/]
// TESTRESPONSE[s/1583945890986/$body.start_time_in_millis/]
// TESTRESPONSE[s/1584377890986/$body.expiration_time_in_millis/]
// TESTRESPONSE[s/"took" : 1122/"took": $body.response.took/]
// TESTRESPONSE[s/"num_reduce_phases" : 0,//]
// TESTRESPONSE[s/"total" : 562/"total": $body.response._shards.total/]
// TESTRESPONSE[s/"successful" : 3/"successful": $body.response._shards.successful/]
// TESTRESPONSE[s/"value" : 157483/"value": $body.response.hits.total.value/]
// TESTRESPONSE[s/"relation" : "gte"/"relation": $body.response.hits.total.relation/]
// TESTRESPONSE[s/"hits" : \[ \]\n\s\s\s\s\}/"hits" : \[\]},"aggregations": $body.response.aggregations/]

<1> Identifier of the async search that can be used to monitor its progress, retrieve its results, and/or delete it.
<2> Whether the returned search results are partial or final
Expand Down Expand Up @@ -135,17 +138,16 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"version" : 2, <1>
"is_partial" : true, <2>
"is_running" : true, <3>
"is_partial" : true, <1>
"is_running" : true, <2>
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986, <4>
"expiration_time_in_millis" : 1584377890986, <3>
"response" : {
"took" : 12144,
"timed_out" : false,
"num_reduce_phases" : 38,
"num_reduce_phases" : 46, <4>
"_shards" : {
"total" : 562,
"total" : 562, <5>
"successful" : 188,
"skipped" : 0,
"failed" : 0
Expand All @@ -158,7 +160,7 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
"max_score" : null,
"hits" : [ ]
},
"aggregations" : { <5>
"aggregations" : { <6>
"sale_date" : {
"buckets" : []
}
Expand All @@ -176,17 +178,18 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
// TESTRESPONSE[s/"successful" : 188/"successful": $body.response._shards.successful/]
// TESTRESPONSE[s/"value" : 456433/"value": $body.response.hits.total.value/]
// TESTRESPONSE[s/"buckets" : \[\]/"buckets": $body.response.aggregations.sale_date.buckets/]
// TESTRESPONSE[s/"num_reduce_phases" : 38,//]

<1> The returned `version` is useful to identify whether the response contains
additional results compared to previously obtained responses. If the version
stays the same, no new results have become available, otherwise a higher version
number indicates that more shards have completed their execution of the query
and their partial results are also included in the response.
<2> Whether the returned search results are partial or final
<3> Whether the search is still being executed or it has completed
<4> When the async search will expire
<5> Partial aggregations results, coming from the shards that have already
// TESTRESPONSE[s/"num_reduce_phases" : 46,//]

<1> Whether the returned search results are partial or final
<2> Whether the search is still being executed or it has completed
<3> When the async search will expire
<4> Indicates how many reduction of the results have been performed. If this
number increases compared to the last retrieved results, you can expect
additional results included in the search response
<5> Indicates how many shards have executed the query. Note that in order for
shard results to be included in the search response, they need to be reduced
first.
<6> Partial aggregations results, coming from the shards that have already
completed the execution of the query.

The `wait_for_completion` parameter, which defaults to `1`, can also be provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,7 @@ public Clusters(int total, int successful, int skipped) {
}

private Clusters(StreamInput in) throws IOException {
this.total = in.readVInt();
this.successful = in.readVInt();
this.skipped = in.readVInt();
this(in.readVInt(), in.readVInt(), in.readVInt());
}

@Override
Expand All @@ -434,7 +432,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (this != EMPTY) {
if (total > 0) {
builder.startObject(_CLUSTERS_FIELD.getPreferredName());
builder.field(TOTAL_FIELD.getPreferredName(), total);
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -292,4 +293,13 @@ public void testSerialization() throws IOException {
assertEquals(searchResponse.getSkippedShards(), deserialized.getSkippedShards());
assertEquals(searchResponse.getClusters(), deserialized.getClusters());
}

public void testToXContentEmptyClusters() throws IOException {
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, 1,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
SearchResponse deserialized = copyWriteable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
deserialized.getClusters().toXContent(builder, ToXContent.EMPTY_PARAMS);
assertEquals(0, Strings.toString(builder).length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ class AsyncSearchIndexService {
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
public static final String RESULT_FIELD = "result";

public static Settings settings() {
private static Settings settings() {
return Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build();
}

public static XContentBuilder mappings() throws IOException {
private static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class MutableSearchResponse {
private final AtomicArray<ShardSearchFailure> shardFailures;
private final Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier;

private int version;
private boolean isPartial;
private boolean isFinalReduce;
private int successfulShards;
Expand All @@ -63,7 +62,6 @@ class MutableSearchResponse {
this.skippedShards = skippedShards;
this.clusters = clusters;
this.aggReduceContextSupplier = aggReduceContextSupplier;
this.version = 0;
this.shardFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
this.isPartial = true;
this.sections = totalShards == -1 ? null : new InternalSearchResponse(
Expand All @@ -83,7 +81,6 @@ synchronized void updatePartialResponse(int successfulShards, SearchResponseSect
throw new IllegalStateException("received partial response out of order: "
+ newSections.getNumReducePhases() + " < " + sections.getNumReducePhases());
}
++ version;
this.successfulShards = successfulShards;
this.sections = newSections;
this.isPartial = true;
Expand All @@ -96,7 +93,6 @@ synchronized void updatePartialResponse(int successfulShards, SearchResponseSect
*/
synchronized void updateFinalResponse(int successfulShards, SearchResponseSections newSections) {
failIfFrozen();
++ version;
this.successfulShards = successfulShards;
this.sections = newSections;
this.isPartial = false;
Expand All @@ -110,7 +106,6 @@ synchronized void updateFinalResponse(int successfulShards, SearchResponseSectio
*/
synchronized void updateWithFailure(Exception exc) {
failIfFrozen();
++ version;
this.isPartial = true;
this.failure = ElasticsearchException.guessRootCauses(exc)[0];
this.frozen = true;
Expand Down Expand Up @@ -147,7 +142,7 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, lon
} else {
resp = null;
}
return new AsyncSearchResponse(task.getSearchId().getEncoded(), version, resp, failure, isPartial,
return new AsyncSearchResponse(task.getSearchId().getEncoded(), resp, failure, isPartial,
frozen == false, task.getStartTime(), expirationTime);
}

Expand All @@ -159,7 +154,7 @@ private void failIfFrozen() {

private ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) {
return new ShardSearchFailure[0];
return ShardSearchFailure.EMPTY_ARRAY;
}
List<ShardSearchFailure> failures = new ArrayList<>();
for (int i = 0; i < shardFailures.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -17,6 +21,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.assertEqualResponses;
import static org.elasticsearch.xpack.search.AsyncSearchResponseTests.randomAsyncSearchResponse;
Expand Down Expand Up @@ -100,4 +105,15 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
assertFalse(indexService.ensureAuthenticatedUserIsSame(original, runAsDiffType));
assertFalse(indexService.ensureAuthenticatedUserIsSame(threadContext.getHeaders(), runAsDiffType));
}

public void testSettings() throws ExecutionException, InterruptedException {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(AsyncSearchIndexService.INDEX)).actionGet();
Settings settings = getIndexResponse.getSettings().get(AsyncSearchIndexService.INDEX);
assertEquals("1", settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS));
assertEquals("0-1", settings.get(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS));
}
}
Loading

0 comments on commit 932a7e3

Please sign in to comment.