Fallback on scroll for Elasticsearch <7.12
sethmlarson committed Aug 8, 2021
1 parent 30876c8 commit a55ab2a
Showing 2 changed files with 105 additions and 20 deletions.
1 change: 1 addition & 0 deletions .ci/test-matrix.yml
@@ -4,6 +4,7 @@ ELASTICSEARCH_VERSION:
   - 8.0.0-SNAPSHOT
   - 7.x-SNAPSHOT
   - 7.14-SNAPSHOT
+  - 7.11-SNAPSHOT
 
 PANDAS_VERSION:
   - 1.2.0
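The new 7.11-SNAPSHOT entry gives CI coverage to the pre-7.12 code path introduced below. As a hypothetical sketch of how a test run might branch on that matrix value (the ELASTICSEARCH_VERSION environment variable and the parsing are illustrative, not taken from this repo's test runner):

    import os

    # Hypothetical: reduce a matrix entry like "7.11-SNAPSHOT" to a version tuple.
    raw = os.environ.get("ELASTICSEARCH_VERSION", "8.0.0-SNAPSHOT")
    numeric_parts = raw.split("-")[0].split(".")
    version = tuple(int(part) for part in numeric_parts if part.isdigit())

    # Clusters before 7.12 should exercise the scroll fallback.
    print(f"{raw}: expect scroll fallback = {version < (7, 12)}")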
124 changes: 104 additions & 20 deletions eland/operations.py
@@ -63,6 +63,7 @@
 from numpy.typing import DTypeLike
 
 from eland.arithmetics import ArithmeticSeries
+from eland.common import es_version
 from eland.field_mappings import Field
 from eland.filter import BooleanFilter
 from eland.query_compiler import QueryCompiler
@@ -1244,7 +1245,7 @@ def _es_results(
             body["sort"] = [sort_params]
 
         es_results = list(
-            search_after_with_pit(
+            search_yield_hits(
                 query_compiler=query_compiler, body=body, max_number_of_hits=result_size
             )
         )
@@ -1509,14 +1510,14 @@ def show_progress(self) -> bool:
         return self._show_progress
 
 
-def search_after_with_pit(
+def search_yield_hits(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
 ) -> Generator[Dict[str, Any], None, None]:
     """
     This is a generator used to initialize the point in time API and query the
-    search API and return generator which yields an individual document
+    search API and return a generator which yields individual documents.
 
     Parameters
     ----------
@@ -1530,39 +1531,122 @@ def search_after_with_pit(
     Examples
     --------
-    >>> results = list(search_after_with_pit(query_compiler, body, 2)) # doctest: +SKIP
+    >>> results = list(search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP
     [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
     {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
     """
+    # Make a copy of 'body' to avoid mutating it outside this function.
+    body = body.copy()
+
+    # Use the default search size
+    body.setdefault("size", DEFAULT_SEARCH_SIZE)
+
+    # Improves performance by not tracking the total number of hits. We only
+    # care about the hits themselves for these queries.
+    body.setdefault("track_total_hits", False)
+
+    # Elasticsearch 7.12 added the '_shard_doc' sort tiebreaker for PITs, which
+    # means we're guaranteed to be safe on documents with a duplicate sort rank.
+    if es_version(query_compiler._client) >= (7, 12, 0):
+        yield from _search_with_pit_and_search_after(
+            query_compiler=query_compiler,
+            body=body,
+            max_number_of_hits=max_number_of_hits,
+        )
+
+    # Otherwise we use 'scroll' like we used to.
+    else:
+        yield from _search_with_scroll(
+            query_compiler=query_compiler,
+            body=body,
+            max_number_of_hits=max_number_of_hits,
+        )
+
+
+def _search_with_scroll(
+    query_compiler: "QueryCompiler",
+    body: Dict[str, Any],
+    max_number_of_hits: Optional[int],
+) -> Generator[Dict[str, Any], None, None]:
+    # No documents, no reason to send a search.
+    if max_number_of_hits == 0:
+        return
+
+    client = query_compiler._client
+    hits_yielded = 0
+
+    # Make the initial search with 'scroll' set
+    resp = client.search(
+        index=query_compiler._index_pattern,
+        body=body,
+        scroll=DEFAULT_PIT_KEEP_ALIVE,
+    )
+    scroll_id: Optional[str] = resp.get("_scroll_id", None)
+
+    try:
+        while scroll_id and (
+            max_number_of_hits is None or hits_yielded < max_number_of_hits
+        ):
+            hits: List[Dict[str, Any]] = resp["hits"]["hits"]
+
+            # If we didn't receive any hits it means we've reached the end.
+            if not hits:
+                break
+
+            # Calculate which hits should be yielded from this batch
+            if max_number_of_hits is None:
+                hits_to_yield = len(hits)
+            else:
+                hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
+
+            # Yield the hits we need to and then track the total number.
+            yield from hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
+
+            # Retrieve the next set of results
+            resp = client.scroll(
+                body={"scroll_id": scroll_id, "scroll": DEFAULT_PIT_KEEP_ALIVE},
+            )
+            scroll_id = resp.get("_scroll_id", None)  # Update the scroll ID.
+
+    finally:
+        # Close the scroll if we have one open
+        if scroll_id is not None:
+            try:
+                client.clear_scroll(body={"scroll_id": [scroll_id]})
+            except NotFoundError:
+                pass
+
+
+def _search_with_pit_and_search_after(
+    query_compiler: "QueryCompiler",
+    body: Dict[str, Any],
+    max_number_of_hits: Optional[int],
+) -> Generator[Dict[str, Any], None, None]:
+
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
         return
 
+    client = query_compiler._client
     hits_yielded = 0  # Track the total number of hits yielded.
     pit_id: Optional[str] = None
 
+    # Pagination with 'search_after' must have a 'sort' setting.
+    # Using '_doc:asc' is the most efficient, as it reads documents
+    # in the order that they're written on disk in Lucene.
+    body.setdefault("sort", [{"_doc": "asc"}])
+
     try:
-        pit_id = query_compiler._client.open_point_in_time(
+        pit_id = client.open_point_in_time(
             index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE
         )["id"]
 
         # Modify the search with the new point in time ID and keep-alive time.
         body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE}
 
-        # Use the default search size
-        body.setdefault("size", DEFAULT_SEARCH_SIZE)
-
-        # Improves performance by not tracking # of hits. We only
-        # care about the hit itself for these queries.
-        body.setdefault("track_total_hits", False)
-
-        # Pagination with 'search_after' must have a 'sort' setting.
-        # Using '_doc:asc' is the most efficient as reads documents
-        # in the order that they're written on disk in Lucene.
-        body.setdefault("sort", [{"_doc": "asc"}])
-
         while max_number_of_hits is None or hits_yielded < max_number_of_hits:
-            resp = query_compiler._client.search(body=body)
+            resp = client.search(body=body)
             hits: List[Dict[str, Any]] = resp["hits"]["hits"]
 
             # The point in time ID can change between searches so we
@@ -1593,7 +1677,7 @@ def search_after_with_pit(
         # to keep our memory footprint low.
         if pit_id is not None:
             try:
-                query_compiler._client.close_point_in_time(body={"id": pit_id})
+                client.close_point_in_time(body={"id": pit_id})
             except NotFoundError:
                 # If a point in time is already closed Elasticsearch throws NotFoundError
                 pass
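A note on the version gate in search_yield_hits: es_version() produces a comparable version tuple, and Python compares tuples lexicographically, which is what makes the (7, 12, 0) check work. A standalone illustration with hypothetical cluster versions:

    # Tuples compare element by element, so pre-7.12 clusters fall through
    # to the scroll path while 7.12+ clusters use PIT + search_after.
    assert (7, 11, 2) < (7, 12, 0)   # -> _search_with_scroll
    assert (7, 14, 0) >= (7, 12, 0)  # -> _search_with_pit_and_search_after
    assert (8, 0, 0) >= (7, 12, 0)   # -> _search_with_pit_and_search_after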
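For reference, the pre-7.12 path above maps onto the raw elasticsearch-py scroll API roughly as follows; a minimal sketch assuming a 7.x client, a local cluster, and an existing 'flights' index:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")  # hypothetical cluster

    resp = client.search(index="flights", body={"size": 1000}, scroll="3m")
    scroll_id = resp.get("_scroll_id")
    try:
        while scroll_id and resp["hits"]["hits"]:
            for hit in resp["hits"]["hits"]:
                ...  # consume each hit, e.g. hit["_source"]
            # Each scroll call returns the next batch and (possibly) a new ID.
            resp = client.scroll(body={"scroll_id": scroll_id, "scroll": "3m"})
            scroll_id = resp.get("_scroll_id")
    finally:
        if scroll_id:
            # Free server-side resources as soon as we're done.
            client.clear_scroll(body={"scroll_id": [scroll_id]})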
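And the 7.12+ path expressed directly against the client, again as a sketch under the same assumptions. Note that a PIT search must not name an index, and on 7.12+ the '_doc' sort gets the implicit '_shard_doc' tiebreaker that makes search_after safe here:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")  # hypothetical cluster

    pit_id = client.open_point_in_time(index="flights", keep_alive="3m")["id"]
    body = {
        "size": 1000,
        "sort": [{"_doc": "asc"}],
        "pit": {"id": pit_id, "keep_alive": "3m"},
        "track_total_hits": False,
    }
    try:
        while True:
            resp = client.search(body=body)
            hits = resp["hits"]["hits"]
            if not hits:
                break
            for hit in hits:
                ...  # consume each hit
            # The PIT ID can change between searches; always use the latest one.
            body["pit"]["id"] = resp.get("pit_id", body["pit"]["id"])
            # Resume after the last sort key of this page.
            body["search_after"] = hits[-1]["sort"]
    finally:
        client.close_point_in_time(body={"id": body["pit"]["id"]})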
