Skip to content

Commit

Permalink
Feature/handle amqp fatal errors (#1111)
Browse files Browse the repository at this point in the history
* Adding support to handle lengthy error msgs more precisely

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Added check at conditionalHandler level and changes assertions in test class

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Fixed sonar lint issues

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Reverted the change on making class final

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* To trigger the circle-ci build and check

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Addressed last set of PR comments

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Fixe sonar issue for nullpointer dereference

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>

* Handling null case explicitly

Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>
  • Loading branch information
ramannas committed Jul 20, 2021
1 parent a8f7f50 commit c37c615
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 12 deletions.
Expand Up @@ -27,13 +27,12 @@ public class DmfApiAutoConfiguration {

/**
* Create default error handler bean.
*
*
* @return the default error handler bean
*/
@Bean
@ConditionalOnMissingBean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler();
}

}
@@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;

/**
* An abstract error handler for errors resulting from AMQP.
*/
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler{

@Override
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {
// retrieving the cause of throwable as it contains the actual class of
// exception
final Throwable cause = throwable.getCause();
if (getExceptionClass().isAssignableFrom(cause.getClass())) {
throw new AmqpRejectAndDontRequeueException(getErrorMessage(throwable));
} else {
chain.handle(throwable);
}
}

/**
* Returns the class of the exception.
*
* @return
* the exception class
*/
public abstract Class<T> getExceptionClass();

/**
* Returns the customized error message.
*
* @return
* the customized error message
*/
public String getErrorMessage(Throwable throwable){
return AmqpErrorMessageComposer.constructErrorMessage(throwable);
}

}
Expand Up @@ -8,9 +8,7 @@
*/
package org.eclipse.hawkbit.amqp;

import java.time.Duration;
import java.util.Map;

import com.google.common.collect.Maps;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadIdCache;
Expand Down Expand Up @@ -53,7 +51,9 @@
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.ErrorHandler;

import com.google.common.collect.Maps;
import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
* Spring configuration for AMQP based DMF communication for indirect device
Expand All @@ -80,15 +80,48 @@ public class AmqpConfiguration {
private ServiceMatcher serviceMatcher;

/**
* Register the bean for the custom error handler.
* Creates a custom error handler bean.
*
* @param handlers
* list of {@link AmqpErrorHandler} handlers
* @return the delegating error handler bean
*/
@Bean
@ConditionalOnMissingBean
public ErrorHandler errorHandler(final List<AmqpErrorHandler> handlers) {
return new DelegatingConditionalErrorHandler(handlers, new ConditionalRejectingErrorHandler(
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay())));
}

/**
* Error handler bean for all target attributes related fatal errors
*
* @return the invalid target attribute exception handler bean
*/
@Bean
public AmqpErrorHandler invalidTargetAttributeConditionalExceptionHandler() {
return new InvalidTargetAttributeExceptionHandler();
}

/**
* Error handler bean for entity not found errors
*
* @return the entity not found exception handler bean
*/
@Bean
public AmqpErrorHandler entityNotFoundExceptionHandler() {
return new EntityNotFoundExceptionHandler();
}

/**
* Error handler bean for amqp message conversion errors
*
* @return custom error handler
* @return the amqp message conversion exception handler bean
*/
@Bean
@ConditionalOnMissingBean(ErrorHandler.class)
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay()));
public AmqpErrorHandler messageConversionExceptionHandler() {
return new MessageConversionExceptionHandler();
}

/**
Expand Down
@@ -0,0 +1,28 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

/**
* Interface declaration of {@link AmqpErrorHandler} that handles errors based on the
* types of exception.
*/
@FunctionalInterface
public interface AmqpErrorHandler {

/**
* Handles the error based on the type of exception
*
* @param throwable
* the throwable
* @param chain
* an {@link AmqpErrorHandlerChain}
*/
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);

}
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.springframework.util.ErrorHandler;

import java.util.Iterator;
import java.util.List;

