Skip to content

Commit

Permalink
Common message handlng library for all config services.
Browse files Browse the repository at this point in the history
Closes-Bug: 1572332

Change-Id: Ibbbad70ac29f39dca2791b5c3768f7d58f7d8eba
  • Loading branch information
cijohnson committed Oct 10, 2016
1 parent 0f12c85 commit 24f5cca
Show file tree
Hide file tree
Showing 20 changed files with 691 additions and 732 deletions.
2 changes: 2 additions & 0 deletions src/config/common/SConscript
Expand Up @@ -40,6 +40,8 @@ local_sources = [
'vnc_api_stats.py',
'ssl_adapter.py',
'vnc_greenlets.py',
'vnc_amqp.py',
'vnc_logger.py',
]
local_sources_rules = []
for file in local_sources:
Expand Down
196 changes: 196 additions & 0 deletions src/config/common/vnc_amqp.py
@@ -0,0 +1,196 @@
import socket
import gevent
import cStringIO
from pprint import pformat

from cfgm_common.utils import cgitb_hook
from cfgm_common.exceptions import NoIdError
from cfgm_common.vnc_kombu import VncKombuClient
from cfgm_common.dependency_tracker import DependencyTracker


class VncAmqpHandle(object):

def __init__(self, logger, db_cls, reaction_map, q_name_prefix, args=None):
self.logger = logger
self.db_cls = db_cls
self.reaction_map = reaction_map
self.q_name_prefix = q_name_prefix
self._db_resync_done = gevent.event.Event()
self._args = args

def establish(self):
q_name = '.'.join([self.q_name_prefix, socket.gethostname()])
self._vnc_kombu = VncKombuClient(
self._args.rabbit_server, self._args.rabbit_port,
self._args.rabbit_user, self._args.rabbit_password,
self._args.rabbit_vhost, self._args.rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.logger.log, rabbit_use_ssl=self._args.rabbit_use_ssl,
kombu_ssl_version=self._args.kombu_ssl_version,
kombu_ssl_keyfile=self._args.kombu_ssl_keyfile,
kombu_ssl_certfile=self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs=self._args.kombu_ssl_ca_certs)

def msgbus_store_err_msg(self, msg):
pass

def msgbus_trace_msg(self):
pass

def _vnc_subscribe_callback(self, oper_info):
self._db_resync_done.wait()
try:
self.oper_info = oper_info
self.vnc_subscribe_actions()

except Exception:
string_buf = cStringIO.StringIO()
cgitb_hook(file=string_buf, format="text")
self.logger.error(string_buf.getvalue())

self.msgbus_store_err_msg(string_buf.getvalue())
try:
with open(self._args.trace_file, 'a') as err_file:
err_file.write(string_buf.getvalue())
except IOError:
pass
finally:
try:
self.msgbus_trace_msg()
except Exception:
pass
del self.oper_info
del self.obj_type
del self.obj_class
del self.obj
del self.dependency_tracker

def create_msgbus_trace(self, request_id, oper, uuid):
pass

def vnc_subscribe_actions(self):
msg = "Notification Message: %s" % (pformat(self.oper_info))
self.logger.debug(msg)

self.obj = None
self.dependency_tracker = None
self.obj_type = self.oper_info['type'].replace('-', '_')
self.obj_class = self.db_cls.get_obj_type_map().get(self.obj_type)
if self.obj_class is None:
return

oper = self.oper_info['oper']
obj_id = self.oper_info['uuid']
self.create_msgbus_trace(self.oper_info.get('request_id'),
oper, obj_id)
if oper == 'CREATE':
self.handle_create()
elif oper == 'UPDATE':
self.handle_update()
elif oper == 'DELETE':
self.handle_delete()
else:
self.handle_unknown()
return
if self.obj is None:
self.logger.error('Error while accessing %s uuid %s' % (
self.obj_type, obj_id))
return
self.evaluate_dependency()

def handle_create(self):
obj_dict = self.oper_info['obj_dict']
obj_key = self.db_cls.get_key_from_dict(obj_dict)
obj_id = self.oper_info['uuid']
obj_fq_name = obj_dict['fq_name']
self.db_cls._cassandra.cache_uuid_to_fq_name_add(
obj_id, obj_fq_name, self.obj_type)
self.obj = self.obj_class.locate(obj_key)
if self.obj is None:
self.logger.info('%s id %s fq_name %s not found' % (
self.obj_type, obj_id, obj_fq_name))
return
self.dependency_tracker = DependencyTracker(
self.db_cls.get_obj_type_map(), self.reaction_map)
self.dependency_tracker.evaluate(self.obj_type, self.obj)

