
How can I use search_after in elasticsearch-dsl 7.1.0 #1329

Closed

linhuisheng opened this issue Mar 23, 2020 · 10 comments

Comments

@linhuisheng

How can I use search_after in elasticsearch-dsl 7.1.0? Can you give me an example?

@cj564932957

from elasticsearch_dsl import Index

s = Index('index_name').search().query().sort('_id').extra(
    **{"search_after": ["QeliI24By1Zd8KjZthR9"], "size": 10})

Or you can use from_dict:

s = Search.from_dict({"query": {"match": {"title": "python"}}})
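
Combining the two ideas, here is a minimal sketch of a single search_after page; the host, index name, and search_after value are placeholders, and search_after must echo the sort values of the last hit from the previous page:

from elasticsearch_dsl import Search, connections

# Placeholder host and index.
connections.create_connection(hosts=["http://localhost:9200"])

s = Search.from_dict({
    "query": {"match": {"title": "python"}},
    "sort": ["_id"],
    # Sort values of the last hit of the previous page (placeholder value).
    "search_after": ["QeliI24By1Zd8KjZthR9"],
    "size": 10,
}).index("my-index")

response = s.execute()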

@hakancelikdev

hakancelikdev commented Oct 22, 2021

The function I wrote below will work.

from __future__ import annotations

from typing import Iterator

from elasticsearch_dsl import Search


def search_after(s: Search, *, sort_values: list[str] | None = None, unique_field: str = "_id", size: int = 1000) -> Iterator:    
    def execute(s: Search) -> Iterator:
        es_response = s.execute()
        if hits := es_response["hits"]["hits"]:
            last_document = hits[-1]

            yield from es_response
            yield from execute(s.update_from_dict({"search_after": last_document["sort"]}))

    if sort_values is None:
        assert s.to_dict().get("sort") is not None, "No sort found in search; please pass the sort_values parameter or sort your search."  # noqa: E501

    sort_values = [unique_field] if sort_values is None else sort_values + [unique_field]
    sort_values.extend(s.to_dict().get("sort", []))
    s = s.sort(*sort_values).update_from_dict({"size": size})

    yield from execute(s)
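
For reference, a usage sketch of the generator above; the host, index name, and timestamp sort field are hypothetical, and the function is assumed to be defined in the same module:

from elasticsearch_dsl import Search, connections

# Placeholder cluster address and index.
connections.create_connection(hosts=["http://localhost:9200"])

search = Search(index="my-index").query("match", title="python")
for hit in search_after(search, sort_values=["timestamp"], size=500):
    print(hit.meta.id)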

@karlkovaciny

I didn't do the fancy iterator thing, but I did use Point in Time so the query would be unaffected by refreshes.

from typing import List

from elasticsearch_dsl import Search
from elasticsearch_dsl.utils import AttrDict

# Set up a paginated query with search_after and a fixed point in time (PIT).
# get_default_elastic_conn(), NEWS_INDEX and filter_ are helpers from this code base.
elasticsearch_py = get_default_elastic_conn()
pit = elasticsearch_py.open_point_in_time(index=NEWS_INDEX, keep_alive="5m")
pit_id = pit["id"]

query_size = 500
search_after = [0]
hits: List[AttrDict] = []
while query_size:
    if hits:
        search_after = hits[-1].meta.sort

    search = (
        Search()
        .extra(size=query_size)
        .extra(pit={"id": pit_id, "keep_alive": "5m"})
        .extra(search_after=search_after)
        .filter(filter_)
        .sort("url.keyword")  # Note: sort on a unique field or paging may never advance.
    )
    response = search.execute()
    hits = [hit for hit in response]

    pit_id = response.pit_id
    query_size = len(hits)
    for hit in hits:
        ...  # Do work with each hit here.
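
One thing the loop above does not show is releasing the PIT when you are done. A small follow-up, assuming the same elasticsearch_py client and pit_id from the loop above (in the 7.x client the PIT id is passed in the request body):

# Release the point in time once pagination is finished; otherwise it only
# expires after its keep_alive elapses.
elasticsearch_py.close_point_in_time(body={"id": pit_id})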

@reese-allison

reese-allison commented Sep 23, 2022

I would like to open a PR to implement both of the suggestions above.
I plan to add this method to the Search class.

def page(self):
    """
    Turn the search into a paged search utilizing Point in Time (PIT) and search_after.
    Returns a generator that will iterate over all the documents matching the query.
    """
    search = self._clone()

    # A sort is required to page search results. We use the optimized default if sort is None.
    # https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
    if not search._sort:
        search._sort = ['_shard_doc']

    keep_alive = search._params.pop("keep_alive", "30s")

    # Resolve the connection up front so the PIT is opened, searched and
    # closed through the same client.
    es = get_connection(search._using)
    pit = es.open_point_in_time(index=search._index, keep_alive=keep_alive)
    pit_id = pit['id']

    # The index is passed with the Point in Time (PIT), not on the request itself.
    search._index = None
    search._extra.update(pit={"id": pit_id, "keep_alive": keep_alive})

    response = es.search(body=search.to_dict(), **search._params)
    while hits := response["hits"]["hits"]:
        for hit in hits:
            yield self._get_result(hit)

        # If we have fewer hits than our batch size, we know there are no more results.
        if len(hits) < search._params.get('size', 0):
            break

        last_document = hits[-1]
        pit_id = response['pit_id']
        search._extra.update(
            pit={"id": pit_id, "keep_alive": keep_alive}, 
            search_after=last_document["sort"]
        )
        response = es.search(body=search.to_dict(), **search._params)

    # Try to close the PIT unless it has already expired.
    try:
        es.close_point_in_time(body={"id": pit_id})
    except NotFoundError:
        pass
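
If the method lands on Search as sketched, usage might look like this; the host and index name are hypothetical, and size / keep_alive are passed through .params() since the method reads them from _params:

from elasticsearch_dsl import Search, connections

# Placeholder host and index.
connections.create_connection(hosts=["http://localhost:9200"])

search = (
    Search(index="my-index")
    .query("match", title="python")
    .params(size=1000, keep_alive="1m")  # per-page size and PIT keep_alive
)
for hit in search.page():
    print(hit.meta.id)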

@reese-allison

I opened a PR: #1623

@lucasvc

lucasvc commented Jan 20, 2023

Would any of these methods help to paginate past the 10,000 limit?

@reese-allison

Would any of these methods help to paginate past the 10,000 limit?

I regularly use the code on PR #1623 to page millions of objects.

@lucasvc
Copy link

lucasvc commented Jan 20, 2023

I regularly use the code on PR #1623 to page millions of objects.

Then I am doing something wrong with your code, because for a query that matches ~19k documents (which I can't read out), the response I get with your code is:

{
    "pit_id": <omitted>,
    "took": 168,
    "timed_out": false,
    "_shards": {
        "total": 3292,
        "successful": 2618,
        "skipped": 2618,
        "failed": 674,
        "failures": [
            {
                "shard": 0,
                "index": <omitted>,
                "node": <omitted>,
                "reason": {
                    "type": "illegal_argument_exception",
                    "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [18993]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
                }
            },
            ...
        ]
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": 0.0,
        "hits": []
    }
}

All failures look the same, and there are 0 hits.
Any query with fewer than 10k results works as expected.
Is this caused by my Elasticsearch server?

@reese-allison

Can you post your code? It looks like your size might be set too high for the shards to return results.

@lucasvc

lucasvc commented Jan 24, 2023

Can you post your code? It looks like your size might be set too high for the shards to return results.

You are completely right; before doing the paging I had these lines:

    total = search.count()
    search = search[0:total]

where I was trying to fetch everything without paging :)
Thanks for your help.
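
In other words, the fix is just dropping the slice so each request stays within the 10,000-document window while search_after walks the full result set. A minimal sketch, with a hypothetical index name and using the page() method proposed in #1623:

from elasticsearch_dsl import Search

# No [0:total] slice here: each page stays well under the 10,000-document
# window and search_after advances through all matches.
search = Search(index="my-index").query("match", title="python").params(size=5000)
for hit in search.page():
    ...  # work with each of the ~19k hits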
