Permalink
Browse files

Camel 2.0-M2 Release

git-svn-id: https://svn.apache.org/repos/asf/camel/tags/camel-2.0-M2@783997 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
2 parents 10921f0 + 09044dc commit a57ab48a877981dd1039e79871a0a391b9e16238 @hzbarcea hzbarcea committed Jun 12, 2009
Showing with 764 additions and 133 deletions.
  1. +1 −0 camel-core/src/main/java/org/apache/camel/Exchange.java
  2. +15 −4 camel-core/src/main/java/org/apache/camel/processor/Enricher.java
  3. +9 −0 camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
  4. +8 −1 camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
  5. +13 −2 camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
  6. +81 −0 camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java
  7. +85 −0 camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java
  8. +90 −0 camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java
  9. +89 −0 camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java
  10. +4 −3 components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSpringEndpoint.java
  11. +13 −1 components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
  12. +3 −0 ...amel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBeanDefinitionParser.java
  13. +3 −18 components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/CxfUtils.java
  14. +1 −2 .../camel-cxf/src/test/java/org/apache/camel/component/cxf/spring/AbstractSpringBeanTestSupport.java
  15. +82 −48 components/camel-mail/src/main/java/org/apache/camel/component/mail/MailBinding.java
  16. +17 −4 components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCustomContentTypeTest.java
  17. +8 −0 components/camel-web/pom.xml
  18. +213 −27 components/camel-web/src/main/java/org/apache/camel/web/resources/RouteResource.java
  19. +23 −18 components/camel-web/src/main/webapp/org/apache/camel/web/resources/RouteResource/edit.jsp
  20. +6 −5 components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
@@ -59,6 +59,7 @@
String FILE_NAME_PRODUCED = "CamelFileNameProduced";
String FILE_PATH = "CamelFilePath";
String FILE_PARENT = "CamelFileParent";
+ String FILTERED = "CamelFiltered";
String HTTP_CHARACTER_ENCODING = "CamelHttpCharacterEncoding";
String HTTP_METHOD = "CamelHttpMethod";
@@ -23,6 +23,8 @@
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
@@ -35,8 +37,8 @@
*/
public class Enricher extends ServiceSupport implements Processor {
+ private static final transient Log LOG = LogFactory.getLog(Enricher.class);
private AggregationStrategy aggregationStrategy;
-
private Producer producer;
/**
@@ -99,10 +101,19 @@ public void process(Exchange exchange) throws Exception {
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
+
// aggregate original exchange and resource exchange
- Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
- // copy aggregation result onto original exchange (preserving pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
+ // but do not aggregate if the resource exchange was filtered
+ Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (filtered == null || !filtered) {
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange);
+ }
+ }
}
}
@@ -19,6 +19,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* The processor which implements the
@@ -27,6 +29,7 @@
* @version $Revision$
*/
public class FilterProcessor extends DelegateProcessor {
+ private static final Log LOG = LogFactory.getLog(FilterProcessor.class);
private final Predicate predicate;
public FilterProcessor(Predicate predicate, Processor processor) {
@@ -37,6 +40,12 @@ public FilterProcessor(Predicate predicate, Processor processor) {
public void process(Exchange exchange) throws Exception {
if (predicate.matches(exchange)) {
super.process(exchange);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Marking exchange as filtered: " + exchange);
+ }
+ // mark this exchange as filtered
+ exchange.setProperty(Exchange.FILTERED, Boolean.TRUE);
}
}
@@ -92,6 +92,7 @@ public MulticastProcessor(Collection<Processor> processors, AggregationStrategy
public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
notNull(processors, "processors");
+ // TODO: end() does not work correctly with Splitter
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.isParallelProcessing = parallelProcessing;
@@ -204,8 +205,14 @@ protected void doProcessSequntiel(AtomicExchange result, Iterable<ProcessorExcha
* @param exchange the exchange to be added to the result
*/
protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
- if (aggregationStrategy != null) {
+ // only aggregate if the exchange is not filtered (eg by the FilterProcessor)
+ Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (aggregationStrategy != null && (filtered == null || !filtered)) {
result.set(aggregationStrategy.aggregate(result.get(), exchange));
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
+ }
}
}
@@ -57,11 +57,22 @@ public DefaultAggregationCollection(Expression correlationExpression, Aggregatio
@Override
public boolean add(Exchange exchange) {
+ // do not add exchange if it was filtered
+ Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (filtered != null && filtered) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
+ }
+ return false;
+ }
+
Object correlationKey = correlationExpression.evaluate(exchange, Object.class);
if (LOG.isTraceEnabled()) {
- LOG.trace("Evaluated expression: " + correlationExpression + " as CorrelationKey: " + correlationKey);
+ LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey);
}
+ // TODO: correlationKey evalutated to null should be skipped by default
+
Exchange oldExchange = aggregated.get(correlationKey);
Exchange newExchange = exchange;
@@ -80,7 +91,7 @@ public boolean add(Exchange exchange) {
// the strategy may just update the old exchange and return it
if (!newExchange.equals(oldExchange)) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Put exchange:" + newExchange + " with coorelation key:" + correlationKey);
+ LOG.trace("Put exchange:" + newExchange + " with correlation key:" + correlationKey);
}
aggregated.put(correlationKey, newExchange);
}
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Aggregate aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class AggregateShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testAggregateWithFilter() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Bye World");
+
+ MockEndpoint filtered = getMockEndpoint("mock:filtered");
+ filtered.expectedBodiesReceived("Hello World", "Bye World");
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "id", 1);
+ template.sendBodyAndHeader("direct:start", "Hi there", "id", 1);
+ template.sendBodyAndHeader("direct:start", "Bye World", "id", 1);
+ template.sendBodyAndHeader("direct:start", "How do you do?", "id", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ Predicate goodWord = body().contains("World");
+
+ from("direct:start")
+ .filter(goodWord)
+ .to("mock:filtered")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Enrich aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class EnrichShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testEnrichWithFilterNotFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testEnrichWithFilterFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hi there");
+
+ template.sendBody("direct:start", "Hi there");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .enrich("direct:enrich", new MyAggregationStrategy())
+ .to("mock:result");
+
+ Predicate goodWord = body().contains("World");
+ from("direct:enrich")
+ .filter(goodWord)
+ .to("mock:filtered");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+ assertTrue("Should have been filtered: " + newBody, newBody.contains("World"));
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Multicast aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class MulticastShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testMulticastWithFilterNotFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMulticastWithFilterFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hi there");
+
+ template.sendBody("direct:start", "Hi there");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("direct:multicast")
+ .to("mock:result");
+
+ from("direct:multicast")
+ .multicast(new MyAggregationStrategy())
+ .to("direct:a")
+ .to("direct:b");
+
+ Predicate goodWord = body().contains("World");
+ from("direct:a", "direct:b")
+ .filter(goodWord)
+ .to("mock:filtered");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+ assertTrue("Should have been filtered: " + newBody, newBody.contains("World"));
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
Oops, something went wrong.

0 comments on commit a57ab48

Please sign in to comment.