Uneven partitioning, 0.9.1 driver. #603

Closed
senior7515 opened this issue Apr 7, 2016 · 59 comments · 3 participants

senior7515 commented Apr 7, 2016

I'm running into an interesting partitioning problem.

If I launch, say, 4 consumers for a topic with 144 partitions, 3 of them end up consuming all the partitions and one of them gets all of its partitions revoked (consumer->unassign()'ed).

I tried the range and roundrobin partition assignment strategies from the configuration, but neither gives me an even partition spread across the consumers.

The bad part is that when I launch a new consumer after having a stable set of consumers, the new consumer takes all the partitions and the rest just sit idle, unassigned.

senior7515 commented Apr 7, 2016

I'm using the same consumer group.id for all of them and a different client.id for each.
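
For reference, a minimal sketch of the configuration being described: shared group.id, per-consumer client.id, and an explicit assignment strategy. The group name, client id and broker address below are placeholders, not values taken from this issue.

    #include <librdkafka/rdkafkacpp.h>
    #include <string>

    // Sketch: every consumer shares the group.id, each gets its own client.id.
    RdKafka::Conf *make_consumer_conf(const std::string &client_id) {
      std::string errstr;
      RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
      conf->set("metadata.broker.list", "broker:9092", errstr);          // placeholder broker
      conf->set("group.id", "benchmark-group", errstr);                  // same for all consumers
      conf->set("client.id", client_id, errstr);                         // unique per consumer
      conf->set("partition.assignment.strategy", "roundrobin", errstr);  // or "range"
      return conf;
    }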

edenhill (Owner) commented Apr 7, 2016

What exact git hash are you on?

senior7515 commented Apr 7, 2016

Latest from master as of yesterday, I think.

edenhill (Owner) commented Apr 7, 2016

Thanks.
Can you run again with debug=cgrp on all nodes and look for the ASSIGN log messages? I'm interested in what the group leader (assignor) is doing when it runs the assignment strategy.
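
For anyone following along, one way to surface these debug lines from the C++ API is the debug config property plus an event callback; a rough sketch, not the reporter's actual HighLevelKafkaConsumer code:

    #include <librdkafka/rdkafkacpp.h>
    #include <iostream>

    // Forward librdkafka's internal log events (enabled via debug=cgrp) to stderr.
    class LogEventCb : public RdKafka::EventCb {
     public:
      void event_cb(RdKafka::Event &event) override {
        if (event.type() == RdKafka::Event::EVENT_LOG)
          std::cerr << "severity: " << event.severity() << ", fac: " << event.fac()
                    << ", event: " << event.str() << std::endl;
      }
    };

    // Usage sketch, before creating the consumer:
    //   std::string errstr;
    //   static LogEventCb log_cb;
    //   conf->set("debug", "cgrp", errstr);
    //   conf->set("event_cb", &log_cb, errstr);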

senior7515 commented Apr 7, 2016

Sure, give me 2 minutes. Redeploying.

senior7515 commented Apr 7, 2016

Ugh, sorry, I have to run with --enable-from-beginning.

Sorry, re-running the test.

edenhill (Owner) commented Apr 7, 2016

I only see one topic (liberty2) with 8 partitions; is it supposed to have 144 partitions?

senior7515 commented Apr 7, 2016

The same happens with the old liberty topic. Do you want me to re-run it against the topic with the larger partition count?

edenhill (Owner) commented Apr 7, 2016

Looking at the assignment, it looks good:

severity: 7, fac: ASSIGN, event:  Member "concord_client_id_12993836200106424870-c07ac76f-af3b-415e-9577-02d91615bf58" (me) assigned 3 partition(s):
I0407 21:21:32.391666    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [0]
I0407 21:21:32.391705    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [1]
I0407 21:21:32.391708    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [2]
I0407 21:21:32.391711    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:  Member "concord_client_id_251196609599747600-f5ddda5e-e3fe-41f3-b57d-04303265a846" assigned 3 partition(s):
I0407 21:21:32.391715    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [3]
I0407 21:21:32.391716    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [4]
I0407 21:21:32.391718    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [5]
I0407 21:21:32.391721    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:  Member "concord_client_id_6520857798400680564-5dc0ff5b-a29e-409c-8cc5-ddf4931c4a1c" assigned 2 partition(s):
I0407 21:21:32.391724    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [6]
I0407 21:21:32.391726    23 HighLevelKafkaConsumer.hpp:241] Librdkafka log: severity: 7, fac: ASSIGN, event:   liberty2 [7]

senior7515 commented Apr 7, 2016

It happens on this slave:

Running on machine: tmp-mesos-slave-3

senior7515 commented Apr 7, 2016

With 8 partitions and 4 consumers, shouldn't each consumer get 2 partitions for this topic?

senior7515 commented Apr 7, 2016

These are the logs:

I0407 21:38:05.465020    58 HighLevelKafkaConsumer.hpp:182] RdKafka::RebalanceCb. partitions: 8
I0407 21:38:05.465049    58 HighLevelKafkaConsumer.hpp:218] Unassigning all partitions

edenhill (Owner) commented Apr 7, 2016

Ah, yeah, it gets stuck in an "Unknown Member Id" -> rebalance(Revoke) -> unassign() loop.
It should rejoin the group at that point.
I'll look into this tomorrow.

Thanks for your detailed logs!
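
For context, the canonical rebalance callback in the C++ API hands the assignment straight back to librdkafka, which then rejoins the group on the next rebalance; a generic sketch, not the code from this report:

    #include <librdkafka/rdkafkacpp.h>
    #include <vector>

    // Generic rebalance handler: assign on ASSIGN, unassign on REVOKE (or error),
    // letting librdkafka drive the rejoin.
    class SimpleRebalanceCb : public RdKafka::RebalanceCb {
     public:
      void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                        std::vector<RdKafka::TopicPartition *> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
          consumer->assign(partitions);   // new assignment from the group leader
        else
          consumer->unassign();           // ERR__REVOKE_PARTITIONS or an error
      }
    };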

senior7515 commented Apr 7, 2016

Thanks again, and keep me posted. Happy to help.

senior7515 commented Apr 8, 2016

I was thinking about this last night and thought I'd mention another related issue.

Scenario:

Offsets [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

We launch 1 consumer and it makes progress to offset 3.

If we then launch another consumer with OFFSET_BEGINNING, do the offsets get reset for everyone?

Shouldn't there be a log error that says: hey, there are already registered consumers, you can't reset the offset, defaulting to OFFSET_STORED,

unless there is an option along the lines of

force.reset.offsets = true

What's the expected behavior here?

It's fine either way, as long as it is predictable/documented.

Thanks!

senior7515 commented Apr 8, 2016

Err, sorry, I meant to mention that in my experiment librdkafka idles (unassign()s) the partitions for the consumers that were already at offset 3 in the example above, so there is no way for either the new consumer or the old consumers to react.

edenhill (Owner) commented Apr 10, 2016

A fix has been pushed to the KIP-35 branch, can you try it out?
Thanks

senior7515 commented Apr 10, 2016

Yeah, I can try it in the morning.

Sent from mobile, please forgive my handwriting.

edenhill (Owner) commented Apr 11, 2016

Merged to master.

senior7515 commented Apr 11, 2016

@edenhill the fix didn't work with the large partition count; it did work for the topic with 8 partitions.

I launched 6 consumers, and one of them didn't consume at all.

Furthermore, it took about 8 minutes to start consuming (I guess there might be a loop somewhere trying to repartition for a while).

With the version I was using before, it took about 2 minutes to start consuming, FYI.

Let me turn on the debug logs and post the logs of the group settings.

senior7515 commented Apr 11, 2016

@edenhill Here is the log. It looks like it is stuck and makes no progress after 10 minutes.

https://gist.github.com/f87afe3b3c52fbae5480

No records are consumed. I checked with another program, kafka-console-consumer, to verify that we do have 256MM records in Kafka.

senior7515 commented Apr 11, 2016

Looking at the logs, it sounds like the partition assignments look perfect.

I0411 14:45:21.474926    54 HighLevelKafkaConsumer.hpp:183] RdKafka::RebalanceCb. partitions: 24

Let me re-pump the same data set and test. Could be hitting some weird data-expiration issue.

senior7515 commented Apr 11, 2016

I re-ran the test and re-filled Kafka with 256MM records. Now there is only one consumer; the others worked for a bit and then halted.

Also, the throughput fell (same hardware, same setup) from 200K to 10K records per second.

I'm guessing the offset committing is an issue?

The logs I posted earlier are still good.

edenhill (Owner) commented Apr 11, 2016

That log link does not work

senior7515 commented Apr 11, 2016

Sorry, GitHub Gist changed the API to add the username to the gist archive and I hadn't updated my emacs plugin. Thanks for looking into it. Happy to re-run tests too.

edenhill (Owner) commented Apr 11, 2016

Thanks.

Some questions:

  • So this is the log from one consumer, right, but they all behave the same?
  • Are there new messages being produced to the topic while the consumer is running, i.e., is there no natural reason for the consumer to stall? For instance liberty [60] stops at offset 1845546; are you certain this is not the last offset?
  • You are explicitly setting the starting offset to BEGINNING in your rebalance_cb; why are you doing this instead of just using auto.offset.reset=earliest?

Can you rerun this test with debug=cgrp,topic,fetch and show me the logs? Thanks

senior7515 commented Apr 11, 2016

Thanks!

  • This log is from ALL 6 consumers. It's 10MB.
  • There are no messages being produced at all.
  • liberty [60] stopping at that offset is the stalling/halting problem I mentioned.
  • The reason I'm setting the offset in my rebalance_cb is that I'm benchmarking a system and I want to run multiple benchmarks from the same Kafka queue, so my consumer can simply say --from_begining=true and I'll reset the offset (see the sketch after this comment).

Sure, let me re run it.
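
To make that concrete, forcing every newly assigned partition back to the start from inside the rebalance callback would look roughly like the sketch below. This is only an illustration of the approach being described, not the reporter's actual code; the --from_begining flag and its plumbing are assumed, and note that it rewinds on every rebalance, for every member that runs it. The auto.offset.reset=earliest alternative only applies when the group has no committed offset for a partition, which is why the two behave differently once the group has already made progress.

    #include <librdkafka/rdkafkacpp.h>
    #include <vector>

    // Sketch: restart every assigned partition from the beginning when the
    // (assumed) --from_begining flag was passed.
    class FromBeginningRebalanceCb : public RdKafka::RebalanceCb {
     public:
      explicit FromBeginningRebalanceCb(bool from_beginning)
          : from_beginning_(from_beginning) {}

      void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                        std::vector<RdKafka::TopicPartition *> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
          if (from_beginning_)
            for (RdKafka::TopicPartition *tp : partitions)
              tp->set_offset(RdKafka::Topic::OFFSET_BEGINNING);  // rewind to start
          consumer->assign(partitions);
        } else {
          consumer->unassign();
        }
      }

     private:
      bool from_beginning_;
    };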

edenhill (Owner) commented Apr 11, 2016

liberty [60] stopping at that offset is the stalling/halting problem I mentioned.

And you are sure there are more messages after that offset? E.g. kafkacat -b brk -t liberty -p 60 -o 1845546 gives you more messages?

edenhill (Owner) commented Apr 11, 2016

Log from one consumer is sufficient, and more helpful in this case.

senior7515 commented Apr 11, 2016

OK, here is the log for one consumer. I haven't compiled kafkacat; I'll try it.

https://gist.github.com/senior7515/4914e75f7343b4c754a50a6a8509fe4c

In this run, this consumer started, got some assignment, and then stopped consuming.

Note that some other client picked up the partition, because I'm able to count the number of items on my sink and it all matches up. The odd thing is that this consumer stops at some point, even though the other consumers go on consuming for about 10 more minutes.

senior7515 commented Apr 11, 2016

kafkacat output:

$ ./kafkacat -b brk:9092 -t liberty -p 60 -o 1845546                                                                         
% Auto-selecting Consumer mode (use -P or -C to override)
- 1152633660 2006.07.11 ln96 Jul 11 09:01:00 ln96/ln96 CROND[29486]: (root) CMD (run-parts /etc/cron.hourly)
% Reached end of topic liberty [60] at offset 1845547

senior7515 commented Apr 11, 2016

Some background, and maybe the source of the erratic behavior.

I launch 6 consumers, all with --from_begining=true => this means I set the offset in the rebalance_cb to OFFSET_BEGINNING.

The partitions are now evenly spread with the KIP-35 branch, so the only remaining issue seems to be the halting problem I describe.

Thanks, happy to keep debugging :) !

edenhill (Owner) commented Apr 12, 2016

So the consumer stops consuming liberty [60] at offset 1845546, and when you consume the same topic with kafkacat it also stops there, so I would say that is the definitive end of that partition; the consumer thus appears to stall simply because there are no new messages. Or am I missing something?
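
One quick way to check this from the same client, assuming a librdkafka build that exposes query_watermark_offsets(), is to ask the broker for the partition's low/high watermarks and compare against the offset where the consumer stopped; a sketch:

    #include <librdkafka/rdkafkacpp.h>
    #include <iostream>

    // Sketch: print the low/high watermarks of liberty [60] to see whether the
    // consumer stalled before the real end of the partition.
    void print_watermarks(RdKafka::KafkaConsumer *consumer) {
      int64_t low = 0, high = 0;
      RdKafka::ErrorCode err =
          consumer->query_watermark_offsets("liberty", 60, &low, &high, 5000 /* ms */);
      if (err != RdKafka::ERR_NO_ERROR)
        std::cerr << "watermark query failed: " << RdKafka::err2str(err) << std::endl;
      else
        std::cout << "liberty [60] low=" << low << " high=" << high << std::endl;
    }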

edenhill (Owner) commented Apr 12, 2016

Come to think of it, it might be a good idea to try consuming the same topic+partition with the Java consumer for verification, since kafkacat relies on librdkafka.

senior7515 commented Apr 19, 2016

@edenhill I know I owe you some tests; sorry, things are super hectic right now. I worked around the bug by launching the consumers multiple times; sometimes it works perfectly. I should have more time to test next week or at the end of this week. Apologies for the delay. I do want to get to the bottom of this too :)

Thanks again.

edenhill (Owner) commented May 18, 2016

Any progress on this?

edenhill added this to the 0.9.2 milestone May 18, 2016

senior7515 commented May 21, 2016

@edenhill yeah. I asked, and I can dedicate time to this early on Monday. Sorry, I had moved on to other projects, but I can definitely test again on Monday with the same dataset. /cc @shinjikim

senior7515 commented Jun 9, 2016

@edenhill some progress... testing this as we speak.

The current master:

librdkafka_version: '13d330a'

segfaults with the same code (no change) doing a ->poll(1) on the first production of records. I will create a more complete ticket for this.

Will log more progress in a bit. Thanks again!

edenhill (Owner) commented Jun 9, 2016

That doesn't sound good; please provide a backtrace from gdb.

senior7515 commented Jun 9, 2016

Good thing I kept the old core dump. I'm testing the original version first to see if the load balancing works, so I'll start another thread with this backtrace, though.

However, here it is.

(gdb) bt
#0  __GI___pthread_mutex_lock (mutex=0x0) at ../nptl/pthread_mutex_lock.c:66
#1  0x00000000007aba79 in mtx_trylock (mtx=<optimized out>) at tinycthread.c:262
#2  0x00000000007a28bb in strncpy (__len=<optimized out>, __src=<optimized out>, __dest=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/string3.h:120
#3  rd_kafka_cgrp_handle_SyncGroup (rkcg=0x7fd26b454600, err=RD_KAFKA_RESP_ERR_NO_ERROR, member_state=<optimized out>) at rdkafka_cgrp.c:1985
#4  0x0000000000790399 in rd_kafka_handle_SyncGroup (rk=<optimized out>, rkb=0x7fd265817600, err=<optimized out>, rkbuf=0x7fd26481ac00, request=0x0, opaque=0x0)
    at rdkafka_request.c:945
#5  0x000000000078a487 in rd_refcnt_sub0 (R=0x0) at rd.h:277
#6  rd_kafka_buf_callback (rk=0x7fd26b463a00, rkb=<optimized out>, err=<optimized out>, response=0x7fd26481ac00, request=0x7fd265817500) at rdkafka_buf.c:490
#7  0x00000000007a043b in rd_kafka_cgrp_op_serve (rkb=0x7fd265817600, rkcg=0x7fd26b454600) at rdkafka_cgrp.c:1516
#8  rd_kafka_cgrp_serve (rkcg=0x7fd26b454600) at rdkafka_cgrp.c:1746
#9  0x000000000076fb8c in rd_kafka_q_len (rkq=0x0) at rdkafka_queue.h:248
#10 rd_kafka_q_len (rkq=0x7fd268fff700) at rdkafka_queue.h:252
#11 rd_kafka_q_len (rkq=0x7fd26b463a90) at rdkafka_queue.h:252
#12 rd_kafka_thread_main (arg=0x7fd26b463a00) at rdkafka.c:968
#13 0x00000000007ab9d7 in mtx_init (mtx=0x76fa00 <rd_kafka_thread_main+80>, type=<optimized out>) at tinycthread.c:87

senior7515 commented Jun 9, 2016

full bt just in case

(gdb) bt full
#0  __GI___pthread_mutex_lock (mutex=0x0) at ../nptl/pthread_mutex_lock.c:66
        __PRETTY_FUNCTION__ = "__pthread_mutex_lock"
        type = 0
#1  0x00000000007aba79 in mtx_trylock (mtx=<optimized out>) at tinycthread.c:262
No locals.
#2  0x00000000007a28bb in strncpy (__len=<optimized out>, __src=<optimized out>, __dest=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/string3.h:120
No locals.
#3  rd_kafka_cgrp_handle_SyncGroup (rkcg=0x7fd26b454600, err=RD_KAFKA_RESP_ERR_NO_ERROR, member_state=<optimized out>) at rdkafka_cgrp.c:1985
        _logname = "p\000\000\000\000\000\000\000\000\316\063e\000\000\000\000\000:Fk\322\177\000\000\003\000\000\000\000\000\000\000\000\000\200e\322\177\000\000\022\311Y\202\322\177\000\000\t\000\000\000\000\000\000\000\001\000\000\000\001\000\000\000\000\020\203e\322\177\000\000\300\063Jw\322\177\000\000\t\000\000\000\000\000\000\000\000\316\063e'\216\271\361\t\000\000\000\000\000\000\000\232\311Y\202\322\177\000\000\071\071\064\062-42e1\000\000\000\000\000\000"
        _LEN = 4
        rkbuf = 0x7fd265829540
        assignment = <optimized out>
        Version = <optimized out>
        TopicCnt = <optimized out>
        UserData = {len = 16, data = 0x7fd28259705e <malloc+862>, _data = 0x7fd268ffd760 "p"}
        rkgm = {rkgm_subscription = 0x0, rkgm_assignment = 0x7fd2774a33c0, rkgm_eligible = {rl_size = 36864, rl_cnt = 0, rl_elems = 0x7fd200000020, 
            rl_free_cb = 0x7fd268ffd830, rl_flags = 1761597376}, rkgm_member_id = 0x8ad8, rkgm_userdata = 0x9000, rkgm_member_metadata = 0xf}
        __FUNCTION__ = "rd_kafka_cgrp_handle_SyncGroup"
#4  0x0000000000790399 in rd_kafka_handle_SyncGroup (rk=<optimized out>, rkb=0x7fd265817600, err=<optimized out>, rkbuf=0x7fd26481ac00, request=0x0, opaque=0x0)
    at rdkafka_request.c:945
        _klen = 0
        rkcg = 0x0
        ErrorCode = <optimized out>
        MemberState = {len = 0, data = 0x7fd26482e05e, _data = 0x7fd268ffd860 "\220:Fk\322\177"}
---Type <return> to continue, or q <return> to quit--- 
        actions = <optimized out>
        __FUNCTION__ = "rd_kafka_handle_SyncGroup"
#5  0x000000000078a487 in rd_refcnt_sub0 (R=0x0) at rd.h:277
        r = <optimized out>
#6  rd_kafka_buf_callback (rk=0x7fd26b463a00, rkb=<optimized out>, err=<optimized out>, response=0x7fd26481ac00, request=0x7fd265817500) at rdkafka_buf.c:490
        __FUNCTION__ = "rd_kafka_buf_callback"
#7  0x00000000007a043b in rd_kafka_cgrp_op_serve (rkb=0x7fd265817600, rkcg=0x7fd26b454600) at rdkafka_cgrp.c:1516
        rktp = 0x0
        err = <optimized out>
        silent_op = 0
        rko = 0x7fd2648151e0
#8  rd_kafka_cgrp_serve (rkcg=0x7fd26b454600) at rdkafka_cgrp.c:1746
        rkb = 0x7fd265817600
        rkb_state = 4
#9  0x000000000076fb8c in rd_kafka_q_len (rkq=0x0) at rdkafka_queue.h:248
        qlen = <optimized out>
#10 rd_kafka_q_len (rkq=0x7fd268fff700) at rdkafka_queue.h:252
        qlen = <optimized out>
#11 rd_kafka_q_len (rkq=0x7fd26b463a90) at rdkafka_queue.h:252
        qlen = <optimized out>
#12 rd_kafka_thread_main (arg=0x7fd26b463a00) at rdkafka.c:968
        rk = 0x7fd26b463a00
        tmr_topic_scan = {rtmr_link = {tqe_next = 0x7fd26b4548f0, tqe_prev = 0x7fd26b463ed0}, rtmr_next = 1466090233307, rtmr_interval = 1000000, 
          rtmr_callback = 0x767970 <rd_kafka_q_len>, rtmr_arg = 0x0}
        tmr_stats_emit = {rtmr_link = {tqe_next = 0x7fd268ffdb10, tqe_prev = 0x7fd26b4548f0}, rtmr_next = 1466149233308, rtmr_interval = 60000000, 
          rtmr_callback = 0x769830 <rd_kafka_stats_emit_tmr_cb+80>, rtmr_arg = 0x0}
---Type <return> to continue, or q <return> to quit---
        tmr_metadata_refresh = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x7fd268ffdae0}, rtmr_next = 1466389233308, rtmr_interval = 300000000, 
          rtmr_callback = 0x767d70 <rd_kafka_metadata_refresh_cb+80>, rtmr_arg = 0x0}
#13 0x00000000007ab9d7 in mtx_init (mtx=0x76fa00 <rd_kafka_thread_main+80>, type=<optimized out>) at tinycthread.c:87
        ret = <optimized out>
        attr = {__size = "\000\000\000", __align = 0}

edenhill (Owner) commented Jun 9, 2016

If this is reproducible I'd like to see it with a debug-built librdkafka (do ./dev-conf.sh ; make clean all ; sudo make install)

edenhill (Owner) commented Jun 10, 2016

This might happen if you join a group with an empty topic subscription list. Could this be the case for you?
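
If that is the trigger, a cheap guard in the application is to refuse to subscribe with an empty topic list until a library-side fix lands; a sketch (the helper name is made up):

    #include <librdkafka/rdkafkacpp.h>
    #include <iostream>
    #include <string>
    #include <vector>

    // Sketch: don't join the group at all if the topic list turned out empty.
    bool subscribe_checked(RdKafka::KafkaConsumer *consumer,
                           const std::vector<std::string> &topics) {
      if (topics.empty()) {
        std::cerr << "refusing to subscribe: empty topic list" << std::endl;
        return false;
      }
      RdKafka::ErrorCode err = consumer->subscribe(topics);
      if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "subscribe failed: " << RdKafka::err2str(err) << std::endl;
        return false;
      }
      return true;
    }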

senior7515 commented Jun 10, 2016

I think so. I'm going to look into it more today.

Sent from mobile, please forgive my handwriting.

senior7515 commented Jun 10, 2016

@edenhill I'm having trouble pumping enough data into Kafka, probably because of Azure (before this I was testing on Google Cloud Compute); Azure's disks are terrible.

I'm lagging behind by a couple orders of magnitude:

I0610 18:27:21.447351 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 111000, total bytes sent: 4923577, bytes received by the broker: 31774, msgs received by broker: 31774, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.448935 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 112000, total bytes sent: 4968026, bytes received by the broker: 32759, msgs received by broker: 32759, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.450328 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 113000, total bytes sent: 5012415, bytes received by the broker: 33013, msgs received by broker: 33013, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.455456 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 114000, total bytes sent: 5056873, bytes received by the broker: 34283, msgs received by broker: 34283, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.457859 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 115000, total bytes sent: 5101049, bytes received by the broker: 35045, msgs received by broker: 35045, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.459427 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 116000, total bytes sent: 5145491, bytes received by the broker: 35299, msgs received by broker: 35299, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.460932 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 117000, total bytes sent: 5189936, bytes received by the broker: 35577, msgs received by broker: 35577, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.462496 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 118000, total bytes sent: 5234099, bytes received by the broker: 36085, msgs received by broker: 36085, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.463973 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 119000, total bytes sent: 5278353, bytes received by the broker: 36339, msgs received by broker: 36339, error bytes attempted to send: 0, error msgs sent to broker: 0
I0610 18:27:21.465431 13308 HighLevelKafkaProducer.hpp:171] Total msgs sent: 120000, total bytes sent: 5322522, bytes received by the broker: 36593, msgs received by broker: 36593, error bytes attempted to send: 0, error msgs sent to broker: 0

So I'm wondering if it would be useful to have something like this after some delta discrepancy:

  int outq;
  // Drain librdkafka's output queue when it grows past a threshold, so the
  // producer doesn't run unboundedly ahead of the broker.
  while ((outq = producer->outq_len()) > 0 && outq > kMaxThreshold) {
    LOG(INFO) << "Waiting to drain queue: " << outq;
    producer->poll(5000);  // serve delivery callbacks for up to 5 s
  }

Sorry, trying to repro the original bug, but the Azure instances can't keep up with the data being pumped into them :( - oiiiiiiiii.
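
An alternative to draining outq_len() after the fact is to bound the queue up front: set queue.buffering.max.messages to taste and treat ERR__QUEUE_FULL from produce() as backpressure. A sketch, assuming the classic Topic*-based produce() API; the helper and its arguments are illustrative only:

    #include <librdkafka/rdkafkacpp.h>
    #include <string>

    // Sketch: block the producing loop while librdkafka's internal queue is full
    // instead of letting the producer run far ahead of the broker.
    void produce_with_backpressure(RdKafka::Producer *producer, RdKafka::Topic *topic,
                                   int32_t partition, const std::string &key,
                                   const std::string &payload) {
      while (true) {
        RdKafka::ErrorCode err = producer->produce(
            topic, partition, RdKafka::Producer::RK_MSG_COPY,
            const_cast<char *>(payload.data()), payload.size(), &key, nullptr);
        if (err != RdKafka::ERR__QUEUE_FULL)
          break;             // enqueued, or failed for an unrelated reason
        producer->poll(100); // serve delivery reports to free up queue space
      }
      producer->poll(0);     // opportunistically serve delivery callbacks
    }

The outq_len() drain loop quoted above is still the right thing to do once at shutdown, to flush whatever is already queued.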

senior7515 commented Jun 10, 2016

OK, recompiled it with debug symbols. Same crash:

(gdb) full bt
Undefined command: "full".  Try "help".
(gdb) bt full
#0  __GI___pthread_mutex_lock (mutex=0x0) at ../nptl/pthread_mutex_lock.c:66
        __PRETTY_FUNCTION__ = "__pthread_mutex_lock"
        type = 1144
#1  0x00000000007d20fc in mtx_lock (mtx=0x478) at tinycthread.c:135
No locals.
#2  0x00000000007c3b75 in rd_kafka_cgrp_handle_SyncGroup (rkcg=0x7fd6d8454600, err=RD_KAFKA_RESP_ERR_NO_ERROR, member_state=0x7fd6d5ffbef0) at rdkafka_cgrp.c:1984
        _logname = "\000\000\000\000\000\000\000\000\000FE\330\326\177\000\000\004\000\000\000\002\000\000\000\063\000\000\000\000\000\000\000\b\000\000\000\004\000\000\000\063\000\000\000\000\000\000\000\000\000\300\322\326\177\000\000\063\000\000\000\000\000\000\000\000\000\300\322\326\177\000\000\366\260\274\356\326\177", '\000' <repeats 14 times>, "\326\177\000\000\001\000\000\000\000\000\000\000\t\000\000\000\000\000\000\000\000\060\303\322\326\177\000\000\360#\337\356\326\177\000"
        _LEN = 2
        rkbuf = 0x7fd6d2c29540
        assignment = 0x17c36d
        log_decode_errors = 1
        Version = 0
        TopicCnt = 40
        UserData = {len = 9, data = 0x7fd6e38911c0, _data = 0x7fd6d5ffbdd0 ""}
        rkgm = {rkgm_subscription = 0x3134343033345f64, rkgm_assignment = 0x3231393936353830, rkgm_eligible = {rl_size = 842019889, rl_cnt = 1717710134, 
            rl_elems = 0x342d356331333739, rl_free_cb = 0x7fd6d2c00000, rl_flags = -289625838}, rkgm_member_id = 0x9, rkgm_userdata = 0x25a6301987ed200, 
          rkgm_member_metadata = 0x9007fd6e38911c0}
        __FUNCTION__ = "rd_kafka_cgrp_handle_SyncGroup"
#3  0x00000000007b2991 in rd_kafka_handle_SyncGroup (rk=0x7fd6d8463a00, rkb=0x7fd6d2c17600, err=RD_KAFKA_RESP_ERR_NO_ERROR, rkbuf=0x7fd6d181ac00, request=0x7fd6d2c29380, 
    opaque=0x7fd6d8454600) at rdkafka_request.c:966
        rkcg = 0x7fd6d8454600
        log_decode_errors = 1
---Type <return> to continue, or q <return> to quit--- 
        ErrorCode = 0
        MemberState = {len = 0, data = 0x7fd6d182e05e, _data = 0x7fd6d5ffbf00 ""}
        actions = 0
        __FUNCTION__ = "rd_kafka_handle_SyncGroup"
#4  0x00000000007a7f46 in rd_kafka_buf_callback (rk=0x7fd6d8463a00, rkb=0x7fd6d2c17600, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x7fd6d181ac00, request=0x7fd6d2c29380)
    at rdkafka_buf.c:515
        __FUNCTION__ = "rd_kafka_buf_callback"
#5  0x00000000007a7d57 in rd_kafka_buf_handle_op (rko=0x7fd6d18151e0, err=RD_KAFKA_RESP_ERR_NO_ERROR) at rdkafka_buf.c:461
        request = 0x7fd6d2c29380
        response = 0x7fd6d181ac00
#6  0x00000000007c30a2 in rd_kafka_cgrp_op_serve (rkcg=0x7fd6d8454600, rkb=0x7fd6d2c17600) at rdkafka_cgrp.c:1664
        rktp = 0x0
        err = RD_KAFKA_RESP_ERR_NO_ERROR
        silent_op = 1
        rko = 0x7fd6d18151e0
        __FUNCTION__ = "rd_kafka_cgrp_op_serve"
#7  0x00000000007c32f7 in rd_kafka_cgrp_serve (rkcg=0x7fd6d8454600) at rdkafka_cgrp.c:1746
        rkb = 0x7fd6d2c17600
        rkb_state = 4
#8  0x000000000077c3ff in rd_kafka_thread_main (arg=0x7fd6d8463a00) at rdkafka.c:976
        sleeptime = 100000
        rk = 0x7fd6d8463a00
        tmr_topic_scan = {rtmr_link = {tqe_next = 0x7fd6d84548f0, tqe_prev = 0x7fd6d8463ed0}, rtmr_next = 1557357363339, rtmr_interval = 1000000, 
          rtmr_callback = 0x77c00d <rd_kafka_topic_scan_tmr_cb>, rtmr_arg = 0x0}
---Type <return> to continue, or q <return> to quit---
        tmr_stats_emit = {rtmr_link = {tqe_next = 0x7fd6d5ffc1a0, tqe_prev = 0x7fd6d84548f0}, rtmr_next = 1557416363339, rtmr_interval = 60000000, 
          rtmr_callback = 0x77c042 <rd_kafka_stats_emit_tmr_cb>, rtmr_arg = 0x0}
        tmr_metadata_refresh = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x7fd6d5ffc170}, rtmr_next = 1557656363340, rtmr_interval = 300000000, 
          rtmr_callback = 0x77c06c <rd_kafka_metadata_refresh_cb>, rtmr_arg = 0x0}
#9  0x00000000007d23c0 in _thrd_wrapper_function (aArg=0x7fd6d842e290) at tinycthread.c:611
        fun = 0x77c1b8 <rd_kafka_thread_main>
        arg = 0x7fd6d8463a00
        res = 0
        ti = 0x7fd6d842e290
#10 0x00007fd6eaece184 in start_thread (arg=0x7fd6d5fff700) at pthread_create.c:312
        __res = <optimized out>
        pd = 0x7fd6d5fff700
        now = <optimized out>
        unwind_buf = {cancel_jmp_buf = {{jmp_buf = {140560690050816, 7309513643689354903, 0, 0, 140560690051520, 140560690050816, -7287615768404318569, -7287582620825442665}, 
              mask_was_saved = 0}}, priv = {pad = {0x0, 0x0, 0x0, 0x0}, data = {prev = 0x0, cleanup = 0x0, canceltype = 0}}}
        not_first_call = <optimized out>
        pagesize_m1 = <optimized out>
        sp = <optimized out>
        freesize = <optimized out>
        __PRETTY_FUNCTION__ = "start_thread"
#11 0x00007fd6ea12b37d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

senior7515 commented Jun 10, 2016

This happens when I do this (simplest program):

https://gist.github.com/f6bc4c60b99d5001aaa64d101025e11b

    // Drain everything left in librdkafka's output queue.
    uint32_t outq(0);
    while ((outq = producer_->outq_len()) > 0) {
      producer_->poll(5000);  // serve delivery callbacks for up to 5 s
    }

Thanks. I'm using this git revision:

librdkafka_version: '13d330a'

senior7515 commented Jun 13, 2016

I think the change 9932f55 fixed it.

I just walked the stack frames and it's basically a nullptr handed to mtx_lock in the version I was using.

Going to try 2213fb2 or 0.9.1 first. Weird that this only happens when I dispatch the producing to a background thread (with no synchronization), i.e., roughly:

    std::thread([producer = std::move(producer)] { while (true) producer->produce(topic, key, value, partition); }).detach();

senior7515 commented Jun 13, 2016

@edenhill I confirm that 9932f55 fixed the problem. I had to manually patch config.h because there is no --disable-lz4 option and Ubuntu 14.04 only ships lz4.h, not lz4frame.h, in the liblz4-dev apt package.

Now that we are cooking with gas producing records, I'm going to test the consumer load balancing.

Those seem like pretty easy changes to the config files. Would you like me to submit a PR for a --disable-lz4 command-line option?

Thanks!

  • Alex

senior7515 commented Jun 13, 2016

Linking #690 for completeness.

senior7515 commented Jun 13, 2016

@edenhill so far so good on the load distribution. Will do a bit more testing tomorrow, but it's looking great!

I don't have access to the big cluster at the moment, but in the tests I've done it looks good. I'm planning on doing a big load test next week when I get a few more days to work on this. I won't have more time this week after tomorrow.

edenhill (Owner) commented Jun 18, 2016

Great!

edenhill closed this Jun 18, 2016

edenhill (Owner) commented Jul 13, 2016

@RBlafford I saw your blog post which mentioned this issue; are you still seeing it in librdkafka 0.9.1?

RBlafford commented Jul 14, 2016

I haven't tried to run this example since then. Next week we will slot some time in to test and verify if this is still an issue.
