Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

AutoRecovery supports batch read #4211

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,29 +171,41 @@ private void replicateFragmentInternal(final LedgerHandle lh,
return;
}

/*
* Add all the entries to entriesToReplicate list from
* firstStoredEntryId to lastStoredEntryID.
*/
List<Long> entriesToReplicate = new LinkedList<Long>();
long lastStoredEntryId = lf.getLastStoredEntryId();
for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) {
entriesToReplicate.add(i);
}
/*
* Now asynchronously replicate all of the entries for the ledger
* fragment that were on the dead bookie.
*/
int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
entriesToReplicate.size(), ledgerFragmentMcb, null, BKException.Code.OK,
entriesToReplicateCnt, ledgerFragmentMcb, null, BKException.Code.OK,
BKException.Code.LedgerRecoveryException);
if (this.replicationThrottle != null) {
this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
}
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,

if (conf.isRecoveryBatchReadEnabled()
&& conf.getUseV2WireProtocol()
&& conf.isBatchReadEnabled()
&& lh.getLedgerMetadata().getEnsembleSize() == lh.getLedgerMetadata().getWriteQuorumSize()) {
batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh, ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);

} else {
/*
* Add all the entries to entriesToReplicate list from
* firstStoredEntryId to lastStoredEntryID.
*/
List<Long> entriesToReplicate = new LinkedList<Long>();
long lastStoredEntryId = lf.getLastStoredEntryId();
for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) {
entriesToReplicate.add(i);
}
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);
}
}

}

/**
Expand Down Expand Up @@ -433,6 +445,109 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}

void batchRecoverLedgerFragmentEntry(final long startEntryId,
final long endEntryId,
final LedgerHandle lh,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
final Set<BookieId> newBookies,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
throws InterruptedException {
int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
int maxBytesToReplicate = conf.getReplicationRateByBytes();
if (replicationThrottle != null) {
if (maxBytesToReplicate != -1 && maxBytesToReplicate > averageEntrySize.get() * entriesToReplicateCnt) {
maxBytesToReplicate = averageEntrySize.get() * entriesToReplicateCnt;
}
replicationThrottle.acquire(maxBytesToReplicate);
}

lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate,
new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.error("BK error reading ledger entries: {} - {}",
startEntryId, endEntryId, BKException.create(rc));
onReadEntryFailureCallback.accept(lh.getId(), startEntryId);
ledgerFragmentMcb.processResult(rc, null, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ledgerFragmentMcb is MultiCallback, when batch read fails, the processResult invoke count may be not enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated.

return;
}
long lastEntryId = startEntryId;
while (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement();
lastEntryId = entry.getEntryId();
byte[] data = entry.getEntry();
final long dataLength = data.length;
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);

ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entry.getEntryId(),
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length),
lh.getLedgerKey(),
BookieProtocol.FLAG_RECOVERY_ADD);
if (replicationThrottle != null) {
if (toSend instanceof ByteBuf) {
updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
} else if (toSend instanceof ByteBufList) {
updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
}
}
AtomicInteger numCompleted = new AtomicInteger(0);
AtomicBoolean completed = new AtomicBoolean(false);

WriteCallback multiWriteCallback = new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
ledgerId, entryId, addr, BKException.create(rc));
if (completed.compareAndSet(false, true)) {
ledgerFragmentMcb.processResult(rc, null, null);
}
} else {
numEntriesWritten.inc();
if (ctx instanceof Long) {
numBytesWritten.registerSuccessfulValue((Long) ctx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
ledgerId, entryId, addr);
}
if (numCompleted.incrementAndGet() == newBookies.size()
&& completed.compareAndSet(false, true)) {
ledgerFragmentMcb.processResult(rc, null, null);
}
}
}
};

for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entry.getEntryId(), toSend,
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
}
toSend.release();
}
if (lastEntryId == endEntryId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When lastEntryId == endEntryId, we needn't callback ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Updated

ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
} else {
try {
batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh,
ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
} catch (InterruptedException e) {
ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null);
}
}
}
}, null);
}

private void updateAverageEntrySize(int toSendSize) {
averageEntrySize.updateAndGet(value -> (int) (value * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled";
protected static final String RECOVERY_BATCH_READ_ENABLED = "recoveryBatchReadEnabled";
// Add Parameters
protected static final String OPPORTUNISTIC_STRIPING = "opportunisticStriping";
protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
Expand Down Expand Up @@ -1203,6 +1204,23 @@ public ClientConfiguration setStickyReadsEnabled(boolean enabled) {
return this;
}

/**
 * Whether the auto-recovery (re-replication) path may use batch reads when
 * copying entries. Disabled by default.
 *
 * @return true if recovery batch read is enabled, false otherwise (default: false)
 */
