[BEAM-4681] Add support for portable timers in Flink batch mode #7008
mxm merged 6 commits into apache:master
Run Java Flink PortableValidatesRunner
ed93559 to fb53c99
ryan-williams left a comment:
Looks great! I left a couple of comments/questions.
5a3a299 to 23c278c
Thanks for the review @ryan-williams!
81f5f67 to 0da9483
Run Java PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java Flink PortableValidatesRunner
a916c36 to 48f5ae6
Rebased this to #6981.
bf9045b to 7c74482
Run Java Flink PortableValidatesRunner
Run Java PreCommit
Run Java Flink PortableValidatesRunner
7c247fb to 5cb59b2
Run Python Flink ValidatesRunner
Run Java Flink PortableValidatesRunner
Run Java PreCommit
Run Python PreCommit
Run Java Flink PortableValidatesRunner
Run Flink ValidatesRunner
5cb59b2 to bbefdbd
Run Flink ValidatesRunner
Run Flink ValidatesRunner
Run Java Flink PortableValidatesRunner
Run Python Flink ValidatesRunner
3537b56 to 62a1313
Run Python Flink ValidatesRunner
Run Flink ValidatesRunner
Run Java PreCommit
I ran "Java Flink PortableValidatesRunner" locally against the latest master. All tests passed.
Unfortunately, the Jenkins setup for it seems to be broken at the moment.
The same goes for "Run JavaPortabilityApi PreCommit", which currently fails for all commits. My PR doesn't change anything regarding the portability API.
...n/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
    int tagInt = unionTag;
    return receivedElement -> {
      synchronized (collectorLock) {
        collector.collect(new RawUnionValue(tagInt, receivedElement));
In streaming, doing this in the RPC handler can cause a deadlock when collect triggers the next bundle in an operator chain. Presumably that cannot happen in batch because of the different scheduling?
Good question. I think you are right. This can deadlock if the receiving side backpressures because it is waiting for input from another channel generated by this operator. Is that what you meant?
Yes, but I don't know whether it applies to batch, since batch is scheduled stage by stage. In streaming the issue would surface just by running wordcount; in batch it would not.
Stage-by-stage scheduling is only in place if the ExecutionMode is set to BATCH_FORCED instead of the default PIPELINED.
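The locking pattern discussed in this thread can be sketched in isolation. The following is a minimal, self-contained illustration and not the code from this PR: `TaggedValue` and `Collector` are hypothetical stand-ins for Beam's `RawUnionValue` and Flink's collector, and `receiverFor` and `runDemo` are names made up for the example. It assumes that several receiver threads share one collector that is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SynchronizedReceiverSketch {

  /** Hypothetical stand-in for RawUnionValue: an output tag plus a payload. */
  static final class TaggedValue {
    final int tag;
    final Object value;

    TaggedValue(int tag, Object value) {
      this.tag = tag;
      this.value = value;
    }
  }

  /** Hypothetical stand-in for the downstream collector; not thread-safe by itself. */
  interface Collector<T> {
    void collect(T element);
  }

  /** Builds a receiver for one output tag; all receivers share the same lock. */
  static <T> Consumer<T> receiverFor(
      int unionTag, Collector<TaggedValue> collector, Object collectorLock) {
    int tagInt = unionTag; // copy captured by the lambda, as in the reviewed snippet
    return receivedElement -> {
      // collect() runs on the caller's thread while the lock is held, so a
      // collect() that blocks also blocks the thread that delivered the element.
      synchronized (collectorLock) {
        collector.collect(new TaggedValue(tagInt, receivedElement));
      }
    };
  }

  /** Emits perThread elements from each of two threads into one shared list. */
  static List<TaggedValue> runDemo(int perThread) throws InterruptedException {
    List<TaggedValue> out = new ArrayList<>(); // deliberately not thread-safe
    Object lock = new Object();
    Collector<TaggedValue> collector = out::add;
    Consumer<String> mainOutput = receiverFor(0, collector, lock);
    Consumer<String> sideOutput = receiverFor(1, collector, lock);

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(() -> {
      for (int i = 0; i < perThread; i++) {
        mainOutput.accept("m" + i);
      }
    });
    pool.submit(() -> {
      for (int i = 0; i < perThread; i++) {
        sideOutput.accept("s" + i);
      }
    });
    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("receiver threads did not finish in time");
    }
    return out;
  }

  public static void main(String[] args) throws InterruptedException {
    List<TaggedValue> out = runDemo(1000);
    System.out.println("collected " + out.size() + " elements");
  }
}
```

The shared lock keeps concurrent receivers from corrupting the collector, but it also means the delivering thread waits for as long as `collect` does. That is the property the deadlock question above is about: if `collect` stalls under backpressure, the thread that handed over the element stalls with it.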
e604fed to 284222a
Run Website PreCommit
Run Python Flink ValidatesRunner
284222a to b752ad5
Squashed for merge.
Run Python Flink ValidatesRunner
Run Website PreCommit
Run Website PreCommit
I find the structure quite logical and it should enable more incremental rollback:
But it is a personal preference and I'd also be fine with squashing.
Run Flink ValidatesRunner
Run Website PreCommit
This adds support for portable timers in Flink Runner's batch mode, and enables the Java/Python Portable ValidatesRunner tests.
CC @tweise @angoenka @robertwb