Skip to content

Commit

Permalink
AutoRecovery supports batch read (#4211)
Browse files Browse the repository at this point in the history
* AutoRecovery supports batch read

* Fix check style

* address comments
  • Loading branch information
hangc0276 committed Feb 20, 2024
1 parent 745997e commit ef4ce13
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,29 +171,41 @@ private void replicateFragmentInternal(final LedgerHandle lh,
return;
}

/*
* Add all the entries to entriesToReplicate list from
* firstStoredEntryId to lastStoredEntryID.
*/
List<Long> entriesToReplicate = new LinkedList<Long>();
long lastStoredEntryId = lf.getLastStoredEntryId();
for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) {
entriesToReplicate.add(i);
}
/*
* Now asynchronously replicate all of the entries for the ledger
* fragment that were on the dead bookie.
*/
int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
entriesToReplicate.size(), ledgerFragmentMcb, null, BKException.Code.OK,
entriesToReplicateCnt, ledgerFragmentMcb, null, BKException.Code.OK,
BKException.Code.LedgerRecoveryException);
if (this.replicationThrottle != null) {
this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
}
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,

if (conf.isRecoveryBatchReadEnabled()
&& conf.getUseV2WireProtocol()
&& conf.isBatchReadEnabled()
&& lh.getLedgerMetadata().getEnsembleSize() == lh.getLedgerMetadata().getWriteQuorumSize()) {
batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh, ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);

} else {
/*
* Add all the entries to entriesToReplicate list from
* firstStoredEntryId to lastStoredEntryID.
*/
List<Long> entriesToReplicate = new LinkedList<Long>();
long lastStoredEntryId = lf.getLastStoredEntryId();
for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) {
entriesToReplicate.add(i);
}
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);
}
}

}

/**
Expand Down Expand Up @@ -433,6 +445,112 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}

/**
 * Batch-read variant of the per-entry recovery path: re-replicates the entries
 * [startEntryId, endEntryId] of a ledger fragment to the new bookies using V2
 * batch reads instead of one read RPC per entry.
 *
 * <p>The caller only invokes this when the V2 wire protocol and batch read are
 * enabled and ensembleSize == writeQuorumSize, i.e. every bookie stores every
 * entry, so a contiguous batch read from one ensemble is valid.
 *
 * @param startEntryId first entry id (inclusive) to re-replicate
 * @param endEntryId last entry id (inclusive) to re-replicate
 * @param lh opened handle of the ledger being recovered
 * @param ledgerFragmentMcb per-entry callback; the caller sized it with
 *        (endEntryId - startEntryId + 1) expected results, so exactly one
 *        processResult call must be made per entry in the range
 * @param newBookies target bookies each recovered entry is written to
 * @param onReadEntryFailureCallback invoked with (ledgerId, entryId) when a
 *        read fails, so the caller can account for the failed entry
 * @throws InterruptedException if acquiring from the replication throttle is
 *         interrupted
 */
