[FLINK-14091][coordination] Allow updates to connection state when ZKCheckpointIDCounter reconnects to ZK #10754
also cc @lamber-ken
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 760a023 (Fri Jan 03 03:41:39 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
tillrohrmann
left a comment
Thanks for fixing this problem @tisonkun. The changes look good to me. I was wondering whether we could add a test case for the ZooKeeperCheckpointIDCounter which ensures that we can increment the counter after reconnecting.
Thanks for your review @tillrohrmann! I pushed a follow-up commit adding a dedicated test for it.
force-pushed from 5fd6901 to 7bfd2b1
tillrohrmann
left a comment
Thanks for adding the test case @tisonkun. The changes look good to me. I'll address my remaining comments while merging this PR.
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(3);

    @AfterClass
    public static void tearDown() throws Exception {
        ZOOKEEPER.shutdown();
    }

    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }
Instead of the ZooKeeperTestEnvironment I would recommend using the ZooKeeperResource. Combining this with the @Rule will replace the AfterClass and Before methods. Maybe one needs to make the TestingServer accessible, though.
    // encountered connected loss, this prevents us from getting false positive
    while (true) {
        try {
            idCounter.get();
        } catch (IllegalStateException ignore) {
            log.debug("Encountered connection loss.");
            break;
        }
    }
Are you sure that this always happens? This looks quite brittle to me. What if the restart is so fast that the client does not lose its connection?
    while (true) {
        try {
            long id = idCounter.get();
            assertThat(id, is(localCounter.get()));
            break;
        } catch (IllegalStateException ignore) {
            log.debug("During ZooKeeper client reconnecting...");
        }
    }
Adding a timeout/deadline here might make sense.
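A rough sketch of what a deadline-bounded version of that loop could look like. `DeadlineRetrySketch` and `readWithDeadline` are hypothetical names, and the `LongSupplier` stands in for `idCounter.get()`; this is not the code that was merged.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

final class DeadlineRetrySketch {

    /**
     * Calls {@code read} until it stops throwing IllegalStateException,
     * and fails with an AssertionError once the timeout has elapsed.
     */
    static long readWithDeadline(LongSupplier read, Duration timeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        IllegalStateException lastFailure = null;
        // Overflow-safe comparison of nanoTime values.
        while (System.nanoTime() - deadlineNanos < 0) {
            try {
                return read.getAsLong();
            } catch (IllegalStateException e) {
                lastFailure = e; // client is presumably still reconnecting
            }
            try {
                Thread.sleep(10); // short pause instead of a busy spin
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting", ie);
            }
        }
        throw new AssertionError("Value not readable within " + timeout, lastFailure);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for idCounter.get(): fails twice, then succeeds.
        final int[] calls = {0};
        long id = readWithDeadline(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("Connection state: SUSPENDED");
            }
            return 42L;
        }, Duration.ofSeconds(5));
        System.out.println("read id " + id + " after " + calls[0] + " calls");
    }
}
```

With a bound like this, a client that never reconnects makes the test fail with the last connection error attached instead of hanging the build.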
        }
    }

    assertThat(idCounter.getLastState(), is(ConnectionState.RECONNECTED));
I think it is not important for the test to ensure that some internal state is RECONNECTED. What we should try to test is that we can increment the ID counter under loss of connection but it is not important how exactly this works.
    @VisibleForTesting
    ConnectionState getLastState() {
        return connStateListener.lastState;
    }
I think this exposes internal details which are not relevant for the test. I would try to write the test without exposing these internals.
    private volatile ConnectionState lastState;
    private void checkConnectionState() {
        final ConnectionState lastState = this.lastState;
I would not shadow the field with the local variable. One could rename the local variable to currentLastState.
    client.getConnectionStateListenable().addListener(connStateListener);

    for (ConnectionStateListener listener : connectionStateListeners) {
        client.getConnectionStateListenable().addListener(listener);
Since getConnectionStateListenable does not guarantee the order in which the listeners are called, the added test case is unstable (if the testing listener is called before the one which sets lastState). I suggest introducing a LastStateConnectionStateListener (similar to SharedCountConnectionStateListener) which we pass into the class.
Nice catch!
Another way is that we instead implement a chain of listeners so that the order is deterministic.
I also thought about this but I think the other approach is easier to understand.
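To make the suggested approach concrete, here is a minimal sketch of a single injected listener. It is written against plain Java stand-ins rather than Curator, so `ConnectionState`, `LastStateListener`, `DefaultLastStateListener` and `Counter` are all hypothetical simplifications and not the classes in the patch. The point is that the test listener records the state before it signals the test, so there is no second listener whose invocation order could matter.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class LastStateListenerSketch {

    /** Stand-in for Curator's ConnectionState; only the values used here. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    /** Hypothetical contract: receives state changes and exposes the latest one. */
    interface LastStateListener {
        void stateChanged(ConnectionState newState);

        Optional<ConnectionState> getLastState();
    }

    /** Default behaviour: remember the most recent state. */
    static class DefaultLastStateListener implements LastStateListener {
        private volatile ConnectionState lastState;

        @Override
        public void stateChanged(ConnectionState newState) {
            lastState = newState;
        }

        @Override
        public Optional<ConnectionState> getLastState() {
            return Optional.ofNullable(lastState);
        }
    }

    /** Simplified counter that sees the connection only through the injected listener. */
    static final class Counter {
        private final LastStateListener listener;
        private long value;

        Counter(LastStateListener listener) {
            this.listener = listener;
        }

        long getAndIncrement() {
            listener.getLastState().ifPresent(state -> {
                if (state == ConnectionState.SUSPENDED || state == ConnectionState.LOST) {
                    throw new IllegalStateException("Connection state: " + state);
                }
            });
            return value++;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> reconnected = new CompletableFuture<>();
        // Test-side listener: record the state first, then wake up the test.
        LastStateListener listener = new DefaultLastStateListener() {
            @Override
            public void stateChanged(ConnectionState newState) {
                super.stateChanged(newState);
                if (newState == ConnectionState.RECONNECTED) {
                    reconnected.complete(null);
                }
            }
        };
        Counter counter = new Counter(listener);

        System.out.println("before loss: " + counter.getAndIncrement());
        listener.stateChanged(ConnectionState.SUSPENDED);
        try {
            counter.getAndIncrement();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        listener.stateChanged(ConnectionState.RECONNECTED);
        reconnected.join(); // the state is already recorded when this completes
        System.out.println("after reconnect: " + counter.getAndIncrement());
    }
}
```

The test only asserts that the counter can be incremented again after the reconnect; it never needs an accessor for the counter's internal state.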
…CheckpointIDCounter reconnects to ZK
…rd from connection loss
… testable codebase
In order to avoid race conditions between notifying different listeners, this commit introduces the LastStateConnectionStateListener which is passed into the ZooKeeperCheckpointIDCounter. This listener can be modified to fulfill the required testing purposes in ZKCheckpointIDCounterMultiServersTest#testRecoveredAfterConnectionLoss.
force-pushed from 4d0d330 to 357601e
Thanks for reviewing and merging this patch!
So, does it need to be fixed in Flink 1.7?
@Wangtao87 the community no longer actively supports Flink 1.7. Hence you would need to backport the fix to this version yourself if needed.
What is the purpose of the change
ZKCheckpointIDCounter doesn't tolerate ZK being suspended and then reconnected, although it could. As a result, a job can never trigger a checkpoint again after a ZooKeeper leader change.
Brief change log
Allow updates to connection state when ZKCheckpointIDCounter reconnects to ZK.
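One plausible reading of the problem, sketched with hypothetical classes (`StickyTracker`, `UpdatingTracker`, and a stand-in `ConnectionState` enum, none of which are taken from the patch): a tracker that only ever records bad states stays unusable after the first SUSPENDED, while one that records every transition recovers on RECONNECTED.

```java
final class ConnectionStateCheckSketch {

    /** Stand-in for Curator's ConnectionState; only the values used here. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    /** Assumed pre-fix behaviour: bad states are remembered and never cleared. */
    static final class StickyTracker {
        private volatile ConnectionState lastBadState;

        void stateChanged(ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                lastBadState = newState;
            }
        }

        boolean isUsable() {
            return lastBadState == null;
        }
    }

    /** Assumed post-fix behaviour: every transition overwrites the last state. */
    static final class UpdatingTracker {
        private volatile ConnectionState lastState;

        void stateChanged(ConnectionState newState) {
            lastState = newState;
        }

        boolean isUsable() {
            // Read the volatile field once so both comparisons see the same value.
            final ConnectionState current = lastState;
            return current != ConnectionState.SUSPENDED && current != ConnectionState.LOST;
        }
    }

    public static void main(String[] args) {
        StickyTracker sticky = new StickyTracker();
        UpdatingTracker updating = new UpdatingTracker();
        ConnectionState[] leaderChange = {
            ConnectionState.CONNECTED, ConnectionState.SUSPENDED, ConnectionState.RECONNECTED
        };
        for (ConnectionState state : leaderChange) {
            sticky.stateChanged(state);
            updating.stateChanged(state);
        }
        System.out.println("sticky usable:   " + sticky.isUsable());
        System.out.println("updating usable: " + updating.isUsable());
    }
}
```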
Verifying this change
This change is a trivial fix that can be reasoned by code.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation