Improve service discovery to only reload checks that need it
hkaj committed Jul 28, 2016
1 parent c76ab39 commit 9d3d27b
Showing 13 changed files with 199 additions and 42 deletions.
88 changes: 75 additions & 13 deletions agent.py
@@ -19,6 +19,7 @@
import signal
import sys
import time
from copy import copy

# For pickle & PID files, see issue 293
os.umask(022)
@@ -31,6 +32,7 @@
get_parsed_args,
get_system_stats,
load_check_directory,
load_check
)
from daemon import AgentSupervisor, Daemon
from emitter import http_emitter
@@ -76,6 +78,7 @@ def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=Fal
self._checksd = []
self.collector_profile_interval = DEFAULT_COLLECTOR_PROFILE_INTERVAL
self.check_frequency = None
# this flag can be set to True, False, or a set of check names (for partial reload)
self.reload_configs_flag = False
self.sd_backend = None

@@ -98,26 +101,82 @@ def _handle_sighup(self, signum, frame):
log.info("SIGHUP caught! Scheduling configuration reload before next collection run.")
self.reload_configs_flag = True

def reload_configs(self):
"""Reloads the agent configuration and checksd configurations."""
def reload_configs(self, checks_to_reload=set()):
"""Reload the agent configuration and checksd configurations.
Can also reload only an explicit set of checks."""
log.info("Attempting a configuration reload...")
hostname = get_hostname(self._agentConfig)

# Stop checks
for check in self._checksd.get('initialized_checks', []):
check.stop()
# if no check was given, reload them all
if not checks_to_reload:
log.debug("No check list was passed, reloading every check")
# stop checks
for check in self._checksd.get('initialized_checks', []):
check.stop()

# Reload checksd configs
hostname = get_hostname(self._agentConfig)
self._checksd = load_check_directory(self._agentConfig, hostname)
self._checksd = load_check_directory(self._agentConfig, hostname)
else:
new_checksd = copy(self._checksd)

self.refresh_specific_checks(hostname, new_checksd, checks_to_reload)
# once the reload is done, replace existing checks with the new ones
self._checksd = new_checksd

# Logging
num_checks = len(self._checksd['initialized_checks'])
if num_checks > 0:
log.info("Successfully reloaded {num_checks} checks".
format(num_checks=num_checks))
opt_msg = " (refreshed %s checks)" % len(checks_to_reload) if checks_to_reload else ''

msg = "Check reload was successful. Running {num_checks} checks{opt_msg}.".format(
num_checks=num_checks, opt_msg=opt_msg)
log.info(msg)
else:
log.info("No checksd configs found")

def refresh_specific_checks(self, hostname, checksd, checks):
"""take a list of checks and for each of them:
- remove it from the init_failed_checks if it was there
- load a fresh config for it
- replace its old config with the new one in initialized_checks if there was one
- disable the check if no new config was found
- otherwise, append it to initialized_checks
"""
for check_name in checks:
idx = None
for num, check in enumerate(checksd['initialized_checks']):
if check.name == check_name:
idx = num
# stop the existing check before reloading it
check.stop()

if idx is None and check_name in checksd['init_failed_checks']:
# if the check previously failed to load, pop it from init_failed_checks
checksd['init_failed_checks'].pop(check_name)

fresh_check = load_check(self._agentConfig, hostname, check_name)

# this is an error dict
# checks that failed to load are added to init_failed_checks
if isinstance(fresh_check, dict) and 'error' in fresh_check.keys():
checksd['init_failed_checks'][fresh_check.keys()[0]] = fresh_check.values()[0]

elif not fresh_check:
# no instance left of it to monitor so the check was not loaded
if idx is not None:
checksd['initialized_checks'].pop(idx)
# the check was not previously running and no config was found, so it could not be instantiated
else:
log.error("Configuration for check %s was not found, it won't be reloaded." % check_name)

# successfully reloaded checks are added to initialized_checks
# (appended or replacing a previous version)
else:
if idx is not None:
checksd['initialized_checks'][idx] = fresh_check
# it didn't exist before and doesn't need to be replaced so we append it
else:
checksd['initialized_checks'].append(fresh_check)

@classmethod
def info(cls, verbose=None):
logging.getLogger().setLevel(logging.ERROR)
@@ -189,13 +248,16 @@ def run(self, config=None):
log.warn("Cannot enable profiler: %s" % str(e))

if self.reload_configs_flag:
self.reload_configs()
if isinstance(self.reload_configs_flag, set):
self.reload_configs(checks_to_reload=self.reload_configs_flag)
else:
self.reload_configs()

# Do the work. Pass `configs_reloaded` to let the collector know if it needs to
# look for the AgentMetrics check and pop it out.
self.collector.run(checksd=self._checksd,
start_event=self.start_event,
configs_reloaded=self.reload_configs_flag)
configs_reloaded=bool(self.reload_configs_flag))

self.reload_configs_flag = False

@@ -215,7 +277,7 @@ def run(self, config=None):
# using ConfigStore.crawl_config_template
if self._agentConfig.get('service_discovery') and self.sd_backend and \
self.sd_backend.reload_check_configs:
self.reload_configs_flag = True
self.reload_configs_flag = self.sd_backend.reload_check_configs
self.sd_backend.reload_check_configs = False

if profiled:
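For context, a minimal standalone sketch of the reload dispatch introduced in agent.py above: reload_configs_flag stays False when nothing changed, becomes True for a full reload, or a set of check names for a partial reload. The SketchAgent class and check names below are illustrative stand-ins, not part of the agent codebase.

class SketchAgent(object):
    def __init__(self):
        # False: nothing to do, True: full reload, set of names: partial reload
        self.reload_configs_flag = False

    def reload_configs(self, checks_to_reload=None):
        if checks_to_reload:
            print("partial reload of: %s" % sorted(checks_to_reload))
        else:
            print("full reload of every check")

    def run_once(self):
        # mirrors the dispatch added in Agent.run()
        if self.reload_configs_flag:
            if isinstance(self.reload_configs_flag, set):
                self.reload_configs(checks_to_reload=self.reload_configs_flag)
            else:
                self.reload_configs()
        self.reload_configs_flag = False

agent = SketchAgent()
agent.reload_configs_flag = set(['nginx', 'redisdb'])
agent.run_once()   # partial reload of: ['nginx', 'redisdb']
agent.reload_configs_flag = True
agent.run_once()   # full reload of every check
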
19 changes: 12 additions & 7 deletions checks.d/docker_daemon.py
@@ -142,9 +142,9 @@ def __init__(self, name, init_config, agentConfig, instances=None):
agentConfig, instances=instances)

self.init_success = False
self.init()
self._service_discovery = agentConfig.get('service_discovery') and \
agentConfig.get('service_discovery_backend') == 'docker'
self.init()
self._custom_cgroups = _is_affirmative(init_config.get('custom_cgroups', False))

def is_k8s(self):
@@ -154,12 +154,17 @@ def init(self):
try:
instance = self.instances[0]

# We configure the check with the right cgroup settings for this host
# Just needs to be done once
self.docker_util = DockerUtil()
# if service discovery is enabled, dockerutil needs a reference to the
# config store, so we pass it agentConfig
if self._service_discovery:
self.docker_util = DockerUtil(agentConfig=self.agentConfig)
else:
self.docker_util = DockerUtil()
self.docker_client = self.docker_util.client
if self.is_k8s():
self.kubeutil = KubeUtil()
# We configure the check with the right cgroup settings for this host
# Just needs to be done once
self._mountpoints = self.docker_util.get_mountpoints(CGROUP_METRICS)
self.cgroup_listing_retries = 0
self._latest_size_query = 0
@@ -602,9 +607,9 @@ def _process_events(self, containers_by_id):

def _get_events(self):
"""Get the list of events."""
events, should_reload_conf = self.docker_util.get_events()
if should_reload_conf and self._service_discovery:
get_sd_backend(self.agentConfig).reload_check_configs = True
events, conf_reload_set = self.docker_util.get_events()
if conf_reload_set and self._service_discovery:
get_sd_backend(self.agentConfig).reload_check_configs = conf_reload_set
return events

def _pre_aggregate_events(self, api_events, containers_by_id):
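A hedged sketch of the new event flow in docker_daemon.py: get_events() now returns a set of check names to reload rather than a plain boolean, and the check forwards that set to the service discovery backend. FakeDockerUtil and FakeBackend below are stand-ins for the real DockerUtil and SD backend, not the actual implementations.

class FakeDockerUtil(object):
    """Stand-in for DockerUtil: returns canned events and the affected checks."""
    def get_events(self):
        events = [{'status': 'die', 'from': 'nginx:latest'}]
        conf_reload_set = set(['nginx'])  # checks mapped to the affected image
        return events, conf_reload_set

class FakeBackend(object):
    """Stand-in for the service discovery backend."""
    reload_check_configs = False

docker_util = FakeDockerUtil()
backend = FakeBackend()

events, conf_reload_set = docker_util.get_events()
if conf_reload_set:
    # the agent picks this set up on its next run and triggers a partial reload
    backend.reload_check_configs = conf_reload_set

print(backend.reload_check_configs)  # the set of checks to reload
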
30 changes: 30 additions & 0 deletions config.py
@@ -1075,6 +1075,36 @@ def load_check_directory(agentConfig, hostname):
'init_failed_checks': init_failed_checks,
}


def load_check(agentConfig, hostname, checkname):
"""Same logic as load_check_directory except it loads one specific check"""
agentConfig['checksd_hostname'] = hostname
osname = get_os()
checks_places = get_checks_places(osname, agentConfig)
for config_path in _file_configs_paths(osname, agentConfig):
check_name = _conf_path_to_check_name(config_path)
if check_name == checkname:
conf_is_valid, check_config, invalid_check = _load_file_config(config_path, check_name, agentConfig)

if invalid_check and not conf_is_valid:
return invalid_check

# try to load the check and return the result
load_success, load_failure = load_check_from_places(check_config, check_name, checks_places, agentConfig)
return load_success.values()[0] or load_failure

# the check was not found, try with service discovery
for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems():
if check_name == checkname:
sd_init_config, sd_instances = service_disco_check_config
check_config = {'init_config': sd_init_config, 'instances': sd_instances}

# try to load the check and return the result
load_success, load_failure = load_check_from_places(check_config, check_name, checks_places, agentConfig)
return load_success.values()[0] or load_failure

return None

#
# logging

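A hedged usage sketch of the new load_check() helper added above. It assumes a standard agent install with a config for the named check in conf.d (or a matching service discovery template); the 'nginx' check name is illustrative.

from config import get_config, load_check
from util import get_hostname

agent_config = get_config()
hostname = get_hostname(agent_config)

result = load_check(agent_config, hostname, 'nginx')
if result is None:
    # no file config and no service discovery template for this check
    print("nginx check configuration not found")
elif isinstance(result, dict):
    # an error dict describing why the check failed to load
    print("nginx check failed to load: %s" % result)
else:
    # an initialized check instance, ready to be run
    print("loaded check: %s" % result.name)
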
7 changes: 5 additions & 2 deletions tests/core/test_service_discovery.py
@@ -55,12 +55,15 @@ def _get_check_tpls(image_name, **kwargs):
return None


def client_read(path):
def client_read(path, **kwargs):
"""Return a mocked string that would normally be read from a config store (etcd, consul...)."""
parts = path.split('/')
config_parts = ['check_names', 'init_configs', 'instances']
image, config_part = parts[-2], parts[-1]
return TestServiceDiscovery.mock_tpls.get(image)[0][config_parts.index(config_part)]
if 'all' in kwargs:
return {}
else:
return TestServiceDiscovery.mock_tpls.get(image)[0][config_parts.index(config_part)]


def issue_read(identifier):
4 changes: 2 additions & 2 deletions util.py
@@ -32,7 +32,6 @@
# if a user actually uses them in a custom check
# If you're this user, please use utils.pidfile or utils.platform instead
# FIXME: remove them at a point (6.x)
from utils.dockerutil import DockerUtil
from utils.pidfile import PidFile # noqa, see ^^^
from utils.platform import Platform
from utils.proxy import get_proxy
Expand Down Expand Up @@ -182,6 +181,7 @@ def get_hostname(config=None):
* 'hostname -f' (on unix)
* socket.gethostname()
"""
from utils.dockerutil import DockerUtil
hostname = None

# first, try the config
@@ -200,7 +200,7 @@
return gce_hostname

# Try to get the docker hostname
docker_util = DockerUtil()
docker_util = DockerUtil(agentConfig=config)
if hostname is None and docker_util.is_dockerized():
docker_hostname = docker_util.get_hostname()
if docker_hostname is not None and is_valid_hostname(docker_hostname):
24 changes: 19 additions & 5 deletions utils/dockerutil.py
@@ -13,11 +13,13 @@

# project
from utils.singleton import Singleton
from utils.service_discovery.config_stores import get_config_store


class MountException(Exception):
pass


class CGroupException(Exception):
pass

@@ -49,6 +51,12 @@ def __init__(self, **kwargs):
# At first run we'll just collect the events from the latest 60 secs
self._latest_event_collection_ts = int(time.time()) - 60

# if agentConfig is passed, service discovery is enabled and we need to fetch the config store
if 'agentConfig' in kwargs:
self.config_store = get_config_store(kwargs['agentConfig'])
else:
self.config_store = None

# Try to detect if we are on ECS
self._is_ecs = False
try:
@@ -93,16 +101,19 @@ def is_ecs(self):

def get_events(self):
self.events = []
should_reload_conf = False
conf_reload_set = set()
now = int(time.time())

event_generator = self.client.events(since=self._latest_event_collection_ts,
until=now, decode=True)
self._latest_event_collection_ts = now
for event in event_generator:
try:
if event.get('status') in CONFIG_RELOAD_STATUS:
should_reload_conf = True
if self.config_store and event.get('status') in CONFIG_RELOAD_STATUS:
image = event.get('from', '')
checks = self._get_checks_from_image(image)
if checks:
conf_reload_set.update(set(checks))
self.events.append(event)
except AttributeError:
# due to [0] it might happen that the returned `event` is not a dict as expected but a string,
Expand All @@ -111,7 +122,11 @@ def get_events(self):
# [0]: https://github.com/docker/docker-py/pull/1082
log.debug('Unable to parse Docker event: %s', event)

return self.events, should_reload_conf
return self.events, conf_reload_set

def _get_checks_from_image(self, image):
"""Get the list of checks applied to an image from the image_to_checks cache in the config store"""
return self.config_store.image_to_checks[image]

def get_hostname(self):
"""Return the `Name` param from `docker info` to use as the hostname"""
@@ -222,7 +237,6 @@ def find_cgroup_from_proc(cls, mountpoints, pid, subsys, docker_root='/'):
if os.path.exists(stat_file_path):
return os.path.join(stat_file_path, '%(file)s')


raise MountException("Cannot find Docker cgroup directory. Be sure your system is supported.")

@classmethod
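A hedged sketch of how dockerutil now turns container events into a set of check names via the config store's image_to_checks cache. The _FakeConfigStore class and the CONFIG_RELOAD_STATUS value below are illustrative assumptions, not the real implementation.

CONFIG_RELOAD_STATUS = ['start', 'die']  # assumed statuses that can change configs

class _FakeConfigStore(object):
    """Stand-in for the config store and its image_to_checks cache."""
    image_to_checks = {'nginx:latest': ['nginx'], 'redis:3.2': ['redisdb']}

def checks_to_reload(events, config_store):
    conf_reload_set = set()
    for event in events:
        if config_store and event.get('status') in CONFIG_RELOAD_STATUS:
            image = event.get('from', '')
            checks = config_store.image_to_checks.get(image, [])
            conf_reload_set.update(set(checks))
    return conf_reload_set

events = [
    {'status': 'die', 'from': 'redis:3.2'},
    {'status': 'pause', 'from': 'nginx:latest'},  # not a reload-worthy status here
]
print(checks_to_reload(events, _FakeConfigStore()))  # only 'redisdb' triggers a reload
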
5 changes: 1 addition & 4 deletions utils/platform.py
@@ -5,10 +5,6 @@
# stdlib
import sys

# project
from utils.dockerutil import DockerUtil

_is_ecs = None

class Platform(object):
"""
@@ -72,4 +68,5 @@ def python_architecture():

@staticmethod
def is_ecs_instance():
from utils.dockerutil import DockerUtil
return DockerUtil().is_ecs()