From 53bd40e91e2689078120c7ba302e69db1b5e6f6f Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Fri, 2 Sep 2011 09:13:51 +0000 Subject: [PATCH] Merged revisions 1159682 via svnmerge from https://svn.apache.org/repos/asf/camel/trunk git-svn-id: https://svn.apache.org/repos/asf/camel/branches/camel-2.8.x@1164422 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/camel/impl/ProducerCache.java | 58 +++++----- ...EventNotifierFailureHandledEventsTest.java | 19 ++-- .../EventNotifierRedeliveryEventsTest.java | 7 +- .../async/AsyncEndpointEventNotifierTest.java | 102 ++++++++++++++++++ 4 files changed, 149 insertions(+), 37 deletions(-) create mode 100644 camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index e9093a4ccf69..c8304d636502 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -259,11 +259,12 @@ public T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern * @param producerCallback the producer template callback to be executed * @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously */ - public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, AsyncCallback callback, AsyncProducerCallback producerCallback) { + public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, + final AsyncCallback callback, final AsyncProducerCallback producerCallback) { boolean sync = true; // get the producer and we do not mind if its pooled as we can handle returning it back to the pool - Producer producer = doGetProducer(endpoint, true); + final Producer producer = doGetProducer(endpoint, true); if (producer == null) { if (isStopped()) { @@ -274,39 +275,44 @@ public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, ExchangeP } } - StopWatch watch = null; - if (exchange != null) { - // record timing for sending the exchange using the producer - watch = new StopWatch(); - } + // record timing for sending the exchange using the producer + final StopWatch watch = exchange != null ? new StopWatch() : null; try { // invoke the callback AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(producer); - sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, callback); + sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + try { + if (watch != null) { + long timeTaken = watch.stop(); + // emit event that the exchange was sent to the endpoint + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + } + + if (producer instanceof ServicePoolAware) { + // release back to the pool + pool.release(endpoint, producer); + } else if (!producer.isSingleton()) { + // stop non singleton producers as we should not leak resources + try { + ServiceHelper.stopService(producer); + } catch (Exception e) { + // ignore and continue + LOG.warn("Error stopping producer: " + producer, e); + } + } + } finally { + callback.done(doneSync); + } + } + }); } catch (Throwable e) { // ensure exceptions is caught and set on the exchange if (exchange != null) { exchange.setException(e); } - } finally { - if (exchange != null && exchange.getException() == null) { - long timeTaken = watch.stop(); - // emit event that the exchange was sent to the endpoint - EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); - } - if (producer instanceof ServicePoolAware) { - // release back to the pool - pool.release(endpoint, producer); - } else if (!producer.isSingleton()) { - // stop non singleton producers as we should not leak resources - try { - ServiceHelper.stopService(producer); - } catch (Exception e) { - // ignore and continue - LOG.warn("Error stopping producer: " + producer, e); - } - } } return sync; diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java index 9280728e466a..d0df8595c86f 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java @@ -94,17 +94,19 @@ public void configure() throws Exception { assertIsInstanceOf(RouteStartedEvent.class, events.get(1)); assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(4)); - ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4)); + ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5)); assertEquals("should be DLC", true, e.isDeadLetterChannel()); SendProcessor send = assertIsInstanceOf(SendProcessor.class, e.getFailureHandler()); assertEquals("mock://dead", send.getDestination().getEndpointUri()); // dead letter channel will mark the exchange as completed - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5)); - // and the sent will be logged after they are complete sending as it record the time taken as well - assertIsInstanceOf(ExchangeSentEvent.class, events.get(6)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6)); + // and the last event should be the direct:start assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); + ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7); + assertEquals("direct://start", sent.getEndpoint().getEndpointUri()); } public void testExchangeOnException() throws Exception { @@ -127,14 +129,17 @@ public void configure() throws Exception { assertIsInstanceOf(RouteStartedEvent.class, events.get(1)); assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(4)); - ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4)); + ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5)); assertEquals("should NOT be DLC", false, e.isDeadLetterChannel()); // onException will handle the exception - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(6)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6)); + // and the last event should be the direct:start assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); + ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7); + assertEquals("direct://start", sent.getEndpoint().getEndpointUri()); } } diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java index 912a137e3096..8a6117c841e9 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java @@ -100,9 +100,9 @@ public void configure() throws Exception { assertEquals(3, e.getAttempt()); e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4)); assertEquals(4, e.getAttempt()); - assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5)); - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(5)); + assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(6)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(7)); assertIsInstanceOf(ExchangeSentEvent.class, events.get(8)); } @@ -134,7 +134,6 @@ public void configure() throws Exception { assertEquals(3, e.getAttempt()); e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4)); assertEquals(4, e.getAttempt()); - assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5)); // since its async the ordering of the rest can be different depending per OS and timing } diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java new file mode 100644 index 000000000000..6447422290b0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import java.util.EventObject; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.management.EventNotifierSupport; +import org.apache.camel.management.event.ExchangeSentEvent; + +/** + * @version + */ +public class AsyncEndpointEventNotifierTest extends ContextTestSupport { + + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicLong time = new AtomicLong(); + + public void testAsyncEndpointEventNotifer() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); + + String reply = template.requestBody("direct:start", "Hello Camel", String.class); + assertEquals("Bye Camel", reply); + + assertMockEndpointsSatisfied(); + + assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS)); + + long delta = time.get(); + assertTrue("Should take about 1000 millis sec, was: " + delta, delta > 800); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(createRegistry()); + context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() { + public void notify(EventObject event) throws Exception { + try { + ExchangeSentEvent sent = (ExchangeSentEvent) event; + time.set(sent.getTimeTaken()); + } finally { + latch.countDown(); + } + } + + public boolean isEnabled(EventObject event) { + // we only want the async endpoint + if (event instanceof ExchangeSentEvent) { + ExchangeSentEvent sent = (ExchangeSentEvent) event; + return sent.getEndpoint().getEndpointUri().startsWith("async"); + } + return false; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + }); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("async", new MyAsyncComponent()); + + from("direct:start") + .to("mock:before") + .to("async:Bye Camel?delay=1000") + .to("mock:result"); + } + }; + } + +} \ No newline at end of file