diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 808cf8951f9..0b42c3df80f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -181,7 +181,7 @@ protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
      *          Ledger ID
      * @return ledger node path
      */
-    protected abstract String getLedgerPath(long ledgerId);
+    public abstract String getLedgerPath(long ledgerId);
 
     /**
      * Get ledger id from its znode ledger path.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 9cd6aed92e3..b5fd0f21ae8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -73,7 +73,7 @@ public void processResult(int rc, String path, Object ctx) {
     }
 
     @Override
-    protected String getLedgerPath(long ledgerId) {
+    public String getLedgerPath(long ledgerId) {
         return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index abc2827332a..9de751ecc2f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -629,7 +629,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept
             LOG.debug("releaseLedger(ledgerId={})", ledgerId);
         }
         try {
-            Lock l = heldLocks.remove(ledgerId);
+            Lock l = heldLocks.get(ledgerId);
             if (l != null) {
                 zkc.delete(l.getLockZNode(), -1);
             }
@@ -642,6 +642,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
         }
+        heldLocks.remove(ledgerId);
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 11531b504ef..09751e99e88 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,6 +22,7 @@
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -34,6 +35,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import lombok.Cleanup;
 
@@ -44,11 +46,14 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.AbstractZkLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -59,8 +64,17 @@
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,8 +93,10 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     private MetadataBookieDriver driver;
     private LedgerManagerFactory mFactory;
     private LedgerUnderreplicationManager underReplicationManager;
+    private LedgerManager ledgerManager;
     private static byte[] data = "TestReplicationWorker".getBytes();
     private OrderedScheduler scheduler;
+    private String zkLedgersRootPath;
 
     public TestReplicationWorker() {
         this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
@@ -100,7 +116,7 @@ public TestReplicationWorker() {
     public void setUp() throws Exception {
         super.setUp();
 
-        String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
+        zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
         basePath = zkLedgersRootPath + '/' + BookKeeperConstants.UNDER_REPLICATION_NODE
                 + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
 
@@ -121,6 +137,7 @@ public void setUp() throws Exception {
                 NullStatsLogger.INSTANCE);
         // initialize urReplicationManager
         mFactory = driver.getLedgerManagerFactory();
+        ledgerManager = mFactory.newLedgerManager();
         underReplicationManager = mFactory.newLedgerUnderreplicationManager();
     }
 
@@ -857,4 +874,220 @@ private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
         }
     }
 
+    /**
+     * ZooKeeperClient for fault injection. The ZooKeeper handles it creates fail
+     * setData/delete with CONNECTIONLOSS on the configured paths and count how
+     * many times each operation was failed.
+     */
+    class MockZooKeeperClient extends ZooKeeperClient {
+        private final String connectString;
+        private final int sessionTimeoutMs;
+        private final ZooKeeperWatcherBase watcherManager;
+        private volatile String pathOfSetDataToFail;
+        private volatile String pathOfDeleteToFail;
+        private final AtomicInteger numOfTimesSetDataFailed = new AtomicInteger();
+        private final AtomicInteger numOfTimesDeleteFailed = new AtomicInteger();
+
+        MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher)
+                throws IOException {
+            /*
+             * in OperationalRetryPolicy maxRetries is set to 0. So it won't
+             * retry in case of any error/exception.
+             */
+            super(connectString, sessionTimeoutMs, watcher,
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0),
+                    NullStatsLogger.INSTANCE, 1, 0, false);
+            this.connectString = connectString;
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            this.watcherManager = watcher;
+        }
+
+        @Override
+        protected ZooKeeper createZooKeeper() throws IOException {
+            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, this.watcherManager, false);
+        }
+
+        private void setPathOfSetDataToFail(String pathOfSetDataToFail) {
+            this.pathOfSetDataToFail = pathOfSetDataToFail;
+        }
+
+        private void setPathOfDeleteToFail(String pathOfDeleteToFail) {
+            this.pathOfDeleteToFail = pathOfDeleteToFail;
+        }
+
+        private int getNumOfTimesSetDataFailed() {
+            return numOfTimesSetDataFailed.get();
+        }
+
+        private int getNumOfTimesDeleteFailed() {
+            return numOfTimesDeleteFailed.get();
+        }
+
+        class MockZooKeeper extends ZooKeeper {
+            public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
+                    throws IOException {
+                super(connectString, sessionTimeout, watcher, canBeReadOnly);
+            }
+
+            @Override
+            public void setData(final String path, final byte[] data, final int version, final StatCallback cb,
+                    final Object context) {
+                if ((pathOfSetDataToFail != null) && (pathOfSetDataToFail.equals(path))) {
+                    /*
+                     * if pathOfSetDataToFail matches with the path of the node,
+                     * then callback with CONNECTIONLOSS error.
+                     */
+                    LOG.error("setData of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
+                    numOfTimesSetDataFailed.incrementAndGet();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, context, null);
+                } else {
+                    super.setData(path, data, version, cb, context);
+                }
+            }
+
+            @Override
+            public void delete(final String path, final int version) throws KeeperException, InterruptedException {
+                if ((pathOfDeleteToFail != null) && (pathOfDeleteToFail.equals(path))) {
+                    /*
+                     * if pathOfDeleteToFail matches with the path of the node,
+                     * then throw CONNECTIONLOSS exception.
+                     */
+                    LOG.error("delete of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
+                    numOfTimesDeleteFailed.incrementAndGet();
+                    throw new KeeperException.ConnectionLossException();
+                } else {
+                    super.delete(path, version);
+                }
+            }
+        }
+    }
+
+    /**
+     * Verifies that the ReplicationWorker stops running when updating the ledger
+     * metadata znode and releasing the underreplicated-ledger lock znode both
+     * fail with CONNECTIONLOSS.
+     */
+    @Test
+    public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
+        /*
+         * create MockZooKeeperClient instance and wait for it to be connected.
+         */
+        int zkSessionTimeOut = 10000;
+        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
+                NullStatsLogger.INSTANCE);
+        MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
+                zkSessionTimeOut, zooKeeperWatcherBase);
+        zkFaultInjectionWrapper.waitForConnection();
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();
+
+        /*
+         * create ledger and add entries.
+         */
+        BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
+        long ledgerId = 567L;
+        LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD,
+                null);
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(i, data);
+        }
+        lh.close();
+
+        /*
+         * trigger Expired event so that MockZooKeeperClient would run
+         * 'clientCreator' and create new zk handle. In this case it would
+         * create MockZooKeeper instance.
+         */
+        zooKeeperWatcherBase.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
+        zkFaultInjectionWrapper.waitForConnection();
+        for (int i = 0; i < 10; i++) {
+            if (zkFaultInjectionWrapper.getState() == States.CONNECTED) {
+                break;
+            }
+            Thread.sleep(200);
+        }
+        assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
+                zkFaultInjectionWrapper.getState());
+        assertNotEquals("Session Id of old and new ZK instance should be different", oldZkInstanceSessionId,
+                zkFaultInjectionWrapper.getSessionId());
+
+        /*
+         * Kill a Bookie, so that ledger becomes underreplicated. Since totally
+         * 3 bookies are available and the ensemblesize of the current ledger is
+         * 2, we should be able to replicate to the other bookie.
+         */
+        BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+        LOG.info("Killing Bookie: {}", replicaToKill);
+        killBookie(replicaToKill);
+
+        /*
+         * Start RW.
+         */
+        ReplicationWorker rw = new ReplicationWorker(baseConf, bkWithMockZK, false, NullStatsLogger.INSTANCE);
+        rw.start();
+        try {
+            for (int i = 0; i < 40; i++) {
+                if (rw.isRunning()) {
+                    break;
+                }
+                LOG.info("Waiting for the RW to start...");
+                Thread.sleep(500);
+            }
+            assertTrue("RW should be running", rw.isRunning());
+
+            /*
+             * Since Auditor is not running, ledger needs to be marked
+             * underreplicated explicitly. But before marking ledger
+             * underreplicated, set paths for which MockZooKeeper's setData and
+             * Delete operation to fail.
+             *
+             * ZK.setData will be called by 'updateEnsembleInfo' operation after
+             * completion of copying to a new bookie. ZK.delete will be called by
+             * RW.logBKExceptionAndReleaseLedger and finally block in
+             * 'rereplicate(long ledgerIdToReplicate)'
+             */
+            AbstractZkLedgerManager absZKLedgerManager = (AbstractZkLedgerManager) ledgerManager;
+            String ledgerPath = absZKLedgerManager.getLedgerPath(ledgerId);
+            String urLockPath = ZkLedgerUnderreplicationManager
+                    .getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId);
+            zkFaultInjectionWrapper.setPathOfSetDataToFail(ledgerPath);
+            zkFaultInjectionWrapper.setPathOfDeleteToFail(urLockPath);
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
+
+            /*
+             * Since there is only one RW, it will try to replicate underreplicated
+             * ledger. After completion of copying it to a new bookie, it will try
+             * to update ensembleinfo. Which would fail with our MockZK. After that
+             * it would try to delete lock znode as part of
+             * RW.logBKExceptionAndReleaseLedger, which will also fail because of
+             * our MockZK. In the finally block in 'rereplicate(long
+             * ledgerIdToReplicate)' it would try one more time to delete the lock
+             * znode and once again it will fail because of our MockZK. So RW gives
+             * up and shuts itself down.
+             */
+            for (int i = 0; i < 40; i++) {
+                if (!rw.isRunning()) {
+                    break;
+                }
+                LOG.info("Waiting for the RW to shutdown...");
+                Thread.sleep(500);
+            }
+
+            /*
+             * as described earlier, numOfTimes setDataFailed should be 1 and
+             * numOfTimes deleteFailed should be 2
+             */
+            assertEquals("NumOfTimesSetDataFailed", 1,
+                    zkFaultInjectionWrapper.getNumOfTimesSetDataFailed());
+            assertEquals("NumOfTimesDeleteFailed", 2,
+                    zkFaultInjectionWrapper.getNumOfTimesDeleteFailed());
+            assertFalse("RW should be shutdown", rw.isRunning());
+        } finally {
+            rw.shutdown();
+            zkFaultInjectionWrapper.close();
+            bkWithMockZK.close();
+        }
+    }
 }