-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[WIP][FLINK-26372][runtime][state] Allow to configure Changelog Storage per program #20523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| if (changelogStateBackendEnabled.getOrDefault(false)) { | ||
| StateChangelogOptionsInternal.putConfiguration( | ||
| jobConfiguration, changelogConfiguration); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, changelog configuration is passed from JM to TM inside the jobGraph.jobConfiguration. It is then merged on TM with its "local" TM configuration.
It's not strongly typed (something like ChangelogConfig) because different implementations might have different parameters. And it's not a serialized factory because passing a string map seems safer and easier.
It's added to jobConfiguration as serialized value under a single key (rather than merged) because semantically they are different. jobConfiguration is some internal Flink config, while changelog configuration contains some user-facing keys; so they might clash.
@zentol WDYT about this approach?
…r program Because different changelog implementations might have different options, the configuration is passed as a (serialized) string map. To extract the relevant KV-pairs (and not pass the whole configuration), Changelog factory is used on JM. On TM, its configuration is combined with the one from job. The path in common cases is as follows: env -> streamGraphGenerator -> streamGraph -> jobGraph -> TM
masteryhx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the draft!
The passing path makes sense to me.
I have a comment about the overide logic, PTAL.
BTW, we could validate the overide logic by a UT ?
| StateChangelogStorageView<?> createStorageView(Configuration configuration) throws IOException; | ||
|
|
||
| /** Extract the relevant to this factory portion of the configuration. */ | ||
| default Configuration extractConfiguration(ReadableConfig src) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, Even if users haven't setted these parameter in job level, we will still overide them in current logic?
Should we just overide them when users have setted them ?
|
Superseded by #21637. |
Uh oh!
There was an error while loading. Please reload this page.