Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 24, 2024
1 parent 48e018d commit a60197d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 27 deletions.
Expand Up @@ -3390,18 +3390,30 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
private final ConsumerRecords<K, V> records;

private final List<ConsumerRecord<K, V>> recordList;
private boolean isAnyManualAck = false;

private volatile boolean acked;

private volatile int partial = -1;

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList) {

@Nullable List<ConsumerRecord<K, V>> recordList) {
this.records = records;
this.recordList = recordList;
}

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList,
boolean isAnyManualAck) {
this(records, recordList);
this.isAnyManualAck = isAnyManualAck;
}

@Override
public boolean isAnyManualAck() {
return this.isAnyManualAck;
}

@Override
public void acknowledge() {
if (this.partial >= 0) {
Expand All @@ -3410,7 +3422,8 @@ public void acknowledge() {
}
if (!this.acked) {
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred =
ListenerConsumer.this.deferredOffsets;
for (TopicPartition topicPartition : this.records.partitions()) {
if (offs != null) {
offs.remove(topicPartition);
Expand All @@ -3425,22 +3438,23 @@ public void acknowledge() {
@Override
public void acknowledge(int index) {
Assert.isTrue(index > this.partial,
() -> String.format("index (%d) must be greater than the previous partial commit (%d)", index,
this.partial));
() -> String.format(
"index (%d) must be greater than the previous partial commit (%d)", index,
this.partial));
Assert.state(ListenerConsumer.this.isManualImmediateAck,
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
Assert.state(this.recordList != null,
"Listener must receive a List of records to use partial batch acknowledgment");
"Listener must receive a List of records to use partial batch acknowledgment");
Assert.isTrue(index >= 0 && index < this.recordList.size(),
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
this.recordList.size() - 1));
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
this.recordList.size() - 1));
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"Partial batch acknowledgment is only supported on the consumer thread");
"Partial batch acknowledgment is only supported on the consumer thread");
Map<TopicPartition, List<ConsumerRecord<K, V>>> offsetsToCommit = new LinkedHashMap<>();
for (int i = this.partial + 1; i <= index; i++) {
ConsumerRecord<K, V> record = this.recordList.get(i);
offsetsToCommit.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
tp -> new ArrayList<>()).add(record);
tp -> new ArrayList<>()).add(record);
}
if (!offsetsToCommit.isEmpty()) {
processAcks(new ConsumerRecords<>(offsetsToCommit));
Expand All @@ -3451,9 +3465,9 @@ public void acknowledge(int index) {
@Override
public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
Expand All @@ -3463,15 +3477,14 @@ public void nack(int index, Duration sleep) {
for (ConsumerRecord<K, V> cRecord : this.records) {
if (i++ < index) {
toAck.add(cRecord);
}
else {
} else {
break;
}
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> newRecords = new HashMap<>();
for (ConsumerRecord<K, V> cRecord : toAck) {
newRecords.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
tp -> new LinkedList<>()).add(cRecord);
tp -> new LinkedList<>()).add(cRecord);
}
processAcks(new ConsumerRecords<K, V>(newRecords));
}
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class FilteringBatchMessageListenerAdapter<K, V>
implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {

private final boolean ackDiscarded;
private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
Expand All @@ -54,6 +55,8 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

/**
Expand All @@ -79,19 +82,26 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);

if (consumerRecords.isEmpty()) {
if (acknowledgment != null && acknowledgment.isAnyManualAck()) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
if (this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
}
Expand Down
Expand Up @@ -85,4 +85,7 @@ default boolean isOutOfOrderCommit() {
return false;
}

default boolean isAnyManualAck() {
throw new UnsupportedOperationException("You should implement your isAnyManualAck() method.");
}
}

0 comments on commit a60197d

Please sign in to comment.