Closed
Labels: Stale, lifecycle/stale, type/bug
Description
Describe the bug
Lines 325 to 355 in 46baae6
    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<String, Long> properties,
                                                    BitSetRecyclable bitSet) {
        consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
            // uncommon condition since it's only used for the compaction subscription.
            return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet);
        } else {
            if (isAckReceiptEnabled(consumer.getClientCnx())) {
                // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
                // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
                // any ack operation is allowed.
                this.lock.readLock().lock();
                try {
                    doCumulativeAckAsync(messageId, bitSet);
                    return this.currentCumulativeAckFuture;
                } finally {
                    this.lock.readLock().unlock();
                    if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
                        flush();
                    }
                }
            } else {
                doCumulativeAckAsync(messageId, bitSet);
                if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
                    flush();
                }
                return CompletableFuture.completedFuture(null);
            }
        }
    }
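Lines 343-345 and 349-351 of the file, i.e. the two `if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) { flush(); }` checks, look unnecessary.
The method `doCumulativeAckAsync(messageId, bitSet)` only updates the variable `lastCumulativeAck`; it does not change `pendingIndividualBatchIndexAcks`, so the size of that collection cannot have grown as a result of the cumulative ack.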
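To make the argument concrete, here is a minimal sketch of the claim. It is not the Pulsar implementation: the class `CumulativeAckSketch`, the `long` message IDs, and the helper `addPendingIndividualAck` are hypothetical stand-ins. It assumes, as stated above, that the cumulative path only updates `lastCumulativeAck`.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical, simplified model of the state discussed in this issue.
// It is NOT the real Pulsar tracker; names and types are illustrative only.
class CumulativeAckSketch {
    // Stand-in for the tracker's lastCumulativeAck (a plain long here).
    private volatile long lastCumulativeAck = -1L;

    // Stand-in for pendingIndividualBatchIndexAcks.
    private final ConcurrentSkipListSet<Long> pendingIndividualBatchIndexAcks =
            new ConcurrentSkipListSet<>();

    // Assumption from the issue: the cumulative path only moves lastCumulativeAck forward.
    void doCumulativeAckAsync(long messageId) {
        if (messageId > lastCumulativeAck) {
            lastCumulativeAck = messageId;
        }
    }

    // Hypothetical helper modelling the individual batch-index ack path,
    // which is the only place the pending collection grows in this sketch.
    void addPendingIndividualAck(long messageId) {
        pendingIndividualBatchIndexAcks.add(messageId);
    }

    int pendingSize() {
        return pendingIndividualBatchIndexAcks.size();
    }

    long lastCumulativeAck() {
        return lastCumulativeAck;
    }

    public static void main(String[] args) {
        CumulativeAckSketch tracker = new CumulativeAckSketch();
        tracker.addPendingIndividualAck(7L);
        int before = tracker.pendingSize();
        tracker.doCumulativeAckAsync(42L);
        // The pending collection is untouched by the cumulative ack.
        System.out.println(before == tracker.pendingSize());
    }
}
```

Under that assumption, a size check placed right after `doCumulativeAckAsync` can only observe growth caused by other code paths, which is why it appears redundant in `doCumulativeAck`.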