Skip to content

Commit

Permalink
Yield list of hits instead of individual hits
Browse files Browse the repository at this point in the history
  • Loading branch information
sethmlarson committed Aug 17, 2021
1 parent 011bf29 commit dae5432
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 28 deletions.
39 changes: 24 additions & 15 deletions eland/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,13 +1234,14 @@ def _es_results(
if sort_params:
body["sort"] = [sort_params]

es_results = list(
search_yield_hits(
es_results: List[Dict[str, Any]] = sum(
_search_yield_hits(
query_compiler=query_compiler, body=body, max_number_of_hits=result_size
)
),
[],
)

_, df = query_compiler._es_results_to_pandas(
df = query_compiler._es_results_to_pandas(
results=es_results, show_progress=show_progress
)
df = self._apply_df_post_processing(df, post_processing)
Expand Down Expand Up @@ -1447,14 +1448,16 @@ def quantile_to_percentile(quantile: Union[int, float]) -> float:
return float(min(100, max(0, quantile * 100)))


def search_yield_hits(
def _search_yield_hits(
query_compiler: "QueryCompiler",
body: Dict[str, Any],
max_number_of_hits: Optional[int],
) -> Generator[Dict[str, Any], None, None]:
) -> Generator[List[Dict[str, Any]], None, None]:
"""
This is a generator used to initialize point in time API and query the
search API and return generator which yields an individual documents
search API and return a generator which yields batches of hits as they
come in. No empty batches will be yielded; if there are no hits, then
no batches will be yielded at all.
Parameters
----------
Expand All @@ -1469,8 +1472,8 @@ def search_yield_hits(
Examples
--------
>>> results = list(_search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP
[{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
{'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
[[{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
{'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]]
"""
# Make a copy of 'body' to avoid mutating it outside this function.
body = body.copy()
Expand Down Expand Up @@ -1500,7 +1503,7 @@ def _search_with_scroll(
query_compiler: "QueryCompiler",
body: Dict[str, Any],
max_number_of_hits: Optional[int],
) -> Generator[Dict[str, Any], None, None]:
) -> Generator[List[Dict[str, Any]], None, None]:
# No documents, no reason to send a search.
if max_number_of_hits == 0:
return
Expand Down Expand Up @@ -1533,8 +1536,11 @@ def _search_with_scroll(
hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)

# Yield the hits we need to and then track the total number.
yield from hits[:hits_to_yield]
hits_yielded += hits_to_yield
# Never yield an empty list as that makes things simpler for
# downstream consumers.
if hits and hits_to_yield > 0:
yield hits[:hits_to_yield]
hits_yielded += hits_to_yield

# Retrieve the next set of results
resp = client.scroll(
Expand All @@ -1555,7 +1561,7 @@ def _search_with_pit_and_search_after(
query_compiler: "QueryCompiler",
body: Dict[str, Any],
max_number_of_hits: Optional[int],
) -> Generator[Dict[str, Any], None, None]:
) -> Generator[List[Dict[str, Any]], None, None]:

# No documents, no reason to send a search.
if max_number_of_hits == 0:
Expand Down Expand Up @@ -1602,8 +1608,11 @@ def _search_with_pit_and_search_after(
hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)

# Yield the hits we need to and then track the total number.
yield from hits[:hits_to_yield]
hits_yielded += hits_to_yield
# Never yield an empty list as that makes things simpler for
# downstream consumers.
if hits and hits_to_yield > 0:
yield hits[:hits_to_yield]
hits_yielded += hits_to_yield

# Set the 'search_after' for the next request
# to be the last sort value for this set of hits.
Expand Down
17 changes: 4 additions & 13 deletions eland/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ def es_dtypes(self) -> pd.Series:
def _es_results_to_pandas(
self,
results: List[Dict[str, Any]],
batch_size: Optional[int] = None,
show_progress: bool = False,
) -> "pd.Dataframe":
"""
Expand Down Expand Up @@ -239,10 +238,8 @@ def _es_results_to_pandas(
(which isn't great)
NOTE - using this lists is generally not a good way to use this API
"""
partial_result = False

if results is None:
return partial_result, self._empty_pd_ef()
if not results:
return self._empty_pd_ef()

# This is one of the most performance critical areas of eland, and it repeatedly calls
# self._mappings.field_name_pd_dtype and self._mappings.date_field_format
Expand All @@ -253,8 +250,7 @@ def _es_results_to_pandas(
index = []

i = 0
for hit in results:
i = i + 1
for i, hit in enumerate(results, 1):

if "_source" in hit:
row = hit["_source"]
Expand All @@ -277,11 +273,6 @@ def _es_results_to_pandas(
# flatten row to map correctly to 2D DataFrame
rows.append(self._flatten_dict(row, field_mapping_cache))

if batch_size is not None:
if i >= batch_size:
partial_result = True
break

if show_progress:
if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
print(f"{datetime.now()}: read {i} rows")
Expand Down Expand Up @@ -310,7 +301,7 @@ def _es_results_to_pandas(
if show_progress:
print(f"{datetime.now()}: read {i} rows")

return partial_result, df
return df

def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
out = {}
Expand Down

0 comments on commit dae5432

Please sign in to comment.