Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions elasticsearch/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,18 @@ def _get_sniff_data(self, initial=False):
# seed_connections for good measure
for c in chain(self.connection_pool.connections, self.seed_connections):
try:
# get ES version due to breaking changes in 5.x
_, headers, cluster_info = c.perform_request('GET', '/',
timeout=self.sniff_timeout if not initial else None)
cluster_info = self.deserializer.loads(cluster_info, headers.get('content-type'))
cluster_version = cluster_info.get('version').get('number')
# string comparison for version check works fine since "5.0.0" > "5"
# use small timeout for the sniffing request, should be a fast api call
_, headers, node_info = c.perform_request('GET', '/_nodes/_all/clear',
if cluster_version > "5":
_, headers, node_info = c.perform_request('GET', '/_nodes/_all/http',
timeout=self.sniff_timeout if not initial else None)
else:
_, headers, node_info = c.perform_request('GET', '/_nodes/_all/clear',
timeout=self.sniff_timeout if not initial else None)
node_info = self.deserializer.loads(node_info, headers.get('content-type'))
break
Expand All @@ -216,12 +226,19 @@ def _get_sniff_data(self, initial=False):
self.last_sniff = previous_sniff
raise

return list(node_info['nodes'].values())
return [list(node_info['nodes'].values()), cluster_version]

def _get_host_info(self, host_info, cluster_version):
# string comparison for version check works fine since "5.0.0" > "5"
if cluster_version > "5":
address_schema = 'http'
address_key = 'publish_address'
address = host_info.get(address_schema, '').get(address_key, '')
else:
address_key = self.connection_class.transport_schema + '_address'
address = host_info.get(address_key, '')

def _get_host_info(self, host_info):
address_key = self.connection_class.transport_schema + '_address'
host = {}
address = host_info.get(address_key, '')
if '/' in address:
host['host'], address = address.split('/', 1)

Expand All @@ -247,9 +264,9 @@ def sniff_hosts(self, initial=False):
:arg initial: flag indicating if this is during startup
(``sniff_on_start``), ignore the ``sniff_timeout`` if ``True``
"""
node_info = self._get_sniff_data(initial)
node_info, cluster_version = self._get_sniff_data(initial)

hosts = list(filter(None, (self._get_host_info(n) for n in node_info)))
hosts = list(filter(None, (self._get_host_info(n, cluster_version) for n in node_info)))

# we weren't able to get any nodes, maybe using an incompatible
# transport_schema or host_info_callback blocked all - raise error.
Expand Down