Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/handle amqp fatal errors #1111

Merged
Expand Up @@ -27,13 +27,12 @@ public class DmfApiAutoConfiguration {

/**
* Create default error handler bean.
*
*
* @return the default error handler bean
*/
@Bean
@ConditionalOnMissingBean
public ErrorHandler errorHandler() {
ramannas marked this conversation as resolved.
Show resolved Hide resolved
return new ConditionalRejectingErrorHandler();
}

}
@@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;

/**
* An abstract error handler for errors resulting from AMQP.
*/
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler{

@Override
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {
// retrieving the cause of throwable as it contains the actual class of
// exception
final Throwable cause = throwable.getCause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment why you check the cause here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (getExceptionClass().isAssignableFrom(cause.getClass())) {
throw new AmqpRejectAndDontRequeueException(getErrorMessage(throwable));
} else {
chain.handle(throwable);
}
}

/**
* Returns the class of the exception.
*
* @return
* the exception class
*/
public abstract Class<T> getExceptionClass();

/**
* Returns the customized error message.
*
* @return
* the customized error message
*/
public String getErrorMessage(Throwable throwable){
return AmqpErrorMessageComposer.constructErrorMessage(throwable);
}

}
Expand Up @@ -8,9 +8,7 @@
*/
package org.eclipse.hawkbit.amqp;

import java.time.Duration;
import java.util.Map;

import com.google.common.collect.Maps;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadIdCache;
Expand Down Expand Up @@ -53,7 +51,9 @@
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.ErrorHandler;

import com.google.common.collect.Maps;
import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
* Spring configuration for AMQP based DMF communication for indirect device
Expand All @@ -80,15 +80,48 @@ public class AmqpConfiguration {
private ServiceMatcher serviceMatcher;

/**
* Register the bean for the custom error handler.
* Creates a custom error handler bean.
*
* @param handlers
* list of {@link AmqpErrorHandler} handlers

* @return the delegating error handler bean
*/
@Bean
@ConditionalOnMissingBean
public ErrorHandler errorHandler(final List<AmqpErrorHandler> handlers) {
return new DelegatingConditionalErrorHandler(handlers, new ConditionalRejectingErrorHandler(
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay())));
}

/**
* Error handler bean for all target attributes related fatal errors
*
* @return the invalid target attribute exception handler bean
*/
@Bean
public AmqpErrorHandler invalidTargetAttributeConditionalExceptionHandler() {
return new InvalidTargetAttributeExceptionHandler();
}

/**
* Error handler bean for entity not found errors
*
* @return the entity not found exception handler bean
*/
@Bean
public AmqpErrorHandler entityNotFoundExceptionHandler() {
return new EntityNotFoundExceptionHandler();
}

/**
* Error handler bean for amqp message conversion errors
*
* @return custom error handler
* @return the amqp message conversion exception handler bean
*/
@Bean
@ConditionalOnMissingBean(ErrorHandler.class)
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay()));
public AmqpErrorHandler messageConversionExceptionHandler() {
return new MessageConversionExceptionHandler();
}

/**
Expand Down
@@ -0,0 +1,28 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

/**
* Interface declaration of {@link AmqpErrorHandler} that handles errors based on the
ramannas marked this conversation as resolved.
Show resolved Hide resolved
* types of exception.
*/
@FunctionalInterface
public interface AmqpErrorHandler {

/**
* Handles the error based on the type of exception
*
* @param throwable
* the throwable
* @param chain
* an {@link AmqpErrorHandlerChain}
*/
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);

}
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.springframework.util.ErrorHandler;

import java.util.Iterator;
import java.util.List;

