
For a topic with multiple partitions, sometimes not all partitions are consumed by HighLevelConsumer #339

Closed
tomwang1013 opened this issue Mar 15, 2016 · 18 comments

@tomwang1013

This has happened quite a few times: I use HighLevelConsumer to consume a topic with 4 partitions, but only 2 of the partitions were consumed, like this:
[screenshot: consumer offsets showing only 2 of the 4 partitions being consumed]

Sometimes rebooting the consumer fixed it, sometimes not. Why?

@hyperlink
Collaborator

Does this issue happen in version 0.2.30?

@tomwang1013
Author

@hyperlink: yes

@sensoroDj

I also encountered this situation, but version 0.2.29 does not have this problem.

@sensoroDj

@tomwang1013 What tool do you use to see the partitions? Thank you.

@tomwang1013
Author

@sensoroDj Are you sure that 0.2.29 does not have this problem? I will revert to 0.2.29.

@sensoroDj

Sorry, I use version 0.2.29! My partitions and consumers are one-to-one: I let one consumer consume one partition, e.g. 3 partitions to 3 consumers. But when I use the 0.3.x high-level API it does not work the way I want it to. I recommend using a lower version when you use the high-level API; I found it can't work well in 0.3.x.

@jbarreto

@tomwang1013 Can you give the output using the Kafka tools?

Something like:
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper [ZKHOST:ZKPORT] --topic [TOPIC_NAME]

For example:
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper local.zookeeper.net:2181 --topic feedUpdates2

The only scenario in which I have been able to reproduce your issue is when you are only sending messages to a few partitions and you don't have one consumer per partition, for example:

  • Publishing to partitions 0,1,2,3,4
  • Running 3 consumers

You will get something like:

Group           Topic                          Pid Offset          logSize         Lag             Owner
kafka-node-group feedUpdates2                   0   170841          178521          7680            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   1   160526          167043          6517            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   2   171471          177998          6527            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   3   158815          166503          7688            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   4   159599          166138          6539            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   5   7537            7537            0               kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   6   7537            7537            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   7   7537            7537            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   8   7536            7536            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   9   7534            7534            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   10  7534            7534            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   11  7533            7533            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   12  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   13  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   14  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   15  7529            7529            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d

The distribution is basically floor(partitions / consumers) per consumer, with the first consumers each taking one extra partition to cover the remainder (see the sketch after the breakdown below).

So you get:

  • 1st consumer gets 6 partitions
  • 2nd consumer gets 5 partitions
  • 3rd consumer gets 5 partitions

So ONLY one consumer is getting messages, since there are only messages on the first 5 partitions.
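
To make the arithmetic concrete, here is a minimal sketch of that range-style assignment in plain JavaScript. The function name and the partition/consumer counts are just illustrative, not kafka-node internals:

// Range-style assignment: floor(partitions / consumers) each, and the first
// (partitions % consumers) consumers take one extra partition.
function assignPartitions(partitionCount, consumerIds) {
  var perConsumer = Math.floor(partitionCount / consumerIds.length);
  var remainder = partitionCount % consumerIds.length;
  var assignment = {};
  var next = 0;

  consumerIds.forEach(function (id, index) {
    var count = perConsumer + (index < remainder ? 1 : 0);
    assignment[id] = [];
    for (var i = 0; i < count; i++) {
      assignment[id].push(next++);
    }
  });

  return assignment;
}

console.log(assignPartitions(16, ['c1', 'c2', 'c3']));
// => { c1: [0,1,2,3,4,5], c2: [6,7,8,9,10], c3: [11,12,13,14,15] }

With messages only on partitions 0-4, everything lands on the first consumer, which matches the ConsumerOffsetChecker output above.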

@tomwang1013
Author

@jbarreto In my case there was only one consumer in this group, so it should have consumed all the partitions. But sometimes it did not, as I showed above. The correct scenario looks like this:
[screenshot: all 4 partitions being consumed by the single consumer]
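
For reference, my setup is roughly this minimal kafka-node sketch with a single HighLevelConsumer in the group (the connection string, topic, and group names here are placeholders):

var kafka = require('kafka-node');

// Placeholder ZooKeeper address and topic; a single consumer in the group
// should be assigned every partition of the topic.
var client = new kafka.Client('localhost:2181');
var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my-topic' }], {
  groupId: 'kafka-node-group'
});

consumer.on('message', function (message) {
  // With 4 partitions and one consumer, messages from all 4 partitions
  // should show up here.
  console.log('partition', message.partition, 'offset', message.offset);
});

consumer.on('error', function (err) {
  console.error(err);
});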

@tomwang1013
Author

@sensoroDj Sorry, I was a little confused. You said "when I use the 0.3.x high-level API it does not work the way I want it to" and "I found it can't work well in 0.3.x". Should we use 0.3.x or 0.2.29 when using the high-level API?

@ericdolson

I am seeing this same issue and am using 0.2.27

Group              Topic                      Pid Offset          logSize         Lag             Owner
mygroup            my_topic                   0   129777          129777          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   1   128295          128295          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   2   132121          132121          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   3   132676          132676          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   4   139752          139752          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   5   137116          137116          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   6   147484          147484          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   7   125432          125432          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   8   131383          131383          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   9   129714          129714          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   10  158580          158580          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   11  149554          149554          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   12  144542          144542          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   13  136326          136326          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   14  122674          136842          14168           mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   15  152034          152034          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6

I have 3 consumers which should all be taking an equal share of the messages, but partition 14 is stuck. This was happening before as well: when a rebalance occurred (such as when adding or removing a consumer from the group), the stuck partition would suddenly get drained and all of the stuck messages would be processed several days late.

Can someone suggest which version, if any, this issue has been resolved in or how this can be remedied?

@sensoroDj

@tomwang1013 Sorry, my English is not good. I mean that sometimes it works well in 0.2.x. I have also met your problem: one consumer for all partitions, but not all of the partitions get consumed. I found it may be caused by ZooKeeper; the Kafka consumer rebalance does not settle correctly.

@BadLambdaJamma

This is a non-obvious part of kafka-node if you ask me. Consumer starvation versus partition "warm-up" with the default partitioner can be confusing: it can take several minutes (or longer) for each partition to start receiving messages, so IMO you should always produce with an explicit partition number or a keyed message (see the sketch below). Also, once the consumer count exceeds the partition count, any additional consumers always starve. This is proper operation by design, but again confusing if you don't know the technical details of a rebalance operation.
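
For example, a minimal sketch of producing with an explicit partition number and with a keyed message in kafka-node. Topic, key, and payload values are placeholders, and depending on your kafka-node version you may also need to configure the producer's partitioner for keys to influence partition choice:

var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
var producer = new kafka.Producer(client);

producer.on('ready', function () {
  // Option 1: pin the message to a specific partition.
  producer.send([{ topic: 'my-topic', partition: 2, messages: ['hello'] }], function (err, data) {
    if (err) console.error(err);
  });

  // Option 2: send a keyed message so partition choice can follow the key
  // rather than the default "warm-up" behavior.
  var km = new kafka.KeyedMessage('my-key', 'hello again');
  producer.send([{ topic: 'my-topic', messages: [km] }], function (err, data) {
    if (err) console.error(err);
  });
});

producer.on('error', console.error);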

@tomwang1013
Author

0.2.29 also has this problem. I have given up on it and am using a client in another language.

@ericdolson

I solved this issue! Maybe it isn't the same reason for others, but here is what it was...

I copied the fetchMaxBytes: 1024 * 10 line from the readme thinking this was standard and used it in the options for my HighLevelConsumer. Apparently, this is a very small buffer. The default if this is left out is 1024 * 1024 bytes.

So, what happened was a message was produced which was too big for my fetchMaxBytes setting. Since I couldn't consume the whole message, it just sat there in the partition waiting to be consumed. And since messages must be consumed in order, new messages for that partition just piled up behind it!

Once I removed my config and let the default do its thing, my stuck partitions all got consumed and everything is working perfectly.

Note: I am currently using version 0.2.27. Just before writing this I had 11 out of 15 partitions stuck. Now all are perfectly fine.
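
In code, the change was roughly this (topic and group names are placeholders; the point is simply not to carry over the tiny fetchMaxBytes value from the README):

var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my_topic' }], {
  groupId: 'mygroup',
  // fetchMaxBytes: 1024 * 10   // too small: any message over ~10 KB can never be
  //                            // fetched, silently blocking its whole partition
  fetchMaxBytes: 1024 * 1024    // the library default (1 MB); or omit the option entirely
});

consumer.on('message', function (message) {
  // Once the oversized message fits in the fetch buffer, the stuck
  // partitions drain and offsets catch up.
});

consumer.on('error', console.error);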

@hyperlink
Collaborator

@ericdolson looks like a bug in the documentation. All the options mirror the defaults except for fetchMaxBytes. And according to issue #192 there's a bit of tweaking to the options to get kafka-node to work with each use case. This should also be documented. Interested in submitting a PR? :)

@hyperlink
Collaborator

Noticed the Kafka console consumer will throw an error if the message being fetched exceeds maxBytes. It would be nice if kafka-node could do something like that instead of failing silently.

[2016-04-11 14:58:32,926] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic ProducerConnectivityTest partition 1 at fetch offset 41. Increase the fetch size, or decrease the maximum message size the broker will allow.
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:79)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:110)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 1 messages
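
Until then, one possible workaround sketch: ask the broker for the latest offset of each owned partition via the kafka-node Offset API and compare it with what the consumer has actually processed (the topic and partition values below are placeholders):

var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
var offset = new kafka.Offset(client);

// time: -1 requests the latest offset for the partition.
offset.fetch([{ topic: 'my_topic', partition: 14, time: -1, maxNum: 1 }], function (err, data) {
  if (err) return console.error(err);
  var latest = data['my_topic']['14'][0];
  // Compare this against the last offset your consumer processed; a gap that
  // keeps growing on a partition you own suggests it is stuck (for example,
  // behind a message larger than fetchMaxBytes).
  console.log('latest offset for my_topic[14]:', latest);
});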

@nsharad

nsharad commented Oct 27, 2017

Facing the same issue with kafka-node 1.6.2.

Group            Topic    Pid Offset logSize Lag Owner
kafka-node-group nc_delta 0   34620  34620   0   none
kafka-node-group nc_delta 1   34962  34968   6   none
kafka-node-group nc_delta 2   34497  34497   0   none
kafka-node-group nc_delta 3   34595  34595   0   none
kafka-node-group nc_delta 4   35199  35199   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 5   34544  34544   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 6   34782  34782   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 7   33740  33740   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 8   34474  34474   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 9   34180  34180   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 10  34823  34823   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 11  34237  34237   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
