Merge pull request #3627 from DataDog/JulienBalestra/revert-prometheus-check

Revert "Fix prometheus check summary and histogram
truthbk committed Jan 3, 2018
2 parents 05c50c6 + c886c2e commit a8d6a47
Showing 3 changed files with 131 additions and 816 deletions.
181 changes: 78 additions & 103 deletions checks/prometheus_check.py
@@ -4,14 +4,10 @@

import re
import requests
from collections import defaultdict
from google.protobuf.internal.decoder import _DecodeVarint32 # pylint: disable=E0611,E0401
from checks import AgentCheck
from utils.prometheus import metrics_pb2

from prometheus_client.parser import text_fd_to_metric_families


# Prometheus check is a mother class providing a structure and some helpers
# to collect metrics, events and service checks exposed via Prometheus.
#
@@ -33,15 +29,7 @@ class PrometheusFormat:
PROTOBUF = "PROTOBUF"
TEXT = "TEXT"


class UnknownFormatError(TypeError):
pass


class PrometheusCheck(AgentCheck):
UNWANTED_LABELS = ["le", "quantile"]  # specific keys used by Prometheus itself
REQUESTS_CHUNK_SIZE = 1024 * 10 # use 10kb as chunk size when using the Stream feature in requests.get

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
# message.type is the index in this array
@@ -57,7 +45,7 @@ def __init__(self, name, init_config, agentConfig, instances=None):
# child check class.
self.NAMESPACE = ''

# `metrics_mapper` is a dictionary where the keys are the metrics to capture
# `metrics_mapper` is a dictionnary where the keys are the metrics to capture
# and the values are the corresponding metrics names to have in datadog.
# Note: it is empty in the mother class but will need to be
# overloaded/hardcoded in the final check not to be counted as custom metric.
@@ -95,24 +83,24 @@ def prometheus_metric_name(self, message, **kwargs):
""" Example method"""
pass

def parse_metric_family(self, response):
"""
Parse the MetricFamily from a valid requests.Response object to provide a MetricFamily object (see [0])
class UnknownFormatError(Exception):
def __init__(self, arg):
self.args = arg

The text format uses iter_lines() generator.
def parse_metric_family(self, buf, content_type):
"""
Gets the output data from a prometheus endpoint response along with its
Content-type header and parses it into Prometheus classes (see [0])
The protobuf format directly parses the response.content property, searching for Prometheus messages of type
MetricFamily [0] delimited by a varint32 [1] when the content-type is `application/vnd.google.protobuf`.
Parse the input binary buffer, searching for Prometheus messages
of type MetricFamily [0] delimited by a varint32 [1] when the
content-type is `application/vnd.google.protobuf`.
[0] https://github.com/prometheus/client_model/blob/086fe7ca28bde6cec2acd5223423c1475a362858/metrics.proto#L76-%20%20L81
[1] https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/AbstractMessageLite#writeDelimitedTo(java.io.OutputStream)
:param response: requests.Response
:return: metrics_pb2.MetricFamily()
"""
if 'application/vnd.google.protobuf' in response.headers['Content-Type']:
if 'application/vnd.google.protobuf' in content_type:
n = 0
buf = response.content
while n < len(buf):
msg_len, new_pos = _DecodeVarint32(buf, n)
n = new_pos
@@ -130,56 +118,26 @@ def parse_metric_family(self, response):
else:
self.log.debug("type override %s for %s is not a valid type name" % (new_type, message.name))
yield message

elif 'text/plain' in response.headers['Content-Type']:
messages = defaultdict(list) # map with the name of the element (before the labels)
# and the list of occurrences with labels and values

obj_map = {} # map of the types of each metrics
elif 'text/plain' in content_type:
messages = {} # map with the name of the element (before the labels) and the list of occurrences with labels and values
obj_map = {} # map of the types of each metrics
obj_help = {} # help for the metrics
for metric in text_fd_to_metric_families(response.iter_lines(chunk_size=self.REQUESTS_CHUNK_SIZE)):
metric_name = "%s_bucket" % metric.name if metric.type == "histogram" else metric.name
metric_type = self.type_overrides.get(metric_name, metric.type)
if metric_type == "untyped" or metric_type not in self.METRIC_TYPES:
continue

for sample in metric.samples:
if (sample[0].endswith("_sum") or sample[0].endswith("_count")) and \
metric_type in ["histogram", "summary"]:
messages[sample[0]].append({"labels": sample[1], 'value': sample[2]})
else:
messages[metric_name].append({"labels": sample[1], 'value': sample[2]})
for line in buf.splitlines():
self._extract_metrics_from_string(line, messages, obj_map, obj_help)

# Add type overrides:
for m_name, m_type in self.type_overrides.iteritems():
if m_type in self.METRIC_TYPES:
obj_map[m_name] = m_type
else:
self.log.debug("type override %s for %s is not a valid type name" % (m_type,m_name))

obj_map[metric.name] = metric_type
obj_help[metric.name] = metric.documentation

for _m in obj_map:
if _m in messages or (obj_map[_m] == 'histogram' and ('{}_bucket'.format(_m) in messages)):
if _m in messages or (obj_map[_m] == 'histogram' and '{}_bucket'.format(_m) in messages):
yield self._extract_metric_from_map(_m, messages, obj_map, obj_help)
else:
raise UnknownFormatError('Unsupported content-type provided: {}'.format(
response.headers['Content-Type']))

@staticmethod
def get_metric_value_by_labels(messages, _metric, _m, metric_suffix):
"""
:param messages: dictionary as metric_name: {labels: {}, value: 10}
:param _metric: dictionary as {labels: {le: '0.001', 'custom': 'value'}}
:param _m: str as metric name
:param metric_suffix: str must be in (count or sum)
:return: value of the metric_name matched by the labels
"""
metric_name = '{}_{}'.format(_m, metric_suffix)
expected_labels = set([(k, v) for k, v in _metric["labels"].iteritems()
if k not in PrometheusCheck.UNWANTED_LABELS])
for elt in messages[metric_name]:
current_labels = set([(k, v) for k, v in elt["labels"].iteritems()
if k not in PrometheusCheck.UNWANTED_LABELS])
# As we have two hashable objects we can compare them without any side effects
if current_labels == expected_labels:
return float(elt["value"])

raise AttributeError("cannot find expected labels for metric %s with suffix %s" % (metric_name, metric_suffix))
raise self.UnknownFormatError('Unsupported content-type provided: {}'.format(content_type))
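# Illustrative sketch, not part of this diff: the varint32-delimited protobuf
# walk described in parse_metric_family, pulled out as a standalone helper.
# The helper name and its `payload` argument are hypothetical; it relies on the
# _DecodeVarint32 and metrics_pb2 imports already present in this module.
def _iter_metric_families(payload):
    n = 0
    while n < len(payload):
        msg_len, n = _DecodeVarint32(payload, n)  # read the varint32 length prefix
        family = metrics_pb2.MetricFamily()
        family.ParseFromString(payload[n:n + msg_len])  # decode one MetricFamily message
        n += msg_len
        yield family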

def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
"""
@@ -220,15 +178,15 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
_g.gauge.value = float(_metric['value'])
elif obj_map[_m] == 'summary':
if '{}_count'.format(_m) in messages:
_g.summary.sample_count = long(self.get_metric_value_by_labels(messages, _metric, _m, 'count'))
_g.summary.sample_count = long(float(messages['{}_count'.format(_m)][0]['value']))
if '{}_sum'.format(_m) in messages:
_g.summary.sample_sum = self.get_metric_value_by_labels(messages, _metric, _m, 'sum')
_g.summary.sample_sum = float(messages['{}_sum'.format(_m)][0]['value'])
# TODO: see what can be done with the untyped metrics
elif obj_map[_m] == 'histogram':
if '{}_count'.format(_m) in messages:
_g.histogram.sample_count = long(self.get_metric_value_by_labels(messages, _metric, _m, 'count'))
_g.histogram.sample_count = long(float(messages['{}_count'.format(_m)][0]['value']))
if '{}_sum'.format(_m) in messages:
_g.histogram.sample_sum = self.get_metric_value_by_labels(messages, _metric, _m, 'sum')
_g.histogram.sample_sum = float(messages['{}_sum'.format(_m)][0]['value'])
# last_metric = len(_obj.metric) - 1
# if last_metric >= 0:
for lbl in _metric['labels']:
@@ -256,6 +214,42 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
_l.value = _metric['labels'][lbl]
return _obj

def _extract_metrics_from_string(self, line, messages, obj_map, obj_help):
"""
Extracts the metrics from a line of metrics output and updates the given
dictionaries (we take advantage of the fact that dictionaries are passed by reference)
"""
if line.startswith('# TYPE'):
metric = line.split(' ')
if len(metric) == 4:
obj_map[metric[2]] = metric[3] # line = # TYPE metric_name metric_type
elif line.startswith('# HELP'):
_h = line.split(' ', 3)
if len(_h) == 4:
obj_help[_h[2]] = _h[3] # line = # HELP metric_name Help message...
elif not line.startswith('#'):
_match = self.metrics_pattern.match(line)
if _match is not None:
_g = _match.groups()
_msg = []
_lbls = self._extract_labels_from_string(_g[1])
if _g[0] in messages:
_msg = messages[_g[0]]
_msg.append({'labels': _lbls, 'value': _g[2]})
messages[_g[0]] = _msg

def _extract_labels_from_string(self, labels):
"""
Extracts the labels from a string that looks like:
{label_name_1="value 1", label_name_2="value 2"}
"""
lbls = {}
labels = labels.lstrip('{').rstrip('}')
_lbls = self.lbl_pattern.findall(labels)
for _lbl in _lbls:
lbls[_lbl[0]] = _lbl[1]
return lbls
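# Illustrative sketch, not part of this diff: the shape of the regexes that
# _extract_metrics_from_string and _extract_labels_from_string rely on. The
# real `metrics_pattern` and `lbl_pattern` are defined elsewhere in the class;
# the patterns below are assumed equivalents for a plain text-format line.
import re

metrics_pattern = re.compile(r'^(\w+)(\{.*\})?\s+(.+)$')  # name, optional {labels}, value
lbl_pattern = re.compile(r'(\w+)="(.*?)"')                # label_name="label value"

line = 'http_requests_total{method="post",code="200"} 1027'
name, labels_str, value = metrics_pattern.match(line).groups()
labels = dict(lbl_pattern.findall(labels_str or ''))
# name   -> 'http_requests_total'
# labels -> {'method': 'post', 'code': '200'}
# value  -> '1027'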

def process(self, endpoint, send_histograms_buckets=True, instance=None):
"""
Polls the data from prometheus and pushes them as gauges
@@ -264,15 +258,12 @@ def process(self, endpoint, send_histograms_buckets=True, instance=None):
Note that if the instance has a 'tags' attribute, it will be pushed
automatically as additional custom tags and added to the metrics
"""
response = self.poll(endpoint)
try:
tags = []
if instance is not None:
tags = instance.get('tags', [])
for metric in self.parse_metric_family(response):
self.process_metric(metric, send_histograms_buckets=send_histograms_buckets, custom_tags=tags, instance=instance)
finally:
response.close()
content_type, data = self.poll(endpoint)
tags = []
if instance is not None:
tags = instance.get('tags', [])
for metric in self.parse_metric_family(data, content_type):
self.process_metric(metric, send_histograms_buckets=send_histograms_buckets, custom_tags=tags, instance=instance)
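# Illustrative sketch, not part of this diff: a minimal child check built on
# this mother class. The class name, namespace, metric names and the
# 'prometheus_endpoint' instance key are all hypothetical.
class MyPrometheusCheck(PrometheusCheck):
    def __init__(self, name, init_config, agentConfig, instances=None):
        PrometheusCheck.__init__(self, name, init_config, agentConfig, instances)
        self.NAMESPACE = 'myapp'
        self.metrics_mapper = {
            # exposed prometheus name -> metric name reported to datadog
            'http_requests_total': 'http.requests.total',
        }

    def check(self, instance):
        endpoint = instance.get('prometheus_endpoint')
        self.process(endpoint, send_histograms_buckets=True, instance=instance)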

def process_metric(self, message, send_histograms_buckets=True, custom_tags=None, **kwargs):
"""
@@ -293,39 +284,23 @@ def process_metric(self, message, send_histograms_buckets=True, custom_tags=None
except AttributeError as err:
self.log.debug("Unable to handle metric: {} - error: {}".format(message.name, err))

def poll(self, endpoint, pFormat=PrometheusFormat.PROTOBUF, headers=None):
def poll(self, endpoint, pFormat=PrometheusFormat.PROTOBUF, headers={}):
"""
Polls the metrics from the prometheus metrics endpoint provided.
Defaults to the protobuf format, but can use the formats specified by
the PrometheusFormat class.
Custom headers can be added to the default headers.
Returns a valid requests.Response, raise requests.HTTPError if the status code of the requests.Response
isn't valid - see response.raise_for_status()
The caller needs to close the requests.Response
:param endpoint: string url endpoint
:param pFormat: the preferred format defined in PrometheusFormat
:param headers: extra headers
:return: requests.Response
Returns the content-type of the response and the content of the response itself.
"""
if headers is None:
headers = {}
if 'accept-encoding' not in headers:
headers['accept-encoding'] = 'gzip'
if pFormat == PrometheusFormat.PROTOBUF:
headers['accept'] = 'application/vnd.google.protobuf; ' \
'proto=io.prometheus.client.MetricFamily; ' \
'encoding=delimited'
headers['accept'] = 'application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited'

response = requests.get(endpoint, headers=headers, stream=True)
try:
response.raise_for_status()
return response
except requests.HTTPError:
response.close()
raise
req = requests.get(endpoint, headers=headers)
req.raise_for_status()
return req.headers['Content-Type'], req.content
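# Illustrative usage, not part of this diff: polling the text format explicitly
# and feeding the result back through parse_metric_family. The endpoint URL is
# hypothetical and `check` stands for any PrometheusCheck subclass instance.
content_type, data = check.poll('http://localhost:9090/metrics',
                                pFormat=PrometheusFormat.TEXT)
for family in check.parse_metric_family(data, content_type):
    check.process_metric(family, send_histograms_buckets=True)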

def _submit(self, metric_name, message, send_histograms_buckets=True, custom_tags=None):
"""
1 change: 0 additions & 1 deletion requirements.txt
@@ -37,4 +37,3 @@ simplejson==3.6.5
supervisor==3.3.3
tornado==3.2.2
uptime==3.0.1
prometheus-client==0.1.0
