Receiving an empty list when using RecordFilterStrategy on batch messages #2806

Open
artembilan opened this issue Sep 11, 2023 Discussed in #2805 · 2 comments · May be fixed by #3216

@artembilan
Member

Discussed in #2805

Originally posted by m-hetz September 11, 2023
Hi!

I'm using RecordFilterStrategy to filter records based on headers. This works great for single messages: filtered records don't reach the listener (annotated with @KafkaListener).

In batch mode, filtering also works, but when the entire batch is filtered out, the listener is still invoked with an empty list.
Is this expected behavior? According to this, returning an empty list means all records were filtered.

Is there a way to configure it so that the listener is not invoked at all for an empty batch?

Thanks!

val filter = object : RecordFilterStrategy<String, String> {
    override fun filter(consumerRecord: ConsumerRecord<String, String>): Boolean {
        val header = consumerRecord.headers().lastHeader("SKIP")
        if (header != null) {
            logger.info("Filtering header=" + consumerRecord.key())
            return true
        }
        return false
    }

    override fun filterBatch(records: MutableList<ConsumerRecord<String, String>>): MutableList<ConsumerRecord<String, String>> {
        val iterator: MutableIterator<ConsumerRecord<String, String>> = records.iterator()

        while (iterator.hasNext()) {
            if (this.filter(iterator.next())) {
                iterator.remove()
            }
        }
        // The list is empty when every record in the batch was filtered out
        return records
    }
}
factory.setBatchListener(isBatchListener)
factory.setRecordFilterStrategy(filter)


@KafkaListener(topics = ["topic"], containerFactory = "messageContainerFactory")
fun listener(messages: List<ConsumerRecord<String, String?>>) {
    logger.info("Consuming messages, batch size=${messages.size}")
}

Sorry for the unformatted code, I tried editing the message but the code is still messed up
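
Until the adapter itself skips empty batches, one possible workaround is to return early in the listener when the list is empty. The following is a minimal plain-Java sketch of that idea, not Spring Kafka code: `EmptyBatchGuardSketch`, its `filterBatch` and `listener` methods, and the `processedBatches` counter are hypothetical names used only for illustration, and plain strings stand in for the consumer records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java stand-in for the setup above; it does not use the Spring Kafka API.
final class EmptyBatchGuardSketch {

    // Counts how many batches the listener body actually processed.
    static int processedBatches = 0;

    // Mirrors the filterBatch idea: keeps only records for which shouldSkip is false.
    static <T> List<T> filterBatch(List<T> records, Predicate<T> shouldSkip) {
        List<T> kept = new ArrayList<>(records);
        kept.removeIf(shouldSkip);
        return kept;
    }

    // Hypothetical listener body: returns early when the whole batch was filtered out.
    static <T> void listener(List<T> messages) {
        if (messages.isEmpty()) {
            return;
        }
        processedBatches++;
    }
}
```

With this guard the listener method is still called for a fully filtered batch, but it does no work; it does not change when the container invokes the listener.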

@chickenchickenlove
Contributor

chickenchickenlove commented Apr 20, 2024

Hi, @artembilan.
I'm interested in solving this issue.

		if (consumerRecords.size() > 0 || consumerAware
				|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
			invokeDelegate(consumerRecords, acknowledgment, consumer);
		}

Why should we consider the consumerAware type even when consumerRecords.size() == 0?

		if (!filter(consumerRecord)) {
			switch (this.delegateType) {
				case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
				case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
				case CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, consumer);
				case SIMPLE -> this.delegate.onMessage(consumerRecord);
			}
		}
		else {
			ackFilteredIfNecessary(acknowledgment);
		}

In non-batch mode, when a record is filtered out, the adapter does not invoke the delegate at all; it only acknowledges the discarded record if necessary.
So what do you think about changing the batch code like this? (Please note the if condition.)

		// Fix here: consumerRecords.size() should be greater than 0.
		if ((consumerRecords.size() > 0) && (consumerAware
				|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING)))) {
			invokeDelegate(consumerRecords, acknowledgment, consumer);
		}
		// Otherwise (consumerRecords.size() == 0), acknowledge the filtered records if necessary.
		else {
			if (this.ackDiscarded && acknowledgment != null) {
				acknowledgment.acknowledge();
			}
		}

IMHO, there is no need to invoke a consumer-aware listener when the filtered batch is empty, because a filtered record is already ignored in single-record mode.
What do you think?
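
To compare the two conditions, here is a small boolean model of them. It is only a sketch: `BatchInvokeConditionSketch` and its method names are hypothetical, and the `acknowledging` parameter stands in for `this.delegateType.equals(ListenerType.ACKNOWLEDGING)`. It suggests that the existing condition invokes a consumer-aware listener on an empty batch, while the proposed condition, as written, would also skip a non-empty batch when the listener is neither consumer-aware nor acknowledging, which may need a closer look in the PR.

```java
// Boolean model of the two conditions quoted above; not the Spring Kafka adapter itself.
final class BatchInvokeConditionSketch {

    // Condition from the existing batch adapter snippet.
    static boolean current(int size, boolean consumerAware, boolean ackDiscarded, boolean acknowledging) {
        return size > 0 || consumerAware || (!ackDiscarded && acknowledging);
    }

    // Condition from the proposed change.
    static boolean proposed(int size, boolean consumerAware, boolean ackDiscarded, boolean acknowledging) {
        return size > 0 && (consumerAware || (!ackDiscarded && acknowledging));
    }

    public static void main(String[] args) {
        // Empty batch, consumer-aware listener.
        System.out.println(current(0, true, false, false));   // true: listener gets an empty list
        System.out.println(proposed(0, true, false, false));  // false: listener is skipped

        // Non-empty batch, listener that is neither consumer-aware nor acknowledging.
        System.out.println(current(3, false, false, false));  // true
        System.out.println(proposed(3, false, false, false)); // false
    }
}
```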

@sobychacko
Contributor

@chickenchickenlove You can go ahead and work on a PR. Then, we can review the details. Keep in mind that we are already in an RC state with 3.2.0. Therefore, any new enhancements like these must wait for the 3.3.0 version.

chickenchickenlove added a commit to chickenchickenlove/spring-kafka that referenced this issue Apr 24, 2024
@artembilan artembilan modified the milestones: Backlog, 3.3.0-M1 May 9, 2024