Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -64,6 +65,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// The bit count to trim to reduce memory occupation.
private final int timestampPrecisionBitCnt;

// Count of delayed messages in the tracker.
private final AtomicLong delayedMessagesCount = new AtomicLong(0);

InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -125,6 +129,8 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
delayedMessagesCount.incrementAndGet();

updateTimer();

checkAndUpdateHighest(deliverAt);
Expand Down Expand Up @@ -183,13 +189,15 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
positions.add(PositionFactory.create(ledgerId, entryId));
});
n -= cardinality;
delayedMessagesCount.addAndGet(-cardinality);
ledgerIdToDelete.add(ledgerId);
} else {
long[] entryIdsArray = entryIds.toArray();
for (int i = 0; i < n; i++) {
positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
delayedMessagesCount.addAndGet(-n);
n = 0;
}
if (n <= 0) {
Expand Down Expand Up @@ -221,14 +229,13 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
@Override
public CompletableFuture<Void> clear() {
this.delayedMessageMap.clear();
this.delayedMessagesCount.set(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 220 there's if (delayedMessageMap.isEmpty()) {. Would it make sense to add this.delayedMessagesCount.set(0); also to that code block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to do that, these logic is used for feature fixedDelayDetectionLookahead.

if (delayedMessageMap.isEmpty()) {
            // Reset to initial state
            highestDeliveryTimeTracked = 0;
            messagesHaveFixedDelay = true;
        }

When the delayedMessageMap is empty, the value of delayedMessagesCount must be 0; otherwise, there are other issues that need to be resolved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the delayedMessageMap is empty, the value of delayedMessagesCount must be 0; otherwise, there are other issues that need to be resolved.

That's true. I think it would make sense to add an error log if delayedMessageCount isn't 0 at this point. That would help detect possible issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine. Should i create a new PR to add the debug log?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thetumbled I think it makes sense. It shouldn't be a debug log, but an error log if that ever happens. In addition, I think it should reset the count to 0, so that the state would be fixed.

Copy link
Member Author

@thetumbled thetumbled Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, Lari. I have created a pr to check this. But i think it should not be set to 0 because of the posibility of concurrent execution.
Suppose there is another thread that adds a message to the map after the check if (delayedMessageMap.isEmpty()) {; in that case, the value should not be zero, correct?
I suggest we had better keep it unchanged and only leave a warning log.

return CompletableFuture.completedFuture(null);
}

@Override
public long getNumberOfDelayedMessages() {
return delayedMessageMap.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
return delayedMessagesCount.get();
}

/**
Expand Down
Loading