Recovery takes too long after stopping and restarting a broker in a cluster #2508

Closed · 3 tasks done
sarkanyi opened this issue Sep 4, 2019 · 11 comments

@sarkanyi

sarkanyi commented Sep 4, 2019

Description

Stopping a broker for a while (while producing at a high rate) and then restarting it leads to message loss when the broker rejoins the cluster (basically because librdkafka sees the brokers as DOWN). I don't think this should be the case.

How to reproduce

Stop a broker in a cluster for a while, then restart it.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 1.0.1
  • Apache Kafka version: 2.3.0
  • librdkafka client configuration (see the configuration sketch at the end of this comment): "linger.ms=1;queue.buffering.max.messages=10000000;batch.num.messages=100000;message.max.bytes=20971520;queued.max.messages.kbytes=2097151;enable.idempotence=true"
  • Operating system: CentOS 7 (x64)

Attaching stack trace and logs.
stacktrace.txt

I will try to provide more data as I figure out more details.
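
For reference, here is a minimal sketch of how the client configuration above could be applied through librdkafka's C API (rd_kafka_conf_set() / rd_kafka_new()). The property names and values are copied from the checklist; the helper name create_producer() and the error handling are illustrative, not part of the reporter's code.

#include <stdio.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Illustrative sketch: apply the reported configuration and create an
 * idempotent producer.  Not the reporter's actual code. */
static rd_kafka_t *create_producer(const char *brokers) {
        const char *props[][2] = {
                { "bootstrap.servers",            brokers    },
                { "linger.ms",                    "1"        },
                { "queue.buffering.max.messages", "10000000" },
                { "batch.num.messages",           "100000"   },
                { "message.max.bytes",            "20971520" },
                { "queued.max.messages.kbytes",   "2097151"  },
                { "enable.idempotence",           "true"     },
        };
        char errstr[512];
        size_t i;
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        for (i = 0; i < sizeof(props) / sizeof(props[0]); i++) {
                if (rd_kafka_conf_set(conf, props[i][0], props[i][1],
                                      errstr, sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                        fprintf(stderr, "%s: %s\n", props[i][0], errstr);
                        exit(1);
                }
        }

        /* rd_kafka_new() takes ownership of conf on success. */
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
                exit(1);
        }
        return rk;
}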

@sarkanyi
Author

sarkanyi commented Sep 5, 2019

examples/rdkafka_performance -P -t test -s 343 -c 500000000 -b <> -X linger.ms=1 -X queue.buffering.max.messages=10000000 -X batch.num.messages=1000000 -X message.max.bytes=20971520 -X queued.max.messages.kbytes=2097151 -X enable.idempotence=true -D
Stop a broker, and after a few seconds restart it. The cluster needs to have at least 2 brokers.

@sarkanyi
Author

sarkanyi commented Sep 9, 2019

Set min.insync.replicas=1. After the second broker restarts, sending hangs for a while; the cause looks like it is related to rebalancing:

%7|1568019280.284|REQERR|rdkafka#producer-1| [thrd:192.168.4.2:9095/bootstrap]: 192.168.4.2:9095/1: ProduceRequest failed: Broker: Not leader for partition: actions Refresh,MsgNotPersisted
%7|1568019280.284|MSGSET|rdkafka#producer-1| [thrd:192.168.4.2:9095/bootstrap]: 192.168.4.2:9095/1: nioTopicUP [1]: MessageSet with 59131 message(s) (MsgId 29904096, BaseSeq 29904095) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
%7|1568019280.284|LEADERUA|rdkafka#producer-1| [thrd:192.168.4.2:9095/bootstrap]: nioTopicUP [1]: leader unavailable: produce: Broker: Not leader for partition
%7|1568019280.284|FASTQUERY|rdkafka#producer-1| [thrd:192.168.4.2:9095/bootstrap]: Starting fast leader query
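
As an aside on the MsgNotPersisted annotations in the log lines above: a small, hedged sketch of how an application could inspect the persistence status of a failed message in its delivery report callback via librdkafka's rd_kafka_message_status(). The callback itself and its log text are illustrative, not taken from the reporter's setup.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Illustrative delivery-report callback: for failed messages, distinguish
 * "not persisted" (the broker never stored it) from "possibly persisted"
 * (risk of duplicates when retrying without idempotence). */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                      void *opaque) {
        rd_kafka_msg_status_t status;

        if (!rkmessage->err)
                return;  /* delivered successfully */

        status = rd_kafka_message_status(rkmessage);
        fprintf(stderr, "delivery failed: %s (%s)\n",
                rd_kafka_err2str(rkmessage->err),
                status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED
                        ? "not persisted"
                        : status == RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED
                                  ? "possibly persisted"
                                  : "persisted");
}

/* Registered on the conf object before creating the producer:
 *     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
 */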

@sarkanyi
Author

sarkanyi commented Sep 9, 2019

At about this point in the logs it stops completely, with perf top showing that it is busy sorting in rd_kafka_msgq_enq_sorted0:
%7|1568020658.062|SEND|rdkafka#producer-1| [thrd:192.168.3.2:9095/bootstrap]: 192.168.3.2:9095/0: Sent ProduceRequest (v7, 20971192 bytes @ 20492560, CorrId 12200)
%7|1568020658.406|METADATA|rdkafka#producer-1| [thrd:main]: Requesting metadata for 1/1 topics: partition leader query
%7|1568020658.406|METADATA|rdkafka#producer-1| [thrd:main]: 192.168.3.2:9095/0: Request metadata for 1 topic(s): partition leader query

librdkafka.log.tar.gz
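
To make the perf profile above concrete: the behaviour is consistent with a per-message sorted insert into a linked-list message queue, where every retried message is re-enqueued by scanning the destination queue from the head until its msgid position is found. The sketch below is a simplified model under assumed data structures (a hypothetical struct msg / struct msgq), not librdkafka's actual implementation; it only illustrates why re-inserting a MessageSet of tens or hundreds of thousands of retried messages this way approaches srcq-length times destq-length comparisons.

/* Simplified model, not librdkafka code: per-message sorted insertion. */
struct msg {
        unsigned long long msgid;  /* monotonically increasing message id */
        struct msg *next;
};

struct msgq {
        struct msg *head;          /* kept sorted by msgid */
};

/* Insert one message in msgid order by scanning from the head:
 * worst case O(destq length) comparisons per message. */
static void msgq_enq_sorted(struct msgq *q, struct msg *m) {
        struct msg **pp = &q->head;
        while (*pp && (*pp)->msgid < m->msgid)
                pp = &(*pp)->next;
        m->next = *pp;
        *pp = m;
}

/* Insert a whole (already sorted) source queue by repeating the per-message
 * insert: O(srcq length * destq length) overall, which is the ~1.1e10
 * iteration figure estimated later in this thread. */
static void msgq_insert_msgq(struct msgq *destq, struct msgq *srcq) {
        struct msg *m = srcq->head;
        srcq->head = NULL;
        while (m) {
                struct msg *next = m->next;
                msgq_enq_sorted(destq, m);
                m = next;
        }
}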

@sarkanyi
Author

sarkanyi commented Sep 9, 2019

1568020658 -> somewhere around here it stops sending, and 1568021996 -> somewhere around here it starts sending again; that is a stall of roughly 1338 seconds (about 22 minutes).

@sarkanyi
Author

sarkanyi commented Sep 9, 2019

Backtrace:

Thread 5 (Thread 0x7f8c1145f700 (LWP 58218)):
#0  0x00007f8c13e994ed in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8c13e94dcb in _L_lock_883 () from /lib64/libpthread.so.0
#2  0x00007f8c13e94c98 in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x00007f8c14d9737b in mtx_lock () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#4  0x00007f8c14ccba7b in rd_kafka_stats_emit_toppar () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#5  0x00007f8c14cce26a in rd_kafka_stats_emit_all () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#6  0x00007f8c14ccf1c3 in rd_kafka_stats_emit_tmr_cb () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#7  0x00007f8c14d68ea3 in rd_kafka_timers_run () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#8  0x00007f8c14ccf620 in rd_kafka_thread_main () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#9  0x00007f8c14d97551 in _thrd_wrapper_function () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#10 0x00007f8c13e92dd5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f8c1358602d in clone () from /lib64/libc.so.6

Thread 4 (Thread 0x7f8c10c5e700 (LWP 58219)):
#0  0x00007f8c13e96d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f8c14d974f1 in cnd_timedwait () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#2  0x00007f8c14d97ae6 in cnd_timedwait_abs () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#3  0x00007f8c14d4b473 in rd_kafka_q_pop_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#4  0x00007f8c14d4b512 in rd_kafka_q_pop () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#5  0x00007f8c14ce6f35 in rd_kafka_broker_ops_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#6  0x00007f8c14ce704c in rd_kafka_broker_ops_io_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#7  0x00007f8c14ce7608 in rd_kafka_broker_internal_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#8  0x00007f8c14cee03c in rd_kafka_broker_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#9  0x00007f8c14cee255 in rd_kafka_broker_thread_main () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#10 0x00007f8c14d97551 in _thrd_wrapper_function () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#11 0x00007f8c13e92dd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f8c1358602d in clone () from /lib64/libc.so.6

Thread 3 (Thread 0x7f8c1045d700 (LWP 58220)):
#0  0x00007f8c14d69fcc in rd_kafka_msg_cmp_msgid () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#1  0x00007f8c14d20dc9 in rd_kafka_msgq_enq_sorted0 () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#2  0x00007f8c14d404f2 in rd_kafka_msgq_insert_msgq_sort () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#3  0x00007f8c14d405a3 in rd_kafka_msgq_insert_msgq () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#4  0x00007f8c14ce78e4 in rd_kafka_toppar_producer_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#5  0x00007f8c14ce8316 in rd_kafka_broker_produce_toppars () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#6  0x00007f8c14ce84cc in rd_kafka_broker_producer_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#7  0x00007f8c14cee066 in rd_kafka_broker_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#8  0x00007f8c14cee588 in rd_kafka_broker_thread_main () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#9  0x00007f8c14d97551 in _thrd_wrapper_function () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#10 0x00007f8c13e92dd5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f8c1358602d in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x7f8c0fc5c700 (LWP 58221)):
#0  0x00007f8c13e96d12 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f8c14d974f1 in cnd_timedwait () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#2  0x00007f8c14d97ae6 in cnd_timedwait_abs () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#3  0x00007f8c14d4b473 in rd_kafka_q_pop_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#4  0x00007f8c14d4b512 in rd_kafka_q_pop () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#5  0x00007f8c14ce6f35 in rd_kafka_broker_ops_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#6  0x00007f8c14ce704c in rd_kafka_broker_ops_io_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#7  0x00007f8c14ce850a in rd_kafka_broker_producer_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#8  0x00007f8c14cee066 in rd_kafka_broker_serve () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#9  0x00007f8c14cee255 in rd_kafka_broker_thread_main () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#10 0x00007f8c14d97551 in _thrd_wrapper_function () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#11 0x00007f8c13e92dd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f8c1358602d in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x7f8c15229680 (LWP 58217)):
#0  0x00007f8c13e994ed in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8c13e94dcb in _L_lock_883 () from /lib64/libpthread.so.0
#2  0x00007f8c13e94c98 in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x00007f8c14d9737b in mtx_lock () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#4  0x00007f8c14d44a5e in rd_kafka_toppar_leader () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#5  0x00007f8c14d6d4d6 in rd_kafka_topic_partition_available () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#6  0x00007f8c14d210c7 in rd_kafka_msg_partitioner_random () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#7  0x00007f8c14d21178 in rd_kafka_msg_partitioner_consistent_random () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#8  0x00007f8c14d21355 in rd_kafka_msg_partitioner () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#9  0x00007f8c14d1fca7 in rd_kafka_msg_new () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#10 0x00007f8c14d206ee in rd_kafka_produce () from /home/sarkany/rbuild/librdkafka/src/librdkafka.so.1
#11 0x0000000000404511 in do_produce ()
#12 0x0000000000405a75 in main ()

@sarkanyi
Author

Interrupted with the debugger while sorting
mid-interrupt-old.tar.gz

@edenhill
Contributor

Fantastic! 💯

@sarkanyi
Author

Breakpoint before sorting and while sorting

before-and-mid-sort.tar.gz

@sarkanyi
Author

After sorting (src and dest are identical but attached for completeness' sake)
after-sort.tar.gz

@edenhill
Contributor

Consecutive message-id ranges in the attached files:

 destq-after-sort.id: 33692864..33993690  = 300827
destq-before-sort.id: 33751993..33906867  = 154875
   destq-mid-sort.id: 33692864..33725431  = 32568
   destq-mid-sort.id: 33751993..33906867  = 154875
            destq.id: 14620073..14652608  = 32536
            destq.id: 14679191..15112597  = 433407
  srcq-after-sort.id: 33692864..33993690  = 300827
 srcq-before-sort.id: 33692864..33751992  = 59129
 srcq-before-sort.id: 33906868..33993690  = 86823
    srcq-mid-sort.id: 33692864..33725431  = 32568
    srcq-mid-sort.id: 33751993..33906867  = 154875
             srcq.id: 14620073..14652608  = 32536
             srcq.id: 14679191..15112597  = 433407

Looking at *-before-sort we see that srcq has one missing range, which is perfectly filled by the destq.

With the current insert-sort method the naive approximate cost is:
(destq size / 2) * srcq size = (154875 / 2) * (59129 + 86823) = 11302158000 iterations

So there's clearly room for improvement.
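
One direction for that improvement, sketched under the same hypothetical struct msg / struct msgq model as the earlier sketch (again, not librdkafka's actual code): since both queues are already sorted by msgid, they can be merged in a single linear pass, touching each message once, and whatever run remains at the end is spliced in one step. The fix later committed for this issue targets the same class of interleaved and overlapping range inserts, though not necessarily with this exact code.

/* Sketch: merge two msgid-sorted queues in one linear pass instead of
 * per-message sorted insertion.  Cost is O(srcq + destq) comparisons
 * rather than O(srcq * destq). */
static void msgq_merge(struct msgq *destq, struct msgq *srcq) {
        struct msg *a = destq->head;        /* existing messages */
        struct msg *b = srcq->head;         /* retried messages to insert */
        struct msg **tail = &destq->head;

        srcq->head = NULL;

        while (a && b) {
                if (a->msgid <= b->msgid) {
                        *tail = a;
                        tail = &a->next;
                        a = a->next;
                } else {
                        *tail = b;
                        tail = &b->next;
                        b = b->next;
                }
        }
        *tail = a ? a : b;  /* splice the remaining run in one step */
}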

@sarkanyi
Author

I agree. But I'm even more worried about why the srcq grows together with the destq.

edenhill added a commit that referenced this issue Sep 16, 2019
… retry (#2508)

The msgq insert code now properly handles interleaved and overlapping
message range inserts, which may occur during Producer retries for
high-throughput applications.
sarkanyi pushed a commit to sarkanyi/librdkafka that referenced this issue Sep 17, 2019
… retry (confluentinc#2508)

The msgq insert code now properly handles interleaved and overlapping
message range inserts, which may occur during Producer retries for
high-throughput applications.

(cherry picked from commit 3e6c64d)
edenhill added a commit that referenced this issue Sep 17, 2019
… retry (#2508)

The msgq insert code now properly handles interleaved and overlapping
message range inserts, which may occur during Producer retries for
high-throughput applications.
sarkanyi pushed a commit to sarkanyi/librdkafka that referenced this issue Sep 17, 2019
… retry (confluentinc#2508)

The msgq insert code now properly handles interleaved and overlapping
message range inserts, which may occur during Producer retries for
high-throughput applications.
edenhill added a commit that referenced this issue Sep 23, 2019
… retry (#2508)

The msgq insert code now properly handles interleaved and overlapping
message range inserts, which may occur during Producer retries for
high-throughput applications.