public boolean isRecoveryBatchReadEnabled() {
    return getBoolean(RECOVERY_BATCH_READ_ENABLED, false);
}

/**
 * Enable/disable batch reads in the auto-recovery (re-replication) path.
 *
 * @param enabled whether the recovery path may use batch reads
 * @return client configuration (this), for call chaining
 */
public ClientConfiguration setRecoveryBatchReadEnabled(boolean enabled) {
    setProperty(RECOVERY_BATCH_READ_ENABLED, enabled);
    return this;
}
/**
* Get Ensemble Placement Policy Class.
*
Expand Down Expand Up @@ -2084,6 +2102,11 @@ public boolean isBatchReadEnabled() {
return getBoolean(BATCH_READ_ENABLED, true);
}

/**
 * Enable/disable the client batch read protocol.
 *
 * @param enabled whether batch read is enabled
 * @return client configuration (this), for call chaining
 */
public ClientConfiguration setBatchReadEnabled(boolean enabled) {
    setProperty(BATCH_READ_ENABLED, enabled);
    return this;
}

@Override
protected ClientConfiguration getThis() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,69 @@ public void testMultipleLedgerReplicationWithReplicationWorker()

}

@Test
public void testMultipleLedgerReplicationWithReplicationWorkerBatchRead() throws Exception {
    // Two ledgers with a 3/3 ensemble, 200 entries each.
    LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
    for (int i = 0; i < 200; ++i) {
        lh1.addEntry(data);
    }
    BookieId replicaToKillFromFirstLedger = lh1.getLedgerMetadata().getAllEnsembles().get(0L).get(0);

    LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
    for (int i = 0; i < 200; ++i) {
        lh2.addEntry(data);
    }

    BookieId replicaToKillFromSecondLedger = lh2.getLedgerMetadata().getAllEnsembles().get(0L).get(0);

    // Kill one replica of each ledger so both become under-replicated.
    LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger);
    killBookie(replicaToKillFromFirstLedger);
    lh1.close();

    LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger);
    killBookie(replicaToKillFromSecondLedger);
    lh2.close();

    BookieId newBkAddr = startNewBookieAndReturnBookieId();
    LOG.info("New Bookie addr : {}", newBkAddr);

    // BookieId must be compared with equals(), not reference identity: the two
    // metadata lookups may return distinct but equal instances.
    if (!replicaToKillFromFirstLedger.equals(replicaToKillFromSecondLedger)) {
        // Two different bookies were killed, so bring up a second replacement.
        BookieId newBkAddr2 = startNewBookieAndReturnBookieId();
        LOG.info("New Bookie addr : {}", newBkAddr2);
    }

    // Configure the replication worker to exercise the V2 batch-read recovery path.
    ClientConfiguration clientConfiguration = new ClientConfiguration(baseClientConf);
    clientConfiguration.setUseV2WireProtocol(true);
    clientConfiguration.setRecoveryBatchReadEnabled(true);
    clientConfiguration.setBatchReadEnabled(true);
    clientConfiguration.setRereplicationEntryBatchSize(100);
    clientConfiguration.setReplicationRateByBytes(3 * 1024);
    ReplicationWorker rw = new ReplicationWorker(new ServerConfiguration(clientConfiguration));

    rw.start();
    try {
        // Mark ledger1 and ledger2 as underreplicated
        underReplicationManager.markLedgerUnderreplicated(lh1.getId(), replicaToKillFromFirstLedger.toString());
        underReplicationManager.markLedgerUnderreplicated(lh2.getId(), replicaToKillFromSecondLedger.toString());

        // Wait for the replication worker to clear the under-replication marks.
        while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath)) {
            Thread.sleep(100);
        }

        while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh2.getId(), basePath)) {
            Thread.sleep(100);
        }

        killAllBookies(lh1, newBkAddr);

        // Should be able to read the entries from 0-199
        verifyRecoveredLedgers(lh1, 0, 199);
        verifyRecoveredLedgers(lh2, 0, 199);
    } finally {
        rw.shutdown();
    }
}

/**
* Tests that ReplicationWorker should fence the ledger and release ledger
* lock after timeout. Then replication should happen normally.
Expand Down
Loading