From 3d042f98100057808cea31e12d947b044924ab3a Mon Sep 17 00:00:00 2001 From: Robert Evans Date: Mon, 29 Jan 2018 14:27:10 -0600 Subject: [PATCH 1/5] ZOOKEEPER-2845: Apply commit log when restarting server. --- .../apache/zookeeper/server/ZKDatabase.java | 28 ++++++++++++++----- .../zookeeper/server/ZooKeeperServer.java | 24 +++++++++++----- .../server/persistence/FileTxnSnapLog.java | 16 +++++++++++ 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 6679e783b00..a03c955c61d 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -223,6 +223,11 @@ public ConcurrentHashMap getSessionWithTimeOuts() { return sessionsWithTimeouts; } + private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { + public void onTxnLoaded(TxnHeader hdr, Record txn){ + addCommittedProposal(hdr, txn); + } + }; /** * load the database from the disk onto memory and also add @@ -231,18 +236,27 @@ public ConcurrentHashMap getSessionWithTimeOuts() { * @throws IOException */ public long loadDataBase() throws IOException { - PlayBackListener listener=new PlayBackListener(){ - public void onTxnLoaded(TxnHeader hdr,Record txn){ - Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid()); - addCommittedProposal(r); - } - }; + long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); + initialized = true; + return zxid; + } - long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener); + /** + * Fast forward the database adding transactions from the committed log into memory. + * @return the last valid zxid. 
+ * @throws IOException + */ + public long fastForwardDataBase() throws IOException { + long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; } + private void addCommittedProposal(TxnHeader hdr, Record txn) { + Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + addCommittedProposal(r); + } + /** * maintains a list of last committedLog * or so committed requests. This is used for diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index c8cd72dc45f..9099b2fbbbe 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -557,14 +557,24 @@ public synchronized void shutdown(boolean fullyShutDown) { firstProcessor.shutdown(); } - if (fullyShutDown && zkDb != null) { - zkDb.clear(); + if (zkDb != null) { + if (fullyShutDown) { + zkDb.clear(); + } else { + // else there is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot + try { + //This will fast forward the database to the latest recorded transactions + zkDb.fastForwardDataBase(); + } catch (IOException e) { + LOG.error("Error updating DB", e); + zkDb.clear(); + } + } } - // else there is no need to clear the database - // * When a new quorum is established we can still apply the diff - // on top of the same zkDb data - // * If we fetch a new snapshot from leader, the zkDb will be - // cleared anyway before loading the snapshot unregisterJMX(); } diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 3a03c81e3a7..58267326f49 
100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -207,6 +207,22 @@ public long restore(DataTree dt, Map sessions, return -1L; } } + return fastForwardFromEdits(dt, sessions, listener); + } + + /** + * This function will fast forward the server database to have the latest + * transactions in it. This is the same as restore, but reads only from + * the transaction logs and does not restore from a snapshot. + * @param dt the datatree to write transactions to. + * @param sessions the sessions to be restored. + * @param listener the playback listener to run on the + * database transactions. + * @return the highest zxid restored. + * @throws IOException + */ + public long fastForwardFromEdits(DataTree dt, Map sessions, + PlayBackListener listener) throws IOException { TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; From 93168d71647ec5283aab10806d756f5841061105 Mon Sep 17 00:00:00 2001 From: Robert Evans Date: Tue, 13 Feb 2018 13:40:06 -0600 Subject: [PATCH 2/5] Added in a modified version of the test --- .../server/quorum/QuorumPeerMainTest.java | 131 +++++++++++++++++- 1 file changed, 127 insertions(+), 4 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 9c6bd3a6639..c6464d15798 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -435,7 +435,7 @@ private void waitForOne(ZooKeeper zk, States state) throws InterruptedException int iterations = ClientBase.CONNECTION_TIMEOUT / 500; while (zk.getState() != state) { if (iterations-- == 0) { - throw new RuntimeException("Waiting too long"); + throw new RuntimeException("Waiting too long " + zk.getState() + " 
!= " + state); } Thread.sleep(500); } @@ -501,9 +501,9 @@ private Servers LaunchServers(int numServers, Integer tickTime) throws IOExcepti mt[i].start(); zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); } - + waitForAll(zk, States.CONNECTED); - + svrs.mt = mt; svrs.zk = zk; return svrs; @@ -673,7 +673,7 @@ public void testBadPackets() throws Exception { + ":" + electionPort1 + ";" + CLIENT_PORT_QP1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort2 + ";" + CLIENT_PORT_QP2; - + MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection); q1.start(); @@ -888,4 +888,127 @@ public void testWithOnlyMinSessionTimeout() throws Exception { maxSessionTimeOut, quorumPeer.getMaxSessionTimeout()); } + @Test + public void testTxnAheadSnapInRetainDB() throws Exception { + // 1. start up server and wait for leader election to finish + ClientBase.setupTestEnv(); + final int SERVER_COUNT = 3; + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPorts[i] + "\n"); + } + String quorumCfgSection = sb.toString(); + + MainThread mt[] = new MainThread[SERVER_COUNT]; + ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + + waitForAll(zk, States.CONNECTED); + + // we need to shutdown and start back up to make sure that the create session isn't the first transaction since + // that is rather innocuous. 
+ for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + + waitForAll(zk, States.CONNECTING); + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].start(); + // Recreate a client session since the previous session was not persisted. + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + + waitForAll(zk, States.CONNECTED); + + // 2. kill all followers + int leader = -1; + Map outstanding = null; + for (int i = 0; i < SERVER_COUNT; i++) { + if (mt[i].main.quorumPeer.leader != null) { + leader = i; + outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals; + // increase the tick time to delay the leader going to looking + mt[leader].main.quorumPeer.tickTime = 10000; + } + } + LOG.warn("LEADER {}", leader); + + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + mt[i].shutdown(); + } + } + + // 3. start up the followers to form a new quorum + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + mt[i].start(); + } + } + + // 4. wait one of the follower to be the leader + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + // Recreate a client session since the previous session was not persisted. + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + waitForOne(zk[i], States.CONNECTED); + } + } + + // 5. 
send a create request to leader and make sure it's synced to disk, + // which means it acked from itself + try { + zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("create /zk" + leader + " should have failed"); + } catch (KeeperException e) { + } + + // just make sure that we actually did get it in process at the + // leader + Assert.assertTrue(outstanding.size() == 1); + Proposal p = (Proposal) outstanding.values().iterator().next(); + Assert.assertTrue(p.request.getHdr().getType() == OpCode.create); + + // make sure it has a chance to write it to disk + Thread.sleep(1000); + p.qvAcksetPairs.get(0).getAckset().contains(leader); + + // 6. wait the leader to quit due to no enough followers + Thread.sleep(4000); + //waitForOne(zk[leader], States.CONNECTING); + mt[leader].shutdown(); + + int newLeader = -1; + for (int i = 0; i < SERVER_COUNT; i++) { + if (mt[i].main.quorumPeer.leader != null) { + newLeader = i; + } + } + // make sure a different leader was elected + Assert.assertTrue(newLeader != leader); + + // 7. restart the previous leader + mt[leader].start(); + waitForOne(zk[leader], States.CONNECTED); + + // 8. 
check the node exist in previous leader but not others + // make sure everything is consistent + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("server " + i + " should not have /zk" + leader, zk[i].exists("/zk" + leader, false) == null); + } + for (int i = 0; i < SERVER_COUNT; i++) { + zk[i].close(); + } + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } } From f26a21ad6ad9cd202d363030065f219c133649c5 Mon Sep 17 00:00:00 2001 From: Robert Evans Date: Fri, 16 Feb 2018 13:27:54 -0600 Subject: [PATCH 3/5] Addressed review comments --- .../server/quorum/QuorumPeerMainTest.java | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index c6464d15798..80984a71aec 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -889,7 +889,7 @@ public void testWithOnlyMinSessionTimeout() throws Exception { } @Test - public void testTxnAheadSnapInRetainDB() throws Exception { + public void testFailedTxnAsPartOfQuorumLoss() throws Exception { // 1. start up server and wait for leader election to finish ClientBase.setupTestEnv(); final int SERVER_COUNT = 3; @@ -953,7 +953,7 @@ public void testTxnAheadSnapInRetainDB() throws Exception { } } - // 4. wait one of the follower to be the leader + // 4. wait one of the follower to be the new leader for (int i = 0; i < SERVER_COUNT; i++) { if (i != leader) { // Recreate a client session since the previous session was not persisted. @@ -962,7 +962,7 @@ public void testTxnAheadSnapInRetainDB() throws Exception { } } - // 5. send a create request to leader and make sure it's synced to disk, + // 5. 
send a create request to old leader and make sure it's synced to disk, // which means it acked from itself try { zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, @@ -974,16 +974,32 @@ public void testTxnAheadSnapInRetainDB() throws Exception { // just make sure that we actually did get it in process at the // leader Assert.assertTrue(outstanding.size() == 1); - Proposal p = (Proposal) outstanding.values().iterator().next(); + Proposal p = outstanding.values().iterator().next(); Assert.assertTrue(p.request.getHdr().getType() == OpCode.create); // make sure it has a chance to write it to disk - Thread.sleep(1000); - p.qvAcksetPairs.get(0).getAckset().contains(leader); + int sleepTime = 0; + Long longLeader = new Long(leader); + while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) { + if (sleepTime > 2000) { + Assert.fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + + " expected " + leader); + } + Thread.sleep(100); + sleepTime += 100; + } - // 6. wait the leader to quit due to no enough followers - Thread.sleep(4000); - //waitForOne(zk[leader], States.CONNECTING); + // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum + sleepTime = 0; + Follower f = mt[leader].main.quorumPeer.follower; + while (f == null || !f.isRunning()) { + if (sleepTime > 4000) { + Assert.fail("Took too long for old leader to time out"); + } + Thread.sleep(100); + sleepTime += 100; + f = mt[leader].main.quorumPeer.follower; + } mt[leader].shutdown(); int newLeader = -1; @@ -997,7 +1013,7 @@ public void testTxnAheadSnapInRetainDB() throws Exception { // 7. restart the previous leader mt[leader].start(); - waitForOne(zk[leader], States.CONNECTED); + waitForAll(zk, States.CONNECTED); // 8. 
check the node exist in previous leader but not others // make sure everything is consistent From 583e34435eb51e681cb2d56006d14614e7649c09 Mon Sep 17 00:00:00 2001 From: Robert Evans Date: Fri, 16 Feb 2018 14:10:27 -0600 Subject: [PATCH 4/5] Using framework APIs for test --- .../server/quorum/QuorumPeerMainTest.java | 155 +++++++++--------- 1 file changed, 75 insertions(+), 80 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 80984a71aec..b26599d0ede 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -39,6 +39,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; @@ -441,6 +442,10 @@ private void waitForOne(ZooKeeper zk, States state) throws InterruptedException } } + private void waitForAll(Servers servers, States state) throws InterruptedException { + waitForAll(servers.zk, state); + } + private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; boolean someoneNotConnected = true; @@ -465,6 +470,37 @@ private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedExcepti private static class Servers { MainThread mt[]; ZooKeeper zk[]; + int[] clientPorts; + + public void shutDownAllServers() throws InterruptedException { + for (MainThread t: mt) { + t.shutdown(); + } + } + + public void restartAllServersAndClients(Watcher watcher) throws IOException { + for (MainThread t : mt) { + if (!t.isAlive()) { + t.start(); + } + } + for (int i = 0; i < zk.length; i++) { + restartClient(i, watcher); + } + 
} + + public void restartClient(int i, Watcher watcher) throws IOException { + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watcher); + } + + public int findLeader() { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } } @@ -474,7 +510,8 @@ private Servers LaunchServers(int numServers) throws IOException, InterruptedExc /** * This is a helper function for launching a set of servers * - * @param numServers* @param tickTime A ticktime to pass to MainThread + * @param numServers the number of servers + * @param tickTime A ticktime to pass to MainThread * @return * @throws IOException * @throws InterruptedException @@ -482,30 +519,28 @@ private Servers LaunchServers(int numServers) throws IOException, InterruptedExc private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { int SERVER_COUNT = numServers; Servers svrs = new Servers(); - final int clientPorts[] = new int[SERVER_COUNT]; + svrs.clientPorts = new int[SERVER_COUNT]; StringBuilder sb = new StringBuilder(); for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n"); + svrs.clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n"); } String quorumCfgSection = sb.toString(); - MainThread mt[] = new MainThread[SERVER_COUNT]; - ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + svrs.mt = new MainThread[SERVER_COUNT]; + svrs.zk = new ZooKeeper[SERVER_COUNT]; for(int i = 0; i < SERVER_COUNT; i++) { if (tickTime != null) { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap(), tickTime); + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap(), tickTime); } else { - mt[i] = 
new MainThread(i, clientPorts[i], quorumCfgSection); + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection); } - mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + svrs.mt[i].start(); + svrs.restartClient(i, this); } - waitForAll(zk, States.CONNECTED); + waitForAll(svrs, States.CONNECTED); - svrs.mt = mt; - svrs.zk = zk; return svrs; } @@ -893,63 +928,34 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { // 1. start up server and wait for leader election to finish ClientBase.setupTestEnv(); final int SERVER_COUNT = 3; - final int clientPorts[] = new int[SERVER_COUNT]; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPorts[i] + "\n"); - } - String quorumCfgSection = sb.toString(); - - MainThread mt[] = new MainThread[SERVER_COUNT]; - ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); - mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); - } + servers = LaunchServers(SERVER_COUNT); - waitForAll(zk, States.CONNECTED); + waitForAll(servers, States.CONNECTED); // we need to shutdown and start back up to make sure that the create session isn't the first transaction since // that is rather innocuous. - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i].shutdown(); - } - - waitForAll(zk, States.CONNECTING); - - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i].start(); - // Recreate a client session since the previous session was not persisted. 
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); - } - - waitForAll(zk, States.CONNECTED); + servers.shutDownAllServers(); + waitForAll(servers, States.CONNECTING); + servers.restartAllServersAndClients(this); + waitForAll(servers, States.CONNECTED); // 2. kill all followers - int leader = -1; - Map outstanding = null; - for (int i = 0; i < SERVER_COUNT; i++) { - if (mt[i].main.quorumPeer.leader != null) { - leader = i; - outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals; - // increase the tick time to delay the leader going to looking - mt[leader].main.quorumPeer.tickTime = 10000; - } - } + int leader = servers.findLeader(); + Map outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals; + // increase the tick time to delay the leader going to looking + servers.mt[leader].main.quorumPeer.tickTime = 10000; LOG.warn("LEADER {}", leader); for (int i = 0; i < SERVER_COUNT; i++) { if (i != leader) { - mt[i].shutdown(); + servers.mt[i].shutdown(); } } // 3. start up the followers to form a new quorum for (int i = 0; i < SERVER_COUNT; i++) { if (i != leader) { - mt[i].start(); + servers.mt[i].start(); } } @@ -957,15 +963,15 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { for (int i = 0; i < SERVER_COUNT; i++) { if (i != leader) { // Recreate a client session since the previous session was not persisted. - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); - waitForOne(zk[i], States.CONNECTED); + servers.restartClient(i, this); + waitForOne(servers.zk[i], States.CONNECTED); } } // 5. 
send a create request to old leader and make sure it's synced to disk, // which means it acked from itself try { - zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, + servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Assert.fail("create /zk" + leader + " should have failed"); } catch (KeeperException e) { @@ -973,9 +979,9 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { // just make sure that we actually did get it in process at the // leader - Assert.assertTrue(outstanding.size() == 1); + Assert.assertEquals(1, outstanding.size()); Proposal p = outstanding.values().iterator().next(); - Assert.assertTrue(p.request.getHdr().getType() == OpCode.create); + Assert.assertEquals(OpCode.create, p.request.getHdr().getType()); // make sure it has a chance to write it to disk int sleepTime = 0; @@ -991,40 +997,29 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum sleepTime = 0; - Follower f = mt[leader].main.quorumPeer.follower; + Follower f = servers.mt[leader].main.quorumPeer.follower; while (f == null || !f.isRunning()) { - if (sleepTime > 4000) { - Assert.fail("Took too long for old leader to time out"); + if (sleepTime > 10_000) { + Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState()); } Thread.sleep(100); sleepTime += 100; - f = mt[leader].main.quorumPeer.follower; + f = servers.mt[leader].main.quorumPeer.follower; } - mt[leader].shutdown(); + servers.mt[leader].shutdown(); - int newLeader = -1; - for (int i = 0; i < SERVER_COUNT; i++) { - if (mt[i].main.quorumPeer.leader != null) { - newLeader = i; - } - } + int newLeader = servers.findLeader(); // make sure a different leader was elected - Assert.assertTrue(newLeader != leader); + Assert.assertNotEquals(leader, newLeader); // 7. 
restart the previous leader - mt[leader].start(); - waitForAll(zk, States.CONNECTED); + servers.mt[leader].start(); + waitForAll(servers, States.CONNECTED); // 8. check the node exist in previous leader but not others // make sure everything is consistent for (int i = 0; i < SERVER_COUNT; i++) { - Assert.assertTrue("server " + i + " should not have /zk" + leader, zk[i].exists("/zk" + leader, false) == null); - } - for (int i = 0; i < SERVER_COUNT; i++) { - zk[i].close(); - } - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i].shutdown(); + Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false)); } } } From 28c074a2605de86b933760f0f4c0020e094104fd Mon Sep 17 00:00:00 2001 From: Robert Evans Date: Wed, 21 Feb 2018 09:00:42 -0600 Subject: [PATCH 5/5] Addressed review comments --- .../server/quorum/QuorumPeerMainTest.java | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index b26599d0ede..43c341a5d8f 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -250,14 +250,7 @@ public void testHighestZxidJoinLate() throws Exception { numServers = 3; servers = LaunchServers(numServers); String path = "/hzxidtest"; - int leader = -1; - - // find the leader - for (int i = 0; i < numServers; i++) { - if (servers.mt[i].main.quorumPeer.leader != null) { - leader = i; - } - } + int leader = servers.findLeader(); // make sure there is a leader Assert.assertTrue("There should be a leader", leader >= 0); @@ -367,12 +360,7 @@ public void testElectionFraud() throws IOException, InterruptedException { servers = LaunchServers(numServers, 500); // find the leader - int trueLeader = -1; - for (int i = 0; i < numServers; i++) { - if 
(servers.mt[i].main.quorumPeer.leader != null) { - trueLeader = i; - } - } + int trueLeader = servers.findLeader(); Assert.assertTrue("There should be a leader", trueLeader >= 0); // find a follower @@ -489,8 +477,8 @@ public void restartAllServersAndClients(Watcher watcher) throws IOException { } } - public void restartClient(int i, Watcher watcher) throws IOException { - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watcher); + public void restartClient(int clientIndex, Watcher watcher) throws IOException { + zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher); } public int findLeader() { @@ -925,6 +913,7 @@ public void testWithOnlyMinSessionTimeout() throws Exception { @Test public void testFailedTxnAsPartOfQuorumLoss() throws Exception { + final int LEADER_TIMEOUT_MS = 10_000; // 1. start up server and wait for leader election to finish ClientBase.setupTestEnv(); final int SERVER_COUNT = 3; @@ -943,7 +932,7 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { int leader = servers.findLeader(); Map outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals; // increase the tick time to delay the leader going to looking - servers.mt[leader].main.quorumPeer.tickTime = 10000; + servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; LOG.warn("LEADER {}", leader); for (int i = 0; i < SERVER_COUNT; i++) { @@ -996,23 +985,24 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { } // 6. 
wait for the leader to quit due to not enough followers and come back up as a part of the new quorum + LOG.info("Waiting for leader {} to timeout followers", leader); sleepTime = 0; Follower f = servers.mt[leader].main.quorumPeer.follower; while (f == null || !f.isRunning()) { - if (sleepTime > 10_000) { + if (sleepTime > LEADER_TIMEOUT_MS * 2) { Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState()); } Thread.sleep(100); sleepTime += 100; f = servers.mt[leader].main.quorumPeer.follower; } - servers.mt[leader].shutdown(); int newLeader = servers.findLeader(); // make sure a different leader was elected Assert.assertNotEquals(leader, newLeader); - // 7. restart the previous leader + // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state + servers.mt[leader].shutdown(); servers.mt[leader].start(); waitForAll(servers, States.CONNECTED);