
consumeStrings() discards buffered messages #53

Open
martin-frydl opened this issue May 20, 2020 · 3 comments
@martin-frydl

When there are multiple messages in a topic and consumeStrings() is called with a number of messages lower than the number present, the extra messages are discarded.

Example:

@ClassRule
public static KafkaJunitRule kafka =
        new KafkaJunitRule(EphemeralKafkaBroker.create()).waitForStartup();

@Test
public void test() throws Exception {
    kafka.helper().produceStrings("test", "a", "b");
    kafka.helper().consumeStrings("test", 1).get(); // returns "a"
    kafka.helper().consumeStrings("test", 1).get(); // blocks forever
}

Here I send two string messages into the topic and then try to read them one by one using consumeStrings(). The first call succeeds and returns "a", while the second blocks forever.

The reason is enable.auto.commit being set to true. The consumer receives both messages in one poll() but explicitly commits only one (RecordConsumer.call(), line 311). That commit is correct in itself, but since autocommit is enabled, both messages have already been committed by then. The problem is that I'm unable to override the properties passed to KafkaConsumer or set autocommit to false in consumerConfig(), as both are called automatically from consumeStrings().
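
To make the failure mode concrete, here is a minimal sketch of the same situation with a plain kafka-clients consumer (illustrative only; the broker address, group id, and poll timeout are assumptions, not values the library actually uses):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // assumed group id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");          // the problematic setting
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test"));
    // A single poll() may fetch both "a" and "b". With autocommit enabled,
    // closing the consumer commits the offset past BOTH records, even though
    // the caller only looked at the first one.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
}
// A new consumer in the same group now starts after "b" and blocks forever.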

I think the solution could be to use consumerConfig(false) when creating the consumer in consumeStrings(), but I haven't tested it.

@charithe
Owner

consumeStrings is a convenience method that makes a lot of assumptions, so it probably won't suit some use cases. You can create your own consumer by calling createStringConsumer or createConsumer with the desired configuration and use that instead.

@martin-frydl
Author

But that method is practically unusable when trying to read fewer messages than there are in the topic: the rest are always lost. If that is the desired behavior, it should be noted in the Javadoc; I spent more than an hour trying to find out what was going on. The same applies to the consume() method itself: when I pass a consumer with autocommit enabled into it, it discards some messages. That should be noted in the documentation as well.

charithe reopened this May 21, 2020
charithe added the bug label May 21, 2020
@charithe
Owner

It's not desired behaviour but rather an unfortunate side effect. The consume* helpers are something I regret in hindsight, so it's probably time to deprecate them. They make a lot of assumptions about their usage and haven't aged well.

I'm not sure it's possible to document all the ways in which they could go wrong, but I'd be happy to accept a PR if you have any suggestions.

I'd suggest using the following pattern as an immediate workaround for your problem:

KafkaConsumer<String, String> consumer =
        kafkaRule.helper().createStringConsumer(kafkaRule.helper().consumerConfig(false));
List<String> result = kafkaRule.helper().consume("my-test-topic", consumer, 3).get()
        .stream()
        .map(ConsumerRecord::value)
        .collect(Collectors.toList());
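
Applied to the original two-message example, the same pattern should allow one-by-one reads without losing the second message. A sketch along those lines (untested; it assumes consumerConfig(false) uses the same group id on each call, so a fresh manual-commit consumer resumes from the last committed offset):

kafka.helper().produceStrings("test", "a", "b");

// First read: with autocommit disabled, consume() commits only the offset past "a".
KafkaConsumer<String, String> first =
        kafka.helper().createStringConsumer(kafka.helper().consumerConfig(false));
List<String> batch1 = kafka.helper().consume("test", first, 1).get()
        .stream().map(ConsumerRecord::value).collect(Collectors.toList()); // ["a"]

// Second read: a new manual-commit consumer in the same group resumes at "b"
// instead of blocking forever.
KafkaConsumer<String, String> second =
        kafka.helper().createStringConsumer(kafka.helper().consumerConfig(false));
List<String> batch2 = kafka.helper().consume("test", second, 1).get()
        .stream().map(ConsumerRecord::value).collect(Collectors.toList()); // ["b"]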
