Skip to content

Commit

Permalink
Fix host tagging for external Elasticsearch clusters.
Browse files Browse the repository at this point in the history
When using an external Elasticsearch cluster, we should tag each
metric with the node's given hostname from the stats output. Otherwise
we end up using only the last node's values because each subsequent
gauge metric will stomp on the one before it.

The test is updated to make sure the hostname given in the stats
(which should be the FQDN) matches what we output at the end.

Finally, this change also splits the cluster health metrics into a
separate dict. This has two benefits: (a) it will clean up a lot of
debugging logging and extra work for `_process_health_data` and
(b) makes it easy to check against just the stats metrics for tests.
  • Loading branch information
conorbranagan authored and LeoCavaille committed Jan 14, 2015
1 parent 4c367b5 commit 677417f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
46 changes: 30 additions & 16 deletions checks.d/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ESCheck(AgentCheck):

DEFAULT_TIMEOUT = 5

METRICS = { # Metrics that are common to all Elasticsearch versions
STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
"elasticsearch.docs.count": ("gauge", "indices.docs.count"),
"elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
"elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"),
Expand Down Expand Up @@ -115,6 +115,9 @@ class ESCheck(AgentCheck):
"jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"),
"jvm.threads.count": ("gauge", "jvm.threads.count"),
"jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"),
}

CLUSTER_HEALTH_METRICS = {
"elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"),
"elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"),
"elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"),
Expand Down Expand Up @@ -176,7 +179,7 @@ def check(self, instance):
# Check ES version for this instance and define parameters
# (URLs and metrics) accordingly
version = self._get_es_version()
self._define_params(version)
self._define_params(version, self.curr_config.is_external)

# Load stats data.
stats_url = urlparse.urljoin(self.curr_config.url, self.STATS_URL)
Expand Down Expand Up @@ -212,16 +215,21 @@ def _get_es_version(self):
self.log.debug("Elasticsearch version is %s" % version)
return version

def _define_params(self, version):
def _define_params(self, version, is_external):
""" Define the set of URLs and METRICS to use depending on the
running ES version.
"""
if version >= [0,90,10]:
# ES versions 0.90.10 and above
self.HEALTH_URL = "/_cluster/health?pretty=true"
self.STATS_URL = "/_nodes/_local/stats?all=true"
self.NODES_URL = "/_nodes?network=true"

# For "external" clusters, we want to collect from all nodes.
if is_external:
self.STATS_URL = "/_nodes/stats?all=true"
else:
self.STATS_URL = "/_nodes/_local/stats?all=true"

additional_metrics = {
"jvm.gc.collectors.young.count": ("gauge", "jvm.gc.collectors.young.collection_count"),
"jvm.gc.collectors.young.collection_time": ("gauge", "jvm.gc.collectors.young.collection_time_in_millis", lambda v: float(v)/1000),
Expand All @@ -242,7 +250,7 @@ def _define_params(self, version):
"jvm.gc.collection_time": ("gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v)/1000),
}

self.METRICS.update(additional_metrics)
self.STATS_METRICS.update(additional_metrics)

if version >= [0,90,5]:
# ES versions 0.90.5 and above
Expand All @@ -264,7 +272,7 @@ def _define_params(self, version):
"elasticsearch.cache.filter.size": ("gauge", "indices.cache.filter_size_in_bytes"),
}

self.METRICS.update(additional_metrics)
self.STATS_METRICS.update(additional_metrics)

def _get_data(self, url):
""" Hit a given URL and return the parsed json
Expand Down Expand Up @@ -295,17 +303,22 @@ def _get_data(self, url):
return resp.json()

def _process_stats_data(self, data):
    """ Walk the node stats payload and submit every STATS_METRICS entry
    for each node we are responsible for.
    """
    is_external = self.curr_config.is_external
    for node_name, node_data in data['nodes'].items():
        # On newer versions of ES the key is "host" rather than "hostname".
        node_hostname = node_data.get('hostname', node_data.get('host', None))

        # External clusters report stats for every node, so process them all;
        # otherwise only handle the node that matches this host.
        if not (is_external or self.should_process_node(node_name, node_hostname)):
            continue

        # For an external cluster, tag metrics with the node's own hostname so
        # gauges from different nodes don't overwrite one another.
        metric_hostname = node_hostname if is_external else None

        for metric, desc in self.STATS_METRICS.items():
            self._process_metric(node_data, metric, *desc,
                                 tags=self.curr_config.tags,
                                 hostname=metric_hostname)

def should_process_node(self, node_name, node_hostname):
""" The node stats API will return stats for every node so we
Expand Down Expand Up @@ -373,7 +386,8 @@ def _host_matches_node(self, primary_addrs):
# Check the interface addresses against the primary address
return primary_addrs in ips

def _process_metric(self, data, metric, xtype, path, xform=None, tags=None):
def _process_metric(self, data, metric, xtype, path, xform=None,
tags=None, hostname=None):
"""data: dictionary containing all the stats
metric: datadog metric
path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
Expand All @@ -391,9 +405,9 @@ def _process_metric(self, data, metric, xtype, path, xform=None, tags=None):
if value is not None:
if xform: value = xform(value)
if xtype == "gauge":
self.gauge(metric, value, tags=tags)
self.gauge(metric, value, tags=tags, hostname=hostname)
else:
self.rate(metric, value, tags=tags)
self.rate(metric, value, tags=tags, hostname=hostname)
else:
self._metric_not_found(metric, path)

Expand All @@ -409,9 +423,9 @@ def _process_health_data(self, data):
event = self._create_event(data['status'])
self.event(event)

for metric in self.METRICS:
for metric in self.CLUSTER_HEALTH_METRICS:
# metric description
desc = self.METRICS[metric]
desc = self.CLUSTER_HEALTH_METRICS[metric]
self._process_metric(data, metric, *desc, tags=self.curr_config.tags)

# Process the service check
Expand Down
19 changes: 17 additions & 2 deletions tests/test_elastic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# stdlib
import socket
import unittest

# 3p
Expand Down Expand Up @@ -36,18 +37,23 @@ def test_bad_config(self):
def test_check(self):
agentConfig = {
'version': '0.1',
'api_key': 'toto'
'api_key': 'toto',
'hostname': 'agent-es-test'
}

conf = {
'instances': [{'url': 'http://localhost:%s' % PORT}]
'instances': [
{'url': 'http://localhost:%s' % PORT},
{'url': 'http://localhost:%s' % PORT, 'is_external': True}
]
}

# Initialize the check from checks.d
self.check = load_check('elastic', conf, agentConfig)

self.check.check(conf['instances'][0])
r = self.check.get_metrics()
self.check.get_events()

self.assertTrue(type(r) == type([]))
self.assertTrue(len(r) > 0)
Expand Down Expand Up @@ -86,4 +92,13 @@ def test_check(self):
self.check.cluster_status[conf['instances'][0].get('url')] = "red"
self.check.check(conf['instances'][0])
events = self.check.get_events()
self.check.get_metrics()
self.assertEquals(len(events),1,events)

# Check an "external" cluster
self.check.check(conf['instances'][1])
r = self.check.get_metrics()
expected_hostname = socket.gethostname()
for m in r:
if m[0] not in self.check.CLUSTER_HEALTH_METRICS:
self.assertEquals(m[3]['hostname'], expected_hostname)

0 comments on commit 677417f

Please sign in to comment.