Skip to content

Commit

Permalink
spring-projectsGH-2806 : Draft again
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 24, 2024
1 parent 44b3dbe commit b9c1583
Showing 1 changed file with 18 additions and 12 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,8 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
Expand All @@ -54,6 +56,7 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

/**
Expand All @@ -71,6 +74,7 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

@Override
Expand All @@ -79,19 +83,21 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);

if (consumerRecords.isEmpty()) {
if (acknowledgment != null) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
if (this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
}
Expand Down

0 comments on commit b9c1583

Please sign in to comment.