fix(#1659): normalize id sort config (#1660)
* fix: normalize id sort config

* fix: indexes without id using the list_documents method
frascuchon committed Aug 5, 2022
1 parent 36dbb4f commit ca8ea92
Showing 3 changed files with 33 additions and 5 deletions.
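Note on the fix: sorting by an `id` field only works when the index mapping actually defines `id` as a sortable keyword field; this commit routes the sort configuration through a `__normalize_sort_config__` helper (called in records.py below) whose body is not part of the diff. A minimal sketch of what such a normalization could look like, assuming a mapping lookup and a fallback to Elasticsearch's built-in `_id` field (the function name and mapping check here are assumptions, not repository code):

from typing import Any, Dict, List

def normalize_sort_config(
    index_mappings: Dict[str, Any], sort: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    # Illustrative sketch: replace sorts on `id` with `_id` when the mapping has no `id` field.
    properties = index_mappings.get("properties", {})
    normalized = []
    for clause in sort:
        field, order = next(iter(clause.items()))
        if field == "id" and field not in properties:
            # No explicit `id` keyword in the mapping: fall back to the internal `_id` field
            normalized.append({"_id": order})
        else:
            normalized.append(clause)
    return normalized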
5 changes: 5 additions & 0 deletions src/rubrix/server/daos/datasets.py
@@ -29,6 +29,7 @@
from rubrix.server.errors import WrongTaskError

NO_WORKSPACE = ""
MAX_NUMBER_OF_LISTED_DATASETS = 2500


class DatasetsDAO:
@@ -112,6 +113,9 @@ def list_datasets(

docs = self._es.list_documents(
index=DATASETS_INDEX_NAME,
fetch_once=True,
# TODO(@frascuchon): include id as part of the document as keyword to enable sorting by id
size=MAX_NUMBER_OF_LISTED_DATASETS,
query={
"query": query_helpers.filters.boolean_filter(
should_filters=filters, minimum_should_match=len(filters)
@@ -231,6 +235,7 @@ def find_by_name(
results = self._es.list_documents(
index=DATASETS_INDEX_NAME,
query={"query": {"term": {"name.keyword": name}}},
fetch_once=True,
)
results = list(results)
if len(results) == 0:
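The dataset documents in this index carry no sortable `id` field (see the TODO above), so the scroll-style pagination that `list_documents` normally performs cannot order results reliably; `fetch_once=True` makes the wrapper issue a single search and stop after the first page of up to MAX_NUMBER_OF_LISTED_DATASETS hits. Roughly what that single page amounts to against the raw Elasticsearch client, as a hedged sketch (the client variable and index name are illustrative):

# Illustrative only: a one-shot listing instead of a paginated scan.
response = es_client.search(
    index=".rubrix.datasets-v0",  # placeholder for DATASETS_INDEX_NAME
    body={"query": {"match_all": {}}, "track_total_hits": False},
    size=2500,  # MAX_NUMBER_OF_LISTED_DATASETS
)
datasets = [hit["_source"] for hit in response["hits"]["hits"]]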
14 changes: 11 additions & 3 deletions src/rubrix/server/daos/records.py
@@ -289,22 +289,30 @@ def scan_dataset(
-------
An iterable over found dataset records
"""
index = dataset_records_index(dataset.id)
search = search or RecordSearch()

sort_cfg = self.__normalize_sort_config__(
index=index, sort=[{"id": {"order": "asc"}}]
)
es_query = {
"query": search.query or {"match_all": {}},
"highlight": self.__configure_query_highlight__(task=dataset.task),
"sort": [{"id": {"order": "asc"}}] # Sort the search so the consistency is maintained in every search
"sort": sort_cfg, # Sort the search so the consistency is maintained in every search
}

if id_from:
# Scroll method does not accept read_after, thus, this case is handled as a search
es_query["search_after"] = [id_from]
results = self._es.search(index=dataset_records_index(dataset.id), query=es_query, size=limit)
results = self._es.search(index=index, query=es_query, size=limit)
hits = results["hits"]
docs = hits["hits"]

else:
docs = self._es.list_documents(
dataset_records_index(dataset.id), query=es_query,
index,
query=es_query,
sort_cfg=sort_cfg,
)
for doc in docs:
yield self.__esdoc2record__(doc)
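For context, the `id_from` branch above resumes a scan from the last record already consumed: with a deterministic sort, `search_after` continues exactly where the previous page ended instead of re-reading or skipping records. A hedged sketch of the query shape it produces (the id value is illustrative):

# Illustrative only: resuming a sorted scan with `search_after`.
es_query = {
    "query": {"match_all": {}},
    "sort": [{"id": {"order": "asc"}}],  # or [{"_id": {...}}] once normalized
    "search_after": ["record-0042"],  # last id returned by the previous page
}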
19 changes: 17 additions & 2 deletions src/rubrix/server/elasticseach/client_wrapper.py
@@ -86,18 +86,29 @@ def client(self):
return self.__client__

def list_documents(
self, index: str, query: Dict[str, Any] = None, size: Optional[int] = None
self,
index: str,
query: Dict[str, Any] = None,
sort_cfg: Optional[List[Dict[str, Any]]] = None,
size: Optional[int] = None,
fetch_once: bool = False,
) -> Iterable[Dict[str, Any]]:
"""
List ALL documents of an elasticsearch index
Parameters
----------
index:
The index name
sor_id:
The sort id configuration
query:
The es query for filter results. Default: None
sort_cfg:
Customized configuration for sort-by id
size:
Amount of samples to retrieve per iteration, 1000 by default
fetch_once:
If enabled, will return only the `size` first records found. Default to: ``False``
Returns
-------
@@ -106,11 +117,15 @@ def list_documents(
"""
size = size or 1000
query = query.copy() or {}
query["sort"] = [{"_id": {"order": "asc"}}] # Force sorting by id
if sort_cfg:
query["sort"] = sort_cfg
query["track_total_hits"] = False # Speedup pagination
response = self.__client__.search(index=index, body=query, size=size)
while response["hits"]["hits"]:
for hit in response["hits"]["hits"]:
yield hit
if fetch_once:
break

last_id = hit["_id"]
query["search_after"] = [last_id]
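For orientation, a hedged usage sketch of the extended `list_documents` signature (the wrapper instance, index name, and query values are assumptions, not code from this commit):

# Illustrative only.
docs = es_wrapper.list_documents(
    index=".rubrix.datasets-v0",
    query={"query": {"term": {"name.keyword": "my-dataset"}}},
    fetch_once=True,  # stop after the first page instead of scrolling the whole index
    size=500,
)
for doc in docs:
    print(doc["_id"], doc["_source"].get("name"))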
