From 6d209fffdc00f84ca84eb833b5edecd71380abaf Mon Sep 17 00:00:00 2001 From: Manuel Shenavai Date: Tue, 21 Jan 2020 16:00:34 +0100 Subject: [PATCH] CAMEL-14425: Polling Optimization: Remove for-loops and consider number of maximum messages to process earlier during the poll --- .../camel/component/mail/MailConsumer.java | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index bf178da346cc9..0949714ff71c2 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -20,7 +20,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; - import javax.mail.Flags; import javax.mail.Folder; import javax.mail.FolderNotFoundException; @@ -86,6 +85,30 @@ protected void doStop() throws Exception { super.doStop(); } + /** + * Returns the max number of messages to be processed. Will return -1 if no maximum is set + * + * @return + */ + private int getMaxNumberOfMessages() { + int maxMessagesPerPoll = (getEndpoint().getMaxMessagesPerPoll() == 0) ? -1 : getEndpoint().getMaxMessagesPerPoll(); + int fetchSize = getEndpoint().getConfiguration().getFetchSize(); + + if (hasMessageLimit(fetchSize)) { + return fetchSize; + } + + if (hasMessageLimit(maxMessagesPerPoll)) { + return maxMessagesPerPoll; + } + + return -1; + } + + private boolean hasMessageLimit(int limitValue) { + return limitValue >= 0; + } + protected int poll() throws Exception { // must reset for each poll shutdownRunningTask = null; @@ -174,13 +197,6 @@ protected int poll() throws Exception { public int processBatch(Queue exchanges) throws Exception { int total = exchanges.size(); - - // limit if needed - if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { - LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total); - total = maxMessagesPerPoll; - } - for (int index = 0; index < total && isBatchAllowed(); index++) { // only loop if we are started (allowed to run) Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); @@ -269,13 +285,22 @@ private List> retrieveMessages() throws Messagin } } + int maxMessage = getMaxNumberOfMessages(); + boolean messageNeedsValidation = getEndpoint().getIdempotentRepository() != null; for (Message message : messages) { + if (hasMessageLimit(maxMessage) && answer.size() >= maxMessage) { + break; + } String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message); - if (isValidMessage(key, message)) { + if (!messageNeedsValidation || isValidMessage(key, message)) { answer.add(new KeyValueHolder<>(key, message)); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Fetching {} messages. Total {} messages.", answer.size(), messages.length); + } + return answer; } @@ -328,16 +353,9 @@ private SearchTerm computeSearchTerm() { } protected Queue createExchanges(List> messages) throws MessagingException { - Queue answer = new LinkedList<>(); - - int fetchSize = getEndpoint().getConfiguration().getFetchSize(); - int count = fetchSize == -1 ? messages.size() : Math.min(fetchSize, messages.size()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Fetching {} messages. Total {} messages.", count, messages.size()); - } + Queue answer = new LinkedList(); - for (int i = 0; i < count; i++) { + for (int i = 0; i < messages.size(); i++) { try { KeyValueHolder holder = messages.get(i); String key = holder.getKey();