
[FLINK-6174][HA]introduce a SMARTER leader latch to make JobManager less sensitive to disconnect to zookeeper #3599

Closed
wants to merge 8 commits

Conversation

@WangTaoTheTonic (Contributor) commented Mar 23, 2017

Currently in YARN mode, if we use ZooKeeper as the high-availability backend, Flink creates an election service that determines the leader through a ZooKeeper election.

When the ZooKeeper leader crashes or the connection between the JobManager and the ZooKeeper instance is broken, the JobManager's leadership is revoked and a Disconnect message is sent to the TaskManagers, which cancels all running tasks and leaves them waiting for the connection between JM and ZK to be rebuilt.

In YARN mode we have one and only one JobManager (the AM) at any time, and it should always be the leader instead of being elected through ZooKeeper. We can introduce a new leader election service in YARN mode to achieve that.

Update:
Because of the "split brain" issue, we cannot simply make one JM the leader at all times. Instead, I introduce a smarter leader latch that caches the suspended state and waits for a connection timeout duration until the connection to ZooKeeper comes back.
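For context, Curator's stock LeaderLatch revokes leadership as soon as the connection is merely SUSPENDED. A simplified view of its private connection handling (a sketch, not the verbatim Curator source):

```java
import org.apache.curator.framework.state.ConnectionState;

// Simplified sketch (not verbatim Curator source) of LeaderLatch's private
// connection handling: a mere SUSPENDED event already drops leadership,
// which is what makes the JobManager so sensitive to short ZK hiccups.
abstract class StockLatchBehavior {

    void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
            case LOST:
                setLeadership(false); // contender is told "notLeader" immediately
                break;
            case RECONNECTED:
                rejoinElection();     // re-create the latch node
                break;
            default:
                break;                // CONNECTED / READ_ONLY: nothing to do
        }
    }

    abstract void setLeadership(boolean isLeader);

    abstract void rejoinElection();
}
```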

@wenlong88 (Contributor) commented:

Hi @WangTaoTheTonic, I think we can improve how ZookeeperLeaderElectionService reacts to an expired ZooKeeper connection or other errors, for example by adding a retry before revoking leadership, instead of introducing the AlwaysLeaderService. When the problem is caused by errors on the machine the JM is running on, we need to trigger a failover so that the JM moves to another machine.
On the other hand, in the coming FLIP-6 implementation, a JM failover will not trigger cancelling all running tasks.

@WangTaoTheTonic (Contributor, Author) commented:

Thanks for your comments @wenlong88.

I also thought about adding retry logic for the ZooKeeper failover case, but that would require modifying LeaderLatch in Curator, which is a third-party library; otherwise we could only add our own private LeaderLatch by copying most of the Curator implementation.

Even with this AlwaysLeaderService, JM failover still works, as the RM will start a new instance.

Regarding FLIP-6, I'll check the solution and see if anything there can help with this :)

@wenlong88 (Contributor) commented Mar 23, 2017

Hi, I may have described my concern wrongly in the last comment. My concern is that in YARN it is possible for two application masters to run at the same time.

For example: the RM launches an AM and then loses the connection to that machine for some reason, so it launches another AM. The first AM may still be running while the second one is launched, e.g. when the NM heartbeat times out although the NM keeps running.

When two AMs can run at the same time, the AlwaysLeaderService can lead us into a deadlock as follows (see the sketch after this list):

  1. the first AM is granted leadership
  2. the second AM is granted leadership
  3. the second AM writes its leader info
  4. the first AM writes its leader info, overwriting the second AM's
  5. the first AM is killed by the NM or some cluster monitoring tool, since the RM has marked its NM as unavailable
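A minimal illustration of the race in steps 3 and 4 (the znode path and helper name are made up for this example):

```java
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;

// Hypothetical helper showing steps 3 and 4 above: both AMs believe they are
// leader and write their own address to the same znode, so whichever writes
// last wins, independent of which AM is actually healthy.
class LeaderInfoRace {

    static void publishLeaderInfo(CuratorFramework client, String ownAddress) throws Exception {
        // the second AM runs this in step 3, the first AM in step 4 --
        // the doomed first AM's address overwrites the second AM's
        client.setData().forPath("/flink/leader", ownAddress.getBytes(StandardCharsets.UTF_8));
    }
}
```

After step 5, the znode still points at the killed first AM, and the TaskManagers cannot find the surviving second AM.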

@StephanEwen (Contributor) commented:

-1 sorry.

This needs to go to the drawing board (a FLIP or a detailed JIRA discussion) before we consider a change that impacts the guarantees and failure modes so heavily.

Some initial comments:

  • In proper HA, you need a service that "locks" the leader; otherwise you are vulnerable to the "split brain" problem, where a network partition makes multiple JobManagers act as leaders, each with some TaskManagers (see the sketch below).

  • In FLIP-6, we are introducing the HighAvailabilityServices to allow for multiple levels of guarantees with different implementations. I can see that a highly-available but not split-brain-protected mode is interesting, but it should not replace any existing mode; it should be a new mode.
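For reference, the "lock" here is Curator's LeaderLatch: ZooKeeper grants the latch to at most one contender at a time, which is exactly the protection an always-leader service would give up. A minimal usage sketch (the latch path is illustrative):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

// Minimal sketch of ZooKeeper-based leader "locking" via Curator's
// LeaderLatch: at most one contender holds the latch at any time.
class LockedLeaderElection {

    static LeaderLatch electLeader(CuratorFramework client) throws Exception {
        LeaderLatch latch = new LeaderLatch(client, "/flink/leaderlatch"); // illustrative path
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // grant leadership to this JobManager
            }

            @Override
            public void notLeader() {
                // revoke leadership, e.g. after the connection is SUSPENDED or LOST
            }
        });
        latch.start();
        return latch;
    }
}
```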

@StephanEwen (Contributor) commented:

I would suggest fixing this the following way:

  • There is an upcoming patch that makes the Flink codebase use the HighAvailabilityServices properly in all places.
  • We introduce a new HA mode called yarnsimple or so (next to none and zookeeper) and instantiate a new implementation of the HighAvailabilityServices which is ZooKeeper-independent.
  • The new implementation of the HighAvailabilityServices does not use ZooKeeper. It uses a leader service that always grants the JobManager leadership (see the sketch below), but also implements a way for TaskManagers to find the JobManager (to be seen how, possibly a file in HDFS or so). It also implements a ZooKeeper-independent CompletedCheckpointStore that finds checkpoints by maintaining a file with completed checkpoints.

That is all not a "proper" HA setup; it only works as long as there is strictly only one master. But it comes close and is ZooKeeper-independent.

Is that what you are looking for?
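A rough sketch of what the always-grants-leadership piece could look like, modeled on Flink's existing StandaloneLeaderElectionService (interface shape as of early 2017; the class name is made up, and the HDFS-based discovery and the checkpoint store are omitted):

```java
import java.util.UUID;

import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

// Rough sketch of the proposal's "always grants leadership" service, modeled
// on Flink's StandaloneLeaderElectionService. TaskManager discovery and the
// ZooKeeper-independent checkpoint store are left out.
public class YarnSimpleLeaderElectionService implements LeaderElectionService {

    private volatile LeaderContender contender;

    @Override
    public void start(LeaderContender newContender) {
        this.contender = newContender;
        // no real election: the single YARN AM is always the leader
        newContender.grantLeadership(UUID.randomUUID());
    }

    @Override
    public void stop() {
        contender = null;
    }

    @Override
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        // nothing to confirm against ZooKeeper in this mode
    }

    @Override
    public boolean hasLeadership() {
        return contender != null;
    }
}
```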

@WangTaoTheTonic (Contributor, Author) commented Mar 25, 2017

I don't think that's a good idea, as it cannot solve the "split brain" issue either.

The key problem is that LeaderLatch in Curator is too sensitive to the state of the connection to ZooKeeper (it revokes leadership when the connection to ZooKeeper is only temporarily broken). Probably the best way is to offer a "duller" LeaderLatch (by adding waiting or retrying), which could also be used in standalone clusters.

I did similar work in our own private Spark release; let me see if it can be reused.

@WangTaoTheTonic changed the title from "[FLINK-6174][HA]introduce a new election service to make JobManager always available" to "[FLINK-6174][HA]introduce a SMARTER leader latch to make JobManager less sensitive to disconnect to zookeeper" on Mar 29, 2017
@WangTaoTheTonic (Contributor, Author) commented Mar 29, 2017

@StephanEwen
I've done the changes, introducing a new smarter leader latch (the reason I wrote a new class is that the handleStateChange method is private in LeaderLatch and cannot be overridden). It waits for a connection timeout duration when the connection to ZooKeeper is broken, instead of revoking leadership immediately.
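A condensed sketch of that deferred revocation (the names setLeadership and connectionTimeoutMs are placeholders, not the exact code in this PR):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.state.ConnectionState;

// Condensed sketch of the "smarter" state handling (placeholder names, not
// the exact PR code): on SUSPENDED the revocation is only scheduled, and it
// is cancelled again if the connection recovers within the timeout.
abstract class SmartLatchBehavior {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long connectionTimeoutMs = 15_000L; // e.g. the configured ZK connection timeout
    private volatile ScheduledFuture<?> pendingRevoke;

    void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // cache the suspended state; revoke only if it does not recover in time
                pendingRevoke = timer.schedule(
                        () -> setLeadership(false), connectionTimeoutMs, TimeUnit.MILLISECONDS);
                break;
            case RECONNECTED:
                if (pendingRevoke != null) {
                    pendingRevoke.cancel(false); // connection is back: keep leadership
                }
                break;
            case LOST:
                setLeadership(false); // session is definitely gone: revoke immediately
                break;
            default:
                break;
        }
    }

    abstract void setLeadership(boolean isLeader);
}
```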

@WangTaoTheTonic (Contributor, Author) commented:

@wenlong88 Feel free to review, thanks :)

@StephanEwen (Contributor) commented:

Thanks for adding this!
I have a few questions:

  • From the formatting, the code looks like it might be adapted from another project. If yes, can you share from where, and add a comment above the code noting that it is adapted from another project? Formatting it in the common Flink style would also be good.

  • The HA leader election is very critical, so we usually don't just replace code that has worked well for many users with new code that we have not extensively tested. Therefore, I suggest adding a config option to activate this smarter latch, making it opt-in for users who want to try it initially (see the sketch after this list).

  • Once this proves stable, we can make it the default in the next version.
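A possible shape for that opt-in switch, using Flink's ConfigOptions (the key name is made up for illustration):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Possible shape of the suggested opt-in switch; the key name is invented
// for illustration and the default keeps the existing latch behavior.
public class SmartLatchOptions {

    public static final ConfigOption<Boolean> USE_SMART_LEADER_LATCH =
            ConfigOptions.key("high-availability.zookeeper.use-smart-leader-latch")
                    .defaultValue(false);
}
```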

```
@@ -70,6 +70,15 @@ under the License.
<include>org.apache.curator:*</include>
</includes>
</artifactSet>
<relocations>
```
A Contributor commented on this diff:
Do we need this here? I think the relocation happens in flink-runtime, when it puts Curator into its shaded jar.

@tillrohrmann (Contributor) commented:

Closing this PR because of inactivity.
