Skip to content

Commit

Permalink
Update es read query to not use body (#34792)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Nov 9, 2023
1 parent 17d5e3c commit 0d7fe47
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 26 deletions.
16 changes: 7 additions & 9 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Expand Up @@ -345,26 +345,24 @@ def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | No
:meta private:
"""
query: dict[Any, Any] = {
"query": {
"bool": {
"filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
"must": [{"match_phrase": {"log_id": log_id}}],
}
"bool": {
"filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
"must": [{"match_phrase": {"log_id": log_id}}],
}
}

try:
max_log_line = self.client.count(index=self.index_patterns, body=query)["count"] # type: ignore
max_log_line = self.client.count(index=self.index_patterns, query=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist", self.index_patterns)
raise e

if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
res = self.client.search( # type: ignore
res = self.client.search(
index=self.index_patterns,
body=query,
query=query,
sort=[self.offset_field],
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/provider.yaml
Expand Up @@ -57,7 +57,7 @@ versions:
dependencies:
- apache-airflow>=2.5.0
- apache-airflow-providers-common-sql>=1.3.1
- elasticsearch>8,<9
- elasticsearch>=8.10,<9

integrations:
- integration-name: Elasticsearch
Expand Down
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
Expand Up @@ -371,7 +371,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.5.0",
"elasticsearch>8,<9"
"elasticsearch>=8.10,<9"
],
"cross-providers-deps": [
"common.sql"
Expand Down
24 changes: 12 additions & 12 deletions tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
Expand Up @@ -325,8 +325,8 @@ def get_source(self, index, doc_type, id, params=None):
"track_scores",
"version",
)
def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, body)
def count(self, index=None, doc_type=None, query=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, query=query)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
i = 0
for searchable_index in searchable_indexes:
Expand Down Expand Up @@ -372,10 +372,10 @@ def count(self, index=None, doc_type=None, body=None, params=None, headers=None)
"track_scores",
"version",
)
def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, body)
def search(self, index=None, doc_type=None, query=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, query=query)

matches = self._find_match(index, doc_type, body)
matches = self._find_match(index, doc_type, query=query)

result = {
"hits": {"total": len(matches), "max_score": 1.0},
Expand Down Expand Up @@ -442,11 +442,11 @@ def suggest(self, body, index=None):
]
return result_dict

def _find_match(self, index, doc_type, body):
searchable_indexes = self._normalize_index_to_list(index, body)
def _find_match(self, index, doc_type, query):
searchable_indexes = self._normalize_index_to_list(index, query=query)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)

must = body["query"]["bool"]["must"][0] # only support one must
must = query["bool"]["must"][0] # only support one must

matches = []
for searchable_index in searchable_indexes:
Expand All @@ -472,7 +472,7 @@ def match_must_phrase(document, matches, must):
matches.append(document)

# Check index(es) exists.
def _validate_search_targets(self, targets, body):
def _validate_search_targets(self, targets, query):
# TODO: support allow_no_indices query parameter
matches = set()
for target in targets:
Expand All @@ -482,10 +482,10 @@ def _validate_search_targets(self, targets, body):
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", query=query)
return matches

def _normalize_index_to_list(self, index, body):
def _normalize_index_to_list(self, index, query):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
Expand All @@ -498,7 +498,7 @@ def _normalize_index_to_list(self, index, body):
raise ValueError("Invalid param 'index'")

generator = (target for index in searchable_indexes for target in index.split(","))
return list(self._validate_search_targets(generator, body))
return list(self._validate_search_targets(generator, query=query))

@staticmethod
def _normalize_doc_type_to_list(doc_type):
Expand Down
Expand Up @@ -213,12 +213,12 @@ def _escape(value):
class MissingIndexException(NotFoundError):
    """Exception representing a missing index.

    Raised by the fake Elasticsearch client when a search/count targets an
    index that does not exist in the in-memory document store.
    """

    def __init__(self, msg, query):
        # msg: human-readable identifier of the missing index target.
        # query: the query dict that was being executed, kept for diagnostics.
        self.msg = msg
        self.query = query

    def __str__(self):
        return f"IndexMissingException[[{self.msg}] missing] with query {self.query}"


class SearchFailedException(NotFoundError):
Expand Down

0 comments on commit 0d7fe47

Please sign in to comment.