From 68a0382093da1d64583211746ac672ed12f5da5c Mon Sep 17 00:00:00 2001 From: Abraham Fine Date: Tue, 12 Dec 2017 16:35:50 -0800 Subject: [PATCH 1/4] ZOOKEEPER-2953: Flaky Test: testNoLogBeforeLeaderEstablishment --- .../server/quorum/QuorumPeerMainTest.java | 167 ++++++++++++++---- .../server/quorum/QuorumPeerTestBase.java | 27 ++- .../org/apache/zookeeper/test/QuorumTest.java | 92 +--------- 3 files changed, 158 insertions(+), 128 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index a5ca72f7850..b135355a0af 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; @@ -83,7 +84,7 @@ public void testQuorum() throws Exception { String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 - + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2; MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); @@ -335,6 +336,100 @@ public void testHighestZxidJoinLate() throws Exception { output[0], 2); } + /** + * This test validates that if a quorum member determines that it is leader without the support of the rest of the + * quorum (the other members do not believe it to be the leader) it will stop attempting to lead and become a follower. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testElectionFraud() throws IOException, InterruptedException { + // capture QuorumPeer logging + Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + WriterAppender appender = new WriterAppender(layout, os); + appender.setThreshold(Level.INFO); + Logger qlogger = Logger.getLogger(QuorumPeer.class); + qlogger.addAppender(appender); + + int numServers = 3; + + // used for assertions later + boolean foundLeading = false; + boolean foundLooking = false; + boolean foundFollowing = false; + + try { + // spin up a quorum, we use a small ticktime to make the test run faster + Servers servers = LaunchServers(numServers, 500); + + // find the leader + int trueLeader = -1; + for (int i = 0; i < numServers; i++) { + if (servers.mt[i].main.quorumPeer.leader != null) { + trueLeader = i; + } + } + Assert.assertTrue("There should be a leader", trueLeader >= 0); + + // find a follower + int falseLeader = (trueLeader + 1) % numServers; + Assert.assertTrue(servers.mt[falseLeader].main.quorumPeer.follower != null); + + // to keep the quorum peer running and force it to go into the looking state, we kill leader election + // and close the connection to the leader + servers.mt[falseLeader].main.quorumPeer.electionAlg.shutdown(); + servers.mt[falseLeader].main.quorumPeer.follower.getSocket().close(); + + // wait for the falseLeader to disconnect + waitForOne(servers.zk[falseLeader], States.CONNECTING); + + // convince falseLeader that it is the leader + servers.mt[falseLeader].main.quorumPeer.setPeerState(QuorumPeer.ServerState.LEADING); + + // provide time for the falseleader to realize no followers have connected + // (this is twice the timeout used in Leader#getEpochToPropose) + Thread.sleep(2 * servers.mt[falseLeader].main.quorumPeer.initLimit * servers.mt[falseLeader].main.quorumPeer.tickTime); + + // Restart leader election + servers.mt[falseLeader].main.quorumPeer.startLeaderElection(); + + // The previous client connection to falseLeader likely closed, create a new one + servers.zk[falseLeader] = new ZooKeeper("127.0.0.1:" + servers.mt[falseLeader].getClientPort(), ClientBase.CONNECTION_TIMEOUT, this); + + // Wait for falseLeader to rejoin the quorum + waitForOne(servers.zk[falseLeader], States.CONNECTED); + + // and ensure trueLeader is still the leader + Assert.assertTrue(servers.mt[trueLeader].main.quorumPeer.leader != null); + + // Look through the logs for output that indicates the falseLeader is LEADING, then LOOKING, then FOLLOWING + LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); + Pattern leading = Pattern.compile(".*myid=" + falseLeader + ".*LEADING.*"); + Pattern looking = Pattern.compile(".*myid=" + falseLeader + ".*LOOKING.*"); + Pattern following = Pattern.compile(".*myid=" + falseLeader + ".*FOLLOWING.*"); + + String line; + while ((line = r.readLine()) != null) { + if (!foundLeading) { + foundLeading = leading.matcher(line).matches(); + } else if(!foundLooking) { + foundLooking = looking.matcher(line).matches(); + } else if (following.matcher(line).matches()){ + foundFollowing = true; + break; + } + } + } finally { + qlogger.removeAppender(appender); + } + + Assert.assertTrue("falseLeader never attempts to become leader", foundLeading); + Assert.assertTrue("falseLeader never gives up on leadership", foundLooking); + Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing); + } + private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 500; while (zk.getState() != state) { @@ -371,40 +466,48 @@ private static class Servers { ZooKeeper zk[]; } - /** - * This is a helper function for launching a set of servers - * - * @param numServers - * @return - * @throws IOException - * @throws InterruptedException - */ - private Servers LaunchServers(int numServers) throws IOException, InterruptedException { - int SERVER_COUNT = numServers; - Servers svrs = new Servers(); - final int clientPorts[] = new int[SERVER_COUNT]; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n"); - } - String quorumCfgSection = sb.toString(); - MainThread mt[] = new MainThread[SERVER_COUNT]; - ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); - mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); - } - - waitForAll(zk, States.CONNECTED); - - svrs.mt = mt; - svrs.zk = zk; - return svrs; + private Servers LaunchServers(int numServers) throws IOException, InterruptedException { + return LaunchServers(numServers, null); } + /** * This is a helper function for launching a set of servers + * + * @param numServers* @param tickTime A ticktime to pass to MainThread + * @return + * @throws IOException + * @throws InterruptedException + */ + private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { + int SERVER_COUNT = numServers; + Servers svrs = new Servers(); + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n"); + } + String quorumCfgSection = sb.toString(); + + MainThread mt[] = new MainThread[SERVER_COUNT]; + ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + for(int i = 0; i < SERVER_COUNT; i++) { + if (tickTime != null) { + mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap(), tickTime); + } else { + mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); + } + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + + waitForAll(zk, States.CONNECTED); + + svrs.mt = mt; + svrs.zk = zk; + return svrs; + } + /** * Verify handling of bad quorum address */ diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java index 21aa81911e5..33999a7f505 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java @@ -65,7 +65,7 @@ public void shutdown() { } } } - + public static class MainThread implements Runnable { final File confFile; final File tmpDir; @@ -82,8 +82,18 @@ public static class MainThread implements Runnable { private String quorumCfgSection; private Map otherConfigs; + /** + * Create a MainThread + * + * @param myid + * @param clientPort + * @param quorumCfgSection + * @param otherConfigs + * @param tickTime initLimit will be 10 and syncLimit will be 5 + * @throws IOException + */ public MainThread(int myid, int clientPort, String quorumCfgSection, - Map otherConfigs) throws IOException { + Map otherConfigs, int tickTime) throws IOException { baseDir = ClientBase.createTmpDir(); this.myid = myid; this.clientPort = clientPort; @@ -94,7 +104,7 @@ public MainThread(int myid, int clientPort, String quorumCfgSection, confFile = new File(baseDir, "zoo.cfg"); FileWriter fwriter = new FileWriter(confFile); - fwriter.write("tickTime=4000\n"); + fwriter.write("tickTime=" + tickTime + "\n"); fwriter.write("initLimit=10\n"); fwriter.write("syncLimit=5\n"); @@ -149,7 +159,7 @@ public MainThread(int myid, int clientPort, String quorumCfgSection, boolean wri throws IOException { this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, writeDynamicConfigFile); } - + public MainThread(int myid, int clientPort, String quorumCfgSection, String peerType, boolean writeDynamicConfigFile) throws IOException { this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, peerType, writeDynamicConfigFile); @@ -220,7 +230,7 @@ public MainThread(int myid, int clientPort, int adminServerPort, Integer secureC if (secureClientPort != null) { fwriter.write("secureClientPort=" + secureClientPort + "\n"); } - + if (peerType != null) { fwriter.write("peerType=" + peerType + "\n"); } @@ -266,7 +276,7 @@ public File[] getDynamicFiles() { } public File[] getFilesWithPrefix(final String prefix) { - return tmpDir.listFiles(new FilenameFilter() { + return tmpDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return name.startsWith(prefix); @@ -296,6 +306,11 @@ public MainThread(int myid, int clientPort, String quorumCfgSection) new HashMap()); } + public MainThread(int myid, int clientPort, String quorumCfgSection, + Map otherConfigs) throws IOException { + this(myid, clientPort, quorumCfgSection, otherConfigs, 4000); + } + Thread currentThread; synchronized public void start() { diff --git a/src/java/test/org/apache/zookeeper/test/QuorumTest.java b/src/java/test/org/apache/zookeeper/test/QuorumTest.java index 469a9d16578..5e9b08cf821 100644 --- a/src/java/test/org/apache/zookeeper/test/QuorumTest.java +++ b/src/java/test/org/apache/zookeeper/test/QuorumTest.java @@ -313,7 +313,7 @@ public void testFollowersStartAfterLeader() throws Exception { // try to reestablish the quorum qu.start(index); - + // Connect the client after services are restarted (otherwise we would get // SessionExpiredException as the previous local session was not persisted). ZooKeeper zk = new ZooKeeper( @@ -321,7 +321,7 @@ public void testFollowersStartAfterLeader() throws Exception { ClientBase.CONNECTION_TIMEOUT, watcher); try{ - watcher.waitForConnected(CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); } catch(TimeoutException e) { Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); } @@ -329,94 +329,6 @@ public void testFollowersStartAfterLeader() throws Exception { zk.close(); } - /** - * Tests if closeSession can be logged before a leader gets established, which - * could lead to a locked-out follower (see ZOOKEEPER-790). - * - * The test works as follows. It has a client connecting to a follower f and - * sending batches of 1,000 updates. The goal is that f has a zxid higher than - * all other servers in the initial leader election. This way we can crash and - * recover the follower so that the follower believes it is the leader once it - * recovers (LE optimization: once a server receives a message from all other - * servers, it picks a leader. - * - * It also makes the session timeout very short so that we force the false - * leader to close the session and write it to the log in the buggy code (before - * ZOOKEEPER-790). Once f drops leadership and finds the current leader, its epoch - * is higher, and it rejects the leader. Now, if we prevent the leader from closing - * the session by only starting up (see Leader.lead()) once it obtains a quorum of - * supporters, then f will find the current leader and support it because it won't - * have a highe epoch. - * - */ - @Test - public void testNoLogBeforeLeaderEstablishment () throws Exception { - final Semaphore sem = new Semaphore(0); - - qu = new QuorumUtil(2, 10); - qu.startQuorum(); - - int index = 1; - while(qu.getPeer(index).peer.leader == null) - index++; - - Leader leader = qu.getPeer(index).peer.leader; - - Assert.assertNotNull(leader); - - /* - * Reusing the index variable to select a follower to connect to - */ - index = (index == 1) ? 2 : 1; - - ZooKeeper zk = new DisconnectableZooKeeper( - "127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), - ClientBase.CONNECTION_TIMEOUT, new Watcher() { - public void process(WatchedEvent event) { } - }); - - zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - for(int i = 0; i < 50000; i++) { - zk.setData("/blah", new byte[0], -1, new AsyncCallback.StatCallback() { - public void processResult(int rc, String path, Object ctx, - Stat stat) { - counter++; - if (rc != 0) { - errors++; - } - if(counter == 20000){ - sem.release(); - } - } - }, null); - - if(i == 5000){ - qu.shutdown(index); - LOG.info("Shutting down s1"); - } - if(i == 12000){ - qu.start(index); - LOG.info("Setting up server: " + index); - } - if((i % 1000) == 0){ - Thread.sleep(500); - } - } - - // Wait until all updates return - sem.tryAcquire(15, TimeUnit.SECONDS); - - // Verify that server is following and has the same epoch as the leader - Assert.assertTrue("Not following", qu.getPeer(index).peer.follower != null); - long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); - long epochL = (leader.getEpoch() >> 32L); - Assert.assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZxid() + - "Current epoch: " + epochF, epochF == epochL); - - zk.close(); - } - // skip superhammer and clientcleanup as they are too expensive for quorum /** From 9fac0e202bd0fda7aa9a4ab97962d52a75e55881 Mon Sep 17 00:00:00 2001 From: Abraham Fine Date: Wed, 13 Dec 2017 00:17:59 -0800 Subject: [PATCH 2/4] fix whitespace --- .../zookeeper/server/quorum/QuorumPeerMainTest.java | 2 +- .../zookeeper/server/quorum/QuorumPeerTestBase.java | 8 ++++---- src/java/test/org/apache/zookeeper/test/QuorumTest.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index b135355a0af..8fa23a7cad5 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -84,7 +84,7 @@ public void testQuorum() throws Exception { String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 - + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2; MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java index 33999a7f505..a357a6ed161 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java @@ -65,7 +65,7 @@ public void shutdown() { } } } - + public static class MainThread implements Runnable { final File confFile; final File tmpDir; @@ -159,7 +159,7 @@ public MainThread(int myid, int clientPort, String quorumCfgSection, boolean wri throws IOException { this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, writeDynamicConfigFile); } - + public MainThread(int myid, int clientPort, String quorumCfgSection, String peerType, boolean writeDynamicConfigFile) throws IOException { this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, peerType, writeDynamicConfigFile); @@ -230,7 +230,7 @@ public MainThread(int myid, int clientPort, int adminServerPort, Integer secureC if (secureClientPort != null) { fwriter.write("secureClientPort=" + secureClientPort + "\n"); } - + if (peerType != null) { fwriter.write("peerType=" + peerType + "\n"); } @@ -276,7 +276,7 @@ public File[] getDynamicFiles() { } public File[] getFilesWithPrefix(final String prefix) { - return tmpDir.listFiles(new FilenameFilter() { + return tmpDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return name.startsWith(prefix); diff --git a/src/java/test/org/apache/zookeeper/test/QuorumTest.java b/src/java/test/org/apache/zookeeper/test/QuorumTest.java index 5e9b08cf821..cd1d1532b63 100644 --- a/src/java/test/org/apache/zookeeper/test/QuorumTest.java +++ b/src/java/test/org/apache/zookeeper/test/QuorumTest.java @@ -313,7 +313,7 @@ public void testFollowersStartAfterLeader() throws Exception { // try to reestablish the quorum qu.start(index); - + // Connect the client after services are restarted (otherwise we would get // SessionExpiredException as the previous local session was not persisted). ZooKeeper zk = new ZooKeeper( @@ -321,7 +321,7 @@ public void testFollowersStartAfterLeader() throws Exception { ClientBase.CONNECTION_TIMEOUT, watcher); try{ - watcher.waitForConnected(CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); } catch(TimeoutException e) { Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); } From b5bc449cb94d421ee643881de4eafd4500c6ab4d Mon Sep 17 00:00:00 2001 From: Abraham Fine Date: Wed, 13 Dec 2017 12:51:27 -0800 Subject: [PATCH 3/4] use shared Servers and numServers --- .../apache/zookeeper/server/quorum/QuorumPeerMainTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 8fa23a7cad5..96a5c6ab2ba 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -353,7 +353,7 @@ public void testElectionFraud() throws IOException, InterruptedException { Logger qlogger = Logger.getLogger(QuorumPeer.class); qlogger.addAppender(appender); - int numServers = 3; + numServers = 3; // used for assertions later boolean foundLeading = false; @@ -362,7 +362,7 @@ public void testElectionFraud() throws IOException, InterruptedException { try { // spin up a quorum, we use a small ticktime to make the test run faster - Servers servers = LaunchServers(numServers, 500); + servers = LaunchServers(numServers, 500); // find the leader int trueLeader = -1; From dc21603dcda678e3b2f09ff04338eb33dcd943bd Mon Sep 17 00:00:00 2001 From: Abraham Fine Date: Thu, 14 Dec 2017 08:30:32 -0800 Subject: [PATCH 4/4] improve assertion logging --- .../org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 96a5c6ab2ba..2a02428d103 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -375,7 +375,7 @@ public void testElectionFraud() throws IOException, InterruptedException { // find a follower int falseLeader = (trueLeader + 1) % numServers; - Assert.assertTrue(servers.mt[falseLeader].main.quorumPeer.follower != null); + Assert.assertTrue("All servers should join the quorum", servers.mt[falseLeader].main.quorumPeer.follower != null); // to keep the quorum peer running and force it to go into the looking state, we kill leader election // and close the connection to the leader