Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1383872 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit e353a8f93ad48d8785bbaa7162a8a9ebca40a15f 1 parent 26671da
@ivankelly ivankelly authored
Showing with 606 additions and 117 deletions.
  1. +2 −0  CHANGES.txt
  2. +14 −7 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
  3. +73 −7 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
  4. +25 −11 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
  5. +2 −3 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
  6. +5 −2 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
  7. +10 −3 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
  8. +25 −11 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
  9. +17 −32 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
  10. +3 −3 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
  11. +30 −15 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
  12. +2 −0  bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
  13. +70 −13 bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
  14. +2 −0  bookkeeper-server/src/main/proto/DataFormats.proto
  15. +1 −1  bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
  16. +64 −0 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
  17. +208 −0 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
  18. +2 −2 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
  19. +1 −2  bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
  20. +1 −1  bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
  21. +49 −4 bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
View
2  CHANGES.txt
@@ -132,6 +132,8 @@ Trunk (unreleased changes)
BOOKKEEPER-300: Create Bookie format command (Vinay via sijie)
+ BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
View
21 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
@@ -78,7 +78,7 @@
}
}
- public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
+ public BenchThroughputLatency(int ensemble, int writeQuorumSize, int ackQuorumSize, byte[] passwd,
int numberOfLedgers, int sendLimit, ClientConfiguration conf)
throws KeeperException, IOException, InterruptedException {
this.sem = new Semaphore(conf.getThrottleValue());
@@ -91,9 +91,11 @@ public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
lh = new LedgerHandle[this.numberOfLedgers];
for(int i = 0; i < this.numberOfLedgers; i++) {
- lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
+ lh[i] = bk.createLedger(ensemble, writeQuorumSize,
+ ackQuorumSize,
+ BookKeeper.DigestType.CRC32,
passwd);
- LOG.info("Ledger Handle: " + lh[i].getId());
+ LOG.debug("Ledger Handle: " + lh[i].getId());
}
} catch (BKException e) {
e.printStackTrace();
@@ -233,6 +235,7 @@ public static void main(String[] args)
options.addOption("entrysize", true, "Entry size (bytes), default 1024");
options.addOption("ensemble", true, "Ensemble size, default 3");
options.addOption("quorum", true, "Quorum size, default 2");
+ options.addOption("ackQuorum", true, "Ack quorum size, default is same as quorum");
options.addOption("throttle", true, "Max outstanding requests, default 10000");
options.addOption("ledgers", true, "Number of ledgers, default 1");
options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
@@ -261,6 +264,10 @@ public static void main(String[] args)
int ledgers = Integer.valueOf(cmd.getOptionValue("ledgers", "1"));
int ensemble = Integer.valueOf(cmd.getOptionValue("ensemble", "3"));
int quorum = Integer.valueOf(cmd.getOptionValue("quorum", "2"));
+ int ackQuorum = quorum;
+ if (cmd.hasOption("ackQuorum")) {
+ ackQuorum = Integer.valueOf(cmd.getOptionValue("ackQuorum"));
+ }
int throttle = Integer.valueOf(cmd.getOptionValue("throttle", "10000"));
int sendLimit = Integer.valueOf(cmd.getOptionValue("sendlimit", "20000000"));
@@ -313,8 +320,8 @@ public void run() {
// Now do the benchmark
- BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd,
- ledgers, sendLimit, conf);
+ BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, ackQuorum,
+ passwd, ledgers, sendLimit, conf);
bench.setEntryData(data);
thread = new Thread(bench);
ZooKeeper zk = null;
@@ -439,8 +446,8 @@ public void process(WatchedEvent event) {
}
}
- BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, passwd,
- ledgers, 50000, conf);
+ BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, bookies, passwd,
+ ledgers, 10000, conf);
warmup.setEntryData(data);
Thread thread = new Thread(warmup);
thread.start();
View
80 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -256,9 +256,47 @@ BookieClient getBookieClient() {
* authenticate access to a ledger, but also to verify entries in ledgers.
*
* @param ensSize
- * ensemble size
- * @param qSize
- * quorum size
+ * number of bookies over which to stripe entries
+ * @param writeQuorumSize
+ * number of bookies each entry will be written to. each of these bookies
+ * must acknowledge the entry before the call is completed.
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * createCallback implementation
+ * @param ctx
+ * optional control object
+ */
+ public void asyncCreateLedger(final int ensSize,
+ final int writeQuorumSize,
+ final DigestType digestType,
+ final byte[] passwd, final CreateCallback cb, final Object ctx)
+ {
+ asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize, digestType, passwd, cb, ctx);
+ }
+
+ /**
+ * Creates a new ledger asynchronously. Ledgers created with this call have
+ * a separate write quorum and ack quorum size. The write quorum must be larger than
+ * the ack quorum.
+ *
+ * Separating the write and the ack quorum allows the BookKeeper client to continue
+ * writing when a bookie has failed but the failure has not yet been detected. Detecting
+ * a bookie has failed can take a number of seconds, as configured by the read timeout
+ * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
+ * that bookie will be removed from the ensemble.
+ *
+ * The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object)}
+ *
+ * @param ensSize
+ * number of bookies over which to stripe entries
+ * @param writeQuorumSize
+ * number of bookies each entry will be written to
+ * @param ackQuorumSize
+ * number of bookies which must acknowledge an entry before the call is completed
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
@@ -268,9 +306,17 @@ BookieClient getBookieClient() {
* @param ctx
* optional control object
*/
- public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
+
+ public void asyncCreateLedger(final int ensSize,
+ final int writeQuorumSize,
+ final int ackQuorumSize,
+ final DigestType digestType,
final byte[] passwd, final CreateCallback cb, final Object ctx) {
- new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+ if (writeQuorumSize < ackQuorumSize) {
+ throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
+ }
+ new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+ ackQuorumSize, digestType, passwd, cb, ctx)
.initiate();
}
@@ -305,14 +351,34 @@ public LedgerHandle createLedger(DigestType digestType, byte passwd[])
* @throws BKException
*/
public LedgerHandle createLedger(int ensSize, int qSize,
- DigestType digestType, byte passwd[])
+ DigestType digestType, byte passwd[])
+ throws InterruptedException, BKException {
+ return createLedger(ensSize, qSize, qSize, digestType, passwd);
+ }
+
+ /**
+ * Synchronous call to create ledger. Parameters match those of
+ * {@link #asyncCreateLedger(int, int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object)}
+ *
+ * @param ensSize
+ * @param writeQuorumSize
+ * @param ackQuorumSize
+ * @param digestType
+ * @param passwd
+ * @return a handle to the newly created ledger
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
+ DigestType digestType, byte passwd[])
throws InterruptedException, BKException {
SyncCounter counter = new SyncCounter();
counter.inc();
/*
* Calls asynchronous version
*/
- asyncCreateLedger(ensSize, qSize, digestType, passwd,
+ asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
new SyncCreateCallback(), counter);
/*
View
36 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import java.util.List;
/**
* This interface determins how entries are distributed among bookies.
*
@@ -30,21 +31,34 @@
interface DistributionSchedule {
/**
- *
- * @param entryId
- * @param replicaIndex
- * @return index of bookie that should get this replica
+ * return the set of bookie indices to send the message to
*/
- public int getBookieIndex(long entryId, int replicaIndex);
+ public List<Integer> getWriteSet(long entryId);
/**
- *
- * @param entryId
- * @param bookieIndex
- * @return -1 if the given bookie index is not a replica for the given
- * entryId
+ * An ack set represents the set of bookies from which
+ * a response must be received so that an entry can be
+ * considered to be replicated on a quorum.
*/
- public int getReplicaIndex(long entryId, int bookieIndex);
+ public interface AckSet {
+ /**
+ * Add a bookie response and check if quorum has been met
+ * @return true if quorum has been met, false otherwise
+ */
+ public boolean addBookieAndCheck(int bookieIndexHeardFrom);
+
+ /**
+ * Invalidate a previous bookie response.
+ * Used for reissuing write requests.
+ */
+ public void removeBookie(int bookie);
+ }
+
+ /**
+ * Returns an ackset object, responses should be checked against this
+ */
+ public AckSet getAckSet();
+
/**
* Interface to keep track of which bookies in an ensemble, an action
View
5 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -222,7 +222,7 @@ public void checkLedger(LedgerHandle lh,
final long entryToRead = curEntryId;
EntryExistsCallback eecb
- = new EntryExistsCallback(lh.getLedgerMetadata().getQuorumSize(),
+ = new EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
new GenericCallback<Boolean>() {
public void operationComplete(int rc, Boolean result) {
if (result) {
@@ -232,8 +232,7 @@ public void operationComplete(int rc, Boolean result) {
}
});
- for (int i = 0; i < lh.getLedgerMetadata().getQuorumSize(); i++) {
- int bi = lh.getDistributionSchedule().getBookieIndex(entryToRead, i);
+ for (int bi : lh.getDistributionSchedule().getWriteSet(entryToRead)) {
InetSocketAddress addr = curEnsemble.get(bi);
bookieClient.readEntry(addr, lh.getId(),
entryToRead, eecb, null);
View
7 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -68,9 +68,12 @@
* optional control object
*/
- LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
+ LedgerCreateOp(BookKeeper bk, int ensembleSize,
+ int writeQuorumSize, int ackQuorumSize,
+ DigestType digestType,
+ byte[] passwd, CreateCallback cb, Object ctx) {
this.bk = bk;
- this.metadata = new LedgerMetadata(ensembleSize, quorumSize, digestType, passwd);
+ this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd);
this.digestType = digestType;
this.passwd = passwd;
this.cb = cb;
View
13 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -29,6 +29,7 @@
import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException;
@@ -74,6 +75,7 @@
*/
final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;
+ final AtomicInteger blockAddCompletions = new AtomicInteger(0);
final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
@@ -98,7 +100,7 @@
macManager = DigestManager.instantiate(ledgerId, password, digestType);
this.ledgerKey = MacDigestManager.genDigest("ledger", password);
distributionSchedule = new RoundRobinDistributionSchedule(
- metadata.getQuorumSize(), metadata.getEnsembleSize());
+ metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
}
/**
@@ -641,8 +643,9 @@ void sendAddSuccessCallbacks() {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
PendingAddOp pendingAddOp;
- while ((pendingAddOp = pendingAddOps.peek()) != null) {
- if (pendingAddOp.numResponsesPending != 0) {
+ while ((pendingAddOp = pendingAddOps.peek()) != null
+ && blockAddCompletions.get() == 0) {
+ if (!pendingAddOp.completed) {
return;
}
pendingAddOps.remove();
@@ -660,6 +663,7 @@ void handleBookieFailure(final InetSocketAddress addr, final int bookieIndex) {
+ bookieIndex);
}
final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>();
+ blockAddCompletions.incrementAndGet();
final long newEnsembleStartEntry = lastAddConfirmed + 1;
// avoid parallel ensemble changes to same ensemble.
@@ -735,6 +739,8 @@ public void safeRun() {
handleUnrecoverableErrorDuringAdd(rc);
return;
}
+ blockAddCompletions.decrementAndGet();
+
// the failed bookie has been replaced
unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
}
@@ -815,6 +821,7 @@ private boolean resolveConflict(LedgerMetadata newMeta) {
}
} else {
// the failed bookie has been replaced
+ blockAddCompletions.decrementAndGet();
unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
}
return true;
View
36 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -61,7 +61,8 @@
private int metadataFormatVersion = 0;
private int ensembleSize;
- private int quorumSize;
+ private int writeQuorumSize;
+ private int ackQuorumSize;
private long length;
private long lastEntryId;
@@ -74,10 +75,11 @@
private LedgerMetadataFormat.DigestType digestType;
private byte[] password;
- public LedgerMetadata(int ensembleSize, int quorumSize,
+ public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
BookKeeper.DigestType digestType, byte[] password) {
this.ensembleSize = ensembleSize;
- this.quorumSize = quorumSize;
+ this.writeQuorumSize = writeQuorumSize;
+ this.ackQuorumSize = ackQuorumSize;
/*
* It is set in PendingReadOp.readEntryComplete, and
@@ -95,7 +97,7 @@ public LedgerMetadata(int ensembleSize, int quorumSize,
}
private LedgerMetadata() {
- this(0, 0, BookKeeper.DigestType.MAC, new byte[] {});
+ this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {});
this.hasPassword = false;
}
@@ -114,8 +116,12 @@ public int getEnsembleSize() {
return ensembleSize;
}
- public int getQuorumSize() {
- return quorumSize;
+ public int getWriteQuorumSize() {
+ return writeQuorumSize;
+ }
+
+ public int getAckQuorumSize() {
+ return ackQuorumSize;
}
/**
@@ -217,7 +223,8 @@ long getNextEnsembleChange(long entryId) {
return serializeVersion1();
}
LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
- builder.setQuorumSize(quorumSize).setEnsembleSize(ensembleSize).setLength(length)
+ builder.setQuorumSize(writeQuorumSize).setAckQuorumSize(ackQuorumSize)
+ .setEnsembleSize(ensembleSize).setLength(length)
.setState(state).setLastEntryId(lastEntryId);
if (hasPassword) {
@@ -245,7 +252,7 @@ long getNextEnsembleChange(long entryId) {
private byte[] serializeVersion1() {
StringBuilder s = new StringBuilder();
s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
- s.append(quorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
+ s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : ensembles.entrySet()) {
s.append(lSplitter).append(entry.getKey());
@@ -321,7 +328,13 @@ public static LedgerMetadata parseConfig(byte[] bytes, Version version) throws I
LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
TextFormat.merge(reader, builder);
LedgerMetadataFormat data = builder.build();
- lc.quorumSize = data.getQuorumSize();
+ lc.writeQuorumSize = data.getQuorumSize();
+ if (data.hasAckQuorumSize()) {
+ lc.ackQuorumSize = data.getAckQuorumSize();
+ } else {
+ lc.ackQuorumSize = lc.writeQuorumSize;
+ }
+
lc.ensembleSize = data.getEnsembleSize();
lc.length = data.getLength();
lc.state = data.getState();
@@ -346,7 +359,7 @@ public static LedgerMetadata parseConfig(byte[] bytes, Version version) throws I
static LedgerMetadata parseVersion1Config(LedgerMetadata lc,
BufferedReader reader) throws IOException {
try {
- lc.quorumSize = new Integer(reader.readLine());
+ lc.writeQuorumSize = lc.ackQuorumSize = new Integer(reader.readLine());
lc.ensembleSize = new Integer(reader.readLine());
lc.length = new Long(reader.readLine());
@@ -413,7 +426,8 @@ boolean resolveConflict(LedgerMetadata newMeta) {
if (metadataFormatVersion != newMeta.metadataFormatVersion ||
ensembleSize != newMeta.ensembleSize ||
- quorumSize != newMeta.quorumSize ||
+ writeQuorumSize != newMeta.writeQuorumSize ||
+ ackQuorumSize != newMeta.ackQuorumSize ||
length != newMeta.length ||
state != newMeta.state ||
!digestType.equals(newMeta.digestType) ||
View
49 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -42,8 +42,10 @@
AddCallback cb;
Object ctx;
long entryId;
- boolean[] successesSoFar;
- int numResponsesPending;
+
+ DistributionSchedule.AckSet ackSet;
+ boolean completed = false;
+
LedgerHandle lh;
boolean isRecoveryAdd = false;
@@ -53,11 +55,10 @@
this.ctx = ctx;
this.entryId = LedgerHandle.INVALID_ENTRY_ID;
- successesSoFar = new boolean[lh.metadata.getQuorumSize()];
- numResponsesPending = successesSoFar.length;
+ ackSet = lh.distributionSchedule.getAckSet();
}
- /**
+ /**
* Enable the recovery add flag for this operation.
* @see LedgerHandle#asyncRecoveryAddEntry
*/
@@ -70,11 +71,11 @@ void setEntryId(long entryId) {
this.entryId = entryId;
}
- void sendWriteRequest(int bookieIndex, int arrayIndex) {
+ void sendWriteRequest(int bookieIndex) {
int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE;
lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
- this, arrayIndex, flags);
+ this, bookieIndex, flags);
}
void unsetSuccessAndSendWriteRequest(int bookieIndex) {
@@ -85,15 +86,6 @@ void unsetSuccessAndSendWriteRequest(int bookieIndex) {
return;
}
- int replicaIndex = lh.distributionSchedule.getReplicaIndex(entryId, bookieIndex);
- if (replicaIndex < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Leaving unchanged, ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
- + bookieIndex);
- }
- return;
- }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
+ bookieIndex);
@@ -102,27 +94,22 @@ void unsetSuccessAndSendWriteRequest(int bookieIndex) {
// if we had already heard a success from this array index, need to
// increment our number of responses that are pending, since we are
// going to unset this success
- if (successesSoFar[replicaIndex]) {
- successesSoFar[replicaIndex] = false;
- numResponsesPending++;
- }
+ ackSet.removeBookie(bookieIndex);
+ completed = false;
- sendWriteRequest(bookieIndex, replicaIndex);
+ sendWriteRequest(bookieIndex);
}
void initiate(ChannelBuffer toSend) {
this.toSend = toSend;
- for (int i = 0; i < successesSoFar.length; i++) {
- int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, i);
- sendWriteRequest(bookieIndex, i);
+ for (int bookieIndex : lh.distributionSchedule.getWriteSet(entryId)) {
+ sendWriteRequest(bookieIndex);
}
}
@Override
public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
-
- Integer replicaIndex = (Integer) ctx;
- int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, replicaIndex);
+ int bookieIndex = (Integer) ctx;
if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
// ensemble has already changed, failure of this addr is immaterial
@@ -148,14 +135,12 @@ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress
return;
}
-
- if (!successesSoFar[replicaIndex]) {
- successesSoFar[replicaIndex] = true;
- numResponsesPending--;
+ if (ackSet.addBookieAndCheck(bookieIndex) && !completed) {
+ completed = true;
// do some quick checks to see if some adds may have finished. All
// this will be checked under locks again
- if (numResponsesPending == 0 && lh.pendingAddOps.peek() == this) {
+ if (lh.pendingAddOps.peek() == this) {
lh.sendAddSuccessCallbacks();
}
}
View
6 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -94,7 +94,7 @@ public void initiate() throws InterruptedException {
}
void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
- if (entry.nextReplicaIndexToReadFrom >= lh.metadata.getQuorumSize()) {
+ if (entry.nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
lh.opCounterSem.release();
@@ -102,7 +102,7 @@ void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int last
return;
}
- int bookieIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom);
+ int bookieIndex = lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom);
entry.nextReplicaIndexToReadFrom++;
lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), lh.ledgerId, entry.entryId,
this, entry);
@@ -110,7 +110,7 @@ void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int last
void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
ArrayList<InetSocketAddress> ensemble = lh.metadata.getEnsemble(entry.entryId);
- int bookeIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom - 1);
+ int bookeIndex = lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom - 1);
LOG.error(errMsg + " while reading entry: " + entry.entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
+ ensemble.get(bookeIndex));
sendRead(ensemble, entry, rc);
View
45 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -19,6 +19,10 @@
import org.apache.bookkeeper.util.MathUtils;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+
/**
* A specific {@link DistributionSchedule} that places entries in round-robin
* fashion. For ensemble size 3, and quorum size 2, Entry 0 goes to bookie 0 and
@@ -27,29 +31,39 @@
*
*/
class RoundRobinDistributionSchedule implements DistributionSchedule {
- int quorumSize;
- int ensembleSize;
+ private int writeQuorumSize;
+ private int ackQuorumSize;
+ private int ensembleSize;
- public RoundRobinDistributionSchedule(int quorumSize, int ensembleSize) {
- this.quorumSize = quorumSize;
+ public RoundRobinDistributionSchedule(int writeQuorumSize, int ackQuorumSize, int ensembleSize) {
+ this.writeQuorumSize = writeQuorumSize;
+ this.ackQuorumSize = ackQuorumSize;
this.ensembleSize = ensembleSize;
}
@Override
- public int getBookieIndex(long entryId, int replicaIndex) {
- return (int) ((entryId + replicaIndex) % ensembleSize);
+ public List<Integer> getWriteSet(long entryId) {
+ List<Integer> set = new ArrayList<Integer>();
+ for (int i = 0; i < this.writeQuorumSize; i++) {
+ set.add((int)((entryId + i) % ensembleSize));
+ }
+ return set;
}
@Override
- public int getReplicaIndex(long entryId, int bookieIndex) {
- // NOTE: Java's % operator returns the sign of the dividend and is hence
- // not always positive
-
- int replicaIndex = MathUtils.signSafeMod(bookieIndex - entryId, ensembleSize);
-
- return replicaIndex < quorumSize ? replicaIndex : -1;
+ public AckSet getAckSet() {
+ final HashSet<Integer> ackSet = new HashSet<Integer>();
+ return new AckSet() {
+ public boolean addBookieAndCheck(int bookieIndexHeardFrom) {
+ ackSet.add(bookieIndexHeardFrom);
+ return ackSet.size() >= ackQuorumSize;
+ }
+ public void removeBookie(int bookie) {
+ ackSet.remove(bookie);
+ }
+ };
}
private class RRQuorumCoverageSet implements QuorumCoverageSet {
@@ -68,7 +82,7 @@ public synchronized boolean addBookieAndCheckCovered(int bookieIndexHeardFrom) {
return true;
}
- for (int i = 0; i < quorumSize; i++) {
+ for (int i = 0; i < ackQuorumSize; i++) {
int quorumStartIndex = MathUtils.signSafeMod(bookieIndexHeardFrom - i, ensembleSize);
if (!covered[quorumStartIndex]) {
covered[quorumStartIndex] = true;
@@ -83,12 +97,13 @@ public synchronized boolean addBookieAndCheckCovered(int bookieIndexHeardFrom) {
}
}
+ @Override
public QuorumCoverageSet getCoverageSet() {
return new RRQuorumCoverageSet();
}
@Override
public boolean hasEntry(long entryId, int bookieIndex) {
- return getReplicaIndex(entryId, bookieIndex) != -1;
+ return getWriteSet(entryId).contains(bookieIndex);
}
}
View
2  bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -230,6 +230,8 @@ public ClientConfiguration setZkTimeout(int zkTimeout) {
* seconds we wait without hearing a response from a bookie
* before we consider it failed.
*
+ * The default is 5 seconds.
+ *
* @return the current read timeout in seconds
*/
public int getReadTimeout() {
View
83 bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
@@ -48,6 +48,10 @@ public static void registerAllExtensions(
// optional bytes password = 8;
boolean hasPassword();
com.google.protobuf.ByteString getPassword();
+
+ // optional int32 ackQuorumSize = 9;
+ boolean hasAckQuorumSize();
+ int getAckQuorumSize();
}
public static final class LedgerMetadataFormat extends
com.google.protobuf.GeneratedMessage
@@ -767,6 +771,16 @@ public boolean hasPassword() {
return password_;
}
+ // optional int32 ackQuorumSize = 9;
+ public static final int ACKQUORUMSIZE_FIELD_NUMBER = 9;
+ private int ackQuorumSize_;
+ public boolean hasAckQuorumSize() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ public int getAckQuorumSize() {
+ return ackQuorumSize_;
+ }
+
private void initFields() {
quorumSize_ = 0;
ensembleSize_ = 0;
@@ -776,6 +790,7 @@ private void initFields() {
segment_ = java.util.Collections.emptyList();
digestType_ = org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType.CRC32;
password_ = com.google.protobuf.ByteString.EMPTY;
+ ackQuorumSize_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -835,6 +850,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeBytes(8, password_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeInt32(9, ackQuorumSize_);
+ }
getUnknownFields().writeTo(output);
}
@@ -876,6 +894,10 @@ public int getSerializedSize() {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(8, password_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(9, ackQuorumSize_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1021,6 +1043,8 @@ public Builder clear() {
bitField0_ = (bitField0_ & ~0x00000040);
password_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000080);
+ ackQuorumSize_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
@@ -1096,6 +1120,10 @@ public Builder clone() {
to_bitField0_ |= 0x00000040;
}
result.password_ = password_;
+ if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.ackQuorumSize_ = ackQuorumSize_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1159,6 +1187,9 @@ public Builder mergeFrom(org.apache.bookkeeper.proto.DataFormats.LedgerMetadataF
if (other.hasPassword()) {
setPassword(other.getPassword());
}
+ if (other.hasAckQuorumSize()) {
+ setAckQuorumSize(other.getAckQuorumSize());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1265,6 +1296,11 @@ public Builder mergeFrom(
password_ = input.readBytes();
break;
}
+ case 72: {
+ bitField0_ |= 0x00000100;
+ ackQuorumSize_ = input.readInt32();
+ break;
+ }
}
}
}
@@ -1613,6 +1649,27 @@ public Builder clearPassword() {
return this;
}
+ // optional int32 ackQuorumSize = 9;
+ private int ackQuorumSize_ ;
+ public boolean hasAckQuorumSize() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ public int getAckQuorumSize() {
+ return ackQuorumSize_;
+ }
+ public Builder setAckQuorumSize(int value) {
+ bitField0_ |= 0x00000100;
+ ackQuorumSize_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearAckQuorumSize() {
+ bitField0_ = (bitField0_ & ~0x00000100);
+ ackQuorumSize_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:LedgerMetadataFormat)
}
@@ -3176,24 +3233,24 @@ void setInstanceId(com.google.protobuf.ByteString value) {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n src/main/proto/DataFormats.proto\"\233\003\n\024L" +
+ "\n src/main/proto/DataFormats.proto\"\262\003\n\024L" +
"edgerMetadataFormat\022\022\n\nquorumSize\030\001 \002(\005\022" +
"\024\n\014ensembleSize\030\002 \002(\005\022\016\n\006length\030\003 \002(\003\022\023\n" +
"\013lastEntryId\030\004 \001(\003\0220\n\005state\030\005 \002(\0162\033.Ledg" +
"erMetadataFormat.State:\004OPEN\022.\n\007segment\030" +
"\006 \003(\0132\035.LedgerMetadataFormat.Segment\0224\n\n" +
"digestType\030\007 \001(\0162 .LedgerMetadataFormat." +
- "DigestType\022\020\n\010password\030\010 \001(\014\0327\n\007Segment\022" +
- "\026\n\016ensembleMember\030\001 \003(\t\022\024\n\014firstEntryId\030" +
- "\002 \002(\003\".\n\005State\022\010\n\004OPEN\020\001\022\017\n\013IN_RECOVERY\020",
- "\002\022\n\n\006CLOSED\020\003\"!\n\nDigestType\022\t\n\005CRC32\020\001\022\010" +
- "\n\004HMAC\020\002\"@\n\037LedgerRereplicationLayoutFor" +
- "mat\022\014\n\004type\030\001 \002(\t\022\017\n\007version\030\002 \002(\005\".\n\033Un" +
- "derreplicatedLedgerFormat\022\017\n\007replica\030\001 \003" +
- "(\t\"^\n\014CookieFormat\022\022\n\nbookieHost\030\001 \002(\t\022\022" +
- "\n\njournalDir\030\002 \002(\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022" +
- "\n\ninstanceId\030\004 \001(\tB\037\n\033org.apache.bookkee" +
- "per.protoH\001"
+ "DigestType\022\020\n\010password\030\010 \001(\014\022\025\n\rackQuoru" +
+ "mSize\030\t \001(\005\0327\n\007Segment\022\026\n\016ensembleMember" +
+ "\030\001 \003(\t\022\024\n\014firstEntryId\030\002 \002(\003\".\n\005State\022\010\n",
+ "\004OPEN\020\001\022\017\n\013IN_RECOVERY\020\002\022\n\n\006CLOSED\020\003\"!\n\n" +
+ "DigestType\022\t\n\005CRC32\020\001\022\010\n\004HMAC\020\002\"@\n\037Ledge" +
+ "rRereplicationLayoutFormat\022\014\n\004type\030\001 \002(\t" +
+ "\022\017\n\007version\030\002 \002(\005\".\n\033UnderreplicatedLedg" +
+ "erFormat\022\017\n\007replica\030\001 \003(\t\"^\n\014CookieForma" +
+ "t\022\022\n\nbookieHost\030\001 \002(\t\022\022\n\njournalDir\030\002 \002(" +
+ "\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022\n\ninstanceId\030\004 \001(" +
+ "\tB\037\n\033org.apache.bookkeeper.protoH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3205,7 +3262,7 @@ void setInstanceId(com.google.protobuf.ByteString value) {
internal_static_LedgerMetadataFormat_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_LedgerMetadataFormat_descriptor,
- new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", },
+ new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", "AckQuorumSize", },
org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.class,
org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Builder.class);
internal_static_LedgerMetadataFormat_Segment_descriptor =
View
2  bookkeeper-server/src/main/proto/DataFormats.proto
@@ -46,6 +46,8 @@ message LedgerMetadataFormat {
}
optional DigestType digestType = 7;
optional bytes password = 8;
+
+ optional int32 ackQuorumSize = 9;
}
message LedgerRereplicationLayoutFormat {
View
2  bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -464,7 +464,7 @@ private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws E
ranges.put(keyList.get(keyList.size()-1), untilEntry);
for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : ensembles.entrySet()) {
- int quorum = md.getQuorumSize();
+ int quorum = md.getAckQuorumSize();
long startEntryId = e.getKey();
long endEntryId = ranges.get(startEntryId);
long expectedSuccess = quorum*(endEntryId-startEntryId);
View
64 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.List;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import junit.framework.TestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RoundRobinDistributionScheduleTest {
+ static Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionScheduleTest.class);
+
+ @Test
+ public void testDistributionSchedule() throws Exception {
+ RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(3, 2, 5);
+
+ List<Integer> wSet = schedule.getWriteSet(1);
+ assertEquals("Write set is wrong size", wSet.size(), 3);
+
+ DistributionSchedule.AckSet ackSet = schedule.getAckSet();
+ assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
+ assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
+ assertTrue("Should ack after 2 unique", ackSet.addBookieAndCheck(wSet.get(2)));
+ assertTrue("Should still be acking", ackSet.addBookieAndCheck(wSet.get(1)));
+
+ DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
+ assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
+
+ covSet = schedule.getCoverageSet();
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(1));
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
+ assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
+
+ covSet = schedule.getCoverageSet();
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(4));
+ assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+ assertTrue("Should cover now", covSet.addBookieAndCheckCovered(2));
+ }
+}
View
208 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.Set;
+import java.util.List;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.net.InetSocketAddress;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import junit.framework.TestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+
+public class SlowBookieTest extends BookKeeperClusterTestCase {
+ static Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
+
+ public SlowBookieTest() {
+ super(4);
+ }
+
+ @Test
+ public void testSlowBookie() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(360);
+
+ BookKeeper bkc = new BookKeeper(conf);
+
+ LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+
+ byte[] entry = "Test Entry".getBytes();
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(entry);
+ }
+ final CountDownLatch b0latch = new CountDownLatch(1);
+ final CountDownLatch b1latch = new CountDownLatch(1);
+ List<InetSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+ try {
+ sleepBookie(curEns.get(0), b0latch);
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(entry);
+ }
+ sleepBookie(curEns.get(2), b1latch); // should cover all quorums
+
+ final AtomicInteger i = new AtomicInteger(0xdeadbeef);
+ AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ i.set(rc);
+ }
+ };
+ lh.asyncAddEntry(entry, cb, null);
+
+ Thread.sleep(1000); // sleep a second to allow time to complete
+ assertEquals(i.get(), 0xdeadbeef);
+ b0latch.countDown();
+ b1latch.countDown();
+ Thread.sleep(2000);
+ assertEquals(i.get(), BKException.Code.OK);
+ } finally {
+ b0latch.countDown();
+ b1latch.countDown();
+ }
+ }
+
+ @Test
+ public void testBookieFailureWithSlowBookie() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(5);
+
+ BookKeeper bkc = new BookKeeper(conf);
+
+ byte[] pwd = new byte[] {};
+ final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
+ long lid = lh.getId();
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final AtomicBoolean failTest = new AtomicBoolean(false);
+ final byte[] entry = "Test Entry".getBytes();
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ while (!finished.get()) {
+ lh.addEntry(entry);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in add entry thread", e);
+ failTest.set(true);
+ }
+ }
+ };
+ t.start();
+ final CountDownLatch b0latch = new CountDownLatch(1);
+ startNewBookie();
+ sleepBookie(getBookie(0), b0latch);
+ Thread.sleep(10000);
+ b0latch.countDown();
+ finished.set(true);
+ t.join();
+
+ assertFalse(failTest.get());
+
+ lh.close();
+
+ LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+ LedgerChecker lc = new LedgerChecker(bkc);
+ final CountDownLatch checklatch = new CountDownLatch(1);
+ final AtomicInteger numFragments = new AtomicInteger(-1);
+ lc.checkLedger(lh2, new GenericCallback<Set<LedgerFragment>>() {
+ public void operationComplete(int rc, Set<LedgerFragment> fragments) {
+ LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+ if (rc == BKException.Code.OK) {
+ numFragments.set(fragments.size());
+ }
+ checklatch.countDown();
+ }
+ });
+ checklatch.await();
+ assertEquals("There should be no missing fragments", 0, numFragments.get());
+ }
+
+ @Test
+ public void testManyBookieFailureWithSlowBookies() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(5);
+
+ BookKeeper bkc = new BookKeeper(conf);
+
+ byte[] pwd = new byte[] {};
+ final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
+ long lid = lh.getId();
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final AtomicBoolean failTest = new AtomicBoolean(false);
+ final byte[] entry = "Test Entry".getBytes();
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ while (!finished.get()) {
+ lh.addEntry(entry);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in add entry thread", e);
+ failTest.set(true);
+ }
+ }
+ };
+ t.start();
+ final CountDownLatch b0latch = new CountDownLatch(1);
+ final CountDownLatch b1latch = new CountDownLatch(1);
+
+ startNewBookie();
+ startNewBookie();
+
+ sleepBookie(getBookie(0), b0latch);
+ sleepBookie(getBookie(1), b1latch);
+
+ Thread.sleep(10000);
+ b0latch.countDown();
+ b1latch.countDown();
+ finished.set(true);
+ t.join();
+
+ assertFalse(failTest.get());
+
+ lh.close();
+
+ LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+ LedgerChecker lc = new LedgerChecker(bkc);
+ final CountDownLatch checklatch = new CountDownLatch(1);
+ final AtomicInteger numFragments = new AtomicInteger(-1);
+ lc.checkLedger(lh2, new GenericCallback<Set<LedgerFragment>>() {
+ public void operationComplete(int rc, Set<LedgerFragment> fragments) {
+ LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+ if (rc == BKException.Code.OK) {
+ numFragments.set(fragments.size());
+ }
+ checklatch.countDown();
+ }
+ });
+ checklatch.await();
+ assertEquals("There should be no missing fragments", 0, numFragments.get());
+ }
+}
View
4 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -329,7 +329,7 @@ public void testSingleEntryAfterEnsembleChange() throws Exception {
ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
.getEnsembles().get(0L);
InetSocketAddress lastBookieFromEnsemble = firstEnsemble.get(
- lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 0));
+ lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(0));
LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+ firstEnsemble);
killBookie(lastBookieFromEnsemble);
@@ -338,7 +338,7 @@ public void testSingleEntryAfterEnsembleChange() throws Exception {
lh.addEntry(TEST_LEDGER_ENTRY_DATA);
lastBookieFromEnsemble = firstEnsemble.get(
- lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 1));
+ lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(1));
LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+ firstEnsemble);
killBookie(lastBookieFromEnsemble);
View
3  bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
@@ -77,8 +77,7 @@ public void testReadTimeout() throws Exception {
final InetSocketAddress bookieToSleep
= writelh.getLedgerMetadata().getEnsemble(numEntries).get(0);
int sleeptime = baseClientConf.getReadTimeout()*3;
- CountDownLatch latch = new CountDownLatch(1);
- sleepBookie(bookieToSleep, sleeptime, latch);
+ CountDownLatch latch = sleepBookie(bookieToSleep, sleeptime);
latch.await();
writelh.asyncAddEntry(tmp.getBytes(),
View
2  bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -56,7 +56,7 @@ public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
final AtomicInteger expected = new AtomicInteger(numLedgers);
for (int i=0; i<numLedgers; i++) {
- getLedgerManager().createLedger(new LedgerMetadata(1, 1, DigestType.MAC, "".getBytes()),
+ getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
new GenericCallback<Long>() {
@Override
public void operationComplete(int rc, Long ledgerId) {
View
53 bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -165,6 +165,17 @@ protected ServerConfiguration newServerConfiguration(int port, String zkServers,
}
/**
+ * Get bookie address for bookie at index
+ */
+ public InetSocketAddress getBookie(int index) throws IllegalArgumentException {
+ if (bs.size() <= index || index < 0) {
+ throw new IllegalArgumentException("Invalid index, there are only " + bs.size()
+ + " bookies. Asked for " + index);
+ }
+ return bs.get(index).getLocalAddress();
+ }
+
+ /**
* Kill a bookie by its socket address
*
* @param addr
@@ -216,14 +227,13 @@ public ServerConfiguration killBookie(int index) throws InterruptedException, IO
* Socket Address
* @param seconds
* Sleep seconds
- * @param l
- * Count Down Latch
+ * @return Count Down latch which will be counted down when sleep finishes
* @throws InterruptedException
* @throws IOException
*/
- public void sleepBookie(InetSocketAddress addr, final int seconds,
- final CountDownLatch l)
+ public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
throws InterruptedException, IOException {
+ final CountDownLatch l = new CountDownLatch(1);
final String name = "BookieJournal-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
@@ -243,6 +253,41 @@ public void run() {
}
};
sleeper.start();
+ return l;
+ }
+ }
+ throw new IOException("Bookie thread not found");
+ }
+
+ /**
+ * Sleep a bookie until I count down the latch
+ *
+ * @param addr
+ * Socket Address
+ * @param latch
+ * Latch to wait on
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
+ throws InterruptedException, IOException {
+ final String name = "BookieJournal-" + addr.getPort();
+ Thread[] allthreads = new Thread[Thread.activeCount()];
+ Thread.enumerate(allthreads);
+ for (final Thread t : allthreads) {
+ if (t.getName().equals(name)) {
+ Thread sleeper = new Thread() {
+ public void run() {
+ try {
+ t.suspend();
+ l.await();
+ t.resume();
+ } catch (Exception e) {
+ LOG.error("Error suspending thread", e);
+ }
+ }
+ };
+ sleeper.start();
return;
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.