prometheus: Fix metric resets
This patch changes the way we reset metrics when collecting data in the
prometheus exporter module. With this patch, the exported metrics at any
point in time should align with what a freshly started ceph-mgr module would report.

The patch also introduces a locking mechanism to serialize the requests
and to avoid overwriting the metrics when multiple scrapes happen at the
same time.

Signed-off-by: Boris Ranto <branto@redhat.com>
(cherry picked from commit f6c4db8)

Conflicts:
	src/pybind/mgr/prometheus/module.py: Do not import PG_STATES
b-ranto committed Jul 24, 2018
1 parent 056a368 commit 76204ba
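
In short, every scrape now rebuilds the metric set from a clean slate, and
scrapes are serialized behind a lock with a short-lived cache. A minimal,
self-contained sketch of that pattern (illustrative names such as Scraper and
_collect, not the module's actual code):

import threading
import time


class Scraper(object):
    def __init__(self, collect_timeout=5.0):
        self.lock = threading.RLock()           # serializes concurrent scrapes
        self.cache = None                       # last formatted scrape output
        self.cache_time = 0                     # when the cache was populated
        self.collect_timeout = collect_timeout  # cache lifetime in seconds

    def scrape(self):
        with self.lock:
            # Serve the cached output while it is still fresh.
            if self.cache and time.time() - self.cache_time < self.collect_timeout:
                return self.cache
            self.cache = self._collect()
            self.cache_time = time.time()
            return self.cache

    def _collect(self):
        # Rebuild from scratch so the label sets of daemons that went away
        # disappear, just as a freshly started module would report.
        return 'collected metrics\n'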
Showing 1 changed file with 52 additions and 47 deletions.
src/pybind/mgr/prometheus/module.py
@@ -5,7 +5,7 @@
 import os
 import socket
 import threading
-from collections import OrderedDict
+import time
 from mgr_module import MgrModule, MgrStandbyModule, CommandResult
 
 # Defaults for the Prometheus HTTP server. Can also set in config-key
@@ -110,40 +110,30 @@ def health_status_to_number(status):
 class Metrics(object):
     def __init__(self):
         self.metrics = self._setup_static_metrics()
-        self.pending = {}
 
-    def set(self, key, value, labels=('',)):
+    def clear(self):
         '''
-        Set the value of a single Metrics. This should be used for static metrics,
-        e.g. cluster health.
+        Clear all the metrics data. This does not remove the initiated Metric classes.
         '''
-        self.metrics[key].set(value, labels)
+        for k in self.metrics.keys():
+            self.metrics[k].clear()
 
-    def append(self, key, value, labels = ('',)):
+    def set(self, key, value, labels=('',)):
         '''
-        Append a metrics to the staging area. Use this to aggregate daemon specific
-        metrics that can appear and go away as daemons are added or removed.
+        Set the value of a single Metric (with labels). Use this to set the value of any metric.
         '''
-        if key not in self.pending:
-            self.pending[key] = []
-        self.pending[key].append((labels, value))
+        self.metrics[key].set(value, labels)
 
-    def reset(self):
+    def all(self):
         '''
-        When metrics aggregation is done, call Metrics.reset() to apply the
-        aggregated metric. This will remove all label -> value mappings for a
-        metric and set the new mapping (from pending). This means daemon specific
-        metrics os daemons that do no longer exist, are removed.
+        Return the dict of all the metrics.
         '''
-        for k, v in self.pending.items():
-            self.metrics[k].reset(v)
-        self.pending = {}
+        return self.metrics
 
     def add_metric(self, path, metric):
         if path not in self.metrics:
             self.metrics[path] = metric
 
-
     def _setup_static_metrics(self):
         metrics = {}
         metrics['health_status'] = Metric(
@@ -268,7 +258,6 @@ def _setup_static_metrics(self):
         return metrics
 
 
-
 class Metric(object):
     def __init__(self, mtype, name, desc, labels=None):
         self.mtype = mtype
@@ -277,16 +266,14 @@ def __init__(self, mtype, name, desc, labels=None):
         self.labelnames = labels  # tuple if present
         self.value = {}  # indexed by label values
 
+    def clear(self):
+        self.value = {}
+
     def set(self, value, labelvalues=None):
         # labelvalues must be a tuple
         labelvalues = labelvalues or ('',)
         self.value[labelvalues] = value
 
-    def reset(self, values):
-        self.value = {}
-        for labelvalues, value in values:
-            self.value[labelvalues] = value
-
     def str_expfmt(self):
 
         def promethize(path):
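
Taken together, the new surface is small: Metric.clear() wipes the
label -> value map while keeping the Metric object, Metrics.set() writes a
labelled value directly (the old pending staging area is gone), and
Metrics.all() hands the dict back to the caller. A condensed, runnable sketch
of those semantics, stripped of the exporter plumbing (the osd_up metric is
just an illustration):

class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}           # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        self.value[labelvalues or ('',)] = value


class Metrics(object):
    def __init__(self):
        self.metrics = {
            'osd_up': Metric('untyped', 'osd_up', 'OSD status up', ('ceph_daemon',)),
        }

    def clear(self):
        for k in self.metrics.keys():
            self.metrics[k].clear()

    def set(self, key, value, labels=('',)):
        self.metrics[key].set(value, labels)

    def all(self):
        return self.metrics


m = Metrics()
m.set('osd_up', 1, ('osd.0',))
m.clear()                        # a new scrape starts from scratch
m.set('osd_up', 1, ('osd.1',))   # osd.0 went away and is never re-set
print(m.all()['osd_up'].value)   # {('osd.1',): 1} -- no stale osd.0 sample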
@@ -361,8 +348,11 @@ class Module(MgrModule):
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.metrics = Metrics()
-        self.schema = OrderedDict()
         self.shutdown_event = threading.Event()
+        self.collect_lock = threading.RLock()
+        self.collect_time = 0
+        self.collect_timeout = 5.0
+        self.collect_cache = None
         _global_instance['plugin'] = self
 
     def get_health(self):
@@ -379,7 +369,7 @@ def get_df(self):
 
         for pool in df['pools']:
             for stat in DF_POOL:
-                self.metrics.append('pool_{}'.format(stat),
+                self.metrics.set('pool_{}'.format(stat),
                                     pool['stats'][stat],
                                     (pool['id'],))
 
@@ -390,7 +380,7 @@ def get_fs(self):
         for fs in fs_map['filesystems']:
             # collect fs metadata
             data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
-            self.metrics.append('fs_metadata', 1,
+            self.metrics.set('fs_metadata', 1,
                                 (data_pools,
                                  fs['id'],
                                  fs['mdsmap']['metadata_pool'],
@@ -399,7 +389,7 @@ def get_quorum_status(self):
             for gid, daemon in fs['mdsmap']['info'].items():
                 id_ = daemon['name']
                 host_version = servers.get((id_, 'mds'), ('',''))
-                self.metrics.append('mds_metadata', 1,
+                self.metrics.set('mds_metadata', 1,
                                     ('mds.{}'.format(id_), fs['id'],
                                      host_version[0], daemon['addr'],
                                      daemon['rank'], host_version[1]))
@@ -411,12 +401,12 @@ def get_quorum_status(self):
             rank = mon['rank']
             id_ = mon['name']
             host_version = servers.get((id_, 'mon'), ('',''))
-            self.metrics.append('mon_metadata', 1,
+            self.metrics.set('mon_metadata', 1,
                                 ('mon.{}'.format(id_), host_version[0],
                                  mon['public_addr'].split(':')[0], rank,
                                  host_version[1]))
             in_quorum = int(rank in mon_status['quorum'])
-            self.metrics.append('mon_quorum_status', in_quorum,
+            self.metrics.set('mon_quorum_status', in_quorum,
                                 ('mon.{}'.format(id_),))
 
     def get_pg_status(self):
@@ -451,7 +441,7 @@ def get_osd_stats(self):
             id_ = osd['osd']
             for stat in OSD_STATS:
                 val = osd['perf_stat'][stat]
-                self.metrics.append('osd_{}'.format(stat), val,
+                self.metrics.set('osd_{}'.format(stat), val,
                                     ('osd.{}'.format(id_),))
 
     def get_service_list(self):
@@ -499,7 +489,7 @@ def get_metadata_and_osd_status(self):
 
             host_version = servers.get((str(id_), 'osd'), ('',''))
 
-            self.metrics.append('osd_metadata', 1, (
+            self.metrics.set('osd_metadata', 1, (
                 'osd.{}'.format(id_),
                 c_addr,
                 dev_class,
@@ -510,7 +500,7 @@ def get_metadata_and_osd_status(self):
             # collect osd status
             for state in OSD_STATUS:
                 status = osd[state]
-                self.metrics.append('osd_{}'.format(state), status,
+                self.metrics.set('osd_{}'.format(state), status,
                                     ('osd.{}'.format(id_),))
 
             # collect disk occupation metadata
@@ -539,15 +529,15 @@ def get_metadata_and_osd_status(self):
 
         pool_meta = []
         for pool in osd_map['pools']:
-            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
+            self.metrics.set('pool_metadata', 1, (pool['pool'], pool['pool_name']))
 
         # Populate rgw_metadata
         for key, value in servers.items():
             service_id, service_type = key
             if service_type != 'rgw':
                 continue
             hostname, version = value
-            self.metrics.append(
+            self.metrics.set(
                 'rgw_metadata',
                 1,
                 ('{}.{}'.format(service_type, service_id), hostname, version)
@@ -560,6 +550,9 @@ def get_num_objects(self):
             self.metrics.set(stat, pg_sum[stat])
 
     def collect(self):
+        # Clear the metrics before scraping
+        self.metrics.clear()
+
         self.get_health()
         self.get_df()
         self.get_fs()
@@ -589,7 +582,7 @@ def collect(self):
                         counter_info['description'] + ' Total',
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(_path, value, (daemon,))
+                    self.metrics.set(_path, value, (daemon,))
 
                     _path = path + '_count'
                     self.metrics.add_metric(_path, Metric(
@@ -598,20 +591,18 @@ def collect(self):
                         counter_info['description'] + ' Count',
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(_path, counter_info['count'], (daemon,))
+                    self.metrics.set(_path, counter_info['count'], (daemon,))
                 else:
                     self.metrics.add_metric(path, Metric(
                         stattype,
                         path,
                         counter_info['description'],
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(path, value, (daemon,))
+                    self.metrics.set(path, value, (daemon,))
 
-        # It is sufficient to reset the pending metrics once per scrape
-        self.metrics.reset()
 
-        return self.metrics.metrics
+        return self.metrics.all()
 
     def get_file_sd_config(self):
         servers = self.list_servers()
@@ -688,11 +679,25 @@ def index(self):
 
     @cherrypy.expose
     def metrics(self):
-        if global_instance().have_mon_connection():
-            metrics = global_instance().collect()
+        inst = global_instance()
+        # Lock the function execution
+        try:
+            inst.collect_lock.acquire()
+            return self._metrics(inst)
+        finally:
+            inst.collect_lock.release()
+
+    def _metrics(self, inst):
+        # Return cached data if available and collected before the cache times out
+        if inst.collect_cache and time.time() - inst.collect_time < inst.collect_timeout:
+            return inst.collect_cache
+
+        if inst.have_mon_connection():
+            metrics = inst.collect()
             cherrypy.response.headers['Content-Type'] = 'text/plain'
             if metrics:
-                return self.format_metrics(metrics)
+                inst.collect_cache = self.format_metrics(metrics)
+                return inst.collect_cache
         else:
             raise cherrypy.HTTPError(503, 'No MON connection')
 
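A side note on the handler above: the explicit acquire/release pair wrapped in
try/finally is equivalent to the shorter context-manager form below, which also
releases the lock when _metrics() raises (a stylistic alternative, not part of
the patch):

    @cherrypy.expose
    def metrics(self):
        inst = global_instance()
        with inst.collect_lock:  # released on return or on exception
            return self._metrics(inst)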
