From 2a19b7645950b8932de831e8eea5ee17b6068879 Mon Sep 17 00:00:00 2001
From: Jackjvs
Date: Sat, 14 Apr 2018 10:05:10 -0700
Subject: [PATCH] After 4.0, alarm is not getting generated when kafka
 container is down, because discovery doesn't detect the event in the
 container model of deployment. This fix detects kafka down event and
 raises SystemExit exception in alarmgen. Upon detecting kafka health
 check producer failure the said exception is raised.

Change-Id: If3922d8dc550d292578731c97d7ce9e5ae3a1631
Closes-Bug: 1761424
---
 Review notes: line breaks and the indentation of context lines were
 restored from a whitespace-collapsed copy of this patch and should be
 confirmed against the target file (or applied with --ignore-whitespace).
 The commit/index hashes predate the review changes made to
 run_kafka_liveness_check.

 contrail-opserver/alarmgen.py | 45 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/contrail-opserver/alarmgen.py b/contrail-opserver/alarmgen.py
index 4b3e3afb..e7f6c796 100644
--- a/contrail-opserver/alarmgen.py
+++ b/contrail-opserver/alarmgen.py
@@ -66,6 +66,7 @@
 import redis
 from collections import namedtuple
 from strict_redis_wrapper import StrictRedisWrapper
+from kafka import common, KafkaProducer
 
 OutputRow = namedtuple("OutputRow",["key","typ","val"])
 
@@ -1362,6 +1363,49 @@ def run_uve_agg(self, lredis, outp, part, acq_time):
                 rows)
             rows[:] = []
 
+    def run_kafka_liveness_check(self):
+        """
+        This function runs in its own gevent, and does health check
+        on kafka brokers.
+
+        After an initial 300 second delay it creates a dedicated
+        KafkaProducer and then, every 120 seconds, sends a small message
+        to HEALTH_CHECK_TOPIC and flushes it with a 2 second timeout.
+
+        Raises SystemExit(1) if the producer cannot be created or if
+        sending/flushing the health-check message fails.
+        """
+        gevent.sleep(300)
+        try:
+            # acks = 1 (default) wait for leader to write record to
+            # local log
+            producer = KafkaProducer(
+                bootstrap_servers=self._conf.kafka_broker_list())
+        except Exception as ex:
+            self._logger.error(
+                'Failed to init health-check KafkaProducer:%s' % (ex))
+            raise SystemExit(1)
+        else:
+            self._logger.info("Initialized health-check KafkaProducer")
+        finally:
+            self._logger.info("health-check KafkaProducer call returned")
+        while True:
+            gevent.sleep(120)
+            try:
+                # send() can itself raise (e.g. when broker metadata is
+                # unavailable), so it is guarded together with flush()
+                producer.send("HEALTH_CHECK_TOPIC", b'live..')
+                self._logger.info(
+                    "AlarmGen health-check msg sent to KafkaBrokers")
+                # Blocks for ack as per 'acks' condition and times out
+                producer.flush(timeout=2)
+                self._logger.info("AlarmGen can reach KafkaBrokers")
+            except Exception as ex:
+                self._logger.error(
+                    'AlarmGen CANNOT reach KafkaBrokers! error:%s' % (ex))
+                raise SystemExit(1)
+    # run_kafka_liveness_check
+
     def run_uve_processing(self):
         """
         This function runs in its own gevent, and provides state compression
@@ -2409,6 +2453,7 @@ def run(self):
         self.gevs = [ gevent.spawn(self._config_handler.start),
                       gevent.spawn(self.run_process_stats),
                       gevent.spawn(self.run_uve_processing),
+                      gevent.spawn(self.run_kafka_liveness_check),
                       gevent.spawn(self._us.run)]
         if self._ad is not None:
             self._ad.start()