From e983bd0db5022b1a1651e96d84a20a02ea6f2efc Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Wed, 15 Oct 2025 12:56:28 +0100 Subject: [PATCH 1/2] Add `flush_after_seconds` option to `streaming_bulk()` (#3064) * Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb7219c708870af1985991264d753f66496) --- elasticsearch/_async/helpers.py | 67 +++++++-- elasticsearch/compat.py | 44 +++++- elasticsearch/helpers/__init__.py | 11 +- elasticsearch/helpers/actions.py | 139 +++++++++++++----- .../test_async/test_server/test_helpers.py | 40 +++++ test_elasticsearch/test_helpers.py | 50 ++++++- .../test_server/test_helpers.py | 42 ++++++ 7 files changed, 343 insertions(+), 50 deletions(-) diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index e4d5e6bc5..c9243af63 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -33,12 +33,16 @@ Union, ) +from ..compat import safe_task from ..exceptions import ApiError, NotFoundError, TransportError from ..helpers.actions import ( _TYPE_BULK_ACTION, _TYPE_BULK_ACTION_BODY, _TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_HEADER_AND_BODY, + _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY, + _TYPE_BULK_ACTION_WITH_META, + BulkMeta, _ActionChunker, _process_bulk_chunk_error, _process_bulk_chunk_success, @@ -54,9 +58,10 @@ async def _chunk_actions( - actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY], + actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY], chunk_size: int, max_chunk_bytes: int, + flush_after_seconds: Optional[float], serializer: Serializer, ) -> AsyncIterable[ Tuple[ @@ -76,10 +81,42 @@ async def _chunk_actions( chunker = _ActionChunker( chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes, serializer=serializer ) - async for action, data in actions: - ret = chunker.feed(action, data) - if ret: - yield ret + + if not flush_after_seconds: + async for action, data in actions: + ret = chunker.feed(action, data) + if ret: + yield ret + else: + item_queue: asyncio.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = ( + asyncio.Queue() + ) + + async def get_items() -> None: + try: + async for item in actions: + await item_queue.put(item) + finally: + await item_queue.put((BulkMeta.done, None)) + + async with safe_task(get_items()): + timeout: Optional[float] = flush_after_seconds + while True: + try: + action, data = await asyncio.wait_for( + item_queue.get(), timeout=timeout + ) + timeout = flush_after_seconds + except asyncio.TimeoutError: + action, data = BulkMeta.flush, None + timeout = None + + if action is BulkMeta.done: + break + ret = chunker.feed(action, data) + if ret: + yield ret + ret = chunker.flush() if ret: yield ret @@ -159,9 +196,13 @@ async def azip( async def async_streaming_bulk( client: AsyncElasticsearch, - actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]], + actions: Union[ + Iterable[_TYPE_BULK_ACTION_WITH_META], + AsyncIterable[_TYPE_BULK_ACTION_WITH_META], + ], chunk_size: int = 500, max_chunk_bytes: int = 100 * 1024 * 1024, + flush_after_seconds: Optional[float] = None, raise_on_error: bool = True, expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY @@ -194,6 +235,9 @@ async def async_streaming_bulk( :arg actions: iterable or async iterable containing the actions to be executed :arg chunk_size: number of docs in one chunk sent to es 
(default: 500) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) + :arg flush_after_seconds: time in seconds after which a chunk is written even + if hasn't reached `chunk_size` or `max_chunk_bytes`. Set to 0 to not use a + timeout-based flush. (default: 0) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) from the execution of the last chunk when some occur. By default we raise. :arg raise_on_exception: if ``False`` then don't propagate exceptions from @@ -220,9 +264,14 @@ async def async_streaming_bulk( if isinstance(retry_on_status, int): retry_on_status = (retry_on_status,) - async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: + async def map_actions() -> ( + AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] + ): async for item in aiter(actions): - yield expand_action_callback(item) + if isinstance(item, BulkMeta): + yield item, None + else: + yield expand_action_callback(item) serializer = client.transport.serializers.get_serializer("application/json") @@ -234,7 +283,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: ] bulk_actions: List[bytes] async for bulk_data, bulk_actions in _chunk_actions( - map_actions(), chunk_size, max_chunk_bytes, serializer + map_actions(), chunk_size, max_chunk_bytes, flush_after_seconds, serializer ): for attempt in range(max_retries + 1): to_retry: List[bytes] = [] diff --git a/elasticsearch/compat.py b/elasticsearch/compat.py index 007971306..b44b9daea 100644 --- a/elasticsearch/compat.py +++ b/elasticsearch/compat.py @@ -15,11 +15,14 @@ # specific language governing permissions and limitations # under the License. +import asyncio import inspect import os import sys +from contextlib import asynccontextmanager, contextmanager from pathlib import Path -from typing import Tuple, Type, Union +from threading import Thread +from typing import Any, AsyncIterator, Callable, Coroutine, Iterator, Tuple, Type, Union string_types: Tuple[Type[str], Type[bytes]] = (str, bytes) @@ -76,9 +79,48 @@ def warn_stacklevel() -> int: return 0 +@contextmanager +def safe_thread( + target: Callable[..., Any], *args: Any, **kwargs: Any +) -> Iterator[Thread]: + """Run a thread within a context manager block. + + The thread is automatically joined when the block ends. If the thread raised + an exception, it is raised in the caller's context. + """ + captured_exception = None + + def run() -> None: + try: + target(*args, **kwargs) + except BaseException as exc: + nonlocal captured_exception + captured_exception = exc + + thread = Thread(target=run) + thread.start() + yield thread + thread.join() + if captured_exception: + raise captured_exception + + +@asynccontextmanager +async def safe_task(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[asyncio.Task[Any]]: + """Run a background task within a context manager block. + + The task is awaited when the block ends. 
+ """ + task = asyncio.create_task(coro) + yield task + await task + + __all__ = [ "string_types", "to_str", "to_bytes", "warn_stacklevel", + "safe_thread", + "safe_task", ] diff --git a/elasticsearch/helpers/__init__.py b/elasticsearch/helpers/__init__.py index 67676932b..6f8f24c21 100644 --- a/elasticsearch/helpers/__init__.py +++ b/elasticsearch/helpers/__init__.py @@ -19,12 +19,21 @@ from .._utils import fixup_module_metadata from .actions import _chunk_actions # noqa: F401 from .actions import _process_bulk_chunk # noqa: F401 -from .actions import bulk, expand_action, parallel_bulk, reindex, scan, streaming_bulk +from .actions import ( + BULK_FLUSH, + bulk, + expand_action, + parallel_bulk, + reindex, + scan, + streaming_bulk, +) from .errors import BulkIndexError, ScanError __all__ = [ "BulkIndexError", "ScanError", + "BULK_FLUSH", "expand_action", "streaming_bulk", "bulk", diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index d1a43a8dc..79197a1e4 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -16,9 +16,10 @@ # under the License. import logging +import queue import time +from enum import Enum from operator import methodcaller -from queue import Queue from typing import ( Any, Callable, @@ -37,13 +38,21 @@ from elastic_transport import OpenTelemetrySpan from .. import Elasticsearch -from ..compat import to_bytes +from ..compat import safe_thread, to_bytes from ..exceptions import ApiError, NotFoundError, TransportError from ..serializer import Serializer from .errors import BulkIndexError, ScanError logger = logging.getLogger("elasticsearch.helpers") + +class BulkMeta(Enum): + flush = 1 + done = 2 + + +BULK_FLUSH = BulkMeta.flush + _TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any]] _TYPE_BULK_ACTION_HEADER = Dict[str, Any] _TYPE_BULK_ACTION_BODY = Union[None, bytes, Dict[str, Any]] @@ -51,6 +60,13 @@ _TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY ] +_TYPE_BULK_ACTION_WITH_META = Union[bytes, str, Dict[str, Any], BulkMeta] +_TYPE_BULK_ACTION_HEADER_WITH_META = Union[Dict[str, Any], BulkMeta] +_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY = Union[ + Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY], + Tuple[BulkMeta, Any], +] + def expand_action(data: _TYPE_BULK_ACTION) -> _TYPE_BULK_ACTION_HEADER_AND_BODY: """ @@ -139,7 +155,9 @@ def __init__( ] = [] def feed( - self, action: _TYPE_BULK_ACTION_HEADER, data: _TYPE_BULK_ACTION_BODY + self, + action: _TYPE_BULK_ACTION_HEADER_WITH_META, + data: _TYPE_BULK_ACTION_BODY, ) -> Optional[ Tuple[ List[ @@ -152,23 +170,25 @@ def feed( ] ]: ret = None - raw_action = action - raw_data = data - action_bytes = to_bytes(self.serializer.dumps(action), "utf-8") - # +1 to account for the trailing new line character - cur_size = len(action_bytes) + 1 - - data_bytes: Optional[bytes] - if data is not None: - data_bytes = to_bytes(self.serializer.dumps(data), "utf-8") - cur_size += len(data_bytes) + 1 - else: - data_bytes = None + action_bytes = b"" + data_bytes: Optional[bytes] = None + cur_size = 0 + if not isinstance(action, BulkMeta): + action_bytes = to_bytes(self.serializer.dumps(action), "utf-8") + # +1 to account for the trailing new line character + cur_size = len(action_bytes) + 1 + + if data is not None: + data_bytes = to_bytes(self.serializer.dumps(data), "utf-8") + cur_size += len(data_bytes) + 1 + else: + data_bytes = None # full chunk, send it and start a new one if self.bulk_actions and ( self.size + cur_size > self.max_chunk_bytes or self.action_count == 
self.chunk_size + or (action == BulkMeta.flush and self.bulk_actions) ): ret = (self.bulk_data, self.bulk_actions) self.bulk_actions = [] @@ -176,15 +196,16 @@ def feed( self.size = 0 self.action_count = 0 - self.bulk_actions.append(action_bytes) - if data_bytes is not None: - self.bulk_actions.append(data_bytes) - self.bulk_data.append((raw_action, raw_data)) - else: - self.bulk_data.append((raw_action,)) + if not isinstance(action, BulkMeta): + self.bulk_actions.append(action_bytes) + if data_bytes is not None: + self.bulk_actions.append(data_bytes) + self.bulk_data.append((action, data)) + else: + self.bulk_data.append((action,)) - self.size += cur_size - self.action_count += 1 + self.size += cur_size + self.action_count += 1 return ret def flush( @@ -209,9 +230,10 @@ def flush( def _chunk_actions( - actions: Iterable[_TYPE_BULK_ACTION_HEADER_AND_BODY], + actions: Iterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY], chunk_size: int, max_chunk_bytes: int, + flush_after_seconds: Optional[float], serializer: Serializer, ) -> Iterable[ Tuple[ @@ -231,10 +253,41 @@ def _chunk_actions( chunker = _ActionChunker( chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes, serializer=serializer ) - for action, data in actions: - ret = chunker.feed(action, data) - if ret: - yield ret + + if not flush_after_seconds: + for action, data in actions: + ret = chunker.feed(action, data) + if ret: + yield ret + else: + item_queue: queue.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = ( + queue.Queue() + ) + + def get_items() -> None: + try: + for item in actions: + item_queue.put(item) + finally: + # make sure we signal the end even if there is an exception + item_queue.put((BulkMeta.done, None)) + + with safe_thread(get_items): + timeout: Optional[float] = flush_after_seconds + while True: + try: + action, data = item_queue.get(timeout=timeout) + timeout = flush_after_seconds + except queue.Empty: + action, data = BulkMeta.flush, None + timeout = None + + if action is BulkMeta.done: + break + ret = chunker.feed(action, data) + if ret: + yield ret + ret = chunker.flush() if ret: yield ret @@ -361,9 +414,10 @@ def _process_bulk_chunk( def streaming_bulk( client: Elasticsearch, - actions: Iterable[_TYPE_BULK_ACTION], + actions: Iterable[_TYPE_BULK_ACTION_WITH_META], chunk_size: int = 500, max_chunk_bytes: int = 100 * 1024 * 1024, + flush_after_seconds: Optional[float] = None, raise_on_error: bool = True, expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY @@ -397,6 +451,9 @@ def streaming_bulk( :arg actions: iterable containing the actions to be executed :arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) + :arg flush_after_seconds: time in seconds after which a chunk is written even + if hasn't reached `chunk_size` or `max_chunk_bytes`. Set to 0 to not use a + timeout-based flush. (default: 0) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) from the execution of the last chunk when some occur. By default we raise. 
:arg raise_on_exception: if ``False`` then don't propagate exceptions from @@ -425,6 +482,13 @@ def streaming_bulk( serializer = client.transport.serializers.get_serializer("application/json") + def expand_action_with_meta( + data: _TYPE_BULK_ACTION_WITH_META, + ) -> _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY: + if isinstance(data, BulkMeta): + return data, None + return expand_action_callback(data) + bulk_data: List[ Union[ Tuple[_TYPE_BULK_ACTION_HEADER], @@ -433,9 +497,10 @@ def streaming_bulk( ] bulk_actions: List[bytes] for bulk_data, bulk_actions in _chunk_actions( - map(expand_action_callback, actions), + map(expand_action_with_meta, actions), chunk_size, max_chunk_bytes, + flush_after_seconds, serializer, ): for attempt in range(max_retries + 1): @@ -557,6 +622,7 @@ def parallel_bulk( thread_count: int = 4, chunk_size: int = 500, max_chunk_bytes: int = 100 * 1024 * 1024, + flush_after_seconds: Optional[float] = None, queue_size: int = 4, expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY @@ -573,6 +639,9 @@ def parallel_bulk( :arg thread_count: size of the threadpool to use for the bulk requests :arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) + :arg flush_after_seconds: time in seconds after which a chunk is written even + if hasn't reached `chunk_size` or `max_chunk_bytes`. Set to 0 to not use a + timeout-based flush. (default: 0) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) from the execution of the last chunk when some occur. By default we raise. :arg raise_on_exception: if ``False`` then don't propagate exceptions from @@ -596,7 +665,7 @@ def _setup_queues(self) -> None: super()._setup_queues() # type: ignore[misc] # The queue must be at least the size of the number of threads to # prevent hanging when inserting sentinel values during teardown. 
- self._inqueue: Queue[ + self._inqueue: queue.Queue[ Tuple[ List[ Union[ @@ -605,7 +674,7 @@ def _setup_queues(self) -> None: ], List[bytes], ] - ] = Queue(max(queue_size, thread_count)) + ] = queue.Queue(max(queue_size, thread_count)) self._quick_put = self._inqueue.put with client._otel.helpers_span("helpers.parallel_bulk") as otel_span: @@ -625,7 +694,11 @@ def _setup_queues(self) -> None: ) ), _chunk_actions( - expanded_actions, chunk_size, max_chunk_bytes, serializer + expanded_actions, + chunk_size, + max_chunk_bytes, + flush_after_seconds, + serializer, ), ): yield from result diff --git a/test_elasticsearch/test_async/test_server/test_helpers.py b/test_elasticsearch/test_async/test_server/test_helpers.py index 0bb781304..b94e74489 100644 --- a/test_elasticsearch/test_async/test_server/test_helpers.py +++ b/test_elasticsearch/test_async/test_server/test_helpers.py @@ -17,6 +17,7 @@ import asyncio import logging +import time from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, call, patch @@ -123,6 +124,45 @@ def sync_gen(): "_source" ] + async def test_explicit_flushes(self, async_client): + async def async_gen(): + yield {"answer": 2, "_id": 0} + yield {"answer": 1, "_id": 1} + yield helpers.BULK_FLUSH + await asyncio.sleep(0.5) + yield {"answer": 2, "_id": 2} + + timestamps = [] + async for ok, item in helpers.async_streaming_bulk( + async_client, async_gen(), index="test-index", refresh=True + ): + timestamps.append(time.time()) + assert ok + + # make sure there is a pause between the writing of the 2nd and 3rd items + assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 + + async def test_timeout_flushes(self, async_client): + async def async_gen(): + yield {"answer": 2, "_id": 0} + yield {"answer": 1, "_id": 1} + await asyncio.sleep(0.5) + yield {"answer": 2, "_id": 2} + + timestamps = [] + async for ok, item in helpers.async_streaming_bulk( + async_client, + async_gen(), + index="test-index", + refresh=True, + flush_after_seconds=0.05, + ): + assert ok + timestamps.append(time.time()) + + # make sure there is a pause between the writing of the 2nd and 3rd items + assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 + async def test_all_errors_from_chunk_are_raised_on_failure(self, async_client): await async_client.indices.create( index="i", diff --git a/test_elasticsearch/test_helpers.py b/test_elasticsearch/test_helpers.py index e30635f44..398cb6cc3 100644 --- a/test_elasticsearch/test_helpers.py +++ b/test_elasticsearch/test_helpers.py @@ -18,6 +18,7 @@ import pickle import threading import time +from typing import Optional from unittest import mock import pytest @@ -156,21 +157,34 @@ def test__source_metadata_or_source(self): {"_source": {"key2": "val2"}, "key": "val", "_op_type": "update"} ) == ({"update": {}}, {"key2": "val2"}) - def test_chunks_are_chopped_by_byte_size(self): + @pytest.mark.parametrize("flush_seconds", [None, 10]) + def test_chunks_are_chopped_by_byte_size(self, flush_seconds: Optional[float]): assert 100 == len( - list(helpers._chunk_actions(self.actions, 100000, 1, JSONSerializer())) + list( + helpers._chunk_actions( + self.actions, 100000, 1, flush_seconds, JSONSerializer() + ) + ) ) - def test_chunks_are_chopped_by_chunk_size(self): + @pytest.mark.parametrize("flush_seconds", [None, 10]) + def test_chunks_are_chopped_by_chunk_size(self, flush_seconds: Optional[float]): assert 10 == len( - list(helpers._chunk_actions(self.actions, 10, 99999999, JSONSerializer())) + list( + 
helpers._chunk_actions( + self.actions, 10, 99999999, flush_seconds, JSONSerializer() + ) + ) ) - def test_chunks_are_chopped_by_byte_size_properly(self): + @pytest.mark.parametrize("flush_seconds", [None, 10]) + def test_chunks_are_chopped_by_byte_size_properly( + self, flush_seconds: Optional[float] + ): max_byte_size = 170 chunks = list( helpers._chunk_actions( - self.actions, 100000, max_byte_size, JSONSerializer() + self.actions, 100000, max_byte_size, flush_seconds, JSONSerializer() ) ) assert 25 == len(chunks) @@ -178,6 +192,30 @@ def test_chunks_are_chopped_by_byte_size_properly(self): chunk = b"".join(chunk_actions) assert len(chunk) <= max_byte_size + @pytest.mark.parametrize("flush_seconds", [None, 10]) + def test_chunks_are_chopped_by_flush(self, flush_seconds: Optional[float]): + flush = (helpers.BULK_FLUSH, None) + actions = ( + self.actions[:3] + + [flush] * 2 # two consecutive flushes after 3 items + + self.actions[3:4] + + [flush] # flush after one more item + + self.actions[4:] + + [flush] # flush at the end + ) + chunks = list( + helpers._chunk_actions( + actions, 100, 99999999, flush_seconds, JSONSerializer() + ) + ) + assert 3 == len(chunks) + assert len(chunks[0][0]) == 3 + assert len(chunks[0][1]) == 6 + assert len(chunks[1][0]) == 1 + assert len(chunks[1][1]) == 2 + assert len(chunks[2][0]) == 96 + assert len(chunks[2][1]) == 192 + class TestExpandActions: @pytest.mark.parametrize("action", ["whatever", b"whatever"]) diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index 6ed43e2af..9d181f3e7 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -16,6 +16,7 @@ # under the License. import json +import time from datetime import datetime, timedelta from unittest.mock import call, patch @@ -75,6 +76,47 @@ def test_bulk_all_documents_get_inserted(sync_client): assert {"answer": 42} == sync_client.get(index="test-index", id=42)["_source"] +def test_explicit_flushes(sync_client): + def sync_gen(): + yield {"answer": 0, "_id": 0} + yield {"answer": 1, "_id": 1} + yield helpers.BULK_FLUSH + time.sleep(0.5) + yield {"answer": 2, "_id": 2} + + timestamps = [] + for ok, item in helpers.streaming_bulk( + sync_client, sync_gen(), index="test-index", refresh=True + ): + assert ok + timestamps.append(time.time()) + + # make sure there is a pause between the writing of the 2nd and 3rd items + assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 + + +def test_timeout_flushes(sync_client): + def sync_gen(): + yield {"answer": 0, "_id": 0} + yield {"answer": 1, "_id": 1} + time.sleep(0.5) + yield {"answer": 2, "_id": 2} + + timestamps = [] + for ok, item in helpers.streaming_bulk( + sync_client, + sync_gen(), + index="test-index", + refresh=True, + flush_after_seconds=0.05, + ): + assert ok + timestamps.append(time.time()) + + # make sure there is a pause between the writing of the 2nd and 3rd items + assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 + + def test_bulk_all_errors_from_chunk_are_raised_on_failure(sync_client): sync_client.indices.create( index="i", From d7765c65331e7a158cdb4378efcfa6bb93e05950 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Wed, 15 Oct 2025 17:44:19 +0100 Subject: [PATCH 2/2] fix safe_task type hint for Python 3.8 --- elasticsearch/compat.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/elasticsearch/compat.py b/elasticsearch/compat.py index b44b9daea..d06e27650 
100644
--- a/elasticsearch/compat.py
+++ b/elasticsearch/compat.py
@@ -106,7 +106,9 @@ def run() -> None:
 
 
 @asynccontextmanager
-async def safe_task(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[asyncio.Task[Any]]:
+async def safe_task(
+    coro: Coroutine[Any, Any, Any],
+) -> "AsyncIterator[asyncio.Task[Any]]":
     """Run a background task within a context manager block.
 
     The task is awaited when the block ends.
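
For reference, a minimal usage sketch of the new option from synchronous code, assuming a reachable cluster at http://localhost:9200 and a hypothetical "my-index" index (both are placeholders, not part of the patch): `flush_after_seconds` sends whatever is buffered once the action iterator has been idle for that long, and yielding `helpers.BULK_FLUSH` forces a flush at a point the caller chooses, even though neither `chunk_size` nor `max_chunk_bytes` has been reached.

import time

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")  # assumed local cluster


def generate_actions():
    # A slow, bursty source of documents ("my-index" is a placeholder name).
    for i in range(10):
        yield {"_index": "my-index", "_id": i, "value": i}
        if i == 4:
            # Push the buffered actions out now, before the quiet period.
            yield helpers.BULK_FLUSH
            time.sleep(2)


for ok, item in helpers.streaming_bulk(
    client,
    generate_actions(),
    chunk_size=500,
    flush_after_seconds=1.0,  # also flush after 1s without new actions
):
    assert ok

`parallel_bulk()` accepts the same `flush_after_seconds` parameter and forwards it to `_chunk_actions()` unchanged.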
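
The async helper behaves the same way; a sketch with an async generator, again assuming a local cluster and a placeholder index name:

import asyncio

from elasticsearch import AsyncElasticsearch, helpers


async def main():
    client = AsyncElasticsearch("http://localhost:9200")  # assumed local cluster

    async def generate_actions():
        for i in range(10):
            yield {"_index": "my-index", "_id": i, "value": i}
            if i == 4:
                yield helpers.BULK_FLUSH  # flush the partial chunk explicitly
                await asyncio.sleep(2)  # the source goes quiet for a while

    async for ok, item in helpers.async_streaming_bulk(
        client,
        generate_actions(),
        flush_after_seconds=1.0,  # or rely on the timeout-based flush instead
    ):
        assert ok

    await client.close()


asyncio.run(main())

Internally, when `flush_after_seconds` is set, the sync helper consumes the action iterator from a background thread (`safe_thread`) and the async helper from a background task (`safe_task`), so the timeout-based flush can fire while the source is idle.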