Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kubernetes] Ingest k8s events + limits and requests metrics #2551

Merged
merged 18 commits into from Aug 23, 2016
Merged
154 changes: 137 additions & 17 deletions checks.d/kubernetes.py
Expand Up @@ -10,15 +10,15 @@
from fnmatch import fnmatch
import numbers
import re
import simplejson as json
import time

# 3rd party
import requests
import simplejson as json

# project
from checks import AgentCheck
from config import _is_affirmative
from utils.http import retrieve_json
from utils.kubeutil import KubeUtil

NAMESPACE = "kubernetes"
Expand All @@ -30,6 +30,7 @@
'diskio.io_service_bytes.stats.total',
'network.??_bytes',
'cpu.*.total']
DEFAULT_COLLECT_EVENTS = False

NET_ERRORS = ['rx_errors', 'tx_errors', 'rx_dropped', 'tx_dropped']

Expand All @@ -46,6 +47,8 @@
RATE: {True: HISTORATE, False: RATE}
}

EVENT_TYPE = 'kubernetes'


class Kubernetes(AgentCheck):
""" Collect metrics and events from kubelet """
Expand All @@ -55,10 +58,13 @@ class Kubernetes(AgentCheck):
def __init__(self, name, init_config, agentConfig, instances=None):
if instances is not None and len(instances) > 1:
raise Exception('Kubernetes check only supports one configured instance.')

AgentCheck.__init__(self, name, init_config, agentConfig, instances)
self.kubeutil = KubeUtil()

inst = instances[0] if instances is not None else None
self.kubeutil = KubeUtil(instance=inst)
if not self.kubeutil.host:
raise Exception('Unable to get default router and host parameter is not set')
raise Exception('Unable to retrieve Docker hostname and host parameter is not set')

def _perform_kubelet_checks(self, url):
service_check_base = NAMESPACE + '.kubelet.check'
Expand Down Expand Up @@ -107,11 +113,17 @@ def check(self, instance):
self.publish_rate = FUNC_MAP[RATE][self.use_histogram]
self.publish_gauge = FUNC_MAP[GAUGE][self.use_histogram]

pods_list = self.kubeutil.retrieve_pods_list()

# kubelet health checks
self._perform_kubelet_checks(self.kubeutil.kube_health_url)

# kubelet metrics
self._update_metrics(instance)
self._update_metrics(instance, pods_list)

# kubelet events
if _is_affirmative(instance.get('collect_events', DEFAULT_COLLECT_EVENTS)):
self._process_events(instance, pods_list)

def _publish_raw_metrics(self, metric, dat, tags, depth=0):
if depth >= self.max_depth:
Expand Down Expand Up @@ -187,7 +199,6 @@ def _get_pre_1_2_tags(self, cont_labels, subcontainer, kube_labels):

return tags


def _update_container_metrics(self, instance, subcontainer, kube_labels):
tags = list(instance.get('tags', [])) # add support for custom tags

Expand Down Expand Up @@ -220,7 +231,6 @@ def _update_container_metrics(self, instance, subcontainer, kube_labels):
# They are top aggregate views and don't have the previous metadata.
tags.append("pod_name:no_pod")


stats = subcontainer['stats'][-1] # take the latest
self._publish_raw_metrics(NAMESPACE, stats, tags)

Expand All @@ -235,25 +245,74 @@ def _update_container_metrics(self, instance, subcontainer, kube_labels):
sum(float(net[x]) for x in NET_ERRORS),
tags)

def _retrieve_metrics(self, url):
return retrieve_json(url)
return tags

def _update_metrics(self, instance):
pods_list = self.kubeutil.retrieve_pods_list()
metrics = self._retrieve_metrics(self.kubeutil.metrics_url)
def _update_metrics(self, instance, pods_list):
metrics = self.kubeutil.retrieve_metrics()

excluded_labels = instance.get('excluded_labels')
kube_labels = self.kubeutil.extract_kube_labels(pods_list, excluded_keys=excluded_labels)

if not metrics:
raise Exception('No metrics retrieved cmd=%s' % self.metrics_cmd)

# container metrics from Cadvisor
container_tags = {}
for subcontainer in metrics:
c_id = subcontainer.get('id')
try:
tags = self._update_container_metrics(instance, subcontainer, kube_labels)
if c_id:
container_tags[c_id] = tags
# also store tags for aliases
for alias in subcontainer.get('aliases', []):
container_tags[alias] = tags
except Exception, e:
self.log.error("Unable to collect metrics for container: {0} ({1}".format(c_id, e))

# container metrics from kubernetes API: limits and requests
for pod in pods_list['items']:
try:
self._update_container_metrics(instance, subcontainer, kube_labels)
except Exception as e:
self.log.error("Unable to collect metrics for container: {0} ({1}".format(
subcontainer.get('name'), e))
containers = pod['spec']['containers']
name2id = {}
for cs in pod['status'].get('containerStatuses', []):
c_id = cs.get('containerID', '').split('//')[-1]
name = cs.get('name')
if name:
name2id[name] = c_id
except KeyError:
self.log.debug("Pod %s does not have containers specs, skipping...", pod['metadata'].get('name'))
continue

for container in containers:
c_name = container.get('name')
_tags = container_tags.get(name2id.get(c_name), [])

prog = re.compile(r'[-+]?\d+[\.]?\d*')

# limits
try:
for limit, value_str in container['resources']['limits'].iteritems():
values = [float(s) for s in prog.findall(value_str)]
if len(values) != 1:
self.log.warning("Error parsing limits value string: %s", value_str)
continue
self.publish_gauge(self, '{}.{}.limits'.format(NAMESPACE, limit), values[0], _tags)
except (KeyError, AttributeError) as e:
self.log.debug("Unable to retrieve container limits for %s: %s", c_name, e)
self.log.debug("Container object for {}: {}".format(c_name, container))

