consume thread takes 100% cpu #2084

Closed
3 tasks
sepcity opened this issue Nov 2, 2018 · 14 comments

@sepcity

sepcity commented Nov 2, 2018

Description

After my program had been running for a while, I found that my consumer thread was using 100% CPU.

The stack is as follows:

#0 0x00007f9de4e09950 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000007b4c09 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:469
#2 0x00000000007b4fbd in cnd_timedwait_abs (cnd=cnd@entry=0xbac2c88, mtx=mtx@entry=0xbac2c60, tspec=tspec@entry=0x7f9dad7f8520) at tinycthread_extra.c:100
#3 0x000000000077d571 in rd_kafka_q_pop_serve (rkq=rkq@entry=0xbac2c60, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#4 0x000000000077d640 in rd_kafka_q_pop (rkq=rkq@entry=0xbac2c60, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#5 0x000000000074ea55 in rd_kafka_consume0 (rk=0xbac1be0, rkq=0xbac2c60, timeout_ms=<optimized out>) at rdkafka.c:2239
#6 0x0000000000741a7a in RdKafka::KafkaConsumerImpl::consume (this=<optimized out>, timeout_ms=<optimized out>) at KafkaConsumerImpl.cpp:112

When I debugged it, I found it stuck in an endless loop at rdkafka_queue.c:342.
pthread_cond_timedwait returns 22 (EINVAL on Linux); maybe the condition variable is invalid?

How to reproduce

I don't know how to reproduce it either.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 0.11.6
  • Apache Kafka version: 0.10.1
  • librdkafka client configuration: Default
  • Operating system: CentOS 7.2 (x64)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented Nov 2, 2018

Please fill out the checklist: What's your client configuration and platform?

@sepcity
Author

sepcity commented Nov 7, 2018

> Please fill out the checklist: What's your client configuration and platform?

I have already filled out the checklist. Do I need to provide any other information?

@edenhill
Contributor

edenhill commented Nov 7, 2018

Thank you.
Can you build librdkafka without optimization (./configure --disable-optimization) and reproduce the issue from gdb, and then print the arguments to cnd_timedwait_abs() in stack frame 3 to help us better understand what is going on?
A log with debug config property set to cgrp,broker,topic,protocol would also be useful.

Can you share relevant parts of your code?

@sepcity
Author

sepcity commented Nov 8, 2018

> Thank you.
> Can you build librdkafka without optimization (./configure --disable-optimization) and reproduce the issue from gdb, and then print the arguments to cnd_timedwait_abs() in stack frame 3 to help us better understand what is going on?
> A log with debug config property set to cgrp,broker,topic,protocol would also be useful.
>
> Can you share relevant parts of your code?

OK, I will try what you suggest.

Today I ran into another problem with the same symptom (the consumer can't get any messages). The last error message is "Local: Broker transport failure". What does it mean?

Here is my code:

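void KafkaMsgMgr::ThreadFunc()
{
    while (m_running)
    {
        for (RdKafka::KafkaConsumer* consumer : m_consumer_set)
        {
            // Drain this consumer: keep polling for as long as messages keep arriving.
            bool msg_left = false;
            do
            {
                msg_left = false;
                // consume() waits up to the given timeout for the next message or event.
                std::unique_ptr<RdKafka::Message> msg;
                msg.reset(consumer->consume(CONSUMER_CONSUME_TIMEOUT.count()));

                switch (msg->err())
                {
                case RdKafka::ERR_NO_ERROR:
                    // Got a real message: hand it off, then poll again right away.
                    ConsumeMsg(msg);
                    msg_left = true;
                    break;
                case RdKafka::ERR__TIMED_OUT:
                    // Nothing arrived within the timeout: move on to the next consumer.
                    break;
                case RdKafka::ERR__PARTITION_EOF:
                    INFO("PARTITION_EOF");
                    break;
                default:
                    ERROR("consume failed: %s", msg->errstr().c_str());
                    break;
                }
            } while (msg_left);
        }
    }
}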

I create a thread that consumes messages from Kafka and then posts them to my worker thread.
What this thread does is very simple.

@sepcity
Author

sepcity commented Nov 8, 2018

I have reproduced the issue.
Here is my debug info.

Breakpoint 5, cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:470
470	tinycthread.c: File or directory not found.
(gdb) p ret
$3 = 22
(gdb) bt
#0  cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:470
#1  0x00000000007f288d in cnd_timedwait_abs (cnd=cnd@entry=0x13df2b8, mtx=mtx@entry=0x13df290, tspec=tspec@entry=0x7fe7f9ff9520) at tinycthread_extra.c:100
#2  0x00000000007bae41 in rd_kafka_q_pop_serve (rkq=rkq@entry=0x13df290, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#3  0x00000000007baf10 in rd_kafka_q_pop (rkq=rkq@entry=0x13df290, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#4  0x000000000078c325 in rd_kafka_consume0 (rk=0x13de270, rkq=0x13df290, timeout_ms=<optimized out>) at rdkafka.c:2239
#5  0x000000000077f3da in RdKafka::KafkaConsumerImpl::consume (this=<optimized out>, timeout_ms=<optimized out>) at KafkaConsumerImpl.cpp:112
#6  0x0000000000610192 in fsp::sss::common::KafkaMsgMgr::ThreadFunc() ()
#7  0x00007fe808e9cc60 in std::(anonymous namespace)::execute_native_thread_routine (__p=<optimized out>) at ../../../.././libstdc++-v3/src/c++11/thread.cc:84
#8  0x00007fe80a015dc5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007fe80860476d in clone () from /lib64/libc.so.6
(gdb) f 1
#1  0x00000000007f288d in cnd_timedwait_abs (cnd=cnd@entry=0x13df2b8, mtx=mtx@entry=0x13df290, tspec=tspec@entry=0x7fe7f9ff9520) at tinycthread_extra.c:100
100	tinycthread_extra.c: File or directory not found.
(gdb) p *cnd
$4 = {__data = {__lock = 0, __futex = 7291784, __total_seq = 3645892, __wakeup_seq = 3645892, __woken_seq = 3645892, __mutex = 0x13df290, __nwaiters = 0, __broadcast_seq = 0}, 
  __size = "\000\000\000\000\210Co\000?7\000\000\000\000\000?7\000\000\000\000\000?7\000\000\000\000\000\220\362=\001", '\000' <repeats 11 times>, __align = 31317973809496064}
(gdb) p *mtx
$5 = {__data = {__lock = 2, __count = 0, __owner = 29943, __nusers = 1, __kind = 0, __spins = 0, __list = {__prev = 0x0, __next = 0x0}}, 
  __size = "\002\000\000\000\000\000\000\000\367t\000\000\001", '\000' <repeats 26 times>, __align = 2}
(gdb) p *tspec
$6 = {tv_sec = 1541656539, tv_nsec = 1000000000}

The debug environment is still running. What other information can I provide?
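
A note on the `ret = 22` in the gdb session above: on Linux, 22 is `EINVAL`. `pthread_cond_timedwait` may report it for a bad condition variable or mutex, but also when the absolute timeout has a `tv_nsec` outside [0, 1000000000), and `tspec` above shows `tv_nsec = 1000000000`. The following is a minimal sketch for checking that second case, assuming Linux with glibc; `timedwait_result` is a hypothetical helper written for this illustration, not part of librdkafka.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical helper (not librdkafka code): wait on a fresh condition
 * variable with an absolute deadline whose tv_nsec is set to `nsec`, and
 * return the result code of pthread_cond_timedwait(). */
static int timedwait_result(long nsec) {
        pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
        pthread_cond_t cnd = PTHREAD_COND_INITIALIZER;
        struct timespec ts;
        int ret;

        /* Deadline in the current second, so a valid tv_nsec of 0 is
         * already in the past and the wait cannot block. */
        ts.tv_sec = time(NULL);
        ts.tv_nsec = nsec;

        /* The mutex must be held by the caller while waiting. */
        ret = pthread_mutex_lock(&mtx);
        assert(ret == 0);

        ret = pthread_cond_timedwait(&cnd, &mtx, &ts);

        pthread_mutex_unlock(&mtx);
        pthread_cond_destroy(&cnd);
        pthread_mutex_destroy(&mtx);
        return ret;
}
```

With a valid `tv_nsec` of 0 the call should time out (`ETIMEDOUT`), while `timedwait_result(1000000000L)` should fail with `EINVAL` without waiting at all. A caller that retries on every result other than a timeout would then spin without ever sleeping, which would be consistent with the 100% CPU symptom.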

@edenhill
Contributor

edenhill commented Nov 8, 2018

Thank you. Both the cnd and the mutex look alright.
I will need to ask you to reproduce this with ThreadSanitizer. The easiest way is to uncomment this line in dev-conf.sh, run ./dev-conf.sh, and then reproduce the issue.
ThreadSanitizer should print why the cond is invalid.

@sepcity
Author

sepcity commented Nov 23, 2018

Hi,
I may have found the bug in the code.

In the stack above, you are right that both the cnd and the mutex look alright, but the tspec is wrong. You can see that ts->tv_nsec = 1000000000, which is an invalid value: tv_nsec must lie in [0, 1000000000).
The bug may be caused by rd_timeout_init_timespec:

static RD_INLINE void rd_timeout_init_timespec (struct timespec *tspec,
                                                int timeout_ms) {
        if (timeout_ms == RD_POLL_INFINITE ||
            timeout_ms == RD_POLL_NOWAIT) {
                tspec->tv_sec = timeout_ms;
                tspec->tv_nsec = 0;
        } else {
                timespec_get(tspec, TIME_UTC);
                tspec->tv_sec  += timeout_ms / 1000;
                tspec->tv_nsec += (timeout_ms % 1000) * 1000000;
                if (tspec->tv_nsec > 1000000000) {
                        tspec->tv_nsec -= 1000000000;
                        tspec->tv_sec++;
                }
        }
}
I think the comparison in "if (tspec->tv_nsec > 1000000000)" should be changed from > to >=.
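
To make the boundary case concrete: if the current time has `tv_nsec = 500000000` and the timeout is 500 ms, the sum is exactly 1000000000, which `>` does not catch, so the value is left out of range. The following is a minimal sketch of the normalization with `>=`, assuming a non-negative `timeout_ms` and an already valid input timespec; `timespec_add_ms` is a hypothetical stand-alone helper for illustration, not librdkafka's actual function or its fix.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical stand-alone helper (not librdkafka code): add timeout_ms to
 * *ts and carry nanosecond overflow into tv_sec.
 * Assumes timeout_ms >= 0 and 0 <= ts->tv_nsec < 1000000000 on entry. */
static void timespec_add_ms(struct timespec *ts, int timeout_ms) {
        ts->tv_sec  += timeout_ms / 1000;
        ts->tv_nsec += (long)(timeout_ms % 1000) * 1000000L;

        /* >= rather than >, so a sum of exactly 1000000000 is carried too. */
        if (ts->tv_nsec >= 1000000000L) {
                ts->tv_nsec -= 1000000000L;
                ts->tv_sec++;
        }

        /* Post-condition required by pthread_cond_timedwait(). */
        assert(ts->tv_nsec >= 0 && ts->tv_nsec < 1000000000L);
}
```

Because the two addends are each below 1000000000, a single carry is enough to bring `tv_nsec` back into range. With `>` in place of `>=`, the carry is skipped only when the sum is exactly 1000000000, so the failure depends on the clock value at the moment of the call.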

@edenhill
Contributor

@sepcity Right on! This bug was just fixed on master: #2108.

@klesta490

@edenhill Hi! Will this be fixed in 0.11.6? If so, do you have any idea when? Thanks for the info.

@edenhill
Contributor

edenhill commented Dec 4, 2018

v0.11.6 has already been released.
This issue is fixed in the upcoming v1.0.0.
Please verify the fix with v1.0.0-RC4

@klesta490

klesta490 commented Dec 4, 2018

Sure, I will give it a try in our master branch (although it will take some time because of the breaking changes). For our deployed solutions, can we expect a fix in a successor to 0.11.6? That is, is a 0.11.7 planned? If not, is there a schedule for 1.0 GA? Thanks

@edenhill
Contributor

edenhill commented Dec 4, 2018

The librdkafka project only maintains one release train; the latest release is always GA unless otherwise noted.
There will be no more 0.11.x releases; the next one is v1.0.0.

@GeorgeShagov

Has this been fixed, I wonder? I am seeing this case in 0.11.6: #1858 (comment)

@klesta490

Sorry for the late answer. It works fine in 1.0.
