
WARN librdkafka: librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 399ms #433

Open
bbigras opened this issue Jan 18, 2022 · 5 comments

@bbigras

bbigras commented Jan 18, 2022

I got this warning and my program seems to have stopped consuming:

  Jan 18 00:47:06.807  WARN librdkafka: librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 399ms (adjust max.poll.interval.ms for long-running message processing): leaving group
    at src/client.rs:65

I don't know if my code is blocking forever.

The warning seems to come from rust-rdkafka. Is there any way I can get that error from stream.next().await so I can panic or restart?

@benesch
Collaborator

benesch commented Jan 27, 2022

Nope, don’t think so. I think it’s just not in the librdkafka design. But you can create a custom ClientContext and override the log method to install a handler. If all you’re trying to do is panic on this warning, that shouldn’t be too hard.
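
For reference, a minimal sketch of that approach, assuming a recent rust-rdkafka where ClientContext::log receives the log level, facility, and message; the struct name and the panic-on-MAXPOLL policy are just illustrative:

use rdkafka::client::ClientContext;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::ConsumerContext;

// Illustrative context that crashes the process when librdkafka reports a MAXPOLL
// event, so an external supervisor (systemd, Kubernetes, ...) can restart it.
struct PanicOnMaxPoll;

impl ClientContext for PanicOnMaxPoll {
    fn log(&self, _level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        if fac == "MAXPOLL" {
            panic!("consumer exceeded max.poll.interval.ms: {}", log_message);
        }
        // Forward everything else however you normally log.
        eprintln!("librdkafka {}: {}", fac, log_message);
    }
}

// Needed so the context can be attached to a consumer.
impl ConsumerContext for PanicOnMaxPoll {}

// Usage: let consumer: StreamConsumer<PanicOnMaxPoll> =
//     client_config.create_with_context(PanicOnMaxPoll)?;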

@la-mar

la-mar commented Jan 30, 2022

I've encountered this issue several times recently and I'm not quite sure how best to handle it at the moment. Our consumer emits a very similar log message that also indicates the consumer has left the consumer group:

librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 346ms (adjust max.poll.interval.ms for long-running message processing): leaving group

The underlying cause in our case is still TBD as it occurs infrequently. When it does occur, the process running the consumer doesn't panic/exit and seems otherwise unaffected, except that the consumer is no longer consuming any messages. My uneducated guess is that the future running the consumer stops being scheduled on the event loop, but it really is just a guess at this point.

Thanks for your suggestion, @benesch. I will look into what can be done with the ClientContext to avoid the consumer entering this "zombie" state. If anyone has other suggestions or ideas in the meantime, I'd be interested to hear them. Thanks!

@Zarathustra2

Encountering this as well. @la-mar, how did you fix it, if at all?

@Zarathustra2

In my case it was an issue of having multiple consumers, each spawned in its own thread. One consumer in particular had 1000x more messages on its topic than the others and was doing some heavier lifting, which did not allow the other consumers to run.

If someone else runs into this, it may make sense to make sure that nothing in your application/tokio runtime is blocking.
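
One way to keep the async runtime responsive in that situation is to push heavy per-message work off the worker threads so other consumers keep getting scheduled. A rough sketch, assuming a StreamConsumer that is already subscribed and a hypothetical process_message function:

use rdkafka::consumer::StreamConsumer;
use rdkafka::message::{Message, OwnedMessage};

// Placeholder for the expensive application-side work.
fn process_message(msg: &OwnedMessage) {
    let _payload = msg.payload();
    // ... heavy lifting ...
}

async fn run(consumer: StreamConsumer) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let msg = consumer.recv().await?;   // keeps the consumer polled regularly
        let owned = msg.detach();           // detach so the data can move to another thread
        // Offload the heavy work so this task (and any other consumers on the
        // same runtime) keep getting scheduled.
        tokio::task::spawn_blocking(move || process_message(&owned)).await?;
    }
}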

@helios2k6

helios2k6 commented Aug 27, 2023

So, I also ran into this issue a while ago, but I was able to manage it by doing a couple of things:

1/ Store the offsets manually, opt to not commit offsets manually, and rely on the auto-offset committer to do the actual commit for you:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;

let consumer: BaseConsumer<_> = ClientConfig::new()
    // We are opting *NOT* to manually commit offsets to Kafka because we are processing
    // messages faster than we can commit them. This leads to a huge queue of offsets and
    // that will trigger the Kafka Group to be recreated and start the cycle all over again.
    // ref: https://github.com/confluentinc/librdkafka/issues/826#issuecomment-252901186
    .set("enable.auto.commit", "true")
    .set("enable.auto.offset.store", "false")
    .set("auto.offset.reset", "earliest")
    .set("fetch.wait.max.ms", "50")
    // Explicitly set this and interrupt the tokio loop to ensure we're always attempting
    // to get a message within this boundary.
    .set("session.timeout.ms", "60000")
    .set("max.poll.interval.ms", "60000")
    .create()
    .unwrap();

2/ After receiving and processing a Kafka message, be sure to store the offset:

// As per the note above, we are going to store the offset instead of committing
// the offset ourselves to reduce the chances of overloading the queue.
consumer.store_offset(msg.topic(), msg.partition(), msg.offset())?;
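
For completeness, a rough sketch (not from the original comment) of how these two pieces fit together in a BaseConsumer poll loop; handle_message is a placeholder for application-side processing:

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::{BorrowedMessage, Message};
use std::time::Duration;

// Placeholder for the actual message handling.
fn handle_message(_msg: &BorrowedMessage) {}

fn consume_loop(consumer: &BaseConsumer) -> KafkaResult<()> {
    loop {
        if let Some(result) = consumer.poll(Duration::from_millis(100)) {
            let msg = result?;
            handle_message(&msg);
            // Only the in-memory offset store is updated here; the auto-committer
            // (enable.auto.commit=true) flushes it to Kafka in the background.
            consumer.store_offset(msg.topic(), msg.partition(), msg.offset())?;
        }
    }
}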

This doesn't prevent the issue entirely, but it definitely alleviates much of it. We've been using this to process real-time price data without much incident. We also have a custom context that we use to panic so we can restart our container should this error occur. The only reason this works is because our program is pretty much stateless.
