Skip to content

Commit

Permalink
Minor modifications
Browse files Browse the repository at this point in the history
Issue #AXON-335 Fixed
  • Loading branch information
abuijze committed Jun 1, 2015
1 parent d974853 commit f289beb
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 37 deletions.
Expand Up @@ -15,15 +15,20 @@
*/
package org.axonframework.commandhandling.distributed.jgroups;

import org.axonframework.common.AxonException;
import org.axonframework.common.AxonNonTransientException;

/**
* Exception indicating that a failure occured during processing of a command response. Typically this would imply an unserializable command response / exception message
* Exception indicating that a failure occurred during processing of a command response. Typically this would imply an
* command response or exception message that could not be serialized.
* <p/>
* Typically, this exception indicates a non-transient exception.
*
* @author Srideep Prasad
* @since 2.4.2
*/
public class CommandResponseProcessingFailedException extends AxonNonTransientException {

public class CommandResponseProcessingFailedException extends AxonException{
private static final long serialVersionUID = -1318148724064577512L;

/**
* Initializes the exception using the given <code>message</code>.
Expand Down
Expand Up @@ -344,7 +344,9 @@ private void processDispatchMessage(final Message msg, final DispatchMessage mes
try {
final CommandMessage commandMessage = message.getCommandMessage(serializer);
if (message.isExpectReply()) {
localSegment.dispatch(commandMessage, new ReplyingCallback(channel,msg, commandMessage,serializer));
localSegment.dispatch(commandMessage, new ReplyingCallback(channel,
msg.getSrc(), commandMessage,serializer
));
} else {
localSegment.dispatch(commandMessage);
}
Expand Down
Expand Up @@ -22,8 +22,10 @@
import org.jgroups.View;

/**
* Internal class used used by JGroupsConnector. For internal use only. Pulled outside to allow for seamless unit testing
* Callback implementation which wraps another callback, and is aware of the JGroups node responsible for providing the
* value to invoke the wrapped callback with.
*
* @param <R> The expected type of return value
* @author Allard Buijze
* @since 2.0
*/
Expand All @@ -32,13 +34,27 @@ public class MemberAwareCommandCallback<R> implements CommandCallback<R> {
private final Address dest;
private final CommandCallback<R> callback;

/**
* Initialize the callback, where the given <code>dest</code> is responsible for providing the value to invoke the
* given <code>callback</code> with.
*
* @param dest The destination of the command of which the result is to be passed to the callback
* @param callback The callback to invoke with the result of the command
*/
public MemberAwareCommandCallback(Address dest, CommandCallback<R> callback) {
this.dest = dest;
this.callback = callback;
}

public boolean isMemberLive(View currentView) {
return currentView.containsMember(dest);
/**
* Indicates whether the node responsible for providing the value to invoke the callback with is still alive in the
* given <code>currentView</code>.
*
* @param view The view containing the currently available nodes in the JGroups cluster
* @return <code>true</code> if the node is live, otherwise <code>false</code>
*/
public boolean isMemberLive(View view) {
return view.containsMember(dest);
}

@Override
Expand Down
Expand Up @@ -21,28 +21,37 @@
import org.axonframework.commandhandling.distributed.jgroups.CommandResponseProcessingFailedException;
import org.axonframework.commandhandling.distributed.jgroups.ReplyMessage;
import org.axonframework.serializer.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Internal class used used by JGroupsConnector. For internal use only. Pulled outside to allow for seamless unit testing
* Callback implementation that forwards the callback invocation as a reply to an incoming message.
*
* @author Allard Buijze
* @since 2.0
*/
public class ReplyingCallback implements CommandCallback<Object> {

private final Message msg;
private final CommandMessage commandMessage;
private final JChannel channel;
private final Serializer serializer;

private static final Logger logger = LoggerFactory.getLogger(ReplyingCallback.class);
private final Address address;

public ReplyingCallback(JChannel channel, Message msg, CommandMessage commandMessage, Serializer serializer) {
this.msg = msg;
/**
* Initialize the callback to send a reply for an incoming <code>commandMessage</code> to given <code>address</code> using the given <code>channel</code>.
* The given <code>serializer</code> is used to serialize the reply message.
*
* @param channel The channel to send the reply on
* @param address The destination for the reply message
* @param commandMessage The incoming command message
* @param serializer The serializer to serialize the reply with
*/
public ReplyingCallback(JChannel channel, Address address, CommandMessage commandMessage, Serializer serializer) {
this.address = address;
this.commandMessage = commandMessage;
this.channel = channel;
this.serializer = serializer;
Expand All @@ -51,28 +60,31 @@ public ReplyingCallback(JChannel channel, Message msg, CommandMessage commandMes
@Override
public void onSuccess(Object result) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
result,
null, serializer));
channel.send(address, new ReplyMessage(commandMessage.getIdentifier(),
result,
null, serializer));
} catch (Exception e) {
logger.error("Unable to send reply to command [name: {}, id: {}]. ",
new Object[]{commandMessage.getCommandName(),
commandMessage.getIdentifier(),
e});
throw new CommandResponseProcessingFailedException(String.format("An error occurred while attempting to process command response of type : %s, Exception Message: %s", result.getClass().getName(), e.getMessage()),e);
commandMessage.getCommandName(), commandMessage.getIdentifier(), e);
throw new CommandResponseProcessingFailedException(String.format(
"An error occurred while attempting to process command response of type : %s, Exception Message: %s",
result.getClass().getName(), e.getMessage()), e);
}
}

@Override
public void onFailure(Throwable cause) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
null,
cause, serializer));
channel.send(address, new ReplyMessage(commandMessage.getIdentifier(),
null,
cause, serializer));
} catch (Exception e) {
logger.error("Unable to send reply:", e);
//Not capturing the causative exception while throwing - the causative exception may not be serializable and this may cause the commandbus to hangup.
throw new CommandResponseProcessingFailedException(String.format("An error occurred while attempting to process command exception response of type : %s, Exception Message:: %s", e.getClass().getName(), e.getMessage()));
//Not capturing the causative exception while throwing - the causative exception may not be serializable and this may cause the command bus to hang.
throw new CommandResponseProcessingFailedException(String.format(
"An error occurred while attempting to process command exception response of type : %s, Exception Message:: %s",
e.getClass().getName(),
e.getMessage()));
}
}
}
Expand Up @@ -22,14 +22,11 @@
import org.axonframework.serializer.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.junit.*;
import org.junit.rules.*;
import org.junit.runner.*;
import org.mockito.*;
import org.mockito.runners.*;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
Expand All @@ -48,8 +45,6 @@ public class ReplyingCallbackTest {
@Mock
private JChannel mockChannel;
@Mock
private Message mockMsg;
@Mock
private CommandMessage mockCommandMsg;
@Mock
private Serializer mockSerializer;
Expand All @@ -63,10 +58,8 @@ public class ReplyingCallbackTest {

@Before
public void setup(){
replyingCallback = new ReplyingCallback(mockChannel,mockMsg,mockCommandMsg,mockSerializer);
replyingCallback = new ReplyingCallback(mockChannel, mockAddr, mockCommandMsg,mockSerializer);
when(mockCommandMsg.getIdentifier()).thenReturn(IDENTIFIER);
when(mockMsg.getSrc()).thenReturn(mockAddr);

}

@Test
Expand Down

0 comments on commit f289beb

Please sign in to comment.