[BEAM-2979] Fix a race condition in getWatermark() in KafkaIO. #3985
+R @tgroh. Thanks Thomas. |
private KafkaRecord<K, V> curRecord; | ||
private Instant curTimestamp; | ||
// curRecord and curTimestamp are accessed outside advance(), which might be another thread. | ||
private AtomicReference<KafkaRecord<K, V>> curRecord = new AtomicReference<>(); |
This is extremely smelly; the current record and current timestamp must be updated atomically and simultaneously, so using two separate atomic references is not sufficient - you need to store the timestamped record here
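A minimal sketch of what "store the timestamped record" could look like. The class names `TimestampedRecord` and `CurrentRecordHolder` are hypothetical and not taken from KafkaIO, and `java.time.Instant` stands in for whatever `Instant` type the reader actually uses. The point is only that a single reference to an immutable pair lets a reader on another thread see a record together with its own timestamp.

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

/** Immutable pair: a record and the timestamp that belongs to it. */
final class TimestampedRecord<T> {
  final T record;
  final Instant timestamp;

  TimestampedRecord(T record, Instant timestamp) {
    this.record = record;
    this.timestamp = timestamp;
  }
}

/** Hypothetical stand-in for the reader state; not the actual KafkaIO code. */
final class CurrentRecordHolder<T> {
  // One reference instead of two, so both fields are published by one write.
  private final AtomicReference<TimestampedRecord<T>> current = new AtomicReference<>();

  /** Would be called from advance(): replaces record and timestamp together. */
  void update(T record, Instant timestamp) {
    current.set(new TimestampedRecord<>(record, timestamp));
  }

  /** Would be called from getWatermark() or getCurrentTimestamp(); may be null. */
  TimestampedRecord<T> snapshot() {
    return current.get();
  }
}
```

A caller takes one `snapshot()` and reads both fields from it, rather than reading two separate references that may have been updated in between.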
Yeah.. I thought of commenting on that explicitly. Right now we don't depend on them to be consistent. But I agree, simpler to do the right thing.
Fixed. Please take a look.
@tgroh PTAL. I changed it to access them within the same synchronized block. |
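For readers following along, a rough sketch of the "same synchronized block" approach, under assumptions: `SynchronizedReaderState`, `setCurrent`, and the `fallback` parameter are hypothetical names, `java.time.Instant` is a stand-in type, and the real `getWatermark()` logic in KafkaIO may differ.

```java
import java.time.Instant;

/** Hypothetical reader state guarded by the object's monitor; not the actual KafkaIO code. */
final class SynchronizedReaderState<T> {
  private T curRecord;
  private Instant curTimestamp;

  /** Would be called from advance(): both fields change under the same lock. */
  synchronized void setCurrent(T record, Instant timestamp) {
    curRecord = record;
    curTimestamp = timestamp;
  }

  /** Reads both fields under the same lock, so they always belong together. */
  synchronized Instant getWatermark(Instant fallback) {
    // Before the first record is read there is no timestamp to report.
    return curRecord == null ? fallback : curTimestamp;
  }
}
```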
Wait, sorry, I just looked at this again. A Specifically, the
The synchronization is as such unnecessary |
That was my impression too.. that's why we never synchronized around getWatermark(). Good that it is in the contract as well. Looks like Flink does call |
b4fc45f to 981df9f
@wtanaka, It looks like the runner is not supposed to call Source API explicitly mentions single threaded access : https://github.com/apache/beam/blob/release-2.0.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java#L131 |
Yes, this is a FlinkRunner problem. In |
Two fixes:
- Don't set curRecord to null before updating. If user deserializers throw, it is OK to keep curRecord pointing to the old one.
- Use atomic references for curRecord and curTimestamp, since these are accessed in getWatermark() and getCurrentTimestamp(). These methods could be called concurrently with advance().
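The first fix can be illustrated with a small sketch. This is an assumption-laden illustration, not the KafkaIO implementation: `KeepOldOnFailureReader`, its `advance` signature, and the `Function`-based deserializer are all hypothetical. It shows only the ordering: deserialize first, assign afterwards, so a throwing deserializer leaves the previous record in place.

```java
import java.util.function.Function;

/** Hypothetical reader that never clears its current record before a new one is ready. */
final class KeepOldOnFailureReader<T> {
  private T curRecord;

  /** Deserializes first and assigns only on success. */
  boolean advance(byte[] raw, Function<byte[], T> deserializer) {
    // If this call throws, curRecord has not been touched yet.
    T next = deserializer.apply(raw);
    curRecord = next;
    return true;
  }

  T getCurrent() {
    return curRecord;
  }
}
```

With this ordering, a caller that catches the deserializer's exception still sees the last successfully read record from `getCurrent()`.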
981df9f to 31a57bc
+R: @chamikaramj, @tgroh |
LGTM |
Run Java Precommit |
Run Java Precommit |
tests still failing? |
Run Java Precommit |
Run Java Precommit. |
@chamikaramj, how many attempts should we attempt ;-) |
Ok, I give up. Will test locally and commit :) |