Skip to content

Commit

Permalink
ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEnt…
Browse files Browse the repository at this point in the history
…ryOp

Descriptions of the changes in this PR:

The issue #1476 is caused by peculative reads with object recycling, same request object will be added to the CompletionObjects multiple times with different txnid.  In fact the logic of process the request already take this into account, only on place inside `ReadLastConfirmedAndEntryOp.requestComplete` forget to check requestComplete before calling `submitCallback` which in turn call request.close.

### Motivation

to fix #1476

### Changes

check `requestComplete` before `submitCallback` in `ReadLastConfirmedAndEntryOp.requestComplete`

Master Issue: #1476

Author: Sijie Guo <sijie@apache.org>
Author: infodog <infodog@hotmail.com>
Author: zhengxiangyang <zxy@xinshi.net>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <None>

This closes #1509 from infodog/issue1476, closes #1476

(cherry picked from commit 6476fc3)
Signed-off-by: Sijie Guo <sijie@apache.org>
  • Loading branch information
sijie committed Jun 14, 2018
1 parent 0cea52e commit c7b1610
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ public static DataFormats.LedgerMetadataFormat.DigestType toProtoDigestType(Dige
}
}

boolean shouldReorderReadSequence() {
return reorderReadSequence;
}

ZooKeeper getZkHandle() {
return ((ZKMetadataClientDriver) metadataDriver).getZk();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ abstract class ReadLACAndEntryRequest implements AutoCloseable {
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.distributionSchedule.getWriteSetForLongPoll(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
this.writeSet = lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
if (lh.getBk().shouldReorderReadSequence()) {
this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
} else {
this.orderedEnsemble = writeSet.copy();
Expand Down Expand Up @@ -118,7 +118,7 @@ synchronized int getFirstError() {
boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer, long entryId) {
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
content = lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
} catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
Expand Down Expand Up @@ -201,7 +201,7 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress

if (LOG.isDebugEnabled()) {
LOG.debug("{} while reading entry: {} ledgerId: {} from bookie: {}", errMsg, entryImpl.getEntryId(),
lh.ledgerId, host);
lh.getId(), host);
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
for (int i = 0; i < numReplicasTried; i++) {
int slowBookieIndex = orderedEnsemble.get(i);
BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
}
}
return completed;
Expand Down Expand Up @@ -449,7 +449,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
}

protected LedgerMetadata getLedgerMetadata() {
return lh.metadata;
return lh.getLedgerMetadata();
}

ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
Expand All @@ -462,7 +462,7 @@ ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
*/
@Override
public ListenableFuture<Boolean> issueSpeculativeRequest() {
return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
if (!requestComplete.get() && !request.isComplete()
Expand All @@ -480,14 +480,14 @@ public Boolean call() throws Exception {

public void initiate() {
if (parallelRead) {
request = new ParallelReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
request = new ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
} else {
request = new SequenceReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
request = new SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
}
request.read();

if (!parallelRead && lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
if (!parallelRead && lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
}
}

Expand All @@ -496,8 +496,8 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest
LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
prevEntryId, timeOutInMillis, to, parallelRead);
}
lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
lh.ledgerId,
lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
lh.getId(),
BookieProtocol.LAST_ADD_CONFIRMED,
prevEntryId,
timeOutInMillis,
Expand All @@ -517,12 +517,12 @@ private void submitCallback(int rc) {
long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
LedgerEntry entry;
if (BKException.Code.OK != rc) {
lh.bk.getReadLacAndEntryOpLogger()
lh.getBk().getReadLacAndEntryOpLogger()
.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
entry = null;
} else {
// could received advanced lac, with no entry
lh.bk.getReadLacAndEntryOpLogger()
lh.getBk().getReadLacAndEntryOpLogger()
.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
if (request.entryImpl.getEntryBuffer() != null) {
entry = new LedgerEntry(request.entryImpl);
Expand Down Expand Up @@ -558,18 +558,20 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe

if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
buffer.retain();
if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
if (!requestComplete.get() && request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
// callback immediately
if (rCtx.getLacUpdateTimestamp().isPresent()) {
long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
- rCtx.getLacUpdateTimestamp().get());
elapsedMicros = Math.max(elapsedMicros, 0);
lh.bk.getReadLacAndEntryRespLogger()
.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
lh.getBk().getReadLacAndEntryRespLogger()
.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
}

submitCallback(BKException.Code.OK);
requestComplete.set(true);
// if the request has already completed, the buffer is not going to be used anymore, release it.
if (!completeRequest()) {
buffer.release();
}
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
} else {
buffer.release();
Expand Down Expand Up @@ -611,8 +613,9 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
}
}

private void completeRequest() {
if (requestComplete.compareAndSet(false, true)) {
private boolean completeRequest() {
boolean requestCompleted = requestComplete.compareAndSet(false, true);
if (requestCompleted) {
if (!hasValidResponse) {
// no success called
submitCallback(request.getFirstError());
Expand All @@ -621,11 +624,12 @@ private void completeRequest() {
submitCallback(BKException.Code.OK);
}
}
return requestCompleted;
}

@Override
public String toString() {
return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.ledgerId, prevEntryId);
return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.getId(), prevEntryId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ protected LastConfirmedAndEntryImpl newObject(Handle<LastConfirmedAndEntryImpl>
public static LastConfirmedAndEntryImpl create(long lac, org.apache.bookkeeper.client.LedgerEntry entry) {
LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
entryImpl.lac = lac;
entryImpl.entry = LedgerEntryImpl.create(
entry.getLedgerId(),
entry.getEntryId(),
entry.getLength(),
entry.getEntryBuffer());
if (null == entry) {
entryImpl.entry = null;
} else {
entryImpl.entry = LedgerEntryImpl.create(
entry.getLedgerId(),
entry.getEntryId(),
entry.getLength(),
entry.getEntryBuffer());
}
return entryImpl;
}

Expand Down
Loading

0 comments on commit c7b1610

Please sign in to comment.