
Kafka-Node: Producer never gets notified when a partition moves to a new broker on auto rebalance #175

Closed
lsampathreddy opened this issue Mar 17, 2015 · 18 comments · Fixed by #176

Comments

@lsampathreddy

Scenario:
I have two brokers (broker 1 & broker 2) with auto.leader.rebalance.enable=true.
Create a TEST topic with replication factor 2 and 8 partitions.
Use a producer to push messages to the TEST topic.

When I stopped broker 1, broker 2 became the leader for all the partitions. A few minutes later broker 1 came back online. After leader.imbalance.check.interval.seconds, all partitions were distributed between the two brokers again, but my producer never got notified to refresh its metadata.
After that, some messages fail because the payload is sent to a broker that is no longer the leader for that partition.

Is this the expected behavior of the Producer, or am I doing something wrong?

@kadishmal
Contributor

Which producer are you using: low level or high level?

The low level producer, I think, doesn't do this detection job. Not sure, though. But the high level producer may handle this scenario.

The desirable behavior of the high level producer, in my opinion, should be as follows:

  1. The producer sends messages to a non-existing partition which might have moved to a different broker.
  2. The producer receives an error from the server.
  3. If the error says that the partition is not found, then the producer should try to refresh its metadata.
  4. Then it tries sending again to the same partition, which may now live on another broker.
  5. If the partition is still not found after refreshing the metadata, it should escalate the error to the client (a rough sketch of this flow follows).
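
A minimal sketch of that flow, assuming the kafka-node Client exposes a refreshMetadata(topics, cb) helper (the wrapper function and the single-retry policy here are illustrative, not library internals):

// Illustrative only: retry once after a metadata refresh when a send fails.
function sendWithRetry(producer, client, payloads, topics, cb) {
  producer.send(payloads, function (err, data) {
    if (!err) return cb(null, data);
    // Step 3: the error may mean the partition leader moved, so refresh metadata.
    client.refreshMetadata(topics, function (refreshErr) {
      if (refreshErr) return cb(refreshErr); // step 5: escalate to the caller
      // Step 4: retry against the (possibly new) leader from the fresh metadata.
      producer.send(payloads, cb);
    });
  });
}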

@haio what do you say?

@lsampathreddy
Author

Thanks for the response @kadishmal ...

I tried with both producers (low and high level). Unfortunately I couldn't see any error in the send callback. However, the Kafka broker started logging the warnings below after the auto rebalance.

[2015-03-16 10:22:48,370] WARN [KafkaApi-1] Produce request with correlation id 14878 from client slam.local on partition [TESTING,4] failed due to Leader not local for partition [TESTING,4] on broker 1 (kafka.server.KafkaApis)

[2015-03-16 10:22:52,394] WARN [KafkaApi-1] Produce request with correlation id 14882 from client slam.local on partition [TESTING,6] failed due to Leader not local for partition [TESTING,6] on broker 1 (kafka.server.KafkaApis)

[2015-03-16 10:22:55,416] WARN [KafkaApi-1] Produce request with correlation id 14885 from client slam.local on partition [TESTING,6] failed due to Leader not local for partition [TESTING,6] on broker 1 (kafka.server.KafkaApis)

[2015-03-16 10:22:57,436] WARN [KafkaApi-1] Produce request with correlation id 14887 from client slam.local on partition [TESTING,4] failed due to Leader not local for partition [TESTING,4] on broker 1 (kafka.server.KafkaApis)

I do see an event notification when I initiate a manual preferred replica election from the CLI; in that scenario Kafka-Node/lib/Producer works fine. But the auto rebalance case needs to be fixed, I guess.

Please share your thoughts on this…

@haio
Member

haio commented Mar 17, 2015

@lsampathreddy the Producer (low and high level) does not refresh metadata when brokers change so far; I think this feature can be added in the next version.
@kadishmal the current implementation for refreshing metadata watches the broker metadata in ZooKeeper; if it changes, a brokersChanged event is emitted and then the metadata is refreshed.
In your advice above, I don't understand why we should do this: The producer sends messages to a non-existing partition which might have moved to a different broker.

@kadishmal
Contributor

@haio a response to your last question:

The producer sends messages to a non-existing partition which might have moved to a different broker.

  1. Imagine a producer continuously sending messages.
  2. At some point rebalancing occurs.
  3. However, the producer keeps sending before the client notices the brokersChanged event. In that case, the producer sends messages which will fail.

I've checked your PR; the fix you've committed looks very simple, and I doubt it fixes this issue. @lsampathreddy, would you please fetch the latest version of kafka-node and see if it fixes your problem?

@estliberitas
Contributor

As far as I remember, re-balancing can't be detected via the watches set by Zookeeper#listBrokers(), so brokersChanged is never emitted when re-balancing occurs.

Instead, after brokers re-balance topics between each other, the ZooKeeper nodes under the /brokers/topics path change. I tried to solve this issue in my fork's branch (but it still has issues): watch-topics-zookeeper
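
A rough illustration of that approach, using node-zookeeper-client directly (the ZooKeeper library kafka-node already depends on); this is a sketch of the watch pattern, not the actual code in that branch:

var zookeeper = require('node-zookeeper-client');
var zk = zookeeper.createClient('localhost:2181');

// Watch the children of /brokers/topics and re-register the watch every time it
// fires. Note this only sees topics being created or deleted; catching leader
// changes would also require watching each topic's partition state nodes, which
// is where the approach gets complicated.
function watchTopics() {
  zk.getChildren(
    '/brokers/topics',
    function onChange() { watchTopics(); },
    function (err, topics) {
      if (err) return console.error(err);
      console.log('topics in ZooKeeper:', topics);
    }
  );
}

zk.once('connected', watchTopics);
zk.connect();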

@lsampathreddy
Author

@estliberitas , @kadishmal & @haio Thanks for your responses.

@kadishmal
#176 works perfectly when a broker is added or removed: ZooKeeper emits brokersChanged to its watchers. But my problem is still not completely resolved...

I debugged the Kafka Scala code a bit: KafkaController never notifies ZooKeeper of the change when auto rebalance is enabled; I feel that condition is not required in KafkaController.scala. If Kafka itself notified the preferred election changes to ZooKeeper via the path /admin/preferred_replica_election or /admin/reassign_partitions, then Kafka-Node would work perfectly with auto rebalance.

I reported an issue in the Kafka Jira: ISSUE. Waiting for their response.

@estliberitas if I do a manual re-balance with the command below, Kafka-Node is able to identify the changes.

bin/kafka-preferred-replica-election.sh
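
(For reference, in 0.8.x this tool is pointed at ZooKeeper; assuming ZooKeeper on localhost:2181, the full invocation would look like:)

bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181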

@estliberitas
Contributor

@lsampathreddy great! Maybe my 0.8.0 setup was different. We had an issue with a simple test: stopping one broker, then starting it and waiting for re-balancing.

@haio haio reopened this Mar 18, 2015
@haio
Member

haio commented Mar 18, 2015

The client watches /brokers/ids in ZooKeeper to check whether the brokers change. This works when a broker dies, comes back, or a new broker is added, but it can't handle the case where re-balancing occurs while the data in /brokers/ids doesn't change. @estliberitas mentioned another way, watching /brokers/topics, but I'm not sure it can work: since the client supports multiple topics and a topic may have multiple partitions, that approach could be more complicated.

@lsampathreddy In what cases do you still have this issue?

@lsampathreddy
Author

Hi @haio,
Thanks for your response
My issue here is that the client should watch for leadership imbalance across brokers.

This imbalance can happen when a broker dies and comes back. The Kafka 0.8.1 auto rebalance feature periodically checks for any partition leadership imbalance across brokers and tries to re-elect leaders to balance the partitions among the brokers.
Right now this change is not detected by the Kafka-Node client watchers. This is tricky; I am looking for an appropriate solution.

@haio, @estliberitas & @kadishmal please share your thoughts on this...

@kadishmal
Contributor

I believe it should detect the rebalanced partitions. At least the high level producer and consumer should handle this.

@haio
Member

haio commented Mar 18, 2015

Hi @lsampathreddy, how can we reproduce the imbalance on the brokers?

@lsampathreddy
Author

@haio
Here are the detailed steps to reproduce the partition imbalance… (using kafka-node@0.2.23)

  • start ZooKeeper (should run on port 2181)
  • start 2 Kafka servers with individual server.properties files. Make sure both properties files contain the properties below:
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=100
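
(For reference, two minimal configs could look like the following; the broker ids, ports, and log directories are just example values:)

# server-0.properties
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs-0
zookeeper.connect=localhost:2181
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=100

# server-1.properties
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=100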
  • create topic 'TESTING' using the shell command, with replication factor 2 and 8 partitions (example command below)
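
(For example, using the 0.8.x topic tool and assuming ZooKeeper on localhost:2181:)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic TESTING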
  • send messages to the TESTING topic from a test file:
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client('127.0.0.1:2181', 'clientId'),
    producer = new Producer(client);


var buildPayload = function (msg) {
  var payload = [];
  payload.push({topic: 'TESTING', messages: msg + " ", partition: Math.floor(Math.random() * 8)});
  return payload;
};

producer.on('ready', function () {
  console.log('producer is ready...');
});

var counter = 0;
setInterval(function(){
  counter ++;
  var payloads = buildPayload(counter);
  console.log(payloads);
  producer.send(payloads, function (err, data) {
    //if(err) console.log('error'+ err);
  });
}, 1000);

producer.on('error', function (err) {
  console.log('err at producer: ' + err);
});
  • start kafka-console-consumer.sh to see the messages sent from the producer (example command below)
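
(Assuming ZooKeeper on localhost:2181; the 0.8.x console consumer connects via ZooKeeper:)

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TESTING --from-beginning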
  • check the partition information for the TESTING topic by running the script below
slam:kafka_2.9.2-0.8.1.1 s.lambu$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TESTING

Topic:TESTING   PartitionCount:8    ReplicationFactor:2 Configs:

Topic: TESTING  Partition: 0    Leader: 0   Replicas: 0,1   Isr: 0,1
Topic: TESTING  Partition: 1    Leader: 1   Replicas: 1,0   Isr: 0,1
Topic: TESTING  Partition: 2    Leader: 0   Replicas: 0,1   Isr: 0,1
Topic: TESTING  Partition: 3    Leader: 1   Replicas: 1,0   Isr: 0,1
Topic: TESTING  Partition: 4    Leader: 0   Replicas: 0,1   Isr: 0,1
Topic: TESTING  Partition: 5    Leader: 1   Replicas: 1,0   Isr: 0,1
Topic: TESTING  Partition: 6    Leader: 0   Replicas: 0,1   Isr: 0,1
Topic: TESTING  Partition: 7    Leader: 1   Replicas: 1,0   Isr: 0,1
  • stop broker 1 with Ctrl + C. Kafka now elects a new leader for every partition, so broker 0 becomes the leader for all partitions. We can verify it with the command below.
    Also verify the producer; it should refresh its metadata on 'brokersChanged'.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TESTING
Topic:TESTING   PartitionCount:8    ReplicationFactor:2 Configs:

Topic: TESTING  Partition: 0    Leader: 0   Replicas: 0,1   Isr: 0
Topic: TESTING  Partition: 1    Leader: 0   Replicas: 1,0   Isr: 0
Topic: TESTING  Partition: 2    Leader: 0   Replicas: 0,1   Isr: 0
Topic: TESTING  Partition: 3    Leader: 0   Replicas: 1,0   Isr: 0
Topic: TESTING  Partition: 4    Leader: 0   Replicas: 0,1   Isr: 0
Topic: TESTING  Partition: 5    Leader: 0   Replicas: 1,0   Isr: 0
Topic: TESTING  Partition: 6    Leader: 0   Replicas: 0,1   Isr: 0
Topic: TESTING  Partition: 7    Leader: 0   Replicas: 1,0   Isr: 0
  • start broker 1 again. Kafka now expands the ISR back to 2 nodes, but the leader stays broker 0. We can verify it with the same command as above.
    Here we can see the leader imbalance: two brokers are available, but a single broker (0) is still the leader for all the partitions.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TESTING
Topic:TESTING   PartitionCount:8    ReplicationFactor:2 Configs:
    Topic: TESTING  Partition: 0    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: TESTING  Partition: 1    Leader: 0   Replicas: 1,0   Isr: 0,1
    Topic: TESTING  Partition: 2    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: TESTING  Partition: 3    Leader: 0   Replicas: 1,0   Isr: 0,1
    Topic: TESTING  Partition: 4    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: TESTING  Partition: 5    Leader: 0   Replicas: 1,0   Isr: 0,1
    Topic: TESTING  Partition: 6    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: TESTING  Partition: 7    Leader: 0   Replicas: 1,0   Isr: 0,1
  • after leader.imbalance.check.interval.seconds, each broker runs its own check for any remaining imbalance. Attaching controller.log from one of the Kafka brokers; here we can see how partition 5 moved from broker 0 to broker 1.

[2015-03-17 15:11:36,419] checking need to trigger partition rebalance (kafka.controller.KafkaController)
[2015-03-17 15:11:36,419] DEBUG [Controller 0]: preferred replicas by broker Map(1 -> Map([TESTING,5] -> List(1, 0), [TESTING,3] -> List(1, 0), [TESTING,1] -> List(1, 0), [TESTING,7] -> List(1, 0)), 0 -> Map([TESTING,4] -> List(0, 1), [TESTING,6] -> List(0, 1), [TESTING,2] -> List(0, 1), [TESTING,0] -> List(0, 1))) (kafka.controller.KafkaController)
[2015-03-17 15:11:36,419] DEBUG [Controller 0]: topics not in preferred replica Map([TESTING,5] -> List(1, 0), [TESTING,3] -> List(1, 0), [TESTING,1] -> List(1, 0), [TESTING,7] -> List(1, 0)) (kafka.controller.KafkaController)
>>> [2015-03-17 15:11:36,420] TRACE [Controller 0]: leader imbalance ratio for broker 1 is 1.000000 (kafka.controller.KafkaController)
[2015-03-17 15:11:36,420] DEBUG [Controller 0]: controllerContext.liveBrokerIds Set(1, 0) (kafka.controller.KafkaController)
[2015-03-17 15:11:36,420] DEBUG [Controller 0]: leaderBroker 1 (kafka.controller.KafkaController)
[2015-03-17 15:11:36,420] INFO [Controller 0]: Starting preferred replica leader election for partitions [TESTING,5] (kafka.controller.KafkaController)
[2015-03-17 15:11:36,421] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [TESTING,5] (kafka.controller.PartitionStateMachine)
>>> [2015-03-17 15:11:36,426] INFO [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [TESTING,5] is not the preferred replica. Trigerring preferred replica leader election (kafka.controller.PreferredReplicaPartitionLeaderSelector)
>>> [2015-03-17 15:11:36,428] DEBUG [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map([TESTING,5] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:2), [TESTING,4] -> (Leader:0,ISR:1,0,LeaderEpoch:2,ControllerEpoch:1), [TESTING,6] -> (Leader:0,ISR:1,0,LeaderEpoch:2,ControllerEpoch:1), [TESTING,3] -> (Leader:0,ISR:0,LeaderEpoch:2,ControllerEpoch:2), [TESTING,2] -> (Leader:0,ISR:1,0,LeaderEpoch:2,ControllerEpoch:1), [TESTING,0] -> (Leader:0,ISR:1,0,LeaderEpoch:2,ControllerEpoch:1), [TESTING,1] -> (Leader:0,ISR:0,LeaderEpoch:2,ControllerEpoch:2), [TESTING,7] -> (Leader:0,ISR:0,LeaderEpoch:2,ControllerEpoch:2)) (kafka.controller.PartitionStateMachine)
>>> [2015-03-17 15:11:36,430] INFO [Controller 0]: Partition [TESTING,5] completed preferred replica leader election. New leader is 1 (kafka.controller.KafkaController)
  • verify our test producer: it is still sending messages to the TESTING topic without any error. But any message sent to partition 5 is never read by the consumer, because our metadata hash still holds the old values (partition 5 maps to broker 0, not broker 1), since ZooKeeper never sends a 'brokersChanged' (or any other) event to refresh the metadata in this scenario.

Hence messages are lost… :(

Please correct me if I'm doing something wrong…
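
(A possible client-side stopgap until the library handles this, sketched under the assumption that the kafka-node Client exposes refreshMetadata(topics, cb), is to refresh metadata on a timer so a silently moved leader is picked up within one interval:)

// Hypothetical stopgap, not a fix in kafka-node itself: periodic refresh
// using the client created in the test script above.
setInterval(function () {
  client.refreshMetadata(['TESTING'], function (err) {
    if (err) console.log('metadata refresh failed: ' + err);
  });
}, 30 * 1000);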

@haio
Member

haio commented Mar 18, 2015

Thanks, will check this.

@estliberitas
Contributor

@haio @lsampathreddy What I proposed seems a bit tricky to implement, because /brokers/topics has a two-level tree of nodes inside it: /brokers/topics/TOPIC_ID/PARTITION_NUM. With many topics and many partitions, kafka-node could create watch hell for ZooKeeper. But maybe there is no need to watch the partition state and topic nodes; I haven't experimented enough.

haio added a commit to haio/kafka-node that referenced this issue Mar 19, 2015
This should fix issue SOHU-Co#175. When the client gets a `NotLeaderForPartition`
error, that means the leader for the partition has changed, so it emits a
`brokersChanged` event; consumer and producer listen for this event and
refresh topic metadata.
@haio
Member

haio commented Mar 19, 2015

I have made a fix for this. When the client gets a NotLeaderForPartition error, that means the leader for the partition has changed, so it emits a brokersChanged event (just for consistency); consumer and producer listen for this event and refresh the topic metadata.
@lsampathreddy can you test it with this commit: 704ae54
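
(Conceptually the change does something like the following; this is a paraphrase of the idea, not the literal diff, and the error check shown is illustrative:)

// When a produce/fetch response reports NotLeaderForPartition, reuse the
// existing brokersChanged path so producer and consumer refresh their metadata.
if (error && /NotLeaderForPartition/.test(String(error))) {
  client.emit('brokersChanged');
}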

@lsampathreddy
Author

@haio Thank you for your response.

I tested with your changes; it's working perfectly now. 👍
As soon as the preferred replica election completes in Kafka, the next message fails because the partition leader moved. The new code emits a brokersChanged event when it receives the 'NotLeaderForPartition' error, so the client can refresh the metadata. It's perfect...

Please let me know which version will include these changes.

@haio
Member

haio commented Mar 20, 2015

@lsampathreddy check kafka-node@0.2.24

@lsampathreddy
Author

Thank you @haio
