Skip to content

Commit

Permalink
query the master API for events
Browse files Browse the repository at this point in the history
fixed line endings

query the master API

pass the auth token

more logs
  • Loading branch information
Massimiliano Pippi committed May 25, 2016
1 parent 9d5b18a commit b468257
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 10 deletions.
35 changes: 30 additions & 5 deletions checks.d/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from fnmatch import fnmatch
import numbers
import re
import simplejson as json

# 3rd party
import requests
import simplejson as json

# project
from checks import AgentCheck
Expand Down Expand Up @@ -46,6 +46,7 @@
RATE: {True: HISTORATE, False: RATE}
}


class Kubernetes(AgentCheck):
""" Collect metrics and events from kubelet """

Expand Down Expand Up @@ -106,11 +107,16 @@ def check(self, instance):
self.publish_rate = FUNC_MAP[RATE][self.use_histogram]
self.publish_gauge = FUNC_MAP[GAUGE][self.use_histogram]

pods_list = self.kubeutil.retrieve_pods_list()

# kubelet health checks
self._perform_kubelet_checks(self.kubeutil.kube_health_url)

# kubelet metrics
self._update_metrics(instance)
self._update_metrics(instance, pods_list)

# kubelet events
self._process_events(instance, pods_list)

def _publish_raw_metrics(self, metric, dat, tags, depth=0):
if depth >= self.max_depth:
Expand Down Expand Up @@ -186,7 +192,6 @@ def _get_pre_1_2_tags(self, cont_labels, subcontainer, kube_labels):

return tags


def _update_container_metrics(self, instance, subcontainer, kube_labels):
tags = list(instance.get('tags', [])) # add support for custom tags

Expand Down Expand Up @@ -237,8 +242,7 @@ def _update_container_metrics(self, instance, subcontainer, kube_labels):
def _retrieve_metrics(self, url):
return retrieve_json(url)

def _update_metrics(self, instance):
pods_list = self.kubeutil.retrieve_pods_list()
def _update_metrics(self, instance, pods_list):
metrics = self._retrieve_metrics(self.kubeutil.metrics_url)

excluded_labels = instance.get('excluded_labels')
Expand Down Expand Up @@ -280,3 +284,24 @@ def _update_pods_metrics(self, instance, pods):
_tags = tags[:] # copy base tags
_tags.append('kube_replication_controller:{0}'.format(ctrl))
self.publish_gauge(self, NAMESPACE + '.pods.running', pod_count, _tags)

def _process_events(self, instance, pods_list):
"""
Retrieve a list of events from kubelet that is relevant for the host the
agent is running on.
"""
pods = self.kubeutil.filter_pods_list(pods_list, self.kubeutil.host)
pod_uids = self.kubeutil.extract_uids(pods)

k8s_namespace = instance.get('namespace', 'default')
events_endpoint = '{}/namespaces/{}/events'.format(self.kubeutil.kubernetes_api_url, k8s_namespace)
self.log.debug('Kubernetes API endpoint to query events: %s' % events_endpoint)

events = self.kubeutil.retrieve_json_auth(events_endpoint, self.kubeutil.get_auth_token())
event_items = events.get('items') or []
self.log.debug('Found {} events, filtering out unwanted ones...'.format(len(event_items)))

for event in event_items:
involved_obj = event.get('involvedObject')
if involved_obj and involved_obj.get('uid') in pod_uids:
self.log.error('Should post this event:', event.get('reason'), event.get('message'))
70 changes: 65 additions & 5 deletions utils/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from utils.http import retrieve_json
from utils.singleton import Singleton

import requests

log = logging.getLogger('collector')

KUBERNETES_CHECK_NAME = 'kubernetes'
Expand All @@ -34,6 +36,9 @@ class KubeUtil():
DEFAULT_CADVISOR_PORT = 4194
DEFAULT_KUBELET_PORT = 10255
DEFAULT_MASTER_PORT = 8080
DEFAULT_MASTER_NAME = 'kubernetes' # DNS name to reach the master from a pod.
CA_CRT_PATH = '/run/secrets/kubernetes.io/serviceaccount/ca.crt'
AUTH_TOKEN_PATH = '/run/secrets/kubernetes.io/serviceaccount/token'

