Skip to content

Commit

Permalink
[k8s/docker] Better support of kubernetes >= 1.2
Browse files Browse the repository at this point in the history
- Enable kubelet check by default
- Remove master checks for now, as they are flaky, hard to enable, and not used
- Adapt to the new label format set by Kubernetes 1.2 (fixes #2388, #2494)
- Allow blacklisting labels
- Support replica sets (fixes #2444, #2446)
- Handle the case where no pods are running (fixes #2263)
- Add tests for kube 1.2
  • Loading branch information
remh committed May 17, 2016
1 parent 007d325 commit c145667
Show file tree
Hide file tree
Showing 10 changed files with 850 additions and 390 deletions.
18 changes: 13 additions & 5 deletions checks.d/docker_daemon.py
Expand Up @@ -26,6 +26,7 @@
# NOTE(review): presumably the cap on how many times cgroup listing is retried;
# its usage is outside this hunk -- confirm.
MAX_CGROUP_LISTING_RETRIES = 3
# Matches 64 consecutive lowercase hexadecimal characters. The pattern is not
# anchored, so it can also match inside a longer string.
CONTAINER_ID_RE = re.compile('[0-9a-f]{64}')
# Container label keys inspected by _get_tags: the label holding the pod name
# and the label holding the pod namespace.
POD_NAME_LABEL = "io.kubernetes.pod.name"
NAMESPACE_LABEL = "io.kubernetes.pod.namespace"

GAUGE = AgentCheck.gauge
RATE = AgentCheck.rate
Expand Down Expand Up @@ -347,16 +348,23 @@ def _get_tags(self, entity=None, tag_type=None):
k = "pod_name"
if "-" in pod_name:
replication_controller = "-".join(pod_name.split("-")[:-1])
if "/" in replication_controller:
if "/" in replication_controller: # k8s <= 1.1
namespace, replication_controller = replication_controller.split("/", 1)
tags.append("kube_namespace:%s" % namespace)

elif NAMESPACE_LABEL in labels: # k8s >= 1.2
namespace = labels[NAMESPACE_LABEL]
pod_name = "{0}/{1}".format(namespace, pod_name)

tags.append("kube_namespace:%s" % namespace)
tags.append("kube_replication_controller:%s" % replication_controller)
tags.append("pod_name:%s" % pod_name)

if not v:
elif not v:
tags.append(k)
else:

else:
tags.append("%s:%s" % (k,v))

if k == POD_NAME_LABEL and self.is_k8s() and k not in labels:
tags.append("pod_name:no_pod")

Expand Down Expand Up @@ -426,7 +434,7 @@ def _filter_containers(self, containers):
if self._are_tags_filtered(container_tags):
container_name = DockerUtil.container_name_extractor(container)[0]
self._filtered_containers.add(container_name)
self.log.debug("Container {0} is filtered".format(container["Names"][0]))
self.log.debug("Container {0} is filtered".format(container_name))


def _are_tags_filtered(self, tags):
Expand Down
152 changes: 93 additions & 59 deletions checks.d/kubernetes.py
Expand Up @@ -46,6 +46,10 @@
RATE: {True: HISTORATE, False: RATE}
}

# Keys of the container labels (subcontainer['spec']['labels']) that carry the
# pod name and the pod namespace. Both keys are dot-separated; the pod name key
# is "io.kubernetes.pod.name", not "io.kubernetes.pod_name".
POD_NAME_LABEL = "io.kubernetes.pod.name"
NAMESPACE_LABEL = "io.kubernetes.pod.namespace"



class Kubernetes(AgentCheck):
""" Collect metrics and events from kubelet """
Expand All @@ -57,6 +61,8 @@ def __init__(self, name, init_config, agentConfig, instances=None):
raise Exception('Kubernetes check only supports one configured instance.')
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
self.kubeutil = KubeUtil()
if not self.kubeutil.host:
raise Exception('Unable to get default router and host parameter is not set')

def _perform_kubelet_checks(self, url):
service_check_base = NAMESPACE + '.kubelet.check'
Expand Down Expand Up @@ -92,29 +98,7 @@ def _perform_kubelet_checks(self, url):
else:
self.service_check(service_check_base, AgentCheck.CRITICAL)

def _perform_master_checks(self, url):
# Fetch `url` and emit one "<NAMESPACE>.master.<node name>.check" service check
# per entry of the "items" list in the JSON response: OK when the type of the
# entry's last status element is 'Ready', CRITICAL (with that type as the
# message) otherwise. Each check is tagged with "minion_name:<metadata name>".
# Any exception is reported as CRITICAL, logged as a warning and re-raised.
try:
r = requests.get(url)
r.raise_for_status()
for nodeinfo in r.json()['items']:
nodename = nodeinfo['name']
service_check_name = "{0}.master.{1}.check".format(NAMESPACE, nodename)
# Only the last status element is looked at.
cond = nodeinfo['status'][-1]['type']
minion_name = nodeinfo['metadata']['name']
tags = ["minion_name:{0}".format(minion_name)]
if cond != 'Ready':
self.service_check(service_check_name, AgentCheck.CRITICAL,
tags=tags, message=cond)
else:
self.service_check(service_check_name, AgentCheck.OK, tags=tags)
except Exception, e:
# NOTE(review): service_check_name is only assigned inside the loop above. If
# requests.get(), raise_for_status() or r.json() fails before the first
# iteration, the next line raises an unbound-name error that hides the
# original exception -- confirm and consider a fallback name.
self.service_check(service_check_name, AgentCheck.CRITICAL, message=str(e))
self.log.warning('master checks url=%s exception=%s' % (url, str(e)))
raise

def check(self, instance):
if not self.kubeutil.host:
raise Exception('Unable to get default router and host parameter is not set')

self.max_depth = instance.get('max_depth', DEFAULT_MAX_DEPTH)
enabled_gauges = instance.get('enabled_gauges', DEFAULT_ENABLED_GAUGES)
Expand All @@ -127,13 +111,8 @@ def check(self, instance):
self.publish_rate = FUNC_MAP[RATE][self.use_histogram]
self.publish_gauge = FUNC_MAP[GAUGE][self.use_histogram]

# master health checks
if instance.get('enable_master_checks', False):
self._perform_master_checks(self.kubeutil.master_url_nodes)

# kubelet health checks
if instance.get('enable_kubelet_checks', True):
self._perform_kubelet_checks(self.kubeutil.kube_health_url)
self._perform_kubelet_checks(self.kubeutil.kube_health_url)

# kubelet metrics
self._update_metrics(instance)
Expand Down Expand Up @@ -161,8 +140,61 @@ def _shorten_name(name):
# shorten docker image id
return re.sub('([0-9a-fA-F]{64,})', lambda x: x.group(1)[0:12], name)

def _get_post_1_2_tags(self, cont_labels, subcontainer, kube_labels):
    """Build the pod-related tags of a container labelled by Kubernetes >= 1.2.

    The pod name and the pod namespace are read from two separate container
    labels (POD_NAME_LABEL and NAMESPACE_LABEL).

    :param cont_labels: labels of the container; a KeyError is raised when
        POD_NAME_LABEL or NAMESPACE_LABEL is missing.
    :param subcontainer: container entry; only its optional "aliases" list
        is read here.
    :param kube_labels: mapping queried with a "<namespace>/<pod name>" key;
        a truthy value is appended to the tags item by item.
    :return: a new list of tags.
    """
    name = cont_labels[POD_NAME_LABEL]
    namespace = cont_labels[NAMESPACE_LABEL]

    result = [
        u"pod_name:{0}/{1}".format(namespace, name),
        u"kube_namespace:{0}".format(namespace),
    ]

    labels_for_pod = kube_labels.get("{0}/{1}".format(namespace, name))
    if labels_for_pod:
        result.extend(labels_for_pod)

    if "-" in name:
        # Everything before the last dash is used as the controller name.
        result.append("kube_replication_controller:%s" % name.rsplit("-", 1)[0])

    if self.publish_aliases and subcontainer.get("aliases"):
        # The first alias is skipped as it will be the container_name.
        result.extend([
            'container_alias:%s' % (self._shorten_name(alias))
            for alias in subcontainer['aliases'][1:]
        ])

    return result

def _get_pre_1_2_tags(self, cont_labels, subcontainer, kube_labels):
    """Build the pod-related tags of a container labelled by Kubernetes <= 1.1.

    Only the POD_NAME_LABEL container label is read. When the part of its
    value before the last dash contains a "/", the text before the first
    "/" is reported as the namespace.

    :param cont_labels: labels of the container; a KeyError is raised when
        POD_NAME_LABEL is missing.
    :param subcontainer: container entry; only its optional "aliases" list
        is read here.
    :param kube_labels: mapping queried with the raw pod name label value;
        a truthy value is appended to the tags item by item.
    :return: a new list of tags.
    """
    name = cont_labels[POD_NAME_LABEL]
    result = [u"pod_name:{0}".format(name)]

    labels_for_pod = kube_labels.get(name)
    if labels_for_pod:
        result.extend(labels_for_pod)

    if "-" in name:
        # Everything before the last dash is used as the controller name.
        controller = name.rsplit("-", 1)[0]
        if "/" in controller:
            namespace, _, controller = controller.partition("/")
            result.append(u"kube_namespace:%s" % namespace)

        result.append(u"kube_replication_controller:%s" % controller)

    if self.publish_aliases and subcontainer.get("aliases"):
        # The first alias is skipped as it will be the container_name.
        result.extend([
            u"container_alias:%s" % (self._shorten_name(alias))
            for alias in subcontainer['aliases'][1:]
        ])

    return result


def _update_container_metrics(self, instance, subcontainer, kube_labels):
tags = instance.get('tags', []) # add support for custom tags
tags = list(instance.get('tags', [])) # add support for custom tags

if len(subcontainer.get('aliases', [])) >= 1:
# The first alias seems to always match the docker container name
Expand All @@ -173,35 +205,26 @@ def _update_container_metrics(self, instance, subcontainer, kube_labels):

tags.append('container_name:%s' % container_name)

pod_name_set = False
try:
for label_name, label in subcontainer['spec']['labels'].iteritems():
label_name = label_name.replace('io.kubernetes.pod.name', 'pod_name')
if label_name == "pod_name":
pod_name_set = True
pod_labels = kube_labels.get(label)
if pod_labels:
tags.extend(list(pod_labels))

if "-" in label:
replication_controller = "-".join(
label.split("-")[:-1])
if "/" in replication_controller:
namespace, replication_controller = replication_controller.split("/", 1)
tags.append("kube_namespace:%s" % namespace)

tags.append("kube_replication_controller:%s" % replication_controller)
tags.append('%s:%s' % (label_name, label))
cont_labels = subcontainer['spec']['labels']
except KeyError:
pass
self.log.debug("Subcontainer, doesn't have any labels")
cont_labels = {}

# Collect pod names, namespaces, rc...
if NAMESPACE_LABEL in cont_labels and POD_NAME_LABEL in cont_labels:
# Kubernetes >= 1.2
tags += self._get_post_1_2_tags(cont_labels, subcontainer, kube_labels)

elif POD_NAME_LABEL cont_labels:
# Kubernetes <= 1.1
tags += self._get_pre_1_2_tags(cont_labels, subcontainer, kube_labels)

if not pod_name_set:
else:
# Those are containers that are not part of a pod.
# They are top aggregate views and don't have the previous metadata.
tags.append("pod_name:no_pod")

if self.publish_aliases and subcontainer.get("aliases"):
for alias in subcontainer['aliases'][1:]:
# we don't add the first alias as it will be the container_name
tags.append('container_alias:%s' % (self._shorten_name(alias)))

stats = subcontainer['stats'][-1] # take the latest
self._publish_raw_metrics(NAMESPACE, stats, tags)
Expand All @@ -223,7 +246,9 @@ def _retrieve_metrics(self, url):
def _update_metrics(self, instance):
pods_list = self.kubeutil.retrieve_pods_list()
metrics = self._retrieve_metrics(self.kubeutil.metrics_url)
kube_labels = self.kubeutil.extract_kube_labels(pods_list)

excluded_labels = instance.get('excluded_labels')
kube_labels = self.kubeutil.extract_kube_labels(pods_list, excluded_keys=excluded_labels)

if not metrics:
raise Exception('No metrics retrieved cmd=%s' % self.metrics_cmd)
Expand All @@ -238,18 +263,27 @@ def _update_metrics(self, instance):
self._update_pods_metrics(instance, pods_list)

def _update_pods_metrics(self, instance, pods):
controllers_map = defaultdict(list)
supported_kinds = [
"DaemonSet",
"Deployment",
"Job",
"ReplicationController",
"ReplicaSet",
]

controllers_map = defaultdict(int)
for pod in pods['items']:
node_name = pod['spec']['nodeName']
try:
created_by = json.loads(pod['metadata']['annotations']['kubernetes.io/created-by'])
if created_by['reference']['kind'] == 'ReplicationController':
controllers_map[created_by['reference']['name']].append(node_name)
kind = created_by['reference']['kind']
if kind in supported_kinds:
controllers_map[created_by['reference']['name']] += 1
except KeyError:
continue

tags = instance.get('tags', [])
for ctrl, pods in controllers_map.iteritems():
for ctrl, pod_count in controllers_map.iteritems():
_tags = tags[:] # copy base tags
_tags.append('kube_replication_controller:{0}'.format(ctrl))
self.publish_gauge(self, NAMESPACE + '.pods.running', len(pods), _tags)
self.publish_gauge(self, NAMESPACE + '.pods.running', pod_count, _tags)
2 changes: 0 additions & 2 deletions conf.d/kubernetes.yaml.example
Expand Up @@ -22,8 +22,6 @@ instances:
#
use_histogram: True
#
# Kubelet checks
# enable_kubelet_checks: true
# kubelet_port: 10255
#
# We can define a whitelist of patterns that permit publishing raw metrics.
Expand Down

0 comments on commit c145667

Please sign in to comment.