/**
* An error handler chain that delegates the error to the matching error handler based on the type of exception
*/
public final class AmqpErrorHandlerChain {
private final Iterator<AmqpErrorHandler> iterator;
private final ErrorHandler defaultHandler;

/**
* Constructor.
*
* @param iterator
* the {@link AmqpErrorHandler} iterator
* @param defaultHandler
* the default handler
*/
private AmqpErrorHandlerChain(Iterator<AmqpErrorHandler> iterator, ErrorHandler defaultHandler) {
this.iterator = iterator;
this.defaultHandler = defaultHandler;
}

/**
* Returns an {@link AmqpErrorHandlerChain}
*
* @param errorHandlers
* {@link List} of error handlers
* @param defaultHandler
* the default error handler
* @return an {@link AmqpErrorHandlerChain}
*/
public static AmqpErrorHandlerChain getHandlerChain(final List<AmqpErrorHandler> errorHandlers, final ErrorHandler defaultHandler) {
return new AmqpErrorHandlerChain(errorHandlers.iterator(), defaultHandler);
}

/**
* Handles the error based on the type of exception
*
* @param error
* the throwable containing the cause of exception
*/
public void handle(final Throwable error) {
if (iterator.hasNext()) {
final AmqpErrorHandler handler = iterator.next();
handler.doHandle(error, this);
} else {
defaultHandler.handleError(error);
}
}
}

@@ -0,0 +1,53 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;

/**
* Class that composes a meaningful error message and enhances it with properties from failed message
*/
public final class AmqpErrorMessageComposer {

private AmqpErrorMessageComposer() {
}

/**
* Constructs an error message based on failed message content
*
* @param throwable
* the throwable containing failed message content
* @return
* meaningful error message
*/
public static String constructErrorMessage(final Throwable throwable) {
StringBuilder completeErrorMessage = new StringBuilder();
final String mainErrorMsg = throwable.getCause().getMessage();

if (throwable instanceof ListenerExecutionFailedException) {
Collection<Message> failedMessages = ((ListenerExecutionFailedException) throwable).getFailedMessages();
// since the intended message content is always on top of the collection, we only extract the first one
final Message failedMessage = failedMessages.iterator().next();
final byte[] amqpFailedMsgBody = failedMessage.getBody();
final Map<String, Object> amqpFailedMsgHeaders = failedMessage.getMessageProperties().getHeaders();

String amqpFailedMsgConcatenatedHeaders = amqpFailedMsgHeaders.keySet().stream()
.map(key -> key + "=" + amqpFailedMsgHeaders.get(key)).collect(Collectors.joining(", ", "{", "}"));
completeErrorMessage.append(mainErrorMsg).append(new String(amqpFailedMsgBody))
.append(amqpFailedMsgConcatenatedHeaders);
return completeErrorMessage.toString();
}
return mainErrorMsg;
}
}
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import java.util.List;
import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.util.ErrorHandler;

/**
* An error handler delegates error handling to the matching {@link AmqpErrorHandler} based on the type of exception
*/
public class DelegatingConditionalErrorHandler implements ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(DelegatingConditionalErrorHandler.class);
private final List<AmqpErrorHandler> handlers;
private final ErrorHandler defaultHandler;

/**
* Constructor
*
* @param handlers
* {@link List} of error handlers
* @param defaultHandler
* the default error handler
*/
public DelegatingConditionalErrorHandler(final List<AmqpErrorHandler> handlers, @NotNull final ErrorHandler defaultHandler) {
this.handlers = handlers;
this.defaultHandler = defaultHandler;
}

@Override
public void handleError(final Throwable t) {
if (t.getCause() == null) {
LOG.error("Cannot handle the error as the cause of the error is null!");
return;
}

if (includesAmqpRejectException(t.getCause())) {
LOG.error("Received an AmqpRejectAndDontRequeueException due to {}", t.getCause().getMessage());
return;
}

AmqpErrorHandlerChain.getHandlerChain(handlers, defaultHandler).handle(t);
}

private boolean includesAmqpRejectException(final Throwable t) {
if (t instanceof AmqpRejectAndDontRequeueException){
return true;
}
if (t.getCause() != null) {
return includesAmqpRejectException(t.getCause());
}
return false;
}
}
@@ -0,0 +1,22 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;

/**
* An error handler for entity not found exception resulting from AMQP.
*/
public class EntityNotFoundExceptionHandler extends AbstractAmqpErrorHandler<EntityNotFoundException> {

@Override
public Class<EntityNotFoundException> getExceptionClass() {
return EntityNotFoundException.class;
}
}
@@ -0,0 +1,22 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;

/**
* An error handler for all invalid target attributes resulting from AMQP.
*/
public class InvalidTargetAttributeExceptionHandler extends AbstractAmqpErrorHandler<InvalidTargetAttributeException> {

@Override
public Class<InvalidTargetAttributeException> getExceptionClass() {
return InvalidTargetAttributeException.class;
}
}

0 comments on commit c37c615

Please sign in to comment.