Description
Elasticsearch version: 8.6.1 (docker)
elasticsearch-py version: 8.7.0
Description of the problem including expected versus actual behavior:
I am using the `async_bulk` helper from `helpers` to fill my index with documents. If one of the cluster's nodes goes down (causing an `unavailable_shards_exception`), the retry mechanism doesn't seem to work and the bulk fails after the first failure.
Steps to reproduce:
1. Bring up a two-node cluster (`es01`, `es02`) with docker compose and map port 9200 of the first node (`es01`) to localhost.
2. Create an index with more than one shard (to ensure that both nodes contain shards of this index).
3. In the middle of indexing, run `docker kill es02`.
The bulk operation stops after its first attempt, although we defined 10 retries:
```
2023-04-09 09:54:49,667 11388 WARNING Node <AiohttpHttpNode(http://localhost:9200)> has failed for 1 times in a row, putting on 1 second timeout [_node_pool.py:246]
2023-04-09 09:54:49,667 11388 WARNING Retrying request after failure (attempt 0 of 10) [_async_transport.py:332]
Traceback (most recent call last):
  File "/Users/itay/workspace/aud-elasticsearch-indexer/venv/lib/python3.11/site-packages/elastic_transport/_async_transport.py", line 259, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/itay/workspace/aud-elasticsearch-indexer/venv/lib/python3.11/site-packages/elastic_transport/_node/_http_aiohttp.py", line 218, in perform_request
    raise err from None
elastic_transport.ConnectionTimeout: Connection timeout caused by: TimeoutError()
2023-04-09 09:54:52,659 11390 ERROR 205 document(s) failed to index: [{'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'DrvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][8] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][8]] containing [42] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'D7vKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][0] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][0]] containing [34] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'ELvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][4] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][4]] containing [29] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'ErvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][10] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][10]] containing [25] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'FbvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][2] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][2]] containing [40] requests]'}}}] [index.py:115]
2023-04-09 09:54:52,660 11390 ERROR fail indexing extendedEs/2023-04-04/8776374/block30/part-r-04061.avro [index.py:194]
Traceback (most recent call last):
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 183, in async_wrapper_download_and_index
    await download_and_index(
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 157, in download_and_index
    await write_to_elasticsearch(
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 118, in write_to_elasticsearch
    raise Exception("failed to index to Elasticsearch")
Exception: failed to index to Elasticsearch
```
Code snippet:
```python
es_async = AsyncElasticsearch(
    "http://localhost:9200",
    http_compress=True,
    request_timeout=60,
    max_retries=10,
    retry_on_timeout=True,
)

docs = [
    # generate docs here
]

await async_bulk(
    es_async,
    docs,
    request_timeout=ELASTICSEARCH_TIMEOUT_IN_SECONDS,
    raise_on_error=False,
)
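As a workaround I am currently wrapping the bulk call in an explicit retry loop. Below is a minimal sketch of that idea; the `retry_async` helper, the backoff values, and the `flaky_bulk` stand-in are all illustrative (not part of elasticsearch-py) — in the real code `op` would be a zero-argument coroutine function around `async_bulk(...)`:

```python
import asyncio


async def retry_async(op, max_retries=10, initial_backoff=1.0, max_backoff=30.0):
    """Retry an async callable with exponential backoff.

    `op` is any zero-argument coroutine function; the last exception is
    re-raised once the retry budget is exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return await op()
        except Exception:
            if attempt == max_retries:
                raise
            # Exponential backoff, capped at max_backoff seconds.
            delay = min(initial_backoff * 2**attempt, max_backoff)
            await asyncio.sleep(delay)


# Illustrative usage: a stand-in that fails twice (simulating the timeout /
# unavailable_shards failures above) before succeeding on the third call.
calls = {"n": 0}


async def flaky_bulk():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated connection timeout")
    return (205, [])  # (successes, errors), mimicking async_bulk's return shape


result = asyncio.run(retry_async(flaky_bulk, max_retries=5, initial_backoff=0.01))
```

This only papers over the problem at the application level, of course; I would expect the client's own `max_retries` / `retry_on_timeout` settings to cover this case.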
Note that for other operations not from `helpers` — such as create index, force_merge, etc. — the retry mechanism does work when a node goes down.