/**
* An error handler chain that delegates the error to the matching error handler based on the type of exception
*/
public final class AmqpErrorHandlerChain {
private final Iterator<AmqpErrorHandler> iterator;
private final ErrorHandler defaultHandler;

/**
* Constructor.
*
* @param iterator
* the {@link AmqpErrorHandler} iterator
* @param defaultHandler
* the default handler
*/
private AmqpErrorHandlerChain(Iterator<AmqpErrorHandler> iterator, ErrorHandler defaultHandler) {
this.iterator = iterator;
this.defaultHandler = defaultHandler;
}

/**
* Returns an {@link AmqpErrorHandlerChain}
*
* @param errorHandlers
* {@link List} of error handlers
* @param defaultHandler
* the default error handler
* @return an {@link AmqpErrorHandlerChain}
*/
public static AmqpErrorHandlerChain getHandlerChain(final List<AmqpErrorHandler> errorHandlers, final ErrorHandler defaultHandler) {
return new AmqpErrorHandlerChain(errorHandlers.iterator(), defaultHandler);
}

/**
* Handles the error based on the type of exception
*
* @param error
* the throwable containing the cause of exception
*/
public void handle(final Throwable error) {
ramannas marked this conversation as resolved.
Show resolved Hide resolved
if (iterator.hasNext()) {
final AmqpErrorHandler handler = iterator.next();
handler.doHandle(error, this);
} else {
defaultHandler.handleError(error);
}
}
}

@@ -0,0 +1,53 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;

/**
* Class that composes a meaningful error message and enhances it with properties from failed message
*/
public final class AmqpErrorMessageComposer {

private AmqpErrorMessageComposer() {
}

/**
* Constructs an error message based on failed message content
*
* @param throwable
* the throwable containing failed message content
* @return
* meaningful error message
*/
public static String constructErrorMessage(final Throwable throwable) {
StringBuilder completeErrorMessage = new StringBuilder();
final String mainErrorMsg = throwable.getCause().getMessage();

if (throwable instanceof ListenerExecutionFailedException) {
Collection<Message> failedMessages = ((ListenerExecutionFailedException) throwable).getFailedMessages();
// since the intended message content is always on top of the collection, we only extract the first one
final Message failedMessage = failedMessages.iterator().next();
final byte[] amqpFailedMsgBody = failedMessage.getBody();
final Map<String, Object> amqpFailedMsgHeaders = failedMessage.getMessageProperties().getHeaders();

String amqpFailedMsgConcatenatedHeaders = amqpFailedMsgHeaders.keySet().stream()
.map(key -> key + "=" + amqpFailedMsgHeaders.get(key)).collect(Collectors.joining(", ", "{", "}"));
completeErrorMessage.append(mainErrorMsg).append(new String(amqpFailedMsgBody))
.append(amqpFailedMsgConcatenatedHeaders);
return completeErrorMessage.toString();
}
return mainErrorMsg;
}
}
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import java.util.List;
import javax.validation.constraints.NotNull;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.util.ErrorHandler;

/**
* An error handler delegates error handling to the matching {@link AmqpErrorHandler} based on the type of exception
*/
public class DelegatingConditionalErrorHandler implements ErrorHandler {
private final List<AmqpErrorHandler> handlers;
private final ErrorHandler defaultHandler;

/**
* Constructor
*
* @param handlers
* {@link List} of error handlers
* @param defaultHandler
* the default error handler
*/
public DelegatingConditionalErrorHandler(final List<AmqpErrorHandler> handlers, @NotNull final ErrorHandler defaultHandler) {
this.handlers = handlers;
this.defaultHandler = defaultHandler;
}

@Override
public void handleError(final Throwable t) {
if (t.getCause() == null || includesAmqpRejectException(t)){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please pass the t.getCause() in includesAmqpRejectException directly.

return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please log the exception, something like: "Received ... caused by {}", t.getCause.getMessage())

}
AmqpErrorHandlerChain.getHandlerChain(handlers, defaultHandler).handle(t);
bogdan-bondar marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean includesAmqpRejectException(final Throwable t) {
if (t instanceof AmqpRejectAndDontRequeueException){
return true;
}
if (t.getCause() != null) {
return includesAmqpRejectException(t.getCause());
}
return false;
}
}
@@ -0,0 +1,22 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;

/**
* An error handler for entity not found exception resulting from AMQP.
*/
public class EntityNotFoundExceptionHandler extends AbstractAmqpErrorHandler<EntityNotFoundException> {

@Override
public Class<EntityNotFoundException> getExceptionClass() {
return EntityNotFoundException.class;
}
}
@@ -0,0 +1,22 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;

import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;

/**
* An error handler for all invalid target attributes resulting from AMQP.
*/
public class InvalidTargetAttributeExceptionHandler extends AbstractAmqpErrorHandler<InvalidTargetAttributeException> {

@Override
public Class<InvalidTargetAttributeException> getExceptionClass() {
return InvalidTargetAttributeException.class;
}
}