[BEAM-5971] Prevent unwanted shutdown of UnboundedSourceWrapper#6946
Conversation
When `shutdownSourcesOnFinalWatermark` is set to `false`, sources must not be shut down in order to enable the checkpointing to work correctly.
angoenka
left a comment
There was a problem hiding this comment.
Thanks Max.
Some comments on simplification otherwise looks good.
| finalizeSource(); | ||
| } | ||
|
|
||
| private void finalizeSource() { |
There was a problem hiding this comment.
'While' inside this method was never executed earlier as isRunning was always false when we reached here.
Shall we call this method only from the 1st if statement L205 as even not, in the other else blocks will exit only after isRunning is false.
There was a problem hiding this comment.
Also wait is only used as sleep here. Shall we change it to sleep to make it more evident?
There was a problem hiding this comment.
Yes, the bug was that the finalize code was never executed for empty readers.
Shall we call this method only from the 1st if statement L205 as even not, in the other else blocks will exit only after isRunning is false.
All if blocks will eventually execute finalizeSource, so I wouldn't call it directly in line 205 because that would additionally require a return; statement.
There was a problem hiding this comment.
Also wait is only used as sleep here. Shall we change it to sleep to make it more evident?
Yes, good idea. The object we're waiting for, can't be notified, so sleep is better here.
| finalizeSource(); | ||
| } | ||
|
|
||
| private void finalizeSource() { |
We're not holding a shared lock, so Thread.sleep is the better approach for waiting.
When
shutdownSourcesOnFinalWatermarkis set tofalse, sources must not beshut down in order to enable the checkpointing to work correctly.
R: @angoenka
Post-Commit Tests Status (on master branch)