Skip to content

Commit

Permalink
leader 选举
Browse files Browse the repository at this point in the history
  • Loading branch information
haha174 committed Mar 21, 2020
1 parent 7e891aa commit 1174717
Showing 1 changed file with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,31 @@
public class LeaderElectionSupport implements Watcher {

private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionSupport.class);

/**
* zookeeper client
*/
private ZooKeeper zooKeeper;

/**
* 状态
*/
private State state;
private Set<LeaderElectionAware> listeners;

/**
* 选举的根节点地址
*/
private String rootNodeName;
/**
* leader 对象类
*/
private LeaderOffer leaderOffer;
/**
* 当前的主机名
*/
private String hostName;

/**
* 构造函数 构造一个对象线程安全的set 集合 存放监听事件
*/
public LeaderElectionSupport() {
state = State.STOP;
listeners = Collections.synchronizedSet(new HashSet<>());
Expand All @@ -116,8 +131,12 @@ public LeaderElectionSupport() {
* listeners.
* </p>
*/
/**
*选举的开始方法
*/
public synchronized void start() {
state = State.START;
// 广播选举开始
dispatchEvent(EventType.START);

LOG.info("Starting leader election support");
Expand All @@ -144,6 +163,9 @@ public synchronized void start() {
* Stops all election services, revokes any outstanding leader offers, and
* disconnects from ZooKeeper.
*/
/**
* 停止选举并删除 对应节点
*/
public synchronized void stop() {
state = State.STOP;
dispatchEvent(EventType.STOP_START);
Expand All @@ -162,6 +184,11 @@ public synchronized void stop() {
dispatchEvent(EventType.STOP_COMPLETE);
}

/**
* 在root 目录下创建节点
* @throws KeeperException
* @throws InterruptedException
*/
private void makeOffer() throws KeeperException, InterruptedException {
state = State.OFFER;
dispatchEvent(EventType.OFFER_START);
Expand All @@ -173,6 +200,7 @@ private void makeOffer() throws KeeperException, InterruptedException {
hostnameBytes = hostName.getBytes();
newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
// 零时节点
CreateMode.EPHEMERAL_SEQUENTIAL));
leaderOffer = newLeaderOffer;
}
Expand All @@ -185,6 +213,12 @@ private synchronized LeaderOffer getLeaderOffer() {
return leaderOffer;
}

/**
*
* 选出最小序号的文件 对应的机器就是leader
* @throws KeeperException
* @throws InterruptedException
*/
private void determineElectionStatus() throws KeeperException, InterruptedException {

state = State.DETERMINE;
Expand Down Expand Up @@ -213,8 +247,10 @@ private void determineElectionStatus() throws KeeperException, InterruptedExcept
dispatchEvent(EventType.DETERMINE_COMPLETE);

if (i == 0) {
// 最小的那个变成leader
becomeLeader();
} else {
// 其他的是非leader
becomeReady(leaderOffers.get(i - 1));
}

Expand All @@ -236,6 +272,10 @@ private void becomeReady(LeaderOffer neighborLeaderOffer)
* Make sure to pass an explicit Watcher because we could be sharing this
* zooKeeper instance with someone else.
*/
/**
*
* 进行watch,监视上一个节点 如果上一个节点删除了 就重新掉用determineElectionStatus
*/
Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this);

if (stat != null) {
Expand Down

0 comments on commit 1174717

Please sign in to comment.