
[Bug]: Kafka consumer offsets are not being committed to Kafka in Beam v2.52.0 on Flink Runner #30539

Closed
1 of 16 tasks
anartemp opened this issue Mar 6, 2024 · 2 comments · Fixed by #30971

Comments


anartemp commented Mar 6, 2024

What happened?

Beam v2.52.0
Flink v1.12.4

I have a Beam pipeline that reads from Kafka, transforms the data, and writes back to Kafka. It runs on Flink with periodic checkpointing enabled. KafkaIO is set up as follows:

KafkaIO.read()
    ...
    .withReadCommitted()
    .commitOffsetsInFinalize()
    .withoutMetadata()

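For context, commitOffsetsInFinalize() relies on the runner calling the source's CheckpointMark.finalizeCheckpoint() once a checkpoint completes; only at that point does KafkaIO commit the consumed offsets back to Kafka. A minimal sketch of that contract, using simplified stand-in types rather than the actual Beam classes:

```java
import java.util.HashMap;
import java.util.Map;

public class FinalizeSketch {
    /** Stand-in for Beam's UnboundedSource.CheckpointMark. */
    interface CheckpointMark {
        void finalizeCheckpoint();
    }

    /** Stand-in for KafkaIO's checkpoint mark: commits offsets when finalized. */
    static class KafkaCheckpointMark implements CheckpointMark {
        final Map<Integer, Long> offsetsToCommit;   // partition -> offset read so far
        final Map<Integer, Long> committedOffsets;  // stand-in for Kafka's committed offsets

        KafkaCheckpointMark(Map<Integer, Long> toCommit, Map<Integer, Long> committed) {
            this.offsetsToCommit = toCommit;
            this.committedOffsets = committed;
        }

        @Override
        public void finalizeCheckpoint() {
            // In real KafkaIO this is where offsets are committed to Kafka.
            committedOffsets.putAll(offsetsToCommit);
        }
    }

    /** The runner is expected to call this when Flink reports a completed checkpoint. */
    static void onCheckpointComplete(CheckpointMark mark) {
        mark.finalizeCheckpoint();
    }
}
```

If the runner never invokes onCheckpointComplete, the offsets are read but never committed, which matches the behaviour reported here.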
After moving to Beam v2.52.0, I don't see Kafka consumer offsets being committed to Kafka as part of checkpoint finalisation anymore. It worked as expected in Beam v2.51.0 and prior.

After looking at the code, I noticed that #28614 replaced UnboundedSourceWrapper with FlinkUnboundedSource/FlinkUnboundedSourceReader.
UnboundedSourceWrapper implemented Flink's CheckpointListener interface and overrode its notifyCheckpointComplete method, which finalised the Kafka checkpoint marks.
However, FlinkUnboundedSourceReader (via FlinkSourceReaderBase) implements Flink's SourceReader interface, which provides a default no-op implementation of notifyCheckpointComplete. Neither FlinkUnboundedSourceReader nor FlinkSourceReaderBase explicitly overrides that default, so checkpoint finalisation in KafkaIO never happens and Kafka consumer offsets are not committed back to Kafka. Is this accurate?
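The mechanism described above can be shown with a self-contained sketch. The interface below merely mimics the shape of Flink's SourceReader (which inherits a default no-op notifyCheckpointComplete from CheckpointListener); it is not the real class. A reader that does not override the hook silently drops the notification:

```java
public class NotifySketch {
    /** Mimics Flink's SourceReader, which provides a default no-op hook. */
    interface SourceReader {
        default void notifyCheckpointComplete(long checkpointId) {
            // no-op by default
        }
    }

    /** Like the old UnboundedSourceWrapper: explicitly overrides the hook. */
    static class OverridingReader implements SourceReader {
        boolean finalized = false;

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            finalized = true; // would finalize the KafkaIO checkpoint mark here
        }
    }

    /** Like FlinkUnboundedSourceReader in v2.52.0: no override, inherits the no-op. */
    static class NonOverridingReader implements SourceReader {
        boolean finalized = false; // never set: the default implementation does nothing
    }
}
```

Calling notifyCheckpointComplete on the non-overriding reader compiles and runs, but has no effect, which is exactly why the missing override goes unnoticed until offsets stop being committed.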

Thank you!

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
tvalentyn (Contributor) commented:

cc: @jto

jto (Contributor) commented Apr 4, 2024

Hey @anartemp 👋
I made a PR here (#30849) that I think should address the problem.
Since I don't use Beam + Flink for streaming jobs, it's hard for me to validate that it actually works in practice.
Could you please give it a try and let me know whether the issue is resolved?
