Skip to content
This repository has been archived by the owner on Sep 16, 2021. It is now read-only.

Add route for retrying failed messages before sending to DLQ #32

Merged
merged 6 commits into from
Apr 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncMessageProcessorRoute extends RouteBuilder {
public class AsyncBatchMessageProcessorRoute extends RouteBuilder {
private final String fromUri;
private final String failureUri;
private final Duration processTimeout;
Expand All @@ -48,22 +48,18 @@ public class AsyncMessageProcessorRoute extends RouteBuilder {
* {@code messageFactory} to parse them into {@link Message}s.
* @param failureUri Endpoint where failures will be sent to as a {@code Collection} of
* {@link FailedMessage}s.
* @param processTimeout How to long to wait for a message process before timing out? Time outs
* are considered {@link FailedMessage#isRecoverable() recoverable
* failures}.
* @param processTimeout How to long to wait for a message process before timing out?
* @param messageFactory Accepts each element in the exchange body {@code Collection} and
* parses them to create message implementations which will be processed.
*/
public AsyncMessageProcessorRoute(String fromUri, String failureUri, Duration processTimeout,
MessageFactory messageFactory) {
public AsyncBatchMessageProcessorRoute(String fromUri, String failureUri,
Duration processTimeout, MessageFactory messageFactory) {
this.fromUri = Objects.requireNonNull(fromUri, "fromUri");
this.failureUri = Objects.requireNonNull(failureUri, "failureUri");
this.processTimeout = Objects.requireNonNull(processTimeout, "processTimeout");
this.messageFactory = Objects.requireNonNull(messageFactory, "messageFactory");
}

// TODO(ahenning): Consider parameterizing multiple from URIs instead of one

@Override
public void configure() throws Exception {
from(fromUri)
Expand All @@ -78,39 +74,39 @@ public void configure() throws Exception {
exchangeBody.getClass().getName() + ": " + exchangeBody);
}

Collection messages = (Collection) exchangeBody;
Collection originalMessages = (Collection) exchangeBody;

List<ProcessingMessage> processingMessages = new ArrayList<>(messages.size());
List<ProcessingMessage> processingMessages = new ArrayList<>(originalMessages.size());
List<FailedMessage> failures = new ArrayList<>();

// Start processing all of the messages in the batch in parallel.
for (Object message : messages) {
final Message parsedMessage;
for (Object originalMessage : originalMessages) {
final Message message;

try {
parsedMessage = messageFactory.getMessageForBody(message);
message = messageFactory.getMessageForBody(originalMessage);
} catch (Exception e) {
log.error("Failure parsing message. Body was: " + message, e);
failures.add(new FailedMessage(message, e));
log.error("Failure parsing message. Body was: " + originalMessage, e);
failures.add(new FailedMessage(originalMessage, e));
continue;
}

final Future<Void> processingFuture;

try {
processingFuture = parsedMessage.process();
processingFuture = message.process();
} catch (Exception e) {
log.error("Failed to processing message: " + parsedMessage, e);
FailedMessage failure = new FailedMessage(message, parsedMessage, e);
log.error("Failed to process message: " + message, e);
FailedMessage failure = new FailedMessage(originalMessage, message, e);
failures.add(failure);
continue;
}

ProcessingMessage processing = new ProcessingMessage(
message, parsedMessage, processingFuture);
originalMessage, message, processingFuture);
processingMessages.add(processing);

log.debug("Processing on route {}: {}", routeId, parsedMessage);
log.debug("Processing on route {}: {}", routeId, message);
}

// Wait for processing to complete.
Expand All @@ -124,9 +120,8 @@ public void configure() throws Exception {
failures.add(failure);
} catch (InterruptedException | TimeoutException e) {
log.warn("Timed out processing message: " + processingMsg.parsedMessage, e);
RecoverableException recoverableException = new RecoverableException(e);
FailedMessage failure = new FailedMessage(processingMsg.originalMessage,
processingMsg.parsedMessage, recoverableException);
FailedMessage failure = new FailedMessage(
processingMsg.originalMessage, processingMsg.parsedMessage, e);
failures.add(failure);
}
}
Expand All @@ -137,7 +132,9 @@ public void configure() throws Exception {
.to(failureUri);
}

/** Simple struct for storing a message and its future processing result. */
/**
* Simple struct for storing a message and its future processing result.
*/
private static class ProcessingMessage {
final Object originalMessage;
final Message parsedMessage;
Expand Down
10 changes: 0 additions & 10 deletions lib/src/main/java/org/esbtools/eventhandler/FailedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,6 @@ public Throwable exception() {
return exception;
}

/**
* A recoverable failure is one that is entirely transient: the message should be retried as it
* will likely work.
*
* <p>Examples would be failures due to transient networking problems or unavailable locks.
*/
public boolean isRecoverable() {
return exception instanceof RecoverableException;
}

@Override
public String toString() {
return "FailedMessage{" +
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright 2016 esbtools Contributors and/or its affiliates.
*
* This file is part of esbtools.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.esbtools.eventhandler;

import static org.apache.camel.builder.PredicateBuilder.and;
import static org.apache.camel.builder.PredicateBuilder.not;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RetryingBatchFailedMessageRoute extends RouteBuilder {
private final String fromUri;
private final Expression retryDelayMillis;
private final int maxRetryCount;
private final Duration processTimeout;
private final String deadLetterUri;

private static final String NEXT_ATTEMPT_NUMBER_PROPERTY = "nextAttemptNumber";
private static final Integer FIRST_ATTEMPT_NUMBER = 1;

public RetryingBatchFailedMessageRoute(String fromUri, Expression retryDelayMillis,
int maxRetryCount, Duration processTimeout, String deadLetterUri) {
this.fromUri = fromUri;
this.retryDelayMillis = retryDelayMillis;
this.maxRetryCount = maxRetryCount;
this.processTimeout = processTimeout;
this.deadLetterUri = deadLetterUri;
}

@Override
public void configure() throws Exception {
from(fromUri)
// We use loop instead of error handler because error handlers start with original message
// sent to point of failure; we need the message to stay intact to prevent reprocessing
// already succeeded messages and to keep context of previous tries' failures.
.loopDoWhile(and(
exchangeHasFailures(),
not(maxRetryCountMet())))
.delay(retryDelayMillis)
// Indenting because delay actually starts a child processor, and needs its own end()
// See: https://issues.apache.org/jira/browse/CAMEL-2654
.process(exchange -> {
Integer retryAttempt = Optional.ofNullable(
exchange.getProperty(NEXT_ATTEMPT_NUMBER_PROPERTY, Integer.class))
.orElse(FIRST_ATTEMPT_NUMBER);
// Preemptively increment retry attempt for next loop.
exchange.setProperty(NEXT_ATTEMPT_NUMBER_PROPERTY, retryAttempt + 1);

Collection oldFailures = exchange.getIn().getMandatoryBody(Collection.class);

List<FailedMessage> newFailures = new ArrayList<>();
List<ReprocessingFailure> reprocessingFailures =
new ArrayList<>(oldFailures.size());

// Begin processing all failed messages again in parallel.
for (Object failureAsObject : oldFailures) {
if (!(failureAsObject instanceof FailedMessage)) {
throw new IllegalArgumentException("Messages sent to " +
RetryingBatchFailedMessageRoute.class + " route should be " +
"collections of FailedMessage elements, but got collection " +
"of " + failureAsObject.getClass());
}

FailedMessage failure = (FailedMessage) failureAsObject;
Optional<Message> maybeMessage = failure.parsedMessage();

if (!maybeMessage.isPresent()) {
// Nothing to retry; dead letter it
// This happens when message factory failed to get message from original
// body. We won't bother trying get the message from the message factory
// again; if that fails it is usually a bug that retrying won't circumvent.
continue;
}

Message message = maybeMessage.get();
final Future<Void> reprocessingFuture;

try {
reprocessingFuture = message.process();
} catch (Exception e) {
log.error("Failed to reprocess message (retry attempt " +
retryAttempt + "): " + message, e);
suppressPreviousFailureInNewException(failure, e);
newFailures.add(new FailedMessage(failure.originalMessage(), message, e));
continue;
}

reprocessingFailures.add(new ReprocessingFailure(failure, reprocessingFuture));
}

for (ReprocessingFailure reprocessingFailure : reprocessingFailures) {
FailedMessage originalFailure = reprocessingFailure.originalFailure;

try {
reprocessingFailure.reprocessingFuture
.get(processTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Message parsedMessage = originalFailure.parsedMessage().get();

log.error("Failed to reprocess message (retry attempt " + retryAttempt +
"): " + parsedMessage, e);

Throwable realException = e.getCause();
suppressPreviousFailureInNewException(originalFailure, realException);

FailedMessage failure = new FailedMessage(
originalFailure.originalMessage(), parsedMessage, realException);
newFailures.add(failure);
} catch (InterruptedException | TimeoutException e) {
Message parsedMessage = originalFailure.parsedMessage().get();

log.warn("Timed out reprocessing message (retry attempt " + retryAttempt +
"): " + parsedMessage, e);

suppressPreviousFailureInNewException(originalFailure, e);
FailedMessage failure = new FailedMessage(
originalFailure.originalMessage(), parsedMessage, e);
newFailures.add(failure);
}
}

// Give new failures another shot or dead letter them.
exchange.getIn().setBody(newFailures);
})
.end() // end delay -- see comment below delay(...).
.end() // end loop
// If we still have failures, dead letter them.
.filter(exchangeHasFailures())
.to(deadLetterUri);
}

/**
* In the event a messages fails on subsequent retries, this tracks that previous failure as a
* suppressed exception in the latest failure, keeping the history of failures for debugging.
*
* <p>Makes sure the exceptions are not referring to the same object to avoid a infinite
* recursion.
*/
private void suppressPreviousFailureInNewException(FailedMessage failure, Throwable e) {
Throwable previousException = failure.exception();

if (e != previousException) {
e.addSuppressed(previousException);

for (Throwable previousSuppressed : previousException.getSuppressed()) {
e.addSuppressed(previousSuppressed);
}
}
}

private Predicate maxRetryCountMet() {
return new Predicate() {
@Override
public boolean matches(Exchange exchange) {
Integer nextAttemptNumber = Optional.ofNullable(
exchange.getProperty(NEXT_ATTEMPT_NUMBER_PROPERTY, Integer.class))
.orElse(FIRST_ATTEMPT_NUMBER);

return nextAttemptNumber - FIRST_ATTEMPT_NUMBER >= maxRetryCount;
}
};
}

private Predicate exchangeHasFailures() {
return new Predicate() {
@Override
public boolean matches(Exchange exchange) {
Collection failures = exchange.getIn().getBody(Collection.class);

if (failures == null || failures.isEmpty()) {
return false;
}

return true;
}
};
}

private static final class ReprocessingFailure {
private final FailedMessage originalFailure;
private final Future<Void> reprocessingFuture;

private ReprocessingFailure(FailedMessage originalFailure, Future<Void> reprocessingFuture) {
this.originalFailure = originalFailure;
this.reprocessingFuture = reprocessingFuture;
}
}
}
Loading