Skip to content

Commit

Permalink
Incorporated comments
Browse files Browse the repository at this point in the history
Signed-off-by: anmolbabu <anmolbudugutta@gmail.com>
  • Loading branch information
anmolbabu committed Dec 11, 2016
1 parent df48ba7 commit e6f9d9d
Showing 1 changed file with 41 additions and 48 deletions.
89 changes: 41 additions & 48 deletions tendrl/node_agent/manager/alert_socket.py
@@ -1,10 +1,13 @@
import SocketServer
from multiprocessing import Process
import json
from tendrl.node_agent.config import TendrlConfig
import logging
from tendrl.node_agent.manager.rpc import EtcdRPC
from tendrl.common.config import ConfigNotFound
import logging
import json
import gevent.event
import gevent.greenlet
from gevent.server import StreamServer
from _socket import error as _socket_error
from io import BlockingIOError

config = TendrlConfig()
LOG = logging.getLogger(__name__)
Expand All @@ -14,75 +17,65 @@
class AlertUtils():
def validate_alert(self, alert_data):
alert = json.loads(alert_data)
if 'Host' not in alert:
raise KeyError('Host')
if 'Plugin' not in alert:
raise KeyError('Plugin')
if 'Severity' not in alert:
raise KeyError('Severity')
if 'Source' not in alert:
raise KeyError('Source')
if 'time_stamp' not in alert:
raise KeyError('time_stamp')
if 'resource' not in alert:
raise KeyError('resource')
if 'severity' not in alert:
raise KeyError('severity')
if 'source' not in alert:
raise KeyError('source')
if 'current_value' not in alert:
raise KeyError('current_value')
if 'host' not in alert:
raise KeyError('host')
if 'type' not in alert:
raise KeyError('type')
return alert

def persist_alert(self, alert):
key = '/alerts/%s/%s' % (alert['Host'], alert['Plugin'])
if 'PluginInstance' in alert:
key = "%s/%s" % (key, alert['PluginInstance'])
key = '/alerts/%s/%s' % (alert['host'], alert['resource'])
if 'tags' in alert and 'plugin_instance' in alert['tags']:
key = "%s/%s" % (key, alert['tags']['plugin_instance'])
EtcdRPC().client.write(key, alert)


class AlertSocketHandler(SocketServer.BaseRequestHandler):
def handle(self):
class AlertsManager(gevent.greenlet.Greenlet):

def read_socket(self, sock, address):
try:
data = self.request.recv(RECEIVE_DATA_SIZE)
data = sock.recv(RECEIVE_DATA_SIZE)
alert_utils = AlertUtils()
alert = alert_utils.validate_alert(data)
alert_utils.persist_alert(alert)
except (KeyError, TypeError, ValueError) as ex:
LOG.error('Failed to handle data %s on alert socket.Error %s' % (
data, ex)
)
finally:
self.request.close()
LOG.error('Failed to handle data on alert socket.Error %s' % ex)


class AlertsServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
daemon_threads = True
allow_reuse_address = True

def __init__(self, server_address, RequestHandlerClass):
SocketServer.TCPServer.__init__(
self, server_address, RequestHandlerClass)


class AlertsManager(Process):
def __init__(self):
Process.__init__(self)
super(AlertsManager, self).__init__()
try:
hostname = config.get(
self.hostname = config.get(
"node_agent",
"collectd_socket_addr"
"tendrl_alerts_socket_addr"
)
port = config.get(
self.port = config.get(
"node_agent",
"collectd_socket_port"
"tendrl_alerts_socket_port"
)
self.server = StreamServer(
(self.hostname, int(self.port)),
self.read_socket
)
except ConfigNotFound as ex:
LOG.error('Failed to fetch alerting socket configurations.\
Error %s' % ex)
self.server = AlertsServer(
(hostname, int(port)),
AlertSocketHandler
)

def run(self):
def _run(self):
try:
self.server.serve_forever()
except Exception as ex:
except (TypeError, BlockingIOError, _socket_error, ValueError) as ex:
LOG.error('Error trying to serve the alerting socket forever.\
Error %s' % ex)

def stop(self):
self.server.server_close()
self.terminate()

self.server.close()

0 comments on commit e6f9d9d

Please sign in to comment.