-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-7623][tests] Add tests to make sure operator is never restored when using new operator id #4851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
116b4fb to
d82b5d7
Compare
| /** | ||
| * Stream environment that allows to wait for checkpoint acknowledgement. | ||
| */ | ||
| class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a similar refactoring in one of my pending PRs, but it's ok because that one will probably not make it into 1.4. What I would still suggest, if you search for subclasses of StreamMockEnvironment, there are still more cases (some as anonymous classes) that could be replaced by a proper dummy like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have found only one more usage of StreamMockEnvironment in AsyncWaitOperatorTest#testStateSnapshotAndRestore. Did you mean something more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RocksDBAsyncSnapshotTest does something very similar in anonymous class
| assertEquals(1, RestoreCounterOperator.RESTORE_COUNTER.get()); | ||
| } | ||
| finally { | ||
| RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just a matter of personal taste, but wouldn't it be easier to simply reset the count to 0 at the beginning of each test or even have a setup method (or tearDown if you prefer to reset after the test) for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general tearDown would not catch this, since I'm using at least two different counters. However I think it will be cleaner if I refactor this code inject counter/set (newly created in each test) from the outside
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, agree.
| } | ||
| } | ||
|
|
||
| private AcknowledgeStreamMockEnvironment processRecords( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found the name of this method pretty confusing. I think it does a lot more than processing records, so maybe it is clearer if you break it down into multiple methods and give the top-level method a different name?
| new RestoreCounterOperator(), | ||
| Optional.of(stateHandles)); | ||
|
|
||
| assertEquals(1, StatelessRestoreCounterOperator.RESTORE_COUNTER.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this also be interpreted in the exact opposite way because we don't validate for which of the two operators we counted isRestored == true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StatelessRestoreCounterOperator and RestoreCounterOperator are using different counters. However that leads to my mistake, because this shows, that even stateless operators are getting isRestore() == true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, sorry...I misread them as the same :-)
| new RestoreCounterOperator(), | ||
| Optional.of(stateHandles)); | ||
|
|
||
| assertEquals(1, RestoreCounterOperator.RESTORE_COUNTER.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this also be interpreted in the exact opposite way because we don't validate for which of the two operators we counted isRestored == true? This be correct in connection with the other tests in this class, but i think the invariant is not completely clear from only this test and it is not fully self-contained.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe keeping a set of the restored operator ids instead of a count already does the job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes sure, keeping the set is much better idea :)
| Collections.emptyMap()); | ||
|
|
||
| TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles(); | ||
| stateHandles.putSubtaskStateByOperatorID(headOperatorID, emptyHeadOperatorState); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One point that this test will not catch is when the behaviour of the state assignment changes or something else is used instead, e.g. producing a null instead of an empty OperatorSubtaskState and this will influence the result. I am aware that covering this probably means having an IT case or covering this with another unit test - so do you think it is worth having to also guard against this kind of code modifications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same also applies to a certain degree to all other tests: right now we assume that the reported states get passed back without any modification from the checkpoint coordinator. Again, modifications to that could change behaviour without detection from this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ad 1. - that's why I have used StateAssignmentOperation.operatorSubtaskStateFrom, to share this logic. I didn't want to use all of the StateAssignmentOperation, because it's constructor is really annoying to fulfil.
ad 2. - that's a valid concern, however writing ITCases for this might be also an overkill. And wouldn't it be to necessary to test it against every state backend, to make sure that there are no introduced quirks during (de)serialisation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say it's great to have this unit test, eventually an IT case could also make sense. But this can be done in a different work / PR.
StefanRRichter
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I am very glad that you are contributing this test to define the exact behaviour of isRestored()! I have a few comments and discussion points inline.
On top of that, I wonder if it also makes sense to test the empty state cases in mixed with stateful cases AND in isolation, just to make sure that having one stateful case in the chain does not trigger creation on an object where otherwise null would be returned or something like that. It should behave correctly in combination and isolation.
|
Also from our offline discussion, I would suggest that this behaves as: if an operator with a given ID participated in a checkpoint, it should be marked as restored. From this definition, all cases should be derived. I believe this is slightly different to the current implementation. Both make sense, so I think we should agree to something. @pnowojski @aljoscha which is the better definition for you? |
|
It appears that current behaviour is as you wished @StefanRRichter:
|
|
That's great! In that case I would approve this 👍 |
| } | ||
| } | ||
|
|
||
| public StreamConfigChainer setupOpertorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo, but I'll fix while merging
|
It's excellent that you separated the cleanup work from the actual change. 👍 I rebased on master and merge once Travis is green. |
|
I merged, could you please close the PR? |
|
Thanks! |
What is the purpose of the change
This PR adds tests coverage for correct behaviour of
ManagedInitializationContext#isRestoredflag - if application is restarted and a some operator has a newuid, it should return false. This bug was fixed by #4353.Brief change log
Please check commit messages for change log
Verifying this change
This PR adds
RestoreStreamTaskTestand is not changing any productional code.