Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
* Ledger ID
* @return ledger node path
*/
protected abstract String getLedgerPath(long ledgerId);
public abstract String getLedgerPath(long ledgerId);

/**
* Get ledger id from its znode ledger path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void processResult(int rc, String path, Object ctx) {
}

@Override
protected String getLedgerPath(long ledgerId) {
public String getLedgerPath(long ledgerId) {
return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept
LOG.debug("releaseLedger(ledgerId={})", ledgerId);
}
try {
Lock l = heldLocks.remove(ledgerId);
Lock l = heldLocks.get(ledgerId);
if (l != null) {
zkc.delete(l.getLockZNode(), -1);
}
Expand All @@ -642,6 +642,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
}
heldLocks.remove(ledgerId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -34,6 +35,7 @@
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;

Expand All @@ -44,11 +46,14 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
Expand All @@ -59,8 +64,17 @@
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -79,8 +93,10 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
private MetadataBookieDriver driver;
private LedgerManagerFactory mFactory;
private LedgerUnderreplicationManager underReplicationManager;
private LedgerManager ledgerManager;
private static byte[] data = "TestReplicationWorker".getBytes();
private OrderedScheduler scheduler;
private String zkLedgersRootPath;

public TestReplicationWorker() {
this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
Expand All @@ -100,7 +116,7 @@ public TestReplicationWorker() {
public void setUp() throws Exception {
super.setUp();

String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);
basePath = zkLedgersRootPath + '/'
+ BookKeeperConstants.UNDER_REPLICATION_NODE
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
Expand All @@ -121,6 +137,7 @@ public void setUp() throws Exception {
NullStatsLogger.INSTANCE);
// initialize urReplicationManager
mFactory = driver.getLedgerManagerFactory();
ledgerManager = mFactory.newLedgerManager();
underReplicationManager = mFactory.newLedgerUnderreplicationManager();
}

Expand Down Expand Up @@ -857,4 +874,210 @@ private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
}
}

/**
 * Fault-injecting {@link ZooKeeperClient} for tests.
 *
 * <p>Every ZooKeeper handle it creates is a {@link MockZooKeeper}, which can be told to fail
 * the asynchronous {@code setData} and the synchronous {@code delete} for one configured
 * znode path each with CONNECTIONLOSS. The number of injected failures is counted so that a
 * test can assert how often each operation was attempted.
 */
class MockZooKeeperClient extends ZooKeeperClient {
    private final String connectString;
    private final int sessionTimeoutMs;
    private final ZooKeeperWatcherBase watcherManager;

    // How many failures have been injected so far, per operation.
    private final AtomicInteger numOfTimesSetDataFailed = new AtomicInteger();
    private final AtomicInteger numOfTimesDeleteFailed = new AtomicInteger();

    // Paths whose setData / delete must fail; null means "inject nothing".
    private volatile String pathOfSetDataToFail;
    private volatile String pathOfDeleteToFail;

    /**
     * Creates the client and remembers the connection settings so that
     * {@link #createZooKeeper()} can build handles with the same parameters.
     *
     * @param connectString ZooKeeper connect string
     * @param sessionTimeoutMs session timeout in milliseconds, also used as the backoff bounds
     * @param watcher watcher passed both to the parent client and to every created handle
     * @throws IOException if the parent constructor fails
     */
    MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher)
            throws IOException {
        // The second (operation) retry policy is built with maxRetries = 0, so an injected
        // error/exception is not retried and surfaces to the caller immediately.
        super(connectString, sessionTimeoutMs, watcher,
                new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
                new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0),
                NullStatsLogger.INSTANCE, 1, 0, false);
        this.watcherManager = watcher;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectString = connectString;
    }

    /** Builds the fault-injecting handle instead of a plain {@link ZooKeeper}. */
    @Override
    protected ZooKeeper createZooKeeper() throws IOException {
        return new MockZooKeeper(connectString, sessionTimeoutMs, watcherManager, false);
    }

    /** Selects the znode path for which {@code setData} fails ({@code null} disables injection). */
    private void setPathOfSetDataToFail(String pathOfSetDataToFail) {
        this.pathOfSetDataToFail = pathOfSetDataToFail;
    }

    /** Selects the znode path for which {@code delete} fails ({@code null} disables injection). */
    private void setPathOfDeleteToFail(String pathOfDeleteToFail) {
        this.pathOfDeleteToFail = pathOfDeleteToFail;
    }

    /** @return number of {@code setData} calls failed by injection so far */
    private int getNumOfTimesSetDataFailed() {
        return numOfTimesSetDataFailed.get();
    }

    /** @return number of {@code delete} calls failed by injection so far */
    private int getNumOfTimesDeleteFailed() {
        return numOfTimesDeleteFailed.get();
    }

    /**
     * ZooKeeper handle that fails selected operations on the paths configured on the
     * enclosing {@link MockZooKeeperClient}; everything else is delegated to the real handle.
     */
    class MockZooKeeper extends ZooKeeper {
        public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
                throws IOException {
            super(connectString, sessionTimeout, watcher, canBeReadOnly);
        }

        /**
         * Completes the callback with CONNECTIONLOSS when {@code path} is the configured
         * setData failure path; otherwise performs the real asynchronous setData.
         */
        @Override
        public void setData(final String path, final byte[] data, final int version, final StatCallback cb,
                final Object context) {
            final String failingPath = pathOfSetDataToFail;
            if (failingPath == null || !failingPath.equals(path)) {
                // Not the targeted node: behave like a regular ZooKeeper handle.
                super.setData(path, data, version, cb, context);
                return;
            }
            LOG.error("setData of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
            numOfTimesSetDataFailed.incrementAndGet();
            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, context, null);
        }

        /**
         * Throws a ConnectionLossException when {@code path} is the configured delete
         * failure path; otherwise performs the real delete.
         */
        @Override
        public void delete(final String path, final int version) throws KeeperException, InterruptedException {
            final String failingPath = pathOfDeleteToFail;
            if (failingPath == null || !failingPath.equals(path)) {
                // Not the targeted node: behave like a regular ZooKeeper handle.
                super.delete(path, version);
                return;
            }
            LOG.error("delete of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", path);
            numOfTimesDeleteFailed.incrementAndGet();
            throw new KeeperException.ConnectionLossException();
        }
    }
}

