Skip to content

AbstractBaseDispatcher#filterEntriesForConsumer return value problem. #15297

@horizonzy

Description

@horizonzy

Describe the bug

public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
? entryWrapper.get()[entryWrapperIndex].getMetadata()
: null;
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
: msgMetadata;
if (CollectionUtils.isNotEmpty(entryFilters)) {
fillContext(filterContext, msgMetadata, subscription);
if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
entry.release();
continue;
}
}
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
continue;
}
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
batchSizes.setBatchSize(i, batchSize);
long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
ackSet = cursor
.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
} else {
indexesAcks.setIndexesAcks(i, null);
}
}
BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
Collections.emptyMap());
int filtered = entriesToFiltered.size();
Topic topic = subscription.getTopic();
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).addFilteredEntriesCount(filtered);
}
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;

There are some judgement branch to filter entry.
like

if (CollectionUtils.isNotEmpty(entryFilters)) {
fillContext(filterContext, msgMetadata, subscription);
if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
entry.release();
continue;
}
}

it shouldn't make totalEntries++ when an entry be filter

Metadata

Metadata

Assignees

Labels

type/bugThe PR fixed a bug or issue reported a bug

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions