Commit 3abbe5d

Merge pull request #935 from DataDog/tristan/haproxy
Tristan/haproxy
remh committed May 8, 2014
2 parents 5bf2f5d + c16e0cb · commit 3abbe5d
Showing 2 changed files with 134 additions and 94 deletions.
227 changes: 133 additions & 94 deletions checks.d/haproxy.py
@@ -57,14 +57,19 @@ def check(self, instance):
password = instance.get('password')
collect_aggregates_only = instance.get('collect_aggregates_only', True)
collect_status_metrics = instance.get('collect_status_metrics', False)
collect_status_metrics_by_host = instance.get('collect_status_metrics_by_host', False)

self.log.debug('Processing HAProxy data for %s' % url)

data = self._fetch_data(url, username, password)

process_events = instance.get('status_check', self.init_config.get('status_check', False))

self._process_data(data, collect_aggregates_only, process_events, url=url, collect_status_metrics=collect_status_metrics)
self._process_data(
data, collect_aggregates_only, process_events,
url=url, collect_status_metrics=collect_status_metrics,
collect_status_metrics_by_host=collect_status_metrics_by_host
)

def _fetch_data(self, url, username, password):
''' Hit a given URL and return the parsed json '''
@@ -85,7 +90,10 @@ def _fetch_data(self, url, username, password):
# Split the data by line
return response.split('\n')

def _process_data(self, data, collect_aggregates_only, process_events, url=None, collect_status_metrics=False):
def _process_data(
self, data, collect_aggregates_only, process_events, url=None,
collect_status_metrics=False, collect_status_metrics_by_host=False
):
''' Main data-processing loop. For each piece of useful data, we'll
either save a metric, save an event or both. '''

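To make the parsing step concrete, here is a minimal standalone sketch (not part of the commit) of how one stats CSV line becomes a dict keyed by the header fields; the field list and sample line are hypothetical:

```python
# Hypothetical header fields and CSV line from an HAProxy stats endpoint
fields = ['pxname', 'svname', 'scur', 'slim', 'status']

def line_to_dict(fields, line):
    data_dict = {}
    for i, val in enumerate(line.split(',')):
        if val:
            try:
                # Try converting to a float; if that fails, keep the string
                val = float(val)
            except ValueError:
                pass
            data_dict[fields[i]] = val
    return data_dict

print(line_to_dict(fields, 'dogweb,i-4562165,42,100,UP'))
# {'pxname': 'dogweb', 'svname': 'i-4562165', 'scur': 42.0, 'slim': 100.0, 'status': 'UP'}
```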
@@ -96,62 +104,100 @@ def _process_data(self, data, collect_aggregates_only, process_events, url=None,

hosts_statuses = defaultdict(int)

# Holds a list of dictionaries describing each system
data_list = []
back_or_front = None

for line in data[1:]: # Skip the first line
# Walk the lines in reverse (skipping the header) so each aggregate row is seen before its member hosts, setting back_or_front first
for line in data[:0:-1]:
if not line.strip():
continue
data_dict = {}
values = line.split(',')

# Store each line's values in a dictionary
for i, val in enumerate(values):
if val:
try:
# Try converting to a long, if failure, just leave it
val = float(val)
except Exception:
pass
data_dict[fields[i]] = val

# The percentage of used sessions based on 'scur' and 'slim'
if 'slim' in data_dict and 'scur' in data_dict:
try:
data_dict['spct'] = (data_dict['scur'] / data_dict['slim']) * 100
except (TypeError, ZeroDivisionError):
pass

service = data_dict['svname']
data_dict = self._line_to_dict(fields, line)

if collect_status_metrics and 'status' in data_dict and 'pxname' in data_dict:
hosts_statuses[(data_dict['pxname'], data_dict['status'])] += 1
if self._is_aggregate(data_dict):
back_or_front = data_dict['svname']

self._update_data_dict(data_dict, back_or_front)

if data_dict['svname'] in Services.ALL:
data_list.append(data_dict)
self._update_hosts_statuses_if_needed(
collect_status_metrics, collect_status_metrics_by_host,
data_dict, hosts_statuses
)

if self._should_process(data_dict, collect_aggregates_only):
# Send the list of data to the metric and event callbacks
self._process_metrics(data_list, service, url)
if process_events:
self._process_events(data_list, url)

# Clear out the event list for the next service
data_list = []
elif not collect_aggregates_only:
data_list.append(data_dict)
self._process_metrics(data_dict, url)
if process_events:
self._process_event(data_dict, url)

if collect_status_metrics:
self._process_status_metric(hosts_statuses)
self._process_status_metric(hosts_statuses, collect_status_metrics_by_host)

return data

def _process_status_metric(self, hosts_statuses):
def _line_to_dict(self, fields, line):
data_dict = {}
for i, val in enumerate(line.split(',')):
if val:
try:
# Try converting to a float; if that fails, keep the string
val = float(val)
except Exception:
pass
data_dict[fields[i]] = val
return data_dict

def _update_data_dict(self, data_dict, back_or_front):
"""
Adds the back_or_front label and, when 'scur' and 'slim' allow it, spct
"""
data_dict['back_or_front'] = back_or_front
# The percentage of used sessions based on 'scur' and 'slim'
if 'slim' in data_dict and 'scur' in data_dict:
try:
data_dict['spct'] = (data_dict['scur'] / data_dict['slim']) * 100
except (TypeError, ZeroDivisionError):
pass

def _is_aggregate(self, data_dict):
return data_dict['svname'] in Services.ALL

def _update_hosts_statuses_if_needed(self,
collect_status_metrics, collect_status_metrics_by_host,
data_dict, hosts_statuses
):
if collect_status_metrics and 'status' in data_dict and 'pxname' in data_dict:
if collect_status_metrics_by_host and 'svname' in data_dict:
key = (data_dict['pxname'], data_dict['svname'], data_dict['status'])
else:
key = (data_dict['pxname'], data_dict['status'])
hosts_statuses[key] += 1

def _should_process(self, data_dict, collect_aggregates_only):
"""
If collect_aggregates_only, process only the aggregate rows;
otherwise, process everything except Services.BACKEND rows.
"""
if collect_aggregates_only:
if self._is_aggregate(data_dict):
return True
return False
elif data_dict['svname'] == Services.BACKEND:
return False
return True

def _process_status_metric(self, hosts_statuses, collect_status_metrics_by_host):
agg_statuses = defaultdict(lambda:{'available':0, 'unavailable':0})
for (service, status), count in hosts_statuses.iteritems():
for host_status, count in hosts_statuses.iteritems():
try:
service, hostname, status = host_status
except ValueError:
service, status = host_status
status = status.lower()

tags = ['status:%s' % status, 'service:%s' % service]
if collect_status_metrics_by_host:
tags.append('backend:%s' % hostname)
self.gauge("haproxy.count_per_status", count, tags=tags)

if 'up' in status:
@@ -164,62 +210,55 @@ def _process_status_metric(self, hosts_statuses):
tags = ['status:%s' % status, 'service:%s' % service]
self.gauge("haproxy.count_per_status", count, tags=tags)

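As a rough illustration of the new per-host counting (standalone, with hypothetical rows), the extended keys carry the host name through to a backend tag on haproxy.count_per_status:

```python
from collections import defaultdict

# Hypothetical per-host rows parsed from the stats CSV
rows = [
    {'pxname': 'dogweb', 'svname': 'i-4562165', 'status': 'UP'},
    {'pxname': 'dogweb', 'svname': 'i-2854985', 'status': 'DOWN'},
]

hosts_statuses = defaultdict(int)
for row in rows:
    # With collect_status_metrics_by_host, the key gains the host name
    hosts_statuses[(row['pxname'], row['svname'], row['status'])] += 1

for (service, hostname, status), count in hosts_statuses.items():
    tags = ['status:%s' % status.lower(), 'service:%s' % service,
            'backend:%s' % hostname]
    print(count, tags)  # stands in for self.gauge("haproxy.count_per_status", ...)
```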
def _process_metrics(self, data_list, service, url):
for data in data_list:
"""
Each element of data_list is a dictionary related to one host
(one line) extracted from the csv. All of these elements should
have the same value for 'pxname' key
It should look like:
data_list = [
{'svname':'i-4562165', 'pxname':'dogweb', 'scur':'42', ...},
{'svname':'i-2854985', 'pxname':'dogweb', 'scur':'1337', ...},
...
]
"""
tags = ["type:%s" % service, "instance_url:%s" % url]
hostname = data['svname']
service_name = data['pxname']

if service == Services.BACKEND:
tags.append('backend:%s' % hostname)
tags.append("service:%s" % service_name)

for key, value in data.items():
if HAProxy.METRICS.get(key):
suffix = HAProxy.METRICS[key][1]
name = "haproxy.%s.%s" % (service.lower(), suffix)
if HAProxy.METRICS[key][0] == 'rate':
self.rate(name, value, tags=tags)
else:
self.gauge(name, value, tags=tags)

def _process_events(self, data_list, url):
''' Main event processing loop. Events will be created for a service
def _process_metrics(self, data, url):
"""
Data is a dictionary related to one host
(one line) extracted from the csv.
It should look like:
{'pxname':'dogweb', 'svname':'i-4562165', 'scur':'42', ...}
"""
hostname = data['svname']
service_name = data['pxname']
back_or_front = data['back_or_front']
tags = ["type:%s" % back_or_front, "instance_url:%s" % url]
tags.append("service:%s" % service_name)
if back_or_front == Services.BACKEND:
tags.append('backend:%s' % hostname)

for key, value in data.items():
if HAProxy.METRICS.get(key):
suffix = HAProxy.METRICS[key][1]
name = "haproxy.%s.%s" % (back_or_front.lower(), suffix)
if HAProxy.METRICS[key][0] == 'rate':
self.rate(name, value, tags=tags)
else:
self.gauge(name, value, tags=tags)

def _process_event(self, data, url):
''' Main event processing loop. An event will be created for a service
status change '''
for data in data_list:
hostname = data['svname']
service_name = data['pxname']
key = "%s:%s" % (hostname,service_name)
status = self.host_status[url][key]

if status is None:
self.host_status[url][key] = data['status']
continue

if status != data['status'] and data['status'] in ('UP', 'DOWN'):
# If the status of a host has changed, we trigger an event
try:
lastchg = int(data['lastchg'])
except Exception:
lastchg = 0

# Create the event object
ev = self._create_event(data['status'], hostname, lastchg, service_name)
self.event(ev)

# Store this host status so we can check against it later
self.host_status[url][key] = data['status']
hostname = data['svname']
service_name = data['pxname']
key = "%s:%s" % (hostname, service_name)
status = self.host_status[url][key]

if status is None:
self.host_status[url][key] = data['status']
return

if status != data['status'] and data['status'] in ('UP', 'DOWN'):
# If the status of a host has changed, we trigger an event
try:
lastchg = int(data['lastchg'])
except Exception:
lastchg = 0

# Create the event object
ev = self._create_event(data['status'], hostname, lastchg, service_name)
self.event(ev)

# Store this host status so we can check against it later
self.host_status[url][key] = data['status']

def _create_event(self, status, hostname, lastchg, service_name):
if status == "DOWN":
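The central trick of this change is the reverse walk over the CSV: HAProxy prints a proxy's server rows before the BACKEND aggregate row, so iterating backwards (data[:0:-1] also skips the header at index 0) means each host row inherits the label of the aggregate that follows it in file order. A minimal standalone sketch with hypothetical rows:

```python
# Hypothetical stats lines: header first, then one proxy's rows
data = [
    '# pxname,svname',
    'dogweb,FRONTEND',
    'dogweb,i-4562165',
    'dogweb,i-2854985',
    'dogweb,BACKEND',
]

back_or_front = None
for line in data[:0:-1]:  # last line down to index 1, skipping the header
    svname = line.split(',')[1]
    if svname in ('FRONTEND', 'BACKEND'):  # stands in for Services.ALL
        back_or_front = svname
    print(svname, '->', back_or_front)
# BACKEND -> BACKEND
# i-2854985 -> BACKEND
# i-4562165 -> BACKEND
# FRONTEND -> FRONTEND
```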
1 change: 1 addition & 0 deletions conf.d/haproxy.yaml.example
@@ -7,3 +7,4 @@ instances:
# status_check: False
# collect_aggregates_only: True
# collect_status_metrics: False
# collect_status_metrics_by_host: False
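
Put together, a hypothetical instance enabling the new option might look like this (the URL and credentials are placeholders):

```yaml
instances:
  - url: http://localhost/admin?stats
    username: admin        # placeholder
    password: secret       # placeholder
    collect_status_metrics: True
    collect_status_metrics_by_host: True
```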
