Skip to content

Commit

Permalink
[ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#…
Browse files Browse the repository at this point in the history
…15067)

- follow up on #15031
* [ML] Fix race in persisting mark delete position
* [ML] Resetting should reset lastMarkDeleteEntry
* [ML] Reset fields in initializeCursorPosition method

(cherry picked from commit a19a30a)
  • Loading branch information
lhotari committed Apr 14, 2022
1 parent 0d67f05 commit 779605b
Showing 1 changed file with 75 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class ManagedCursorImpl implements ManagedCursor {

// this position is have persistent mark delete position
protected volatile PositionImpl persistentMarkDeletePosition;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl>
INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class,
"inProgressMarkDeletePersistPosition");
protected volatile PositionImpl inProgressMarkDeletePersistPosition;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
Expand Down Expand Up @@ -214,6 +219,31 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
this.callback = callback;
this.ctx = ctx;
}

public void triggerComplete() {
// Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
// will ensure that no race condition will happen between the next mark-delete and the switching
// operation.
if (callbackGroup != null) {
// Trigger the callback for every request in the group
for (MarkDeleteEntry e : callbackGroup) {
e.callback.markDeleteComplete(e.ctx);
}
} else if (callback != null) {
// Only trigger the callback for the current request
callback.markDeleteComplete(ctx);
}
}

public void triggerFailed(ManagedLedgerException exception) {
if (callbackGroup != null) {
for (MarkDeleteEntry e : callbackGroup) {
e.callback.markDeleteFailed(exception, e.ctx);
}
} else if (callback != null) {
callback.markDeleteFailed(exception, ctx);
}
}
}

protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
Expand Down Expand Up @@ -540,6 +570,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
Expand Down Expand Up @@ -1121,7 +1152,11 @@ public void operationFailed(ManagedLedgerException exception) {

};

internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
Expand Down Expand Up @@ -1567,6 +1602,9 @@ boolean hasMoreEntries(PositionImpl position) {
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
Expand Down Expand Up @@ -1782,6 +1820,34 @@ protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<Strin
}

void internalMarkDelete(final MarkDeleteEntry mdEntry) {
if (persistentMarkDeletePosition != null
&& mdEntry.newPosition.compareTo(persistentMarkDeletePosition) < 0) {
if (log.isInfoEnabled()) {
log.info("Skipping updating mark delete position to {}. The persisted mark delete position {} "
+ "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
}
mdEntry.triggerComplete();
return;
}

PositionImpl inProgressLatest = INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.updateAndGet(this, current -> {
if (current != null && current.compareTo(mdEntry.newPosition) > 0) {
return current;
} else {
return mdEntry.newPosition;
}
});

// if there's a newer or equal mark delete update in progress, skip it.
if (inProgressLatest != mdEntry.newPosition) {
if (log.isInfoEnabled()) {
log.info("Skipping updating mark delete position to {}. The mark delete position update "
+ "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
}
mdEntry.triggerComplete();
return;
}

// The counter is used to mark all the pending mark-delete request that were submitted to BK and that are not
// yet finished. While we have outstanding requests we cannot close the current ledger, so the switch to new
// ledger is postponed to when the counter goes to 0.
Expand All @@ -1804,6 +1870,9 @@ public void operationComplete() {
mdEntry.newPosition);
}

INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
mdEntry.newPosition, null);

// Remove from the individual deleted messages all the entries before the new mark delete
// point.
lock.writeLock().lock();
Expand All @@ -1815,11 +1884,7 @@ public void operationComplete() {
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
}
if (persistentMarkDeletePosition == null
|| mdEntry.newPosition.compareTo(persistentMarkDeletePosition) > 0) {
persistentMarkDeletePosition = mdEntry.newPosition;
}

persistentMarkDeletePosition = mdEntry.newPosition;
} finally {
lock.writeLock().unlock();
}
Expand All @@ -1828,22 +1893,13 @@ public void operationComplete() {

decrementPendingMarkDeleteCount();

// Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
// will ensure that no race condition will happen between the next mark-delete and the switching
// operation.
if (mdEntry.callbackGroup != null) {
// Trigger the callback for every request in the group
for (MarkDeleteEntry e : mdEntry.callbackGroup) {
e.callback.markDeleteComplete(e.ctx);
}
} else {
// Only trigger the callback for the current request
mdEntry.callback.markDeleteComplete(mdEntry.ctx);
}
mdEntry.triggerComplete();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
mdEntry.newPosition, null);
isDirty = true;
log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
ManagedCursorImpl.this, mdEntry.newPosition);
Expand All @@ -1854,13 +1910,7 @@ public void operationFailed(ManagedLedgerException exception) {

decrementPendingMarkDeleteCount();

if (mdEntry.callbackGroup != null) {
for (MarkDeleteEntry e : mdEntry.callbackGroup) {
e.callback.markDeleteFailed(exception, e.ctx);
}
} else {
mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
}
mdEntry.triggerFailed(exception);
}
});
}
Expand Down

0 comments on commit 779605b

Please sign in to comment.