From fae95d28551e7c4062d984488b07190734e1d061 Mon Sep 17 00:00:00 2001 From: Michal Clapinski Date: Thu, 2 Aug 2018 17:20:01 +0200 Subject: [PATCH] Implement proper status reporting in nodemgr by events on Windows Reporting by events was missing, so I had to add some new code to windows_process_manager.py. I also wanted to reuse some code from docker_process_manager.py, so I moved it to common_process_manager.py. Change-Id: If191e8cfd6ca18fd8b4899c7bcaa259d6950d628 Partial-Bug: #1783539 --- src/nodemgr/SConscript | 1 + src/nodemgr/common/common_process_manager.py | 38 ++++++++++++++++ src/nodemgr/common/docker_process_manager.py | 45 +++---------------- src/nodemgr/common/event_manager.py | 2 +- src/nodemgr/common/windows_process_manager.py | 25 ++++++++--- 5 files changed, 65 insertions(+), 46 deletions(-) create mode 100644 src/nodemgr/common/common_process_manager.py diff --git a/src/nodemgr/SConscript b/src/nodemgr/SConscript index 53199da7bfb..90960183e2f 100644 --- a/src/nodemgr/SConscript +++ b/src/nodemgr/SConscript @@ -82,6 +82,7 @@ for file in config_database_sources: common_sources = [ '__init__.py', 'common/cassandra_manager.py', + 'common/common_process_manager.py', 'common/docker_mem_cpu.py', 'common/docker_process_manager.py', 'common/event_manager.py', diff --git a/src/nodemgr/common/common_process_manager.py b/src/nodemgr/common/common_process_manager.py new file mode 100644 index 00000000000..382a224f0ca --- /dev/null +++ b/src/nodemgr/common/common_process_manager.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
+# + +def dummy_process_info(name): + info = dict() + info['name'] = name + info['group'] = name + info['pid'] = 0 + info['statename'] = 'PROCESS_STATE_EXITED' + info['expected'] = -1 + return info + + +def convert_to_pi_event(info): + pi_event = info.copy() + pi_event['state'] = pi_event.pop('statename') + if 'start' in pi_event: + del pi_event['start'] + return pi_event + +class ProcessInfoCache(object): + def __init__(self): + self._cached_process_infos = {} + + def update_cache(self, info): + name = info['name'] + cached_info = self._cached_process_infos.get(name) + if cached_info is None: + self._cached_process_infos[name] = info + return True + if info['name'] != cached_info['name'] or \ + info['group'] != cached_info['group'] or \ + info['pid'] != cached_info['pid'] or \ + info['statename'] != cached_info['statename']: + self._cached_process_infos[name] = info + return True + return False diff --git a/src/nodemgr/common/docker_process_manager.py b/src/nodemgr/common/docker_process_manager.py index 07cc26abf70..e79a8e40dba 100644 --- a/src/nodemgr/common/docker_process_manager.py +++ b/src/nodemgr/common/docker_process_manager.py @@ -8,6 +8,7 @@ from docker_mem_cpu import DockerMemCpuUsageData from sandesh_common.vns.ttypes import Module +import common_process_manager as cpm # code calculates name from labels and then translate it to unit_name @@ -63,24 +64,6 @@ def _convert_to_process_state(state): return state_mapping.get(state, 'PROCESS_STATE_UNKNOWN') -def _dummy_process_info(name): - info = dict() - info['name'] = name - info['group'] = name - info['pid'] = 0 - info['statename'] = 'PROCESS_STATE_EXITED' - info['expected'] = -1 - return info - - -def _convert_to_pi_event(info): - pi_event = info.copy() - pi_event['state'] = pi_event.pop('statename') - if 'start' in pi_event: - del pi_event['start'] - return pi_event - - class DockerProcessInfoManager(object): def __init__(self, module_type, unit_names, event_handlers, update_process_list): @@ -88,7 +71,7 
@@ def __init__(self, module_type, unit_names, event_handlers, self._unit_names = unit_names self._event_handlers = event_handlers self._update_process_list = update_process_list - self._cached_process_infos = {} + self._process_info_cache = cpm.ProcessInfoCache() self._client = docker.from_env() def _get_full_info(self, cid): @@ -155,20 +138,6 @@ def _list_containers(self, names): containers[name] = container return containers - def _update_cache(self, info): - name = info['name'] - cached_info = self._cached_process_infos.get(name) - if cached_info is None: - self._cached_process_infos[name] = info - return True - if info['name'] != cached_info['name'] or \ - info['group'] != cached_info['group'] or \ - info['pid'] != cached_info['pid'] or \ - info['statename'] != cached_info['statename']: - self._cached_process_infos[name] = info - return True - return False - def _get_start_time(self, info): state = info.get('State') start_time = state.get('StartedAt') if state else None @@ -199,10 +168,10 @@ def _poll_containers(self): containers = self._list_containers(self._unit_names) for name in self._unit_names: container = containers.get(name) - info = (_dummy_process_info(name) if container is None else + info = (cpm.dummy_process_info(name) if container is None else self._container_to_process_info(container)) - if self._update_cache(info): - self._event_handlers['PROCESS_STATE'](_convert_to_pi_event(info)) + if self._process_info_cache.update_cache(info): + self._event_handlers['PROCESS_STATE'](cpm.convert_to_pi_event(info)) if self._update_process_list: self._event_handlers['PROCESS_LIST_UPDATE']() @@ -211,10 +180,10 @@ def get_all_processes(self): containers = self._list_containers(self._unit_names) for name in self._unit_names: container = containers.get(name) - info = (_dummy_process_info(name) if container is None else + info = (cpm.dummy_process_info(name) if container is None else self._container_to_process_info(container)) processes_info_list.append(info) - 
self._update_cache(info) + self._process_info_cache.update_cache(info) return processes_info_list def runforever(self): diff --git a/src/nodemgr/common/event_manager.py b/src/nodemgr/common/event_manager.py index 02f1ef810c1..05eee0c08ca 100644 --- a/src/nodemgr/common/event_manager.py +++ b/src/nodemgr/common/event_manager.py @@ -121,7 +121,7 @@ def __init__(self, config, type_info, sandesh_instance, self.logger = self.sandesh_instance.logger() if platform.system() == 'Windows': - self.process_info_manager = WindowsProcessInfoManager() + self.process_info_manager = WindowsProcessInfoManager(event_handlers) elif DockerProcessInfoManager and (utils.is_running_in_docker() or utils.is_running_in_kubepod()): self.process_info_manager = DockerProcessInfoManager( diff --git a/src/nodemgr/common/windows_process_manager.py b/src/nodemgr/common/windows_process_manager.py index 6227b9e640d..11fd5a3e673 100644 --- a/src/nodemgr/common/windows_process_manager.py +++ b/src/nodemgr/common/windows_process_manager.py @@ -5,6 +5,7 @@ import psutil import time +import common_process_manager as cpm from windows_process_mem_cpu import WindowsProcessMemCpuUsageData @@ -31,25 +32,35 @@ def _get_process_by_pid(pid): return process class WindowsProcessInfoManager(object): + def __init__(self, event_handlers): + self._event_handlers = event_handlers + self._process_info_cache = cpm.ProcessInfoCache() + def get_mem_cpu_usage_data(self, pid, last_cpu, last_time): return WindowsProcessMemCpuUsageData(pid, last_cpu, last_time) - def get_all_processes(self): + def _poll_processes(self): agent_service = _get_service_by_name('ContrailAgent') + info = cpm.dummy_process_info('contrail-vrouter-agent') if agent_service != None: - info = {} - info['name'] = 'contrail-vrouter-agent' - info['group'] = info['name'] info['statename'] = _service_status_to_state(agent_service.status()) if info['statename'] == 'PROCESS_STATE_RUNNING': info['pid'] = agent_service.pid() agent_process = 
_get_process_by_pid(info['pid']) if agent_process != None: info['start'] = str(int(agent_process.create_time() * 1000000)) - return [info] - else: - return [] + return [info] + + def get_all_processes(self): + processes_infos = self._poll_processes() + for info in processes_infos: + self._process_info_cache.update_cache(info) + return processes_infos def runforever(self): while True: + processes_infos = self._poll_processes() + for info in processes_infos: + if self._process_info_cache.update_cache(info): + self._event_handlers['PROCESS_STATE'](cpm.convert_to_pi_event(info)) time.sleep(5)