# requests
try:
for request, value_str in container['resources']['requests'].iteritems():
values = [float(s) for s in prog.findall(value_str)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this going to parse eg. 500m as 500 instead of 0.5?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if len(values) != 1:
self.log.warning("Error parsing requests value string: %s", value_str)
continue
self.publish_gauge(self, '{}.{}.requests'.format(NAMESPACE, request), values[0], _tags)
except (KeyError, AttributeError) as e:
self.log.error("Unable to retrieve container requests for %s: %s", c_name, e)
self.log.debug("Container object for {}: {}".format(c_name, container))

self._update_pods_metrics(instance, pods_list)

Expand All @@ -273,11 +332,72 @@ def _update_pods_metrics(self, instance, pods):
kind = created_by['reference']['kind']
if kind in supported_kinds:
controllers_map[created_by['reference']['name']] += 1
except KeyError:
except (KeyError, ValueError) as e:
self.log.debug("Unable to retrieve pod kind for pod %s: %s", pod, e)
continue

tags = instance.get('tags', [])
for ctrl, pod_count in controllers_map.iteritems():
_tags = tags[:] # copy base tags
_tags.append('kube_replication_controller:{0}'.format(ctrl))
self.publish_gauge(self, NAMESPACE + '.pods.running', pod_count, _tags)

def _process_events(self, instance, pods_list):
    """
    Retrieve events from the kubernetes API and submit them as Datadog events.

    `pods_list` is accepted for signature consistency with the other
    per-check-run helpers but is not currently used here.

    At the moment (k8s v1.3) there is no support to select events based on a timestamp query, so we
    go through the whole list every time. This should be fine for now as events
    have a TTL of one hour[1] but logic needs to improve as soon as they provide
    query capabilities or at least pagination, see [2][3].

    [1] https://github.com/kubernetes/kubernetes/blob/release-1.3.0/cmd/kube-apiserver/app/options/options.go#L51
    [2] https://github.com/kubernetes/kubernetes/issues/4432
    [3] https://github.com/kubernetes/kubernetes/issues/1362
    """
    # local import: only needed for UTC timestamp conversion below
    import calendar

    node_ip, node_name = self.kubeutil.get_node_info()
    self.log.debug('Processing events on {} [{}]'.format(node_name, node_ip))

    k8s_namespace = instance.get('namespace', 'default')
    events_endpoint = '{}/namespaces/{}/events'.format(self.kubeutil.kubernetes_api_url, k8s_namespace)
    self.log.debug('Kubernetes API endpoint to query events: %s' % events_endpoint)

    events = self.kubeutil.retrieve_json_auth(events_endpoint, self.kubeutil.get_auth_token())
    event_items = events.get('items') or []
    # timestamp of the most recent event we already submitted for this namespace
    last_read = self.kubeutil.last_event_collection_ts[k8s_namespace]
    most_recent_read = 0

    self.log.debug('Found {} events, filtering out using timestamp: {}'.format(len(event_items), last_read))

    for event in event_items:
        # `lastTimestamp` is RFC3339 in UTC (trailing 'Z'), so the parsed
        # struct_time must be converted with calendar.timegm, which treats it
        # as UTC. time.mktime would treat it as *local* time and shift every
        # event timestamp by the host's UTC offset.
        event_ts = int(calendar.timegm(time.strptime(event.get('lastTimestamp'), '%Y-%m-%dT%H:%M:%SZ')))
        # skip if the event is too old
        if event_ts <= last_read:
            continue

        involved_obj = event.get('involvedObject', {})

        # compute the most recently seen event, without relying on items order
        if event_ts > most_recent_read:
            most_recent_read = event_ts

        title = '{} {} on {}'.format(involved_obj.get('name'), event.get('reason'), node_name)
        message = event.get('message')
        source = event.get('source')
        if source:
            message += '\nSource: {} {}\n'.format(source.get('component', ''), source.get('host', ''))
        # %%%...%%% wraps the body in Datadog markdown formatting
        msg_body = "%%%\n{}\n```\n{}\n```\n%%%".format(title, message)
        dd_event = {
            'timestamp': event_ts,
            'host': node_ip,
            'event_type': EVENT_TYPE,
            'msg_title': title,
            'msg_text': msg_body,
            'source_type_name': EVENT_TYPE,
            'event_object': 'kubernetes:{}'.format(involved_obj.get('name')),
        }
        self.event(dd_event)

    # persist the high-water mark so the next run only submits newer events
    if most_recent_read > 0:
        self.kubeutil.last_event_collection_ts[k8s_namespace] = most_recent_read
        self.log.debug('_last_event_collection_ts is now {}'.format(most_recent_read))
11 changes: 8 additions & 3 deletions conf.d/kubernetes.yaml.example
Expand Up @@ -15,13 +15,18 @@ instances:
# method: http
- port: 4194

# use_histogram controls whether we send detailed metrics, i.e. one per container.
# collect_events controls whether the agent should fetch events from the kubernetes API and
# ingest them in Datadog. To avoid duplicates, only one agent at a time across the entire
# cluster should have this feature enabled. To enable the feature, set the parameter to `true`.
# collect_events: false

# use_histogram controls whether we send detailed metrics, i.e. one per container.
# When false, we send detailed metrics corresponding to individual containers, tagging by container id
# to keep them unique.
# When true, we aggregate data based on container image.
#
# use_histogram: True
#
# use_histogram: false

# kubelet_port: 10255
#
# We can define a whitelist of patterns that permit publishing raw metrics.
Expand Down