
Using Asyncio with Elasticsearch

The elasticsearch package supports async/await with asyncio and aiohttp. You can either install aiohttp directly or use the [async] extra:

$ python -m pip install elasticsearch aiohttp

# - OR -

$ python -m pip install "elasticsearch[async]"

Getting Started with Async

After installation, all async API endpoints are available via AsyncElasticsearch and are used the same way as the sync APIs, just with an extra await:

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    # Assumes a cluster at localhost:9200; adjust the URL for your deployment.
    # "async with" awaits client.close() when the block exits.
    async with AsyncElasticsearch("http://localhost:9200") as client:
        resp = await client.search(
            index="documents",
            query={"match_all": {}},
            size=20,
        )
        print(resp)

asyncio.run(main())

All APIs that are available under the sync client are also available under the async client.
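Because every call is a coroutine, independent requests can share one event loop and run concurrently. The sketch below assumes a cluster at http://localhost:9200 and uses hypothetical index names; search_many is an illustrative helper, not part of the library, and accepts any client object with an awaitable search() method.

```python
from __future__ import annotations

import asyncio
from typing import Any, Sequence


async def search_many(client: Any, indices: Sequence[str]) -> list[Any]:
    """Run one match_all search per index concurrently (hypothetical helper).

    `client` is expected to be an AsyncElasticsearch instance; any object
    with an awaitable search(index=..., query=..., size=...) method works.
    """
    coros = [
        client.search(index=index, query={"match_all": {}}, size=20)
        for index in indices
    ]
    # gather() runs the requests concurrently and returns the responses
    # in the same order as `indices`.
    return await asyncio.gather(*coros)


async def main() -> None:
    # Imported inside main() so search_many() can be used without the client installed.
    from elasticsearch import AsyncElasticsearch

    # Assumed cluster URL and index names; adjust for your deployment.
    async with AsyncElasticsearch("http://localhost:9200") as client:
        responses = await search_many(client, ["documents", "orders"])
        for resp in responses:
            print(resp["hits"]["total"])


if __name__ == "__main__":
    asyncio.run(main())
```

The total time is roughly that of the slowest request rather than the sum of all of them, since no request waits for another to finish.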

ASGI Applications and Elastic APM

ASGI (Asynchronous Server Gateway Interface) is a successor to WSGI for serving Python web applications that use async I/O to achieve better performance. Examples of ASGI frameworks include FastAPI, Django 3.0+, and Starlette. If you use one of these frameworks with Elasticsearch, use AsyncElasticsearch so that network calls to Elasticsearch don't block the event loop.

Elastic APM traces async Elasticsearch queries the same way it traces synchronous ones. For an example of configuring AsyncElasticsearch and APM tracing with FastAPI, a popular ASGI framework, see the pre-built example in the examples/fastapi-apm directory.

Frequently Asked Questions

ValueError when initializing AsyncElasticsearch?

If you receive ValueError: You must have 'aiohttp' installed to use AiohttpHttpNode when trying to use AsyncElasticsearch, make sure aiohttp is installed in your environment (check with $ python -m pip freeze | grep aiohttp). Async support isn't available without it.
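The pip freeze check can report on a different environment than the one your application actually runs in. A sketch that checks from inside the running interpreter using only the standard library; has_aiohttp is a hypothetical helper name.

```python
import importlib.util
import sys


def has_aiohttp() -> bool:
    """Return True if aiohttp is importable by the running interpreter."""
    # find_spec() locates a top-level module without importing it.
    return importlib.util.find_spec("aiohttp") is not None


if __name__ == "__main__":
    if has_aiohttp():
        print(f"aiohttp is available to {sys.executable}")
    else:
        # Installing with the same interpreter guarantees the same environment.
        print(f"aiohttp is missing; run: {sys.executable} -m pip install aiohttp")
```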

What about the elasticsearch-async package?

Asyncio used to be supported separately through the elasticsearch-async package. That package is deprecated in favor of AsyncElasticsearch, which the elasticsearch package provides in v7.8 and later.

Receiving 'Unclosed client session / connector' warning?

aiohttp emits this warning when an open HTTP connection is garbage collected, which typically happens while your application shuts down. To resolve it, make sure AsyncElasticsearch.close() is awaited before the AsyncElasticsearch instance is garbage collected.

For example, with FastAPI this might look like:

import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from elasticsearch import AsyncElasticsearch

ELASTICSEARCH_URL = os.environ["ELASTICSEARCH_URL"]
client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global client
    client = AsyncElasticsearch(ELASTICSEARCH_URL)
    yield
    await client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def main():
    return await client.info()

You can run this example by saving it to main.py and executing ELASTICSEARCH_URL=http://localhost:9200 uvicorn main:app.
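Outside a web framework, for example in a one-off script, a try/finally block gives the same guarantee. A minimal sketch, assuming a local cluster; use_client is a hypothetical helper, not a library function, and it awaits close() even when the work raises.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


async def use_client(client: Any, work: Callable[[Any], Awaitable[T]]) -> T:
    """Run work(client) and always await client.close() afterwards."""
    try:
        return await work(client)
    finally:
        # Closing here releases the aiohttp session before the client
        # can be garbage collected, which avoids the warning.
        await client.close()


async def main() -> None:
    # Imported inside main() so use_client() can be used without the client installed.
    from elasticsearch import AsyncElasticsearch

    client = AsyncElasticsearch("http://localhost:9200")  # assumed URL
    info = await use_client(client, lambda c: c.info())
    print(info)


if __name__ == "__main__":
    asyncio.run(main())
```

Writing async with AsyncElasticsearch(...) as client: has the same effect, because leaving the block awaits close().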

Async Helpers

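Async variants of the bulk, streaming bulk, scan, and reindex helpers are available in elasticsearch.helpers, each prefixed with async_ (async_bulk, async_streaming_bulk, async_scan, async_reindex). Apart from needing await or async for, their APIs are identical to the ones in the sync helpers documentation.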

All async helpers that accept an iterator or generator also accept async iterators and async generators.
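For instance, documents that arrive over time can be streamed straight into async_bulk from an asyncio.Queue. The sketch assumes a local cluster and the mywords index used in the examples below; actions_from_queue and the None end-of-stream sentinel are hypothetical conventions, not part of the library.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, Dict, Optional


async def actions_from_queue(
    queue: asyncio.Queue[Optional[Dict[str, Any]]], index: str
) -> AsyncIterator[Dict[str, Any]]:
    """Yield one bulk index action per document taken from `queue`.

    A None item marks the end of the stream (hypothetical convention).
    """
    while True:
        doc = await queue.get()
        if doc is None:
            return
        # "_index" is action metadata; "_source" holds the document body.
        yield {"_index": index, "_source": doc}


async def main() -> None:
    # Imported inside main() so the generator above can be used without the client installed.
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    queue: asyncio.Queue[Optional[Dict[str, Any]]] = asyncio.Queue()
    for word in ["foo", "bar", "baz"]:
        queue.put_nowait({"word": word})
    queue.put_nowait(None)  # end of stream

    async with AsyncElasticsearch("http://localhost:9200") as client:
        # async_bulk consumes the async generator directly.
        success, errors = await async_bulk(
            client, actions_from_queue(queue, "mywords")
        )
        print(success, errors)


if __name__ == "__main__":
    asyncio.run(main())
```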

Bulk and Streaming Bulk

async_bulk

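import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        # "_index" is action metadata; "_source" is the document body.
        yield {
            "_index": "mywords",
            "_source": {"word": word},
        }

async def main():
    # Assumes a local cluster; the client is closed when the block exits.
    async with AsyncElasticsearch("http://localhost:9200") as client:
        # Returns (number of successful actions, list of errors).
        success, errors = await async_bulk(client, gendata())
        print(success, errors)

asyncio.run(main())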

async_streaming_bulk

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

async def main():
    async with AsyncElasticsearch("http://localhost:9200") as client:
        # raise_on_error=False yields failed items instead of raising.
        async for ok, item in async_streaming_bulk(
            client, gendata(), raise_on_error=False
        ):
            # Each item maps the action name to its result, e.g. {"index": {...}}.
            action, result = item.popitem()
            if not ok:
                print(f"failed to {action} document {result}")

asyncio.run(main())

Scan

async_scan

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan

async def main():
    async with AsyncElasticsearch("http://localhost:9200") as client:
        # async_scan pages through every matching hit with the scroll API.
        async for doc in async_scan(
            client=client,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*",
        ):
            print(doc)

asyncio.run(main())

Reindex

async_reindex
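A minimal async_reindex sketch, assuming a local cluster and an existing mywords index; the mywords-foo target index is a hypothetical name. The query argument is a full search body passed on to the scan step, and the call returns the (success count, errors) result of the underlying bulk step.

```python
import asyncio


async def main() -> None:
    # Imported inside main() so this module can be imported without the client installed.
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_reindex

    # Assumed cluster URL; "mywords-foo" is a hypothetical target index.
    async with AsyncElasticsearch("http://localhost:9200") as client:
        # Scans "mywords" with the given search body and bulk-indexes
        # every hit into the target index.
        success, errors = await async_reindex(
            client,
            source_index="mywords",
            target_index="mywords-foo",
            query={"query": {"match": {"word": "foo"}}},
        )
        print(f"copied {success} documents, errors: {errors}")


if __name__ == "__main__":
    asyncio.run(main())
```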

API Reference

The API of AsyncElasticsearch is nearly identical to the API of Elasticsearch, except that every API method, such as AsyncElasticsearch.search(), is a coroutine function and must be awaited to return the response body.

AsyncElasticsearch

Note

For namespaced Elasticsearch APIs such as .indices.create(), refer to the sync API reference. These APIs are identical between the sync and async clients, apart from the await.
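A short sketch of namespaced APIs on the async client, where each call is awaited just like a top-level method. ensure_index is a hypothetical helper that accepts any AsyncElasticsearch instance; the cluster URL and the mywords index name are assumptions.

```python
from __future__ import annotations

import asyncio
from typing import Any


async def ensure_index(client: Any, index: str) -> bool:
    """Create `index` if it does not exist; return True if it was created."""
    # Namespaced APIs hang off attributes such as client.indices and are
    # awaited exactly like top-level methods such as client.search().
    if await client.indices.exists(index=index):
        return False
    await client.indices.create(index=index)
    return True


async def main() -> None:
    # Imported inside main() so ensure_index() can be used without the client installed.
    from elasticsearch import AsyncElasticsearch

    async with AsyncElasticsearch("http://localhost:9200") as client:
        created = await ensure_index(client, "mywords")
        print("created" if created else "already exists")


if __name__ == "__main__":
    asyncio.run(main())
```

The exists-then-create check is not atomic: if two processes race, the second create() call fails because the index already exists, so startup code that runs concurrently may prefer to catch that error instead.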