def handle_update(self):
obj_id = self.oper_info['uuid']
self.obj = self.obj_class.get_by_uuid(obj_id)
old_dt = None
if self.obj is not None:
old_dt = DependencyTracker(
self.db_cls.get_obj_type_map(), self.reaction_map)
old_dt.evaluate(self.obj_type, self.obj)
else:
self.logger.info('%s id %s not found' % (self.obj_type,
obj_id))
return

try:
self.obj.update()
except NoIdError:
obj_id = self.oper_info['uuid']
self.logger.warning('%s uuid %s update caused NoIdError' %
(self.obj_type, obj_id))
return

self.dependency_tracker = DependencyTracker(
self.db_cls.get_obj_type_map(), self.reaction_map)
self.dependency_tracker.evaluate(self.obj_type, self.obj)
if old_dt:
for resource, ids in old_dt.resources.items():
if resource not in self.dependency_tracker.resources:
self.dependency_tracker.resources[resource] = ids
else:
self.dependency_tracker.resources[resource] = list(
set(self.dependency_tracker.resources[resource]) |
set(ids))

def handle_delete(self):
obj_id = self.oper_info['uuid']
self.obj = self.obj_class.get_by_uuid(obj_id)
self.db_cls._cassandra.cache_uuid_to_fq_name_del(obj_id)
if self.obj is None:
return
self.dependency_tracker = DependencyTracker(
self.db_cls.get_obj_type_map(), self.reaction_map)
self.dependency_tracker.evaluate(self.obj_type, self.obj)
obj_key = self.db_cls.get_key_from_dict(self.oper_info['obj_dict'])
self.obj_class.delete(obj_key)

def handle_unknown(self):
# unknown operation
self.logger.error('Unknown operation %s' % self.oper_info['oper'])

def init_msgbus_fq_name(self):
pass

def init_msgbus_dtr(self):
pass

def add_msgbus_dtr(self, res_type, res_id_list):
pass

def evaluate_dependency(self):
if not self.dependency_tracker:
return

self.init_msgbus_fq_name()
self.init_msgbus_dtr()

for res_type, res_id_list in self.dependency_tracker.resources.items():
if not res_id_list:
continue
self.add_msgbus_dtr(res_type, res_id_list)
cls = self.db_cls.get_obj_type_map().get(res_type)
if cls is None:
continue
for res_id in res_id_list:
res_obj = cls.get(res_id)
if res_obj is not None:
res_obj.evaluate()

def close(self):
self._vnc_kombu.shutdown()
19 changes: 19 additions & 0 deletions src/config/common/vnc_db.py
Expand Up @@ -345,5 +345,24 @@ def get_obj_type_map(cls):
if cls.__module__ == x.obj_type]
return dict((x.obj_type, x) for x in module_base[0].__subclasses__())

@classmethod
def get_key_from_dict(cls, obj_dict):
if cls._indexed_by_name:
obj_key = ':'.join(obj_dict['fq_name'])
else:
obj_key = obj_dict['uuid']
return obj_key

@classmethod
def get_by_uuid(cls, uuid, *args):
name_or_uuid = uuid
if cls._indexed_by_name:
fq_name = cls._cassandra.uuid_to_fq_name(uuid)
name_or_uuid = ':'.join(fq_name)
try:
return cls.get(name_or_uuid)
except NoIdError:
return None

# end class DBBase

139 changes: 139 additions & 0 deletions src/config/common/vnc_logger.py
@@ -0,0 +1,139 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
#

"""
Logger for config services
"""

import datetime
import logging
import socket
import cStringIO

from cfgm_common.utils import cgitb_hook

from pysandesh.sandesh_base import Sandesh, SandeshSystem
from pysandesh.sandesh_logger import SandeshLogger
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel

from sandesh_common.vns.constants import (
ModuleNames, Module2NodeType, NodeTypeNames, INSTANCE_ID_DEFAULT)

from pysandesh.connection_info import ConnectionState
from cfgm_common.uve.nodeinfo.ttypes import NodeStatusUVE, \
NodeStatus


class ConfigServiceLogger(object):

