
Produced messages get stuck in rkb_retrybufs until Local: Message timed out #1432

Closed
3 of 9 tasks
charkost opened this issue Sep 25, 2017 · 7 comments
@charkost

Description

Hello,

The "Producer message reliability # Unresponsive brokers" wiki section states the following:

if the connection is closed before a response is received and before the timeout hits, then metadata is refreshed (to find out if there is a new partition leader that will take over from the down broker) and the messages are put back on the partition queue for a future retransmission.

In practice, however, it seems that the above retransmission happens only if the old partition leader returns to the UP state. If we leave the old partition leader down for longer than message.timeout.ms, the messages seem to get stuck in the rkb_retrybufs queue until they hit message.timeout.ms and then fail.

In the code, if we follow the producer-related caller chain of rd_kafka_broker_retry_bufs_move(), which moves messages from the retry queue to the outbuf queue, we see that it is only called under the case RD_KAFKA_BROKER_STATE_UP (src/rdkafka_broker.c:3150), which seems to confirm the above hypothesis.
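
For illustration only, here is a minimal, self-contained model of that behaviour (broker_state_t, msgq_t and the function names below are invented for the sketch and are not the librdkafka source): messages parked in the retry queue are only moved back to the output queue when the broker state machine re-enters UP, so a leader that never comes back leaves them stranded until message.timeout.ms expires.

/*
 * Minimal model of the behaviour described above -- NOT librdkafka source;
 * the types and names are invented for this sketch.
 */
#include <stdio.h>

typedef enum { BROKER_DOWN, BROKER_UP } broker_state_t;
typedef struct { int n; } msgq_t;   /* a message count stands in for a real queue */

static void retry_bufs_move(msgq_t *retrybufs, msgq_t *outbufs) {
        outbufs->n += retrybufs->n; /* re-enqueue for a future ProduceRequest */
        retrybufs->n = 0;
}

static void broker_serve(broker_state_t state, msgq_t *retrybufs, msgq_t *outbufs) {
        switch (state) {
        case BROKER_UP:
                retry_bufs_move(retrybufs, outbufs); /* only happens here */
                break;
        case BROKER_DOWN:
                /* Nothing moves; the messages eventually hit message.timeout.ms. */
                break;
        }
}

int main(void) {
        msgq_t retrybufs = { 12000 }, outbufs = { 0 };
        broker_serve(BROKER_DOWN, &retrybufs, &outbufs); /* leader never recovers */
        printf("stuck in retrybufs: %d, ready to send: %d\n", retrybufs.n, outbufs.n);
        return 0;
}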

How to reproduce

$ wc -l msgs
400000 msgs
$ head -1 msgs
0
$ tail -1 msgs
399999

$ kafkacat -P -X acks=-1 -X retries=10 -b <host> -t <topic> -l msgs

While kafkacat is producing the messages, we run the following on the partition leader broker:

$ kill -STOP $PIDOFBROKER
$ systemctl stop kafka 

kafkacat output:

% ERROR: Local: Broker transport failure: kafka-a.vm.skroutz.gr:9092/1: Receive failed: Connection reset by peer
% ERROR: Local: Broker transport failure: kafka-a.vm.skroutz.gr:9092/1: Connection closed
% ERROR: Local: Broker transport failure: kafka-a.vm.skroutz.gr:9092/1: Connect to ipv4#10.42.9.34:9092 failed: Connection refused

after 5 mins (default message.timeout.ms):

% Delivery failed for message: Local: Message timed out
% Delivery failed for message: Local: Message timed out
% Delivery failed for message: Local: Message timed out
% Delivery failed for message: Local: Message timed out
% Delivery failed for message: Local: Message timed out
% Delivery failed for message: Local: Message timed out

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 61d786b
  • Apache Kafka version: 0.10.1
  • librdkafka client configuration:
  • Operating system: Debian 9
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Very good report!

Do you have more than one broker in the cluster, and if so, are other brokers picking up leadership for the killed broker's partitions?

@charkost
Author

Thanks!

The cluster has 3 brokers. Yep, it seems that other brokers are picking up the leadership.

Leadership report before broker stop:

$ kafka topics --describe --topic action-logs2
kafka-topics --zookeeper zoo1.vm.skroutz.gr:2181 --describe --topic action-logs2
Topic:action-logs2	PartitionCount:3	ReplicationFactor:3	Configs:min.insync.replicas=2
	Topic: action-logs2	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 1,2,3
	Topic: action-logs2	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 1,2,3
	Topic: action-logs2	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3

And after stop:

$ kafka topics --describe --topic action-logs2
kafka-topics --zookeeper zoo1.vm.skroutz.gr:2181 --describe --topic action-logs2
Topic:action-logs2	PartitionCount:3	ReplicationFactor:3	Configs:min.insync.replicas=2
	Topic: action-logs2	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3
	Topic: action-logs2	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 2,3
	Topic: action-logs2	Partition: 2	Leader: 2	Replicas: 1,2,3	Isr: 2,3

edenhill added a commit that referenced this issue Dec 12, 2017
…1432, #1476, #1421)

ProduceRequest retries are reworked to not retry the request itself,
but put the messages back on the partition queue (while maintaining
input order) and then have an upcoming ProduceRequest include the messages again.

Retries are now calculated per message rather than ProduceRequest
and the retry backoff is also enforced on a per-message basis.

The input order of messages is retained during this whole process,
which should guarantee ordered delivery if max.in.flight=1 but with retries > 0.

The new behaviour is formalised through documentation (INTRODUCTION.md)
edenhill added a commit that referenced this issue Dec 20, 2017
barrotsteindev pushed a commit to barrotsteindev/librdkafka that referenced this issue Jan 2, 2018
edenhill added a commit that referenced this issue Jan 2, 2018
edenhill added a commit that referenced this issue Jan 3, 2018
edenhill added a commit that referenced this issue Jan 10, 2018
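
As the commit message above notes, ordered delivery relies on max.in.flight=1 combined with per-message retries. A configuration sketch against the C API (the property values are examples, not recommendations):

#include <librdkafka/rdkafka.h>

/* Producer configuration sketch for the ordering guarantee described in the
 * commit message: at most one ProduceRequest in flight, while still allowing
 * per-message retries with a backoff.  Values are illustrative. */
static rd_kafka_conf_t *ordered_producer_conf(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "message.send.max.retries", "10",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "retry.backoff.ms", "100",
                          errstr, sizeof(errstr));
        return conf;
}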
@edenhill
Contributor

This is now fixed on master

@charkost
Author

charkost commented Apr 24, 2018

Hello,

I think this should be re-opened.

I ran the steps listed in the "How to reproduce" section again, but with librdkafka 0.11.4 and Kafka 0.11.0.2.

I froze and then stopped the partition leader, and after 5 minutes (the default message.timeout.ms) librdkafka reported Delivery failed for message: Local: Message timed out roughly as many times as the number of messages that never arrived at the consumer.

After the Delivery failed for message: Local: Message timed out errors, some retransmissions appeared at the consumer for the affected partition. This seems to be the only difference from librdkafka 0.11.1. Still, 12000 out of 400000 messages never arrived at the consumer, and the Delivery failed for message: Local: Message timed out count does not exactly match the lost-message count.

Also note that the retransmission happened right after the Local: Message timed out errors, and probably not after the connection termination by the old partition leader, as the wiki page states.

@edenhill
Contributor

Do you have any debug logs from this occurrence?

@charkost
Author

I re-ran the reproduction steps with debug=all. The weird thing is that with debug=all the producer worked fine a few times, losing far fewer messages (~100 out of 400000) and without any Local: Message timed out.

But most of the time I still get the usual Local: Message timed out after 5 minutes, with message loss ranging from 10000 to 40000.

The attached debug logs come from a high message loss occurrence:

msgs.gz

producer_stdout.gz
producer_sterr.gz

consumer_stdout.gz
consumer_stderr.gz
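
For reference, the logs above were collected with debug=all; from the C API the equivalent is roughly the following sketch ("all" can be narrowed to a comma-separated list such as broker,msg,protocol):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Enable verbose librdkafka debug logging on an existing configuration object. */
static void enable_debug(rd_kafka_conf_t *conf) {
        char errstr[512];
        if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
            RD_KAFKA_CONF_OK)
                fprintf(stderr, "%% %s\n", errstr);
}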

@romaindequidt

I get exactly the same error, precisely 5 minutes after the first message is sent.

2018-04-26 22:44:10,813:7(0x7ff77e5c0700):ZOO_DEBUG@process_sync_completion@4343: Processing sync_completion with type=3 xid=0x5ae2563b rc=0
2018-04-26 22:44:10,813:7(0x7ff78141ef40):ZOO_DEBUG@zoo_awget@3327: Sending request xid=0x5ae2563c for path [/brokers/ids/1002] to **.**.**.**:2181
2018-04-26 22:44:10,814:7(0x7ff77e5c0700):ZOO_DEBUG@process_sync_completion@4343: Processing sync_completion with type=2 xid=0x5ae2563c rc=0
kafkacSendMessage: mq_send error 9 (Bad file descriptor)
kafka producer started
% Sent 608 bytes to topic "topic" partition 0
% Sent 608 bytes to topic "topic" partition 0
% Sent 608 bytes to topic "topic" partition 0
2018-04-26 22:44:14,152:7(0x7ff77e5c0700):ZOO_DEBUG@zookeeper_process@2793: Got ping response in -760418267 ms
...
% Sent 608 bytes to topic "topic" partition 0
2018-04-26 22:49:11,161:7(0x7ff77e5c0700):ZOO_DEBUG@zookeeper_process@2793: Got ping response in -760418267 ms
% Sent 607 bytes to topic "topic" partition 0
>>>>>>>>>>>>>>>>>>>> Message delivery failed: -192 (Local: Message timed out)

with librdkafka 0.11.4 and kafka 1.0.0
