Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1199] Non transient remote exceptions transport fix #1743

Merged
merged 16 commits into from May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,7 +18,9 @@

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException;
import org.axonframework.axonserver.connector.command.AxonServerNonTransientRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.command.AxonServerRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerNonTransientRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerQueryDispatchException;
import org.axonframework.axonserver.connector.query.AxonServerRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
Expand Down Expand Up @@ -98,6 +100,14 @@ public enum ErrorCode {
new AxonServerRemoteCommandHandlingException(code, error)
)
),
COMMAND_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-4005",
(code, error, details) -> new CommandExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteCommandHandlingException(code, error),
details.get()
)
),

//Query errors
NO_HANDLER_FOR_QUERY("AXONIQ-5000", (code, error, details) -> new NoHandlerForQueryException(error.getMessage())),
Expand All @@ -110,6 +120,14 @@ public enum ErrorCode {
)
),
QUERY_DISPATCH_ERROR("AXONIQ-5002", (code, error, details) -> new AxonServerQueryDispatchException(code, error)),
QUERY_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-5003",
(code, error, details) -> new QueryExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteQueryHandlingException(code, error),
details.get()
)
),

// Internal errors
DATAFILE_READ_ERROR(
Expand Down Expand Up @@ -214,6 +232,32 @@ public AxonException convert(String source, Throwable throwable) {
() -> HandlerExecutionException.resolveDetails(throwable).orElse(null));
}

/**
* Returns an Query Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getQueryExecutionErrorCode(Throwable throwable) {
if (ExceptionSerializer.isExplicitlyNonTransient(throwable)) {
return ErrorCode.QUERY_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.QUERY_EXECUTION_ERROR;
}
}

/**
* Returns an Command Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getCommandExecutionErrorCode(Throwable throwable) {
if (ExceptionSerializer.isExplicitlyNonTransient(throwable)) {
return ErrorCode.COMMAND_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.COMMAND_EXECUTION_ERROR;
}
}

/**
* Functional interface towards building an {@link AxonException} based on an {@code errorCode}, {@link
* ErrorMessage} and the {@link Supplier} of an {@link Object}. This provides the option for users to specify more
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* Exception indicating a non-transient problem that was reported by the remote end of a connection.
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteCommandHandlingException extends RemoteNonTransientHandlingException {

private final String errorCode;
private final String server;

/**
* Initialize the exception with given {@code errorCode} and {@code errorMessage}.
*
* @param errorCode the code reported by the server
* @param errorMessage the message describing the exception on the remote end
*/
public AxonServerNonTransientRemoteCommandHandlingException(String errorCode, ErrorMessage errorMessage) {
super(new RemoteExceptionDescription(errorMessage.getDetailsList(), true));
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
this.errorCode = errorCode;
this.server = errorMessage.getLocation();
}

/**
* Returns the name of the server that reported the error.
*
* @return the name of the server that reported the error
*/
public String getServer() {
return server;
}

