Support search slicing with point-in-time (#75162)
This PR adds support for using the `slice` option in point-in-time searches. By
default, the slice query splits documents based on their Lucene ID. This
strategy is more efficient than the one used for scrolls, which is based on the
`_id` field and must iterate through the whole terms dictionary. When slicing a
search, the same point-in-time ID must be used across slices to guarantee the
partitions don't overlap or miss documents.

Closes #65740.
jtibshirani committed Jul 9, 2021
1 parent ac0ae3a commit 52c0bda
Showing 12 changed files with 590 additions and 140 deletions.
60 changes: 60 additions & 0 deletions docs/reference/search/point-in-time-api.asciidoc
@@ -129,3 +129,63 @@ The API returns the following response:

<1> If true, all search contexts associated with the point-in-time id are successfully closed
<2> The number of search contexts that have been successfully closed

[discrete]
[[search-slicing]]
=== Search slicing

When paging through a large number of documents, it can be helpful to split the search into multiple slices
to consume them independently:

[source,console]
--------------------------------------------------
GET /_search
{
"slice": {
"id": 0, <1>
"max": 2 <2>
},
"query": {
"match": {
"message": "foo"
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
}
GET /_search
{
"slice": {
"id": 1,
"max": 2
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
},
"query": {
"match": {
"message": "foo"
}
}
}
--------------------------------------------------
// TEST[skip:both calls will throw errors]

<1> The id of the slice
<2> The maximum number of slices

The result from the first request returns documents belonging to the first slice (id: 0) and the
result from the second request returns documents in the second slice. Since the maximum number of
slices is set to 2, the union of the results of the two requests is equivalent to the results of a
point-in-time search without slicing. By default, the splitting is done first on the shards, then
locally on each shard. The local splitting partitions the shard into contiguous ranges based on
Lucene document IDs.
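The contiguous-range local split can be sketched in Python. This is an illustration of the idea only, not Elasticsearch's actual partitioning code; `doc_id_range` and its ceiling-division rounding are assumptions made for the sketch:

```python
def doc_id_range(slice_id: int, max_slices: int, max_doc: int) -> range:
    """Contiguous Lucene doc ID range for one slice of a shard.

    The shard's doc IDs [0, max_doc) are cut into max_slices contiguous
    chunks, so every document falls into exactly one slice.
    """
    per_slice = -(-max_doc // max_slices)  # ceiling division
    start = slice_id * per_slice
    return range(start, min(start + per_slice, max_doc))

# 10 documents split into 3 slices: ranges cover [0, 10) without overlap.
print([list(doc_id_range(s, 3, 10)) for s in range(3)])
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Because the ranges depend on Lucene document IDs, which shift as the index changes, the same point-in-time ID must be used so all slices see the same doc IDs.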

For instance, if the number of shards is equal to 2 and the user requests 4 slices, then slices
0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard.
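That shard assignment follows a simple round-robin rule, sketched below; `shard_for_slice` is an illustrative name for this document, not an Elasticsearch API:

```python
def shard_for_slice(slice_id: int, num_shards: int) -> int:
    """Shard that serves a given slice when slices outnumber shards.

    Round-robin assignment: with 2 shards and 4 requested slices,
    slices 0 and 2 land on shard 0 and slices 1 and 3 on shard 1.
    """
    return slice_id % num_shards

# Reproduce the example from the text: 2 shards, 4 requested slices.
print({s: shard_for_slice(s, 2) for s in range(4)})
# {0: 0, 1: 1, 2: 0, 3: 1}
```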

IMPORTANT: The same point-in-time ID should be used for all slices. If different PIT IDs are used,
then slices can overlap and miss documents. This is because the splitting criterion is based on
Lucene document IDs, which are not stable across changes to the index.
@@ -98,8 +98,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" }}
@@ -129,8 +129,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}},
@@ -192,8 +192,8 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [
{"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}}
@@ -226,7 +226,6 @@ DELETE /_pit
----
// TEST[catch:missing]


[discrete]
[[scroll-search-results]]
=== Scroll search results
@@ -437,8 +436,8 @@ DELETE /_search/scroll/DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMN
[[slice-scroll]]
==== Sliced scroll

When paging through a large number of documents, it can be helpful to split the search into multiple slices
to consume them independently:

[source,console]
--------------------------------------------------
@@ -472,24 +471,27 @@ GET /my-index-000001/_search?scroll=1m
<1> The id of the slice
<2> The maximum number of slices

The result from the first request returned documents that belong to the first slice (id: 0) and
the result from the second request returned documents that belong to the second slice. Since the
maximum number of slices is set to 2, the union of the results of the two requests is equivalent
to the results of a scroll query without slicing. By default the splitting is done first on the
shards, then locally on each shard using the `_id` field. The local splitting follows the formula
`slice(doc) = floorMod(hashCode(doc._id), max)`.
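That formula can be sketched in Python. This is a sketch under the assumption that `hashCode` behaves like Java's `String.hashCode` over the `_id` string; Elasticsearch's exact hashing of the `_id` term may differ:

```python
def hash32(s: str) -> int:
    """Java-style String.hashCode: h = 31*h + char, wrapped to signed 32 bits."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x1_0000_0000 if h >= 0x8000_0000 else h

def slice_of(doc_id: str, max_slices: int) -> int:
    # Python's % matches Java's Math.floorMod for a positive modulus,
    # so even negative hash codes map into [0, max_slices).
    return hash32(doc_id) % max_slices

ids = ["doc-%d" % i for i in range(10)]
print([slice_of(i, 2) for i in ids])  # every value is 0 or 1
```

Unlike the contiguous doc-ID ranges used by point-in-time slicing, this requires visiting the `_id` terms dictionary, which is why it is the slower strategy.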

Each scroll is independent and can be processed in parallel like any scroll request.

NOTE: If the number of slices is bigger than the number of shards, the slice filter is very slow on
the first calls: it has a complexity of O(N) and a memory cost equal to N bits per slice, where N
is the total number of documents in the shard. After a few calls the filter should be cached and
subsequent calls should be faster, but you should limit the number of sliced queries you run in
parallel to avoid memory explosion.
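To get a feel for that memory cost, a back-of-the-envelope calculation with illustrative numbers (the shard size and slice count below are assumptions, not measurements):

```python
docs_in_shard = 10_000_000   # N: documents in one shard (illustrative)
num_slices = 8               # sliced queries running in parallel

bits_per_slice = docs_in_shard          # the note above: N bits per slice
bytes_per_slice = bits_per_slice / 8
total_mb = num_slices * bytes_per_slice / 1_000_000
print(total_mb)  # 10.0 -> roughly 10 MB of slice-filter state on that shard
```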

The <<point-in-time-api,point-in-time>> API supports a more efficient partitioning strategy and
does not suffer from this problem. When possible, it's recommended to use a point-in-time search
with slicing instead of a scroll.

Another way to avoid this high cost is to use the `doc_values` of another field to do the slicing.
The field must have the following properties:

* The field is numeric.

@@ -521,6 +523,3 @@ GET /my-index-000001/_search?scroll=1m
// TEST[setup:my_index_big]

For append only time-based indices, the `timestamp` field can be used safely.

NOTE: By default the maximum number of slices allowed per scroll is limited to 1024.
You can update the `index.max_slices_per_scroll` index setting to bypass this limit.
@@ -145,3 +145,28 @@ setup:
- do:
clear_scroll:
scroll_id: $scroll_id

---
"Sliced scroll with doc values":

- do:
search:
rest_total_hits_as_int: true
index: test_sliced_scroll
sort: foo
scroll: 1m
body:
slice:
field: foo
id: 0
max: 2
query:
match_all: {}

- set: {_scroll_id: scroll_id}
- match: { hits.total: 3 }
- length: { hits.hits: 3 }

- do:
clear_scroll:
scroll_id: $scroll_id
@@ -133,6 +133,39 @@ setup:
body:
id: "$point_in_time_id"

---
"point-in-time with slicing":
- skip:
version: " - 7.14.99"
reason: "support for slicing was added in 7.15"
- do:
open_point_in_time:
index: test
keep_alive: 5m
- set: {id: point_in_time_id}

- do:
search:
body:
slice:
id: 0
max: 2
size: 1
query:
match:
foo: bar
sort: [{ age: desc }, { id: desc }]
pit:
id: "$point_in_time_id"

- match: {hits.total.value: 2 }
- length: {hits.hits: 1 }

- do:
close_point_in_time:
body:
id: "$point_in_time_id"

---
"wildcard":
- skip:
