[Bug]: KafkaIO considers readCommitted() as it would commit back the offsets, which it doesn't #22631

Closed
nbali opened this issue Aug 9, 2022 · 1 comment · Fixed by #22633

nbali commented Aug 9, 2022

What happened?

I have been reading from Kafka and trying to figure out which offset management would be the best for my use-case. During that I noticed something odd.

private boolean configuredKafkaCommit() {
  return getConsumerConfig().get("isolation.level") == "read_committed"
      || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}

if (isCommitOffsetEnabled()) {
  if (configuredKafkaCommit()) {
    LOG.info(
        "Either read_committed or auto_commit is set together with commitOffsetEnabled but you "
            + "only need one of them. The commitOffsetEnabled is going to be ignored");
  }
}

if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
  outputWithDescriptor =
      outputWithDescriptor
          .apply(Reshuffle.viaRandomKey())
          .setCoder(
              KvCoder.of(
                  input
                      .getPipeline()
                      .getSchemaRegistry()
                      .getSchemaCoder(KafkaSourceDescriptor.class),
                  recordCoder));
  PCollection<Void> unused = outputWithDescriptor.apply(new KafkaCommitOffset<K, V>(this));
  unused.setCoder(VoidCoder.of());
}

The name of the method, and how it is used in the code, certainly suggest that using the read_committed isolation level handles and commits Kafka offsets. That seemed strange, but I'm not a Kafka pro, so I tested it. It does not.

  • using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it

  • using ONLY commitOffsetsInFinalize() does commit it

  • using ONLY withReadCommitted() does NOT commit it

Tested on Dataflow with the 2.40.0 Java SDK, without explicitly enabling SDF-read.

So is it a bug, or what am I missing here?

If it is indeed a bug, where is it? Either read_committed should commit the offsets (although I found no explicit documentation saying so anywhere), or setting that isolation level shouldn't disable the commit in finalize, in which case that method is wrong.


@johnjcasey:
withReadCommitted() doesn't commit messages when they are read; instead, it specifies that the Kafka consumer should only read messages that have themselves been committed to Kafka.

It is meant for exactly-once applications.

@johnjcasey
Which, looking at your message again, would imply that the configuredKafkaCommit() method shouldn't inspect isolation.level.
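
For illustration only, here is a minimal standalone sketch of what a check that ignores isolation.level might look like. It is not the actual KafkaIO code or the change that fixed this issue: the class name KafkaCommitCheckSketch is hypothetical, the method takes the consumer config as a parameter instead of calling getConsumerConfig(), and the config keys are inlined as string literals so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone sketch (hypothetical class); not the actual KafkaIO implementation. */
final class KafkaCommitCheckSketch {

  // Literal values of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and
  // ConsumerConfig.ISOLATION_LEVEL_CONFIG, inlined to avoid a Kafka dependency.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
  static final String ISOLATION_LEVEL = "isolation.level";

  /**
   * Returns true only when the Kafka consumer itself is configured to commit offsets.
   * isolation.level is deliberately not consulted: read_committed only filters which
   * records are visible to the consumer and does not commit anything.
   * As in the snippet quoted above, only a Boolean true matches, not the string "true".
   */
  static boolean configuredKafkaCommit(Map<String, Object> consumerConfig) {
    return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
  }

  public static void main(String[] args) {
    Map<String, Object> readCommittedOnly = new HashMap<>();
    readCommittedOnly.put(ISOLATION_LEVEL, "read_committed");

    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put(ENABLE_AUTO_COMMIT, true);

    System.out.println(configuredKafkaCommit(readCommittedOnly)); // prints false
    System.out.println(configuredKafkaCommit(autoCommit)); // prints true
  }
}
```

With a check shaped like this, a pipeline that sets only read_committed together with commitOffsetsInFinalize() would still reach the KafkaCommitOffset branch quoted above. Dropping the isolation.level comparison also sidesteps the `==` comparison on strings, which in Java compares references, not contents.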

Issue Priority

Priority: 2

Issue Component

Component: io-java-kafka

nbali commented Aug 9, 2022

.take-issue