_LOGGER_LEVEL_TO_SANDESH_LEVEL = {
logging.CRITICAL: SandeshLevel.SYS_EMERG,
logging.CRITICAL: SandeshLevel.SYS_ALERT,
logging.CRITICAL: SandeshLevel.SYS_CRIT,
logging.ERROR: SandeshLevel.SYS_ERR,
logging.WARNING: SandeshLevel.SYS_WARN,
logging.WARNING: SandeshLevel.SYS_NOTICE,
logging.INFO: SandeshLevel.SYS_INFO,
logging.DEBUG: SandeshLevel.SYS_DEBUG
}

def __init__(self, discovery, module, module_pkg, args=None):
self.discovery = discovery
self.module_pkg = module_pkg
if not hasattr(self, 'context'):
self.context = module_pkg
self._args = args

node_type = Module2NodeType[module]
self._module_name = ModuleNames[module]
self._node_type_name = NodeTypeNames[node_type]
self.table = "ObjectConfigNode"
self._instance_id = INSTANCE_ID_DEFAULT
self._hostname = socket.gethostname()

# sandesh init
self.sandesh_init()

def _get_sandesh_logger_level(self, sandesh_level):
return self._LOGGER_LEVEL_TO_SANDESH_LEVEL[sandesh_level]

def log(self, log_msg, level=SandeshLevel.SYS_DEBUG, fun=None):
if fun:
log = fun(level=level, og_msg=log_msg, sandesh=self._sandesh)
log.send(sandesh=self._sandesh)
else:
self._sandesh.logger().log(
SandeshLogger.get_py_logger_level(level), log_msg)

def emergency(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_EMERG, fun=log_fun)

def alert(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_ALERT, fun=log_fun)

def critical(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_CRIT, fun=log_fun)

def error(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_ERR, fun=log_fun)

def cgitb_error(self):
string_buf = cStringIO.StringIO()
cgitb_hook(file=string_buf, format="text")
self.error(string_buf.getvalue())

def warning(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_WARN, fun=log_fun)

def notice(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_NOTICE, fun=log_fun)

def info(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_INFO, fun=log_fun)

def debug(self, log_msg, log_fun=None):
self.log(log_msg, level=SandeshLevel.SYS_DEBUG, fun=log_fun)

def _utc_timestamp_usec(self):
epoch = datetime.datetime.utcfromtimestamp(0)
now = datetime.datetime.utcnow()
delta = now - epoch
return (delta.microseconds +
(delta.seconds + delta.days * 24 * 3600) * 10 ** 6)

def redefine_sandesh_handles(self):
""" Redefine sandesh handle requests for various object types. """
pass

def sandesh_init(self):
""" Init sandesh """
self._sandesh = Sandesh()
# Reset the sandesh send rate limit value
if self._args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit(
self._args.sandesh_send_rate_limit)
self.redefine_sandesh_handles()
self._sandesh.init_generator(
self._module_name, self._hostname, self._node_type_name,
self._instance_id, self._args.collectors,
'%s_context' % self.context, int(self._args.http_server_port),
['cfgm_common', '%s.sandesh' % self.module_pkg], self.discovery,
logger_class=self._args.logger_class,
logger_config_file=self._args.logging_conf)

self._sandesh.set_logging_params(
enable_local_log=self._args.log_local,
category=self._args.log_category,
level=self._args.log_level,
file=self._args.log_file,
enable_syslog=self._args.use_syslog,
syslog_facility=self._args.syslog_facility)

# connection state init
ConnectionState.init(
self._sandesh, self._hostname, self._module_name,
self._instance_id,
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus, self.table)
1 change: 1 addition & 0 deletions src/config/device-manager/SConscript
Expand Up @@ -54,6 +54,7 @@ local_sources = [
'device_manager/device_manager.py',
'device_manager/db.py',
'device_manager/dm_utils.py',
'device_manager/dm_amqp.py',
'device_manager/physical_router_config.py',
]

Expand Down
2 changes: 1 addition & 1 deletion src/config/device-manager/device_manager/db.py
Expand Up @@ -1322,7 +1322,7 @@ def __init__(self, manager, zkclient):

super(DMCassandraDB, self).__init__(
cass_server_list, self._args.cluster_id, keyspaces, None,
manager.config_log, credential=cred)
manager.logger.log, credential=cred)

self.pr_vn_ip_map = {}
self.init_pr_map()
Expand Down

0 comments on commit 24f5cca

Please sign in to comment.