Skip to content

Commit

Permalink
Make REST API check stricter (#882)
Browse files Browse the repository at this point in the history
So far we have used the info API to determine whether the REST API of
Elasticsearch is available. However, we might get lucky that a quorum
(but not all) of the target hosts are available yet. While certain nodes
would then respond to HTTP requests, others might not which can lead to
situations where the REST API check succeeds but we run into connection
errors later on (because we hit a different host from the connection
pool).

With this commit we make this check stricter by using the cluster health
API and blocking until  at least the number of target hosts in the
cluster is available.
  • Loading branch information
danielmitterdorfer committed Jan 29, 2020
1 parent 80ed8a7 commit f145325
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
27 changes: 19 additions & 8 deletions esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,29 +128,40 @@ def create(self):
return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)


def wait_for_rest_layer(es, max_attempts=20):
def wait_for_rest_layer(es, max_attempts=40):
"""
Waits for ``max_attempts`` until Elasticsearch's REST API is available.
:param es: Elasticsearch client to use for connecting.
:param max_attempts: The maximum number of attempts to check whether the REST API is available.
:return: True iff Elasticsearch is available.
:return: True iff Elasticsearch's REST API is available.
"""
# assume that at least the hosts that we expect to contact should be available. Note that this is not 100%
# bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts
# but this is still better than just checking for any random node's REST API being reachable.
expected_node_count = len(es.transport.hosts)
logger = logging.getLogger(__name__)
for attempt in range(max_attempts):
logger.debug("REST API is available after %s attempts", attempt)
import elasticsearch
try:
es.info()
# see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also
# available when the cluster status is RED (as long as all required nodes are present)
es.cluster.health(wait_for_nodes=">={}".format(expected_node_count))
logger.info("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt)
return True
except elasticsearch.ConnectionError as e:
if "SSL: UNKNOWN_PROTOCOL" in str(e):
raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e)
else:
time.sleep(1)
logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
except elasticsearch.TransportError as e:
if e.status_code == 503:
time.sleep(1)
elif e.status_code == 401:
time.sleep(1)
# cluster block, x-pack not initialized yet, our wait condition is not reached
if e.status_code in (503, 401, 408):
logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.status_code, attempt)
time.sleep(3)
else:
logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt)
raise e
return False
29 changes: 20 additions & 9 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,43 +277,54 @@ def test_create_https_connection_unverified_certificate_present_client_certifica


class RestLayerTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch", autospec=True)
@mock.patch("elasticsearch.Elasticsearch")
def test_successfully_waits_for_rest_layer(self, es):
es.transport.hosts = [
{"host": "node-a.example.org", "port": 9200},
{"host": "node-b.example.org", "port": 9200}
]

self.assertTrue(client.wait_for_rest_layer(es, max_attempts=3))

es.cluster.health.assert_has_calls([
mock.call(wait_for_nodes=">=2"),
])

# don't sleep in realtime
@mock.patch("time.sleep")
@mock.patch("elasticsearch.Elasticsearch", autospec=True)
@mock.patch("elasticsearch.Elasticsearch")
def test_retries_on_transport_errors(self, es, sleep):
import elasticsearch

es.info.side_effect = [
es.cluster.health.side_effect = [
elasticsearch.TransportError(503, "Service Unavailable"),
elasticsearch.TransportError(401, "Unauthorized"),
elasticsearch.TransportError(408, "Timed Out"),
elasticsearch.TransportError(408, "Timed Out"),
{
"version": {
"number": "5.0.0",
"build_hash": "abc123"
}
}
]
self.assertTrue(client.wait_for_rest_layer(es, max_attempts=3))
self.assertTrue(client.wait_for_rest_layer(es, max_attempts=5))

# don't sleep in realtime
@mock.patch("time.sleep")
@mock.patch("elasticsearch.Elasticsearch", autospec=True)
def test_dont_retries_eternally_on_transport_errors(self, es, sleep):
@mock.patch("elasticsearch.Elasticsearch")
def test_dont_retry_eternally_on_transport_errors(self, es, sleep):
import elasticsearch

es.info.side_effect = elasticsearch.TransportError(401, "Unauthorized")
es.cluster.health.side_effect = elasticsearch.TransportError(401, "Unauthorized")
self.assertFalse(client.wait_for_rest_layer(es, max_attempts=3))

@mock.patch("elasticsearch.Elasticsearch", autospec=True)
@mock.patch("elasticsearch.Elasticsearch")
def test_ssl_error(self, es):
import elasticsearch
import urllib3.exceptions

es.info.side_effect = elasticsearch.ConnectionError("N/A",
es.cluster.health.side_effect = elasticsearch.ConnectionError("N/A",
"[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)",
urllib3.exceptions.SSLError(
"[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)"))
Expand Down

0 comments on commit f145325

Please sign in to comment.