Yield list of hits instead of individual hits #379

Merged 1 commit on Aug 17, 2021
39 changes: 24 additions & 15 deletions eland/operations.py
@@ -1234,13 +1234,14 @@ def _es_results(
         if sort_params:
             body["sort"] = [sort_params]

-        es_results = list(
-            search_yield_hits(
+        es_results: List[Dict[str, Any]] = sum(
+            _search_yield_hits(
                 query_compiler=query_compiler, body=body, max_number_of_hits=result_size
-            )
+            ),
+            [],
         )

-        _, df = query_compiler._es_results_to_pandas(
+        df = query_compiler._es_results_to_pandas(
             results=es_results, show_progress=show_progress
         )
         df = self._apply_df_post_processing(df, post_processing)
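The call site flattens the yielded batches back into a single list with `sum(generator, [])`, which starts from an empty list and concatenates each batch with `+`. A minimal sketch of the idiom, with a hypothetical `yield_batches` standing in for `_search_yield_hits`:

from typing import Any, Dict, Generator, List

def yield_batches() -> Generator[List[Dict[str, Any]], None, None]:
    # Hypothetical stand-in for _search_yield_hits: yields non-empty batches of hits.
    yield [{"_id": "0"}, {"_id": "1"}]
    yield [{"_id": "2"}]

# sum() starts from [] and concatenates each batch with +.
flat: List[Dict[str, Any]] = sum(yield_batches(), [])
print(flat)  # [{'_id': '0'}, {'_id': '1'}, {'_id': '2'}]

Since `sum` on lists copies on every `+`, `list(itertools.chain.from_iterable(...))` would do the same flattening in linear time; for a handful of batches the difference is negligible.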
@@ -1447,14 +1448,16 @@ def quantile_to_percentile(quantile: Union[int, float]) -> float:
     return float(min(100, max(0, quantile * 100)))


-def search_yield_hits(
+def _search_yield_hits(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     """
     This is a generator used to initialize point in time API and query the
-    search API and return generator which yields an individual documents
+    search API and return generator which yields batches of hits as they
+    come in. No empty batches will be yielded, if there are no hits then
+    no batches will be yielded instead.

     Parameters
     ----------
@@ -1469,8 +1472,8 @@ def search_yield_hits(
     Examples
     --------
     >>> results = list(search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP
-    [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
-     {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
+    [[{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
+      {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]]
     """
     # Make a copy of 'body' to avoid mutating it outside this function.
     body = body.copy()
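With batches as the unit of iteration, a consumer can do per-batch work without buffering the whole result set. A sketch of the consumer side, assuming a hypothetical `fetch_batches` with the same contract (non-empty `List[Dict[str, Any]]` batches):

from typing import Any, Dict, Generator, List

def fetch_batches() -> Generator[List[Dict[str, Any]], None, None]:
    # Hypothetical stand-in for _search_yield_hits(query_compiler, body, max_hits).
    yield [{"_id": "0", "_source": {"a": 1}}]
    yield [{"_id": "1", "_source": {"a": 2}}]

total = 0
for batch in fetch_batches():
    # The contract guarantees non-empty batches, so len(batch) >= 1 here.
    total += len(batch)
print(total)  # 2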
@@ -1500,7 +1503,7 @@ def _search_with_scroll(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
         return
@@ -1533,8 +1536,11 @@ def _search_with_scroll(
             hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)

         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield

         # Retrieve the next set of results
         resp = client.scroll(
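The guard implements a general budgeted-pagination pattern: cap each slice at the remaining budget and skip the yield when the slice would be empty. A self-contained sketch of the pattern, detached from Elasticsearch (the page data is made up):

from typing import Any, Dict, Generator, List, Optional

def yield_capped_batches(
    pages: List[List[Dict[str, Any]]], max_hits: Optional[int]
) -> Generator[List[Dict[str, Any]], None, None]:
    # 'pages' stands in for successive scroll responses.
    yielded = 0
    for hits in pages:
        to_yield = len(hits) if max_hits is None else min(len(hits), max_hits - yielded)
        # Never yield an empty batch; stop once the budget is spent.
        if hits and to_yield > 0:
            yield hits[:to_yield]
            yielded += to_yield
        if max_hits is not None and yielded >= max_hits:
            break

print(list(yield_capped_batches([[{"_id": "0"}, {"_id": "1"}], [{"_id": "2"}]], 2)))
# [[{'_id': '0'}, {'_id': '1'}]]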
@@ -1555,7 +1561,7 @@ def _search_with_pit_and_search_after(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:

     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
@@ -1602,8 +1608,11 @@ def _search_with_pit_and_search_after(
             hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)

         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield

         # Set the 'search_after' for the next request
         # to be the last sort value for this set of hits.
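Here the paging cursor is `search_after`, seeded from the `sort` values of the previous page's last hit, while a point in time keeps the view of the index stable across requests. A hedged sketch of that loop against a plain `elasticsearch` client (7.10+), where the endpoint, index name, and page size are illustrative assumptions:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # illustrative endpoint
pit_id = es.open_point_in_time(index="flights", keep_alive="1m")["id"]
body = {"size": 1000, "sort": [{"_doc": "asc"}], "pit": {"id": pit_id, "keep_alive": "1m"}}
while True:
    hits = es.search(body=body)["hits"]["hits"]
    if not hits:
        break  # no hits left: the scan is complete
    # ... consume this batch of hits ...
    # Resume after the last hit of this page on the next request.
    body["search_after"] = hits[-1]["sort"]
es.close_point_in_time(body={"id": pit_id})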
17 changes: 4 additions & 13 deletions eland/query_compiler.py
@@ -148,7 +148,6 @@ def es_dtypes(self) -> pd.Series:
     def _es_results_to_pandas(
         self,
         results: List[Dict[str, Any]],
-        batch_size: Optional[int] = None,
         show_progress: bool = False,
     ) -> "pd.Dataframe":
         """
@@ -239,10 +238,8 @@ def _es_results_to_pandas(
         (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
-        partial_result = False
-
-        if results is None:
-            return partial_result, self._empty_pd_ef()
+        if not results:
+            return self._empty_pd_ef()

         # This is one of the most performance critical areas of eland, and it repeatedly calls
         # self._mappings.field_name_pd_dtype and self._mappings.date_field_format
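Switching from `results is None` to `not results` also catches the empty list, so an empty batch list now short-circuits to an empty DataFrame. A quick illustration of what each check catches:

for results in (None, [], [{"_id": "0"}]):
    print(results is None, not results)
# True True    -> None is caught by both checks
# False True   -> [] is only caught by 'not results'
# False False  -> a non-empty list passes both checks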
@@ -253,8 +250,7 @@ def _es_results_to_pandas(
         index = []

-        i = 0
-        for hit in results:
-            i = i + 1
+        for i, hit in enumerate(results, 1):

             if "_source" in hit:
                 row = hit["_source"]
@@ -277,11 +273,6 @@ def _es_results_to_pandas(
             # flatten row to map correctly to 2D DataFrame
             rows.append(self._flatten_dict(row, field_mapping_cache))

-            if batch_size is not None:
-                if i >= batch_size:
-                    partial_result = True
-                    break
-
             if show_progress:
                 if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
                     print(f"{datetime.now()}: read {i} rows")
@@ -310,7 +301,7 @@ def _es_results_to_pandas(
         if show_progress:
             print(f"{datetime.now()}: read {i} rows")

-        return partial_result, df
+        return df

     def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
         out = {}