diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 9ccfd4b5d364d..670664b5a847f 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -41,7 +41,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); - private ScheduledExecutorService executor; + private ScheduledExecutorService scheduledExecutorService; private boolean shutdownExecutor; private ScheduledFuture future; @@ -60,12 +60,12 @@ public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } - public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) { + public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) { super(endpoint, processor); // we have been given an existing thread pool, so we should not manage its lifecycle // so we should keep shutdownExecutor as false - this.executor = executor; - ObjectHelper.notNull(executor, "executor"); + this.scheduledExecutorService = scheduledExecutorService; + ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService"); } /** @@ -283,6 +283,23 @@ public boolean isSendEmptyMessageWhenIdle() { return sendEmptyMessageWhenIdle; } + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + /** + * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool + *

+ * Notice: When using a custom thread pool, then the lifecycle of this thread + * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool + * when the consumer is started/stopped etc.) + * + * @param scheduledExecutorService the custom thread pool to use + */ + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + } + // Implementation methods // ------------------------------------------------------------------------- @@ -299,15 +316,15 @@ protected void doStart() throws Exception { super.doStart(); // if no existing executor provided, then create a new thread pool ourselves - if (executor == null) { + if (scheduledExecutorService == null) { // we only need one thread in the pool to schedule this task - this.executor = getEndpoint().getCamelContext().getExecutorServiceManager() + this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1); // and we should shutdown the thread pool when no longer needed this.shutdownExecutor = true; } - ObjectHelper.notNull(executor, "executor", this); + ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); if (isStartScheduler()) { @@ -321,13 +338,13 @@ protected void startScheduler() { LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()}); } - future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit()); + future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit()); } else { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}", new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()}); } - future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit()); + future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit()); } } @@ -342,9 +359,9 @@ protected void doStop() throws Exception { @Override protected void doShutdown() throws Exception { - if (shutdownExecutor && executor != null) { - getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); - executor = null; + if (shutdownExecutor && scheduledExecutorService != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); + scheduledExecutorService = null; future = null; } super.doShutdown(); diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 4aa4337ff48ab..1d4aa850e02e0 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -62,13 +62,14 @@ private void configureScheduledPollConsumerProperties(Map option Object pollStrategy = options.remove("pollStrategy"); Object runLoggingLevel = options.remove("runLoggingLevel"); Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle"); + Object scheduledExecutorService = options.remove("scheduledExecutorService"); boolean setConsumerProperties = false; // the following is split into two if statements to satisfy the checkstyle max complexity constraint if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) { setConsumerProperties = true; } - if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null) { + if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) { setConsumerProperties = true; } @@ -101,6 +102,9 @@ private void configureScheduledPollConsumerProperties(Map option if (sendEmptyMessageWhenIdle != null) { consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle); } + if (scheduledExecutorService != null) { + consumerProperties.put("scheduledExecutorService", scheduledExecutorService); + } } } diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java new file mode 100644 index 0000000000000..46c502e99134b --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.Exchange; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * + */ +public class FileConsumerSharedThreadPollStopRouteTest extends FileConsumerSharedThreadPollTest { + + public void testSharedThreadPool() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + // thread thread name should be the same + mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName")); + + template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt"); + template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt"); + + assertMockEndpointsSatisfied(); + + // now stop a + context.stopRoute("a"); + + resetMocks(); + mock.expectedBodiesReceived("Bye World 2"); + // a should not be polled + mock.expectedFileExists("target/a/hello2.txt"); + + template.sendBodyAndHeader("file:target/a", "Hello World 2", Exchange.FILE_NAME, "hello2.txt"); + template.sendBodyAndHeader("file:target/b", "Bye World 2", Exchange.FILE_NAME, "bye2.txt"); + + assertMockEndpointsSatisfied(); + + // now start a, which should pickup the file + resetMocks(); + mock.expectedBodiesReceived("Hello World 2"); + context.startRoute("a"); + + assertMockEndpointsSatisfied(); + } + +} diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java new file mode 100644 index 0000000000000..fb0e5824db7f1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; + +/** + * + */ +public class FileConsumerSharedThreadPollTest extends ContextTestSupport { + + private ScheduledExecutorService pool; + private SimpleRegistry registry = new SimpleRegistry(); + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/a"); + deleteDirectory("target/b"); + super.setUp(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + return new DefaultCamelContext(registry); + } + + public void testSharedThreadPool() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + // thread thread name should be the same + mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName")); + + template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt"); + template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // create shared pool and enlist in registry + pool = new ThreadPoolBuilder(context).poolSize(1).buildScheduled(this, "MySharedPool"); + registry.put("myPool", pool); + + from("file:target/a?scheduledExecutorService=#myPool").routeId("a") + .to("direct:shared"); + + from("file:target/b?scheduledExecutorService=#myPool").routeId("b") + .to("direct:shared"); + + from("direct:shared").routeId("shared") + .convertBodyTo(String.class) + .log("Get ${file:name} using ${threadName}") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader("threadName", Thread.currentThread().getName()); + } + }) + .to("mock:result"); + } + }; + } +}