Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1199] Non transient remote exceptions transport fix #1743

Merged
merged 16 commits into from May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,7 +18,9 @@

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException;
import org.axonframework.axonserver.connector.command.AxonServerNonTransientRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.command.AxonServerRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerNonTransientRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.query.AxonServerQueryDispatchException;
import org.axonframework.axonserver.connector.query.AxonServerRemoteQueryHandlingException;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
Expand Down Expand Up @@ -98,6 +100,14 @@ public enum ErrorCode {
new AxonServerRemoteCommandHandlingException(code, error)
)
),
COMMAND_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-4005",
(code, error, details) -> new CommandExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteCommandHandlingException(code, error),
details.get()
)
),

//Query errors
NO_HANDLER_FOR_QUERY("AXONIQ-5000", (code, error, details) -> new NoHandlerForQueryException(error.getMessage())),
Expand All @@ -110,6 +120,14 @@ public enum ErrorCode {
)
),
QUERY_DISPATCH_ERROR("AXONIQ-5002", (code, error, details) -> new AxonServerQueryDispatchException(code, error)),
QUERY_EXECUTION_NON_TRANSIENT_ERROR(
"AXONIQ-5003",
(code, error, details) -> new QueryExecutionException(
error.getMessage(),
new AxonServerNonTransientRemoteQueryHandlingException(code, error),
details.get()
)
),

