Skip to content

Commit

Permalink
modify
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 24, 2024
1 parent a60197d commit fe26036
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3390,7 +3390,6 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
private final ConsumerRecords<K, V> records;

private final List<ConsumerRecord<K, V>> recordList;
private boolean isAnyManualAck = false;

private volatile boolean acked;

Expand All @@ -3402,18 +3401,6 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
this.recordList = recordList;
}

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList,
boolean isAnyManualAck) {
this(records, recordList);
this.isAnyManualAck = isAnyManualAck;
}

@Override
public boolean isAnyManualAck() {
return this.isAnyManualAck;
}

@Override
public void acknowledge() {
if (this.partial >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

@Override
Expand All @@ -84,14 +86,9 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
Assert.state(consumerRecords != null, "filter returned null from filterBatch");

if (consumerRecords.isEmpty()) {
if (acknowledgment != null && acknowledgment.isAnyManualAck()) {
if (acknowledgment != null) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
else {
if (this.consumerAware
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,4 @@ default boolean isOutOfOrderCommit() {
return false;
}

default boolean isAnyManualAck() {
throw new UnsupportedOperationException("You should implement your isAnyManualAck() method.");
}
}

0 comments on commit fe26036

Please sign in to comment.