Skip to content

Commit

Permalink
CAMEL-5385: Allow to use custom/shared thread pool for scheduled poll…
Browse files Browse the repository at this point in the history
…ing consumers.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1353260 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
davsclaus committed Jun 24, 2012
1 parent 7a8a09e commit a26c25b
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 13 deletions.
Expand Up @@ -41,7 +41,7 @@
public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);

private ScheduledExecutorService executor;
private ScheduledExecutorService scheduledExecutorService;
private boolean shutdownExecutor;
private ScheduledFuture<?> future;

Expand All @@ -60,12 +60,12 @@ public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}

public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
super(endpoint, processor);
// we have been given an existing thread pool, so we should not manage its lifecycle
// so we should keep shutdownExecutor as false
this.executor = executor;
ObjectHelper.notNull(executor, "executor");
this.scheduledExecutorService = scheduledExecutorService;
ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
}

/**
Expand Down Expand Up @@ -283,6 +283,23 @@ public boolean isSendEmptyMessageWhenIdle() {
return sendEmptyMessageWhenIdle;
}

public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}

/**
* Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
* <p/>
* <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
* pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
* when the consumer is started/stopped etc.)
*
* @param scheduledExecutorService the custom thread pool to use
*/
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
}

// Implementation methods
// -------------------------------------------------------------------------

Expand All @@ -299,15 +316,15 @@ protected void doStart() throws Exception {
super.doStart();

// if no existing executor provided, then create a new thread pool ourselves
if (executor == null) {
if (scheduledExecutorService == null) {
// we only need one thread in the pool to schedule this task
this.executor = getEndpoint().getCamelContext().getExecutorServiceManager()
this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager()
.newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1);
// and we should shutdown the thread pool when no longer needed
this.shutdownExecutor = true;
}

ObjectHelper.notNull(executor, "executor", this);
ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this);
ObjectHelper.notNull(pollStrategy, "pollStrategy", this);

if (isStartScheduler()) {
Expand All @@ -321,13 +338,13 @@ protected void startScheduler() {
LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
}
future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
}
future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
}
}

Expand All @@ -342,9 +359,9 @@ protected void doStop() throws Exception {

@Override
protected void doShutdown() throws Exception {
if (shutdownExecutor && executor != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
executor = null;
if (shutdownExecutor && scheduledExecutorService != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
scheduledExecutorService = null;
future = null;
}
super.doShutdown();
Expand Down
Expand Up @@ -62,13 +62,14 @@ private void configureScheduledPollConsumerProperties(Map<String, Object> option
Object pollStrategy = options.remove("pollStrategy");
Object runLoggingLevel = options.remove("runLoggingLevel");
Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle");
Object scheduledExecutorService = options.remove("scheduledExecutorService");
boolean setConsumerProperties = false;

// the following is split into two if statements to satisfy the checkstyle max complexity constraint
if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
setConsumerProperties = true;
}
if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null) {
if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) {
setConsumerProperties = true;
}

Expand Down Expand Up @@ -101,6 +102,9 @@ private void configureScheduledPollConsumerProperties(Map<String, Object> option
if (sendEmptyMessageWhenIdle != null) {
consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle);
}
if (scheduledExecutorService != null) {
consumerProperties.put("scheduledExecutorService", scheduledExecutorService);
}
}
}

Expand Down
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;

import org.apache.camel.Exchange;
import org.apache.camel.component.mock.MockEndpoint;

/**
*
*/
public class FileConsumerSharedThreadPollStopRouteTest extends FileConsumerSharedThreadPollTest {

public void testSharedThreadPool() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(2);
// thread thread name should be the same
mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));

template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");

assertMockEndpointsSatisfied();

// now stop a
context.stopRoute("a");

resetMocks();
mock.expectedBodiesReceived("Bye World 2");
// a should not be polled
mock.expectedFileExists("target/a/hello2.txt");

template.sendBodyAndHeader("file:target/a", "Hello World 2", Exchange.FILE_NAME, "hello2.txt");
template.sendBodyAndHeader("file:target/b", "Bye World 2", Exchange.FILE_NAME, "bye2.txt");

assertMockEndpointsSatisfied();

// now start a, which should pickup the file
resetMocks();
mock.expectedBodiesReceived("Hello World 2");
context.startRoute("a");

assertMockEndpointsSatisfied();
}

}
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

/**
*
*/
public class FileConsumerSharedThreadPollTest extends ContextTestSupport {

private ScheduledExecutorService pool;
private SimpleRegistry registry = new SimpleRegistry();

@Override
protected void setUp() throws Exception {
deleteDirectory("target/a");
deleteDirectory("target/b");
super.setUp();
}

@Override
protected CamelContext createCamelContext() throws Exception {
return new DefaultCamelContext(registry);
}

public void testSharedThreadPool() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(2);
// thread thread name should be the same
mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));

template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// create shared pool and enlist in registry
pool = new ThreadPoolBuilder(context).poolSize(1).buildScheduled(this, "MySharedPool");
registry.put("myPool", pool);

from("file:target/a?scheduledExecutorService=#myPool").routeId("a")
.to("direct:shared");

from("file:target/b?scheduledExecutorService=#myPool").routeId("b")
.to("direct:shared");

from("direct:shared").routeId("shared")
.convertBodyTo(String.class)
.log("Get ${file:name} using ${threadName}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("threadName", Thread.currentThread().getName());
}
})
.to("mock:result");
}
};
}
}

0 comments on commit a26c25b

Please sign in to comment.