105105import org .apache .bookkeeper .mledger .proto .MLDataFormats .StringProperty ;
106106import org .apache .bookkeeper .mledger .util .ManagedLedgerUtils ;
107107import org .apache .commons .lang3 .mutable .MutableInt ;
108+ import org .apache .commons .lang3 .mutable .MutableLong ;
108109import org .apache .commons .lang3 .tuple .Pair ;
109110import org .apache .pulsar .common .policies .data .ManagedLedgerInternalStats ;
110111import org .apache .pulsar .common .util .DateFormatter ;
@@ -208,7 +209,7 @@ public class ManagedCursorImpl implements ManagedCursor {
208209 @ Getter
209210 @ VisibleForTesting
210211 @ Nullable protected final ConcurrentSkipListMap <Position , BitSet > batchDeletedIndexes ;
211- private final ReadWriteLock lock = new ReentrantReadWriteLock ();
212+ protected final ReadWriteLock lock = new ReentrantReadWriteLock ();
212213
213214 private RateLimiter markDeleteLimiter ;
214215 // The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
@@ -238,6 +239,7 @@ class MarkDeleteEntry {
238239 final MarkDeleteCallback callback ;
239240 final Object ctx ;
240241 final Map <String , Long > properties ;
242+ final Runnable alignAcknowledgeStatusAfterPersisted ;
241243
242244 // If the callbackGroup is set, it means this mark-delete request was done on behalf of a group of request (just
243245 // persist the last one in the chain). In this case we need to trigger the callbacks for every request in the
@@ -246,10 +248,26 @@ class MarkDeleteEntry {
246248
247249 public MarkDeleteEntry (Position newPosition , Map <String , Long > properties ,
248250 MarkDeleteCallback callback , Object ctx ) {
251+ this (newPosition , properties , callback , ctx , null );
252+ }
253+
254+ public MarkDeleteEntry (Position newPosition , Map <String , Long > properties ,
255+ MarkDeleteCallback callback , Object ctx , Runnable alignAcknowledgeStatusAfterPersisted ) {
256+ if (alignAcknowledgeStatusAfterPersisted == null ) {
257+ alignAcknowledgeStatusAfterPersisted = () -> {
258+ if (batchDeletedIndexes != null ) {
259+ batchDeletedIndexes .subMap (PositionFactory .EARLIEST ,
260+ false , PositionFactory .create (newPosition .getLedgerId (),
261+ newPosition .getEntryId ()), true ).clear ();
262+ }
263+ persistentMarkDeletePosition = newPosition ;
264+ };
265+ }
249266 this .newPosition = newPosition ;
250267 this .properties = properties ;
251268 this .callback = callback ;
252269 this .ctx = ctx ;
270+ this .alignAcknowledgeStatusAfterPersisted = alignAcknowledgeStatusAfterPersisted ;
253271 }
254272
255273 public void triggerComplete () {
@@ -267,6 +285,10 @@ public void triggerComplete() {
267285 }
268286 }
269287
288+ public void alignAcknowledgeStatus () {
289+ this .alignAcknowledgeStatusAfterPersisted .run ();
290+ }
291+
270292 public void triggerFailed (ManagedLedgerException exception ) {
271293 if (callbackGroup != null ) {
272294 for (MarkDeleteEntry e : callbackGroup ) {
@@ -1482,47 +1504,63 @@ protected void internalResetCursor(Position proposedReadPosition,
14821504
14831505 final Position newMarkDeletePosition = ledger .getPreviousPosition (newReadPosition );
14841506
1507+ Runnable alignAcknowledgeStatusAfterPersisted = () -> {
1508+ // Correct the variable "messagesConsumedCounter".
1509+ // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1510+ // old one.
1511+ int compareRes = ledger .comparePositions (markDeletePosition , newMarkDeletePosition );
1512+ if (compareRes > 0 ) {
1513+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), -getNumberOfEntries (
1514+ Range .openClosed (newMarkDeletePosition , markDeletePosition )));
1515+ } else if (compareRes < 0 ) {
1516+ long entries = getNumberOfEntries (Range .openClosed (markDeletePosition , newMarkDeletePosition ));
1517+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (ManagedCursorImpl .this , entries );
1518+ }
1519+ individualDeletedMessages .removeAtMost (newMarkDeletePosition .getLedgerId (),
1520+ newMarkDeletePosition .getEntryId ());
1521+
1522+ // Entries already acknowledged, which is larger than the new mark deleted position.
1523+ MutableLong ackedEntriesAfterMdPosition = new MutableLong ();
1524+ individualDeletedMessages .forEach ((r ) -> {
1525+ for (long i = r .lowerEndpoint ().getEntryId () + 1 ; i <= r .upperEndpoint ().getEntryId (); i ++) {
1526+ ackedEntriesAfterMdPosition .incrementAndGet ();
1527+ }
1528+ return true ;
1529+ });
1530+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (ManagedCursorImpl .this ,
1531+ -ackedEntriesAfterMdPosition .get ().longValue ());
1532+ markDeletePosition = newMarkDeletePosition ;
1533+ lastMarkDeleteEntry = new MarkDeleteEntry (newMarkDeletePosition , isCompactionCursor ()
1534+ ? getProperties () : Collections .emptyMap (), null , null );
1535+ individualDeletedMessages .clear ();
1536+ if (batchDeletedIndexes != null ) {
1537+ batchDeletedIndexes .clear ();
1538+ AckSetStateUtil .maybeGetAckSetState (newReadPosition ).ifPresent (ackSetState -> {
1539+ long [] resetWords = ackSetState .getAckSet ();
1540+ if (resetWords != null ) {
1541+ batchDeletedIndexes .put (newReadPosition , BitSet .valueOf (resetWords ));
1542+ }
1543+ });
1544+ }
1545+
1546+ Position oldReadPosition = readPosition ;
1547+ if (oldReadPosition .compareTo (newReadPosition ) >= 0 ) {
1548+ log .info ("[{}] reset readPosition to {} before current read readPosition {} on cursor {}" ,
1549+ ledger .getName (), newReadPosition , oldReadPosition , name );
1550+ } else {
1551+ log .info ("[{}] reset readPosition to {} skipping from current read readPosition {} on "
1552+ + "cursor {}" , ledger .getName (), newReadPosition , oldReadPosition , name );
1553+ }
1554+ readPosition = newReadPosition ;
1555+ };
1556+
14851557 VoidCallback finalCallback = new VoidCallback () {
14861558 @ Override
14871559 public void operationComplete () {
14881560
14891561 // modify mark delete and read position since we are able to persist new position for cursor
14901562 lock .writeLock ().lock ();
14911563 try {
1492- // Correct the variable "messagesConsumedCounter".
1493- // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1494- // old one.
1495- int compareRes = ledger .comparePositions (markDeletePosition , newMarkDeletePosition );
1496- if (compareRes > 0 ) {
1497- MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), -getNumberOfEntries (
1498- Range .openClosed (newMarkDeletePosition , markDeletePosition )));
1499- } else if (compareRes < 0 ) {
1500- MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), getNumberOfEntries (
1501- Range .openClosed (markDeletePosition , newMarkDeletePosition )));
1502- }
1503- markDeletePosition = newMarkDeletePosition ;
1504- lastMarkDeleteEntry = new MarkDeleteEntry (newMarkDeletePosition , isCompactionCursor ()
1505- ? getProperties () : Collections .emptyMap (), null , null );
1506- individualDeletedMessages .clear ();
1507- if (batchDeletedIndexes != null ) {
1508- batchDeletedIndexes .clear ();
1509- AckSetStateUtil .maybeGetAckSetState (newReadPosition ).ifPresent (ackSetState -> {
1510- long [] resetWords = ackSetState .getAckSet ();
1511- if (resetWords != null ) {
1512- batchDeletedIndexes .put (newReadPosition , BitSet .valueOf (resetWords ));
1513- }
1514- });
1515- }
1516-
1517- Position oldReadPosition = readPosition ;
1518- if (oldReadPosition .compareTo (newReadPosition ) >= 0 ) {
1519- log .info ("[{}] reset readPosition to {} before current read readPosition {} on cursor {}" ,
1520- ledger .getName (), newReadPosition , oldReadPosition , name );
1521- } else {
1522- log .info ("[{}] reset readPosition to {} skipping from current read readPosition {} on "
1523- + "cursor {}" , ledger .getName (), newReadPosition , oldReadPosition , name );
1524- }
1525- readPosition = newReadPosition ;
15261564 ledger .onCursorReadPositionUpdated (ManagedCursorImpl .this , newReadPosition );
15271565 } finally {
15281566 lock .writeLock ().unlock ();
@@ -1566,7 +1604,7 @@ public void markDeleteComplete(Object ctx) {
15661604 public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
15671605 finalCallback .operationFailed (exception );
15681606 }
1569- }, null );
1607+ }, null , alignAcknowledgeStatusAfterPersisted );
15701608 }
15711609
15721610 @ Override
@@ -2181,7 +2219,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
21812219 callback .markDeleteComplete (ctx );
21822220 return ;
21832221 }
2184- internalAsyncMarkDelete (newPosition , properties , callback , ctx );
2222+ internalAsyncMarkDelete (newPosition , properties , callback , ctx , null );
21852223 }
21862224
21872225 private Position ackBatchPosition (Position position ) {
@@ -2210,10 +2248,11 @@ private Position ackBatchPosition(Position position) {
22102248 }
22112249
22122250 protected void internalAsyncMarkDelete (final Position newPosition , Map <String , Long > properties ,
2213- final MarkDeleteCallback callback , final Object ctx ) {
2251+ final MarkDeleteCallback callback , final Object ctx , Runnable alignAcknowledgeStatusAfterPersisted ) {
22142252 ledger .mbean .addMarkDeleteOp ();
22152253
2216- MarkDeleteEntry mdEntry = new MarkDeleteEntry (newPosition , properties , callback , ctx );
2254+ MarkDeleteEntry mdEntry = new MarkDeleteEntry (newPosition , properties , callback , ctx ,
2255+ alignAcknowledgeStatusAfterPersisted );
22172256
22182257 // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
22192258 synchronized (pendingMarkDeleteOps ) {
@@ -2312,14 +2351,7 @@ public void operationComplete() {
23122351 // point.
23132352 lock .writeLock ().lock ();
23142353 try {
2315- individualDeletedMessages .removeAtMost (mdEntry .newPosition .getLedgerId (),
2316- mdEntry .newPosition .getEntryId ());
2317- if (batchDeletedIndexes != null ) {
2318- batchDeletedIndexes .subMap (PositionFactory .EARLIEST ,
2319- false , PositionFactory .create (mdEntry .newPosition .getLedgerId (),
2320- mdEntry .newPosition .getEntryId ()), true ).clear ();
2321- }
2322- persistentMarkDeletePosition = mdEntry .newPosition ;
2354+ mdEntry .alignAcknowledgeStatus ();
23232355 } finally {
23242356 lock .writeLock ().unlock ();
23252357 }
@@ -2576,7 +2608,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
25762608 callback .deleteFailed (exception , ctx );
25772609 }
25782610
2579- }, ctx );
2611+ }, ctx , null );
25802612
25812613 } catch (Exception e ) {
25822614 log .warn ("[{}] [{}] Error doing asyncDelete [{}]" , ledger .getName (), name , e .getMessage (), e );
0 commit comments