New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 #10945
Conversation
Run CommunityMetrics PreCommit |
Run Python2_PVR_Flink PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There a large amount of code duplication. Is that really necessary? AFAIK the changes between 1.9 and 1.10 should be minimal. Could you please revise this or explain why it is necessary?
There are a lot of changes related to the job client API(https://issues.apache.org/jira/browse/FLINK-14392, https://issues.apache.org/jira/browse/FLINK-14376) in 1.10. I have added comments at the header of each test case copied to 1.10 for the reason making a copy of them. Could you help to take a look at if the reason makes sense for you? :) |
Hi @mxm I found that the test case
I guess it's related to the feature "Unified Memory Configuration for TaskExecutors"(https://issues.apache.org/jira/browse/FLINK-13980) which is introduced in Flink 1.10. Before 1.10, the memory managed by the Flink's MemoryManager is calculated dynamically if not configured and I have checked that it will be about 2500 MB in my local machine. Since 1.10, it will be 128 MB if not configured (taskmanager.memory.managed.size). I have performed a simple test and the failed test beam/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java Line 77 in 7b3a3fa
I'm still investigating the best way to address this issue at Beam side. And appreciate if you have any suggestion on this :) |
@@ -67,6 +69,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment( | |||
|
|||
// depending on the master, create the right environment. | |||
if ("[local]".equals(flinkMasterHostPort)) { | |||
flinkConfiguration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("2048m")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has sets the default managed memory size to 128MB for MiniCluster in https://issues.apache.org/jira/browse/FLINK-15763. Have set it to a large value when the master host is [local]. Appreciate for any suggestions on a better way to address this issue.
2c7680b
to
82b781c
Compare
Run Java PreCommit |
Run Python PreCommit |
Test failure is not caused by the current PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about removing 1.7 first and then revising this PR? I don't feel comfortable now with the amount of code duplication. Also I suspect that this PR will break support for 1.7 to the RemoteEnvironment removal.
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
Show resolved
Hide resolved
runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
Outdated
Show resolved
Hide resolved
82b781c
to
3657ca4
Compare
@mxm Thanks for the review. I have updated the PR according to your comments. Currently it only copied 3 tests and I think even we drop 1.7 support, we still need to copy these tests. What's your thought? |
Run Spotless PreCommit |
Run CommunityMetrics PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @sunjincheng121. It generally looks good to me. IMHO the test code duplication is not optimal and I would like to change that but this could also be done in a follow-up. Could you squash the commits?
...s/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
Outdated
Show resolved
Hide resolved
.../1.10/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
Show resolved
Hide resolved
retest this please |
7d87a6d
to
1f513ad
Compare
Thanks for the review @mxm ! And the suggestion about test case is great :) , I have update the PR accordingly :) |
@@ -77,6 +67,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment( | |||
|
|||
// depending on the master, create the right environment. | |||
if ("[local]".equals(flinkMasterHostPort)) { | |||
flinkConfiguration.setString("taskmanager.memory.managed.size", "2048m"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the feeling this won't be reliable enough. Why not instead taskmanager.memory.managed.fraction
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will set the taskmanager.memory.managed.size
as 128MB for MiniCluster if it's not set. I think set taskmanager.memory.managed.fraction"
doesn't take effect here. Thoughts? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm hesitant with this default because it will always pre-allocate 2GB of memory which won't be used most of the time, except for the one large record test case you mentioned.
We could set I'd go for something like https://github.com/apache/flink/blob/42a56f4c75693773e21fa2dea45df640c2d7f9da/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java#L287 based on the memory available.
Actually, that is what the Flink 1.8 code used to do: https://github.com/apache/flink/blob/60d9b96456f142f8d18d5882016840a00159403e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java#L296
So let's just check the free memory and use a fraction for memory managed memory by default. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mxm, Sounds good to me ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, thanks for the changes.
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
Outdated
Show resolved
Hide resolved
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
Outdated
Show resolved
Hide resolved
...flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
Outdated
Show resolved
Hide resolved
Thanks for making the adjustments. A couple more comments but otherwise it looks very good! Great work. |
1f513ad
to
4cc537a
Compare
Run Python PreCommit |
1 similar comment
Run Python PreCommit |
…le with Flink 1.10
4cc537a
to
f91b390
Compare
Run Python2_PVR_Flink PreCommit |
Run Python PreCommit |
Run Python2_PVR_Flink PreCommit |
1 similar comment
Run Python2_PVR_Flink PreCommit |
Run Java PreCommit |
Run Python2_PVR_Flink PreCommit |
Run Java PreCommit |
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
Run Java Flink PortableValidatesRunner Batch |
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @sunjincheng121. I'll merge this once the tests pass.
Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.