diff --git a/contrail-opserver/alarmgen.py b/contrail-opserver/alarmgen.py index 4b3e3afb..e7f6c796 100644 --- a/contrail-opserver/alarmgen.py +++ b/contrail-opserver/alarmgen.py @@ -66,6 +66,7 @@ import redis from collections import namedtuple from strict_redis_wrapper import StrictRedisWrapper +from kafka import common, KafkaProducer OutputRow = namedtuple("OutputRow",["key","typ","val"]) @@ -1362,6 +1363,41 @@ def run_uve_agg(self, lredis, outp, part, acq_time): rows) rows[:] = [] + def run_kafka_liveness_check(self): + """ + This function runs in its own gevent, and does health check + on kafka brokers + """ + gevent.sleep(300) + try: + # acks = 1 (default) wait for leader to write record to + # local log + producer = KafkaProducer( + bootstrap_servers=self._conf.kafka_broker_list()) + self._logger.info("Initialized health-check KafkaProducer") + except Exception as ex: + producer = None + self._logger.error('Failed to init health-check \ + KafkaProducer:%s' % (ex)) + raise SystemExit(1) + finally: + self._logger.info("health-check KafkaProducer call returned") + while True: + if producer is not None: + gevent.sleep(120) + future = producer.send("HEALTH_CHECK_TOPIC", b'live..') + self._logger.info("AlarmGen health-check msg sent to \ + KafkaBrokers") + try: + #Blocks for ack as per 'acks' condition and times out + producer.flush(timeout=2) + self._logger.info("AlarmGen can reach KafkaBrokers") + except Exception as ex: + self._logger.error('AlarmGen CANNOT reach \ + KafkaBrokers! error:%s' % (ex)) + raise SystemExit(1) + # run_kafka_liveness_check + def run_uve_processing(self): """ This function runs in its own gevent, and provides state compression @@ -2409,6 +2445,7 @@ def run(self): self.gevs = [ gevent.spawn(self._config_handler.start), gevent.spawn(self.run_process_stats), gevent.spawn(self.run_uve_processing), + gevent.spawn(self.run_kafka_liveness_check), gevent.spawn(self._us.run)] if self._ad is not None: self._ad.start()