Skip to content

Commit

Permalink
Implement proper status reporting in nodemgr by events on Windows
Browse files Browse the repository at this point in the history
Reporting by events was missing, so new code was added to
windows_process_manager.py. Code shared with docker_process_manager.py
was moved to common_process_manager.py so it can be reused.

Change-Id: If191e8cfd6ca18fd8b4899c7bcaa259d6950d628
Partial-Bug: #1783539
  • Loading branch information
Michal Clapinski committed Aug 3, 2018
1 parent 9d26794 commit fae95d2
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 46 deletions.
1 change: 1 addition & 0 deletions src/nodemgr/SConscript
Expand Up @@ -82,6 +82,7 @@ for file in config_database_sources:
common_sources = [
'__init__.py',
'common/cassandra_manager.py',
'common/common_process_manager.py',
'common/docker_mem_cpu.py',
'common/docker_process_manager.py',
'common/event_manager.py',
Expand Down
38 changes: 38 additions & 0 deletions src/nodemgr/common/common_process_manager.py
@@ -0,0 +1,38 @@
#
# Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
#

def dummy_process_info(name):
    """Build a placeholder process-info dict for a process that is not running."""
    return {
        'name': name,
        'group': name,
        'pid': 0,
        'statename': 'PROCESS_STATE_EXITED',
        'expected': -1,
    }


def convert_to_pi_event(info):
    """Translate a process-info dict into a ProcessInfo event dict.

    Renames the 'statename' key to 'state' and drops the optional
    'start' entry. The input dict is left unmodified.
    """
    event = dict(info)
    event['state'] = event.pop('statename')
    event.pop('start', None)
    return event

class ProcessInfoCache(object):
    """Remembers the last seen process info for each process name.

    update_cache() tells the caller whether a freshly polled info differs
    from the cached one, so change events are emitted only when needed.
    """

    # Fields whose change makes an info count as "different".
    _COMPARED_FIELDS = ('name', 'group', 'pid', 'statename')

    def __init__(self):
        self._cached_process_infos = {}

    def update_cache(self, info):
        """Store *info* and return True if it changed since the last call."""
        name = info['name']
        previous = self._cached_process_infos.get(name)
        if previous is not None and \
                all(info[f] == previous[f] for f in self._COMPARED_FIELDS):
            return False
        self._cached_process_infos[name] = info
        return True
45 changes: 7 additions & 38 deletions src/nodemgr/common/docker_process_manager.py
Expand Up @@ -8,6 +8,7 @@

from docker_mem_cpu import DockerMemCpuUsageData
from sandesh_common.vns.ttypes import Module
import common_process_manager as cpm


# code calculates name from labels and then translate it to unit_name
Expand Down Expand Up @@ -63,32 +64,14 @@ def _convert_to_process_state(state):
return state_mapping.get(state, 'PROCESS_STATE_UNKNOWN')


def _dummy_process_info(name):
info = dict()
info['name'] = name
info['group'] = name
info['pid'] = 0
info['statename'] = 'PROCESS_STATE_EXITED'
info['expected'] = -1
return info


def _convert_to_pi_event(info):
pi_event = info.copy()
pi_event['state'] = pi_event.pop('statename')
if 'start' in pi_event:
del pi_event['start']
return pi_event


class DockerProcessInfoManager(object):
def __init__(self, module_type, unit_names, event_handlers,
update_process_list):
self._module_type = module_type
self._unit_names = unit_names
self._event_handlers = event_handlers
self._update_process_list = update_process_list
self._cached_process_infos = {}
self._process_info_cache = cpm.ProcessInfoCache()
self._client = docker.from_env()

def _get_full_info(self, cid):
Expand Down Expand Up @@ -155,20 +138,6 @@ def _list_containers(self, names):
containers[name] = container
return containers

def _update_cache(self, info):
name = info['name']
cached_info = self._cached_process_infos.get(name)
if cached_info is None:
self._cached_process_infos[name] = info
return True
if info['name'] != cached_info['name'] or \
info['group'] != cached_info['group'] or \
info['pid'] != cached_info['pid'] or \
info['statename'] != cached_info['statename']:
self._cached_process_infos[name] = info
return True
return False

def _get_start_time(self, info):
state = info.get('State')
start_time = state.get('StartedAt') if state else None
Expand Down Expand Up @@ -199,10 +168,10 @@ def _poll_containers(self):
containers = self._list_containers(self._unit_names)
for name in self._unit_names:
container = containers.get(name)
info = (_dummy_process_info(name) if container is None else
info = (cpm.dummy_process_info(name) if container is None else
self._container_to_process_info(container))
if self._update_cache(info):
self._event_handlers['PROCESS_STATE'](_convert_to_pi_event(info))
if self._process_info_cache.update_cache(info):
self._event_handlers['PROCESS_STATE'](cpm.convert_to_pi_event(info))
if self._update_process_list:
self._event_handlers['PROCESS_LIST_UPDATE']()

Expand All @@ -211,10 +180,10 @@ def get_all_processes(self):
containers = self._list_containers(self._unit_names)
for name in self._unit_names:
container = containers.get(name)
info = (_dummy_process_info(name) if container is None else
info = (cpm.dummy_process_info(name) if container is None else
self._container_to_process_info(container))
processes_info_list.append(info)
self._update_cache(info)
self._process_info_cache.update_cache(info)
return processes_info_list

def runforever(self):
Expand Down
2 changes: 1 addition & 1 deletion src/nodemgr/common/event_manager.py
Expand Up @@ -121,7 +121,7 @@ def __init__(self, config, type_info, sandesh_instance,
self.logger = self.sandesh_instance.logger()

if platform.system() == 'Windows':
self.process_info_manager = WindowsProcessInfoManager()
self.process_info_manager = WindowsProcessInfoManager(event_handlers)
elif DockerProcessInfoManager and (utils.is_running_in_docker()
or utils.is_running_in_kubepod()):
self.process_info_manager = DockerProcessInfoManager(
Expand Down
25 changes: 18 additions & 7 deletions src/nodemgr/common/windows_process_manager.py
Expand Up @@ -5,6 +5,7 @@
import psutil
import time

import common_process_manager as cpm
from windows_process_mem_cpu import WindowsProcessMemCpuUsageData


Expand All @@ -31,25 +32,35 @@ def _get_process_by_pid(pid):
return process

class WindowsProcessInfoManager(object):
    """Polls the ContrailAgent Windows service and reports its state.

    Mirrors the DockerProcessInfoManager interface: get_all_processes()
    returns the current process-info list, and runforever() polls in a
    loop, firing the 'PROCESS_STATE' event handler whenever the cached
    state of a process changes.
    """

    def __init__(self, event_handlers):
        # Callbacks keyed by event name; 'PROCESS_STATE' is fired on change.
        self._event_handlers = event_handlers
        self._process_info_cache = cpm.ProcessInfoCache()

    def get_mem_cpu_usage_data(self, pid, last_cpu, last_time):
        """Return a memory/CPU usage sampler for the given pid."""
        return WindowsProcessMemCpuUsageData(pid, last_cpu, last_time)

    def _poll_processes(self):
        """Query the ContrailAgent service and build its process info.

        Returns a one-element list. When the service is missing, the
        entry is a dummy 'exited' info so consumers always see the
        process in the list.
        """
        agent_service = _get_service_by_name('ContrailAgent')
        info = cpm.dummy_process_info('contrail-vrouter-agent')
        # Fix: compare against None with 'is not None', not '!= None' (PEP 8).
        if agent_service is not None:
            info['statename'] = _service_status_to_state(agent_service.status())
            if info['statename'] == 'PROCESS_STATE_RUNNING':
                info['pid'] = agent_service.pid()
                agent_process = _get_process_by_pid(info['pid'])
                if agent_process is not None:
                    # create_time() is in seconds; store microseconds as a string.
                    info['start'] = str(int(agent_process.create_time() * 1000000))
        return [info]

    def get_all_processes(self):
        """Return the current process infos, refreshing the change cache."""
        processes_infos = self._poll_processes()
        for info in processes_infos:
            self._process_info_cache.update_cache(info)
        return processes_infos

    def runforever(self):
        """Poll every 5 seconds, emitting PROCESS_STATE events on change."""
        while True:
            for info in self._poll_processes():
                if self._process_info_cache.update_cache(info):
                    self._event_handlers['PROCESS_STATE'](
                        cpm.convert_to_pi_event(info))
            time.sleep(5)

0 comments on commit fae95d2

Please sign in to comment.