Skip to content

Commit

Permalink
Merged revisions 1159682 via svnmerge from
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed Sep 2, 2011
1 parent 19ff743 commit 53bd40e
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 37 deletions.
58 changes: 32 additions & 26 deletions camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Expand Up @@ -259,11 +259,12 @@ public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern
* @param producerCallback the producer template callback to be executed
* @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
*/
public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, AsyncCallback callback, AsyncProducerCallback producerCallback) {
public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
boolean sync = true;

// get the producer and we do not mind if its pooled as we can handle returning it back to the pool
Producer producer = doGetProducer(endpoint, true);
final Producer producer = doGetProducer(endpoint, true);

if (producer == null) {
if (isStopped()) {
Expand All @@ -274,39 +275,44 @@ public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, ExchangeP
}
}

StopWatch watch = null;
if (exchange != null) {
// record timing for sending the exchange using the producer
watch = new StopWatch();
}
// record timing for sending the exchange using the producer
final StopWatch watch = exchange != null ? new StopWatch() : null;

try {
// invoke the callback
AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(producer);
sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, callback);
sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
try {
if (watch != null) {
long timeTaken = watch.stop();
// emit event that the exchange was sent to the endpoint
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
}

if (producer instanceof ServicePoolAware) {
// release back to the pool
pool.release(endpoint, producer);
} else if (!producer.isSingleton()) {
// stop non singleton producers as we should not leak resources
try {
ServiceHelper.stopService(producer);
} catch (Exception e) {
// ignore and continue
LOG.warn("Error stopping producer: " + producer, e);
}
}
} finally {
callback.done(doneSync);
}
}
});
} catch (Throwable e) {
// ensure exceptions is caught and set on the exchange
if (exchange != null) {
exchange.setException(e);
}
} finally {
if (exchange != null && exchange.getException() == null) {
long timeTaken = watch.stop();
// emit event that the exchange was sent to the endpoint
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
}
if (producer instanceof ServicePoolAware) {
// release back to the pool
pool.release(endpoint, producer);
} else if (!producer.isSingleton()) {
// stop non singleton producers as we should not leak resources
try {
ServiceHelper.stopService(producer);
} catch (Exception e) {
// ignore and continue
LOG.warn("Error stopping producer: " + producer, e);
}
}
}

return sync;
Expand Down
Expand Up @@ -94,17 +94,19 @@ public void configure() throws Exception {
assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));

ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
assertEquals("should be DLC", true, e.isDeadLetterChannel());
SendProcessor send = assertIsInstanceOf(SendProcessor.class, e.getFailureHandler());
assertEquals("mock://dead", send.getDestination().getEndpointUri());

// dead letter channel will mark the exchange as completed
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
// and the sent will be logged after they are complete sending as it record the time taken as well
assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
// and the last event should be the direct:start
assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
}

public void testExchangeOnException() throws Exception {
Expand All @@ -127,14 +129,17 @@ public void configure() throws Exception {
assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));

ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
assertEquals("should NOT be DLC", false, e.isDeadLetterChannel());

// onException will handle the exception
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
// and the last event should be the direct:start
assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
}

}
Expand Up @@ -100,9 +100,9 @@ public void configure() throws Exception {
assertEquals(3, e.getAttempt());
e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
assertEquals(4, e.getAttempt());
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(5));
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(6));
assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(7));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
}

Expand Down Expand Up @@ -134,7 +134,6 @@ public void configure() throws Exception {
assertEquals(3, e.getAttempt());
e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
assertEquals(4, e.getAttempt());
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));

// since its async the ordering of the rest can be different depending per OS and timing
}
Expand Down
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.async;

import java.util.EventObject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.management.EventNotifierSupport;
import org.apache.camel.management.event.ExchangeSentEvent;

/**
* @version
*/
public class AsyncEndpointEventNotifierTest extends ContextTestSupport {

private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicLong time = new AtomicLong();

public void testAsyncEndpointEventNotifer() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");

String reply = template.requestBody("direct:start", "Hello Camel", String.class);
assertEquals("Bye Camel", reply);

assertMockEndpointsSatisfied();

assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS));

long delta = time.get();
assertTrue("Should take about 1000 millis sec, was: " + delta, delta > 800);
}

@Override
protected CamelContext createCamelContext() throws Exception {
DefaultCamelContext context = new DefaultCamelContext(createRegistry());
context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
public void notify(EventObject event) throws Exception {
try {
ExchangeSentEvent sent = (ExchangeSentEvent) event;
time.set(sent.getTimeTaken());
} finally {
latch.countDown();
}
}

public boolean isEnabled(EventObject event) {
// we only want the async endpoint
if (event instanceof ExchangeSentEvent) {
ExchangeSentEvent sent = (ExchangeSentEvent) event;
return sent.getEndpoint().getEndpointUri().startsWith("async");
}
return false;
}

@Override
protected void doStart() throws Exception {
}

@Override
protected void doStop() throws Exception {
}
});
return context;
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
context.addComponent("async", new MyAsyncComponent());

from("direct:start")
.to("mock:before")
.to("async:Bye Camel?delay=1000")
.to("mock:result");
}
};
}

}

0 comments on commit 53bd40e

Please sign in to comment.