Skip to content

Commit

Permalink
CAMEL-7050: DeadLetterChannel should make more clear that it handles …
Browse files Browse the repository at this point in the history
…any new exception also. Added option to configure this behavior so ppl can turn that off and let new exceptions be unhandled, so transactions can rollback.
  • Loading branch information
davsclaus committed Jan 18, 2015
1 parent 4136f92 commit 77851e1
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 73 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.util.EventObject; import java.util.EventObject;


import org.apache.camel.CamelContext; import org.apache.camel.CamelContext;
import org.apache.camel.DelegateProcessor;
import org.apache.camel.Endpoint; import org.apache.camel.Endpoint;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
Expand Down Expand Up @@ -93,7 +94,12 @@ public EventObject createExchangeFailedEvent(Exchange exchange) {
} }


public EventObject createExchangeFailureHandledEvent(Exchange exchange, Processor failureHandler, boolean deadLetterChannel) { public EventObject createExchangeFailureHandledEvent(Exchange exchange, Processor failureHandler, boolean deadLetterChannel) {
return new ExchangeFailureHandledEvent(exchange, failureHandler, deadLetterChannel); // unwrap delegate processor
Processor handler = failureHandler;
if (handler instanceof DelegateProcessor) {
handler = ((DelegateProcessor) handler).getProcessor();
}
return new ExchangeFailureHandledEvent(exchange, handler, deadLetterChannel);
} }


public EventObject createExchangeRedeliveryEvent(Exchange exchange, int attempt) { public EventObject createExchangeRedeliveryEvent(Exchange exchange, int attempt) {
Expand Down
Expand Up @@ -76,12 +76,6 @@ public String toString() {
return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]"; return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]";
} }


@Override
protected Predicate getDefaultHandledPredicate() {
// DeadLetterChannel handles errors before sending to DLQ
return ExpressionToPredicateAdapter.toPredicate(ExpressionBuilder.constantExpression(true));
}

@Override @Override
protected boolean isRunAllowedOnPreparingShutdown() { protected boolean isRunAllowedOnPreparingShutdown() {
// allow tu run as we want to move the message eto DLC, instead of rejecting the message // allow tu run as we want to move the message eto DLC, instead of rejecting the message
Expand Down
Expand Up @@ -32,15 +32,15 @@ public class FatalFallbackErrorHandler extends DelegateAsyncProcessor implements


private static final Logger LOG = LoggerFactory.getLogger(FatalFallbackErrorHandler.class); private static final Logger LOG = LoggerFactory.getLogger(FatalFallbackErrorHandler.class);


private boolean logWarn; private boolean deadLetterChannel;


public FatalFallbackErrorHandler(Processor processor) { public FatalFallbackErrorHandler(Processor processor) {
this(processor, false); this(processor, false);
} }


public FatalFallbackErrorHandler(Processor processor, boolean logWarn) { public FatalFallbackErrorHandler(Processor processor, boolean isDeadLetterChannel) {
super(processor); super(processor);
this.logWarn = logWarn; this.deadLetterChannel = isDeadLetterChannel;
} }


@Override @Override
Expand Down Expand Up @@ -70,10 +70,16 @@ public void done(boolean doneSync) {
// to be visible in the error handler // to be visible in the error handler
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException()); exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException());


// mark this exchange as already been error handler handled (just by having this property) if (deadLetterChannel) {
// the false value mean the caught exception will be kept on the exchange, causing the // special for dead letter channel as we want to let it determine what to do, depending how
// exception to be propagated back to the caller, and to break out routing // it has been configured
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, false); exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED);
} else {
// mark this exchange as already been error handler handled (just by having this property)
// the false value mean the caught exception will be kept on the exchange, causing the
// exception to be propagated back to the caller, and to break out routing
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, false);
}
} }
callback.done(doneSync); callback.done(doneSync);
} }
Expand All @@ -87,7 +93,8 @@ private void log(String message) {
} }