void batchRecoverLedgerFragmentEntry(final long startEntryId,
                                     final long endEntryId,
                                     final LedgerHandle lh,
                                     final AsyncCallback.VoidCallback ledgerFragmentMcb,
                                     final Set<BookieId> newBookies,
                                     final BiConsumer<Long, Long> onReadEntryFailureCallback)
        throws InterruptedException {
    int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
    int maxBytesToReplicate = conf.getReplicationRateByBytes();
    if (replicationThrottle != null) {
        // Cap the throttle acquisition at (average entry size * entry count) so a
        // small fragment does not over-acquire a whole rate-limit quantum.
        // A configured value of -1 means the byte-rate limit is disabled.
        if (maxBytesToReplicate != -1 && maxBytesToReplicate > averageEntrySize.get() * entriesToReplicateCnt) {
            maxBytesToReplicate = averageEntrySize.get() * entriesToReplicateCnt;
        }
        replicationThrottle.acquire(maxBytesToReplicate);
    }

    // A single batch read may return fewer entries than requested (bounded by
    // maxBytesToReplicate); any unread tail is picked up by the recursive call
    // at the bottom of readComplete.
    lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate,
        new ReadCallback() {
            @Override
            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                if (rc != BKException.Code.OK) {
                    LOG.error("BK error reading ledger entries: {} - {}",
                            startEntryId, endEntryId, BKException.create(rc));
                    onReadEntryFailureCallback.accept(lh.getId(), startEntryId);
                    // Fail every remaining slot of the per-entry MultiCallback so the
                    // fragment-level callback still completes.
                    for (int i = 0; i < entriesToReplicateCnt; i++) {
                        ledgerFragmentMcb.processResult(rc, null, null);
                    }
                    return;
                }
                long lastEntryId = startEntryId;
                while (seq.hasMoreElements()) {
                    LedgerEntry entry = seq.nextElement();
                    lastEntryId = entry.getEntryId();
                    byte[] data = entry.getEntry();
                    final long dataLength = data.length;
                    numEntriesRead.inc();
                    numBytesRead.registerSuccessfulValue(dataLength);

                    // Re-package the entry with its digest so it can be sent to the
                    // replacement bookies as a recovery add.
                    ReferenceCounted toSend = lh.getDigestManager()
                            .computeDigestAndPackageForSending(entry.getEntryId(),
                                    lh.getLastAddConfirmed(), entry.getLength(),
                                    Unpooled.wrappedBuffer(data, 0, data.length),
                                    lh.getLedgerKey(),
                                    BookieProtocol.FLAG_RECOVERY_ADD);
                    // Feed the observed wire size back into the moving average used to
                    // size future throttle acquisitions.
                    if (replicationThrottle != null) {
                        if (toSend instanceof ByteBuf) {
                            updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
                        } else if (toSend instanceof ByteBufList) {
                            updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
                        }
                    }
                    // Per-entry write bookkeeping: `completed` guarantees exactly one
                    // processResult per entry — either the first failed write or the
                    // last successful one across all newBookies.
                    AtomicInteger numCompleted = new AtomicInteger(0);
                    AtomicBoolean completed = new AtomicBoolean(false);

                    WriteCallback multiWriteCallback = new WriteCallback() {
                        @Override
                        public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
                            if (rc != BKException.Code.OK) {
                                LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
                                        ledgerId, entryId, addr, BKException.create(rc));
                                // First failure settles this entry's slot as failed.
                                if (completed.compareAndSet(false, true)) {
                                    ledgerFragmentMcb.processResult(rc, null, null);
                                }
                            } else {
                                numEntriesWritten.inc();
                                if (ctx instanceof Long) {
                                    numBytesWritten.registerSuccessfulValue((Long) ctx);
                                }
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
                                            ledgerId, entryId, addr);
                                }
                                // Entry succeeds only once all target bookies have acked.
                                if (numCompleted.incrementAndGet() == newBookies.size()
                                        && completed.compareAndSet(false, true)) {
                                    ledgerFragmentMcb.processResult(rc, null, null);
                                }
                            }
                        }
                    };

                    for (BookieId newBookie : newBookies) {
                        long startWriteEntryTime = MathUtils.nowInNano();
                        bkc.getBookieClient().addEntry(newBookie, lh.getId(),
                                lh.getLedgerKey(), entry.getEntryId(), toSend,
                                multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
                                false, WriteFlag.NONE);
                        // NOTE(review): this records only the synchronous dispatch time of
                        // the async addEntry, not the end-to-end write latency.
                        writeDataLatency.registerSuccessfulEvent(
                                MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
                    }
                    // NOTE(review): toSend is released right after the async writes are
                    // dispatched; presumably BookieClient#addEntry retains the buffer for
                    // the in-flight requests — confirm against the client implementation.
                    toSend.release();
                }
                // Batch read stopped short of endEntryId (byte-size bound) — recurse
                // for the remaining range.
                if (lastEntryId != endEntryId) {
                    try {
                        batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh,
                                ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
                    } catch (InterruptedException e) {
                        // Fix: restore the interrupt status instead of swallowing it, so
                        // the owning thread can observe the interruption.
                        Thread.currentThread().interrupt();
                        // Fail all remaining entry slots so the fragment callback completes.
                        int remainingEntries = (int) (endEntryId - lastEntryId);
                        for (int i = 0; i < remainingEntries; i++) {
                            ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null);
                        }
                    }
                }
            }
        }, null);
}

