New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10612] add Flink 1.11 runner #12564
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome @nevillelyh! Thanks for the PR. A couple comments:
-
Please do not duplicate/move any testing code. Whenever necessary, add a version-specific override or Reflection, but keep the tests in the base source folder. We try to minimize the amount of code duplication for the supported Flink versions to a minimum.
-
For the TimerServiceManager, the field is private but there is a getter! Please use the getter
AbstractStreamOperator#getTimeServiceManager()
. -
Please update the docs with information on the newly supported version.
* OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to | ||
* https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details. | ||
*/ | ||
public class FlinkRunnerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we could avoid duplicating all this code? If the signature changed, we should rather define an Interface to retrieve the pipeline/construct the environment, which different versions can override. Alternatively, using reflection is also a valid option in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah should be doable. I'll add a shim.
Also totally missed getTimeServiceManager
. My IDEA is still glitchy with the setup, need to get use to the multiple versions code sharing flow 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mxm addressed most code copy issues by adding compat layers. ./gradlew -p runners/flink test
passes for all versions but build
fails checkstyle due to missing package-info.java
, even though it exists in the base src/main
dir. Any idea how to fix?
I'll look at the docs next. Not sure about the CI test scripts though.
> Task :runners:flink:1.10:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.10/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.8:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.8/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.9:checkstyleMain
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.9/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.9:checkstyleMain FAILED
> Task :runners:flink:1.11:checkstyleMain
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.11/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
If the problem doesn't go away after running ./gradlew -p runners/flink clean
, then you might have to add a suppression here:
<suppress checks="JavadocPackage" files=".*runners.flink.*CoderTypeSerializer\.java"/> |
d2f3a75
to
8ee08b5
Compare
runners/flink/1.8/build.gradle
Outdated
@@ -17,7 +17,6 @@ | |||
*/ | |||
|
|||
def basePath = '..' | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unrelated change
|
||
@Override | ||
public int getClusterPort() { | ||
return getClusterInformation().getBlobServerPort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really the blob server port here or do we want the Job server RPC port?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I really monkeyed that one 🙈
If I understood it correctly, it's a workaround for this, to expose port in useSingleRpcService
mode:
https://github.com/apache/flink/blob/release-1.10.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L270
Which was changed to createLocalRpcService
in 1.11.
https://github.com/apache/flink/blob/release-1.11.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L271
Pushed a fix for this.
* OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to | ||
* https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details. | ||
*/ | ||
public class FlinkRunnerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
If the problem doesn't go away after running ./gradlew -p runners/flink clean
, then you might have to add a suppression here:
<suppress checks="JavadocPackage" files=".*runners.flink.*CoderTypeSerializer\.java"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Great work. Thanks @nevillelyh.
Could you squash the commits?
@@ -699,7 +699,7 @@ public final void processWatermark1(Watermark mark) throws Exception { | |||
|
|||
long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark()); | |||
if (keyCoder != null) { | |||
timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold)); | |||
getTimeServiceManagerCompat().advanceWatermark(new Watermark(inputWatermarkHold)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a field for this instead of calling the getter every time?
a090ca5
to
3be2b32
Compare
- Fix deprecated OperatorStateStore.getOperatorState => getListState - Fix WindowedValue OutputTag in DoFnOperatorTest and ExecutableStageDoFnOperatorTest - Fix FlinkStreamingTransformTranslatorsTest - Fix SourceTransformation => LegacySourceTransformation rename - Fix timeServiceManager access change in AbstractStreamOperator - Fix RemoteMiniClusterImpl RPC service port work around - Abstract version specific env and PackagedProgram logic in FlinkRunnerTest - Suppress checkstyle false alarm on AbstractStreamOperatorCompat
3be2b32
to
5893ee4
Compare
Squashed & made |
Run PythonDocker PreCommit |
Run Java_Examples_Dataflow PreCommit |
Run Portable_Python PreCommit |
Run Python2_PVR_Flink PreCommit |
Most builds fail at the moment:
Considering you only squashed and it was passing before (https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/13017/), it would probably be fine, but I'll try to kick off the tests again. |
retest this please |
1 similar comment
retest this please |
Run Python PreCommit |
Unrelated test failure in Python:
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14588/ |
Thanks for making this happen! |
Hey, I know this is a closed PR but I'm hoping someone can help me. I'm trying to understand how I might be able to take advantage of this PR. The environment I'm trying to run my beam job on has flink 1.11 installed but the only runner package I see available is for 1.10 (https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink-1.10) Is there another way of utilizing this code? |
Support for Flink 1.11 will be included in Beam 2.25.0. Until then,
you'll have to manually build the jar from source using the following
command:
./gradlew :runners:flink:1.11:jar
|
thanks! that was all the information I was looking to hear |
WIP
Almost there except the following error:
I tried working around it by forking the test & back porting the
StreamOperatorStateContext.internalTimerServiceManager
logic from upstream but ran into other issues including checkstyle errors w.r.t. missingpackage-info.java
and test failures.It worked with my forked upstream Flink build with
AbstractStreamOperator.timeServiceManager
reverted toprotected
.@mxm any ideas?
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.