
Commit

After 4.0, no alarm is generated when the kafka container is
down, because discovery does not detect the event in the container
model of deployment.

This fix detects the kafka-down event and raises a SystemExit
exception in alarmgen: when the kafka health-check producer fails,
the exception is raised so that the process exits.

Change-Id: If3922d8dc550d292578731c97d7ce9e5ae3a1631
Closes-Bug: 1761424
Jackjvs committed Apr 17, 2018
1 parent c81da39 commit 2a19b76
Showing 1 changed file with 37 additions and 0 deletions.
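For illustration, the approach described in the commit message boils down to the producer-based liveness probe sketched below. This is a minimal, hypothetical sketch using kafka-python's KafkaProducer, not the committed code (the actual change is in the diff that follows); the standalone function name and broker address are placeholders.

# Minimal sketch of the liveness-check idea; kafka_brokers_reachable and
# the broker address are illustrative placeholders, not part of the patch.
from kafka import KafkaProducer

def kafka_brokers_reachable(broker_list, topic="HEALTH_CHECK_TOPIC"):
    try:
        producer = KafkaProducer(bootstrap_servers=broker_list)
        # send() is asynchronous; flush() blocks until the broker acks
        # (acks=1 by default) or raises KafkaTimeoutError on timeout.
        producer.send(topic, b'live..')
        producer.flush(timeout=2)
        return True
    except Exception:
        return False

if not kafka_brokers_reachable(["127.0.0.1:9092"]):
    raise SystemExit(1)  # exit so the failure becomes visible as a process-down event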
37 changes: 37 additions & 0 deletions contrail-opserver/alarmgen.py
@@ -66,6 +66,7 @@
import redis
from collections import namedtuple
from strict_redis_wrapper import StrictRedisWrapper
from kafka import common, KafkaProducer

OutputRow = namedtuple("OutputRow",["key","typ","val"])

@@ -1362,6 +1363,41 @@ def run_uve_agg(self, lredis, outp, part, acq_time):
rows)
rows[:] = []

    def run_kafka_liveness_check(self):
        """
        This function runs in its own gevent, and does a health check
        on the kafka brokers.
        """
        gevent.sleep(300)
        try:
            # acks = 1 (default): wait for the leader to write the
            # record to its local log
            producer = KafkaProducer(
                bootstrap_servers=self._conf.kafka_broker_list())
            self._logger.info("Initialized health-check KafkaProducer")
        except Exception as ex:
            producer = None
            self._logger.error('Failed to init health-check '
                               'KafkaProducer: %s' % (ex))
            raise SystemExit(1)
        finally:
            self._logger.info("health-check KafkaProducer call returned")
        while True:
            if producer is not None:
                gevent.sleep(120)
                future = producer.send("HEALTH_CHECK_TOPIC", b'live..')
                self._logger.info("AlarmGen health-check msg sent to "
                                  "KafkaBrokers")
                try:
                    # Blocks until the broker acks (as per the 'acks'
                    # setting), or raises on timeout
                    producer.flush(timeout=2)
                    self._logger.info("AlarmGen can reach KafkaBrokers")
                except Exception as ex:
                    self._logger.error('AlarmGen CANNOT reach '
                                       'KafkaBrokers! error: %s' % (ex))
                    raise SystemExit(1)
    # run_kafka_liveness_check

    def run_uve_processing(self):
        """
        This function runs in its own gevent, and provides state compression
@@ -2409,6 +2445,7 @@ def run(self):
        self.gevs = [ gevent.spawn(self._config_handler.start),
            gevent.spawn(self.run_process_stats),
            gevent.spawn(self.run_uve_processing),
            gevent.spawn(self.run_kafka_liveness_check),
            gevent.spawn(self._us.run)]
        if self._ad is not None:
            self._ad.start()
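A note on the design choice: raising SystemExit from the health-check greenlet is enough to bring down the whole alarmgen process because gevent, by default, treats SystemExit (like KeyboardInterrupt) as a system error and re-raises it in the main greenlet, so a join on self.gevs unwinds and the process exits. Below is a minimal, hypothetical sketch of that behaviour, assuming gevent's default error handling; the worker name is made up and is not part of the patch.

import gevent

def liveness_worker():
    gevent.sleep(1)
    # Simulate the health-check failure path in the patch above
    raise SystemExit(1)

g = gevent.spawn(liveness_worker)
# With gevent's default handling, the SystemExit raised in the worker is
# re-raised in the main greenlet, so joinall() does not return normally
# and the process exits.
gevent.joinall([g])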
