
Message stalled in queue when broker (partition leader) die #1913

Closed · 6 of 7 tasks
chronidev opened this issue Aug 2, 2018 · 5 comments

Comments

@chronidev

chronidev commented Aug 2, 2018

Description

When a broker that is the leader of the partition goes down while a producer is producing messages at a continuous rate, some messages time out after message.timeout.ms, resulting in message loss.

[2018-08-02 04:34:03,735] kafka producer delivery_cb: _MSG_TIMED_OUT: Local: Message timed out 1533176942362;360001

It seems the messages stay in a queue somewhere and are not re-sent to the new leader.

I see repeated log messages like this:

[2018-08-02 04:34:00,734] CONNECT [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: broker in state DOWN connecting
[2018-08-02 04:34:00,734] CONNECT [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Connecting to ipv4#10.67.146.9:9092 (plaintext) with socket 15
[2018-08-02 04:34:00,734] STATE [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Broker changed state DOWN -> CONNECT
[2018-08-02 04:34:00,734] BROADCAST [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: Broadcasting state change
[2018-08-02 04:34:00,734] BROKERFAIL [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: failed: err: Local: Broker transport failure: (errno: Connection refused)
[2018-08-02 04:34:00,734] FAIL [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Connect to ipv4#10.67.146.9:9092 failed: Connection refused
[2018-08-02 04:34:00,734] STATE [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Broker changed state CONNECT -> DOWN
[2018-08-02 04:34:00,734] BROADCAST [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: Broadcasting state change
[2018-08-02 04:34:00,734] BUFQ [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Purging bufq with 0 buffers
[2018-08-02 04:34:00,734] BUFQ [rdkafka#producer-1] [thrd:broker3.testy.fr:9092/bootstrap]: broker3.testy.fr:9092/3: Updating 1 buffers on connection reset

Full log:
github_kafka-producer.log.gz

The producer:

#!/usr/bin/env python                                                                                                                                                                                              
from __future__ import division
import time
import logging
import argparse
import sys
from confluent_kafka import Producer
from ratelimiter import RateLimiter

LOGGER = logging.getLogger(__name__)


def setup_logging(log_level, log_file="/tmp/log.txt"):
   """ """
   logger = logging.getLogger()
   
   formatter = logging.Formatter('[%(asctime)s] %(message)s')
   file_handler = logging.FileHandler(log_file)
   file_handler.setFormatter(formatter)
   logger.addHandler(file_handler)
   logger.setLevel(log_level)


def delivery_cb(err, msg):
   """ """
   if err:
      LOGGER.error('kafka producer delivery_cb: {}: {} {}'.format(err.name(),
                                                                  err.str(),
                                                                  msg.value()))

def error_cb(err):
   """Log errors reported by librdkafka."""
   LOGGER.error('kafka producer error_cb: {}: {}'.format(err.name(), err.str()))

   
def send_messages(producer, topic, rate, nb_msg):
   """ """
   LOGGER.info('Start produce messages')
   counter = 0
   try:
      limiter = RateLimiter(rate, 1)
      while counter < nb_msg:
         with limiter:
            try:
               timestamp = int(time.time() * 1000)
               value = '{timestamp};{counter}'.format(timestamp=timestamp,
                                                      counter=counter)
               producer.produce(topic=topic,
                                value=value)
               counter += 1
               producer.poll(timeout=0)
            except Exception as e:
               LOGGER.error('Exception: {} '.format(e))
               producer.poll(timeout=1)
      LOGGER.info('Stop produce messages')
   except KeyboardInterrupt:
      pass

   LOGGER.info('Start Send remaining messages')

   remaining_msgs = 1
   while True:
      remaining_msgs = producer.flush(1)
      LOGGER.info('Remaining messages to send: {}'.format(remaining_msgs))
      if not remaining_msgs:
         break

   LOGGER.info('Stop Send remaining messages')

def main():
   parser = argparse.ArgumentParser(
      formatter_class=argparse.ArgumentDefaultsHelpFormatter,
      description='Simple Kafka Producer'
   )

   parser.add_argument('-t',
                       '--topic',
                       required=True,
                       action='store',
                       help='topic')
   parser.add_argument('-r',
                       '--rate',
                       default=100,
                       action='store',
                       type=int,
                       help='Max number of messages per second')
   parser.add_argument('-n',
                       '--number-of-messages',
                       default=5000000,
                       action='store',
                       type=int,
                       help='Number of messages to send')
   parser.add_argument('-b',
                       '--brokers',
                       required=True,
                       action='store',
                       help='List of brokers separated by comma')
   parser.add_argument('-f',
                       '--log-file',
                       required=True,
                       action='store',
                       help='Log file')

   args = parser.parse_args()

   kafka_settings = {
      'api.version.request': True,
      'socket.keepalive.enable': True,
      'default.topic.config': {
         'request.required.acks': -1,
      },
      'max.in.flight': 1,
      'error_cb': error_cb,
      'on_delivery': delivery_cb,
      'debug': "all",
      'metadata.broker.list': args.brokers
   }

   setup_logging(logging.DEBUG, log_file=args.log_file)

   LOGGER.info('kafka producer settings: {}'.format(kafka_settings))

   kafka_producer = Producer(kafka_settings, logger=LOGGER)

   try:
      send_messages(producer=kafka_producer, topic=args.topic,
                    rate=args.rate, nb_msg=args.number_of_messages)
   except KeyboardInterrupt:
      pass

if __name__ == '__main__':
    main()

How to reproduce

  1. Create the topic:
kafka-topics --create --if-not-exists --zookeeper zoo1.tesy.fr:2181,zoo2.tesy.fr:2181,zoo3.tesy.fr:2181 --replication-factor 3 --partitions 1 --topic test13-20180626004841 --config message.timestamp.type=LogAppendTime --config min.insync.replicas=2
  2. Launch the producer:
./producer.py --topic lost-messages --brokers broker1.testy.fr:9092,broker2.testy.fr:9092,broker3.testy.fr:9092,broker4.testy.fr:9092,broker5.testy.fr:9092,broker6.testy.fr:9092 --rate 30000 --number-of-messages 5000000 --log-file /tmp/kafka-producer.log
  3. Kill the broker that is the leader of the partition with kill -9 (see the sketch after these steps for one way to find the leader)
  4. Wait for the producer to finish
  5. Check the log
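For step 3, one way to identify which broker currently leads the partition is to query cluster metadata from the client. This sketch is not part of the original report: it assumes a confluent-kafka-python version new enough to expose Producer.list_topics() (newer than the client used here) and uses the topic name from step 2.

#!/usr/bin/env python
# Hypothetical helper (not from the issue): print the current leader of partition 0
# so the right broker can be killed with kill -9.
from confluent_kafka import Producer

TOPIC = 'lost-messages'

producer = Producer({'bootstrap.servers': 'broker1.testy.fr:9092'})
metadata = producer.list_topics(topic=TOPIC, timeout=10)   # requires a newer client
leader_id = metadata.topics[TOPIC].partitions[0].leader
broker = metadata.brokers[leader_id]
print('partition 0 leader: broker id {} ({}:{})'.format(leader_id, broker.host, broker.port))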

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): v0.11.5
  • Confluent Kafka version: 4.0.0
  • librdkafka client configuration: api.version.request=True, socket.keepalive.enable=True, request.required.acks=-1, max.in.flight=1, error_cb=error_cb, on_delivery=delivery_cb, debug=all
  • Operating system: Rhel 7 (x64)
  • Provide logs from librdkafka
  • Provide broker log excerpts
  • Critical issue

PS: I use max.in.flight=1 to avoid flooding the log; if you remove this parameter you will observe multiple message timeouts.

@rnpridgeon

If you are going to lower max.in.flight, I recommend increasing the amount of batching you are doing. Otherwise you have tons of micro-batches in the accumulator waiting to be dispatched. This is a compounding cost, as each message is queued behind the next, ultimately leading to the timeouts you are seeing.

Try increasing linger.ms (default: 0) to 2000. Depending on how quickly you populate these batches, you may want to consider increasing batch.num.messages and message.max.bytes as well.
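For illustration, that tuning could be expressed as a few extra keys in the reporter's kafka_settings dict. The values below are only examples, not recommendations from this thread, and on older librdkafka builds the property may only be available under its librdkafka name, queue.buffering.max.ms, rather than the linger.ms alias:

# Illustrative batching overrides; values are examples only.
batching_overrides = {
   'queue.buffering.max.ms': 2000,   # linger.ms in newer librdkafka releases
   'batch.num.messages': 100000,     # let more messages accumulate per ProduceRequest
   'message.max.bytes': 2000000,     # raise the request size ceiling so larger batches fit
}
kafka_settings.update(batching_overrides)   # kafka_settings as defined in the script above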

@chronidev
Author

@rnpridgeon I agree with you, but as I said, it's just to make the troubleshooting easier.

@chronidev chronidev changed the title Message lost due to Message timeout Message lost when broker (partition leader) die Aug 4, 2018
@edenhill
Contributor

edenhill commented Aug 6, 2018

The issue seems to be that the ProduceRequests in rkb_outbufs (requests waiting to be sent) are left on the queue rather than having their error-checking callback triggered, which in the ProduceRequest case would move the messages back to the partition queue.

This was introduced in a commit over a year ago: 70cf144

edenhill added a commit that referenced this issue Aug 6, 2018
There are multiple parts to this fix:
 * A request/response handler can now check if a failing request
   was sent out on the wire (or did not make it past the output
   queue).
 * The retry code now only increments the retry count for actually
   sent requests.
 * When the broker connection goes down, requests in the output queue
   are now purged to have their handler callbacks called which in turn
   will trigger a (now free) retry.
 * ProduceRequests are not retried, but their messages are put back
   on the partition queue (existing behaviour).
edenhill added a commit that referenced this issue Aug 7, 2018 (same description as above)
edenhill added a commit that referenced this issue Aug 7, 2018 (same description as above)
@chronidev
Author

I confirm your commit corrects the issue; I guess you can close it. Do you have an idea when it will be released in librdkafka and in confluent-kafka-python? In my opinion it's a critical issue, due to message loss.

@edenhill edenhill changed the title Message lost when broker (partition leader) die Message stalled in queue when broker (partition leader) die Aug 13, 2018
@edenhill
Contributor

Since the messages are stuck in an internal queue and the application never gets a callback for them, they are technically not lost: the application knows it tried to produce a message but never got a response back.

A lost message would be one for which librdkafka triggered a successful delivery report but did not actually deliver the message.
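
As a minimal sketch (not from this thread) of how a producing application can act on that distinction: in its delivery callback, a _MSG_TIMED_OUT error means the outcome is unknown, so the application can record the message as unconfirmed and decide whether to produce it again, while only an explicit success counts as delivered. The unconfirmed list is illustrative bookkeeping.

import logging
from confluent_kafka import KafkaError

LOGGER = logging.getLogger(__name__)
unconfirmed = []   # illustrative: values the application may choose to produce again

def delivery_cb(err, msg):
   if err is None:
      return                                    # broker acknowledged the message
   if err.code() == KafkaError._MSG_TIMED_OUT:
      # Outcome unknown: the message may or may not have reached the broker.
      # Re-producing is only safe if downstream consumers tolerate duplicates.
      unconfirmed.append(msg.value())
      LOGGER.warning('unconfirmed after timeout: %s', msg.value())
   else:
      LOGGER.error('delivery failed: %s: %s', err.str(), msg.value())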

We'll schedule a maintenance release soon.