// Internal errors
DATAFILE_READ_ERROR(
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2010-2018. Axon Framework
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* Exception indicating a non transient problem that was reported by the remote end of a connection.
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteCommandHandlingException extends RemoteNonTransientHandlingException {

private final String errorCode;
private final String server;

/**
* Initialize the exception with given {@code errorCode} and {@code errorMessage}.
*
* @param errorCode the code reported by the server
* @param errorMessage the message describing the exception on the remote end
*/
public AxonServerNonTransientRemoteCommandHandlingException(String errorCode, ErrorMessage errorMessage) {
super(new RemoteExceptionDescription(errorMessage.getDetailsList()));
this.errorCode = errorCode;
this.server = errorMessage.getLocation();
}

/**
* Returns the name of the server that reported the error.
*
* @return the name of the server that reported the error
*/
public String getOriginServer() {
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
return server;
}

/**
* Returns the error code as reported by the server.
*
* @return the error code as reported by the server
*/
public String getErrorCode() {
return errorCode;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteCommandHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", server='" + server + '\'' +
'}';
}
}
Expand Up @@ -24,6 +24,7 @@
import io.axoniq.axonserver.grpc.command.CommandResponse;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.util.ErrorCodeDecider;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.axonserver.connector.util.GrpcMetadataSerializer;
Expand Down Expand Up @@ -125,7 +126,7 @@ public CommandResponse serialize(CommandResultMessage<?> commandResultMessage, S

if (commandResultMessage.isExceptional()) {
Throwable throwable = commandResultMessage.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.COMMAND_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCodeDecider.getCommandExecutionErrorCode(throwable).errorCode());
responseBuilder.setErrorMessage(ExceptionSerializer.serialize(configuration.getClientId(), throwable));
commandResultMessage.exceptionDetails()
.ifPresent(details -> responseBuilder.setPayload(objectSerializer.apply(details)));
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2010-2020. Axon Framework
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteNonTransientHandlingException;

/**
* An AxonServer Non Transient Exception which is thrown if a Query Handling exception is non transient.
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class AxonServerNonTransientRemoteQueryHandlingException extends RemoteNonTransientHandlingException {

private final String errorCode;
private final String server;

/**
* Initialize a Query Handling exception from a remote source.
*
* @param errorCode a {@link String} defining the error code of this exception
* @param message an {@link ErrorMessage} describing the exception
*/
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
public AxonServerNonTransientRemoteQueryHandlingException(String errorCode, ErrorMessage message) {
super(new RemoteExceptionDescription(message.getDetailsList()));
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
this.errorCode = errorCode;
this.server = message.getLocation();
}

/**
* Return a {@link String} defining the error code.
*
* @return a {@link String} defining the error code
*/
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
public String getErrorCode() {
return errorCode;
}

/**
* Return a {@link String} defining the location where the error originated.
*
* @return a {@link String} defining the location where the error originated
*/
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
public String getServer() {
return server;
}

@Override
public String toString() {
return "AxonServerNonTransientRemoteQueryHandlingException{" +
"message=" + getMessage() +
", errorCode='" + errorCode + '\'' +
", location='" + server + '\'' +
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
'}';
}
}
Expand Up @@ -39,6 +39,7 @@
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.util.ErrorCodeDecider;
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
Expand Down Expand Up @@ -367,7 +368,7 @@ public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(Sub
updateHandler.getUpdates()
.doOnError(e -> {
ErrorMessage error = ExceptionSerializer.serialize(configuration.getClientId(), e);
String errorCode = ErrorCode.QUERY_EXECUTION_ERROR.errorCode();
String errorCode = ErrorCodeDecider.getQueryExecutionErrorCode(e).errorCode();
QueryUpdate queryUpdate = QueryUpdate.newBuilder()
.setErrorMessage(error)
.setErrorCode(errorCode)
Expand Down Expand Up @@ -427,7 +428,7 @@ public void run() {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
QueryResponse response =
QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build();
Expand All @@ -452,7 +453,7 @@ public void run() {
} catch (RuntimeException | OutOfDirectMemoryError e) {
ErrorMessage ex = ExceptionSerializer.serialize(clientId, e);
responseHandler.sendLast(QueryResponse.newBuilder()
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(e).errorCode())
.setErrorMessage(ex)
.setRequestIdentifier(queryRequest.getMessageIdentifier())
.build());
Expand Down
Expand Up @@ -22,7 +22,7 @@
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.util.ErrorCodeDecider;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.axonserver.connector.util.GrpcMetadataSerializer;
Expand Down Expand Up @@ -136,7 +136,7 @@ public QueryResponse serializeResponse(QueryResponseMessage<?> queryResponse, St

if (queryResponse.isExceptional()) {
Throwable exceptionResult = queryResponse.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down
Expand Up @@ -27,8 +27,8 @@
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.query.GrpcBackedResponseMessage;
import org.axonframework.axonserver.connector.util.ErrorCodeDecider;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.axonserver.connector.util.GrpcMetadataSerializer;
Expand Down Expand Up @@ -132,7 +132,7 @@ public QueryUpdate serialize(SubscriptionQueryUpdateMessage<?> subscriptionQuery
QueryUpdate.Builder updateMessageBuilder = QueryUpdate.newBuilder();
if (subscriptionQueryUpdateMessage.isExceptional()) {
Throwable exceptionResult = subscriptionQueryUpdateMessage.exceptionResult();
updateMessageBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
updateMessageBuilder.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(exceptionResult).errorCode());
updateMessageBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -232,7 +232,7 @@ QueryProviderOutbound serialize(QueryResponseMessage<?> initialResult, String su
.setRequestIdentifier(subscriptionId);
if (initialResult.isExceptional()) {
Throwable exceptionResult = initialResult.exceptionResult();
responseBuilder.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode());
responseBuilder.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(exceptionResult).errorCode());
responseBuilder.setErrorMessage(
ExceptionSerializer.serialize(configuration.getClientId(), exceptionResult)
);
Expand Down Expand Up @@ -279,7 +279,7 @@ QueryProviderOutbound serializeCompleteExceptionally(String subscriptionId, Thro
.setErrorMessage(ExceptionSerializer.serialize(
configuration.getClientId(), cause
))
.setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
.setErrorCode(ErrorCodeDecider.getQueryExecutionErrorCode(cause).errorCode())
.setClientId(configuration.getClientId())
.setComponentName(configuration.getComponentName())
.build();
Expand Down
@@ -0,0 +1,42 @@
package org.axonframework.axonserver.connector.util;

import org.axonframework.axonserver.connector.ErrorCode;

/**
* Utility class used to pick correct {@link ErrorCode} from a {@link Throwable}
*
* @author Stefan Andjelkovic
* @since 4.5
*/
public class ErrorCodeDecider {
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved

private ErrorCodeDecider() {
// Utility class
}

/**
* Returns an Query Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getQueryExecutionErrorCode(Throwable throwable) {
if (ExceptionSerializer.isExplicitlyNonTransient(throwable)) {
return ErrorCode.QUERY_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.QUERY_EXECUTION_ERROR;
}
}

/**
* Returns an Command Execution ErrorCode variation based on the transiency of the given {@link Throwable}
* @param throwable The {@link Throwable} to inspect for transiency
* @return {@link ErrorCode} variation
*/
public static ErrorCode getCommandExecutionErrorCode(Throwable throwable) {
if (ExceptionSerializer.isExplicitlyNonTransient(throwable)) {
return ErrorCode.COMMAND_EXECUTION_NON_TRANSIENT_ERROR;
} else {
return ErrorCode.COMMAND_EXECUTION_ERROR;
}
}
}
Expand Up @@ -17,6 +17,7 @@
package org.axonframework.axonserver.connector.util;

import io.axoniq.axonserver.grpc.ErrorMessage;
import org.axonframework.common.AxonNonTransientException;

import static org.axonframework.common.ObjectUtils.getOrDefault;

Expand All @@ -37,8 +38,7 @@ private ExceptionSerializer() {
*
* @param clientLocation the name of the client were the {@link ErrorMessage} originates from
* @param t the {@link Throwable} to base this {@link ErrorMessage} on
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the {@link
* Throwable}
* @return the {@link ErrorMessage} originating from the given {@code clientLocation} and based on the \
sandjelkovic marked this conversation as resolved.
Show resolved Hide resolved
*/
public static ErrorMessage serialize(String clientLocation, Throwable t) {
ErrorMessage.Builder builder =
Expand All @@ -52,4 +52,18 @@ public static ErrorMessage serialize(String clientLocation, Throwable t) {
}
return builder.build();
}

/**
* Indicates whether the given {@code failure} is clearly non-transient. That means, whether the
* {@code failure} explicitly states that a retry of the same Command would result in the same failure to
* occur again.
*
* @param failure the exception that occurred while processing a command
* @return {@code true} if the exception is clearly non-transient and the command should <em>not</em> be
* retried, or {@code false} when the command has a chance of succeeding if it retried.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of JavaDoc is clearly biased towards commands, whereas it is currently placed and intended to have a broader scope than that.

I feel this piece should be adjusted to accompany the fact this ExceptionSerializer's method simply validates whether the given failure is non-transient yes/no.

*/
public static boolean isExplicitlyNonTransient(Throwable failure) {
return failure instanceof AxonNonTransientException
|| (failure.getCause() != null && isExplicitlyNonTransient(failure.getCause()));
}
}