Skip to content

Commit

Permalink
Fix prometheus check summary and histogram, replace the text parsing (#…
Browse files Browse the repository at this point in the history
…3617)

* prometheus_check: Improve the unittesting strategy of the text parsing
* prometheus_check: Use the prometheus client parser
* prometheus_check: Fix the sum and count of histogram and summary
* prometheus_check: Poll returns Response
  • Loading branch information
JulienBalestra committed Jan 2, 2018
1 parent e49cbce commit 05c50c6
Show file tree
Hide file tree
Showing 3 changed files with 816 additions and 131 deletions.
181 changes: 103 additions & 78 deletions checks/prometheus_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

import re
import requests
from collections import defaultdict
from google.protobuf.internal.decoder import _DecodeVarint32 # pylint: disable=E0611,E0401
from checks import AgentCheck
from utils.prometheus import metrics_pb2

from prometheus_client.parser import text_fd_to_metric_families


# Prometheus check is a mother class providing a structure and some helpers
# to collect metrics, events and service checks exposed via Prometheus.
#
Expand All @@ -29,7 +33,15 @@ class PrometheusFormat:
PROTOBUF = "PROTOBUF"
TEXT = "TEXT"


class UnknownFormatError(TypeError):
    """Raised when a Prometheus endpoint responds with an unsupported Content-Type."""


class PrometheusCheck(AgentCheck):
UNWANTED_LABELS = ["le", "quantile"] # are specifics keys for prometheus itself
REQUESTS_CHUNK_SIZE = 1024 * 10 # use 10kb as chunk size when using the Stream feature in requests.get

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
# message.type is the index in this array
Expand All @@ -45,7 +57,7 @@ def __init__(self, name, init_config, agentConfig, instances=None):
# child check class.
self.NAMESPACE = ''

# `metrics_mapper` is a dictionnary where the keys are the metrics to capture
# `metrics_mapper` is a dictionary where the keys are the metrics to capture
# and the values are the corresponding metrics names to have in datadog.
# Note: it is empty in the mother class but will need to be
# overloaded/hardcoded in the final check not to be counted as custom metric.
Expand Down Expand Up @@ -83,24 +95,24 @@ def prometheus_metric_name(self, message, **kwargs):
""" Example method"""
pass

class UnknownFormatError(Exception):
def __init__(self, arg):
self.args = arg

def parse_metric_family(self, buf, content_type):
def parse_metric_family(self, response):
"""
Gets the output data from a prometheus endpoint response along with its
Content-type header and parses it into Prometheus classes (see [0])
Parse the MetricFamily from a valid requests.Response object to provide a MetricFamily object (see [0])
Parse the binary buffer in input, searching for Prometheus messages
of type MetricFamily [0] delimited by a varint32 [1] when the
content-type is a `application/vnd.google.protobuf`.
The text format uses iter_lines() generator.
The protobuf format directly parse the response.content property searching for Prometheus messages of type
MetricFamily [0] delimited by a varint32 [1] when the content-type is a `application/vnd.google.protobuf`.
[0] https://github.com/prometheus/client_model/blob/086fe7ca28bde6cec2acd5223423c1475a362858/metrics.proto#L76-%20%20L81
[1] https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/AbstractMessageLite#writeDelimitedTo(java.io.OutputStream)
:param response: requests.Response
:return: metrics_pb2.MetricFamily()
"""
if 'application/vnd.google.protobuf' in content_type:
if 'application/vnd.google.protobuf' in response.headers['Content-Type']:
n = 0
buf = response.content
while n < len(buf):
msg_len, new_pos = _DecodeVarint32(buf, n)
n = new_pos
Expand All @@ -118,26 +130,56 @@ def parse_metric_family(self, buf, content_type):
else:
self.log.debug("type override %s for %s is not a valid type name" % (new_type, message.name))
yield message
elif 'text/plain' in content_type:
messages = {} # map with the name of the element (before the labels) and the list of occurrences with labels and values
obj_map = {} # map of the types of each metrics
obj_help = {} # help for the metrics
for line in buf.splitlines():
self._extract_metrics_from_string(line, messages, obj_map, obj_help)

# Add type overrides:
for m_name, m_type in self.type_overrides.iteritems():
if m_type in self.METRIC_TYPES:
obj_map[m_name] = m_type
else:
self.log.debug("type override %s for %s is not a valid type name" % (m_type,m_name))
elif 'text/plain' in response.headers['Content-Type']:
messages = defaultdict(list) # map with the name of the element (before the labels)
# and the list of occurrences with labels and values

