Skip to content

Commit

Permalink
AXON-335 : Prevent CommandGateway.sendAndWait() from hanging indefini…
Browse files Browse the repository at this point in the history
…tely in certain specific cases
  • Loading branch information
srideepprasad committed May 31, 2015
1 parent 6d45d27 commit cd59915
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2010-2015. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.commandhandling.distributed.jgroups;

import org.axonframework.common.AxonException;

/**
* Exception indicating that a failure occured during processing of a command response. Typically this would imply an unserializable command response / exception message
*
* @author Srideep Prasad
*/

public class CommandResponseProcessingFailedException extends AxonException{

/**
* Initializes the exception using the given <code>message</code>.
*
* @param message The message describing the exception
*/
public CommandResponseProcessingFailedException(String message) {
super(message);
}

/**
* Initializes the exception using the given <code>message</code> and <code>cause</code>.
*
* @param message The message describing the exception
* @param cause The underlying cause of the exception
*/
public CommandResponseProcessingFailedException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import org.axonframework.commandhandling.distributed.CommandDispatchException;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.RemoteCommandHandlingException;
import org.axonframework.commandhandling.distributed.jgroups.support.callbacks.MemberAwareCommandCallback;
import org.axonframework.commandhandling.distributed.jgroups.support.callbacks.ReplyingCallback;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.serializer.MessageSerializer;
import org.axonframework.serializer.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,17 +39,8 @@
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -413,7 +402,7 @@ private void processDispatchMessage(final Message msg, final DispatchMessage mes
try {
final CommandMessage commandMessage = message.getCommandMessage(serializer);
if (message.isExpectReply()) {
localSegment.dispatch(commandMessage, new ReplyingCallback(msg, commandMessage));
localSegment.dispatch(commandMessage, new ReplyingCallback(channel,msg, commandMessage,serializer));
} else {
localSegment.dispatch(commandMessage);
}
Expand Down Expand Up @@ -465,41 +454,6 @@ private void processReplyMessage(ReplyMessage replyMessage) {
}
}

private class ReplyingCallback implements CommandCallback<Object> {

private final Message msg;
private final CommandMessage commandMessage;

public ReplyingCallback(Message msg, CommandMessage commandMessage) {
this.msg = msg;
this.commandMessage = commandMessage;
}

@Override
public void onSuccess(Object result) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
result,
null, serializer));
} catch (Exception e) {
logger.error("Unable to send reply to command [name: {}, id: {}]. ",
new Object[]{commandMessage.getCommandName(),
commandMessage.getIdentifier(),
e});
}
}

@Override
public void onFailure(Throwable cause) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
null,
cause, serializer));
} catch (Exception e) {
logger.error("Unable to send reply:", e);
}
}
}
}

private List<String> getMemberNames(View view) {
Expand Down Expand Up @@ -533,31 +487,6 @@ public boolean isJoined() {
}
}

private static class MemberAwareCommandCallback<R> implements CommandCallback<R> {

private final Address dest;
private final CommandCallback<R> callback;

public MemberAwareCommandCallback(Address dest, CommandCallback<R> callback) {
this.dest = dest;
this.callback = callback;
}

public boolean isMemberLive(View currentView) {
return currentView.containsMember(dest);
}

@Override
public void onSuccess(R result) {
callback.onSuccess(result);
}

@Override
public void onFailure(Throwable cause) {
callback.onFailure(cause);
}
}

