diff --git a/camel-core/src/main/java/org/apache/camel/TimeoutMap.java b/camel-core/src/main/java/org/apache/camel/TimeoutMap.java index e76ed375c4eb4..ef5bda3593e2e 100644 --- a/camel-core/src/main/java/org/apache/camel/TimeoutMap.java +++ b/camel-core/src/main/java/org/apache/camel/TimeoutMap.java @@ -52,8 +52,10 @@ public interface TimeoutMap extends Runnable { * @param key the key * @param value the value * @param timeoutMillis timeout in millis + * @return the previous value associated with key, or + * null if there was no mapping for key. */ - void put(K key, V value, long timeoutMillis); + V put(K key, V value, long timeoutMillis); /** * Callback when the value has been evicted diff --git a/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java b/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java index 94e4026b06306..57462f1650e49 100644 --- a/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java +++ b/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java @@ -94,14 +94,15 @@ public V get(K key) { return entry.getValue(); } - public void put(K key, V value, long timeoutMillis) { + public V put(K key, V value, long timeoutMillis) { TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis); if (useLock) { lock.lock(); } try { - map.put(key, entry); updateExpireTime(entry); + TimeoutMapEntry result = map.put(key, entry); + return result != null ? result.getValue() : null; } finally { if (useLock) { lock.unlock(); diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java index 3d86be1e2758d..d72c8d0848e06 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java @@ -70,7 +70,7 @@ public ReplyHandler get(String key) { } @Override - public void put(String key, ReplyHandler value, long timeoutMillis) { + public ReplyHandler put(String key, ReplyHandler value, long timeoutMillis) { try { if (listener != null) { listener.onPut(key); @@ -79,13 +79,15 @@ public void put(String key, ReplyHandler value, long timeoutMillis) { // ignore } + ReplyHandler result; if (timeoutMillis <= 0) { // no timeout (must use Integer.MAX_VALUE) - super.put(key, value, Integer.MAX_VALUE); + result = super.put(key, value, Integer.MAX_VALUE); } else { - super.put(key, value, timeoutMillis); + result = super.put(key, value, timeoutMillis); } log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis); + return result; } @Override diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index 59513b2461296..e494b83a4056f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -51,10 +51,10 @@ public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncC // add to correlation map QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout); - if (correlation.get(correlationId) != null) { - log.error("The correlationId [{}] is not unique, some reply message would be ignored and the request thread could be blocked.", correlationId); + ReplyHandler result = correlation.put(correlationId, handler, requestTimeout); + if (result != null) { + log.warn("The correlationId [{}] is not unique, some reply message would be ignored and the request thread could be blocked.", correlationId); } - correlation.put(correlationId, handler, requestTimeout); return correlationId; } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 269b8fe9aac24..788994bb04192 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -61,10 +61,10 @@ public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncC String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout); - if (correlation.get(correlationId) != null) { + ReplyHandler result = correlation.put(correlationId, handler, requestTimeout); + if (result != null) { log.error("The correlationId [{}] is not unique, some reply message would be ignored and the request thread could be blocked.", correlationId); } - correlation.put(correlationId, handler, requestTimeout); return correlationId; }