Skip to content

Commit

Permalink
Merge pull request #1743 from AxonFramework/bugfix/non-transient-remo…
Browse files Browse the repository at this point in the history
…te-exceptions

Non transient remote exceptions transport fix
  • Loading branch information
smcvb committed May 12, 2021
2 parents 020f59c + 118eb8b commit 1ac9b66
Show file tree
Hide file tree
Showing 20 changed files with 543 additions and 44 deletions.
Expand Up @@ -18,13 +18,16 @@

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException;
import org.axonframework.axonserver.connector.command.AxonServerNonTransientRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.command.AxonServerRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerNonTransientRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerQueryDispatchException;
import org.axonframework.axonserver.connector.query.AxonServerRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.commandhandling.CommandExecutionException;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.common.AxonException;
import org.axonframework.common.ExceptionUtils;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.HandlerExecutionException;
Expand Down Expand Up @@ -98,6 +101,14 @@ public enum ErrorCode {
new AxonServerRemoteCommandHandlingException(code, error)
)
),
COMMAND_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-4005",
(code, error, details) -> new CommandExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteCommandHandlingException(code, error),
details.get()
)
),

//Query errors
NO_HANDLER_FOR_QUERY("AXONIQ-5000", (code, error, details) -> new NoHandlerForQueryException(error.getMessage())),
Expand All @@ -110,6 +121,14 @@ public enum ErrorCode {
)
),
QUERY_DISPATCH_ERROR("AXONIQ-5002", (code, error, details) -> new AxonServerQueryDispatchException(code, error)),
QUERY_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-5003",
(code, error, details) -> new QueryExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteQueryHandlingException(code, error),
details.get()
)
),

// Internal errors
DATAFILE_READ_ERROR(
Expand Down Expand Up @@ -214,6 +233,32 @@ public AxonException convert(String source, Throwable throwable) {
() -> HandlerExecutionException.resolveDetails(throwable).orElse(null));
}

/**
* Returns an Query Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getQueryExecutionErrorCode(Throwable throwable) {
if (ExceptionUtils.isExplicitlyNonTransient(throwable)) {
return ErrorCode.QUERY_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.QUERY_EXECUTION_ERROR;
}
}

/**
* Returns an Command Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getCommandExecutionErrorCode(Throwable throwable) {
if (ExceptionUtils.isExplicitlyNonTransient(throwable)) {
return ErrorCode.COMMAND_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.COMMAND_EXECUTION_ERROR;
}
}

/**
* Functional interface towards building an {@link AxonException} based on an {@code errorCode}, {@link
* ErrorMessage} and the {@link Supplier} of an {@link Object}. This provides the option for users to specify more
Expand Down
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* Exception indicating a non-transient problem that was reported by the remote end of a connection.
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteCommandHandlingException extends RemoteNonTransientHandlingException {

private static final boolean PERSISTENT = true;
private final String errorCode;
private final String server;

/**
* Initialize the exception with given {@code errorCode} and {@code errorMessage}.
*
* @param errorCode the code reported by the server
* @param errorMessage the message describing the exception on the remote end
*/
public AxonServerNonTransientRemoteCommandHandlingException(String errorCode, ErrorMessage errorMessage) {
super(new RemoteExceptionDescription(errorMessage.getDetailsList(), PERSISTENT));
this.errorCode = errorCode;
this.server = errorMessage.getLocation();
}

/**
* Returns the name of the server that reported the error.
*
* @return the name of the server that reported the error
*/
public String getServer() {
return server;
}