/**
 * Verifies that the ReplicationWorker shuts itself down when the ZooKeeper operations it
 * depends on keep failing.
 *
 * <p>A {@link MockZooKeeperClient} is configured to fail setData on the ledger znode and
 * delete on the underreplication lock znode. The test then expects exactly one failed
 * setData, exactly two failed deletes, and a ReplicationWorker that is no longer running.
 */
@Test
public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
    /*
     * create MockZooKeeperClient instance and wait for it to be connected.
     */
    int zkSessionTimeOut = 10000;
    ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
            NullStatsLogger.INSTANCE);
    MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
            zkSessionTimeOut, zooKeeperWatcherBase);
    zkFaultInjectionWrapper.waitForConnection();
    assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
            zkFaultInjectionWrapper.getState());
    long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();

    /*
     * create ledger and add entries.
     */
    BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
    long ledgerId = 567L;
    LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD,
            null);
    for (int i = 0; i < 10; i++) {
        lh.addEntry(i, data);
    }
    lh.close();

    /*
     * trigger Expired event so that MockZooKeeperClient would run
     * 'clientCreator' and create new zk handle. In this case it would
     * create MockZooKeeper instance.
     */
    zooKeeperWatcherBase.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
    zkFaultInjectionWrapper.waitForConnection();
    // Poll for up to ~2 seconds until the replacement handle reports CONNECTED.
    for (int i = 0; i < 10; i++) {
        if (zkFaultInjectionWrapper.getState() == States.CONNECTED) {
            break;
        }
        Thread.sleep(200);
    }
    assertEquals("zkFaultInjectionWrapper should be in connected state", States.CONNECTED,
            zkFaultInjectionWrapper.getState());
    assertNotEquals("Session Id of old and new ZK instance should be different", oldZkInstanceSessionId,
            zkFaultInjectionWrapper.getSessionId());

    /*
     * Kill a Bookie, so that ledger becomes underreplicated. Since a total
     * of 3 bookies are available and the ensemble size of the current
     * ledger is 2, we should be able to replicate to the other bookie.
     */
    BookieSocketAddress replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
    // The "{}" placeholder is required; without it SLF4J drops the argument from the message.
    LOG.info("Killing Bookie {}", replicaToKill);
    killBookie(replicaToKill);

    /*
     * Start RW.
     */
    ReplicationWorker rw = new ReplicationWorker(baseConf, bkWithMockZK, false, NullStatsLogger.INSTANCE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to shutdown this object and the other ones in a finallyblock in order to teardown properly the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli fixed it

    rw.start();
    try {
        // Poll for up to ~20 seconds until the worker reports that it is running.
        for (int i = 0; i < 40; i++) {
            if (rw.isRunning()) {
                break;
            }
            LOG.info("Waiting for the RW to start...");
            Thread.sleep(500);
        }
        assertTrue("RW should be running", rw.isRunning());

        /*
         * Since Auditor is not running, ledger needs to be marked
         * underreplicated explicitly. But before marking ledger
         * underreplicated, set paths for which MockZooKeeper's setData and
         * Delete operation to fail.
         *
         * ZK.setData will be called by 'updateEnsembleInfo' operation after
         * completion of copying to a new bookie. ZK.delete will be called by
         * RW.logBKExceptionAndReleaseLedger and finally block in
         * 'rereplicate(long ledgerIdToReplicate)'
         */
        AbstractZkLedgerManager absZKLedgerManager = (AbstractZkLedgerManager) ledgerManager;
        String ledgerPath = absZKLedgerManager.getLedgerPath(ledgerId);
        String urLockPath = ZkLedgerUnderreplicationManager
                .getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId);
        zkFaultInjectionWrapper.setPathOfSetDataToFail(ledgerPath);
        zkFaultInjectionWrapper.setPathOfDeleteToFail(urLockPath);
        underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());

        /*
         * Since there is only one RW, it will try to replicate underreplicated
         * ledger. After completion of copying it to a new bookie, it will try
         * to update ensembleinfo. Which would fail with our MockZK. After that
         * it would try to delete lock znode as part of
         * RW.logBKExceptionAndReleaseLedger, which will also fail because of
         * our MockZK. In the finally block in 'rereplicate(long
         * ledgerIdToReplicate)' it would try one more time to delete the lock
         * znode and once again it will fail because of our MockZK. So RW gives
         * up and shuts itself down.
         */
        for (int i = 0; i < 40; i++) {
            if (!rw.isRunning()) {
                break;
            }
            LOG.info("Waiting for the RW to shutdown...");
            Thread.sleep(500);
        }

        /*
         * as described earlier, numOfTimes setDataFailed should be 1 and
         * numOfTimes deleteFailed should be 2
         */
        assertEquals("NumOfTimesSetDataFailed", 1,
                zkFaultInjectionWrapper.getNumOfTimesSetDataFailed());
        assertEquals("NumOfTimesDeleteFailed", 2,
                zkFaultInjectionWrapper.getNumOfTimesDeleteFailed());
        assertFalse("RW should be shutdown", rw.isRunning());
    } finally {
        // Tear down everything this test created, even when an assertion above fails.
        rw.shutdown();
        zkFaultInjectionWrapper.close();
        bkWithMockZK.close();
    }
}
}