Environment:
- single-instance ZooKeeper (version: 3.4.6)
- a Kafka cluster with four brokers on one machine, running under supervisor (version: 0.8.2.0)
- Java version "1.8.0_45"
- topic info:
Topic:timeline PartitionCount:20 ReplicationFactor:2 Configs:
Topic: timeline Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 3 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
<and more...>
I tried the latest code on the master branch to test cluster failover. SimpleConsumer exited with an AttributeError when a broker was shut down. Below is the stack:
[E 150604 11:24:47 /Users/leon/zhihu/kafka-python/kafka/conn.py:107 leon-MacBook-Pro:22207] Unable to receive data from Kafka
Traceback (most recent call last):
  File "/Users/leon/zhihu/kafka-python/kafka/conn.py", line 104, in _read_bytes
    raise socket.error("Not enough data to read message -- did server kill socket?")
error: Not enough data to read message -- did server kill socket?
[W 150604 11:24:47 /Users/leon/zhihu/kafka-python/kafka/client.py:204 leon-MacBook-Pro:22207] Could not receive response to request [00000084000100000000004a000c6b61666b612d707974686f6effffffff0000ea600000000100000001000874696d656c696e650000000500000012000000000000000900001000000000020000000000000028000010000000000e0000000000000009000010000000000a00000000000000070000100000000006000000000000000900001000] from server <KafkaConnection host=kids.aws.dev port=9094>: Kafka @ kids.aws.dev:9094 went away
Traceback (most recent call last):
  File "zhihu_logger/consumer.py", line 23, in <module>
    main()
  File "zhihu_logger/consumer.py", line 19, in main
    for m in consumer:
  File "/Users/leon/zhihu/kafka-python/kafka/consumer/simple.py", line 311, in __iter__
    message = self.get_message(True, timeout)
  File "/Users/leon/zhihu/kafka-python/kafka/consumer/simple.py", line 270, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/Users/leon/zhihu/kafka-python/kafka/consumer/simple.py", line 283, in _get_message
    self._fetch()
  File "/Users/leon/zhihu/kafka-python/kafka/consumer/simple.py", line 344, in _fetch
    check_error(resp)
  File "/Users/leon/zhihu/kafka-python/kafka/common.py", line 218, in check_error
    if response.error:
AttributeError: 'FailedPayloadsError' object has no attribute 'error'
When fetching messages from the Kafka cluster, we call the client.send_fetch_request method with the parameter fail_on_error=False, so the responses may contain a FailedPayloadsError instance. The function check_error cannot handle a real Exception object: it goes on to read the response's error attribute, which does not exist on the exception.
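One possible shape for the fix is sketched below. It is only an illustration under stated assumptions, not the library's actual code: FailedPayloadsError, BrokerResponseError and FetchResponse here are simplified stand-ins defined locally, and this check_error is a hypothetical replacement that re-raises exception instances before touching the error attribute.

```python
from collections import namedtuple


class FailedPayloadsError(Exception):
    """Simplified stand-in for kafka.common.FailedPayloadsError."""


class BrokerResponseError(Exception):
    """Hypothetical catch-all for a response carrying a non-zero error code."""


# Simplified stand-in for a fetch response; only the fields used here.
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error"])


def check_error(response):
    """Raise if the response is an exception or carries an error code."""
    # With fail_on_error=False the response list may hold exception
    # instances, so handle them before reading any attribute.
    if isinstance(response, Exception):
        raise response
    if response.error:
        raise BrokerResponseError(response)


# A normal response passes through silently.
check_error(FetchResponse("timeline", 0, 0))

# A failed payload now surfaces as its own exception type
# instead of an AttributeError.
try:
    check_error(FailedPayloadsError("broker went away"))
except FailedPayloadsError as exc:
    print("fetch failed:", exc)
```

With this shape the caller can catch FailedPayloadsError explicitly and decide whether to retry, rather than crashing on an unrelated AttributeError.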
I tried fixing the problem above locally (I can open a pull request for it) and later brought the downed broker back up. The Kafka cluster waited for the recovered broker to catch up. Once it had caught up, partition leadership moved back to the recovered broker. This raised a NotLeaderForPartitionError in the consumer, so the consumer exited.
Topic:timeline PartitionCount:20 ReplicationFactor:2 Configs:
Topic: timeline Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 3 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 5 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 6 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 7 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 9 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 10 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 11 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 13 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 14 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 15 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 16 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 17 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 18 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 19 Leader: 3 Replicas: 3,0 Isr: 3,0
The topic state changed from the above to the below: the leader of partitions 1, 5, 9, 13 and 17 moved from broker 2 back to broker 1.
Topic:timeline PartitionCount:20 ReplicationFactor:2 Configs:
Topic: timeline Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 3 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 6 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 7 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 9 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 10 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 11 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 13 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 14 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 15 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: timeline Partition: 16 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: timeline Partition: 17 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: timeline Partition: 18 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: timeline Partition: 19 Leader: 3 Replicas: 3,0 Isr: 3,0
Below is the stack:
Traceback (most recent call last):
  File "zhihu_logger/consumer.py", line 23, in <module>
    main()
  File "zhihu_logger/consumer.py", line 19, in main
    for m in consumer:
  File "/Users/leon/opensource/kafka-python/kafka/consumer/simple.py", line 311, in __iter__
    message = self.get_message(True, timeout)
  File "/Users/leon/opensource/kafka-python/kafka/consumer/simple.py", line 270, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/Users/leon/opensource/kafka-python/kafka/consumer/simple.py", line 283, in _get_message
    self._fetch()
  File "/Users/leon/opensource/kafka-python/kafka/consumer/simple.py", line 346, in _fetch
    check_error(resp)
  File "/Users/leon/opensource/kafka-python/kafka/common.py", line 220, in check_error
    raise error_class(response)
kafka.common.NotLeaderForPartitionError: FetchResponse(topic='timeline', partition=5, error=6, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x10116ab40>)
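A leader change like this is usually recoverable, so one way to keep the consumer alive is to refresh metadata and retry the fetch. The sketch below is only an assumption about how that could look: fetch_with_leader_retry, fetch_once and refresh_metadata are hypothetical names, and NotLeaderForPartitionError is a local stand-in rather than the class from kafka.common.

```python
import time


class NotLeaderForPartitionError(Exception):
    """Simplified stand-in for kafka.common.NotLeaderForPartitionError."""


def fetch_with_leader_retry(fetch_once, refresh_metadata, retries=3, backoff=0.1):
    """Call fetch_once(), refreshing metadata and retrying on leader changes.

    fetch_once and refresh_metadata are hypothetical callables supplied by
    the caller; in a real client they would wrap the fetch request and the
    topic metadata reload.
    """
    for attempt in range(retries + 1):
        try:
            return fetch_once()
        except NotLeaderForPartitionError:
            if attempt == retries:
                raise  # give up after the last allowed retry
            refresh_metadata()   # learn which broker leads the partition now
            time.sleep(backoff)  # give the cluster a moment to settle


# Usage with fake callables: the first two fetches hit a stale leader.
attempts = {"fetch": 0, "refresh": 0}


def fake_fetch():
    attempts["fetch"] += 1
    if attempts["fetch"] < 3:
        raise NotLeaderForPartitionError("timeline partition 5")
    return ["message"]


def fake_refresh():
    attempts["refresh"] += 1


print(fetch_with_leader_retry(fake_fetch, fake_refresh, backoff=0))
```

Bounding the number of retries keeps a genuinely broken cluster from hanging the consumer forever, while a routine leader handover is absorbed without exiting.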
Both problems affect the producer as well as the consumer, since the logic lives mainly in the Kafka client.
Looking forward to responses~ THX~ 😄