Skip to content

Commit

Permalink
CAMEL-2134: Applied patch with thanks to David Valeri.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/camel/branches/camel-1.x@832801 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
davsclaus committed Nov 4, 2009
1 parent 296f70a commit f442a01
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
Expand Up @@ -20,7 +20,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -48,6 +48,8 @@
* @version $Revision$
*/
public class MulticastProcessor extends ServiceSupport implements Processor {
private static final int DEFAULT_THREADPOOL_SIZE = 10;

static class ProcessorExchangePair {
private final Processor processor;
private final Exchange exchange;
Expand Down Expand Up @@ -95,7 +97,10 @@ public MulticastProcessor(Collection<Processor> processors, AggregationStrategy
this.executor = executor;
} else {
// setup default Executor
this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
this.executor = new ThreadPoolExecutor(
DEFAULT_THREADPOOL_SIZE, DEFAULT_THREADPOOL_SIZE,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
this.streaming = streaming;
Expand Down
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;

/**
* @version $Revision$
*/
public class MulticastExecutorTest extends ContextTestSupport {

protected static final String DEFAULT_EXECUTOR_ENDPOINT = "seda:inputDefaultExecutor";

protected Endpoint<Exchange> startEndpoint;
protected MockEndpoint x;
protected MockEndpoint y;
protected MockEndpoint z;

public void testSendingAMessageUsingMulticastAdequateExecutorPool() throws Exception {
this.x.expectedBodiesReceived("input");
this.x.expectedMessageCount(40);
this.y.expectedBodiesReceived("input");
this.y.expectedMessageCount(40);
this.z.expectedBodiesReceived("input");
this.z.expectedMessageCount(40);

(new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 40; i++) {
template.sendBody(DEFAULT_EXECUTOR_ENDPOINT, "input");
}
}
})).start();

assertMockEndpointsSatisfied();
}

@Override
protected void setUp() throws Exception {
super.setUp();

this.x = getMockEndpoint("mock:x");
this.y = getMockEndpoint("mock:y");
this.z = getMockEndpoint("mock:z");
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: example
from(DEFAULT_EXECUTOR_ENDPOINT + "?concurrentConsumers=5").
multicast().
parallelProcessing().
to("mock:x", "mock:y").
to("mock:z");
// END SNIPPET: example
}
};
}
}

0 comments on commit f442a01

Please sign in to comment.