Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp #1509

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,10 @@ public static DataFormats.LedgerMetadataFormat.DigestType toProtoDigestType(Dige
}
}

boolean shouldReorderReadSequence() {
return reorderReadSequence;
}

ZooKeeper getZkHandle() {
return ((ZKMetadataClientDriver) metadataDriver).getZk();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ abstract class ReadLACAndEntryRequest implements AutoCloseable {
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.distributionSchedule.getWriteSetForLongPoll(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
this.writeSet = lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
if (lh.getBk().shouldReorderReadSequence()) {
this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
} else {
this.orderedEnsemble = writeSet.copy();
Expand Down Expand Up @@ -118,7 +118,7 @@ synchronized int getFirstError() {
boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer, long entryId) {
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
content = lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
} catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
Expand Down Expand Up @@ -201,7 +201,7 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress

if (LOG.isDebugEnabled()) {
LOG.debug("{} while reading entry: {} ledgerId: {} from bookie: {}", errMsg, entryImpl.getEntryId(),
lh.ledgerId, host);
lh.getId(), host);
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
for (int i = 0; i < numReplicasTried; i++) {
int slowBookieIndex = orderedEnsemble.get(i);
BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
}
}
return completed;
Expand Down Expand Up @@ -449,7 +449,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
}

protected LedgerMetadata getLedgerMetadata() {
return lh.metadata;
return lh.getLedgerMetadata();
}

ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
Expand All @@ -462,7 +462,7 @@ ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
*/
@Override
public ListenableFuture<Boolean> issueSpeculativeRequest() {
return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
if (!requestComplete.get() && !request.isComplete()
Expand All @@ -480,14 +480,14 @@ public Boolean call() throws Exception {

public void initiate() {
if (parallelRead) {
request = new ParallelReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
request = new ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
} else {
request = new SequenceReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1);
request = new SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
}
request.read();

if (!parallelRead && lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
if (!parallelRead && lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
}
}

Expand All @@ -496,8 +496,8 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest
LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
prevEntryId, timeOutInMillis, to, parallelRead);
}
lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
lh.ledgerId,
lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
lh.getId(),
BookieProtocol.LAST_ADD_CONFIRMED,
prevEntryId,
timeOutInMillis,
Expand All @@ -517,12 +517,12 @@ private void submitCallback(int rc) {
long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
LedgerEntry entry;
if (BKException.Code.OK != rc) {
lh.bk.getReadLacAndEntryOpLogger()
lh.getBk().getReadLacAndEntryOpLogger()
.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
entry = null;
} else {
// could received advanced lac, with no entry
lh.bk.getReadLacAndEntryOpLogger()
lh.getBk().getReadLacAndEntryOpLogger()
.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
if (request.entryImpl.getEntryBuffer() != null) {
entry = new LedgerEntry(request.entryImpl);
Expand Down Expand Up @@ -558,18 +558,20 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe

if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
buffer.retain();
if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
if (!requestComplete.get() && request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
// callback immediately
if (rCtx.getLacUpdateTimestamp().isPresent()) {
long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
- rCtx.getLacUpdateTimestamp().get());
elapsedMicros = Math.max(elapsedMicros, 0);
lh.bk.getReadLacAndEntryRespLogger()
.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
lh.getBk().getReadLacAndEntryRespLogger()
.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
}

submitCallback(BKException.Code.OK);
requestComplete.set(true);
// if the request has already completed, the buffer is not going to be used anymore, release it.
if (!completeRequest()) {
buffer.release();
}
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
} else {
buffer.release();
Expand Down Expand Up @@ -611,8 +613,9 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
}
}

private void completeRequest() {
if (requestComplete.compareAndSet(false, true)) {
private boolean completeRequest() {
boolean requestCompleted = requestComplete.compareAndSet(false, true);
if (requestCompleted) {
if (!hasValidResponse) {
// no success called
submitCallback(request.getFirstError());
Expand All @@ -621,11 +624,12 @@ private void completeRequest() {
submitCallback(BKException.Code.OK);
}
}
return requestCompleted;
}

@Override
public String toString() {
return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.ledgerId, prevEntryId);
return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.getId(), prevEntryId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ protected LastConfirmedAndEntryImpl newObject(Handle<LastConfirmedAndEntryImpl>
public static LastConfirmedAndEntryImpl create(long lac, org.apache.bookkeeper.client.LedgerEntry entry) {
LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
entryImpl.lac = lac;
entryImpl.entry = LedgerEntryImpl.create(
entry.getLedgerId(),
entry.getEntryId(),
entry.getLength(),
entry.getEntryBuffer());
if (null == entry) {
entryImpl.entry = null;
} else {
entryImpl.entry = LedgerEntryImpl.create(
entry.getLedgerId(),
entry.getEntryId(),
entry.getLength(),
entry.getEntryBuffer());
}
return entryImpl;
}

Expand Down
Loading