Permalink
Browse files

BOOKKEEPER-336: bookie readEntries is taking more time if the ensembl…

…e has failed bookie(s) (ivank)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1421242 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent d58a71d commit 496f442a7a68dd549355e845540c6c72c2fbb44c @ivankelly ivankelly committed Dec 13, 2012
View
@@ -138,6 +138,8 @@ Trunk (unreleased changes)
BOOKKEEPER-365: Ledger will never recover if one of the quorum bookie is down forever and others dont have entry (sijie via ivank)
+ BOOKKEEPER-336: bookie readEntries is taking more time if the ensemble has failed bookie(s) (ivank)
+
hedwig-protocol:
BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
@@ -75,6 +76,7 @@
final BookieWatcher bookieWatcher;
final OrderedSafeExecutor mainWorkerPool;
+ final ScheduledExecutorService scheduler;
// Ledger manager responsible for how to store ledger meta data
final LedgerManagerFactory ledgerManagerFactory;
@@ -125,9 +127,11 @@ public BookKeeper(final ClientConfiguration conf)
this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher = new BookieWatcher(conf, scheduler, this);
bookieWatcher.readBookiesBlocking();
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -187,10 +191,11 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFac
this.conf = conf;
this.zk = zk;
this.channelFactory = channelFactory;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher = new BookieWatcher(conf, scheduler, this);
bookieWatcher.readBookiesBlocking();
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -562,7 +567,7 @@ public void close() throws InterruptedException, BKException {
} catch (IOException ie) {
LOG.error("Failed to close ledger manager : ", ie);
}
- bookieWatcher.halt();
+ scheduler.shutdown();
if (ownChannelFactory) {
channelFactory.releaseExternalResources();
}
@@ -25,7 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executors;
+
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -61,10 +61,10 @@
static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
public static int ZK_CONNECT_BACKOFF_SEC = 1;
- BookKeeper bk;
- ScheduledExecutorService scheduler;
+ final BookKeeper bk;
HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+ final ScheduledExecutorService scheduler;
SafeRunnable reReadTask = new SafeRunnable() {
@Override
@@ -74,18 +74,16 @@ public void safeRun() {
};
private ReadOnlyBookieWatcher readOnlyBookieWatcher;
- public BookieWatcher(ClientConfiguration conf, BookKeeper bk) throws KeeperException, InterruptedException {
+ public BookieWatcher(ClientConfiguration conf,
+ ScheduledExecutorService scheduler,
+ BookKeeper bk) throws KeeperException, InterruptedException {
this.bk = bk;
// ZK bookie registration path
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
- this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ this.scheduler = scheduler;
readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
}
- public void halt() {
- scheduler.shutdown();
- }
-
public void readBookies() {
readBookies(this);
}
@@ -382,7 +382,8 @@ public void asyncReadEntries(long firstEntry, long lastEntry,
}
try {
- new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
+ new PendingReadOp(this, bk.scheduler,
+ firstEntry, lastEntry, cb, ctx).initiate();
} catch (InterruptedException e) {
cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
}
@@ -21,11 +21,19 @@
*
*/
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -45,7 +53,11 @@
class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
+ final int speculativeReadTimeout;
+ final private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> speculativeTask = null;
Queue<LedgerEntryRequest> seq;
+ Set<InetSocketAddress> heardFromHosts;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
@@ -54,22 +66,86 @@
long endEntryId;
final int maxMissedReadsAllowed;
- private class LedgerEntryRequest extends LedgerEntry {
+ class LedgerEntryRequest extends LedgerEntry {
+ final static int NOT_FOUND = -1;
int nextReplicaIndexToReadFrom = 0;
AtomicBoolean complete = new AtomicBoolean(false);
int firstError = BKException.Code.OK;
int numMissedEntryReads = 0;
final ArrayList<InetSocketAddress> ensemble;
+ final List<Integer> writeSet;
+ final BitSet sentReplicas;
+ final BitSet erroredReplicas;
LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
+ this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+ this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+ }
+
+ private int getReplicaIndex(InetSocketAddress host) {
+ int bookieIndex = ensemble.indexOf(host);
+ if (bookieIndex == -1) {
+ return NOT_FOUND;
+ }
+ return writeSet.indexOf(bookieIndex);
+ }
+
+ private BitSet getSentToBitSet() {
+ BitSet b = new BitSet(ensemble.size());
+
+ for (int i = 0; i < sentReplicas.length(); i++) {
+ if (sentReplicas.get(i)) {
+ b.set(writeSet.get(i));
+ }
+ }
+ return b;
}
- void sendNextRead() {
+ private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
+ BitSet b = new BitSet(ensemble.size());
+ for (InetSocketAddress i : heardFromHosts) {
+ int index = ensemble.indexOf(i);
+ if (index != -1) {
+ b.set(index);
+ }
+ }
+ return b;
+ }
+
+ private boolean readsOutstanding() {
+ return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
+ }
+
+ /**
+ * Send to next replica speculatively, if required and possible.
+ * This returns the host we may have sent to for unit testing.
+ * @return host we sent to if we sent. null otherwise.
+ */
+ synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
+ if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
+ return null;
+ }
+
+ BitSet sentTo = getSentToBitSet();
+ BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
+ sentTo.and(heardFrom);
+
+ // only send another read, if we have had no response at all (even for other entries)
+ // from any of the other bookies we have sent the request to
+ if (sentTo.cardinality() == 0) {
+ return sendNextRead();
+ } else {
+ return null;
+ }
+ }
+
+ synchronized InetSocketAddress sendNextRead() {
if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
@@ -82,22 +158,27 @@ void sendNextRead() {
}
submitCallback(firstError);
- return;
+ return null;
}
+ int replica = nextReplicaIndexToReadFrom;
int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
- sendReadTo(ensemble.get(bookieIndex), this);
+ InetSocketAddress to = ensemble.get(bookieIndex);
+ sendReadTo(to, this);
+ sentReplicas.set(replica);
+ return to;
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry " + this, ie);
Thread.currentThread().interrupt();
submitCallback(BKException.Code.ReadException);
+ return null;
}
}
- void logErrorAndReattemptRead(String errMsg, int rc) {
+ synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
if (BKException.Code.OK == firstError ||
BKException.Code.NoSuchEntryException == firstError) {
firstError = rc;
@@ -113,18 +194,27 @@ void logErrorAndReattemptRead(String errMsg, int rc) {
int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1);
LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
- + ensemble.get(bookieIndex));
+ + host);
- sendNextRead();
+ int replica = getReplicaIndex(host);
+ if (replica == NOT_FOUND) {
+ LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
+ return;
+ }
+ erroredReplicas.set(replica);
+
+ if (!readsOutstanding()) {
+ sendNextRead();
+ }
}
// return true if we managed to complete the entry
- boolean complete(final ChannelBuffer buffer) {
+ boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKDigestMatchException e) {
- logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException);
+ logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
}
@@ -151,22 +241,38 @@ public String toString() {
}
}
- PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
-
- seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - startEntryId));
+ PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+ long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
+ seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
this.cb = cb;
this.ctx = ctx;
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
+ this.scheduler = scheduler;
numPendingEntries = endEntryId - startEntryId + 1;
maxMissedReadsAllowed = lh.metadata.getWriteQuorumSize() - lh.metadata.getAckQuorumSize();
+ speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
+ heardFromHosts = new HashSet<InetSocketAddress>();
}
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
ArrayList<InetSocketAddress> ensemble = null;
+
+ if (speculativeReadTimeout > 0) {
+ speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ for (LedgerEntryRequest r : seq) {
+ if (!r.isComplete()) {
+ r.maybeSendSpeculativeRead(heardFromHosts);
+ }
+ }
+ }
+ }, speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
+ }
+
do {
LOG.debug("Acquiring lock: {}", i);
@@ -182,25 +288,38 @@ public void initiate() throws InterruptedException {
} while (i <= endEntryId);
}
+ private static class ReadContext {
+ final InetSocketAddress to;
+ final LedgerEntryRequest entry;
+
+ ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
+ this.to = to;
+ this.entry = entry;
+ }
+ }
+
void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
lh.opCounterSem.acquire();
lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
- this, entry);
+ this, new ReadContext(to, entry));
}
@Override
public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
- final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
+ final ReadContext rctx = (ReadContext)ctx;
+ final LedgerEntryRequest entry = rctx.entry;
lh.opCounterSem.release();
if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc);
+ entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
return;
}
- if (entry.complete(buffer)) {
+ heardFromHosts.add(rctx.to);
+
+ if (entry.complete(rctx.to, buffer)) {
numPendingEntries--;
}
@@ -213,6 +332,9 @@ public void readEntryComplete(int rc, long ledgerId, final long entryId, final C
}
private void submitCallback(int code) {
+ if (speculativeTask != null) {
+ speculativeTask.cancel(true);
+ }
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
}
public boolean hasMoreElements() {
Oops, something went wrong. Retry.

0 comments on commit 496f442

Please sign in to comment.