/**
* Returns the error code as reported by the server.
*
* @return the error code as reported by the server
*/
public String getErrorCode() {
return errorCode;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteCommandHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", server='" + server + '\'' +
'}';
}
}
Expand Up @@ -125,7 +125,7 @@ public CommandResponse serialize(CommandResultMessage<?> commandResultMessage, S

if (commandResultMessage.isExceptional()) {
Throwable throwable = commandResultMessage.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.COMMAND_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getCommandExecutionErrorCode(throwable).errorCode());
responseBuilder.setErrorMessage(ExceptionSerializer.serialize(configuration.getClientId(), throwable));
commandResultMessage.exceptionDetails()
.ifPresent(details -> responseBuilder.setPayload(objectSerializer.apply(details)));
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* Exception indicating a non-transient problem that was reported during query handling by the remote end of a connection.
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteQueryHandlingException extends RemoteNonTransientHandlingException {

private final String errorCode;
private final String server;

/**
* Initialize the exception with given {@code errorCode} and {@code errorMessage}.
*
* @param errorCode the code reported by the server
* @param errorMessage the message describing the exception on the remote end
*/
public AxonServerNonTransientRemoteQueryHandlingException(String errorCode, ErrorMessage message) {
super(new RemoteExceptionDescription(message.getDetailsList()));
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
this.errorCode = errorCode;
this.server = message.getLocation();
}

/**
* Returns the error code as reported by the server.
*
* @return the error code as reported by the server
*/
public String getErrorCode() {
return errorCode;
}

/**
* Returns the name of the server that reported the error.
*
* @return the name of the server that reported the error
*/
public String getServer() {
return server;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteQueryHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", location='" + server + '\'' +
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
'}';
}
}
Expand Up @@ -39,6 +39,7 @@
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.util.ErrorCodeDecider;
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
Expand Down Expand Up @@ -367,7 +368,7 @@ public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(Sub
updateHandler.getUpdates()
.doOnError(e -> {
ErrorMessage error = ExceptionSerializer.serialize(configuration.getClientId(), e);
String errorCode = ErrorCode.QUERY_EXECUTION_ERROR.errorCode();
String errorCode = ErrorCode.getQueryExecutionErrorCode(e).errorCode();
QueryUpdate queryUpdate = QueryUpdate.newBuilder()
.setErrorMessage(error)
.setErrorCode(errorCode)
Expand Down Expand Up @@ -427,7 +428,7 @@ public void run() {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
QueryResponse response =
QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build();
Expand All @@ -452,7 +453,7 @@ public void run() {
} catch (RuntimeException | OutOfDirectMemoryError e) {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
responseHandler.sendLast(QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build());
Expand Down
Expand Up @@ -136,7 +136,7 @@ public QueryResponse serializeResponse(QueryResponseMessage<?> queryResponse, St

if (queryResponse.isExceptional()) {
Throwable exceptionResult = queryResponse.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down
Expand Up @@ -132,7 +132,7 @@ public QueryUpdate serialize(SubscriptionQueryUpdateMessage<?> subscriptionQuery
QueryUpdate.Builder updateMessageBuilder = QueryUpdate.newBuilder();
if (subscriptionQueryUpdateMessage.isExceptional()) {
Throwable exceptionResult = subscriptionQueryUpdateMessage.exceptionResult();
updateMessageBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
updateMessageBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
updateMessageBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -232,7 +232,7 @@ QueryProviderOutbound serialize(QueryResponseMessage<?> initialResult, String su
.setRequestIdentifier(subscriptionId);
if (initialResult.isExceptional()) {
Throwable exceptionResult = initialResult.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCode.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -279,7 +279,7 @@ QueryProviderOutbound serializeCompleteExceptionally(String subscriptionId, Thro
.setErrorMessage(ExceptionSerializer.serialize(
configuration.getClientId(), cause
))
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCode.getQueryExecutionErrorCode(cause).errorCode())
.setClientId(configuration.getClientId())
.setComponentName(configuration.getComponentName())
.build();
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.axonframework.axonserver.connector.util;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.common.AxonNonTransientException;

import static org.axonframework.common.ObjectUtils.getOrDefault;

Expand All @@ -37,8 +38,8 @@ private ExceptionSerializer() {
*
* @param clientLocation the name of the client were the {@link ErrorMessage} originates from
* @param t the {@link Throwable} to base this {@link ErrorMessage} on
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the {@link
* Throwable}
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the
* {@link Throwable}
*/
public static ErrorMessage serialize(String clientLocation, Throwable t) {
ErrorMessage.Builder builder =
Expand All @@ -52,4 +53,16 @@ public static ErrorMessage serialize(String clientLocation, Throwable t) {
}
return builder.build();
}

/**
* Indicates whether the given {@code failure} is clearly non-transient. Non-transient exceptions indicate
* that the handling of the message will fail in the same way if retried
*
* @param failure the exception that occurred while processing a message
* @return {@code true} if the exception is clearly non-transient
*/
public static boolean isExplicitlyNonTransient(Throwable failure) {
return failure instanceof AxonNonTransientException
|| (failure.getCause() != null && isExplicitlyNonTransient(failure.getCause()));
}
}