Skip to content

Commit

Permalink
feat: add failureDetail to DataException
Browse files Browse the repository at this point in the history
  • Loading branch information
Dapeng Wang authored and ada89 committed Aug 29, 2022
1 parent 88cb66f commit c7fa2bd
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 13 deletions.
5 changes: 4 additions & 1 deletion src/main/java/io/neonbee/NeonBee.java
Expand Up @@ -42,6 +42,7 @@
import io.neonbee.config.HealthConfig;
import io.neonbee.config.NeonBeeConfig;
import io.neonbee.config.ServerConfig;
import io.neonbee.data.DataException;
import io.neonbee.data.DataQuery;
import io.neonbee.entity.EntityModelManager;
import io.neonbee.entity.EntityWrapper;
Expand All @@ -55,6 +56,7 @@
import io.neonbee.hook.internal.DefaultHookRegistry;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.buffer.ImmutableBuffer;
import io.neonbee.internal.codec.DataExceptionMessageCodec;
import io.neonbee.internal.codec.DataQueryMessageCodec;
import io.neonbee.internal.codec.EntityWrapperMessageCodec;
import io.neonbee.internal.codec.ImmutableBufferMessageCodec;
Expand Down Expand Up @@ -410,7 +412,8 @@ Future<Void> decorateEventBus() {
.registerDefaultCodec(EntityWrapper.class, new EntityWrapperMessageCodec(vertx))
.registerDefaultCodec(ImmutableBuffer.class, new ImmutableBufferMessageCodec())
.registerDefaultCodec(ImmutableJsonArray.class, new ImmutableJsonArrayMessageCodec())
.registerDefaultCodec(ImmutableJsonObject.class, new ImmutableJsonObjectMessageCodec());
.registerDefaultCodec(ImmutableJsonObject.class, new ImmutableJsonObjectMessageCodec())
.registerDefaultCodec(DataException.class, new DataExceptionMessageCodec());

// add any additional default codecs configured in NeonBeeConfig
config.getEventBusCodecs().forEach(this::registerCodec);
Expand Down
40 changes: 37 additions & 3 deletions src/main/java/io/neonbee/data/DataException.java
@@ -1,10 +1,16 @@
package io.neonbee.data;

import static io.neonbee.internal.helper.StringHelper.EMPTY;
import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

@SuppressWarnings({ "OverrideThrowableToString", "checkstyle:JavadocVariable" })
public class DataException extends RuntimeException {
public static final int FAILURE_CODE_UNKNOWN_STRATEGY = 1000;
Expand All @@ -21,6 +27,8 @@ public class DataException extends RuntimeException {

private final int failureCode;

private final Map<String, Object> failureDetail;

/**
* Create a DataException.
*/
Expand Down Expand Up @@ -53,8 +61,22 @@ public DataException(int failureCode) {
* @param message the failure message
*/
public DataException(int failureCode, String message) {
this(failureCode, message, Map.of());
}

/**
* Create a DataException.
*
* @param failureCode the failure code
* @param message the failure message
* @param failureDetail the failure detail message to be propagated to the invoker, but should not be exposed to the
* consumer. Must be compatible to {@link JsonObject#JsonObject(Map)}. The type of the passed
* object must be either: {@link String} or {@link JsonObject} or {@link JsonArray}
*/
public DataException(int failureCode, String message, Map<String, Object> failureDetail) {
super(message);
this.failureCode = failureCode;
this.failureDetail = requireNonNull(failureDetail);
}

/**
Expand All @@ -66,9 +88,18 @@ public int failureCode() {
return failureCode;
}

/**
* Get the failure detail message.
*
* @return the failure detail
*/
public Map<String, Object> failureDetail() {
return Collections.unmodifiableMap(failureDetail);
}

@Override
public int hashCode() {
return Objects.hash(failureCode, getMessage());
return Objects.hash(failureCode, getMessage(), failureDetail);
}

@Override
Expand All @@ -83,11 +114,14 @@ public boolean equals(Object obj) {
return false;
}
DataException other = (DataException) obj;
return failureCode == other.failureCode && getMessage().equals(other.getMessage());
return failureCode == other.failureCode
&& Optional.ofNullable(getMessage()).orElse(EMPTY)
.equals(Optional.ofNullable(other.getMessage()).orElse(EMPTY))
&& failureDetail().equals(other.failureDetail());
}

@Override
public String toString() {
return "(" + failureCode + ") " + Optional.ofNullable(getMessage()).orElse(EMPTY);
return "(" + failureCode + ")" + Optional.ofNullable(getMessage()).map(msg -> " " + msg).orElse(EMPTY);
}
}
26 changes: 17 additions & 9 deletions src/main/java/io/neonbee/data/DataVerticle.java
Expand Up @@ -115,14 +115,22 @@ public static <U> Future<U> requestData(Vertx vertx, DataRequest request, DataCo
LOGGER.correlateWith(context).debug("Received event bus reply");

if (asyncReply.succeeded()) {
DataContext responseDataContext =
decodeContextFromString(asyncReply.result().headers().get(CONTEXT_HEADER));
context.setData(
Optional.ofNullable(responseDataContext).map(DataContext::data).orElse(null));
context.mergeResponseData(Optional.ofNullable(responseDataContext)
.map(DataContext::responseData).orElse(null));
return succeededFuture(asyncReply.result().body());

U body = asyncReply.result().body();
if (body instanceof DataException) {
if (LOGGER.isWarnEnabled()) {
LOGGER.correlateWith(context).warn("Received a event bus reply failure from {}",
qualifiedName, (DataException) body);
}
return failedFuture((DataException) body);
} else {
DataContext responseDataContext =
decodeContextFromString(asyncReply.result().headers().get(CONTEXT_HEADER));
context.setData(
Optional.ofNullable(responseDataContext).map(DataContext::data).orElse(null));
context.mergeResponseData(Optional.ofNullable(responseDataContext)
.map(DataContext::responseData).orElse(null));
return succeededFuture(asyncReply.result().body());
}
} else {
Throwable cause = asyncReply.cause();
if (LOGGER.isWarnEnabled()) {
Expand Down Expand Up @@ -412,7 +420,7 @@ public void start(Promise<Void> promise) {
}

if (cause instanceof DataException) {
message.fail(((DataException) cause).failureCode(), cause.getMessage());
message.reply(cause);
} else {
message.fail(FAILURE_CODE_PROCESSING_FAILED,
"Processing of message failed. " + cause.getMessage());
Expand Down
@@ -0,0 +1,70 @@
package io.neonbee.internal.codec;

import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import io.neonbee.data.DataException;
import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;

public class DataExceptionMessageCodec implements MessageCodec<DataException, DataException> {

@Override
public void encodeToWire(Buffer buffer, DataException exception) {
buffer.appendInt(exception.failureCode());

String msgEncoded = exception.getMessage();
if (msgEncoded != null) {
byte[] msgBytes = msgEncoded.getBytes(CharsetUtil.UTF_8);
buffer.appendInt(msgBytes.length);
buffer.appendBytes(msgBytes);
} else {
buffer.appendInt(-1);
}

Buffer failureDetailEncoded = new JsonObject(exception.failureDetail()).toBuffer();
buffer.appendInt(failureDetailEncoded.length());
buffer.appendBuffer(failureDetailEncoded);
}

@Override
public DataException decodeFromWire(int pos, Buffer buffer) {
int tmpPos = pos;
int failureCode = buffer.getInt(tmpPos);
tmpPos += Integer.BYTES;

int msgLength = buffer.getInt(tmpPos);
tmpPos += Integer.BYTES;
String message =
msgLength != -1 ? new String(buffer.getBytes(tmpPos, tmpPos + msgLength), CharsetUtil.UTF_8) : null;
tmpPos += msgLength != -1 ? msgLength : 0;

int failureDetailLength = buffer.getInt(tmpPos);
tmpPos += Integer.BYTES;

Buffer buff = buffer.getBuffer(tmpPos, tmpPos + failureDetailLength);
Map<String, Object> failureDetail =
buff.toJsonObject().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue));

return new DataException(failureCode, message, failureDetail);
}

@Override
public DataException transform(DataException exception) {
return exception;
}

@Override
public String name() {
return "dataexception";
}

@Override
public byte systemCodecID() {
return -1;
}

}
134 changes: 134 additions & 0 deletions src/test/java/io/neonbee/cluster/DataExceptionRequestTest.java
@@ -0,0 +1,134 @@
package io.neonbee.cluster;

import static com.google.common.truth.Truth.assertThat;
import static io.neonbee.test.helper.DeploymentHelper.deployVerticle;
import static io.vertx.core.Future.failedFuture;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.neonbee.NeonBee;
import io.neonbee.NeonBeeExtension;
import io.neonbee.NeonBeeInstanceConfiguration;
import io.neonbee.data.DataContext;
import io.neonbee.data.DataException;
import io.neonbee.data.DataMap;
import io.neonbee.data.DataQuery;
import io.neonbee.data.DataRequest;
import io.neonbee.data.DataVerticle;
import io.neonbee.data.internal.DataContextImpl;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxTestContext;

@ExtendWith(NeonBeeExtension.class)
class DataExceptionRequestTest {
private static final JsonObject FAILURE_OBJECT = new JsonObject().put("code", new JsonArray()
.add(new JsonObject().put("message", "This is a bad response")).add(new JsonObject().put("lang", "en")));

private static final Map<String, Object> FAILURE_DETAIL = Map.of("error", FAILURE_OBJECT);

private static final DataVerticle<Buffer> DATA_EXCEPTION_VERTICLE = new DataVerticle<>() {
@Override
public String getName() {
return "DataException";
}

@Override
public Future<Buffer> retrieveData(DataQuery query, DataMap dataMap, DataContext context) {
return failedFuture(new DataException(400, "Bad Response", FAILURE_DETAIL));
}
};

private static final DataVerticle<Buffer> DATA_EXCEPTION_VERTICLE_ONLY_FAILURE_CODE = new DataVerticle<>() {
@Override
public String getName() {
return "DataExceptionFailureCodeOnly";
}

@Override
public Future<Buffer> retrieveData(DataQuery query, DataMap dataMap, DataContext context) {
return failedFuture(new DataException(400));
}
};

private static final DataVerticle<Buffer> DATA_EXCEPTION_VERTICLE_FAILURE_CODE_AND_MESSAGE = new DataVerticle<>() {
@Override
public String getName() {
return "DataExceptionFailureCodeAndMessage";
}

@Override
public Future<Buffer> retrieveData(DataQuery query, DataMap dataMap, DataContext context) {
return failedFuture(new DataException(400, "Bad response"));
}
};

@Test
@Timeout(value = 10, timeUnit = TimeUnit.HOURS)
@DisplayName("Test that DataException can be returned via the event bus")
void testDataExceptionRequest(@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee source,
@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee target,
VertxTestContext testContext) {
deployVerticle(target.getVertx(), DATA_EXCEPTION_VERTICLE).compose(s -> {
DataRequest request = new DataRequest(DATA_EXCEPTION_VERTICLE.getName());
return DataVerticle.<JsonObject>requestData(source.getVertx(), request, new DataContextImpl());
}).onComplete(testContext.failing(response -> {
testContext.verify(() -> {
assertThat(response).isInstanceOf(DataException.class);
assertThat(((DataException) response).failureCode()).isEqualTo(400);
assertThat(response.getMessage()).isEqualTo("Bad Response");
assertThat(((DataException) response).failureDetail().get("error")).isEqualTo(FAILURE_OBJECT);
});
testContext.completeNow();
}));
}

@Test
@Timeout(value = 10, timeUnit = TimeUnit.SECONDS)
@DisplayName("Test that DataException with failure code and message be returned via the event bus")
void testDataExceptionRequestFailureCodeAndMessage(
@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee source,
@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee target,
VertxTestContext testContext) {
deployVerticle(target.getVertx(), DATA_EXCEPTION_VERTICLE_FAILURE_CODE_AND_MESSAGE).compose(s -> {
DataRequest request = new DataRequest(DATA_EXCEPTION_VERTICLE_FAILURE_CODE_AND_MESSAGE.getName());
return DataVerticle.<JsonObject>requestData(source.getVertx(), request, new DataContextImpl());
}).onComplete(testContext.failing(response -> {
testContext.verify(() -> {
assertThat(response).isInstanceOf(DataException.class);
assertThat(((DataException) response).failureCode()).isEqualTo(400);
assertThat(response.getMessage()).isEqualTo("Bad response");
});
testContext.completeNow();
}));
}

@Test
@Timeout(value = 10, timeUnit = TimeUnit.SECONDS)
@DisplayName("Test that DataException with failure code be returned via the event bus")
void testDataExceptionRequestFailureCodeOnly(
@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee source,
@NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}) NeonBee target,
VertxTestContext testContext) {
deployVerticle(target.getVertx(), DATA_EXCEPTION_VERTICLE_ONLY_FAILURE_CODE).compose(s -> {
DataRequest request = new DataRequest(DATA_EXCEPTION_VERTICLE_ONLY_FAILURE_CODE.getName());
return DataVerticle.<JsonObject>requestData(source.getVertx(), request, new DataContextImpl());
}).onComplete(testContext.failing(response -> {
testContext.verify(() -> {
assertThat(response).isInstanceOf(DataException.class);
assertThat(((DataException) response).failureCode()).isEqualTo(400);
assertThat(response.getMessage()).isEqualTo(null);
assertThat(((DataException) response).failureDetail()).isEqualTo(Map.of());
});
testContext.completeNow();
}));
}
}

0 comments on commit c7fa2bd

Please sign in to comment.