rd_kafka_toppar_enq_msg() lock when produce msg #2447

Closed
4 of 7 tasks
MockingJayWong opened this issue Aug 2, 2019 · 31 comments

Comments

@MockingJayWong

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

After sending messages for a while (anywhere from 20 minutes to a day; it is unpredictable), the thread that calls rd_kafka_produce() blocks. I want to know what could cause this.

How to reproduce


Run a performance test that keeps sending messages for a long time. Eventually the producing thread blocks.
The gdb output will be pasted in a comment.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): librdkafka-1.0.1
  • Apache Kafka version: kafka_2.12-2.2.0
  • librdkafka client configuration: {"queue.buffering.max.messages", "10000000"}, {"acks", "all"}, {"linger.ms", "0"}, {"compression.codec", "none"}, {"socket.keepalive.enable", "true"}, {"enable.idempotence", "true"}, {"message.timeout.ms", "100000"}, {"reconnect.backoff.jitter.ms", "300"}
  • Operating system: Centos 7 (x64)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@MockingJayWong
Author

MockingJayWong commented Aug 2, 2019

info thread

14 Thread 0x7f0770400700 (LWP 17877) "producer_per" 0x00007f077176438d in poll () from /lib64/libc.so.6
13 Thread 0x7f076fbff700 (LWP 17878) "producer_per" 0x00007f0771460965 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
12 Thread 0x7f076f3fe700 (LWP 17879) "producer_per" 0x00007f07714634ed in __lll_lock_wait () from /lib64/libpthread.so.0
11 Thread 0x7f076e3fc700 (LWP 17882) "rdk:main" 0x00007f07714602ce in pthread_rwlock_wrlock () from /lib64/libpthread.so.0
10 Thread 0x7f076ebfd700 (LWP 17883) "rdk:broker-1" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
9 Thread 0x7f076dbfb700 (LWP 17884) "rdk:broker0" 0x00007f077176438d in poll () from /lib64/libc.so.6
8 Thread 0x7f076d3fa700 (LWP 17885) "rdk:broker1" 0x00007f0771179344 in rd_kafka_msgq_enq_sorted0 (rkmq=rkmq@entry=0x7f0754001430, rkm=rkm@entry=0x7f06ecd680e0,
order_cmp=order_cmp@entry=0x7f077117b8a0 <rd_kafka_msg_cmp_msgid>) at rdkafka_msg.c:632
7 Thread 0x7f076cbf9700 (LWP 17886) "rdk:broker2" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
6 Thread 0x7f075ffff700 (LWP 17887) "rdk:main" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
5 Thread 0x7f075f7fe700 (LWP 17888) "rdk:broker-1" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
4 Thread 0x7f075effd700 (LWP 17889) "rdk:broker0" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
3 Thread 0x7f075e7fc700 (LWP 17890) "rdk:broker1" 0x00007f077176438d in poll () from /lib64/libc.so.6
2 Thread 0x7f075dffb700 (LWP 17891) "rdk:broker2" 0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
1 Thread 0x7f077207e3c0 (LWP 17876) "producer_per" 0x00007f077176438d in poll () from /lib64/libc.so.6

thread apply all bt

Thread 14 (Thread 0x7f0770400700 (LWP 17877)):
#0  0x00007f077176438d in poll () from /lib64/libc.so.6
#1  0x00007f0770f3fcf2 in do_io (v=0x55c619132ad0) at src/mt_adaptor.c:387
#2  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#3  0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 13 (Thread 0x7f076fbff700 (LWP 17878)):
#0  0x00007f0771460965 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f0770f3fe3b in do_completion (v=0x55c619132ad0) at src/mt_adaptor.c:463
#2  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#3  0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 12 (Thread 0x7f076f3fe700 (LWP 17879)):
#0  0x00007f07714634ed in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f077145edcb in _L_lock_883 () from /lib64/libpthread.so.0
#2  0x00007f077145ec98 in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x00007f07711c70b9 in mtx_lock (mtx=mtx@entry=0x7f07540013e0) at tinycthread.c:136
#4  0x00007f07711b70a9 in rd_kafka_toppar_enq_msg (rktp=0x7f0754001360, rkm=0x7f071a34dc20) at rdkafka_partition.c:646
#5  0x00007f0771179767 in rd_kafka_msg_partitioner (rkt=rkt@entry=0x55c61913e700, rkm=rkm@entry=0x7f071a34dc20, do_lock=do_lock@entry=1) at rdkafka_msg.c:889
#6  0x00007f0771179932 in rd_kafka_msg_new (rkt=0x55c61913e700, force_partition=<optimized out>, msgflags=1,
    payload=0x55c63112c270 "abcdefghigklmnopqrstuvwxyzABCDEFGHIGKLMNOPQRSTUVWXYZ123456789_e_time_1564711257239436", len=86, key=0x0, keylen=0, msg_opaque=0x7f0771c5d4e7 <mq_delivey_cb>) at rdkafka_msg.c:267
#7  0x00007f0771c5ff0b in mq_producer_produce_impl (producer=0x55c61913e3b0, flags=1, data=0x55c63112c270 "abcdefghigklmnopqrstuvwxyzABCDEFGHIGKLMNOPQRSTUVWXYZ123456789_e_time_1564711257239436",
    data_len=86) at /mnt/d/code/CU-Broker/CU-Broker/dev/mq_client/source/mq_producer.c:222
#8  0x00007f0771c60731 in mq_producer_produce_nocopy (producer=0x55c61913e3b0, data=0x55c63112c270 "abcdefghigklmnopqrstuvwxyzABCDEFGHIGKLMNOPQRSTUVWXYZ123456789_e_time_1564711257239436", data_len=86)
    at /mnt/d/code/CU-Broker/CU-Broker/dev/mq_client/source/mq_producer.c:462
#9  0x00007f0771c5ceef in msg_deliver_func (param=0x0) at /mnt/d/code/CU-Broker/CU-Broker/dev/message/source/message.c:142
#10 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 11 (Thread 0x7f076e3fc700 (LWP 17882)):
#0  0x00007f07714602ce in pthread_rwlock_wrlock () from /lib64/libpthread.so.0
#1  0x00007f07711c7629 in rwlock_wrlock (rwl=rwl@entry=0x55c61913e718) at tinycthread_extra.c:134
#2  0x00007f077117c80b in rd_kafka_topic_metadata_update (rkt=rkt@entry=0x55c61913e700, mdt=mdt@entry=0x7f0754000d00, ts_age=<optimized out>) at rdkafka_topic.c:891
#3  0x00007f077117d3ff in rd_kafka_topic_metadata_update2 (rkb=rkb@entry=0x55c619136cc0, mdt=mdt@entry=0x7f0754000d00) at rdkafka_topic.c:1016
#4  0x00007f07711cadab in rd_kafka_parse_Metadata (rkb=rkb@entry=0x55c619136cc0, request=request@entry=0x7f0750000990, rkbuf=rkbuf@entry=0x7f0758000d20, mdp=mdp@entry=0x7f076e3f9e48)
    at rdkafka_metadata.c:491
#5  0x00007f07711963dc in rd_kafka_handle_Metadata (rk=<optimized out>, rkb=0x55c619136cc0, err=<optimized out>, rkbuf=0x7f0758000d20, request=0x7f0750000990, opaque=0x0) at rdkafka_request.c:1430
#6  0x00007f077118b14b in rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=<optimized out>, response=0x7f0758000d20, request=0x7f0750000990) at rdkafka_buf.c:456
#7  0x00007f07711945f3 in rd_kafka_op_handle_std (rk=rk@entry=0x55c6191345e0, rkq=rkq@entry=0x7f076e3fa080, rko=rko@entry=0x7f0758000ca0, cb_type=cb_type@entry=1) at rdkafka_op.c:626
#8  0x00007f0771194625 in rd_kafka_op_handle (rk=0x55c6191345e0, rkq=0x7f076e3fa080, rko=0x7f0758000ca0, cb_type=RD_KAFKA_Q_CB_CALLBACK, opaque=0x0, callback=0x0) at rdkafka_op.c:657
#9  0x00007f077118f4c3 in rd_kafka_q_serve (rkq=0x55c619135110, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:485
#10 0x00007f077115db1c in rd_kafka_thread_main (arg=arg@entry=0x55c6191345e0) at rdkafka.c:1836
#11 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#12 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#13 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 10 (Thread 0x7f076ebfd700 (LWP 17883)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c619136c28, mtx=mtx@entry=0x55c619136c00, tspec=tspec@entry=0x7f076ebfad50) at tinycthread_extra.c:100
#3  0x00007f077118f1fc in rd_kafka_q_pop_serve (rkq=0x55c619136c00, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007f077118f2d0 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007f077117529f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x55c6191360b0, timeout_ms=<optimized out>) at rdkafka_broker.c:2983
#6  0x00007f0771175327 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c6191360b0, abs_timeout=abs_timeout@entry=1793076526944) at rdkafka_broker.c:3036
#7  0x00007f0771176593 in rd_kafka_broker_internal_serve (abs_timeout=1793076526944, rkb=0x55c6191360b0) at rdkafka_broker.c:3090
#8  rd_kafka_broker_serve (rkb=rkb@entry=0x55c6191360b0, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4251
#9  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c6191360b0) at rdkafka_broker.c:4370
#10 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 9 (Thread 0x7f076dbfb700 (LWP 17884)):
#0  0x00007f077176438d in poll () from /lib64/libc.so.6
#1  0x00007f07711896ce in rd_kafka_transport_poll (rktrans=rktrans@entry=0x7f07580012d0, tmout=tmout@entry=1000) at rdkafka_transport.c:943
#2  0x00007f0771189757 in rd_kafka_transport_io_serve (rktrans=0x7f07580012d0, timeout_ms=1000) at rdkafka_transport.c:804
#3  0x00007f077117531d in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c619136cc0, abs_timeout=abs_timeout@entry=1793076592497) at rdkafka_broker.c:3029
#4  0x00007f0771176413 in rd_kafka_broker_producer_serve (abs_timeout=1793076592497, rkb=0x55c619136cc0) at rdkafka_broker.c:3520
#5  rd_kafka_broker_serve (rkb=rkb@entry=0x55c619136cc0, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4253
#6  0x00007f077117691d in rd_kafka_broker_thread_main (arg=arg@entry=0x55c619136cc0) at rdkafka_broker.c:4403
#7  0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#8  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 8 (Thread 0x7f076d3fa700 (LWP 17885)):
#0  0x00007f0771179344 in rd_kafka_msgq_enq_sorted0 (rkmq=rkmq@entry=0x7f0754001430, rkm=rkm@entry=0x7f06ecd680e0, order_cmp=order_cmp@entry=0x7f077117b8a0 <rd_kafka_msg_cmp_msgid>) at rdkafka_msg.c:632
#1  0x00007f07711b7243 in rd_kafka_msgq_insert_msgq_sort (cmp=<optimized out>, srcq=<optimized out>, destq=<optimized out>) at rdkafka_partition.c:685
#2  rd_kafka_msgq_insert_msgq (destq=destq@entry=0x7f0754001430, srcq=srcq@entry=0x7f0754001410, cmp=0x7f077117b8a0 <rd_kafka_msg_cmp_msgid>) at rdkafka_partition.c:731
#3  0x00007f0771170d32 in rd_kafka_toppar_producer_serve (rkb=rkb@entry=0x55c6191377d0, rktp=rktp@entry=0x7f0754001360, pid=..., now=now@entry=1792977469763, next_wakeup=next_wakeup@entry=0x7f076d3f7e50,
    do_timeout_scan=do_timeout_scan@entry=0) at rdkafka_broker.c:3244
#4  0x00007f077117637f in rd_kafka_broker_produce_toppars (do_timeout_scan=0, next_wakeup=<synthetic pointer>, now=<optimized out>, rkb=0x55c6191377d0) at rdkafka_broker.c:3459
#5  rd_kafka_broker_producer_serve (abs_timeout=1792978469463, rkb=0x55c6191377d0) at rdkafka_broker.c:3513
#6  rd_kafka_broker_serve (rkb=rkb@entry=0x55c6191377d0, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4253
#7  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c6191377d0) at rdkafka_broker.c:4370
#8  0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#9  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#10 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 7 (Thread 0x7f076cbf9700 (LWP 17886)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c619139088, mtx=mtx@entry=0x55c619139060, tspec=tspec@entry=0x7f076cbf6d50) at tinycthread_extra.c:100
#3  0x00007f077118f1fc in rd_kafka_q_pop_serve (rkq=0x55c619139060, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007f077118f2d0 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007f077117529f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x55c619138530, timeout_ms=<optimized out>) at rdkafka_broker.c:2983
#6  0x00007f0771175327 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c619138530, abs_timeout=abs_timeout@entry=1793076546731) at rdkafka_broker.c:3036
#7  0x00007f0771176413 in rd_kafka_broker_producer_serve (abs_timeout=1793076546731, rkb=0x55c619138530) at rdkafka_broker.c:3520
#8  rd_kafka_broker_serve (rkb=rkb@entry=0x55c619138530, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4253
#9  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c619138530) at rdkafka_broker.c:4370
#10 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 6 (Thread 0x7f075ffff700 (LWP 17887)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c61913a808, mtx=mtx@entry=0x55c61913a7e0, tspec=tspec@entry=0x7f075fffd070) at tinycthread_extra.c:100
#3  0x00007f077118f5de in rd_kafka_q_serve (rkq=0x55c61913a7e0, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:462
#4  0x00007f077115db1c in rd_kafka_thread_main (arg=arg@entry=0x55c619139bf0) at rdkafka.c:1836
#5  0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#6  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#7  0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 5 (Thread 0x7f075f7fe700 (LWP 17888)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c61913b858, mtx=mtx@entry=0x55c61913b830, tspec=tspec@entry=0x7f075f7fbd50) at tinycthread_extra.c:100
#3  0x00007f077118f1fc in rd_kafka_q_pop_serve (rkq=0x55c61913b830, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007f077118f2d0 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007f077117529f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x55c61913ad00, timeout_ms=<optimized out>) at rdkafka_broker.c:2983
#6  0x00007f0771175327 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c61913ad00, abs_timeout=abs_timeout@entry=1793077372146) at rdkafka_broker.c:3036
#7  0x00007f0771176593 in rd_kafka_broker_internal_serve (abs_timeout=1793077372146, rkb=0x55c61913ad00) at rdkafka_broker.c:3090
#8  rd_kafka_broker_serve (rkb=rkb@entry=0x55c61913ad00, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4251
#9  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c61913ad00) at rdkafka_broker.c:4370
#10 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 4 (Thread 0x7f075effd700 (LWP 17889)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c6191394b8, mtx=mtx@entry=0x55c619139490, tspec=tspec@entry=0x7f075effab30) at tinycthread_extra.c:100
#3  0x00007f077118f1fc in rd_kafka_q_pop_serve (rkq=0x55c619139490, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007f077118f2d0 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007f077117529f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x55c61913bb60, timeout_ms=<optimized out>) at rdkafka_broker.c:2983
#6  0x00007f0771175327 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c61913bb60, abs_timeout=abs_timeout@entry=1793077419030) at rdkafka_broker.c:3036
#7  0x00007f07711755cf in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0x55c61913bb60, abs_timeout=abs_timeout@entry=1793077419030) at rdkafka_broker.c:4153
#8  0x00007f0771176563 in rd_kafka_broker_serve (rkb=rkb@entry=0x55c61913bb60, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4255
#9  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c61913bb60) at rdkafka_broker.c:4370
#10 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 3 (Thread 0x7f075e7fc700 (LWP 17890)):
#0  0x00007f077176438d in poll () from /lib64/libc.so.6
#1  0x00007f07711896ce in rd_kafka_transport_poll (rktrans=rktrans@entry=0x7f073c000c30, tmout=tmout@entry=1000) at rdkafka_transport.c:943
#2  0x00007f0771189757 in rd_kafka_transport_io_serve (rktrans=0x7f073c000c30, timeout_ms=1000) at rdkafka_transport.c:804
#3  0x00007f077117531d in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c61913c670, abs_timeout=abs_timeout@entry=1793077294023) at rdkafka_broker.c:3029
#4  0x00007f07711755cf in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0x55c61913c670, abs_timeout=abs_timeout@entry=1793077294023) at rdkafka_broker.c:4153
#5  0x00007f0771176563 in rd_kafka_broker_serve (rkb=rkb@entry=0x55c61913c670, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4255
#6  0x00007f077117691d in rd_kafka_broker_thread_main (arg=arg@entry=0x55c61913c670) at rdkafka_broker.c:4403
#7  0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#8  0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x7f075dffb700 (LWP 17891)):
#0  0x00007f0771460d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f07711c71d9 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:462
#2  0x00007f07711c758d in cnd_timedwait_abs (cnd=cnd@entry=0x55c61913df68, mtx=mtx@entry=0x55c61913df40, tspec=tspec@entry=0x7f075dff8b30) at tinycthread_extra.c:100
#3  0x00007f077118f1fc in rd_kafka_q_pop_serve (rkq=0x55c61913df40, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007f077118f2d0 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007f077117529f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x55c61913d3f0, timeout_ms=<optimized out>) at rdkafka_broker.c:2983
#6  0x00007f0771175327 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55c61913d3f0, abs_timeout=abs_timeout@entry=1793076852106) at rdkafka_broker.c:3036
#7  0x00007f07711755cf in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0x55c61913d3f0, abs_timeout=abs_timeout@entry=1793076852106) at rdkafka_broker.c:4153
#8  0x00007f0771176563 in rd_kafka_broker_serve (rkb=rkb@entry=0x55c61913d3f0, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4255
#9  0x00007f0771176d4c in rd_kafka_broker_thread_main (arg=arg@entry=0x55c61913d3f0) at rdkafka_broker.c:4370
#10 0x00007f07711c7037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f077145cdd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f077176f02d in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x7f077207e3c0 (LWP 17876)):
#0  0x00007f077176438d in poll () from /lib64/libc.so.6
#1  0x00007f0771c5d40b in send_message (channel_name=0x7fff2cbae034 "wlx_test_0801", data=0x7fff2cbae0d0, data_lenth=86) at /mnt/d/code/CU-Broker/CU-Broker/dev/message/source/message.c:249
#2  0x000055c618ac9103 in main () at cu_broker_producer_per_test.c:121

@MockingJayWong
Author

Inside rd_kafka_toppar_enq_msg(), I got:

  • (gdb) p *rktp
    $3 = {rktp_rklink = {tqe_next = 0x0, tqe_prev = 0x0}, rktp_rkblink = {tqe_next = 0x0, tqe_prev = 0x55c619137848}, rktp_activelink = {cqe_next = 0x55c619137860, cqe_prev = 0x55c619137860}, rktp_rktlink = {
    tqe_next = 0x0, tqe_prev = 0x0}, rktp_cgrplink = {tqe_next = 0x0, tqe_prev = 0x0}, rktp_rkt = 0x55c61913e700, rktp_s_rkt = 0x55c61913e700, rktp_partition = 0, rktp_leader_id = 1,
    rktp_leader = 0x55c6191377d0, rktp_next_leader = 0x0, rktp_refcnt = {val = 3}, rktp_lock = {__data = {__lock = 2, __count = 0, __owner = 17885, __nusers = 1, __kind = 0, __spins = 0, __elision = 0,
    __list = {__prev = 0x0, __next = 0x0}}, __size = "\002\000\000\000\000\000\000\000\335E\000\000\001", '\000' <repeats 26 times>, __align = 2}, rktp_msgq_wakeup_q = 0x55c619135f60, rktp_msgq = {
    rkmq_msgs = {tqh_first = 0x7f071a4154e0, tqh_last = 0x7f06ece7b8b8}, rkmq_msg_cnt = 7212, rkmq_msg_bytes = 620232}, rktp_xmit_msgq = {rkmq_msgs = {tqh_first = 0x7f071a4154e0,
    tqh_last = 0x7f06ecd68078}, rkmq_msg_cnt = 6815244, rkmq_msg_bytes = 586110984}, rktp_fetch = 0, rktp_fetchq = 0x7f07540008c0, rktp_ops = 0x7f0754001150, rktp_msgs_inflight = {val = 0},
    rktp_msgid = 21579317358, rktp_eos = {pid = {id = 20011, epoch = 32126}, acked_msgid = 21572485801, epoch_base_msgid = 21572005802, next_ack_seq = 490000, next_err_seq = 490000, wait_drain = 1 '\001'},
    rktp_version = {val = 1}, rktp_op_version = 1, rktp_fetch_version = 0, rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE, rktp_fetch_msg_max_bytes = 1048576, rktp_ts_fetch_backoff = 0,
    rktp_query_offset = -1001, rktp_next_offset = -1001, rktp_last_next_offset = -1001, rktp_app_offset = -1001, rktp_stored_offset = -1001, rktp_committing_offset = -1001, rktp_committed_offset = -1001,
    rktp_ts_committed_offset = 0, rktp_offsets = {fetch_offset = 0, eof_offset = -1001, hi_offset = -1001}, rktp_offsets_fin = {fetch_offset = 0, eof_offset = -1001, hi_offset = -1001},
    rktp_hi_offset = -1001, rktp_lo_offset = -1001, rktp_ts_offset_lag = 0, rktp_offset_path = 0x0, rktp_offset_fp = 0x0, rktp_cgrp = 0x0, rktp_assigned = 0, rktp_replyq = {q = 0x0, version = 0},
    rktp_flags = 0, rktp_s_for_desp = 0x0, rktp_s_for_cgrp = 0x0, rktp_s_for_rkb = 0x7f0754001360, rktp_offset_query_tmr = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0,
    rtmr_oneshot = 0 '\000', rtmr_callback = 0x0, rtmr_arg = 0x0}, rktp_offset_commit_tmr = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000',
    rtmr_callback = 0x0, rtmr_arg = 0x0}, rktp_offset_sync_tmr = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000', rtmr_callback = 0x0,
    rtmr_arg = 0x0}, rktp_consumer_lag_tmr = {rtmr_link = {tqe_next = 0x0, tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000', rtmr_callback = 0x0, rtmr_arg = 0x0},
    rktp_wait_consumer_lag_resp = 0, rktp_last_err = {err = RD_KAFKA_RESP_ERR__TIMED_OUT, actions = 136, ts = 1792977469058, base_msgid = 21572485802, base_seq = 480000, last_seq = 489999}, rktp_c = {
    tx_msgs = {val = 15113161992}, tx_msg_bytes = {val = 1299731931289}, rx_msgs = {val = 0}, rx_msg_bytes = {val = 0}, producer_enq_msgs = {val = 21579317359}, rx_ver_drops = {val = 0}}}
  • (gdb) p *rkm
    $4 = {rkm_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x55c61913e700, partition = 0, payload = 0x55c63112c270, len = 86, key = 0x0, key_len = 0, offset = -1001,
    _private = 0x7f0771c5d4e7 <mq_delivey_cb>}, rkm_link = {tqe_next = 0x7f071a34dcd0, tqe_prev = 0x7f0750000f70}, rkm_flags = 458753, rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME,
    rkm_timestamp = 1564711257330, rkm_headers = 0x0, rkm_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED, rkm_u = {producer = {ts_timeout = 1792987474178, ts_enq = 1792977474178, ts_backoff = 0, msgid = 0,
    last_msgid = 0, retries = 0}, consumer = {binhdrs = {len = 1986111746, data = 0x1a175c91282, _data = ""}}}}
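The queue counters in the *rktp dump can be cross-checked with some arithmetic. Every payload is 86 bytes, and for both queues rkmq_msg_bytes equals rkmq_msg_cnt × 86, so those counters appear to count payload bytes only. The sketch below does that arithmetic. The constants are copied from the dump and the configuration, and the helper names are hypothetical. It assumes that queue.buffering.max.messages counts messages in both rktp_msgq and rktp_xmit_msgq, and that no other partition has a comparable backlog. Under those assumptions, this partition alone holds about 68% of the 10,000,000-message limit.

```c
#include <assert.h>
#include <stdint.h>

/* Constants copied from the gdb dump of *rktp / *rkm and from the
 * client configuration in this issue; nothing is read from librdkafka. */
#define PAYLOAD_LEN   86ULL        /* len = 86 in *rkm */
#define XMIT_MSG_CNT  6815244ULL   /* rktp_xmit_msgq.rkmq_msg_cnt */
#define MSGQ_MSG_CNT  7212ULL      /* rktp_msgq.rkmq_msg_cnt */
#define MAX_MESSAGES  10000000ULL  /* queue.buffering.max.messages */

/* Payload bytes for n messages that all have the same payload length. */
static uint64_t payload_bytes(uint64_t n, uint64_t len) {
    return n * len;
}

/* Whole-number percentage (rounded down) of max_msgs used by queued. */
static unsigned queue_fill_pct(uint64_t queued, uint64_t max_msgs) {
    assert(max_msgs > 0);
    return (unsigned)(queued * 100 / max_msgs);
}
```

Both products match the dumped rkmq_msg_bytes values exactly (586110984 and 620232). The 68% figure is only a lower bound if other partitions also hold queued messages.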

@edenhill
Contributor

edenhill commented Aug 2, 2019

Thread 12 is the application calling produce():

  • it holds the rkt lock
  • it tries to acquire the rktp lock

Thread 11 is the rdkafka main thread which is updating topic state from metadata:

  • it tries to acquire the rkt lock that is held by 12

Thread 8 is the broker thread producing messages:

  • it holds the rktp lock
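A small model of the lock chain described above shows why it is a convoy rather than a deadlock. No thread waits on a lock held by a thread that is itself waiting in a cycle, so once the broker thread finishes its work, everything drains in order. This is a hypothetical pthreads model, not librdkafka code. It assumes the application thread holds the topic lock as a reader and the main thread wants it as a writer (thread 11 is in pthread_rwlock_wrlock()). The 100 ms sleep stands in for thread 8's queue work.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <time.h>

/* Hypothetical stand-ins for the two locks seen in the backtraces. */
static pthread_rwlock_t rkt_lock = PTHREAD_RWLOCK_INITIALIZER; /* topic */
static pthread_mutex_t rktp_lock = PTHREAD_MUTEX_INITIALIZER;  /* partition */

static atomic_int stage;      /* start-order handshake between threads */
static atomic_int finished;   /* next free slot in finish_order */
static char finish_order[4];  /* which thread got through, in order */

static void record(char who) { finish_order[atomic_fetch_add(&finished, 1)] = who; }
static void wait_stage(int s) { while (atomic_load(&stage) < s) sched_yield(); }

/* Like thread 8: holds the partition lock while doing slow work. */
static void *broker_thread(void *arg) {
    (void)arg;
    pthread_mutex_lock(&rktp_lock);
    atomic_store(&stage, 1);
    struct timespec busy = {0, 100L * 1000 * 1000}; /* 100 ms of "work" */
    nanosleep(&busy, NULL);
    record('B');
    pthread_mutex_unlock(&rktp_lock);
    return NULL;
}

/* Like thread 12: holds the topic lock, then wants the partition lock. */
static void *app_thread(void *arg) {
    (void)arg;
    wait_stage(1);
    pthread_rwlock_rdlock(&rkt_lock);
    atomic_store(&stage, 2);
    pthread_mutex_lock(&rktp_lock);   /* waits until the broker is done */
    record('A');
    pthread_mutex_unlock(&rktp_lock);
    pthread_rwlock_unlock(&rkt_lock);
    return NULL;
}

/* Like thread 11: wants the topic lock exclusively (metadata update). */
static void *main_thread(void *arg) {
    (void)arg;
    wait_stage(2);
    pthread_rwlock_wrlock(&rkt_lock); /* waits until the app thread is done */
    record('M');
    pthread_rwlock_unlock(&rkt_lock);
    return NULL;
}

/* Runs the three threads and returns the order they finished in. */
static const char *run_lock_chain(void) {
    pthread_t t[3];
    pthread_create(&t[0], NULL, broker_thread, NULL);
    pthread_create(&t[1], NULL, app_thread, NULL);
    pthread_create(&t[2], NULL, main_thread, NULL);
    for (int i = 0; i < 3; i++)
        pthread_join(t[i], NULL);
    finish_order[3] = '\0';
    return finish_order;
}
```

The finish order is always broker, app, then main ("BAM"). Each thread records its progress while still holding the lock the next thread is waiting for, so the longer the broker thread holds rktp_lock, the longer the other two stall.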

From what I can see there is no deadlock: thread 8 is busy moving messages from the partition queue to the partition xmit queue.
Either your system is under-powered and the producer is just running slowly (a librdkafka client typically requires at least 4 CPU cores to perform well), in which case it should not hang indefinitely.

Or the message queue being moved is corrupt and cyclic, which we can check in gdb:

(gdb) source path/to/librdkafka/.gdbmacros
(gdb) thread 8
(gdb) frame 3
(gdb) dump_msgq rktp.rktp_xmit_msgq
(gdb) dump_msgq rktp.rktp_msgq
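If the queue were cyclic, any walk that follows the next pointers would never reach the end. Outside gdb, a cyclic list can be detected in O(n) time and O(1) memory with Floyd's tortoise-and-hare algorithm, sketched below. The struct is a hypothetical, simplified message with only a forward link and a msgid. It is not the layout of librdkafka's rd_kafka_msg_t, and this is not how the .gdbmacros dump_msgq macro works.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified message node: only the forward link
 * (comparable to a TAILQ tqe_next) and the msgid are modelled. */
struct msg {
    struct msg *next;
    uint64_t msgid;
};

/* Floyd's tortoise and hare: returns 1 if following ->next from head
 * ever revisits a node, 0 if the list ends with NULL. */
static int msgq_is_cyclic(const struct msg *head) {
    const struct msg *slow = head, *fast = head;
    while (fast && fast->next) {
        slow = slow->next;       /* one step */
        fast = fast->next->next; /* two steps */
        if (slow == fast)
            return 1;
    }
    return 0;
}

/* Number of nodes in an acyclic list, or -1 if the list is cyclic. */
static long msgq_len(const struct msg *head) {
    if (msgq_is_cyclic(head))
        return -1;
    long n = 0;
    for (; head; head = head->next)
        n++;
    return n;
}
```

Checking for a cycle before counting means that msgq_len() cannot loop forever on a corrupt list.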

@MockingJayWong
Author

MockingJayWong commented Aug 2, 2019

Following your advice, here are the contents of the two message queues, @edenhill.
What could make rd_kafka_produce() slow down or stall for a while? I want to avoid that.
What I am sure of is that calls to rd_kafka_produce() stall for a while.

(gdb) dump_msgq rktp.rktp_xmit_msgq

#0 ((rd_kafka_msgq_t *)0x7f071a4154e0) msgid 21572495703
#1 ((rd_kafka_msgq_t *)0x7f071a415590) msgid 21572495704
#2 ((rd_kafka_msgq_t *)0x7f071a415640) msgid 21572495705
#3 ((rd_kafka_msgq_t *)0x7f071a4156f0) msgid 21572495706
#4 ((rd_kafka_msgq_t *)0x7f071a4157a0) msgid 21572495707
#5 ((rd_kafka_msgq_t *)0x7f071a415850) msgid 21572495708
#6 ((rd_kafka_msgq_t *)0x7f071a415900) msgid 21572495709
#7 ((rd_kafka_msgq_t *)0x7f071a4159b0) msgid 21572495710
#8 ((rd_kafka_msgq_t *)0x7f071a415a60) msgid 21572495711
#9 ((rd_kafka_msgq_t *)0x7f071a415b10) msgid 21572495712
#10 ((rd_kafka_msgq_t *)0x7f071a415bc0) msgid 21572495713
#11 ((rd_kafka_msgq_t *)0x7f071a415c70) msgid 21572495714
#12 ((rd_kafka_msgq_t *)0x7f071a415d20) msgid 21572495715
#13 ((rd_kafka_msgq_t *)0x7f071a415dd0) msgid 21572495716
#14 ((rd_kafka_msgq_t *)0x7f071a415e80) msgid 21572495717
#15 ((rd_kafka_msgq_t *)0x7f071a415f30) msgid 21572495718
#16 ((rd_kafka_msgq_t *)0x7f071a415fe0) msgid 21572495719
#17 ((rd_kafka_msgq_t *)0x7f071a416090) msgid 21572495720
#18 ((rd_kafka_msgq_t *)0x7f071a416140) msgid 21572495721
#19 ((rd_kafka_msgq_t *)0x7f071a4161f0) msgid 21572495722
#20 ((rd_kafka_msgq_t *)0x7f071a4162a0) msgid 21572495723
#21 ((rd_kafka_msgq_t *)0x7f071a416350) msgid 21572495724
#22 ((rd_kafka_msgq_t *)0x7f071a416400) msgid 21572495725
#23 ((rd_kafka_msgq_t *)0x7f071a4164b0) msgid 21572495726
#24 ((rd_kafka_msgq_t *)0x7f071a416560) msgid 21572495727
#25 ((rd_kafka_msgq_t *)0x7f071a416610) msgid 21572495728
#26 ((rd_kafka_msgq_t *)0x7f071a4166c0) msgid 21572495729
#27 ((rd_kafka_msgq_t *)0x7f071a416770) msgid 21572495730
#28 ((rd_kafka_msgq_t *)0x7f071a416820) msgid 21572495731
#29 ((rd_kafka_msgq_t *)0x7f071a4168d0) msgid 21572495732
#30 ((rd_kafka_msgq_t *)0x7f071a416980) msgid 21572495733
#31 ((rd_kafka_msgq_t *)0x7f071a416a30) msgid 21572495734
#32 ((rd_kafka_msgq_t *)0x7f071a416ae0) msgid 21572495735
#33 ((rd_kafka_msgq_t *)0x7f071a416b90) msgid 21572495736
#34 ((rd_kafka_msgq_t *)0x7f071a416c40) msgid 21572495737
#35 ((rd_kafka_msgq_t *)0x7f071a416cf0) msgid 21572495738
#36 ((rd_kafka_msgq_t *)0x7f071a416da0) msgid 21572495739
#37 ((rd_kafka_msgq_t *)0x7f071a416e50) msgid 21572495740

(gdb) dump_msgq rktp.rktp_msgq

#0 ((rd_kafka_msgq_t *)0x7f071a4154e0) msgid 21572495703
#1 ((rd_kafka_msgq_t *)0x7f071a415590) msgid 21572495704
#2 ((rd_kafka_msgq_t *)0x7f071a415640) msgid 21572495705
#3 ((rd_kafka_msgq_t *)0x7f071a4156f0) msgid 21572495706
#4 ((rd_kafka_msgq_t *)0x7f071a4157a0) msgid 21572495707
#5 ((rd_kafka_msgq_t *)0x7f071a415850) msgid 21572495708
#6 ((rd_kafka_msgq_t *)0x7f071a415900) msgid 21572495709
#7 ((rd_kafka_msgq_t *)0x7f071a4159b0) msgid 21572495710
#8 ((rd_kafka_msgq_t *)0x7f071a415a60) msgid 21572495711
#9 ((rd_kafka_msgq_t *)0x7f071a415b10) msgid 21572495712
#10 ((rd_kafka_msgq_t *)0x7f071a415bc0) msgid 21572495713
#11 ((rd_kafka_msgq_t *)0x7f071a415c70) msgid 21572495714
#12 ((rd_kafka_msgq_t *)0x7f071a415d20) msgid 21572495715
#13 ((rd_kafka_msgq_t *)0x7f071a415dd0) msgid 21572495716
#14 ((rd_kafka_msgq_t *)0x7f071a415e80) msgid 21572495717
#15 ((rd_kafka_msgq_t *)0x7f071a415f30) msgid 21572495718
#16 ((rd_kafka_msgq_t *)0x7f071a415fe0) msgid 21572495719
#17 ((rd_kafka_msgq_t *)0x7f071a416090) msgid 21572495720
#18 ((rd_kafka_msgq_t *)0x7f071a416140) msgid 21572495721
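Thread 8 was caught in rd_kafka_msgq_enq_sorted0() while merging rktp_msgq (7212 messages) into rktp_xmit_msgq (6815244 messages, per the *rktp dump). I have not checked how librdkafka 1.0.1 implements that function. The sketch below is only a simplified model of a naive sorted insert into a singly linked list. It shows why that kind of insert becomes expensive when the destination list is very long: each insert walks from the head, so appending to an N-element list costs N comparisons. The struct and function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified node; not librdkafka's rd_kafka_msg_t. */
struct node {
    struct node *next;
    uint64_t msgid;
};

/* Inserts n into the msgid-ordered list *headp by walking from the head
 * to the insertion point. Returns the number of nodes compared, which
 * grows linearly with how far into the list the new node belongs. */
static size_t insert_sorted(struct node **headp, struct node *n) {
    size_t cmps = 0;
    while (*headp) {
        cmps++;
        if (n->msgid < (*headp)->msgid)
            break;                  /* found the first larger msgid */
        headp = &(*headp)->next;
    }
    n->next = *headp;               /* splice n in before *headp */
    *headp = n;
    return cmps;
}
```

In this model, inserting 1000 ascending msgids one at a time costs 0 + 1 + ... + 999 = 499500 comparisons. Inserting thousands of messages behind millions of existing ones would cost proportionally more.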

@edenhill
Contributor

edenhill commented Aug 2, 2019

Does produce() stop for a while, or indefinitely?

@MockingJayWong
Author

Just for a while, but I receive no dr_cb reporting that the queue is full. So if the toppar queue is not full, what else could make produce() stall?

@edenhill
Contributor

edenhill commented Aug 2, 2019

ERR__QUEUE_FULL is not reported through dr_cb. It is propagated by produce() failing and returning that error code, in which case the message you tried to produce will not have been added to the queue.

If produce() blocks for a while, it typically means your system can't keep up.
What are your system or instance specs? (CPU type, CPU cores, memory, etc.)
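To illustrate the ERR__QUEUE_FULL point: with the real API, rd_kafka_produce() returns -1 and rd_kafka_last_error() returns RD_KAFKA_RESP_ERR__QUEUE_FULL. The usual reaction is to call rd_kafka_poll() to serve delivery reports and then retry the same message. The sketch below is a self-contained simulation of that retry loop. The bounded queue, fake_produce(), fake_poll() and produce_all() are hypothetical stand-ins so that it compiles without librdkafka, and fake_poll() simply pretends that a fixed number of messages were acknowledged.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical error codes mirroring RD_KAFKA_RESP_ERR_NO_ERROR and
 * RD_KAFKA_RESP_ERR__QUEUE_FULL. */
enum fake_err { FAKE_OK = 0, FAKE_QUEUE_FULL = 1 };

/* Hypothetical bounded producer queue; max_queued plays the role of
 * queue.buffering.max.messages. */
struct fake_producer {
    size_t queued;      /* messages waiting for delivery */
    size_t max_queued;  /* configured limit */
    size_t delivered;   /* delivery reports served so far */
};

/* Enqueues one message, or fails WITHOUT enqueuing it when full. */
static enum fake_err fake_produce(struct fake_producer *p) {
    if (p->queued >= p->max_queued)
        return FAKE_QUEUE_FULL;    /* caller still owns the message */
    p->queued++;
    return FAKE_OK;
}

/* Pretends up to n messages were acknowledged and their delivery
 * reports served, which frees room in the queue. */
static void fake_poll(struct fake_producer *p, size_t n) {
    size_t done = n < p->queued ? n : p->queued;
    p->queued -= done;
    p->delivered += done;
}

/* Produces count messages, polling and retrying whenever the queue is
 * full. Returns how many times the queue was found full. */
static size_t produce_all(struct fake_producer *p, size_t count) {
    size_t full_hits = 0;
    for (size_t i = 0; i < count;) {
        if (fake_produce(p) == FAKE_OK) {
            i++;                   /* accepted: move on to the next one */
        } else {
            full_hits++;
            fake_poll(p, 10);      /* real code: rd_kafka_poll(rk, 100) */
        }                          /* then retry the same message */
    }
    return full_hits;
}
```

With a limit of 100 and 1000 messages, the queue fills after the first 100. Each poll then frees room for 10 more, so the loop sees the queue full 90 times and never drops a message.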

@MockingJayWong
Copy link
Author

MockingJayWong commented Aug 2, 2019

It has only 2 CPUs (Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz) and 10 GB of memory.
Are you free to have a chat?

@edenhill
Contributor

edenhill commented Aug 2, 2019

Monitor the CPU usage and system load to see if it is reaching its limits.

@MockingJayWong
Author

MockingJayWong commented Aug 2, 2019

top - 07:48:31 up 21 days, 3:50, 6 users, load average: 3.19, 1.75, 1.83
Tasks: 120 total, 2 running, 117 sleeping, 0 stopped, 0 zombie
%Cpu(s): 95.3 us, 2.9 sy, 0.0 ni, 1.5 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
KiB Mem : 10073736 total, 2090800 free, 4818900 used, 3164036 buff/cache
KiB Swap: 2097148 total, 2097148 free, 0 used. 4361436 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25943 root 20 0 2939872 2.0g 1520 R 196.3 20.8 23:06.97 producer_per_08
1 root 20 0 127996 6600 4112 S 0.0 0.1 0:31.08 systemd
2 root 20 0 0 0 0 S 0.0 0.0 0:00.14 kthreadd
3 root 20 0 0 0 0 S 0.0 0.0 0:00.46 ksoftirqd/0
5 root 0 -20 0 0 0 S 0.0 0.0 0:00.00 kworker/0:0H


CPU usage is almost 200%, i.e. both cores are busy.
What do you suggest for this? Upgrading the hardware, or something else?

@MockingJayWong
Author

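Some more details:
At first I called poll(0) after sending messages; the producer then stalled for a while and recovered.
Then I switched to polling from a separate thread, like this, and the producer stalled for longer:

while (running) {
    /* serve delivery reports and other events, waiting up to 1000 ms */
    rd_kafka_poll(rk, 1000);
}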

@edenhill
Contributor

edenhill commented Aug 2, 2019

Seems like your CPUs are saturated.

@MockingJayWong
Author

I will test on another VM with more CPUs.

@MockingJayWong
Author

Actually, when CPU usage is normal, rd_kafka_produce() works well. I may reopen this if it happens again. Thank you!

@MockingJayWong
Author

MockingJayWong commented Aug 12, 2019

Hi @edenhill. I tested again on a VM with 20 CPUs, and I think I hit the same problem: the producer can't send messages, as you can see below.
top output:

top - 02:47:13 up 4 days, 23:11,  6 users,  load average: 1.03, 0.93, 0.93
Tasks: 246 total,   1 running, 241 sleeping,   3 stopped,   1 zombie
%Cpu(s):  5.0 us,  0.0 sy,  0.0 ni, 94.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 16264364 total,  9520412 free,  5530564 used,  1213388 buff/cache
KiB Swap:  2097148 total,  1938600 free,   158548 used.  9734912 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 7451 root      20   0 3226356   2.2g   1652 S 100.3 14.5  63:55.19 broker_examp
32451 root      20   0  158416   3308   1620 S   1.0  0.0  31:30.98 watch
 9300 root      20   0  162124   2468   1600 R   0.3  0.0   0:00.01 top

top output per thread:

KiB Swap:  2097148 total,  1938600 free,   158548 used.  9736372 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
 7459 root      20   0 3226356   2.2g   1652 R 99.7 14.5  25:47.01 rdk:broker1
 7451 root      20   0 3226356   2.2g   1652 S  0.7 14.5   1:05.82 broker_examp
 7452 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:03.39 broker_examp
 7453 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.00 broker_examp
 7456 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:04.13 rdk:main
 7457 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.24 rdk:broker-1
 7458 root      20   0 3226356   2.2g   1652 S  0.0 14.5   4:29.62 rdk:broker0
 7460 root      20   0 3226356   2.2g   1652 S  0.0 14.5  19:32.04 rdk:broker2
 7461 root      20   0 3226356   2.2g   1652 S  0.0 14.5   1:38.50 broker_examp
 7462 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:05.38 rdk:main
 7463 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.23 rdk:broker-1
 7464 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.27 rdk:broker0
 7465 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.26 rdk:broker1
 7466 root      20   0 3226356   2.2g   1652 S  0.0 14.5   0:00.27 rdk:broker2
 7467 root      20   0 3226356   2.2g   1652 S  0.0 14.5   5:33.77 broker_examp

Where the rdk:broker1 thread is spending its time (perf report):

Samples: 657K of event 'cycles', Event count (approx.): 37300654973
  Children      Self  Shared Object                   Symbol
-   82.79%    82.36%  /usr/local/lib/librdkafka.so.1  0x328a7            d [.] rd_kafka_msg_cmp_msgid
     82.36% 0x55efbcb1ce10
        rd_kafka_msg_cmp_msgid
-   17.21%    17.10%  /usr/local/lib/librdkafka.so.1  0x30344            d [.] rd_kafka_msgq_enq_sorted0
     17.10% 0x55efbcb1ce10
        rd_kafka_msgq_enq_sorted0
+   12.53%     0.00%  [unknown]                       0x55efbcb1ce10     ! [.] 0x000055efbcb1ce10
+    0.52%     0.03%  /proc/kcore                     0x7fff9dd227f2     k [k] apic_timer_interrupt
     0.48%     0.00%  /proc/kcore                     0x7fff9dd260a3     k [k] smp_apic_timer_interrupt
+    0.41%     0.00%  /proc/kcore                     0x7fff9d65982b     k [k] local_apic_timer_interrupt
+    0.41%     0.01%  /proc/kcore                     0x7fff9d6c158f     k [k] hrtimer_interrupt

In addition, the vNIC throughput shows that no messages are being sent: TX Data/Rate is normally above 20 MB. When I run another broker_example instance to produce messages, it works well.

Every 1.0s: ifstat eth0    Mon Aug 12 02:53:11 2019

#kernel
Interface        RX Pkts/Rate    TX Pkts/Rate    RX Data/Rate    TX Data/Rate
                 RX Errs/Drop    TX Errs/Drop    RX Over/Rate    TX Coll/Rate
eth0                   2 0             2 0           120 0           188 0
                       0 0             0 0             0 0             0 0

With 2 producers running, only one of them works well:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
16562 root      20   0 2400556   1.4g   1644 S 101.0  9.1  29:31.33 broker_examp
 7451 root      20   0  951852  59916   1652 S   6.7  0.4 115:01.78 broker_examp

Is there anything that could help me figure out this problem? Thanks!

@MockingJayWong
Author

Let me add some more information:
each time the producer gets stuck for a while, it always starts working again once I see a warning like the one below:

%6|1565595960.518|FAIL|rdkafka#producer-1| [thrd:vm2:9092/bootstrap]: vm2:9092/1: Disconnected (after 1162591ms in state UP)
%3|1565595960.518|ERROR|rdkafka#producer-1| [thrd:vm2:9092/bootstrap]: vm2:9092/1: Disconnected (after 1162591ms in state UP)

@MockingJayWong
Author

@edenhill, do you have any ideas?

@edenhill
Contributor

Are there any adverse cluster events when this happens, such as brokers going down or network congestion?

@edenhill
Contributor

Do you see any logs from the producer?

@MockingJayWong
Author

I haven't enabled the debug config, but I sometimes see these:

%3|1565603528.599|ERROR|rdkafka#consumer-2| [thrd:vm2:9092/bootstrap]: 3/3 brokers are down
%6|1565603905.882|FAIL|rdkafka#producer-1| [thrd:vm1:9092/bootstrap]: vm1:9092/0: Disconnected (after 1483869ms in state UP)
%3|1565603905.882|ERROR|rdkafka#producer-1| [thrd:vm1:9092/bootstrap]: vm1:9092/0: Disconnected (after 1483869ms in state UP)
%5|1565604082.784|REQTMOUT|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: vm3:9092/2: Timed out MetadataRequest in flight (after 0ms, timeout #0)
%5|1565604112.576|REQTMOUT|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: vm3:9092/2: Timed out MetadataRequest in flight (after 0ms, timeout #1)
%4|1565604112.576|REQTMOUT|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: vm3:9092/2: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1565604112.576|REQTMOUT|rdkafka#producer-1| [thrd:vm2:9092/bootstrap]: vm2:9092/1: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%3|1565604112.576|FAIL|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: vm3:9092/2: 2 request(s) timed out: disconnect (after 1690618ms in state UP)
%3|1565604112.576|ERROR|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: vm3:9092/2: 2 request(s) timed out: disconnect (after 1690618ms in state UP)
%3|1565604112.576|ERROR|rdkafka#producer-1| [thrd:vm3:9092/bootstrap]: 3/3 brokers are down

OR

%3|1565604189.308|ERROR|rdkafka#consumer-2| [thrd:vm3:9092/bootstrap]: 3/3 brokers are down

I think the second one is just a warning.

@MockingJayWong
Author

I also found that when it works well, the thread stack looks like this:

#0  0x00007f2ef1a6938d in poll () from /lib64/libc.so.6
#1  0x00007f2ef148e6ce in rd_kafka_transport_poll (rktrans=rktrans@entry=0x7f2ecc001020, tmout=tmout@entry=861) at rdkafka_transport.c:943
#2  0x00007f2ef148e757 in rd_kafka_transport_io_serve (rktrans=0x7f2ecc001020, timeout_ms=861) at rdkafka_transport.c:804
#3  0x00007f2ef147a31d in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x55b4ff7a0da0, abs_timeout=abs_timeout@entry=438636568923) at rdkafka_broker.c:3029
#4  0x00007f2ef147b413 in rd_kafka_broker_producer_serve (abs_timeout=438636568923, rkb=0x55b4ff7a0da0) at rdkafka_broker.c:3520
#5  rd_kafka_broker_serve (rkb=rkb@entry=0x55b4ff7a0da0, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4253
#6  0x00007f2ef147b91d in rd_kafka_broker_thread_main (arg=arg@entry=0x55b4ff7a0da0) at rdkafka_broker.c:4403
#7  0x00007f2ef14cc037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#8  0x00007f2ef1761dd5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007f2ef1a7402d in clone () from /lib64/libc.so.6

But when it is stuck, the stack looks like this:

#0  0x00007f18d4c3f8a7 in rd_kafka_msg_cmp_msgid (_a=0x7f189921f770, _b=0x7f188e3cffa0) at rdkafka_msg.h:380
#1  0x00007f18d4c3d355 in rd_kafka_msgq_enq_sorted0 (rkmq=rkmq@entry=0x7f18a40021d0, rkm=rkm@entry=0x7f189921f770, order_cmp=order_cmp@entry=0x7f18d4c3f8a0 <rd_kafka_msg_cmp_msgid>) at rdkafka_msg.c:632
#2  0x00007f18d4c7b243 in rd_kafka_msgq_insert_msgq_sort (cmp=<optimized out>, srcq=<optimized out>, destq=<optimized out>) at rdkafka_partition.c:685
#3  rd_kafka_msgq_insert_msgq (destq=destq@entry=0x7f18a40021d0, srcq=srcq@entry=0x7f18a40021b0, cmp=0x7f18d4c3f8a0 <rd_kafka_msg_cmp_msgid>) at rdkafka_partition.c:731
#4  0x00007f18d4c34d32 in rd_kafka_toppar_producer_serve (rkb=rkb@entry=0x55efbcb13a40, rktp=rktp@entry=0x7f18a4002100, pid=..., now=now@entry=438689460092, next_wakeup=next_wakeup@entry=0x7f18d16b7e50,
    do_timeout_scan=do_timeout_scan@entry=0) at rdkafka_broker.c:3244
#5  0x00007f18d4c3a37f in rd_kafka_broker_produce_toppars (do_timeout_scan=0, next_wakeup=<synthetic pointer>, now=<optimized out>, rkb=0x55efbcb13a40) at rdkafka_broker.c:3459
#6  rd_kafka_broker_producer_serve (abs_timeout=438689653495, rkb=0x55efbcb13a40) at rdkafka_broker.c:3513
#7  rd_kafka_broker_serve (rkb=rkb@entry=0x55efbcb13a40, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4253
#8  0x00007f18d4c3a91d in rd_kafka_broker_thread_main (arg=arg@entry=0x55efbcb13a40) at rdkafka_broker.c:4403
#9  0x00007f18d4c8b037 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#10 0x00007f18d4f20dd5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f18d523302d in clone () from /lib64/libc.so.6

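The difference is in rd_kafka_broker_produce_toppars(): when stuck, the thread is in rd_kafka_msgq_insert_msgq(), doing a sorted insert via rd_kafka_msgq_enq_sorted0().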

@edenhill
Contributor

So what I think is happening here is that you have some connectivity issues with the cluster, or the cluster is unstable for other reasons, which causes broker connections to go down. That in turn makes the producer re-insert in-flight and transmit-queued messages into the partition's queue.
This needs to be done to maintain the original produce order, and if the message ranges overlap it performs an insertion sort, which is O(N^2).
There isn't much we can do about that, so I suggest that you look into your cluster's health to understand why and when it is disconnecting.
Using debug=broker might give you some hints.
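A minimal sketch of why the re-insert gets expensive, assuming a simplified model: a msgid-ordered singly linked list. The types and functions below (`msg_t`, `msgq_t`, `msgq_insert_msgq()`, `overlap_cost()`) are hypothetical stand-ins loosely named after frames in the stack trace; they are not librdkafka's actual `rd_kafka_msgq_insert_msgq()` code. If the retried messages all sort before the queue head, they can be spliced in after one comparison; if the ranges overlap, each message is placed with its own linear scan.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical message node (not librdkafka's rd_kafka_msg_t). */
typedef struct msg {
    uint64_t msgid;
    struct msg *next;
} msg_t;

/* Hypothetical msgid-ordered queue: singly linked list with a tail pointer. */
typedef struct {
    msg_t *head;
    msg_t *tail;
    size_t cnt;
} msgq_t;

static unsigned long cmp_count; /* msgid comparisons, for illustration */

static int cmp_msgid(const msg_t *a, const msg_t *b)
{
    cmp_count++;
    return (a->msgid > b->msgid) - (a->msgid < b->msgid);
}

static void msgq_append(msgq_t *q, msg_t *m)
{
    m->next = NULL;
    if (q->tail)
        q->tail->next = m;
    else
        q->head = m;
    q->tail = m;
    q->cnt++;
}

/* Insert one message in msgid order: a linear scan from the head, O(n). */
static void msgq_enq_sorted(msgq_t *q, msg_t *m)
{
    msg_t **pp = &q->head;

    while (*pp && cmp_msgid(*pp, m) < 0)
        pp = &(*pp)->next;
    m->next = *pp;
    *pp = m;
    if (!m->next)
        q->tail = m;
    q->cnt++;
}

/* Move every message from src into dest, keeping msgid order. */
static void msgq_insert_msgq(msgq_t *dest, msgq_t *src)
{
    if (!src->head)
        return;
    if (!dest->head || cmp_msgid(src->tail, dest->head) < 0) {
        /* Fast path: all of src sorts before dest, so splice it in, O(1). */
        src->tail->next = dest->head;
        dest->head = src->head;
        if (!dest->tail)
            dest->tail = src->tail;
        dest->cnt += src->cnt;
    } else {
        /* Slow path: ranges overlap, so each message needs its own scan. */
        msg_t *m = src->head;
        while (m) {
            msg_t *next = m->next;
            msgq_enq_sorted(dest, m);
            m = next;
        }
    }
    src->head = src->tail = NULL;
    src->cnt = 0;
}

/* Demo: dest holds odd msgids 1..2n-1 and src holds even msgids 2..2n, so
 * the ranges overlap. Returns the number of comparisons the merge needs. */
static unsigned long overlap_cost(size_t n)
{
    msg_t *msgs = calloc(2 * n, sizeof(*msgs));
    msgq_t dest = {0}, src = {0};

    if (!msgs)
        return 0;
    for (size_t i = 0; i < n; i++) {
        msgs[i].msgid = 2 * i + 1;
        msgq_append(&dest, &msgs[i]);
        msgs[n + i].msgid = 2 * i + 2;
        msgq_append(&src, &msgs[n + i]);
    }
    cmp_count = 0;
    msgq_insert_msgq(&dest, &src);
    free(msgs);
    return cmp_count;
}
```

In this model the interleaved case costs n*n + n comparisons (1,001,000 for n = 1000), so doubling the number of retried messages roughly quadruples the work, which fits a broker thread spending most of its cycles in the comparison function.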

@MockingJayWong
Copy link
Author

Thanks for the swift response, I will check now.

@MockingJayWong
Author

It might be an issue related to NIC throughput. I moved the Kafka broker to a VM with a 10GbE vNIC, and it then worked well in the same scenario, with more than 200 consumers and a producer at 5 MB/s.
The average TX throughput per broker is then about 5 MB/s * 200 / 3 ≈ 333 MB/s, so the NIC is a limiting factor. I will gather more evidence from the original environment to confirm this. Thank you, @edenhill!
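The per-broker estimate above can be checked with a few lines of C. This is a back-of-the-envelope sketch that assumes, as the comment implies, that every one of the ~200 consumers reads the full 5 MB/s stream and that the load is spread evenly over 3 brokers; the 1 Gbit/s link used for comparison is an assumption, since the original NIC speed is not stated. The function names are hypothetical.

```c
#include <stdbool.h>

/* Average egress per broker in MB/s. Assumes every consumer reads the whole
 * produced stream and partition leadership is spread evenly over brokers. */
static double broker_egress_mb_s(double produce_mb_s, int consumers, int brokers)
{
    return produce_mb_s * consumers / brokers;
}

/* Convert MB/s (10^6 bytes/s) to Gbit/s (10^9 bits/s). */
static double mb_s_to_gbit_s(double mb_s)
{
    return mb_s * 8.0 / 1000.0;
}

/* True if the egress fits within a link of link_gbit_s, ignoring protocol
 * overhead and other traffic (a deliberately optimistic check). */
static bool fits_link(double egress_mb_s, double link_gbit_s)
{
    return mb_s_to_gbit_s(egress_mb_s) <= link_gbit_s;
}
```

With the numbers from the comment, `broker_egress_mb_s(5.0, 200, 3)` is about 333 MB/s, roughly 2.67 Gbit/s: more than a 1 Gbit/s link can carry, but within 10GbE.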

@firefeifei

Hello @edenhill,
I encountered the same problem. It occurs after a large number of 105 errors: the Kafka thread deadlocks and can't recover.

@edenhill
Contributor

@firefeifei What librdkafka version? What Kafka version? What do you mean by "105 errors"? Is that the number of errors?

@firefeifei


librdkafka 1.2.0

rd_kafka_produce() returns -1 and errno is 105.

@edenhill
Contributor

105 is ENOBUFS, which corresponds to RD_KAFKA_RESP_ERR__QUEUE_FULL: the producer queue is full, typically because you don't call rd_kafka_poll() at regular intervals.
See https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer-api
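A common way to handle a full queue is to serve the event queue with `rd_kafka_poll()` and then retry the produce call. Since this sketch does not assume librdkafka is available to compile against, it models the pattern with a hypothetical bounded queue: `fake_produce()` stands in for `rd_kafka_produce()` (returning -1 with `errno = ENOBUFS` when full, leaving the message un-queued), and `fake_poll()` stands in for `rd_kafka_poll()` by simply pretending queued messages were delivered. None of the `fake_*` names or `QUEUE_CAP` are librdkafka API.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical capacity, standing in for queue.buffering.max.messages. */
#define QUEUE_CAP 4

/* Hypothetical stand-in for the producer's internal message queue. */
typedef struct {
    int msgs[QUEUE_CAP];
    size_t cnt;        /* messages currently queued */
    size_t delivered;  /* messages "delivered" by fake_poll() */
} fake_producer_t;

/* Stand-in for rd_kafka_produce(): 0 on success, or -1 with errno set to
 * ENOBUFS when the queue is full, in which case the message is NOT queued. */
static int fake_produce(fake_producer_t *p, int msg)
{
    if (p->cnt == QUEUE_CAP) {
        errno = ENOBUFS;
        return -1;
    }
    p->msgs[p->cnt++] = msg;
    return 0;
}

/* Stand-in for rd_kafka_poll(): pretends all queued messages were delivered
 * and their delivery reports served, freeing queue space. */
static int fake_poll(fake_producer_t *p)
{
    int served = (int)p->cnt;

    p->delivered += p->cnt;
    p->cnt = 0;
    return served;
}

/* On a full queue, serve events and retry, up to max_retries extra times. */
static int produce_with_retry(fake_producer_t *p, int msg, int max_retries)
{
    for (int attempt = 0; attempt <= max_retries; attempt++) {
        if (fake_produce(p, msg) == 0)
            return 0;
        if (errno != ENOBUFS)
            return -1;   /* not a full queue: do not retry */
        fake_poll(p);    /* real code would call rd_kafka_poll(rk, 1000) */
    }
    return -1;           /* still full after all retries; errno is ENOBUFS */
}
```

With the real API, a failed `rd_kafka_produce()` call leaves ownership of the payload with the application, so the same buffer can be passed again on retry. Retrying only helps if the broker threads are actually draining the queue; a broker thread stuck as in the stack trace above would keep the queue full regardless.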

@firefeifei


This error (105) occurs when the message rate suddenly increases and there are not enough Kafka partitions. The problem is that after this error the Kafka thread deadlocks and cannot recover, even after adding a partition.

The main problem is the thread deadlock. The call stack looks like this:

#2447 (comment)

@edenhill
Contributor

Please try to reproduce this on the latest release, 1.6.0.

@firefeifei

Thanks, I will test with the updated version.
Could you tell me which issue fixed this problem?

77259 root 20 0 83.792g 0.016t 6620 R 99.6 82.9 9:48.35 rdk:broker7
