From 007e2b37f24907c03e4e28547756b91aea4f78d4 Mon Sep 17 00:00:00 2001 From: Michael Han Date: Wed, 5 Jul 2017 14:23:58 -0700 Subject: [PATCH] ZOOKEEPER-2355: Ephemeral node is never deleted if follower fails while reading the proposal packet. This commit is a back port of the commit ca22b3db19371f6b0f5507b7dd80b283cddc7700 on branch-3.5. Changes include a few interfaces that required by the test case. The test case itself is also updated so it works with 3.4 code base. --- .../zookeeper/server/quorum/Learner.java | 8 +- .../zookeeper/server/quorum/QuorumPeer.java | 6 +- .../server/quorum/QuorumPeerMain.java | 33 ++- .../quorum/EphemeralNodeDeletionTest.java | 219 ++++++++++++++++++ .../server/quorum/QuorumPeerTestBase.java | 6 +- 5 files changed, 254 insertions(+), 18 deletions(-) create mode 100644 src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index c54f6e6b797..1645cda4a98 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -333,7 +333,7 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { - LOG.info("Getting a snapshot from leader"); + LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid())); // The leader is going to dump the database // clear our own database and read zk.getZKDatabase().clear(); @@ -343,6 +343,7 @@ else if (qp.getType() == Leader.SNAP) { LOG.error("Missing signature. Got " + signature); throw new IOException("Missing signature"); } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" @@ -354,7 +355,7 @@ else if (qp.getType() == Leader.SNAP) { + Long.toHexString(qp.getZxid())); System.exit(13); } - + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else { LOG.error("Got unexpected packet from leader " @@ -362,8 +363,7 @@ else if (qp.getType() == Leader.SNAP) { System.exit(13); } - zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - zk.createSessionTracker(); + zk.createSessionTracker(); long lastQueued = 0; diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 53f607750f5..267f89ed22c 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -566,7 +566,7 @@ public static QuorumPeer testingQuorumPeer() throws SaslException { return new QuorumPeer(); } - private QuorumPeer() throws SaslException { + protected QuorumPeer() throws SaslException { super("QuorumPeer"); quorumStats = new QuorumStats(this); initialize(); @@ -1333,6 +1333,10 @@ public void setZKDatabase(ZKDatabase database) { this.zkDb = database; } + protected ZKDatabase getZkDb() { + return zkDb; + } + public void setRunning(boolean running) { this.running = running; } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index 4ea7e54caed..527ad84d4a1 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -21,6 +21,7 @@ import java.io.IOException; import javax.management.JMException; +import javax.security.sasl.SaslException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,18 +130,21 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); - - quorumPeer = new QuorumPeer(config.getServers(), - new File(config.getDataDir()), - new File(config.getDataLogDir()), - config.getElectionAlg(), - config.getServerId(), - config.getTickTime(), - config.getInitLimit(), - config.getSyncLimit(), - config.getQuorumListenOnAllIPs(), - cnxnFactory, - config.getQuorumVerifier()); + + quorumPeer = getQuorumPeer(); + + quorumPeer.setQuorumPeers(config.getServers()); + quorumPeer.setTxnFactory(new FileTxnSnapLog( + new File(config.getDataDir()), + new File(config.getDataLogDir()))); + quorumPeer.setElectionType(config.getElectionAlg()); + quorumPeer.setMyid(config.getServerId()); + quorumPeer.setTickTime(config.getTickTime()); + quorumPeer.setInitLimit(config.getInitLimit()); + quorumPeer.setSyncLimit(config.getSyncLimit()); + quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); + quorumPeer.setCnxnFactory(cnxnFactory); + quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); @@ -168,4 +172,9 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException { LOG.warn("Quorum Peer interrupted", e); } } + + // @VisibleForTesting + protected QuorumPeer getQuorumPeer() throws SaslException { + return new QuorumPeer(); + } } diff --git a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java new file mode 100644 index 00000000000..c34f243c2a5 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.net.SocketTimeoutException; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import javax.security.sasl.SaslException; + +public class EphemeralNodeDeletionTest extends QuorumPeerTestBase { + private static int SERVER_COUNT = 3; + private MainThread[] mt = new MainThread[SERVER_COUNT]; + + /** + * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355. + * ZooKeeper ephemeral node is never deleted if follower fail while reading + * the proposal packet. + */ + + @Test(timeout = 120000) + public void testEphemeralNodeDeletion() throws Exception { + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + + ":" + PortAssignment.unique(); + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + System.out.println(currentQuorumCfgSection); + // start all the servers + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection) { + @Override + public TestQPMain getTestQPMain() { + return new MockTestQPMain(); + } + }; + mt[i].start(); + } + + // ensure all servers started + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); + } + + CountdownWatcher watch = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1], + ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + /** + * now the problem scenario starts + */ + + // 1: create ephemeral node + String nodePath = "/e1"; + zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + // 2: inject network problem in one of the follower + CustomQuorumPeer follower = (CustomQuorumPeer) getByServerState(mt, + ServerState.FOLLOWING); + follower.setInjectError(true); + + // 3: close the session so that ephemeral node is deleted + zk.close(); + + // remove the error + follower.setInjectError(false); + + Assert.assertTrue("Faulted Follower should have joined quorum by now", + ClientBase.waitForServerUp( + "127.0.0.1:" + follower.getClientPort(), + CONNECTION_TIMEOUT)); + + QuorumPeer leader = getByServerState(mt, ServerState.LEADING); + assertNotNull("Leader should not be null", leader); + Assert.assertTrue("Leader must be running", ClientBase.waitForServerUp( + "127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT)); + + watch = new CountdownWatcher(); + zk = new ZooKeeper("127.0.0.1:" + leader.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + Stat exists = zk.exists(nodePath, false); + assertNull("Node must have been deleted from leader", exists); + + CountdownWatcher followerWatch = new CountdownWatcher(); + ZooKeeper followerZK = new ZooKeeper( + "127.0.0.1:" + follower.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, followerWatch); + followerWatch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + Stat nodeAtFollower = followerZK.exists(nodePath, false); + + // Problem 1: Follower had one extra ephemeral node /e1 + assertNull("ephemeral node must not exist", nodeAtFollower); + + // Create the node with another session + zk.create(nodePath, "2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + // close the session and newly created ephemeral node should be deleted + zk.close(); + + nodeAtFollower = followerZK.exists(nodePath, false); + + // Problem 2: Before fix, after session close the ephemeral node + // was not getting deleted. But now after the fix after session close + // ephemeral node is getting deleted. + assertNull("After session close ephemeral node must be deleted", + nodeAtFollower); + followerZK.close(); + } + + @After + public void tearDown() { + // stop all severs + for (int i = 0; i < mt.length; i++) { + try { + mt[i].shutdown(); + } catch (InterruptedException e) { + LOG.warn("Quorum Peer interrupted while shutting it down", e); + } + } + } + + private QuorumPeer getByServerState(MainThread[] mt, ServerState state) { + for (int i = mt.length - 1; i >= 0; i--) { + QuorumPeer quorumPeer = mt[i].getQuorumPeer(); + if (null != quorumPeer && state == quorumPeer.getPeerState()) { + return quorumPeer; + } + } + return null; + } + + static class CustomQuorumPeer extends QuorumPeer { + private boolean injectError = false; + + public CustomQuorumPeer() throws SaslException { + } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) + throws IOException { + return new Follower(this, new FollowerZooKeeperServer(logFactory, + this, null /*DataTreeBuilder is never used*/, + this.getZkDb())) { + + @Override + void readPacket(QuorumPacket pp) throws IOException { + /** + * In real scenario got SocketTimeoutException while reading + * the packet from leader because of network problem, but + * here throwing SocketTimeoutException based on whether + * error is injected or not + */ + super.readPacket(pp); + if (injectError && pp.getType() == Leader.PROPOSAL) { + String type = LearnerHandler.packetToString(pp); + throw new SocketTimeoutException( + "Socket timeout while reading the packet for operation " + + type); + } + } + + }; + } + + public void setInjectError(boolean injectError) { + this.injectError = injectError; + } + + } + + static class MockTestQPMain extends TestQPMain { + @Override + protected QuorumPeer getQuorumPeer() throws SaslException { + return new CustomQuorumPeer(); + } + } +} \ No newline at end of file diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java index 6d5eb47c45c..c19963c913c 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java @@ -129,7 +129,7 @@ public MainThread(int myid, int clientPort, String quorumCfgSection) Thread currentThread; synchronized public void start() { - main = new TestQPMain(); + main = getTestQPMain(); currentThread = new Thread(this); currentThread.start(); mainFailed = new CountDownLatch(1); @@ -202,5 +202,9 @@ public Map getOtherConfigs() { public File getConfFile() { return confFile; } + + public TestQPMain getTestQPMain() { + return new TestQPMain(); + } } }