[FLINK-11193][State Backends] Use user passed configuration overriding default configuration loading from file #8479
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 2e316e2 (Fri Feb 28 21:49:54 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
|
|
Hi @StefanRRichter, could you take a look at this PR when you have time? Thanks. |
|
The travis-ci failure seems unrelated |
| * @param classLoader The class loader. | ||
| */ | ||
| private RocksDBStateBackend(RocksDBStateBackend original, Configuration config, ClassLoader classLoader) { | ||
| this.configuration = config; |
When is this RocksDBStateBackend constructor used? It seems we always use the original state backend's configuration first.
Thanks @klion26 for your review. I'm not sure I quite get your meaning. But IMO this constructor is only for creating a re-configured copy of the original state backend, so the configuration passed in will be used first? We should also keep this configuration in a member field so it can be reused to configure the state backend loaded from StateBackendLoader#fromApplicationOrConfigOrDefault.
But the code below (starting from line 302) seems to use the original backend's configuration first, so I'm not sure your change is going in the right direction (maybe the change you've made can't solve the problem in the issue)?
Hi @klion26, I checked the code again. Only the priorityQueueStateType is configured from the configuration, and the type from the configuration is used first; see
final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
All other parameters are passed by the constructor, so I think it can work for the issue. But it does look strange (some follow the original backend, some follow the config first); I will think about how to make it better.
How about the other configs? Do we need to do the same thing here for the other configs?
The other configs all follow this manner: check whether the value in the original backend is set or undefined; if it is undefined, look into the config; finally, fall back to the default config. This is consistent with the doc of configure():
Creates a copy of this state backend that uses the values defined in the configuration for fields where that were not yet specified in this state backend.
But for priorityQueueStateType there is no undefined value; it only has two: rocksdb and heap. To follow this manner, I think we would have to add an undefined value for priorityQueueStateType, but I am not sure whether we need to do this.
What I am doing now is just overriding the config loaded from the file with the original state backend's config to solve this problem.
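The fallback order described in this comment (a value set in code wins, then the configuration, then the default) can be sketched in isolation. This is only a minimal illustration under stated assumptions, not Flink's code: `ConfigFallbackSketch`, `resolve`, and any key passed to it are hypothetical names, a plain `Map` stands in for Flink's `Configuration`, and `null` plays the role of "undefined".

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper illustrating the fallback order; not part of Flink. */
final class ConfigFallbackSketch {

    private ConfigFallbackSketch() {}

    /**
     * Returns the value set in code if present, otherwise the parsed config
     * entry for the key, otherwise the default.
     */
    static <T> T resolve(
            T setInCode,
            Map<String, String> config,
            String key,
            Function<String, T> parser,
            T defaultValue) {
        if (setInCode != null) {
            return setInCode; // 1. a value set in user code wins
        }
        String raw = config.get(key);
        if (raw != null) {
            return parser.apply(raw); // 2. value from the loaded configuration
        }
        return defaultValue; // 3. built-in default
    }
}
```

With this shape, a field that has no "undefined" state can never reach steps 2 and 3, which is the difficulty raised here for priorityQueueStateType.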
Do you think we need to check the config first, then fall back to the original config, and finally fall back to the default config?
I think we don't have to. I prefer to keep the former style, but give the config set in user code a higher priority than the default configuration loaded from the file.
|
Hi @klion26, I refactored the test case to verify explicitly that this fix works. @StefanRRichter, could you help review this bug fix, too? |
azagrebin
left a comment
Thanks for looking into this problem @Aitozi
I think you are right that the user's custom configuration should take precedence.
However, it looks like other parameters already have a certain configuration pattern which was probably accidentally overlooked in the original effort.
E.g. localRocksDbDirectories is first checked for whether it has already been set by the user. If it is set, it is not overwritten; otherwise it is set from the config, which will be the default value in the case of the cluster config.
It looks like it would be simpler just to stick to the same pattern for priorityQueueStateType instead of changing the existing approach.
wdyt?
|
Thanks for your suggestion, @azagrebin. I will look into this. |
|
Hi @azagrebin, I have done the fix following your comments; please help review when you are free. |
|
Do you know why this push did not trigger Travis CI? How can I trigger it manually, @azagrebin? |
|
@Aitozi it did trigger the CI. The setup has recently been changed; you can see the CI results in the special build comment in this PR. |
|
@azagrebin I checked the CI log just now. I think it failed due to
seems unrelated to this PR? |
azagrebin
left a comment
Thanks for working on this @Aitozi
I left some smaller comments
| PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; | ||
| if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) { | ||
| String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); | ||
| this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()); |
A shortcut for getting enum values is config.getEnum(...)
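A helper of this kind folds the string lookup, the case normalisation, and the `valueOf` call from the diff above into one step. The sketch below shows only that idea. It is not Flink's implementation, and its signature is an assumption: `EnumOptionSketch`, its nested `QueueType` enum, and the `Map`-based config are hypothetical stand-ins.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical enum-lookup helper; a stand-in, not Flink's Configuration. */
final class EnumOptionSketch {

    /** Illustrative enum, assumed only for this sketch. */
    enum QueueType { HEAP, ROCKSDB }

    private EnumOptionSketch() {}

    /** Reads the key and maps its value, ignoring case, to an enum constant. */
    static <T extends Enum<T>> T getEnum(
            Map<String, String> config, String key, Class<T> enumClass, T defaultValue) {
        String raw = config.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue; // nothing configured
        }
        try {
            return Enum.valueOf(enumClass, raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Report the offending key instead of a bare valueOf failure.
            throw new IllegalArgumentException(
                    "Invalid value '" + raw + "' for option '" + key + "'", e);
        }
    }
}
```

Centralising the conversion means every call site gets the same error message for an invalid value.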
| PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; | ||
| if (original.priorityQueueStateType == PriorityQueueStateType.UNDEFINED) { | ||
| String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); | ||
| this.priorityQueueStateType = PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()); |
I think introducing UNDEFINED makes sense in general, but I am wondering whether making the field @Nullable and leaving it initialized to null is actually simpler in this case. If we use UNDEFINED, I think we also have to check for it here explicitly, and then test that a user who configures UNDEFINED gets an error at once.
Agreed, let's use null directly.
| enableTtlCompactionFilter = TernaryBoolean.TRUE; | ||
| } | ||
|
|
||
| private PriorityQueueStateType resolvePriorityQueueStateType(PriorityQueueStateType origin) { |
I am wondering whether it would be more user-friendly to add a setter/getter for this, like we have for some other things.
We can also use this getter with the resolution logic in createKeyedStateBackend.
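One way the nullable field, the setter, and a resolving getter could fit together is sketched below. This is an illustration under assumptions, not the code merged in this PR: `TimerBackendSketch`, the key `sketch.timer-service.factory`, and the choice of HEAP as the default are all hypothetical, and a `Map` again stands in for Flink's `Configuration`.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a state backend; not Flink's RocksDBStateBackend. */
final class TimerBackendSketch {

    enum PriorityQueueStateType { HEAP, ROCKSDB }

    /** Hypothetical config key and default, chosen only for this sketch. */
    static final String TIMER_KEY = "sketch.timer-service.factory";
    static final PriorityQueueStateType DEFAULT_TYPE = PriorityQueueStateType.HEAP;

    /** Null means "not set in code", so no extra UNDEFINED constant is needed. */
    private PriorityQueueStateType priorityQueueStateType;

    void setPriorityQueueStateType(PriorityQueueStateType type) {
        this.priorityQueueStateType = Objects.requireNonNull(type);
    }

    /** Resolves the effective type, falling back to the default when unset. */
    PriorityQueueStateType getPriorityQueueStateType() {
        return priorityQueueStateType == null ? DEFAULT_TYPE : priorityQueueStateType;
    }

    /** Returns a copy that takes the config value only where nothing was set in code. */
    TimerBackendSketch configure(Map<String, String> config) {
        TimerBackendSketch copy = new TimerBackendSketch();
        String raw = config.get(TIMER_KEY);
        if (priorityQueueStateType != null) {
            copy.priorityQueueStateType = priorityQueueStateType;
        } else if (raw != null) {
            copy.priorityQueueStateType =
                    PriorityQueueStateType.valueOf(raw.toUpperCase(Locale.ROOT));
        }
        return copy;
    }
}
```

Because the getter performs the final fallback, callers never see null, and a user cannot configure an "undefined" value by accident.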
|
|
||
| // Fix the option value string and ensure all are covered | ||
| Assert.assertEquals(2, RocksDBStateBackend.PriorityQueueStateType.values().length); | ||
| Assert.assertEquals(3, RocksDBStateBackend.PriorityQueueStateType.values().length); |
Also add an UNDEFINED assertion if we choose to go with it:
Assert.assertEquals("UNDEFINED", RocksDBStateBackend.PriorityQueueStateType.UNDEFINED.toString());
...db/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
| */ | ||
| @Test | ||
| public void testConfigureTimerServiceLoadingFromApplication() throws Exception { | ||
| final Environment env = getMockEnvironment(tempFolder.newFolder()); |
We try to avoid using Mockito mocks in new code. We have e.g. MockEnvironmentBuilder, which we could try here to achieve similar behaviour.
| final RocksDBStateBackend rocksDBStateBackend = (RocksDBStateBackend) loadedBackend; | ||
|
|
||
| RocksDBKeyedStateBackend<Integer> keyedBackend2 = createKeyedStateBackend(rocksDBStateBackend, env); | ||
| Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend2.getPriorityQueueFactory().getClass()); |
keyedBackend(2).dispose(); at the end
|
|
||
| RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(backend, env); | ||
|
|
||
| Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass()); |
In these two lines we duplicate check from testConfigureTimerService.
I would suggest we test only one thing in a test at a time.
|
yes, I think it is unrelated |
|
Please take a look again when you have time; I have made fixes following your comments. @azagrebin |
|
ping @azagrebin |
azagrebin
left a comment
@Aitozi
Sorry for the late answer. The changes look good to me; I left a couple of really small comments.
I was trying to reach @StefanRRichter, just to check whether there was some other reason to implement this option like it was before. I would prefer to give him some time to comment on that.
If there is no objection from his side after some time, we will merge the PR.
...kend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
|
OK, I have addressed your comments @azagrebin . |
|
ping @azagrebin |
carp84
left a comment
Changes LGTM, +1.
@azagrebin Is there any response from Stefan? IMHO it would be good if we could include this in 1.10.0, wdyt? Thanks.
784e65d to e102192
|
Thanks @carp84 for helping review this PR. I have rebased onto master and resolved the conflicts. |
…ration option do not work
e102192 to 2e316e2
|
@Aitozi @carp84 |
…ption to be settable per job This closes #8479.
+1 (belated) on introducing a setter instead of I'd like to see this bug fix also go into release-1.10, please let me know if you need a hand on the backport @azagrebin . Thanks. |
What is the purpose of the change
As described in the Jira issue, the user's custom configuration, which is set via the backend.configure() method, is overridden by the configuration loaded from flink-conf.yaml. I think the configuration in the code should have a higher priority than the default file configuration.
Brief change log
Merge the configurations before creating the new state backend.
Verifying this change
Added a test case to verify.
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation