
Confluent.Kafka.KafkaException: 'Local: Queue full' when running BeginProduce example #703

Open

gmeena opened this issue Dec 15, 2018 · 13 comments

Comments

@gmeena

gmeena commented Dec 15, 2018

Description

I am getting the error "Confluent.Kafka.KafkaException: 'Local: Queue full'" when trying to run the basic producer example (with p.BeginProduce). All settings are default.

It works until the for loop reaches about 100k iterations; after that it starts throwing this error.

How to reproduce

  1. Install Kafka & zookeeper
  2. Start servers
  3. Produce message to topic 'test' using simple producer example and consume through kafka-console-consumer.

Checklist

Please provide the following information:

  • Confluent.Kafka nuget version:
  • Apache Kafka version: 2.1.0
  • Client configuration:
  • Operating system: Window 10, 16 GB RAM, 64 bit OS
  • Provide logs (with "debug" : "..." as necessary in configuration)
  • Provide broker log excerpts
  • Critical issue
@srinathrangaramanujam

Can you increase message.timeout.ms and retries to higher values?

@mhowlett
Contributor

mhowlett commented Dec 18, 2018

you're trying to send messages faster than librdkafka can get them delivered to kafka, and the local queue is filling up. you can increase the queue size (queue.buffering.max.messages / queue.buffering.max.kbytes), but if you sustain the same rate of producing (i.e. as fast as you can), the local queue is still going to fill up, just a bit later.

what you should do if you get this exception is catch it, then wait for some time before continuing. if you get the exception, you can be sure the message will not be sent to kafka, so you can be confident that retrying will not result in duplicate messages (due to this exception at least).

marked as an enhancement as a reminder to note this in the readme.
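That catch, wait, retry pattern can be sketched as follows. This is a self-contained simulation, not the real client: queue.Queue stands in for librdkafka's bounded local buffer, and queue.Full stands in for the 'Local: Queue full' exception.

```python
import queue
import time

def produce_with_retry(buf: queue.Queue, msg, retries=5, backoff_s=0.01):
    """Try to enqueue msg; on a full queue, back off and retry.

    Mirrors catching KafkaException 'Local: Queue full' (.NET) or
    BufferError (Python): the message was NOT queued, so retrying
    cannot produce a duplicate due to this error.
    """
    for attempt in range(retries):
        try:
            buf.put_nowait(msg)  # stands in for producer.produce(...)
            return True
        except queue.Full:
            # queue is full: wait with exponential backoff, then retry
            time.sleep(backoff_s * (2 ** attempt))
    return False
```

In real code the backoff gives the background sender thread time to drain the queue to the broker; here the queue only drains when something consumes it.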

@edenhill
Contributor

And you probably want to set linger.ms to 100 (or so, try different values) to promote batching (== increased performance)

@mhowlett
Contributor

yes, good point. i've found linger.ms = 5 to be a good general purpose setting - you get nearly the max throughput but minimal latency impact.
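As a sketch, a confluent-kafka-python producer configuration along those lines might look like this. The broker address is a placeholder and the queue.* values are illustrative examples, not recommendations:

```python
# Illustrative producer configuration; values are examples, not tuning advice.
conf = {
    "bootstrap.servers": "localhost:9092",   # placeholder broker address
    "linger.ms": 5,                          # wait up to 5 ms to batch messages
    "queue.buffering.max.messages": 500000,  # enlarge the local queue
    "queue.buffering.max.kbytes": 1048576,   # cap on total queued message size
}
```

A larger queue only delays the "Queue full" error under sustained overload, as noted above; linger.ms is what actually improves throughput by batching.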

@woohan

woohan commented Oct 14, 2019

I encountered a similar error while using confluent-kafka-python. I inject network faults in my tests, with packet loss over 10%, and when the error 'BufferError: Local: Queue full' appears, most of the messages are lost. Can this be solved by adjusting the parameters? The network condition needs to be checked, I suppose.

@jacopofar

jacopofar commented Jan 13, 2020

Is there a way to get the current queue usage? I am producing messages from Flask requests, and it would be nice to block and flush when the queue is full instead of getting errors. Flushing at every write seems too slow.

Currently I'm doing this:

try:
    producer.produce(
        topic=my_topic,
        value=data,
    )
except BufferError:
    logger.warning('Buffer error, the queue must be full! Flushing...')
    producer.flush()

    logger.info('Queue flushed, will write the message again')
    producer.produce(
        topic=my_topic,
        value=data,
    )

@mhowlett
Contributor

are you calling producer.poll regularly? in python you need to do this (this is automatic in .net), otherwise the internal queue will fill up with delivery reports, and they'll never get removed. you shouldn't generally call flush except on shutdown.

assuming you are doing this, it's intended that when you get a Queue full error, you should just wait a bit (no need to call flush) and try again. you should also configure the queue size so that this is not expected under normal operation.

the statistics callback will tell you internal queue sizes i believe.
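If I read the librdkafka statistics payload correctly, it carries a top-level msg_cnt field with the current number of messages in the producer's internal queue. A minimal extraction sketch (the field name is my assumption; check librdkafka's STATISTICS.md):

```python
import json

def queue_usage(stats_json: str) -> int:
    # 'msg_cnt' is assumed to be the top-level field holding the current
    # number of messages in the producer's internal queue; default to 0
    # if the field is absent.
    return json.loads(stats_json).get("msg_cnt", 0)
```

In confluent-kafka-python this would be wired up by setting statistics.interval.ms in the config and passing the handler as the stats_cb configuration property; the client then invokes it with the JSON string from inside poll().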

@jacopofar

I'm not; the documentation says "Polls the producer for events" but I don't understand what that means.

I expect events to come only from the consumer, does it refer to "acknowledge" events?

@mhowlett
Contributor

callbacks (delivery notification, global error notification, statistics) are called as a side effect to calling poll. more info is in this blog post: https://www.confluent.io/blog/kafka-python-asyncio-integration/

each pending callback has an associated event inside librdkafka, and these will accumulate if poll is not being called.

@jacopofar

Also I noticed only now that this is the issue tracker for the dotnet library -_-

@woohan

woohan commented Jan 14, 2020

Thanks for the advice. Yes, I did call the poll() method in the producer. The code, simplified, is as follows:

for i in range(totalMsgNumber):
    msg = str('some message contents...')
    try:
        producer.poll(0)
        producer.produce(topicName, bytes(msg, 'utf-8'))
    except BufferError as bfer:
        logger2.error(bfer)
        producer.poll(0.1)
producer.flush()

Say I set totalMsgNumber=10000: will the producer accumulate all the messages in the buffer and flush them after the loop ends? I just want to emulate a producer sending messages continuously.

@mhowlett
Contributor

as soon as you call produce, messages are queued to be sent to the broker - and this will happen automatically. In practice, messages will be sent to the broker almost immediately with default settings, though you can control this with linger.ms for better batching (performance under high load).

you only need to call poll to service delivery notifications. messages will get through to the broker even if you don't call this. however, if you don't call it, the internal queue will eventually fill up, and you'll be unable to send messages due to that.
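The produce/poll contract described above can be modeled with a toy class. This is a simulation, not the real client: produce() queues a delivery-report event, poll() drains those events and fires the callbacks, and if poll() is never called the events accumulate until produce() fails.

```python
from collections import deque

class MiniProducer:
    """Toy model of the produce/poll contract.

    produce() 'sends' a message and enqueues a delivery-report event;
    poll() drains pending events and invokes their callbacks. Skipping
    poll() lets events pile up until the internal queue is full.
    """

    def __init__(self, max_events=3):
        self.events = deque()
        self.max_events = max_events

    def produce(self, msg, on_delivery):
        if len(self.events) >= self.max_events:
            # mirrors BufferError / 'Local: Queue full'
            raise BufferError("Local: Queue full")
        # pretend the broker acked immediately; queue the delivery report
        self.events.append((on_delivery, msg))

    def poll(self, timeout=0):
        served = 0
        while self.events:
            cb, msg = self.events.popleft()
            cb(None, msg)  # err=None means success
            served += 1
        return served
```

In the real client the message itself reaches the broker regardless; only the delivery-report events back up, which is why a produce loop should call poll(0) on every iteration.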

@cvenwu

cvenwu commented Aug 22, 2022

And I have a question: can increasing the number of partitions avoid the queue full problem?
