Skip to content

Commit

Permalink
CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should …
Browse files Browse the repository at this point in the history
…cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
  • Loading branch information
davsclaus committed Dec 11, 2023
1 parent 9f28788 commit 2d3b17b
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -50,8 +51,6 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ErrorHandlerAware;
Expand Down Expand Up @@ -268,21 +267,6 @@ protected void doBuild() throws Exception {
processorExchangeFactory.setId(id);
processorExchangeFactory.setRouteId(routeId);
}

// eager load classes
Object dummy = new MulticastReactiveTask();
LOG.trace("Loaded {}", dummy.getClass().getName());
Object dummy2 = new MulticastTransactedTask();
LOG.trace("Loaded {}", dummy2.getClass().getName());
Object dummy3 = new UseOriginalAggregationStrategy();
LOG.trace("Loaded {}", dummy3.getClass().getName());
if (isShareUnitOfWork()) {
Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null);
LOG.trace("Loaded {}", dummy4.getClass().getName());
}
Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
LOG.trace("Loaded {}", dummy5.getClass().getName());

ServiceHelper.buildService(processorExchangeFactory);
}

Expand Down Expand Up @@ -417,24 +401,17 @@ protected abstract class MulticastTask implements Runnable {
final AtomicBoolean allSent = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean();
final Map<String, String> mdc;

private MulticastTask() {
// used for eager classloading
this.original = null;
this.pairs = null;
this.callback = null;
this.iterator = null;
this.mdc = null;
this.completion = null;
}
final ScheduledFuture<?> timeoutTask;

MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity) {
this.original = original;
this.pairs = pairs;
this.callback = callback;
this.iterator = pairs.iterator();
if (timeout > 0) {
schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
} else {
timeoutTask = null;
}
// if MDC is enabled we must make a copy in this constructor when the task
// is created by the caller thread, and then propagate back when run is called
Expand Down Expand Up @@ -505,15 +482,30 @@ protected void timeout() {
} catch (Throwable e) {
original.setException(e);
// and do the done work
doDone(null, false);
doTimeoutDone(null, false);
} finally {
lock.unlock();
}
}
}

protected void doTimeoutDone(Exchange exchange, boolean forceExhaust) {
if (done.compareAndSet(false, true)) {
MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
}
}

protected void doDone(Exchange exchange, boolean forceExhaust) {
if (done.compareAndSet(false, true)) {
// cancel timeout if we are done normally (we cannot cancel if called via onTimeout)
if (timeoutTask != null) {
try {
timeoutTask.cancel(true);
} catch (Exception e) {
// ignore
LOG.debug("Cancel timeout task caused an exception. This exception is ignored.", e);
}
}
MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
}
}
Expand All @@ -524,9 +516,6 @@ protected void doDone(Exchange exchange, boolean forceExhaust) {
*/
protected class MulticastReactiveTask extends MulticastTask {

private MulticastReactiveTask() {
}

public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
super(original, pairs, callback, size);
Expand Down Expand Up @@ -625,9 +614,6 @@ public void run() {
*/
protected class MulticastTransactedTask extends MulticastTask {

private MulticastTransactedTask() {
}

public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
super(original, pairs, callback, size);
Expand Down Expand Up @@ -737,9 +723,9 @@ boolean doRun() throws Exception {
}
}

protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
protected ScheduledFuture<?> schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
return ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
} else {
executor.execute(() -> {
try {
Expand All @@ -750,6 +736,7 @@ protected void schedule(Executor executor, Runnable runnable, long delay, TimeUn
runnable.run();
});
}
return null;
}

protected StopWatch beforeSend(ProcessorExchangePair pair) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.ThreadPoolProfile;
import org.junit.jupiter.api.Test;

public class SplitTimeoutCancelTaskTest extends ContextTestSupport {

String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";

@Test
public void testSplitterTimeoutShouldNotExhaustThreadPool() throws Exception {
MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
mockEndpoint.expectedMessageCount(4);

template.sendBody("direct:start", payload1);
template.sendBody("direct:start", payload2);

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
myThreadPoolProfile.setMaxPoolSize(20);
myThreadPoolProfile.setPoolSize(10);
myThreadPoolProfile.setMaxQueueSize(1);

getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

from("direct:start")
.split()
.xpath("//items/item")
.parallelProcessing(true)
.streaming(true)
.stopOnException(true)
.timeout("30000")
.executorService("testProfile")
.to("mock:split");
}
};
}
}

0 comments on commit 2d3b17b

Please sign in to comment.