
[BEAM-9855] Provide an option to configure the Flink state backend #13116

Merged: 2 commits merged into apache:master on Nov 10, 2020

Conversation

@mxm (Contributor) commented Oct 14, 2020

We should make it easier to configure a Flink state backend. At the moment,
users have to either:

(A)  Configure the default state backend in their Flink cluster, or

(B1) Include the dependency in their Gradle/Maven project
     (e.g. "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
     for RocksDB), and
(B2) Set the state backend factory in the FlinkPipelineOptions. This only works
     in Java because the factory has to be specified as a Java class!

We can make it easier by simply adding pipeline options for the state backend
name and the checkpoint directory, which are enough to configure the state
backend. We bundle the RocksDB state backend by default.
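
For illustration, a minimal Java sketch of how the new options might be used. The option names (stateBackend, stateBackendStoragePath) and the storage path are assumptions based on the description above, not necessarily the exact merged API:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StateBackendOptionsExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Assumed setters matching the new pipeline options: select the bundled
    // RocksDB backend and point it at a durable checkpoint directory.
    options.setStateBackend("rocksdb");
    options.setStateBackendStoragePath("hdfs://namenode:8020/flink/checkpoints");

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline, then run it on the Flink runner ...
    pipeline.run().waitUntilFinish();
  }
}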

Post-Commit Tests Status (on master branch): [build status badge table omitted; columns: SDK, Dataflow, Flink, Samza, Spark, Twister2; rows: Go, Java, Python, XLang]

Pre-Commit Tests Status (on master branch): [build status badge table omitted; columns: Java, Python, Go, Website, Whitespace, Typescript; rows: Non-portable, Portable]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@mxm (Contributor, Author) commented Oct 16, 2020

Run Java PreCommit

@mxm (Contributor, Author) commented Oct 16, 2020

Unrelated flaky tests in Java PreCommit.

@@ -148,6 +148,8 @@ dependencies {
compile "org.apache.flink:flink-java:$flink_version"
compile "org.apache.flink:flink-runtime_2.11:$flink_version"
compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
// RocksDB state backend
compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
Contributor:

Just wanted to confirm that dependency won't be baked into the job server as it is already part of the Flink dist.

mxm (Contributor, Author):

Is that true? AFAIK RocksDB is an optional dependency. We can change the scope to provided and add a separate dependency with testCompile.

Member:

I checked and the dependency does not seem to be part of the default Flink distribution (quite surprising to me, btw). However, the way it is instantiated makes the dependency needed; making it provided will solve that, but it is a bit less user-friendly.

Contributor:

The dependency is part of the flink distribution:

$ tar -tf flink-dist_2.12-1.11.2.jar | grep RocksDBStateBackend
org/apache/flink/contrib/streaming/state/RocksDBStateBackend.class
org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.class
org/apache/flink/contrib/streaming/state/RocksDBStateBackend$PriorityQueueStateType.class

If it wasn't, you could not configure the default state backend:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-default-state-backend

mxm (Contributor, Author):

RocksDB used not to be included in the dist. Good point, we can set it to provided then.

mxm (Contributor, Author):

I've verified that RocksDB is not included in any of the to-be-released jars.


// State backend
if (options.getStateBackendFactory() != null) {
private static void configureStateBackend(
Contributor:

Flink is headed in the direction where everything that is set on an environment becomes configurable (including the executor, FLIP-73). This change kind of goes in the opposite direction, increasing the amount of Flink pipeline options further. Should we look into the generic configuration mechanism instead, where it is really easy for the user to supply the Flink configuration (optionally inline, instead of via a separate file)?

mxm (Contributor, Author):

I think that's a great idea. We can start thinking about that on the mailing list and handle it via a separate JIRA issue. This pipeline option can then be replaced by the generic configuration option.

For now, this pipeline option will fulfill a common request by Beam users to directly set the state backend without having to change the Flink configuration.

mxm (Contributor, Author):

Plus, we want to be able to easily run Nexmark with RocksDB.

Member:

Is the intention of supporting this to be able to configure new backends too, like the new one by the RISE team? Is this the intended Nexmark use?

mxm (Contributor, Author):

Yes, new state backends can be added as needed.

Contributor:

@iemejia why does Nexmark need the RocksDB state backend? That backend is required when state does not fit into the heap; otherwise it is almost always better to use the filesystem backend.

mxm (Contributor, Author):

We may want to experiment with different state backends.

@iemejia (Member) left a comment:

I suppose this change is required to specify the backend per job only, and there seems to be no other way to do it in the current Flink runner. It is curious; I expected users to be able to do Flink-specific runtime configuration independently of Beam's. Isn't there a way to do this?


@mxm (Contributor, Author) left a comment:

I'll address the remaining comments and will merge this if there are no further remarks.

@tweise (Contributor) commented Oct 27, 2020

"I suppose this change is required to specify the backend per job only, and there seems to be no other way to do it in the current Flink runner. It is curious; I expected users to be able to do Flink-specific runtime configuration independently of Beam's. Isn't there a way to do this?"

I'm not sure that is the case; have you tried it? Can the state backend not be configured as part of flinkConfiguration?

@mxm (Contributor, Author) commented Nov 8, 2020

There is currently no universal way to set the configuration per job, unless a per-job cluster is used; in that case a configuration file can be supplied for the per-job cluster. There is no way to explicitly configure a state backend via a flag to the main program. Even if we allowed a configuration file parameter, we would rely on that configuration file being present at runtime, which we can't assume. Further, if we allowed a direct configuration YAML string, we would still have to explicitly set the state backend on the StreamExecutionEnvironment; otherwise the cluster's default state backend would be used at runtime instead of the configured one (the default state backend can only be configured at job creation time). The intention here is to allow setting and overriding the default state backend, so it makes sense (for now) to have an explicit way to configure it.
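
To make concrete what "explicitly set the state backend on the StreamExecutionEnvironment" means on the Flink side, here is a minimal sketch using Flink's own API. This is not the exact code added in this PR, and the checkpoint path is only a placeholder:

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitStateBackendSketch {
  public static void main(String[] args) throws IOException {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Setting the backend on the environment overrides the cluster's default
    // state backend for this job; the URI below is a placeholder.
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    // ... define sources/transforms and call env.execute(...) ...
  }
}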

@mxm mxm merged commit d492371 into apache:master Nov 10, 2020
@TheNeuralBit (Member):

I think this has broken Java PreCommit (https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/3471/), probably just a missing import

10:18:56 > Task :runners:flink:1.10:compileTestJava
10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:474: error: cannot find symbol
10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
10:18:56                                    ^
10:18:56   symbol:   variable PipelineOptionsFactory
10:18:56   location: class FlinkExecutionEnvironmentsTest
10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:489: error: cannot find symbol
10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
10:18:56                                    ^
10:18:56   symbol:   variable PipelineOptionsFactory
10:18:56   location: class FlinkExecutionEnvironmentsTest
10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:503: error: cannot find symbol
10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
10:18:56                                    ^
10:18:56   symbol:   variable PipelineOptionsFactory
10:18:56   location: class FlinkExecutionEnvironmentsTest
10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:517: error: cannot find symbol
10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
10:18:56                                    ^
10:18:56   symbol:   variable PipelineOptionsFactory
10:18:56   location: class FlinkExecutionEnvironmentsTest
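
The errors above come down to an unresolved PipelineOptionsFactory symbol in FlinkExecutionEnvironmentsTest, consistent with the "missing import" guess. A minimal sketch of the presumed fix; the class below is purely illustrative, and the real fix was applied in a separate change (see the follow-up comment):

// Sketch: the test class only needs the standard PipelineOptionsFactory import.
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class MissingImportSketch {
  static FlinkPipelineOptions defaultFlinkOptions() {
    // Mirrors the failing test lines: obtain FlinkPipelineOptions via the factory.
    return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
  }
}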

ibzib added a commit to ibzib/beam that referenced this pull request Nov 11, 2020
@mxm (Contributor, Author) commented Nov 11, 2020

Thanks Brian. Yes, this caused a merge conflict which was not visible before merge. Should have rebased since this PR was open for a bit. Looks like #13297 fixed it.