/**
* Returns the error code as reported by the server.
*
* @return the error code as reported by the server
*/
public String getErrorCode() {
return errorCode;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteCommandHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", server='" + server + '\'' +
'}';
}
}
Expand Up @@ -125,7 +125,7 @@ public CommandResponse serialize(CommandResultMessage<?> commandResultMessage, S

if (commandResultMessage.isExceptional()) {
Throwable throwable = commandResultMessage.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.COMMAND_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getCommandExecutionErrorCode(throwable).errorCode());
responseBuilder.setErrorMessage(ExceptionSerializer.serialize(configuration.getClientId(), throwable));
commandResultMessage.exceptionDetails()
.ifPresent(details -> responseBuilder.setPayload(objectSerializer.apply(details)));
Expand Down
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* Exception indicating a non-transient problem that was reported during query handling by the remote end of a connection.
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteQueryHandlingException extends RemoteNonTransientHandlingException {

private static final boolean PERSISTENT = true;
private final String errorCode;
private final String server;

/**
* Initialize the exception with given {@code errorCode} and {@code errorMessage}.
*
* @param errorCode the code reported by the server
* @param errorMessage the message describing the exception on the remote end
*/
public AxonServerNonTransientRemoteQueryHandlingException(String errorCode, ErrorMessage errorMessage) {
super(new RemoteExceptionDescription(errorMessage.getDetailsList(), PERSISTENT));
this.errorCode = errorCode;
this.server = errorMessage.getLocation();
}

/**
* Returns the error code as reported by the server.
*
* @return the error code as reported by the server
*/
public String getErrorCode() {
return errorCode;
}

/**
* Returns the name of the server that reported the error.
*
* @return the name of the server that reported the error
*/
public String getServer() {
return server;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteQueryHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", server='" + server + '\'' +
'}';
}
}
Expand Up @@ -367,7 +367,7 @@ public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(Sub
updateHandler.getUpdates()
.doOnError(e -> {
ErrorMessage error = ExceptionSerializer.serialize(configuration.getClientId(), e);
String errorCode = ErrorCode.QUERY_EXECUTION_ERROR.errorCode();
String errorCode = ErrorCode.getQueryExecutionErrorCode(e).errorCode();
QueryUpdate queryUpdate = QueryUpdate.newBuilder()
.setErrorMessage(error)
.setErrorCode(errorCode)
Expand Down Expand Up @@ -427,7 +427,7 @@ public void run() {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
QueryResponse response =
QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build();
Expand All @@ -452,7 +452,7 @@ public void run() {
} catch (RuntimeException | OutOfDirectMemoryError e) {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
responseHandler.sendLast(QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build());
Expand Down
Expand Up @@ -136,7 +136,7 @@ public QueryResponse serializeResponse(QueryResponseMessage<?> queryResponse, St

if (queryResponse.isExceptional()) {
Throwable exceptionResult = queryResponse.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down
Expand Up @@ -132,7 +132,7 @@ public QueryUpdate serialize(SubscriptionQueryUpdateMessage<?> subscriptionQuery
QueryUpdate.Builder updateMessageBuilder = QueryUpdate.newBuilder();
if (subscriptionQueryUpdateMessage.isExceptional()) {
Throwable exceptionResult = subscriptionQueryUpdateMessage.exceptionResult();
updateMessageBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
updateMessageBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
updateMessageBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -232,7 +232,7 @@ QueryProviderOutbound serialize(QueryResponseMessage<?> initialResult, String su
.setRequestIdentifier(subscriptionId);
if (initialResult.isExceptional()) {
Throwable exceptionResult = initialResult.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -279,7 +279,7 @@ QueryProviderOutbound serializeCompleteExceptionally(String subscriptionId, Thro
.setErrorMessage(ExceptionSerializer.serialize(
configuration.getClientId(), cause
))
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(cause).errorCode())
.setClientId(configuration.getClientId())
.setComponentName(configuration.getComponentName())
.build();
Expand Down
Expand Up @@ -37,8 +37,8 @@ private ExceptionSerializer() {
*
* @param clientLocation the name of the client were the {@link ErrorMessage} originates from
* @param t the {@link Throwable} to base this {@link ErrorMessage} on
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the {@link
* Throwable}
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the
* {@link Throwable}
*/
public static ErrorMessage serialize(String clientLocation, Throwable t) {
ErrorMessage.Builder builder =
Expand All @@ -52,4 +52,5 @@ public static ErrorMessage serialize(String clientLocation, Throwable t) {
}
return builder.build();
}

}
Expand Up @@ -20,26 +20,25 @@
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.commandhandling.CommandExecutionException;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.SerializationException;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.*;
import org.junit.jupiter.params.provider.*;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

/**
* Author: marc
Expand Down Expand Up @@ -125,6 +124,41 @@ void testSerializeExceptionalResponseWithDetails(CommandSerializer testSubject)
assertEquals("Details", ((CommandExecutionException) actual).getDetails().orElse("None"));
}

@MethodSource("data")
@ParameterizedTest
void testSerializeNonTransientExceptionalResponse(CommandSerializer testSubject) {
SerializationException nonTransientExceptionCause = new SerializationException("Serialization non recoverable problem");
Exception exception = new CommandExecutionException("oops", nonTransientExceptionCause, null);
CommandResultMessage<?> response = new GenericCommandResultMessage<>(exception,
MetaData.with("test", "testValue"));
CommandResponse outbound = testSubject.serialize(response, "requestIdentifier");
assertEquals(response.getIdentifier(), outbound.getMessageIdentifier());

assertEquals(ErrorCode.COMMAND_EXECUTION_NON_TRANSIENT_ERROR.errorCode(), outbound.getErrorCode());
}

@MethodSource("data")
@ParameterizedTest
void testSerializeDeserializeNonTransientExceptionalResponseWithDetails(CommandSerializer testSubject) {
SerializationException nonTransientExceptionCause = new SerializationException("Serialization non recoverable problem");
Exception exception = new CommandExecutionException("oops", nonTransientExceptionCause, "Details");
CommandResultMessage<?> response = new GenericCommandResultMessage<>(exception,
MetaData.with("test", "testValue"));
CommandResponse outbound = testSubject.serialize(response, "requestIdentifier");
assertEquals(response.getIdentifier(), outbound.getMessageIdentifier());
CommandResultMessage<?> deserialize = testSubject.deserialize(outbound);

assertEquals(response.getIdentifier(), deserialize.getIdentifier());
assertEquals(response.getMetaData(), deserialize.getMetaData());
assertTrue(deserialize.isExceptional());
assertTrue(deserialize.optionalExceptionResult().isPresent());
assertEquals(exception.getMessage(), deserialize.exceptionResult().getMessage());
Throwable actual = deserialize.optionalExceptionResult().get();
assertTrue(actual instanceof CommandExecutionException);
assertTrue(actual.getCause() instanceof AxonServerNonTransientRemoteCommandHandlingException);
assertTrue(actual.getCause().getMessage().contains("Serialization non recoverable problem"));
}

@MethodSource("data")
@ParameterizedTest
void testDeserializeResponseWithoutPayload(CommandSerializer testSubject) {
Expand Down

0 comments on commit 1ac9b66

Please sign in to comment.