Upgrade from 0.34.0 to 0.36.0 causes consumer to stop consuming but keep running #638

Open
zhrebicek opened this issue Dec 12, 2023 · 34 comments

Comments

@zhrebicek

As the title says, after upgrading from 0.34.0 to 0.36.0 we started seeing Kafka consumers that stop consuming messages, start leaking memory, and do not leave the consumer group.

  • I do not have a clear reproducer, as it happened to us across multiple production environments at random.
    • For example, it never happened in our staging environment.
  • It happened more often in environments where Kafka was under higher load.
    Maybe it has to do with the change to the event interface, but that is just a wild guess.
  • In the end I tried setting a low max.poll.interval.ms, but it happened again. That suggests the consumer is still sending heartbeats but, for whatever reason, stops pulling records: CPU drops, memory increases, and the consumer group does not rebalance.
  • After reverting to 0.34.0:
    [Screenshots: 2023-12-12 at 14:40:59 and 2023-12-12 at 14:40:33]

Disclaimer: we have been running on 0.34.0 for half a day without it reproducing, whereas it happened many times with 0.36.0. I will report back if it still happens on 0.34.0 and the cause lies elsewhere.

@scanterog
Collaborator

scanterog commented Dec 12, 2023

Thanks for reporting it, we'll investigate further. 0.35.0 does not contain the event API change, if you want to give it a try.

Can you provide a snippet of your code? Which consumer are you using (stream or base)? Please also share the properties, any logs, etc.

@neoeinstein

Confirming that I too encountered a similar issue, which resulted in significant increases in idle time being reported across the board, and some instances of what appeared to be an executor stall (almost as if some blocking calls were on the async path). I happened to be using a producer for this, with no consumers on my particular service, though I do use a long publish batching window of 1 second.

Downgrading from 0.36.0 to 0.35.0 did indeed resolve the issue.

@davidblewett
Collaborator

davidblewett commented Dec 12, 2023

@zhrebicek @neoeinstein The event API does allow some transient errors to be propagated that the callback API did not. @scanterog raised this upstream here: confluentinc/librdkafka#4493. Is it possible that your code did not handle these explicitly, leading to either the consumer or producer becoming idle?

This may manifest with KafkaError (Message consumption error: BrokerTransportFailure (Local: Broker transport failure)).

@scanterog
Collaborator

We tested the BaseProducer and BaseConsumer internally today and couldn't reproduce the issue. We'll try the FutureProducer and the StreamConsumer tomorrow. Please provide the properties you're setting and any relevant logs. Also, are you using the statically linked librdkafka or linking it dynamically?

@neoeinstein

neoeinstein commented Dec 12, 2023

On our side, we are using FutureProducer. We also record the amount of time that it takes to confirm the publish, but move that off the main execution path by tokio::spawning a future wrapping that .send. In general, each instance of this service is publishing about 100 messages per second into rdkafka, with an average of about 40 such spawned tasks pending concurrently (within a 1 second batching window).

I did also confirm that we did not see any errors bubble up out of the .send call.

Edit to confirm the properties we set:

bootstrap.servers=<broker>
client.id=<identifier>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<username>
sasl.password=<password>
request.required.acks=-1
queue.buffering.max.ms=1000

On a side note, I do have an internal strongly-typed convention for adding these to the ClientConfig if that has any interest to you.

We have logging set to info, with the tracing feature enabled, but don't see any relevant logs get published up. That doesn't seem unusual, since we generally don't have any errors. We don't ask rdkafka to debug anything right now, though we could do something around that in the future.

Our rdkafka dependency is set up to statically link librdkafka and friends.
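
The strongly-typed convention mentioned in the side note is not shown in the thread. Purely as an illustration, and not the commenter's actual code, one possible shape is a small struct that renders to librdkafka property pairs, which could then be passed one by one to `ClientConfig::set`. The `ProducerSettings` type and its field names are hypothetical, and the SASL properties are left out for brevity.

```rust
use std::time::Duration;

/// Hypothetical typed view of some of the producer properties listed above.
struct ProducerSettings {
    bootstrap_servers: String,
    client_id: String,
    /// Maps to `queue.buffering.max.ms`.
    batching_window: Duration,
    /// Maps to `request.required.acks`; -1 waits for all in-sync replicas.
    required_acks: i16,
}

impl ProducerSettings {
    /// Renders the settings as (property name, value) pairs.
    fn to_properties(&self) -> Vec<(&'static str, String)> {
        vec![
            ("bootstrap.servers", self.bootstrap_servers.clone()),
            ("client.id", self.client_id.clone()),
            ("request.required.acks", self.required_acks.to_string()),
            (
                "queue.buffering.max.ms",
                self.batching_window.as_millis().to_string(),
            ),
        ]
    }
}
```

Keeping the property names in one place like this means a misspelled key shows up in a single function rather than being scattered across call sites.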

@zhrebicek
Author

zhrebicek commented Dec 13, 2023

We use rdkafka as is (rdkafka = "0.34.0"), running on debian-bookworm-slim.
We use StreamConsumer with these settings:

client.id: <id>
enable.auto.commit: true
enable.auto.offset.store: false
enable.partition.eof: false
linger.ms: 100
enable.idempotence: true

@davidblewett We can look into explicitly handling the errors you mention, but we basically construct a new consumer on any error.

    loop {
        let consumer = Arc::new(try_create_consumer_from_config(
            kafka_config.consumer.clone(),
        )?);
        ...
        consumer.subscribe(&topics)?;

        match run(
            ... (consumer.stream ... inside -> anyhow::Result<()>)
        )
        .await
        {
            Ok(()) => break Ok(()),
            Err(error) => warn!(?error, "Processor failed"),
        }
    }

But all we see is this, on rebalance:

error:Store offset error: State (Local: Erroneous state)

Caused by:
State (Local: Erroneous state)

Edit: I saw one of these a couple of days back, but only a single instance, and I am not even sure it happened on 0.36.0:

error:Meta data fetch error: BrokerTransportFailure (Local: Broker transport failure)

Caused by:
BrokerTransportFailure (Local: Broker transport failure)

@zhrebicek
Author

zhrebicek commented Dec 13, 2023

One addition: we had no issues with producers (FutureProducer), and we still have 0.36.0 running in a couple of services for a couple of days. So no need to test that, @scanterog.

Thanks a lot for looking into this!

@davidblewett
Collaborator

I went ahead and yanked the 0.36.0 release from crates.io until we figure out what's going on. The implementation is still present in git.

@scanterog
Collaborator

scanterog commented Dec 13, 2023

I'm surprised we're seeing a working case of the producer with @zhrebicek and a non-working one with @neoeinstein. I wonder what the differences would be here.

@zhrebicek I see you're setting enable.idempotence: true, but this is a producer-only setting (same for linger.ms). I also see you set enable.auto.offset.store: false with auto commit set to true. Do you commit offsets via the StoreOffsets API?

@neoeinstein can you provide an exact snippet of what you're doing? I wasn't able to reproduce it with the FutureProducer either.
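
To illustrate the point about producer-only settings, the sketch below classifies the properties from the consumer configuration quoted earlier by the client type they apply to, following the C/P/* column in librdkafka's CONFIGURATION.md. The `Scope` enum and both function names are made up for this example and are not part of rdkafka; only the six properties from this thread are covered.

```rust
/// Which client type a librdkafka property applies to.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Scope {
    Consumer,
    Producer,
    Both,
}

/// Scope of the properties quoted in this thread.
/// Returns `None` for any key this sketch does not cover.
fn scope_of(key: &str) -> Option<Scope> {
    match key {
        "client.id" => Some(Scope::Both),
        "enable.auto.commit" | "enable.auto.offset.store" | "enable.partition.eof" => {
            Some(Scope::Consumer)
        }
        "linger.ms" | "enable.idempotence" => Some(Scope::Producer),
        _ => None,
    }
}

/// Returns the keys that are producer-only and so have no effect on a consumer.
fn producer_only<'a>(keys: &[&'a str]) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|k| scope_of(k) == Some(Scope::Producer))
        .collect()
}
```

Run against the six keys from the consumer configuration, `producer_only` picks out `linger.ms` and `enable.idempotence`, the two settings called out above.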

@zhrebicek
Author

zhrebicek commented Dec 15, 2023

@scanterog We do it like this

    consumer
        .stream()
        .take_until(token.wait_for_shutdown())
        .err_into::<anyhow::Error>()
        // Rest of pipe working with messages.
        // Last step before storing offsets is publishing converted data to other topic.
        .try_buffered(converter_config.max_in_flight)
        .try_for_each(|message| {
            let span = message.span();
            let consumer = consumer.clone();
            async move {
                consumer.store_offset(message.topic(), message.partition(), message.offset())?;
                Ok(())
            }
            .instrument(span)
        })
        .await

@scanterog
Collaborator

@zhrebicek yeah, that translates to stopping the stream entirely when an error is encountered. If a Kafka cluster is under high load, then most likely you'll have a lot of request retries that will be bubbled up as transient errors. That's usually persistent, which translates to the consumer not making progress at all, and you'll be recreating the consumers constantly. The only thing that does not add up for me is that I'd still expect the error to be logged every time the stream stops and the consumer is recreated. Are you logging these events?

One potential solution is to stop forwarding those transient errors to the app (as with the callback API), but sadly rdkafka does not provide any API to identify them, so it would require scanning the rdkafka code base and filtering those codes on our side.
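
As a rough sketch of the idea above (tolerate transient errors instead of ending the stream on the first `Err`), the loop below skips errors marked transient and stops only on a fatal one. `ConsumeError`, its variants, and `consume_all` are hypothetical stand-ins invented for this example; as noted above, there is no ready-made API to tell which errors are transient, so a real application would have to decide which error codes belong in each bucket.

```rust
/// Hypothetical stand-in for the errors a consumer stream might yield.
/// These variants are illustrative and are not rdkafka's real error type.
#[derive(Debug, Clone, PartialEq)]
enum ConsumeError {
    /// Errors the client is expected to recover from on its own,
    /// for example a broker transport failure during a retry.
    Transient(String),
    /// Errors the application has to act on.
    Fatal(String),
}

/// Drains `events`, skipping transient errors and stopping at the first fatal one.
/// Returns the payloads processed, or the fatal error that ended the loop.
fn consume_all<I>(events: I) -> Result<Vec<String>, ConsumeError>
where
    I: IntoIterator<Item = Result<String, ConsumeError>>,
{
    let mut processed = Vec::new();
    for event in events {
        match event {
            Ok(payload) => processed.push(payload),
            // Log and keep polling rather than tearing the consumer down.
            Err(ConsumeError::Transient(reason)) => {
                eprintln!("transient error, continuing: {}", reason)
            }
            Err(fatal) => return Err(fatal),
        }
    }
    Ok(processed)
}
```

With a pipeline built on `try_for_each`, by contrast, the first error of either kind ends the stream, which is the behavior described above.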

@zhrebicek
Author

zhrebicek commented Dec 18, 2023

@scanterog we are logging all the errors, and there are no subsequent errors after it gets stuck, so it should not be recreating in a loop.

From what I remember, there were at most 3 consecutive errors like this per pod that would cause everything to be recreated, and even then the other pods did not get stuck.

@scanterog
Collaborator

scanterog commented Dec 18, 2023

@zhrebicek I see. We haven't been able to reproduce this so far. We have had tests running for over 5 days and have tried several operations on the cluster (rolling restarts, broker replacements, ungraceful shutdown). I'll try to set up a service using the pattern you've shared and see whether we can get closer to understanding what's going on.

@zhrebicek
Author

zhrebicek commented Dec 18, 2023

@scanterog If I were able to reproduce it on staging, I would deploy the version on purpose and enable rdkafka debug logs, but it never happened there. And it caused issues in production.

One note: maybe set a high replication factor, so it has more work to do with acks.

@scanterog
Collaborator

scanterog commented Dec 18, 2023

@zhrebicek no worries! Totally get that. What RF are you using, and what Kafka version?

@zhrebicek
Author

@scanterog
Version: confluent 6.1.0 (kafka 2.7.0 equivalent)
Replication factor 2. In one env it was 4 by mistake, and it happened there a bit more than in other envs.

@scanterog
Collaborator

@zhrebicek would it be possible to run a shadow service (that sends all reads to /dev/null) with this version and debug level enabled (same for rdkafka logs) in the environment where you're getting this issue? Sadly we aren't able to reproduce this issue, so there must be something specific we're missing.

@neoeinstein this also applies to the FutureProducer. If you can provide that information, it would be extremely useful for the project!

Thanks both!

@zhrebicek
Author

I will see what time allows me to try.

@scanterog
Collaborator

Thanks @zhrebicek ! appreciate that

@Ten0
Contributor

Ten0 commented Jan 13, 2024

Same problem here, took a few hours to diagnose that this was the issue.
We're also seeing a bunch of "Too many open files" errors coming from the consumer threads, until it finishes with a segfault after ~1h30min.
It seems pretty likely that this is due to #617.
Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed?
(Also this deserves the bug tag)

@davidblewett
Collaborator

@Ten0 until we have a way of reproducing the issue, we can't work on a fix. That commit does change the underlying implementation, but aside from some errors bubbling up that didn't before there shouldn't have been a change in behavior. It is also the same mechanism that the Confluent Kafka libraries use. This change has been in use and seeing a lot of traffic and we haven't observed the reported issue.

Are you sure you aren't accidentally swallowing some exceptions? We need a self contained example that triggers the behavior to investigate further.

@Ten0
Contributor

Ten0 commented Jan 13, 2024

> We need a self contained example that triggers the behavior to investigate further.

Unfortunately that is going to be too hard to extract for us as well :(

Our code looks basically like this:

let stream: rdkafka::MessageStream<_> = stream_consumer.stream();
loop {
    let msg = stream.next().await?;
    // do stuff with the message
}

The librdkafka parameters are:

session.timeout.ms = "6000"
group.id = "some_group_id"
enable.auto.commit = "true"
auto.commit.interval.ms = "1000"
enable.auto.offset.store = "false"
enable.partition.eof = "false"
statistics.interval.ms = "1000"

We are handling ~300k lines/s dispatched on several servers.
The issue happens with both Kafka v2.0 and v2.1.

> Are you sure you aren't accidentally swallowing some exceptions?

Yes I am sure that we aren't accidentally dropping any error either returned by this library or printed to log (there is nothing of level Warn or greater in log).
There is no message besides the ones related to too many open files, and those come from the dedicated threads without going through log.

> there shouldn't have been a change in behavior. This change has been in use and seeing a lot of traffic and we haven't observed the reported issue.

From what I read in this topic there have been 3 different people encountering this precise issue (same blockage, same memory increase...) since this precise release: @zhrebicek @neoeinstein and myself. I still believe it is time to acknowledge that this issue indeed does exist and has a strong impact despite a minimal reproduction not being established yet.

Would you please consider yanking the affected releases (and maybe have them only as release candidates) until this is fixed?
(Also this deserves the bug tag)

@DoumanAsh

I'm not sure it is related, but upgrading to 0.36.0 makes topic deletion hang forever, if you have uncommitted messages
JFYI

@pi-xel

pi-xel commented Feb 13, 2024

I think I was able to reproduce the issue locally, running the high-level consumer with the following timing-related settings:

max.poll.interval.ms=20000
session.timeout.ms=10000
auto.commit.interval.ms=5000
statistics.interval.ms=5000

After calling subscribe I am polling the consumer in a loop using recv. If I add a tokio::time::sleep of 7s between polls, 0.36.2 silently fails & just stops consuming, with a sleep of 6s it resumes. With 0.35.0 it keeps consuming, even using larger sleep durations (10s+).

My guess is that there's something wrong with the underlying librdkafka consumer not sending heartbeats.

On a side note, another issue I noticed is that since 0.36.2 the consumer starts printing verbose log messages if the debug setting is not left empty, not only from the custom client context (as it would be expected), but apparently also directly from librdkafka.

@pi-xel

pi-xel commented Feb 13, 2024

Minimal example to reproduce the issue (can be included as-is in the crate's high-level consumer integration tests):

// All produced messages should be consumed.
#[tokio::test(flavor = "multi_thread")]
async fn test_produce_consume_delay() {
    let _r = env_logger::try_init();

    let start_time = current_time_millis();
    let topic_name = rand_test_topic("test_produce_consume_delay");
    let message_map = populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();

    for i in 0..10 {
        println!("awaiting message: {}", i);

        let message = consumer.recv().await;

        let timeout = tokio::time::sleep(std::time::Duration::from_millis(5000));

        println!("processing message: {}", i);

        match message {
            Ok(m) => {
                let id = message_map[&(m.partition(), m.offset())];
                match m.timestamp() {
                    Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time),
                    _ => panic!("Expected createtime for message timestamp"),
                };
                assert_eq!(m.payload_view::<str>().unwrap().unwrap(), value_fn(id));
                assert_eq!(m.key_view::<str>().unwrap().unwrap(), key_fn(id));
                assert_eq!(m.topic(), topic_name.as_str());
            }
            Err(e) => panic!("Error receiving message: {:?}", e),
        };

        timeout.await;
    }
}

On v0.35.0 this completes w/o issues, while on v0.36.2 it gets stuck after the first message w/o any error.

You can also try lowering the timeout of 5s, on my machine it still fails on 0.36.2 with a timeout of 1s and works with 500ms.

@pi-xel

pi-xel commented Feb 13, 2024

Interestingly, the issue seems to be directly related to statistics.interval.ms:

If I bump it from 500 to 5000ms, the consumer keeps working for any poll delays at or below 5000ms and fails for values above.

@jwilm

jwilm commented Apr 8, 2024

I just ran into this issue; might I suggest yanking all variants of the 0.36 branch until a proper patch lands? This will prevent others from unknowingly having issues.

@untitaker

untitaker commented Apr 8, 2024 via email

@scotow

scotow commented Apr 9, 2024

> I don't think that's how yank is supposed to be used and I am sure it will cause even more problems for existing users. one example is libraries that depend on 0.36.

Libraries that depend on 0.36 might get a warning. But people starting a new project or upgrading their dependencies stacks might introduce this new bug in their software.

As said above, I've noticed similar behaviour using 0.36.2 in two separate projects that use a StreamConsumer. Getting some memory leaks after a long period of time (first leak started after 10 days of runtime).

Feel free to tell me if you want some information about my setup (configuration, topics setup, ...).

[image attachment]

Will roll back to 0.35.0 to see if it fixes the issue.

@untitaker

untitaker commented Apr 9, 2024 via email

@neoeinstein

neoeinstein commented Apr 9, 2024

I'll mention that 0.35.0 is what we consider the last-known-good version of rdkafka, and we've currently blocked upgrades to 0.36.0 or higher. If there were a plan to perform a breaking republish, I'd recommend 0.35.0.

From an ecosystem perspective, I lean toward the yank. If a library is forcing you onto 0.36, then that library is forcing you to use a version of rdkafka that I wouldn't consider stable for production usage and is functionally broken for some usage patterns. That library maintainer should re-evaluate whether they want to force their users onto such a version.

Of note, lib.rs indicates that the following libraries are depending on the 0.36.x line of rdkafka. From there, the users of the most generally used intermediate libraries tend to be using the version that relies on 0.35.

Crates that have some general usage, with prior version supporting 0.35

  • picokafka version 0.1.11 supports 0.35, most users are staying on 0.1.11
  • sarchive optional, version 0.12.7 supports 0.35, most users are staying on 0.12.7

New crates that don't have support for 0.35 [only ever depended on 0.36]:

Crates that have earlier versions that support 0.35 [but little general usage]

Depended on only as a dev-dependency

@untitaker

> then that library is forcing you to use a version of rdkafka that I wouldn't consider stable for production usage and is functionally broken for some usage pattern

I think that's the point of contention. We use 0.36 just fine and the maintainers have trouble reproducing the issue. It's not universally broken.

@jwilm

jwilm commented Apr 9, 2024

If yanking is considered to be too problematic, then perhaps reverting the event API work and publishing a patch version of 0.36 would be more appropriate. Several users have reported production issues with the current release, and it would be good stewardship to prevent others from dealing with that headache.

@Ten0
Contributor

Ten0 commented Apr 10, 2024

There seems to be a minimal repro above as well now so maybe fixing it could also be an option.