obj_map = {} # map of the types of each metrics
obj_help = {} # help for the metrics
for metric in text_fd_to_metric_families(response.iter_lines(chunk_size=self.REQUESTS_CHUNK_SIZE)):
metric_name = "%s_bucket" % metric.name if metric.type == "histogram" else metric.name
metric_type = self.type_overrides.get(metric_name, metric.type)
if metric_type == "untyped" or metric_type not in self.METRIC_TYPES:
continue

for sample in metric.samples:
if (sample[0].endswith("_sum") or sample[0].endswith("_count")) and \
metric_type in ["histogram", "summary"]:
messages[sample[0]].append({"labels": sample[1], 'value': sample[2]})
else:
messages[metric_name].append({"labels": sample[1], 'value': sample[2]})

obj_map[metric.name] = metric_type
obj_help[metric.name] = metric.documentation

for _m in obj_map:
if _m in messages or (obj_map[_m] == 'histogram' and '{}_bucket'.format(_m) in messages):
if _m in messages or (obj_map[_m] == 'histogram' and ('{}_bucket'.format(_m) in messages)):
yield self._extract_metric_from_map(_m, messages, obj_map, obj_help)
else:
raise self.UnknownFormatError('Unsupported content-type provided: {}'.format(content_type))
raise UnknownFormatError('Unsupported content-type provided: {}'.format(
response.headers['Content-Type']))

@staticmethod
def get_metric_value_by_labels(messages, _metric, _m, metric_suffix):
    """
    Return the value of the `{_m}_{metric_suffix}` sample whose labels match `_metric`'s.

    :param messages: dict mapping metric_name -> list of {'labels': {...}, 'value': ...}
    :param _metric: dict holding a 'labels' mapping, e.g. {'labels': {'le': '0.001', 'custom': 'value'}}
    :param _m: str, the base metric name
    :param metric_suffix: str, must be 'count' or 'sum'
    :return: float value of the matching sample
    :raises AttributeError: when no sample carries the expected label set
    """
    metric_name = '{}_{}'.format(_m, metric_suffix)
    # Compare label sets with prometheus-internal keys ('le', 'quantile')
    # removed, since _sum/_count samples never carry them.
    # NOTE: .items() (not the py2-only .iteritems()) keeps this working on
    # both Python 2 and Python 3 with identical results.
    expected_labels = {(k, v) for k, v in _metric["labels"].items()
                       if k not in PrometheusCheck.UNWANTED_LABELS}
    for elt in messages[metric_name]:
        current_labels = {(k, v) for k, v in elt["labels"].items()
                          if k not in PrometheusCheck.UNWANTED_LABELS}
        # Sets of (key, value) pairs compare by content, so this is an
        # exact, order-independent label match.
        if current_labels == expected_labels:
            return float(elt["value"])

    raise AttributeError("cannot find expected labels for metric %s with suffix %s" % (metric_name, metric_suffix))