private void log(String message, Throwable t) { private void log(String message, Throwable t) {
if (logWarn) { // when using dead letter channel we only want to log at WARN level
if (deadLetterChannel) {
if (t != null) { if (t != null) {
LOG.warn(message, t); LOG.warn(message, t);
} else { } else {
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.camel.util.MessageHelper; import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;


/** /**
* Base redeliverable error handler that also supports a final dead letter queue in case * Base redeliverable error handler that also supports a final dead letter queue in case
Expand Down Expand Up @@ -382,7 +383,7 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
} }
// we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
// bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor);
boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
// we are breaking out // we are breaking out
return sync; return sync;
Expand Down Expand Up @@ -833,13 +834,15 @@ protected boolean deliverToFailureProcessor(final Processor processor, final boo
// clear exception as we let the failure processor handle it // clear exception as we let the failure processor handle it
exchange.setException(null); exchange.setException(null);


// always handle if dead letter channel final Boolean shouldHandle = shouldHandle(exchange, data);
final boolean shouldHandle = isDeadLetterChannel || shouldHandled(exchange, data); final Boolean shouldContinue = shouldContinue(exchange, data);
final boolean shouldContinue = shouldContinue(exchange, data);
// regard both handled or continued as being handled // regard both handled or continued as being handled
boolean handled = false; boolean handled = false;


if (shouldHandle || shouldContinue) { // always handle if dead letter channel
boolean handleOrContinue = isDeadLetterChannel || (shouldHandle != null && shouldHandle) || (shouldContinue != null && shouldContinue);
if (handleOrContinue) {
// its handled then remove traces of redelivery attempted // its handled then remove traces of redelivery attempted
exchange.getIn().removeHeader(Exchange.REDELIVERED); exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
Expand Down Expand Up @@ -890,7 +893,7 @@ protected boolean deliverToFailureProcessor(final Processor processor, final boo
public void done(boolean sync) { public void done(boolean sync) {
log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
try { try {
prepareExchangeAfterFailure(processor, exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
// fire event as we had a failure processor to handle it, which there is a event for // fire event as we had a failure processor to handle it, which there is a event for
boolean deadLetterChannel = processor == data.deadLetterProcessor; boolean deadLetterChannel = processor == data.deadLetterProcessor;
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel); EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
Expand All @@ -904,7 +907,7 @@ public void done(boolean sync) {
} else { } else {
try { try {
// no processor but we need to prepare after failure as well // no processor but we need to prepare after failure as well
prepareExchangeAfterFailure(processor, exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
} finally { } finally {
// callback we are done // callback we are done
callback.done(data.sync); callback.done(data.sync);
Expand All @@ -924,39 +927,14 @@ public void done(boolean sync) {
return sync; return sync;
} }


protected void prepareExchangeAfterFailure(final Processor failureProcessor, final Exchange exchange, final RedeliveryData data, protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
final boolean isDeadLetterChannel, final boolean shouldHandle, final boolean shouldContinue) { final Boolean shouldHandle, final Boolean shouldContinue) {


Exception newException = exchange.getException(); Exception newException = exchange.getException();
if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
boolean handled = data.handleNewException;
String msg = "New exception occurred during processing by the failure processor " + failureProcessor + " due " + newException.getMessage();
if (handled) {
msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
} else {
msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
}
logFailedDelivery(false, true, handled, false, exchange, msg, data, newException);
}


// we could not process the exchange so we let the failure processor handled it // we could not process the exchange so we let the failure processor handled it
ExchangeHelper.setFailureHandled(exchange); ExchangeHelper.setFailureHandled(exchange);


// special for dead letter channel where it by default handle new exceptions, but its possible to turn that off
if (isDeadLetterChannel && shouldHandle) {
boolean handled = data.handleNewException;
if (handled) {
// if there is a new exception then log a warning about that
log.trace("This exchange is handled so its marked as not failed: {}", exchange);
} else {
// exception not handled, put exception back in the exchange
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
// and put failure endpoint back as well
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
}
return;
}

// honor if already set a handling // honor if already set a handling
boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
if (alreadySet) { if (alreadySet) {
Expand All @@ -971,25 +949,59 @@ protected void prepareExchangeAfterFailure(final Processor failureProcessor, fin
return; return;
} }


if (shouldHandle) { // dead letter channel is special
log.trace("This exchange is handled so its marked as not failed: {}", exchange); if (shouldContinue != null && shouldContinue) {
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
} else if (shouldContinue) {
log.trace("This exchange is continued: {}", exchange); log.trace("This exchange is continued: {}", exchange);
// okay we want to continue then prepare the exchange for that as well // okay we want to continue then prepare the exchange for that as well
prepareExchangeForContinue(exchange, data); prepareExchangeForContinue(exchange, data);
} else if (shouldHandle != null && shouldHandle) {
log.trace("This exchange is handled so its marked as not failed: {}", exchange);
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
} else { } else {
log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); // okay the redelivery policy are not explicit set to true, so we should allow to check for some
// exception not handled, put exception back in the exchange // special situations when using dead letter channel
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); if (isDeadLetterChannel) {
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
// and put failure endpoint back as well // use the handled option from the DLC
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); boolean handled = data.handleNewException;
// and store the route id so we know in which route we failed
UnitOfWork uow = exchange.getUnitOfWork(); // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
if (uow != null && uow.getRouteContext() != null) { // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); // to avoid causing endless poison messages that fails forever)
if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
String uri = URISupport.sanitizeUri(deadLetterUri);
String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
if (handled) {
msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
} else {
msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
}
logFailedDelivery(false, true, handled, false, exchange, msg, data, newException);
}

if (handled) {
log.trace("This exchange is handled so its marked as not failed: {}", exchange);
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
return;
}
} }

// not handled by default
prepareExchangeAfterFailureNotHandled(exchange);
}
}

private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
// exception not handled, put exception back in the exchange
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
// and put failure endpoint back as well
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
// and store the route id so we know in which route we failed
UnitOfWork uow = exchange.getUnitOfWork();
if (uow != null && uow.getRouteContext() != null) {
exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
} }
} }


Expand Down Expand Up @@ -1137,33 +1149,33 @@ private boolean isExhausted(Exchange exchange, RedeliveryData data) {
} }


/** /**
* Determines whether or not to continue if we are exhausted. * Determines whether the redelivery configuration has a continued predicate
* *
* @param exchange the current exchange * @param exchange the current exchange
* @param data the redelivery data * @param data the redelivery data
* @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. * @return <tt>true</tt> to continue, or <tt>false</tt> to attempt to handle, or <tt>null</tt> if continued predicate has not been configured.
*/ */
private boolean shouldContinue(Exchange exchange, RedeliveryData data) { private Boolean shouldContinue(Exchange exchange, RedeliveryData data) {
if (data.continuedPredicate != null) { if (data.continuedPredicate != null) {
return data.continuedPredicate.matches(exchange); return data.continuedPredicate.matches(exchange);
} }
// do not continue by default // no predicate
return false; return null;
} }


/** /**
* Determines whether or not to handle if we are exhausted. * Determines whether the redelivery configuration has a handled predicate
* *
* @param exchange the current exchange * @param exchange the current exchange
* @param data the redelivery data * @param data the redelivery data
* @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust, or <tt>null</tt> if handled predicate has not been configured.
*/ */
private boolean shouldHandled(Exchange exchange, RedeliveryData data) { private Boolean shouldHandle(Exchange exchange, RedeliveryData data) {
if (data.handledPredicate != null) { if (data.handledPredicate != null) {
return data.handledPredicate.matches(exchange); return data.handledPredicate.matches(exchange);
} }
// do not handle by default // no predicate
return false; return null;
} }


/** /**
Expand Down
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.issues;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

/**
*
*/
public class OnExceptionContinuedNoFailureProcessorTest extends ContextTestSupport {

public void testOnException() throws Exception {
getMockEndpoint("mock:end").expectedMessageCount(1);
getMockEndpoint("mock:error").expectedMessageCount(1);

template.sendBody("direct:start", "Hello World");

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("mock:error"));

// continue runtime exception
onException(RuntimeException.class)
.continued(true);

from("direct:start").
process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
throw new RuntimeException("FAIL!");
}
}).
to("mock:end");
}
};
}
}

0 comments on commit 77851e1

Please sign in to comment.