diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index a67cee8778185..f946becd46348 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; +import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -37,6 +38,8 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Route; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; +import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProducerCache; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -188,6 +191,23 @@ public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; } + @Override + public boolean process(Exchange exchange, final AsyncCallback callback) { + AggregationStrategy strategy = getAggregationStrategy(); + + // set original exchange if not already pre-configured + if (strategy instanceof UseOriginalAggregationStrategy original) { + // need to create a new private instance, as we can also have concurrency issue so we cannot store state + AggregationStrategy clone = original.newInstance(exchange); + if (isShareUnitOfWork()) { + clone = new ShareUnitOfWorkAggregationStrategy(clone); + } + setAggregationStrategyOnExchange(exchange, clone); + } + + return super.process(exchange, callback); + } + @Override protected Iterable createProcessorExchangePairs(Exchange exchange) throws Exception { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java index 17f4a95dd6c9f..bee87834a9f84 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java @@ -61,25 +61,28 @@ public UseOriginalAggregationStrategy newInstance(Exchange original) { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + Exchange target = oldExchange; if (propagateException) { Exception exception = checkException(oldExchange, newExchange); if (exception != null) { + target = oldExchange != null ? oldExchange : newExchange; if (original != null) { original.setException(exception); } else { - oldExchange.setException(exception); + target.setException(exception); } } exception = checkCaughtException(oldExchange, newExchange); if (exception != null) { + target = oldExchange != null ? oldExchange : newExchange; if (original != null) { original.setProperty(Exchange.EXCEPTION_CAUGHT, exception); - } else if (oldExchange != null) { - oldExchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + } else { + target.setProperty(Exchange.EXCEPTION_CAUGHT, exception); } } } - return original != null ? original : oldExchange; + return original != null ? original : target; } protected Exception checkException(Exchange oldExchange, Exchange newExchange) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java new file mode 100644 index 0000000000000..18076fd0c6b80 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class RecipientListUseOriginalNotPropagateExceptionTest extends ContextTestSupport { + + @ParameterizedTest + @ValueSource(strings = { + "body", + "throw1", + "throw2", + "throw1;throw2" + }) + public void testWithoutPropagation(String body) throws Exception { + + getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1") ? 0 : 1); + getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2") ? 0 : 1); + + getMockEndpoint("mock:result").expectedBodiesReceived(body); + + template.sendBody("direct:start", body); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:start") + .errorHandler(noErrorHandler()) + .recipientList(constant("direct:recipient1,direct:recipient2")) + .aggregationStrategy(new UseOriginalAggregationStrategy(false)) + .to("mock:result"); + + from("direct:recipient1") + .log("recipient1") + .choice() + .when(bodyAs(String.class).contains("throw1")) + .process( + exchange -> { + throw new RuntimeException("recipient1"); + }) + .end() + .setBody(constant("recipient1")) + .to("mock:recipient1"); + + from("direct:recipient2") + .log("recipient2") + .choice() + .when(bodyAs(String.class).contains("throw2")) + .process( + exchange -> { + throw new RuntimeException("recipient2"); + }) + .end() + .setBody(constant("recipient2")) + .to("mock:recipient2"); + } + }; + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java new file mode 100644 index 0000000000000..abf4f0116c256 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class RecipientListUseOriginalPropagateExceptionCaughtTest extends ContextTestSupport { + + @ParameterizedTest + @ValueSource(strings = { + "caught1", + "caught2", + }) + public void testWithPropagation(String body) throws Exception { + + getMockEndpoint("mock:recipient1").expectedMessageCount(1); + getMockEndpoint("mock:recipient2").expectedMessageCount(1); + + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", body); + + assertEquals(1, getMockEndpoint("mock:result").getReceivedExchanges().size()); + Exchange exchange = getMockEndpoint("mock:result").getReceivedExchanges().get(0); + Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + assertNotNull(exception); + Throwable rootCause = exception; + while (rootCause.getCause() != null) { + rootCause = rootCause.getCause(); + } + assertInstanceOf(RuntimeException.class, rootCause); + + if (body.contains("caught1")) { + assertEquals("recipient1", rootCause.getMessage()); + } + if (body.contains("caught2")) { + assertEquals("recipient2", rootCause.getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:start") + .errorHandler(noErrorHandler()) + .recipientList(constant("direct:recipient1,direct:recipient2")) + .aggregationStrategy(new UseOriginalAggregationStrategy(true)) + .to("mock:result"); + + from("direct:recipient1") + .log("recipient1") + .choice() + .when(bodyAs(String.class).contains("caught1")) + .process( + exchange -> { + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new RuntimeException("recipient1")); + }) + .end() + .setBody(constant("recipient1")) + .to("mock:recipient1"); + + from("direct:recipient2") + .log("recipient2") + .choice() + .when(bodyAs(String.class).contains("caught2")) + .process( + exchange -> { + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new RuntimeException("recipient2")); + }) + .end() + .setBody(constant("recipient2")) + .to("mock:recipient2"); + } + }; + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java new file mode 100644 index 0000000000000..9b243e6285454 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RecipientListUseOriginalPropagateExceptionTest extends ContextTestSupport { + + @ParameterizedTest + @ValueSource(strings = { + "throw1", + "throw2", + }) + public void testWithPropagation(String body) throws Exception { + + getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1") ? 0 : 1); + getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2") ? 0 : 1); + + getMockEndpoint("mock:result").expectedMessageCount(0); + + CamelExecutionException exception + = assertThrows(CamelExecutionException.class, () -> template.sendBody("direct:start", body)); + + Throwable rootCause = exception; + while (rootCause.getCause() != null) { + rootCause = rootCause.getCause(); + } + assertInstanceOf(RuntimeException.class, rootCause); + + if (body.contains("throw1")) { + assertEquals("recipient1", rootCause.getMessage()); + } + if (body.contains("throw2")) { + assertEquals("recipient2", rootCause.getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:start") + .errorHandler(noErrorHandler()) + .recipientList(constant("direct:recipient1,direct:recipient2")) + .aggregationStrategy(new UseOriginalAggregationStrategy(true)) + .to("mock:result"); + + from("direct:recipient1") + .log("recipient1") + .choice() + .when(bodyAs(String.class).contains("throw1")) + .process( + exchange -> { + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new RuntimeException("recipient1Caught")); + throw new RuntimeException("recipient1"); + }) + .end() + .setBody(constant("recipient1")) + .to("mock:recipient1"); + + from("direct:recipient2") + .log("recipient2") + .choice() + .when(bodyAs(String.class).contains("throw2")) + .process( + exchange -> { + throw new RuntimeException("recipient2"); + }) + .end() + .setBody(constant("recipient2")) + .to("mock:recipient2"); + } + }; + } + +}