def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
"""
Expand Down Expand Up @@ -178,15 +220,15 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
_g.gauge.value = float(_metric['value'])
elif obj_map[_m] == 'summary':
if '{}_count'.format(_m) in messages:
_g.summary.sample_count = long(float(messages['{}_count'.format(_m)][0]['value']))
_g.summary.sample_count = long(self.get_metric_value_by_labels(messages, _metric, _m, 'count'))
if '{}_sum'.format(_m) in messages:
_g.summary.sample_sum = float(messages['{}_sum'.format(_m)][0]['value'])
_g.summary.sample_sum = self.get_metric_value_by_labels(messages, _metric, _m, 'sum')
# TODO: see what can be done with the untyped metrics
elif obj_map[_m] == 'histogram':
if '{}_count'.format(_m) in messages:
_g.histogram.sample_count = long(float(messages['{}_count'.format(_m)][0]['value']))
_g.histogram.sample_count = long(self.get_metric_value_by_labels(messages, _metric, _m, 'count'))
if '{}_sum'.format(_m) in messages:
_g.histogram.sample_sum = float(messages['{}_sum'.format(_m)][0]['value'])
_g.histogram.sample_sum = self.get_metric_value_by_labels(messages, _metric, _m, 'sum')
# last_metric = len(_obj.metric) - 1
# if last_metric >= 0:
for lbl in _metric['labels']:
Expand Down Expand Up @@ -214,42 +256,6 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
_l.value = _metric['labels'][lbl]
return _obj

def _extract_metrics_from_string(self, line, messages, obj_map, obj_help):
    """
    Parse one line of Prometheus text exposition output and update the given
    dictionaries in place (callers rely on the shared references).

    `# TYPE` lines feed obj_map, `# HELP` lines feed obj_help, and plain
    sample lines are appended to messages under their metric name.
    """
    if line.startswith('# TYPE'):
        # line = "# TYPE metric_name metric_type"
        parts = line.split(' ')
        if len(parts) == 4:
            obj_map[parts[2]] = parts[3]
    elif line.startswith('# HELP'):
        # line = "# HELP metric_name Help message..."
        parts = line.split(' ', 3)
        if len(parts) == 4:
            obj_help[parts[2]] = parts[3]
    elif not line.startswith('#'):
        match = self.metrics_pattern.match(line)
        if match is None:
            return
        groups = match.groups()
        sample_labels = self._extract_labels_from_string(groups[1])
        # setdefault reuses the existing occurrence list or starts a new one.
        messages.setdefault(groups[0], []).append({'labels': sample_labels, 'value': groups[2]})

def _extract_labels_from_string(self, labels):
    """
    Parse a label block such as ``{label_name_1="value 1", label_name_2="value 2"}``
    into a plain dict mapping label names to values.
    """
    inner = labels.lstrip('{').rstrip('}')
    # Later duplicates overwrite earlier ones, matching plain dict assignment.
    return {found[0]: found[1] for found in self.lbl_pattern.findall(inner)}

def process(self, endpoint, send_histograms_buckets=True, instance=None):
"""
Polls the data from prometheus and pushes them as gauges
Expand All @@ -258,12 +264,15 @@ def process(self, endpoint, send_histograms_buckets=True, instance=None):
Note that if the instance has a 'tags' attribute, it will be pushed
automatically as additional custom tags and added to the metrics
"""
content_type, data = self.poll(endpoint)
tags = []
if instance is not None:
tags = instance.get('tags', [])
for metric in self.parse_metric_family(data, content_type):
self.process_metric(metric, send_histograms_buckets=send_histograms_buckets, custom_tags=tags, instance=instance)
response = self.poll(endpoint)
try:
tags = []
if instance is not None:
tags = instance.get('tags', [])
for metric in self.parse_metric_family(response):
self.process_metric(metric, send_histograms_buckets=send_histograms_buckets, custom_tags=tags, instance=instance)
finally:
response.close()

def process_metric(self, message, send_histograms_buckets=True, custom_tags=None, **kwargs):
"""
Expand All @@ -284,23 +293,39 @@ def process_metric(self, message, send_histograms_buckets=True, custom_tags=None
except AttributeError as err:
self.log.debug("Unable to handle metric: {} - error: {}".format(message.name, err))

def poll(self, endpoint, pFormat=PrometheusFormat.PROTOBUF, headers={}):
def poll(self, endpoint, pFormat=PrometheusFormat.PROTOBUF, headers=None):
"""
Polls the metrics from the prometheus metrics endpoint provided.
Defaults to the protobuf format, but can use the formats specified by
the PrometheusFormat class.
Custom headers can be added to the default headers.
Returns the content-type of the response and the content of the reponse itself.
Returns a valid requests.Response, raise requests.HTTPError if the status code of the requests.Response
isn't valid - see response.raise_for_status()
The caller needs to close the requests.Response
:param endpoint: string url endpoint
:param pFormat: the preferred format defined in PrometheusFormat
:param headers: extra headers
:return: requests.Response
"""
if headers is None:
headers = {}
if 'accept-encoding' not in headers:
headers['accept-encoding'] = 'gzip'
if pFormat == PrometheusFormat.PROTOBUF:
headers['accept'] = 'application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited'
headers['accept'] = 'application/vnd.google.protobuf; ' \
'proto=io.prometheus.client.MetricFamily; ' \
'encoding=delimited'

req = requests.get(endpoint, headers=headers)
req.raise_for_status()
return req.headers['Content-Type'], req.content
response = requests.get(endpoint, headers=headers, stream=True)
try:
response.raise_for_status()
return response
except requests.HTTPError:
response.close()
raise

def _submit(self, metric_name, message, send_histograms_buckets=True, custom_tags=None):
"""
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ simplejson==3.6.5
supervisor==3.3.3
tornado==3.2.2
uptime==3.0.1
prometheus-client==0.1.0
Loading

0 comments on commit 05c50c6

Please sign in to comment.