private static interface UpdateFunction {

ConsistentHash update(ConsistentHash hash);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2010-2015. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.distributed.jgroups.support.callbacks;


import org.axonframework.commandhandling.CommandCallback;
import org.jgroups.Address;
import org.jgroups.View;

/**
* Internal class used used by JGroupsConnector. For internal use only. Pulled outside to allow for seamless unit testing
*
* @author Allard Buijze
* @since 2.0
*/
public class MemberAwareCommandCallback<R> implements CommandCallback<R> {

private final Address dest;
private final CommandCallback<R> callback;

public MemberAwareCommandCallback(Address dest, CommandCallback<R> callback) {
this.dest = dest;
this.callback = callback;
}

public boolean isMemberLive(View currentView) {
return currentView.containsMember(dest);
}

@Override
public void onSuccess(R result) {
callback.onSuccess(result);
}

@Override
public void onFailure(Throwable cause) {
callback.onFailure(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2010-2015. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.distributed.jgroups.support.callbacks;

import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.distributed.jgroups.CommandResponseProcessingFailedException;
import org.axonframework.commandhandling.distributed.jgroups.ReplyMessage;
import org.axonframework.serializer.Serializer;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Internal class used used by JGroupsConnector. For internal use only. Pulled outside to allow for seamless unit testing
*
* @author Allard Buijze
* @since 2.0
*/
public class ReplyingCallback implements CommandCallback<Object> {

private final Message msg;
private final CommandMessage commandMessage;
private final JChannel channel;
private final Serializer serializer;

private static final Logger logger = LoggerFactory.getLogger(ReplyingCallback.class);

public ReplyingCallback(JChannel channel, Message msg, CommandMessage commandMessage, Serializer serializer) {
this.msg = msg;
this.commandMessage = commandMessage;
this.channel = channel;
this.serializer = serializer;
}

@Override
public void onSuccess(Object result) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
result,
null, serializer));
} catch (Exception e) {
logger.error("Unable to send reply to command [name: {}, id: {}]. ",
new Object[]{commandMessage.getCommandName(),
commandMessage.getIdentifier(),
e});
throw new CommandResponseProcessingFailedException(String.format("An error occurred while attempting to process command response of type : %s, Exception Message: %s", result.getClass().getName(), e.getMessage()),e);
}
}

@Override
public void onFailure(Throwable cause) {
try {
channel.send(msg.getSrc(), new ReplyMessage(commandMessage.getIdentifier(),
null,
cause, serializer));
} catch (Exception e) {
logger.error("Unable to send reply:", e);
//Not capturing the causative exception while throwing - the causative exception may not be serializable and this may cause the commandbus to hangup.
throw new CommandResponseProcessingFailedException(String.format("An error occurred while attempting to process command exception response of type : %s, Exception Message:: %s", e.getClass().getName(), e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@

package org.axonframework.commandhandling.distributed.jgroups;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.*;
import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.jgroups.support.callbacks.ReplyingCallback;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.axonframework.unitofwork.UnitOfWork;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.stack.IpAddress;
import org.junit.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -56,6 +54,7 @@ public class JGroupsConnectorTest {
private CommandBus mockCommandBus2;
private String clusterName;
private RecordingHashChangeListener hashChangeListener;
private XStreamSerializer serializer;

@Before
public void setUp() throws Exception {
Expand All @@ -65,9 +64,10 @@ public void setUp() throws Exception {
mockCommandBus2 = spy(new SimpleCommandBus());
clusterName = "test-" + new Random().nextInt(Integer.MAX_VALUE);
hashChangeListener = new RecordingHashChangeListener();
connector1 = new JGroupsConnector(channel1, clusterName, mockCommandBus1, new XStreamSerializer(),
serializer = new XStreamSerializer();
connector1 = new JGroupsConnector(channel1, clusterName, mockCommandBus1, serializer,
hashChangeListener);
connector2 = new JGroupsConnector(channel2, clusterName, mockCommandBus2, new XStreamSerializer());
connector2 = new JGroupsConnector(channel2, clusterName, mockCommandBus2, serializer);
}

@After
Expand All @@ -76,6 +76,24 @@ public void tearDown() {
closeSilently(channel2);
}

@Test
public void testSetupOfReplyingCallback() throws InterruptedException {
final String mockPayload = "DummyString";
final CommandMessage commandMessage = new GenericCommandMessage(mockPayload);

final DispatchMessage dispatchMessage = new DispatchMessage(commandMessage,serializer,true);
final Message message = new Message(channel1.getAddress(),dispatchMessage);

connector1.connect(20);
assertTrue("Expected connector 1 to connect within 10 seconds", connector1.awaitJoined(10, TimeUnit.SECONDS));

channel1.getReceiver().receive(message);

//Verify that the newly introduced ReplyingCallBack class is being wired in. Actual behaviour of ReplyingCallback is tested in its unit tests
verify(mockCommandBus1).dispatch(refEq(commandMessage),any(ReplyingCallback.class));

}

@SuppressWarnings("unchecked")
@Test(timeout = 30000)
public void testConnectAndDispatchMessages_Balanced() throws Exception {
Expand Down
Loading

0 comments on commit cd59915

Please sign in to comment.