
Raise LeaderNotAvailable error when getting topic #338

Closed
xqliang opened this issue Nov 10, 2015 · 7 comments

@xqliang

xqliang commented Nov 10, 2015

Traceback (most recent call last):
  File "/Users/allen/.virtualenvs/roommgr/lib/python2.7/site-packages/eventhub/eventhub.py", line 61, in get_consumer
    tp = self._kafkaclient.topics[topic]
  File "/Users/allen/.virtualenvs/roommgr/lib/python2.7/site-packages/pykafka/cluster.py", line 57, in __getitem__
    topic = Topic(self._cluster(), meta.topics[key])
  File "/Users/allen/.virtualenvs/roommgr/lib/python2.7/site-packages/pykafka/topic.py", line 52, in __init__
    self.update(topic_metadata)
  File "/Users/allen/.virtualenvs/roommgr/lib/python2.7/site-packages/pykafka/topic.py", line 150, in update
    raise LeaderNotAvailable()
pykafka.exceptions.LeaderNotAvailable
[1]    70054 segmentation fault  python roomonline.py

I have 3 brokers, and they are running without any ERROR logs. The error above is written directly to STDERR, and then the process exits with a segmentation fault.
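
The access pattern behind the traceback is roughly this (a sketch only, not the actual script):

# Hypothetical reproduction of the code path shown in the traceback above.
from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.229.153:19092,192.168.229.153:19093,192.168.229.153:19094")
topic = client.topics["dispatch_1025_33"]   # Topic.update() raises LeaderNotAvailable here
consumer = topic.get_simple_consumer()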

@jasonrhaas

@xqliang What is the command you are running? Check your Kafka topic status by running the built-in Kafka tool bin/kafka-topics.sh --describe. My guess is that the topic you are looking at has lost its leader because there is some trouble with one of your brokers.

@emmettbutler
Contributor

@xqliang I'm not sure which command you're running, but this error is expected if you start a consumer or producer when there are not enough brokers alive in the cluster to satisfy the replication factor for the topic you're using.

@xqliang
Author

xqliang commented Nov 11, 2015

@jasonrhaas Here is the describe output:

$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.229.156:2181 --topic dispatch_1025_33
Topic:dispatch_1025_33  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: dispatch_1025_33 Partition: 0    Leader: 2       Replicas: 2     Isr: 2

The console consumer is ok:

$ ./bin/kafka-console-consumer.sh --zookeeper 192.168.229.156:2181 --topic dispatch_1025_33 --from-beginning
!?C 
!?C 
^CConsumed 2 messages

The metadata fetched by pykafka is:

TopicMetadata(name='dispatch_1025_33', partitions={0: PartitionMetadata(id=0, leader=-1, replicas=[2], isr=[], err=5)}, err=0)
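
One way to see which broker is handing back this stale view would be something like the following (a sketch; Broker.request_metadata() is pykafka's internal API, so treat the call as an assumption):

# Ask every broker for its own view of the topic metadata and print it,
# to spot the broker(s) still reporting leader=-1 / err=5 (LEADER_NOT_AVAILABLE).
from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.229.153:19092,192.168.229.153:19093,192.168.229.153:19094")
for broker_id, broker in client.cluster.brokers.items():
    meta = broker.request_metadata(topics=["dispatch_1025_33"])
    print("%s %r" % (broker_id, meta.topics["dispatch_1025_33"]))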

@xqliang
Author

xqliang commented Nov 11, 2015

The zookeeper info:

[zk: 192.168.229.156:2181(CONNECTED) 24] ls /brokers/ids 
[2, 1, 0]

[zk: 192.168.229.156:2181(CONNECTED) 22] get /controller
{"version":1,"brokerid":1,"timestamp":"1447038499256"}
cZxid = 0x990004e1d6
ctime = Mon Nov 09 11:08:19 CST 2015
mZxid = 0x990004e1d6
mtime = Mon Nov 09 11:08:19 CST 2015
pZxid = 0x990004e1d6
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3504a56735e05b5
dataLength = 54
numChildren = 0

[zk: 192.168.229.156:2181(CONNECTED) 40] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1447038342442","host":"192.168.229.153","version":1,"port":19092}
cZxid = 0x990004e16d
ctime = Mon Nov 09 11:08:16 CST 2015
mZxid = 0x990004e16d
mtime = Mon Nov 09 11:08:16 CST 2015
pZxid = 0x990004e16d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2504a5672e5058f
dataLength = 93
numChildren = 0

[zk: 192.168.229.156:2181(CONNECTED) 38] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1447038502528","host":"192.168.229.153","version":1,"port":19093}
cZxid = 0x990004e24a
ctime = Mon Nov 09 11:08:22 CST 2015
mZxid = 0x990004e24a
mtime = Mon Nov 09 11:08:22 CST 2015
pZxid = 0x990004e24a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3504a56735e05b5
dataLength = 93
numChildren = 0

[zk: 192.168.229.156:2181(CONNECTED) 39] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1447097560166","host":"192.168.229.153","version":1,"port":19094}
cZxid = 0x9a00000003
ctime = Tue Nov 10 03:32:43 CST 2015
mZxid = 0x9a00000003
mtime = Tue Nov 10 03:32:43 CST 2015
pZxid = 0x9a00000003
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x250edbc78a70000
dataLength = 93
numChildren = 0

The logs from kafka-server[0,1].stdout:

==> logs/kafka-server1.stdout <==
[2015-11-11 13:05:33,928] INFO conflict in /brokers/ids/1 data: {"jmx_port":-1,"timestamp":"1447038502597","host":"192.168.229.153","version":1,"port":19093} stored data: {"jmx_port":-1,"timestamp":"1447038502528","host":"192.168.229.153","version":1,"port":19093} (kafka.utils.ZkUtils$)
[2015-11-11 13:05:33,931] INFO I wrote this conflicted ephemeral node [{"jmx_port":-1,"timestamp":"1447038502597","host":"192.168.229.153","version":1,"port":19093}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2015-11-11 13:05:37,268] INFO Partition [__consumer_offsets,18] on broker 1: Shrinking ISR for partition [__consumer_offsets,18] from 1,2,0 to 1,0 (kafka.cluster.Partition)
[2015-11-11 13:05:37,301] INFO Partition [__consumer_offsets,18] on broker 1: Cached zkVersion [31] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-11-11 13:05:37,301] INFO Partition [__consumer_offsets,30] on broker 1: Shrinking ISR for partition [__consumer_offsets,30] from 1,2,0 to 1,0 (kafka.cluster.Partition)
......(repeat)

==> logs/kafka-server0.stdout <==
[2015-11-11 13:05:33,951] INFO conflict in /brokers/ids/0 data: {"jmx_port":-1,"timestamp":"1447038496429","host":"192.168.229.153","version":1,"port":19092} stored data: {"jmx_port":-1,"timestamp":"1447038342442","host":"192.168.229.153","version":1,"port":19092} (kafka.utils.ZkUtils$)
[2015-11-11 13:05:33,953] INFO Partition [__consumer_offsets,5] on broker 0: Cached zkVersion [18] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-11-11 13:05:33,953] INFO Partition [__consumer_offsets,37] on broker 0: Expanding ISR for partition [__consumer_offsets,37] from 0 to 0,1 (kafka.cluster.Partition)
[2015-11-11 13:05:33,954] INFO I wrote this conflicted ephemeral node [{"jmx_port":-1,"timestamp":"1447038496429","host":"192.168.229.153","version":1,"port":19092}] at /brokers/ids/0 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2015-11-11 13:05:33,970] INFO Partition [__consumer_offsets,37] on broker 0: Cached zkVersion [14] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-11-11 13:05:33,970] INFO Partition [__consumer_offsets,11] on broker 0: Expanding ISR for partition [__consumer_offsets,11] from 0 to 0,1 (kafka.cluster.Partition)
......(repeat)

It looks like the metadata is out of sync among the brokers; this may be a Kafka bug that was fixed in 0.9.

I tried initializing the consumer with only broker 2 (192.168.229.153:19094), and the LeaderNotAvailable error was gone. That is to say, to work around this, pykafka could retry the metadata request against all of the given brokers (rather than fetching from only one broker and raising LeaderNotAvailable) when the brokers are out of sync.
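
The workaround I used looks roughly like this (a sketch, assuming the standard pykafka consumer setup):

# Workaround sketch: point the client at broker 2 only, since its metadata
# reports the correct leader, instead of using the full broker list.
from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.229.153:19094")  # broker 2 only
topic = client.topics["dispatch_1025_33"]
consumer = topic.get_simple_consumer()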

After I restarted all the brokers and cleared the Kafka data, everything works fine.

@yungchin
Contributor

Thanks for the thorough job gathering information, @xqliang - I think your analysis is spot-on. Retrying our metadata fetch when hitting LeaderNotAvailable would make a lot of sense, and we should then probably pick a random broker to query in Cluster._get_metadata().
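
Something along these lines is what I have in mind (a rough sketch, not the actual patch; Broker.request_metadata() and the retry/backoff parameters are assumptions):

# Query a randomly chosen broker and retry until the metadata reports a real
# leader (!= -1) for every partition, instead of failing on the first response.
import random
import time

from pykafka.exceptions import LeaderNotAvailable

def fetch_metadata_with_retry(brokers, topics, attempts=3, backoff=2.0):
    """brokers: iterable of pykafka Broker objects; topics: list of topic names."""
    for _ in range(attempts):
        broker = random.choice(list(brokers))
        metadata = broker.request_metadata(topics=topics)
        if all(partition.leader != -1
               for topic_meta in metadata.topics.values()
               for partition in topic_meta.partitions.values()):
            return metadata
        time.sleep(backoff)
    raise LeaderNotAvailable("no leader reported after %d metadata requests" % attempts)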

@emmettbutler
Contributor

I've opened PR #358 to make topic creation a bit more resilient against this type of failure. @xqliang if you'd like to test the branch and give feedback, please feel free.

yungchin added a commit that referenced this issue Nov 21, 2015
…utures-3

* parsely/master:
  fix held_offsets in balancedconsumer
  fixup sourcecode directives
  add Kafka 0.9 roadmap to ReadTheDocs
  balancedconsumer test pep8
  pass zk connect string through to balanced consumer
  tests: fixes after changes to SimpleConsumer.held_offsets
  remove unused import
  test for zookeeper connect strings in Cluster()
  have Cluster accept zookeeper connect strings as well as broker lists
  return -2 if last_offset_consumed is -1. fixes #216
  wrap another _socket call in a disconnect handler
  balancedconsumer: re-raise ConsumerStoppedException
  Set disconnected on socket errors from util.
  retry topic creation on a random broker. fixes #338
@xqliang
Author

xqliang commented Nov 29, 2015

Sorry @emmett9001, I've been kind of busy these days and forgot about this. After I completely restarted all the Kafka brokers, I can no longer reproduce this error.
