From 72cbd526bab2441aa9e530985cd92e598afe731c Mon Sep 17 00:00:00 2001 From: Fangmin Lyu Date: Wed, 12 Sep 2018 11:30:05 -0700 Subject: [PATCH] Add CloseSessionTxn to track the nodes being deleted when close session Enable the CloseSessionTxn by default Move test into a different file specify the type of list fix the test failure --- .../src/main/resources/zookeeper.jute | 11 +- .../org/apache/zookeeper/server/DataTree.java | 61 ++++++++--- .../server/PrepRequestProcessor.java | 11 +- .../zookeeper/server/ZooKeeperServer.java | 19 ++++ .../zookeeper/server/util/SerializeUtils.java | 10 +- .../server/PrepRequestProcessorTest.java | 53 ++++++++- .../server/quorum/CloseSessionTxnTest.java | 102 ++++++++++++++++++ .../quorum/FuzzySnapshotRelatedTest.java | 60 +++++++---- 8 files changed, 283 insertions(+), 44 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 2bb04921b70..8310664a6e5 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -72,7 +72,7 @@ module org.apache.zookeeper.proto { vectordataWatches; vectorexistWatches; vectorchildWatches; - } + } class RequestHeader { int xid; int type; @@ -92,12 +92,12 @@ module org.apache.zookeeper.proto { long zxid; int err; } - - class GetDataRequest { + + class GetDataRequest { ustring path; boolean watch; } - + class SetDataRequest { ustring path; buffer data; @@ -323,6 +323,9 @@ module org.apache.zookeeper.txn { class CreateSessionTxn { int timeOut; } + class CloseSessionTxn { + vector paths2Delete; + } class ErrorTxn { int err; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 9d552f03b85..4657f8e599f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -63,6 +63,7 @@ import org.apache.zookeeper.server.watch.WatchesReport; import org.apache.zookeeper.server.watch.WatchesSummary; import org.apache.zookeeper.txn.CheckVersionTxn; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateTTLTxn; import org.apache.zookeeper.txn.CreateTxn; @@ -947,7 +948,14 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: - killSession(header.getClientId(), header.getZxid()); + long sessionId = header.getClientId(); + if (txn != null) { + killSession(sessionId, header.getZxid(), + ephemerals.remove(sessionId), + ((CloseSessionTxn) txn).getPaths2Delete()); + } else { + killSession(sessionId, header.getZxid()); + } break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; @@ -1119,20 +1127,45 @@ void killSession(long session, long zxid) { // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in sequence. - Set list = ephemerals.remove(session); - if (list != null) { - for (String path : list) { - try { - deleteNode(path, zxid); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session)); - } - } catch (NoNodeException e) { - LOG.warn("Ignoring NoNodeException for path " - + path - + " while removing ephemeral for dead session 0x" - + Long.toHexString(session)); + killSession(session, zxid, ephemerals.remove(session), null); + } + + void killSession(long session, long zxid, Set paths2DeleteLocal, + List paths2DeleteInTxn) { + if (paths2DeleteInTxn != null) { + deleteNodes(session, zxid, paths2DeleteInTxn); + } + + if (paths2DeleteLocal == null) { + return; + } + + if (paths2DeleteInTxn != null) { + // explicitly check and remove to avoid potential performance + // issue when using removeAll + for (String path: paths2DeleteInTxn) { + paths2DeleteLocal.remove(path); + } + if (!paths2DeleteLocal.isEmpty()) { + LOG.warn("Unexpected extra paths under session {} which " + + "are not in txn 0x{}", paths2DeleteLocal, + Long.toHexString(zxid)); + } + } + + deleteNodes(session, zxid, paths2DeleteLocal); + } + + void deleteNodes(long session, long zxid, Iterable paths2Delete) { + for (String path : paths2Delete) { + try { + deleteNode(path, zxid); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting ephemeral node {} for session 0x{}", path, Long.toHexString(session)); } + } catch (NoNodeException e) { + LOG.warn("Ignoring NoNodeException for path {} while removing ephemeral for dead session 0x{}", + path, Long.toHexString(session)); } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index cccecbf85ee..8b2bab0dd3a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -66,6 +66,7 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.txn.CheckVersionTxn; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTTLTxn; @@ -532,8 +533,12 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); long startTime = Time.currentElapsedTime(); - Set es = zks.getZKDatabase().getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { + // need to move getEphemerals into zks.outstandingChanges + // synchronized block, otherwise there will be a race + // condition with the on flying deleteNode txn, and we'll + // delete the node again here, which is not correct + Set es = zks.getZKDatabase().getEphemerals(request.sessionId); for (ChangeRecord c : zks.outstandingChanges) { if (c.stat == null) { // Doing a delete @@ -545,7 +550,9 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, for (String path2Delete : es) { addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null)); } - + if (ZooKeeperServer.isCloseSessionTxnEnabled()) { + request.setTxn(new CloseSessionTxn(new ArrayList(es))); + } zks.sessionTracker.setSessionClosing(request.sessionId); } ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index b8e0bd2eac5..e982cd1e054 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -110,6 +110,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; + // Add a enable/disable option for now, we should remove this one when + // this feature is confirmed to be stable + public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; + private static boolean closeSessionTxnEnabled = true; + static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -127,6 +132,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); + + closeSessionTxnEnabled = Boolean.parseBoolean( + System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); + LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + } + + public static boolean isCloseSessionTxnEnabled() { + return closeSessionTxnEnabled; + } + + public static void setCloseSessionTxnEnabled(boolean enabled) { + ZooKeeperServer.closeSessionTxnEnabled = enabled; + LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, + ZooKeeperServer.closeSessionTxnEnabled); } protected ZooKeeperServerBean jmxServerBean; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java index 25e86a0119d..2454c43f91e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java @@ -34,7 +34,9 @@ import org.apache.zookeeper.common.IOUtils; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTTLTxn; @@ -67,7 +69,9 @@ public static Record deserializeTxn(byte[] txnBytes, TxnHeader hdr) throws IOExc txn = new CreateSessionTxn(); break; case OpCode.closeSession: - return null; + txn = ZooKeeperServer.isCloseSessionTxnEnabled() + ? new CloseSessionTxn() : null; + break; case OpCode.create: case OpCode.create2: txn = new CreateTxn(); @@ -115,6 +119,10 @@ public static Record deserializeTxn(byte[] txnBytes, TxnHeader hdr) throws IOExc create.setAcl(createv0.getAcl()); create.setEphemeral(createv0.getEphemeral()); create.setParentCVersion(-1); + } else if (hdr.getType() == OpCode.closeSession) { + // perhaps this is before CloseSessionTxn was added, + // ignore it and reset txn to null + txn = null; } else { throw e; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 9724423453b..264601de076 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -45,6 +45,7 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord; import org.apache.zookeeper.test.ClientBase; @@ -103,6 +104,10 @@ public void testPRequest() throws Exception { } private Request createRequest(Record record, int opCode) throws IOException { + return createRequest(record, opCode, 1L); + } + + private Request createRequest(Record record, int opCode, long sessionId) throws IOException { // encoding ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); @@ -110,7 +115,7 @@ private Request createRequest(Record record, int opCode) throws IOException { baos.close(); // Id List ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE); - return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); + return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); } private void process(List ops) throws Exception { @@ -173,6 +178,52 @@ public void testMultiRollbackNoLastChange() throws Exception { assertNull(zks.outstandingChangesForPath.get("/foo")); } + /** + * Test ephemerals are deleted when the session is closed with + * the newly added CloseSessionTxn in ZOOKEEPER-3145. + */ + @Test + public void testCloseSessionTxn() throws Exception { + boolean before = ZooKeeperServer.isCloseSessionTxnEnabled(); + + ZooKeeperServer.setCloseSessionTxnEnabled(true); + try { + // create a few ephemerals + long ephemeralOwner = 1; + DataTree dt = zks.getZKDatabase().dataTree; + dt.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0); + dt.createNode("/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0); + + // close session + RequestHeader header = new RequestHeader(); + header.setType(OpCode.closeSession); + + final FinalRequestProcessor frq = new FinalRequestProcessor(zks); + final CountDownLatch latch = new CountDownLatch(1); + processor = new PrepRequestProcessor(zks, new RequestProcessor() { + @Override + public void processRequest(Request request) { + frq.processRequest(request); + latch.countDown(); + } + + @Override + public void shutdown() { + // TODO Auto-generated method stub + } + }); + processor.pRequest(createRequest(header, OpCode.closeSession, ephemeralOwner)); + + assertTrue(latch.await(3, TimeUnit.SECONDS)); + + // assert ephemerals are deleted + assertEquals(null, dt.getNode("/foo")); + assertEquals(null, dt.getNode("/bar")); + } finally { + ZooKeeperServer.setCloseSessionTxnEnabled(before); + } + } + /** * It tests that PrepRequestProcessor will return BadArgument KeeperException * if the request path (if it exists) is not valid, e.g. empty string. diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java new file mode 100644 index 00000000000..ac9665b7a20 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.Assert; +import org.junit.Test; + +public class CloseSessionTxnTest extends QuorumPeerTestBase { + + /** + * Test leader/leader compatibility with/without CloseSessionTxn, so that + * we can gradually rollout this code and rollback if there is problem. + */ + @Test + public void testCloseSessionTxnCompatile() throws Exception { + // Test 4 cases: + // 1. leader disabled, follower disabled + testCloseSessionWithDifferentConfig(false, false); + + // 2. leader disabled, follower enabled + testCloseSessionWithDifferentConfig(false, true); + + // 3. leader enabled, follower disabled + testCloseSessionWithDifferentConfig(true, false); + + // 4. leader enabled, follower enabled + testCloseSessionWithDifferentConfig(true, true); + } + + private void testCloseSessionWithDifferentConfig( + boolean closeSessionEnabledOnLeader, + boolean closeSessionEnabledOnFollower) throws Exception { + // 1. set up an ensemble with 3 servers + final int numServers = 3; + servers = LaunchServers(numServers); + int leaderId = servers.findLeader(); + ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnLeader); + + // 2. shutdown one of the follower, start it later to pick up the + // CloseSessionTxnEnabled config change + // + // We cannot use different static config in the same JVM, so have to + // use this tricky + int followerA = (leaderId + 1) % numServers; + servers.mt[followerA].shutdown(); + waitForOne(servers.zk[followerA], States.CONNECTING); + + // 3. create an ephemeral node + String path = "/testCloseSessionTxnCompatile"; + servers.zk[leaderId].create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + + // 3. close the client + servers.restartClient(leaderId, this); + waitForOne(servers.zk[leaderId], States.CONNECTED); + + // 4. update the CloseSessionTxnEnabled config before follower A + // started + System.setProperty("zookeeper.retainZKDatabase", "true"); + ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnFollower); + + // 5. restart follower A + servers.mt[followerA].start(); + waitForOne(servers.zk[followerA], States.CONNECTED); + + // 4. verify the ephemeral node is gone + for (int i = 0; i < numServers; i++) { + final CountDownLatch syncedLatch = new CountDownLatch(1); + servers.zk[i].sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + syncedLatch.countDown(); + } + }, null); + Assert.assertTrue(syncedLatch.await(3, TimeUnit.SECONDS)); + Assert.assertNull(servers.zk[i].exists(path, false)); + } + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java index 5cd825947e9..9003b3d7e54 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java @@ -125,7 +125,7 @@ public void tearDown() throws Exception { public void testMultiOpConsistency() throws Exception { LOG.info("Create a parent node"); final String path = "/testMultiOpConsistency"; - createEmptyNode(zk[followerA], path); + createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT); LOG.info("Hook to catch the 2nd sub create node txn in multi-op"); CustomDataTree dt = (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree(); @@ -175,8 +175,10 @@ public void testPZxidUpdatedDuringSnapSyncing() throws Exception { final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode"; final String child = parent + "/child"; - createEmptyNode(zk[leaderId], parent); - createEmptyNode(zk[leaderId], child); + createEmptyNode(zk[leaderId], parent, CreateMode.PERSISTENT); + createEmptyNode(zk[leaderId], child, CreateMode.EPHEMERAL); + // create another child to test closeSession + createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL); LOG.info("shutdown follower {}", followerA); mt[followerA].shutdown(); @@ -205,8 +207,10 @@ public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception { final String parent = "/testPZxidUpdatedDuringTakingSnapshot"; final String child = parent + "/child"; - createEmptyNode(zk[followerA], parent); - createEmptyNode(zk[followerA], child); + createEmptyNode(zk[followerA], parent, CreateMode.PERSISTENT); + createEmptyNode(zk[followerA], child, CreateMode.EPHEMERAL); + // create another child to test closeSession + createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL); LOG.info("Set up ZKDatabase to catch the node serializing in DataTree"); addSerializeListener(followerA, parent, child); @@ -217,8 +221,12 @@ public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception { LOG.info("Restarting follower A to load snapshot"); mt[followerA].shutdown(); - QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CLOSED); mt[followerA].start(); + // zk[followerA] will be closed in addSerializeListener, re-create it + zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], + ClientBase.CONNECTION_TIMEOUT, this); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED); LOG.info("Check and make sure the pzxid of the parent is the same " + "on leader and follower A"); @@ -226,13 +234,14 @@ public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception { } private void addSerializeListener(int sid, String parent, String child) { - final ZooKeeper zkClient = zk[followerA]; + final ZooKeeper zkClient = zk[sid]; CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree(); dt.addListener(parent, new NodeSerializeListener() { @Override public void nodeSerialized(String path) { try { zkClient.delete(child, -1); + zkClient.close(); LOG.info("Deleted the child node after the parent is serialized"); } catch (Exception e) { LOG.error("Error when deleting node {}", e); @@ -242,13 +251,26 @@ public void nodeSerialized(String path) { } private void compareStat(String path, int sid, int compareWithSid) throws Exception { - Stat stat1 = new Stat(); - zk[sid].getData(path, null, stat1); - - Stat stat2 = new Stat(); - zk[compareWithSid].getData(path, null, stat2); - - assertEquals(stat1, stat2); + ZooKeeper[] compareZk = new ZooKeeper[2]; + compareZk[0] = new ZooKeeper("127.0.0.1:" + clientPorts[sid], + ClientBase.CONNECTION_TIMEOUT, this); + compareZk[1] = new ZooKeeper("127.0.0.1:" + clientPorts[compareWithSid], + ClientBase.CONNECTION_TIMEOUT, this); + QuorumPeerMainTest.waitForAll(compareZk, States.CONNECTED); + + try { + Stat stat1 = new Stat(); + compareZk[0].getData(path, null, stat1); + + Stat stat2 = new Stat(); + compareZk[1].getData(path, null, stat2); + + assertEquals(stat1, stat2); + } finally { + for (ZooKeeper z: compareZk) { + z.close(); + } + } } @Test @@ -286,19 +308,13 @@ public void process(long sessionId) { LOG.info("Make sure the global sessions are consistent with leader"); Map globalSessionsOnLeader = mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); - if (mt[followerA].main.quorumPeer == null) { - LOG.info("quorumPeer is null"); - } - if (mt[followerA].main.quorumPeer.getZkDb() == null) { - LOG.info("zkDb is null"); - } Map globalSessionsOnFollowerA = mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), globalSessionsOnFollowerA.keySet()); assertTrue(globalSessionsOnFollowerA.keySet().containsAll(globalSessionsOnLeader.keySet())); } - private void createEmptyNode(ZooKeeper zk, String path) throws Exception { - zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + private void createEmptyNode(ZooKeeper zk, String path, CreateMode mode) throws Exception { + zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, mode); } interface NodeCreateListener {