Skip to content

Commit

Permalink
Merge pull request #1028 from lgtml/update_kafka_api
Browse files Browse the repository at this point in the history
Updated KafkaClient API - version 0.9.0
  • Loading branch information
remh committed Jul 14, 2014
2 parents 76d81a1 + ec67a61 commit 1b885a3
Showing 1 changed file with 2 additions and 16 deletions.
18 changes: 2 additions & 16 deletions checks.d/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
zk_connect_str = self.read_config(instance, 'zk_connect_str')
kafka_host_ports = self.read_config(instance, 'kafka_connect_str',
cast=self._parse_connect_str)
kafka_host_ports = self.read_config(instance, 'kafka_connect_str')

# Construct the Zookeeper path pattern
zk_prefix = instance.get('zk_prefix', '')
Expand Down Expand Up @@ -66,8 +65,7 @@ def check(self, instance):
self.log.exception('Error cleaning up Zookeeper connection')

# Connect to Kafka
kafka_host, kafka_port = random.choice(kafka_host_ports)
kafka_conn = KafkaClient(kafka_host, kafka_port)
kafka_conn = KafkaClient(kafka_host_ports)

try:
# Query Kafka for the broker offsets
Expand Down Expand Up @@ -123,15 +121,3 @@ def _validate_consumer_groups(self, val):
mytopic0: [0, 1, 2]
mytopic1: [10, 12]
''')

def _parse_connect_str(self, val):
try:
host_port_strs = val.split(',')
host_ports = []
for hp in host_port_strs:
host, port = hp.strip().split(':')
host_ports.append((host, int(port)))
return host_ports
except Exception, e:
self.log.exception(e)
raise Exception('Could not parse %s. Must be in the form of `host0:port0,host1:port1,host2:port2`' % val)

0 comments on commit 1b885a3

Please sign in to comment.