private void updateAverageEntrySize(int toSendSize) {
averageEntrySize.updateAndGet(value -> (int) (value * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled";
protected static final String RECOVERY_BATCH_READ_ENABLED = "recoveryBatchReadEnabled";
// Add Parameters
protected static final String OPPORTUNISTIC_STRIPING = "opportunisticStriping";
protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
Expand Down Expand Up @@ -1203,6 +1204,23 @@ public ClientConfiguration setStickyReadsEnabled(boolean enabled) {
return this;
}

/**
 * Whether AutoRecovery uses batch reads when re-replicating ledger fragments
 * (see {@link #setRecoveryBatchReadEnabled(boolean)}).
 *
 * @return true if recovery batch read is enabled, false otherwise
 *         (default: false)
 */
public boolean isRecoveryBatchReadEnabled() {
    return getBoolean(RECOVERY_BATCH_READ_ENABLED, false);
}

/**
 * Enable/disable batch reads during AutoRecovery re-replication.
 *
 * <p>NOTE(review): this flag alone is not sufficient — the batch-read recovery
 * path is only taken when the V2 wire protocol and batch read are also enabled
 * and the ledger's ensemble size equals its write quorum size.
 *
 * @param enabled whether to enable batch read for recovery
 * @return client configuration instance, for call chaining
 */
public ClientConfiguration setRecoveryBatchReadEnabled(boolean enabled) {
    setProperty(RECOVERY_BATCH_READ_ENABLED, enabled);
    return this;
}
/**
* Get Ensemble Placement Policy Class.
*
Expand Down Expand Up @@ -2084,6 +2102,11 @@ public boolean isBatchReadEnabled() {
return getBoolean(BATCH_READ_ENABLED, true);
}

/**
 * Enable/disable the batch read protocol (counterpart of
 * {@link #isBatchReadEnabled()}, which defaults to true).
 *
 * @param enabled whether to enable batch read
 * @return client configuration instance, for call chaining
 */
public ClientConfiguration setBatchReadEnabled(boolean enabled) {
    setProperty(BATCH_READ_ENABLED, enabled);
    return this;
}

@Override
protected ClientConfiguration getThis() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,69 @@ public void testMultipleLedgerReplicationWithReplicationWorker()

}

/**
 * Verifies that the ReplicationWorker correctly re-replicates two
 * under-replicated ledgers when the batch-read recovery path is enabled
 * (V2 wire protocol + batch read + recovery batch read).
 */
@Test
public void testMultipleLedgerReplicationWithReplicationWorkerBatchRead() throws Exception {
    // Build two 200-entry ledgers (entries 0..199) and remember the first
    // bookie of each initial ensemble as the replica to kill.
    LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
    for (int i = 0; i < 200; ++i) {
        lh1.addEntry(data);
    }
    BookieId replicaToKillFromFirstLedger = lh1.getLedgerMetadata().getAllEnsembles().get(0L).get(0);

    LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
    for (int i = 0; i < 200; ++i) {
        lh2.addEntry(data);
    }

    BookieId replicaToKillFromSecondLedger = lh2.getLedgerMetadata().getAllEnsembles().get(0L).get(0);

    LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger);
    killBookie(replicaToKillFromFirstLedger);
    lh1.close();

    LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger);
    killBookie(replicaToKillFromSecondLedger);
    lh2.close();

    BookieId newBkAddr = startNewBookieAndReturnBookieId();
    LOG.info("New Bookie addr : {}", newBkAddr);

    // Fix: compare BookieIds by value, not by reference (`!=` on objects only
    // checks identity). If two distinct bookies were killed, start a second
    // replacement bookie so both ensembles can be fully re-replicated.
    if (!replicaToKillFromFirstLedger.equals(replicaToKillFromSecondLedger)) {
        BookieId newBkAddr2 = startNewBookieAndReturnBookieId();
        LOG.info("New Bookie addr : {}", newBkAddr2);
    }

    // Force the worker down the batch-read recovery path, with a batch size
    // smaller than the ledger and a byte-rate limit so several batch rounds
    // are exercised.
    ClientConfiguration clientConfiguration = new ClientConfiguration(baseClientConf);
    clientConfiguration.setUseV2WireProtocol(true);
    clientConfiguration.setRecoveryBatchReadEnabled(true);
    clientConfiguration.setBatchReadEnabled(true);
    clientConfiguration.setRereplicationEntryBatchSize(100);
    clientConfiguration.setReplicationRateByBytes(3 * 1024);
    ReplicationWorker rw = new ReplicationWorker(new ServerConfiguration(clientConfiguration));

    rw.start();
    try {
        // Mark ledger1 and ledger2 as underreplicated
        underReplicationManager.markLedgerUnderreplicated(lh1.getId(), replicaToKillFromFirstLedger.toString());
        underReplicationManager.markLedgerUnderreplicated(lh2.getId(), replicaToKillFromSecondLedger.toString());

        // Wait until the worker has cleared both under-replication markers.
        while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath)) {
            Thread.sleep(100);
        }

        while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh2.getId(), basePath)) {
            Thread.sleep(100);
        }

        killAllBookies(lh1, newBkAddr);

        // Should be able to read entries 0-199 of both ledgers from the
        // replacement bookies (fixed comment: ledgers have 200 entries).
        verifyRecoveredLedgers(lh1, 0, 199);
        verifyRecoveredLedgers(lh2, 0, 199);
    } finally {
        rw.shutdown();
    }
}

/**
* Tests that ReplicationWorker should fence the ledger and release ledger
* lock after timeout. Then replication should happen normally.
Expand Down

0 comments on commit ef4ce13

Please sign in to comment.