Skip to content

Commit

Permalink
Removed artificial 'success' parameter from ReplyMessage.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed Oct 15, 2018
1 parent 21e6962 commit 499cb41
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
public abstract class ReplyMessage {

protected String commandIdentifier;
protected boolean success;
protected byte[] serializedMetaData;
protected String payloadType;
protected String payloadRevision;
Expand All @@ -50,17 +49,13 @@ protected ReplyMessage() {

/**
* Initializes a ReplyMessage containing a reply to the command with given {commandIdentifier} and given
* {@code commandResultMessage}. The parameter {@code success} determines whether the was executed successfully or
* not.
* {@code commandResultMessage}.
*
* @param commandIdentifier The identifier of the command to which the message is a reply
* @param success Whether or not the command executed successfully or not
* @param commandResultMessage The result message of command process
* @param serializer The serializer to serialize the message contents with
*/
public ReplyMessage(String commandIdentifier, boolean success, CommandResultMessage<?> commandResultMessage,
Serializer serializer) {
this.success = success;
public ReplyMessage(String commandIdentifier, CommandResultMessage<?> commandResultMessage, Serializer serializer) {
this.commandIdentifier = commandIdentifier;

SerializedObject<byte[]> metaData = commandResultMessage.serializeMetaData(serializer, byte[].class);
Expand All @@ -79,18 +74,12 @@ public ReplyMessage(String commandIdentifier, boolean success, CommandResultMess
}

/**
* Returns the returnValue of the command processing. If {@link #isSuccess()} return {@code false}, this
* method returns {@code null}. This method also returns {@code null} if response processing returned
* a {@code null} value.
* Returns the returnValue of the command processing.
*
* @param serializer The serializer to deserialize the result with
* @return The return value of command processing
*/
public CommandResultMessage<?> getCommandResultMessage(Serializer serializer) {
if (!success || payloadType == null) {
return null;
}

Object payload = deserializePayload(serializer);
Throwable exception = deserializeException(serializer);
SerializedMetaData<byte[]> serializedMetaData =
Expand All @@ -103,17 +92,6 @@ public CommandResultMessage<?> getCommandResultMessage(Serializer serializer) {
return new GenericCommandResultMessage<>(payload, metaData);
}

/**
* Returns the error of the command processing. If {@link #isSuccess()} return {@code true}, this
* method returns {@code null}.
*
* @param serializer The serializer to deserialize the result with
* @return The exception thrown during command processing
*/
public Throwable getError(Serializer serializer) {
return deserializeException(serializer);
}

/**
* Returns the identifier of the command for which this message is a reply.
*
Expand All @@ -123,16 +101,6 @@ public String getCommandIdentifier() {
return commandIdentifier;
}

/**
* Whether the reply message represents a successfully executed command. In this case, successful means that the
* command's execution did not result in an exception.
*
* @return {@code true} if this reply contains a return value, {@code false} if it contains an error.
*/
public boolean isSuccess() {
return success;
}

/**
* Returns the payload type of the serialized reply message.
*
Expand Down Expand Up @@ -172,7 +140,6 @@ public byte[] getSerializedMetaData() {
@Override
public int hashCode() {
return Objects.hash(commandIdentifier,
success,
payloadType,
payloadRevision,
serializedPayload,
Expand All @@ -192,7 +159,6 @@ public boolean equals(Object obj) {
}
final ReplyMessage other = (ReplyMessage) obj;
return Objects.equals(this.commandIdentifier, other.commandIdentifier)
&& Objects.equals(this.success, other.success)
&& Objects.equals(this.payloadType, other.payloadType)
&& Objects.equals(this.payloadRevision, other.payloadRevision)
&& Objects.deepEquals(this.serializedPayload, other.serializedPayload)
Expand All @@ -206,7 +172,6 @@ public boolean equals(Object obj) {
public String toString() {
return "ReplyMessage{" +
"commandIdentifier='" + commandIdentifier + '\'' +
", success=" + success +
", payloadType='" + payloadType + '\'' +
", payloadRevision='" + payloadRevision + '\'' +
", serializedPayload=" + Arrays.toString(serializedPayload) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.junit.*;
import org.mockito.*;
import org.mockito.invocation.*;
import org.mockito.stubbing.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,8 @@ private void processReplyMessage(JGroupsReplyMessage message) {
logger.warn("Received a callback for a message that has either already received a callback, "
+ "or which was not sent through this node. Ignoring.");
} else {
if (message.isSuccess()) {
//noinspection unchecked
callbackWrapper.reportResult(message.getCommandResultMessage(serializer));
} else {
Throwable exception = getOrDefault(message.getError(serializer), new IllegalStateException(
format("Unknown execution failure for command [%s]", message.getCommandIdentifier())));
callbackWrapper.reportResult(asCommandResultMessage(exception));
}
//noinspection unchecked
callbackWrapper.reportResult(message.getCommandResultMessage(serializer));
}
}

Expand All @@ -322,14 +316,13 @@ private <C, R> void processDispatchMessage(Message msg, JGroupsDispatchMessage m
}

private <R> void sendReply(Address address, String commandIdentifier, CommandResultMessage<R> commandResultMessage) {
boolean success = !commandResultMessage.isExceptional();
Object reply;
try {
reply = new JGroupsReplyMessage(commandIdentifier, success, commandResultMessage, serializer);
reply = new JGroupsReplyMessage(commandIdentifier, commandResultMessage, serializer);
} catch (Exception e) {
logger.warn(String.format("Could not serialize command reply [%s]. Sending back NULL.",
commandResultMessage), e);
reply = new JGroupsReplyMessage(commandIdentifier, success, asCommandResultMessage((R) null), serializer);
reply = new JGroupsReplyMessage(commandIdentifier, asCommandResultMessage(e), serializer);
}
try {
channel.send(address, reply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,21 @@ public JGroupsReplyMessage() {

/**
* Initializes a JGroupsReplyMessage containing a reply to the command with given {commandIdentifier} and given
* {@code commandResultMessage}. The parameter {@code success} determines whether the was executed successfully or
* not.
* {@code commandResultMessage}.
*
* @param commandIdentifier The identifier of the command to which the message is a reply
* @param success Whether or not the command executed successfully or not
* @param commandResultMessage The return value of command process
* the given {@code commandResultMessage} is ignored.
* @param serializer The serializer to serialize the message contents with
*/
public JGroupsReplyMessage(String commandIdentifier, boolean success,
CommandResultMessage<?> commandResultMessage, Serializer serializer) {
super(commandIdentifier, success, commandResultMessage, serializer);
public JGroupsReplyMessage(String commandIdentifier, CommandResultMessage<?> commandResultMessage,
Serializer serializer) {
super(commandIdentifier, commandResultMessage, serializer);
}

@Override
public void writeTo(DataOutput out) throws IOException {
out.writeUTF(commandIdentifier);
out.writeBoolean(success);
out.writeInt(serializedMetaData.length);
out.write(serializedMetaData);
write(out, payloadType, payloadRevision, serializedPayload);
Expand All @@ -71,7 +68,6 @@ public void writeTo(DataOutput out) throws IOException {
@Override
public void readFrom(DataInput in) throws IOException {
commandIdentifier = in.readUTF();
success = in.readBoolean();
serializedMetaData = new byte[in.readInt()];
in.readFully(serializedMetaData);
payloadType = in.readUTF();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void testConnectAndDispatchMessages_SingleCandidate() throws Exception {
}

@Test
public void testUnserializableResponseConvertedToNull() throws Exception {
public void testUnserializableResponseReportedAsExceptional() throws Exception {
serializer = spy(XStreamSerializer.builder().build());
Object successResponse = new Object();
Exception failureResponse = new MockException("This cannot be serialized");
Expand Down Expand Up @@ -361,7 +361,7 @@ public void testUnserializableResponseConvertedToNull() throws Exception {

callback = new FutureCallback<>();
distributedCommandBus1.dispatch(new GenericCommandMessage<>("string"), callback);
assertNull(callback.getResult().getPayload());
assertTrue(callback.getResult().isExceptional());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ public <C, R> void send(Member destination, CommandMessage<C> commandMessage,
executor.execute(() -> {
SpringHttpReplyMessage<R> replyMessage =
this.<C, R>sendRemotely(destination, commandMessage, EXPECT_REPLY).getBody();
if (replyMessage.isSuccess()) {
if (replyMessage != null) {
callback.onResult(commandMessage, replyMessage.getCommandResultMessage(serializer));
} else {
callback.onResult(commandMessage, asCommandResultMessage(replyMessage.getError(serializer)));
}
});
}
Expand Down Expand Up @@ -193,29 +191,28 @@ public <C, R> CompletableFuture<?> receiveCommand(@RequestBody SpringHttpDispatc
return replyFutureCallback;
} catch (Exception e) {
logger.error("Could not dispatch command", e);
return CompletableFuture.completedFuture(createReply(commandMessage, false, e));
return CompletableFuture.completedFuture(createReply(commandMessage, asCommandResultMessage(e)));
}
} else {
try {
localCommandBus.dispatch(commandMessage);
return CompletableFuture.completedFuture("");
} catch (Exception e) {
logger.error("Could not dispatch command", e);
return CompletableFuture.completedFuture(createReply(commandMessage, false, e));
return CompletableFuture.completedFuture(createReply(commandMessage, asCommandResultMessage(e)));
}
}
}

private SpringHttpReplyMessage createReply(CommandMessage<?> commandMessage, boolean success, Object result) {
private SpringHttpReplyMessage createReply(CommandMessage<?> commandMessage,
CommandResultMessage<?> commandResultMessage) {
try {
return new SpringHttpReplyMessage<>(commandMessage.getIdentifier(),
success,
asCommandResultMessage(result),
commandResultMessage,
serializer);
} catch (Exception e) {
logger.warn("Could not serialize command reply [{}]. Sending back NULL.", result, e);
logger.warn("Could not serialize command reply [{}]. Sending back NULL.", commandResultMessage, e);
return new SpringHttpReplyMessage<>(commandMessage.getIdentifier(),
success,
asCommandResultMessage(e),
serializer);
}
Expand All @@ -233,7 +230,7 @@ public class SpringHttpReplyFutureCallback<C, R> extends CompletableFuture<Sprin
@Override
public void onResult(CommandMessage<? extends C> commandMessage,
CommandResultMessage<? extends R> commandResultMessage) {
super.complete(createReply(commandMessage, !commandResultMessage.isExceptional(), commandResultMessage));
super.complete(createReply(commandMessage, commandResultMessage));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,16 @@ public class SpringHttpReplyMessage<R> extends ReplyMessage implements Serializa

/**
* Initializes a SpringHttpReplyMessage containing a reply to the command with given {commandIdentifier} and given
* {@code commandResultMessage}. The parameter {@code success} determines whether the was executed successfully or
* not.
* {@code commandResultMessage}.
*
* @param commandIdentifier The identifier of the command to which the message is a reply
* @param success Whether or not the command executed successfully or not
* @param commandResultMessage The return value of command process
* the given {@code commandResultMessage} is ignored.
* @param serializer The serializer to serialize the message contents with
*/
public SpringHttpReplyMessage(String commandIdentifier, boolean success,
CommandResultMessage<R> commandResultMessage, Serializer serializer) {
super(commandIdentifier, success, commandResultMessage, serializer);
public SpringHttpReplyMessage(String commandIdentifier, CommandResultMessage<R> commandResultMessage,
Serializer serializer) {
super(commandIdentifier, commandResultMessage, serializer);
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSendWithoutCallbackThrowsExceptionForMissingDestinationURI() {
public void testSendWithCallbackSucceedsAndReturnsSucceeded() {
HttpEntity<SpringHttpDispatchMessage> expectedHttpEntity = new HttpEntity<>(buildDispatchMessage(true));
SpringHttpReplyMessage<String> testReplyMessage =
new SpringHttpReplyMessage<>(COMMAND_MESSAGE.getIdentifier(), true, COMMAND_RESULT, serializer);
new SpringHttpReplyMessage<>(COMMAND_MESSAGE.getIdentifier(), COMMAND_RESULT, serializer);
ResponseEntity<SpringHttpReplyMessage<String>> testResponseEntity =
new ResponseEntity<>(testReplyMessage, HttpStatus.OK);
when(restTemplate.exchange(eq(expectedUri),
Expand Down Expand Up @@ -165,7 +165,6 @@ public void testSendWithCallbackSucceedsAndReturnsFailed() {
HttpEntity<SpringHttpDispatchMessage> expectedHttpEntity = new HttpEntity<>(buildDispatchMessage(true));
SpringHttpReplyMessage<String> testReplyMessage =
new SpringHttpReplyMessage<>(COMMAND_MESSAGE.getIdentifier(),
false,
asCommandResultMessage(COMMAND_ERROR),
serializer);
ResponseEntity<SpringHttpReplyMessage<String>> testResponseEntity =
Expand All @@ -187,10 +186,16 @@ public void testSendWithCallbackSucceedsAndReturnsFailed() {
//noinspection unchecked
ArgumentCaptor<SerializedObject<byte[]>> serializedObjectCaptor =
ArgumentCaptor.forClass(SerializedObject.class);
verify(serializer).deserialize(serializedObjectCaptor.capture());
assertEquals(serializedObject.getType(), serializedObjectCaptor.getValue().getType());
assertEquals(serializedObject.getContentType(), serializedObjectCaptor.getValue().getContentType());
assertTrue(Arrays.equals(serializedObject.getData(), serializedObjectCaptor.getValue().getData()));
verify(serializer, times(3)).deserialize(serializedObjectCaptor.capture());

assertTrue(Arrays.equals("null".getBytes(), serializedObjectCaptor.getAllValues().get(0).getData()));

assertEquals(serializedObject.getType(), serializedObjectCaptor.getAllValues().get(1).getType());
assertEquals(serializedObject.getContentType(), serializedObjectCaptor.getAllValues().get(1).getContentType());
assertTrue(Arrays.equals(serializedObject.getData(), serializedObjectCaptor.getAllValues().get(1).getData()));

assertTrue(Arrays.equals("{}".getBytes(), serializedObjectCaptor.getAllValues().get(2).getData()));

ArgumentCaptor<CommandResultMessage<String>> commandResultMessageCaptor = ArgumentCaptor.forClass(
CommandResultMessage.class);
verify(commandCallback).onResult(eq(COMMAND_MESSAGE), commandResultMessageCaptor.capture());
Expand Down Expand Up @@ -227,8 +232,8 @@ public void testReceiveCommandHandlesCommandWithCallbackSucceedsAndCallsSuccess(
(SpringHttpReplyMessage) testSubject.receiveCommand(buildDispatchMessage(true)).get();

assertEquals(COMMAND_MESSAGE.getIdentifier(), result.getCommandIdentifier());
assertTrue(result.isSuccess());
CommandResultMessage commandResultMessage = result.getCommandResultMessage(serializer);
assertFalse(commandResultMessage.isExceptional());
assertEquals(COMMAND_RESULT.getPayload(), commandResultMessage.getPayload());
assertEquals(COMMAND_RESULT.getMetaData(), commandResultMessage.getMetaData());

Expand All @@ -247,9 +252,10 @@ public void testReceiveCommandHandlesCommandWithCallbackSucceedsAndCallsFailure(
SpringHttpReplyMessage result =
(SpringHttpReplyMessage) testSubject.receiveCommand(buildDispatchMessage(true)).get();

CommandResultMessage commandResultMessage = result.getCommandResultMessage(serializer);
assertEquals(COMMAND_MESSAGE.getIdentifier(), result.getCommandIdentifier());
assertFalse(result.isSuccess());
assertEquals(COMMAND_ERROR.getMessage(), result.getError(serializer).getMessage());
assertTrue(commandResultMessage.isExceptional());
assertEquals(COMMAND_ERROR.getMessage(), commandResultMessage.getExceptionResult().getMessage());

verify(localCommandBus).dispatch(any(), any());
}
Expand All @@ -261,8 +267,9 @@ public void testReceiveCommandHandlesCommandWithCallbackFails() throws Exception
SpringHttpReplyMessage result =
(SpringHttpReplyMessage) testSubject.receiveCommand(buildDispatchMessage(true)).get();

CommandResultMessage commandResultMessage = result.getCommandResultMessage(serializer);
assertEquals(COMMAND_MESSAGE.getIdentifier(), result.getCommandIdentifier());
assertFalse(result.isSuccess());
assertTrue(commandResultMessage.isExceptional());

verify(localCommandBus).dispatch(any(), any());
}
Expand All @@ -283,8 +290,9 @@ public void testReceiveCommandHandlesCommandWithoutCallbackThrowsException() thr
SpringHttpReplyMessage result =
(SpringHttpReplyMessage) testSubject.receiveCommand(buildDispatchMessage(false)).get();

CommandResultMessage commandResultMessage = result.getCommandResultMessage(serializer);
assertEquals(COMMAND_MESSAGE.getIdentifier(), result.getCommandIdentifier());
assertFalse(result.isSuccess());
assertTrue(commandResultMessage.isExceptional());

verify(localCommandBus).dispatch(any());
}
Expand Down

0 comments on commit 499cb41

Please sign in to comment.