POD_NAME_LABEL = "io.kubernetes.pod.name"
NAMESPACE_LABEL = "io.kubernetes.pod.namespace"
Expand All @@ -58,12 +63,13 @@ def __init__(self):
self.cadvisor_port = instance.get('port', KubeUtil.DEFAULT_CADVISOR_PORT)
self.kubelet_port = instance.get('kubelet_port', KubeUtil.DEFAULT_KUBELET_PORT)

self.metrics_url = urljoin(
'%s://%s:%d' % (self.method, self.host, self.cadvisor_port), KubeUtil.METRICS_PATH)
self.pods_list_url = urljoin(
'%s://%s:%d' % (self.method, self.host, self.kubelet_port), KubeUtil.PODS_LIST_PATH)
self.kubelet_api_url = '%s://%s:%d' % (self.method, self.host, self.kubelet_port)
self.cadvisor_url = '%s://%s:%d' % (self.method, self.host, self.cadvisor_port)
self.kubernetes_api_url = 'https://%s/api/v1' % self.DEFAULT_MASTER_NAME

self.kube_health_url = '%s://%s:%d/healthz' % (self.method, self.host, self.kubelet_port)
self.metrics_url = urljoin(self.cadvisor_url, KubeUtil.METRICS_PATH)
self.pods_list_url = urljoin(self.kubelet_api_url, KubeUtil.PODS_LIST_PATH)
self.kube_health_url = urljoin(self.kubelet_api_url, 'healthz')

def get_kube_labels(self, excluded_keys=None):
pods = retrieve_json(self.pods_list_url)
Expand Down Expand Up @@ -93,9 +99,50 @@ def extract_kube_labels(self, pods_list, excluded_keys=None):

return kube_labels

def extract_uids(self, pods_list):
"""
Exctract uids from a list of pods coming from the kubelet API.
"""
uids = []
pods = pods_list.get("items") or []
for p in pods:
uid = p.get('metadata', {}).get('uid')
if uid is not None:
uids.append(uid)
return uids

def retrieve_pods_list(self):
return retrieve_json(self.pods_list_url)

def filter_pods_list(self, pods_list, host_ip):
"""
Filter out (in place) pods that are not running on the given host.
"""
filtered_pods = []
pod_items = pods_list.get('items') or []
log.debug('Found {} pods to filter'.format(pod_items))
for pod in pod_items:
status = pod.get('status', {})
if status.get('hostIP') == host_ip:
filtered_pods.append(pod)
pods_list['items'] = filtered_pods
return pods_list

def retrieve_json_auth(self, url, auth_token, timeout=10):
"""
Kubernetes API requires authentication using a token available in
every pod.
We try to verify ssl certificate if available.
"""
verify = self.CA_CRT_PATH if os.path.exists(self.CA_CRT_PATH) else False
log.debug('ssl validation: {}'.format(verify))
headers = {'Authorization': 'Bearer {}'.format(auth_token)}
log.debug('HTTP headers: {}'.format(headers))
r = requests.get(url, timeout=timeout, headers=headers, verify=verify)
r.raise_for_status()
return r.json()

@classmethod
def _get_default_router(cls):
try:
Expand All @@ -108,3 +155,16 @@ def _get_default_router(cls):
log.error('Unable to open /proc/net/route: %s', e)

return None

@classmethod
def get_auth_token(cls):
"""
Return a string containing the token read from file
"""
try:
with open(cls.AUTH_TOKEN_PATH) as f:
return f.read()
except IOError as e:
log.error('Unable to read token from {}: {}'.format(cls.AUTH_TOKEN_PATH, e))

return None

0 comments on commit b468257

Please sign in to comment.