Skip to content

Commit

Permalink
[jmxfetch][win32] ProcessWatchDog to monitor processes
Browse files Browse the repository at this point in the history
  • Loading branch information
yannmh committed Apr 20, 2015
1 parent d1f7573 commit 97c4363
Showing 1 changed file with 54 additions and 46 deletions.
100 changes: 54 additions & 46 deletions win32/agent.py
Expand Up @@ -34,7 +34,8 @@
log = logging.getLogger(__name__)

SERVICE_SLEEP_INTERVAL = 1
MAX_FAILED_HEARTBEATS = 8 # runs of collector
MAX_FAILED_HEARTBEATS = 8 # runs of collector


class AgentSvc(win32serviceutil.ServiceFramework):
_svc_name_ = "DatadogAgent"
Expand Down Expand Up @@ -70,11 +71,11 @@ def __init__(self, args):
# Keep a list of running processes so we can start/end as needed.
# Processes will start started in order and stopped in reverse order.
self.procs = {
'forwarder': DDForwarder(config, self.hostname),
'collector': DDAgent(agentConfig, self.hostname,
heartbeat=self._collector_send_heartbeat),
'dogstatsd': DogstatsdProcess(config, self.hostname),
'jmxfetch': JMXFetchProcess(config, self.hostname),
'forwarder': ProcessWatchDog("forwarder", DDForwarder(config, self.hostname)),
'collector': ProcessWatchDog("collector", DDAgent(agentConfig, self.hostname,
heartbeat=self._collector_send_heartbeat)),
'dogstatsd': ProcessWatchDog("dogstatsd", DogstatsdProcess(config, self.hostname)),
'jmxfetch': ProcessWatchDog("jmxfetch", JMXFetchProcess(config, self.hostname), 3),
}

def SvcStop(self):
Expand Down Expand Up @@ -103,9 +104,9 @@ def SvcDoRun(self):
while self.running:
# Restart any processes that might have died.
for name, proc in self.procs.iteritems():
if not proc.is_alive() and proc.is_enabled:
servicemanager.LogInfoMsg("%s has died. Restarting..." % proc.name)
self._restart_proc(name)
if not proc.is_alive() and proc.is_enabled():
servicemanager.LogInfoMsg("%s has died. Restarting..." % name)
proc.restart()

self._check_collector_blocked()

Expand All @@ -121,35 +122,49 @@ def _check_collector_blocked(self):
if self._collector_failed_heartbeats > self._max_failed_heartbeats:
servicemanager.LogInfoMsg(
"%s was unresponsive for too long. Restarting..." % 'collector')
self._restart_proc('collector')
self.procs['collector'].restart()
self._collector_failed_heartbeats = 0

def _restart_proc(self, proc_name):
# Make a new proc instances because multiprocessing
# won't let you call .start() twice on the same instance.
old_proc = self.procs[proc_name]

# Watch JMXFetch restarts
if proc_name == 'jmxfetch':
self._count_jmxfetch_restarts += 1
if self._count_jmxfetch_restarts >= self._MAX_JMXFETCH_RESTARTS:
servicemanager.LogInfoMsg(
"JMXFetch reached the limit of restarts. Not restarting...")
old_proc.is_enabled = False
return
class ProcessWatchDog(object):
"""
Monitor the attached process.
Restarts when it exits until the limit set is reached.
"""
def __init__(self, name, process, max_restarts="inf"):
self._name = name
self._process = process
self._count_restarts = 0
self._MAX_RESTARTS = max_restarts

if proc_name == 'collector':
new_proc = old_proc.__class__(
old_proc.config, self.hostname, heartbeat=self._collector_send_heartbeat)
else:
new_proc = old_proc.__class__(old_proc.config, self.hostname)
def start(self):
return self._process.start()

def terminate(self):
return self._process.terminate()

def is_alive(self):
return self._process.is_alive()

if old_proc.is_alive():
old_proc.terminate()
del self.procs[proc_name]
def is_enabled(self):
return self._process.is_enabled

def restart(self):
self._count_restarts += 1
if self._count_restarts >= self._MAX_RESTARTS:
servicemanager.LogInfoMsg(
"%s reached the limit of restarts. Not restarting..." % self._name)
self._process.is_enabled = False
return

# Make a new proc instances because multiprocessing
# won't let you call .start() twice on the same instance.
if self._process.is_alive():
self._process.terminate()

self._process = self._process.__class__(self._process.config, self._process.hostname)
self._process.start()

new_proc.start()
self.procs[proc_name] = new_proc

class DDAgent(multiprocessing.Process):
def __init__(self, agentConfig, hostname, heartbeat=None):
Expand Down Expand Up @@ -256,29 +271,22 @@ def __init__(self, agentConfig, hostname):
self.config = agentConfig
self.hostname = hostname

osname = get_os()
try:
osname = get_os()
confd_path = get_confd_path(osname)
except PathNotFound, e:
log.error("No conf.d folder found at '%s' or in the directory where"
"the Agent is currently deployed.\n" % e.args[0])
self.jmx_daemon = JMXFetch(confd_path, agentConfig)
self.jmx_daemon.configure()
self.is_enabled = self.jmx_daemon.should_run()

self.jmx_daemon = JMXFetch(confd_path, agentConfig)
self.jmx_daemon.configure()
self.is_enabled = self.jmx_daemon.should_run()
except PathNotFound:
self.is_enabled = False

def run(self):
from config import initialize_logging; initialize_logging('windows_jmxfetch')
if self.is_enabled:
log.debug("Windows Service - Starting JMXFetch")
self.jmx_daemon.run()
else:
log.info("No JMXFetch integration found, not starting it.")

def stop(self):
if self.is_enabled:
log.debug("Windows Service - Stopping JMXFetch")
self.jmx_daemon.terminate()
pass


if __name__ == '__main__':
Expand Down

0 comments on commit 97c4363

Please sign in to comment.