Support search slicing with point-in-time #74457

Merged Jul 8, 2021 (12 commits)
63 changes: 63 additions & 0 deletions docs/reference/search/point-in-time-api.asciidoc
@@ -129,3 +129,66 @@ The API returns the following response:

<1> If true, all search contexts associated with the point-in-time id are successfully closed
<2> The number of search contexts that were successfully closed

[discrete]
[[search-slicing]]
=== Search slicing

When paging through a large number of documents, it can be helpful to split the search into multiple slices
to consume them independently:

[source,console]
--------------------------------------------------
GET /_search
{
"slice": {
"id": 0, <1>
"max": 2 <2>
},
"query": {
"match": {
"message": "foo"
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
}

GET /_search
{
"slice": {
"id": 1,
"max": 2
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
},
"query": {
"match": {
"message": "foo"
}
}
}
--------------------------------------------------
// TEST[skip:both calls will throw errors]

<1> The id of the slice
<2> The maximum number of slices

The result from the first request returns documents belonging to the first slice (id: 0) and the
result from the second request returns documents in the second slice. Since the maximum number of
slices is set to 2, the union of the results of the two requests is equivalent to the results of a
point-in-time search without slicing. By default, the splitting is done first on the shards, then
locally on each shard using the document's Lucene ID. The local splitting follows the formula
`slice(doc) = doc.lucene_id % max`.
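The modulo rule above can be sketched in plain Python (an illustration only; the real splitting happens inside Lucene on each shard, and the doc IDs here are hypothetical):

```python
def slice_for_doc(lucene_id: int, max_slices: int) -> int:
    """Assign a document to a slice: slice(doc) = doc.lucene_id % max."""
    return lucene_id % max_slices

# Partition 10 hypothetical Lucene doc IDs into 2 slices.
docs = list(range(10))
slices = {s: [d for d in docs if slice_for_doc(d, 2) == s] for s in (0, 1)}

# The slices are disjoint and their union covers every document,
# mirroring the guarantee described in the text above.
assert slices[0] == [0, 2, 4, 6, 8]
assert slices[1] == [1, 3, 5, 7, 9]
assert sorted(slices[0] + slices[1]) == docs
```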

For instance, if the number of shards is equal to 2 and the user requests 4 slices, then slices
0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard.
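Assuming the shard-level assignment is round-robin, which is consistent with the example above (a sketch; the actual routing is internal to Elasticsearch), it can be modeled as:

```python
def shard_for_slice(slice_id: int, num_shards: int) -> int:
    # Round-robin assignment: slice i is served by shard i % num_shards.
    return slice_id % num_shards

# 2 shards, 4 requested slices: slices 0 and 2 land on the first shard,
# slices 1 and 3 on the second, matching the example above.
assignment = {s: shard_for_slice(s, 2) for s in range(4)}
assert assignment == {0: 0, 1: 1, 2: 0, 3: 1}
```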

IMPORTANT: The same point-in-time ID should be used for all slices. If different PIT IDs are used,
then slices can overlap and miss documents. This is because the splitting criterion is based on
Lucene document IDs, which are not stable across changes to the index.
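A small simulation of why mismatched PIT IDs break the partition guarantee: if Lucene IDs are reassigned between requests (for example after a segment merge), the same document can fall into different slices. The two ID assignments below are hypothetical:

```python
# Two hypothetical Lucene ID assignments for the same five documents,
# as might exist before and after a segment merge.
ids_pit_a = {"doc1": 0, "doc2": 1, "doc3": 2, "doc4": 3, "doc5": 4}
ids_pit_b = {"doc1": 4, "doc2": 0, "doc3": 1, "doc4": 2, "doc5": 3}

max_slices = 2
# Slice 0 fetched against PIT A, slice 1 (incorrectly) against PIT B.
slice0 = {d for d, i in ids_pit_a.items() if i % max_slices == 0}
slice1 = {d for d, i in ids_pit_b.items() if i % max_slices == 1}

# Some documents are returned twice, others not at all.
overlap = slice0 & slice1
missed = set(ids_pit_a) - (slice0 | slice1)
assert overlap == {"doc3", "doc5"}
assert missed == {"doc2", "doc4"}
```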

NOTE: By default the maximum number of slices allowed per search is limited to 1024.
You can update the `index.max_slices_per_scroll` index setting to bypass this limit.
Contributor: With this setup, the limit is not very useful. We should remove it when scrolls are gone, yeah!

Contributor (author): The name is also a little confusing now (it mentions scrolls). I wonder if we should update the check so it doesn't apply to point-in-time searches.

Contributor (author): I ended up removing the limit for slicing with point-in-time.

@@ -98,8 +98,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" }}
@@ -129,8 +129,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}},
@@ -192,8 +192,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}}
@@ -226,7 +226,6 @@ DELETE /_pit
----
// TEST[catch:missing]


[discrete]
[[scroll-search-results]]
=== Scroll search results
@@ -437,8 +436,8 @@ DELETE /_search/scroll/DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMN
[[slice-scroll]]
==== Sliced scroll

For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which
can be consumed independently:
When paging through a large number of documents, it can be helpful to split the search into multiple slices
to consume them independently:

[source,console]
--------------------------------------------------
@@ -472,24 +471,27 @@ GET /my-index-000001/_search?scroll=1m
<1> The id of the slice
<2> The maximum number of slices

The result from the first request returned documents that belong to the first slice (id: 0) and the result from the
second request returned documents that belong to the second slice. Since the maximum number of slices is set to 2
the union of the results of the two requests is equivalent to the results of a scroll query without slicing.
By default the splitting is done on the shards first and then locally on each shard using the _id field
with the following formula:
`slice(doc) = floorMod(hashCode(doc._id), max)`
For instance if the number of shards is equal to 2 and the user requested 4 slices then the slices 0 and 2 are assigned
to the first shard and the slices 1 and 3 are assigned to the second shard.
The result from the first request returned documents that belong to the first slice (id: 0) and
the result from the second request returned documents that belong to the second slice. Since the
maximum number of slices is set to 2 the union of the results of the two requests is equivalent
to the results of a scroll query without slicing. By default the splitting is done first on the
shards, then locally on each shard using the `_id` field. The local splitting follows the formula
`slice(doc) = doc.lucene_id % max`.
Member:

> `slice(doc) = doc.lucene_id % max`

I think we should use the old formula for sliced scroll (`slice(doc) = floorMod(hashCode(doc._id), max)`)?

Contributor (author): Good catch, this was a bad copy-paste!


Each scroll is independent and can be processed in parallel like any scroll request.

NOTE: If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals
to N bits per slice where N is the total number of documents in the shard.
After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of
sliced query you perform in parallel to avoid the memory explosion.
NOTE: If the number of slices is bigger than the number of shards, the slice filter is very slow on
the first calls: it has a complexity of O(N) and a memory cost equal to N bits per slice, where N
is the total number of documents in the shard. After a few calls the filter should be cached and
subsequent calls should be faster, but you should limit the number of sliced queries you perform in
parallel to avoid the memory explosion.

The <<point-in-time-api,point-in-time>> API supports a more efficient partitioning strategy and
does not suffer from this problem. When possible, it's recommended to use a point-in-time search
with slicing instead of a scroll.

To avoid this cost entirely it is possible to use the `doc_values` of another field to do the slicing
but the user must ensure that the field has the following properties:
Another way to avoid this high cost is to use the `doc_values` of another field to do the slicing.
The field must have the following properties:

* The field is numeric.

@@ -521,6 +523,3 @@ GET /my-index-000001/_search?scroll=1m
// TEST[setup:my_index_big]

For append-only time-based indices, the `timestamp` field can be used safely.
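Assuming doc-values-based slicing also partitions by a modulo of the field value (a sketch, not the exact internal formula), a static numeric field keeps each document in the same slice even when Lucene IDs change, which is what makes it safe:

```python
# Hypothetical documents with a static numeric field (e.g. a timestamp).
docs = [{"_id": f"doc{i}", "ts": 1_600_000_000 + i} for i in range(6)]

max_slices = 3
# Slicing on the field value never depends on Lucene IDs, so merges and
# segment reordering cannot move a document between slices.
slices = {s: [d["_id"] for d in docs if d["ts"] % max_slices == s]
          for s in range(max_slices)}

# The slices still form a disjoint cover of all documents.
all_ids = sorted(i for ids in slices.values() for i in ids)
assert all_ids == sorted(d["_id"] for d in docs)
```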

NOTE: By default the maximum number of slices allowed per scroll is limited to 1024.
You can update the `index.max_slices_per_scroll` index setting to bypass this limit.
@@ -10,17 +10,23 @@

import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeAction;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.test.ESIntegTestCase;

@@ -159,6 +165,99 @@ public void testWithPreferenceAndRoutings() throws Exception {
}
}

private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
SearchResponse searchResponse = request.slice(sliceBuilder).get();
totalResults += searchResponse.getHits().getHits().length;
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
int numSliceResults = searchResponse.getHits().getHits().length;
String scrollId = searchResponse.getScrollId();
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
while (searchResponse.getHits().getHits().length > 0) {
searchResponse = client().prepareSearchScroll("test")
.setScrollId(scrollId)
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.get();
scrollId = searchResponse.getScrollId();
totalResults += searchResponse.getHits().getHits().length;
numSliceResults += searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
clearScroll(scrollId);
}
assertThat(totalResults, equalTo(numDocs));
assertThat(keys.size(), equalTo(numDocs));
assertThat(new HashSet<>(keys).size(), equalTo(numDocs));
}

public void testPointInTime() throws Exception {
int numShards = randomIntBetween(1, 7);
int numDocs = randomIntBetween(100, 1000);
setupIndex(numDocs, numShards);
int max = randomIntBetween(2, numShards * 3);

// Test the default slicing strategy (null), as well as numeric doc values
for (String field : new String[]{null, "random_int", "static_int"}) {
// Open point-in-time reader
OpenPointInTimeRequest request = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueSeconds(10));
OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
String pointInTimeId = response.getPointInTimeId();

// Test _doc sort
assertSearchSlicesWithPointInTime(field, "_doc", pointInTimeId, max, numDocs);
// Test numeric sort
assertSearchSlicesWithPointInTime(field, "random_int", pointInTimeId, max, numDocs);

// Close point-in-time reader
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
}
}

private void assertSearchSlicesWithPointInTime(String sliceField, String sortField, String pointInTimeId, int numSlice, int numDocs) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
int numSliceResults = 0;

SearchRequestBuilder request = client().prepareSearch("test")
.slice(new SliceBuilder(sliceField, id, numSlice))
.setPointInTime(new PointInTimeBuilder(pointInTimeId))
.addSort(SortBuilders.fieldSort(sortField))
.setSize(randomIntBetween(10, 100));

SearchResponse searchResponse = request.get();
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;

while (true) {
int numHits = searchResponse.getHits().getHits().length;
if (numHits == 0) {
break;
}

totalResults += numHits;
numSliceResults += numHits;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}

Object[] sortValues = searchResponse.getHits().getHits()[numHits - 1].getSortValues();
searchResponse = request.searchAfter(sortValues).get();
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
}
assertThat(totalResults, equalTo(numDocs));
assertThat(keys.size(), equalTo(numDocs));
assertThat(new HashSet<>(keys).size(), equalTo(numDocs));
}

public void testInvalidFields() throws Exception {
setupIndex(0, 1);
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
@@ -193,40 +292,7 @@ public void testInvalidQuery() throws Exception {
Throwable rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(SearchException.class));
assertThat(rootCause.getMessage(),
equalTo("`slice` cannot be used outside of a scroll context"));
}

private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
SearchResponse searchResponse = request.slice(sliceBuilder).get();
totalResults += searchResponse.getHits().getHits().length;
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
int numSliceResults = searchResponse.getHits().getHits().length;
String scrollId = searchResponse.getScrollId();
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
while (searchResponse.getHits().getHits().length > 0) {
searchResponse = client().prepareSearchScroll("test")
.setScrollId(scrollId)
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.get();
scrollId = searchResponse.getScrollId();
totalResults += searchResponse.getHits().getHits().length;
numSliceResults += searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
clearScroll(scrollId);
}
assertThat(totalResults, equalTo(numDocs));
assertThat(keys.size(), equalTo(numDocs));
assertThat(new HashSet(keys).size(), equalTo(numDocs));
equalTo("[slice] can only be used with [scroll] or [point-in-time] requests"));
}

private Throwable findRootCause(Exception e) {
@@ -1085,8 +1085,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}

if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
if (source.pointInTimeBuilder() == null && context.scrollContext() == null) {
throw new SearchException(shardTarget, "[slice] can only be used with [scroll] or [point-in-time] requests");
}
context.sliceBuilder(source.slice());
}