diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 08a054e129d..c3664a3b1c5 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1025,12 +1025,13 @@ of servers -- that is, when deploying clusters of servers. non-authenticated UDP-based version of fast leader election, "2" corresponds to the authenticated UDP-based version of fast leader election, and "3" corresponds to TCP-based version of - fast leader election. Currently, algorithm 3 is the default. + fast leader election. Algorithm 3 was made default in 3.2.0 and + prior versions (3.0.0 and 3.1.0) were using algorithm 1 and 2 as well. ###### Note - >The implementations of leader election 1, and 2 are now - **deprecated**. We have the intention - of removing them in the next release, at which point only the - FastLeaderElection will be available. + >The implementations of leader election 1, and 2 were + **deprecated** in 3.4.0. Since 3.6.0 only FastLeaderElection is available, + in case of upgrade you have to shutdown all of your servers and + restart them with electionAlg=3 (or by removing the line from the configuration file). > * *initLimit* : (No Java system property) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java deleted file mode 100644 index cb0fec8d928..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java +++ /dev/null @@ -1,956 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.zookeeper.common.Time; -import org.apache.zookeeper.jmx.MBeanRegistry; -import org.apache.zookeeper.server.ZooKeeperThread; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @deprecated This class has been deprecated as of release 3.4.0. - */ -@Deprecated -public class AuthFastLeaderElection implements Election { - - private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class); - - /* Sequence numbers for messages */ - static int sequencer = 0; - static int maxTag = 0; - - /* - * Determine how much time a process has to wait once it believes that it - * has reached the end of leader election. - */ - static int finalizeWait = 100; - - /* - * Challenge counter to avoid replay attacks - */ - - static int challengeCounter = 0; - - /* - * Flag to determine whether to authenticate or not - */ - - private boolean authEnabled = false; - - public static class Notification { - - /* - * Proposed leader - */ long leader; - - /* - * zxid of the proposed leader - */ long zxid; - - /* - * Epoch - */ long epoch; - - /* - * current state of sender - */ QuorumPeer.ServerState state; - - /* - * Address of the sender - */ InetSocketAddress addr; - - } - - /* - * Messages to send, both Notifications and Acks - */ - public static class ToSend { - - enum mType { - crequest, - challenge, - notification, - ack - } - - ToSend(mType type, long tag, long leader, long zxid, long epoch, ServerState state, InetSocketAddress addr) { - - switch (type) { - case crequest: - this.type = 0; - this.tag = tag; - this.leader = leader; - this.zxid = zxid; - this.epoch = epoch; - this.state = state; - this.addr = addr; - - break; - case challenge: - this.type = 1; - this.tag = tag; - this.leader = leader; - this.zxid = zxid; - this.epoch = epoch; - this.state = state; - this.addr = addr; - - break; - case notification: - this.type = 2; - this.leader = leader; - this.zxid = zxid; - this.epoch = epoch; - this.state = QuorumPeer.ServerState.LOOKING; - this.tag = tag; - this.addr = addr; - - break; - case ack: - this.type = 3; - this.tag = tag; - this.leader = leader; - this.zxid = zxid; - this.epoch = epoch; - this.state = state; - this.addr = addr; - - break; - default: - break; - } - } - - /* - * Message type: 0 notification, 1 acknowledgement - */ int type; - - /* - * Proposed leader in the case of notification - */ long leader; - - /* - * id contains the tag for acks, and zxid for notifications - */ long zxid; - - /* - * Epoch - */ long epoch; - - /* - * Current state; - */ QuorumPeer.ServerState state; - - /* - * Message tag - */ long tag; - - InetSocketAddress addr; - - } - - LinkedBlockingQueue sendqueue; - - LinkedBlockingQueue recvqueue; - - private class Messenger { - - final DatagramSocket mySocket; - long lastProposedLeader; - long lastProposedZxid; - long lastEpoch; - final Set ackset; - final ConcurrentHashMap challengeMap; - final ConcurrentHashMap challengeMutex; - final ConcurrentHashMap ackMutex; - final ConcurrentHashMap> addrChallengeMap; - - class WorkerReceiver extends ZooKeeperThread { - - DatagramSocket mySocket; - Messenger myMsg; - - WorkerReceiver(DatagramSocket s, Messenger msg) { - super("WorkerReceiver-" + s.getRemoteSocketAddress()); - mySocket = s; - myMsg = msg; - } - - boolean saveChallenge(long tag, long challenge) { - Semaphore s = challengeMutex.get(tag); - if (s != null) { - synchronized (Messenger.this) { - challengeMap.put(tag, challenge); - challengeMutex.remove(tag); - } - - s.release(); - } else { - LOG.error("No challenge mutex object"); - } - - return true; - } - - public void run() { - byte[] responseBytes = new byte[48]; - ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); - DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length); - while (true) { - // Sleeps on receive - try { - responseBuffer.clear(); - mySocket.receive(responsePacket); - } catch (IOException e) { - LOG.warn("Ignoring exception receiving", e); - } - // Receive new message - if (responsePacket.getLength() != responseBytes.length) { - LOG.warn("Got a short response: {} {}", responsePacket.getLength(), responsePacket.toString()); - continue; - } - responseBuffer.clear(); - int type = responseBuffer.getInt(); - if ((type > 3) || (type < 0)) { - LOG.warn("Got bad Msg type: {}", type); - continue; - } - long tag = responseBuffer.getLong(); - - QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; - switch (responseBuffer.getInt()) { - case 0: - ackstate = QuorumPeer.ServerState.LOOKING; - break; - case 1: - ackstate = QuorumPeer.ServerState.LEADING; - break; - case 2: - ackstate = QuorumPeer.ServerState.FOLLOWING; - break; - default: - LOG.warn("unknown type {}", responseBuffer.getInt()); - break; - } - - Vote current = self.getCurrentVote(); - - switch (type) { - case 0: - // Receive challenge request - ToSend c = new ToSend( - ToSend.mType.challenge, - tag, - current.getId(), - current.getZxid(), - logicalclock.get(), - self.getPeerState(), - (InetSocketAddress) responsePacket.getSocketAddress()); - sendqueue.offer(c); - break; - case 1: - // Receive challenge and store somewhere else - long challenge = responseBuffer.getLong(); - saveChallenge(tag, challenge); - - break; - case 2: - Notification n = new Notification(); - n.leader = responseBuffer.getLong(); - n.zxid = responseBuffer.getLong(); - n.epoch = responseBuffer.getLong(); - n.state = ackstate; - n.addr = (InetSocketAddress) responsePacket.getSocketAddress(); - - if ((myMsg.lastEpoch <= n.epoch) - && ((n.zxid > myMsg.lastProposedZxid) - || ((n.zxid == myMsg.lastProposedZxid) - && (n.leader > myMsg.lastProposedLeader)))) { - myMsg.lastProposedZxid = n.zxid; - myMsg.lastProposedLeader = n.leader; - myMsg.lastEpoch = n.epoch; - } - - long recChallenge; - InetSocketAddress addr = (InetSocketAddress) responsePacket.getSocketAddress(); - if (authEnabled) { - ConcurrentHashMap tmpMap = addrChallengeMap.get(addr); - if (tmpMap != null) { - if (tmpMap.get(tag) != null) { - recChallenge = responseBuffer.getLong(); - - if (tmpMap.get(tag) == recChallenge) { - recvqueue.offer(n); - - ToSend a = new ToSend( - ToSend.mType.ack, - tag, - current.getId(), - current.getZxid(), - logicalclock.get(), - self.getPeerState(), - addr); - - sendqueue.offer(a); - } else { - LOG.warn("Incorrect challenge: {}, {}", recChallenge, addrChallengeMap.toString()); - } - } else { - LOG.warn("No challenge for host: {} {}", addr, tag); - } - } - } else { - recvqueue.offer(n); - - ToSend a = new ToSend( - ToSend.mType.ack, - tag, - current.getId(), - current.getZxid(), - logicalclock.get(), - self.getPeerState(), - (InetSocketAddress) responsePacket.getSocketAddress()); - - sendqueue.offer(a); - } - break; - - // Upon reception of an ack message, remove it from the - // queue - case 3: - Semaphore s = ackMutex.get(tag); - - if (s != null) { - s.release(); - } else { - LOG.error("Empty ack semaphore"); - } - - ackset.add(tag); - - if (authEnabled) { - ConcurrentHashMap tmpMap = addrChallengeMap.get(responsePacket.getSocketAddress()); - if (tmpMap != null) { - tmpMap.remove(tag); - } else { - LOG.warn("No such address in the ensemble configuration {}", responsePacket.getSocketAddress()); - } - } - - if (ackstate != QuorumPeer.ServerState.LOOKING) { - Notification outofsync = new Notification(); - outofsync.leader = responseBuffer.getLong(); - outofsync.zxid = responseBuffer.getLong(); - outofsync.epoch = responseBuffer.getLong(); - outofsync.state = ackstate; - outofsync.addr = (InetSocketAddress) responsePacket.getSocketAddress(); - - recvqueue.offer(outofsync); - } - - break; - // Default case - default: - LOG.warn("Received message of incorrect type {}", type); - break; - } - } - } - - } - - class WorkerSender extends ZooKeeperThread { - - Random rand; - int maxAttempts; - int ackWait = finalizeWait; - - /* - * Receives a socket and max number of attempts as input - */ - - WorkerSender(int attempts) { - super("WorkerSender"); - maxAttempts = attempts; - rand = new Random(java.lang.Thread.currentThread().getId() + Time.currentElapsedTime()); - } - - long genChallenge() { - byte[] buf = new byte[8]; - - buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24); - buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16); - buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8); - buf[3] = (byte) ((challengeCounter & 0x000000ff)); - - challengeCounter++; - int secret = rand.nextInt(java.lang.Integer.MAX_VALUE); - - buf[4] = (byte) ((secret & 0xff000000) >>> 24); - buf[5] = (byte) ((secret & 0x00ff0000) >>> 16); - buf[6] = (byte) ((secret & 0x0000ff00) >>> 8); - buf[7] = (byte) ((secret & 0x000000ff)); - - return (((long) (buf[0] & 0xFF)) << 56) - + (((long) (buf[1] & 0xFF)) << 48) - + (((long) (buf[2] & 0xFF)) << 40) - + (((long) (buf[3] & 0xFF)) << 32) - + (((long) (buf[4] & 0xFF)) << 24) - + (((long) (buf[5] & 0xFF)) << 16) - + (((long) (buf[6] & 0xFF)) << 8) - + ((long) (buf[7] & 0xFF)); - } - - public void run() { - while (true) { - try { - ToSend m = sendqueue.take(); - process(m); - } catch (InterruptedException e) { - break; - } - - } - } - - @SuppressFBWarnings( - value = "RV_RETURN_VALUE_IGNORED", - justification = "tryAcquire result not chacked, but it is not an issue") - private void process(ToSend m) { - int attempts = 0; - byte[] zeroes; - byte[] requestBytes = new byte[48]; - DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length); - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - - switch (m.type) { - case 0: - /* - * Building challenge request packet to send - */ - requestBuffer.clear(); - requestBuffer.putInt(ToSend.mType.crequest.ordinal()); - requestBuffer.putLong(m.tag); - requestBuffer.putInt(m.state.ordinal()); - zeroes = new byte[32]; - requestBuffer.put(zeroes); - - requestPacket.setLength(48); - try { - requestPacket.setSocketAddress(m.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() - + " with addr:" + m.addr, e); - } - - try { - if (challengeMap.get(m.tag) == null) { - mySocket.send(requestPacket); - } - } catch (IOException e) { - LOG.warn("Exception while sending challenge: ", e); - } - - break; - case 1: - /* - * Building challenge packet to send - */ - - long newChallenge; - ConcurrentHashMap tmpMap = addrChallengeMap.get(m.addr); - if (tmpMap != null) { - Long tmpLong = tmpMap.get(m.tag); - if (tmpLong != null) { - newChallenge = tmpLong; - } else { - newChallenge = genChallenge(); - } - - tmpMap.put(m.tag, newChallenge); - - requestBuffer.clear(); - requestBuffer.putInt(ToSend.mType.challenge.ordinal()); - requestBuffer.putLong(m.tag); - requestBuffer.putInt(m.state.ordinal()); - requestBuffer.putLong(newChallenge); - zeroes = new byte[24]; - requestBuffer.put(zeroes); - - requestPacket.setLength(48); - try { - requestPacket.setSocketAddress(m.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() - + " with addr:" + m.addr, e); - } - - try { - mySocket.send(requestPacket); - } catch (IOException e) { - LOG.warn("Exception while sending challenge: ", e); - } - } else { - LOG.error("Address is not in the configuration: {}", m.addr); - } - - break; - case 2: - - /* - * Building notification packet to send - */ - - requestBuffer.clear(); - requestBuffer.putInt(m.type); - requestBuffer.putLong(m.tag); - requestBuffer.putInt(m.state.ordinal()); - requestBuffer.putLong(m.leader); - requestBuffer.putLong(m.zxid); - requestBuffer.putLong(m.epoch); - zeroes = new byte[8]; - requestBuffer.put(zeroes); - - requestPacket.setLength(48); - try { - requestPacket.setSocketAddress(m.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() - + " with addr:" + m.addr, e); - } - - boolean myChallenge = false; - boolean myAck = false; - - while (attempts < maxAttempts) { - try { - /* - * Try to obtain a challenge only if does not have - * one yet - */ - - if (!myChallenge && authEnabled) { - ToSend crequest = new ToSend( - ToSend.mType.crequest, - m.tag, - m.leader, - m.zxid, - m.epoch, - QuorumPeer.ServerState.LOOKING, - m.addr); - sendqueue.offer(crequest); - - try { - double timeout = ackWait * java.lang.Math.pow(2, attempts); - - Semaphore s = new Semaphore(0); - synchronized (Messenger.this) { - challengeMutex.put(m.tag, s); - s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS); - myChallenge = challengeMap.containsKey(m.tag); - } - } catch (InterruptedException e) { - LOG.warn("Challenge request exception: ", e); - } - } - - /* - * If don't have challenge yet, skip sending - * notification - */ - - if (authEnabled && !myChallenge) { - attempts++; - continue; - } - - if (authEnabled) { - requestBuffer.position(40); - Long tmpLong = challengeMap.get(m.tag); - if (tmpLong != null) { - requestBuffer.putLong(tmpLong); - } else { - LOG.warn("No challenge with tag: {}", m.tag); - } - } - mySocket.send(requestPacket); - try { - Semaphore s = new Semaphore(0); - double timeout = ackWait * java.lang.Math.pow(10, attempts); - ackMutex.put(m.tag, s); - s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("Ack exception: ", e); - } - - if (ackset.remove(m.tag)) { - myAck = true; - } - - } catch (IOException e) { - LOG.warn("Sending exception: ", e); - /* - * Do nothing, just try again - */ - } - if (myAck) { - /* - * Received ack successfully, so return - */ - challengeMap.remove(m.tag); - - return; - } else { - attempts++; - } - } - /* - * Return message to queue for another attempt later if - * epoch hasn't changed. - */ - if (m.epoch == logicalclock.get()) { - challengeMap.remove(m.tag); - sendqueue.offer(m); - } - break; - case 3: - - requestBuffer.clear(); - requestBuffer.putInt(m.type); - requestBuffer.putLong(m.tag); - requestBuffer.putInt(m.state.ordinal()); - requestBuffer.putLong(m.leader); - requestBuffer.putLong(m.zxid); - requestBuffer.putLong(m.epoch); - - requestPacket.setLength(48); - try { - requestPacket.setSocketAddress(m.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() - + " with addr:" + m.addr, e); - } - - try { - mySocket.send(requestPacket); - } catch (IOException e) { - LOG.warn("Exception while sending ack: ", e); - } - break; - default: - LOG.warn("unknown type {}", m.type); - break; - } - } - - } - - Messenger(int threads, DatagramSocket s) { - mySocket = s; - ackset = Collections.newSetFromMap(new ConcurrentHashMap()); - challengeMap = new ConcurrentHashMap(); - challengeMutex = new ConcurrentHashMap(); - ackMutex = new ConcurrentHashMap(); - addrChallengeMap = new ConcurrentHashMap>(); - lastProposedLeader = 0; - lastProposedZxid = 0; - lastEpoch = 0; - - for (int i = 0; i < threads; ++i) { - Thread t = new Thread(new WorkerSender(3), "WorkerSender Thread: " + (i + 1)); - t.setDaemon(true); - t.start(); - } - - for (QuorumServer server : self.getVotingView().values()) { - InetAddress address = server.addr.getReachableOrOne().getAddress(); - InetSocketAddress saddr = new InetSocketAddress(address, port); - addrChallengeMap.put(saddr, new ConcurrentHashMap()); - } - - Thread t = new Thread(new WorkerReceiver(s, this), "WorkerReceiver Thread"); - t.start(); - } - - } - - QuorumPeer self; - int port; - AtomicLong logicalclock = new AtomicLong(); /* Election instance */ - DatagramSocket mySocket; - long proposedLeader; - long proposedZxid; - - public AuthFastLeaderElection(QuorumPeer self, boolean auth) { - this.authEnabled = auth; - starter(self); - } - - public AuthFastLeaderElection(QuorumPeer self) { - starter(self); - } - - private void starter(QuorumPeer self) { - this.self = self; - port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0); - proposedLeader = -1; - proposedZxid = -1; - - try { - mySocket = new DatagramSocket(port); - // mySocket.setSoTimeout(20000); - } catch (SocketException e1) { - e1.printStackTrace(); - throw new RuntimeException(); - } - sendqueue = new LinkedBlockingQueue(2 * self.getVotingView().size()); - recvqueue = new LinkedBlockingQueue(2 * self.getVotingView().size()); - new Messenger(self.getVotingView().size() * 2, mySocket); - } - - private void leaveInstance() { - logicalclock.incrementAndGet(); - } - - private void sendNotifications() { - for (QuorumServer server : self.getView().values()) { - - InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne(); - ToSend notmsg = new ToSend( - ToSend.mType.notification, - AuthFastLeaderElection.sequencer++, - proposedLeader, - proposedZxid, - logicalclock.get(), - QuorumPeer.ServerState.LOOKING, - address); - - sendqueue.offer(notmsg); - } - } - - private boolean totalOrderPredicate(long id, long zxid) { - return (zxid > proposedZxid) || ((zxid == proposedZxid) && (id > proposedLeader)); - - } - - private boolean termPredicate(Map votes, long l, long zxid) { - - Collection votesCast = votes.values(); - int count = 0; - /* - * First make the views consistent. Sometimes peers will have different - * zxids for a server depending on timing. - */ - for (Vote v : votesCast) { - if ((v.getId() == l) && (v.getZxid() == zxid)) { - count++; - } - } - - return count > (self.getVotingView().size() / 2); - - } - - /** - * There is nothing to shutdown in this implementation of - * leader election, so we simply have an empty method. - */ - public void shutdown() { - } - - /** - * Invoked in QuorumPeer to find or elect a new leader. - * - * @throws InterruptedException - */ - public Vote lookForLeader() throws InterruptedException { - try { - self.jmxLeaderElectionBean = new LeaderElectionBean(); - MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - self.jmxLeaderElectionBean = null; - } - - try { - HashMap recvset = new HashMap(); - - HashMap outofelection = new HashMap(); - - logicalclock.incrementAndGet(); - - proposedLeader = self.getId(); - proposedZxid = self.getLastLoggedZxid(); - - LOG.info("Election tally"); - sendNotifications(); - - /* - * Loop in which we exchange notifications until we find a leader - */ - - while (self.getPeerState() == ServerState.LOOKING) { - /* - * Remove next notification from queue, times out after 2 times - * the termination time - */ - Notification n = recvqueue.poll(2 * finalizeWait, TimeUnit.MILLISECONDS); - - /* - * Sends more notifications if haven't received enough. - * Otherwise processes new notification. - */ - if (n == null) { - if (((!outofelection.isEmpty()) || (recvset.size() > 1))) { - sendNotifications(); - } - } else { - switch (n.state) { - case LOOKING: - if (n.epoch > logicalclock.get()) { - logicalclock.set(n.epoch); - recvset.clear(); - if (totalOrderPredicate(n.leader, n.zxid)) { - proposedLeader = n.leader; - proposedZxid = n.zxid; - } - sendNotifications(); - } else if (n.epoch < logicalclock.get()) { - break; - } else if (totalOrderPredicate(n.leader, n.zxid)) { - proposedLeader = n.leader; - proposedZxid = n.zxid; - - sendNotifications(); - } - - recvset.put(n.addr, new Vote(n.leader, n.zxid)); - - // If have received from all nodes, then terminate - if (self.getVotingView().size() == recvset.size()) { - self.setPeerState((proposedLeader == self.getId()) - ? ServerState.LEADING - : ServerState.FOLLOWING); - // if (self.state == ServerState.FOLLOWING) { - // Thread.sleep(100); - // } - leaveInstance(); - return new Vote(proposedLeader, proposedZxid); - - } else if (termPredicate(recvset, proposedLeader, proposedZxid)) { - // Otherwise, wait for a fixed amount of time - LOG.info("Passed predicate"); - Thread.sleep(finalizeWait); - - // Notification probe = recvqueue.peek(); - - // Verify if there is any change in the proposed leader - while ((!recvqueue.isEmpty()) - && !totalOrderPredicate(recvqueue.peek().leader, recvqueue.peek().zxid)) { - recvqueue.poll(); - } - if (recvqueue.isEmpty()) { - // LOG.warn("Proposed leader: " + - // proposedLeader); - self.setPeerState((proposedLeader == self.getId()) - ? ServerState.LEADING - : ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(proposedLeader, proposedZxid); - } - } - break; - case LEADING: - outofelection.put(n.addr, new Vote(n.leader, n.zxid)); - - if (termPredicate(outofelection, n.leader, n.zxid)) { - - self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - break; - case FOLLOWING: - outofelection.put(n.addr, new Vote(n.leader, n.zxid)); - - if (termPredicate(outofelection, n.leader, n.zxid)) { - - self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING); - - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - break; - default: - break; - } - } - } - - return null; - } finally { - try { - if (self.jmxLeaderElectionBean != null) { - MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); - } - } catch (Exception e) { - LOG.warn("Failed to unregister with JMX", e); - } - self.jmxLeaderElectionBean = null; - } - } - -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 72d06ecbd05..415a738080e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1227,11 +1227,9 @@ protected Election createElectionAlgorithm(int electionAlgorithm) { //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 1: - le = new AuthFastLeaderElection(this); - break; + throw new UnsupportedOperationException("Election Algorithm 1 is not supported."); case 2: - le = new AuthFastLeaderElection(this, true); - break; + throw new UnsupportedOperationException("Election Algorithm 2 is not supported."); case 3: QuorumCnxManager qcm = createCnxnManager(); QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index 459a057d605..99fc2ed8244 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -312,8 +312,8 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti connectToLearnerMasterLimit = Integer.parseInt(value); } else if (key.equals("electionAlg")) { electionAlg = Integer.parseInt(value); - if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) { - throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported."); + if (electionAlg != 3) { + throw new ConfigException("Invalid electionAlg value. Only 3 is supported."); } } else if (key.equals("quorumListenOnAllIPs")) { quorumListenOnAllIPs = Boolean.parseBoolean(value); diff --git a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml index 28ac46858b8..2a352ccfb03 100644 --- a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml +++ b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml @@ -117,11 +117,6 @@ - - - - -