Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

- Implementation of call termination in MessageDispatcher when underl…

…ying channel is closed (https://issues.jboss.org/browse/JGRP-1508)

 - Unit test (by Dan Berindei)
  • Loading branch information...
commit 960f40808d6fbca7c949b00f38659a511d558c26 1 parent c0d51f9
Bela Ban authored
24 src/org/jgroups/blocks/GroupRequest.java
View
@@ -261,6 +261,30 @@ public void viewChange(View new_view) {
checkCompletion(this);
}
+ /** Marks all responses with an exception (unless a response was already marked as done) */
+ public void transportClosed() {
+ boolean changed=false;
+
+ lock.lock();
+ try {
+ for(Map.Entry<Address, Rsp<T>> entry: requests.entrySet()) {
+ Rsp<T> rsp=entry.getValue();
+ if(rsp != null && !(rsp.wasReceived() || rsp.wasSuspected() || rsp.wasUnreachable())) {
+ rsp.setException(new IllegalStateException("transport was closed"));
+ num_received++;
+ changed=true;
+ }
+ }
+ if(changed) {
+ completed.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ if(changed)
+ checkCompletion(this);
+ }
/* -------------------- End of Interface RspCollector ----------------------------------- */
6 src/org/jgroups/blocks/MessageDispatcher.java
View
@@ -66,6 +66,7 @@ public MessageDispatcher(Channel channel, MessageListener l, MembershipListener
prot_adapter=new ProtocolAdapter();
if(channel != null) {
local_addr=channel.getAddress();
+ channel.addChannelListener(this);
}
setMessageListener(l);
setMembershipListener(l2);
@@ -621,8 +622,11 @@ public Object up(Event evt) {
public Object down(Event evt) {
- if(channel != null)
+ if(channel != null) {
+ if(evt.getType() == Event.MSG && !channel.isConnected())
+ throw new IllegalStateException("channel is not connected");
return channel.down(evt);
+ }
return null;
}
2  src/org/jgroups/blocks/Request.java
View
@@ -152,7 +152,7 @@ public boolean isDone() {
public String toString() {
StringBuilder ret=new StringBuilder(128);
ret.append(super.toString());
- ret.append("req_id=").append(req_id).append(", mode=" + options.getMode());
+ ret.append(", req_id=").append(req_id).append(", mode=" + options.getMode());
return ret.toString();
}
3  src/org/jgroups/blocks/RequestCorrelator.java
View
@@ -264,6 +264,9 @@ public final void start() {
public void stop() {
started=false;
+ for(RspCollector coll: requests.values())
+ coll.transportClosed();
+ requests.clear();
}
3  src/org/jgroups/blocks/RpcDispatcher.java
View
@@ -20,7 +20,7 @@
* Is the equivalent of RpcProtocol on the application rather than protocol level.
* @author Bela Ban
*/
-public class RpcDispatcher extends MessageDispatcher implements ChannelListener {
+public class RpcDispatcher extends MessageDispatcher {
protected Object server_obj=null;
/** Marshaller to marshall requests at the caller and unmarshal requests at the receiver(s) */
protected Marshaller req_marshaller=null;
@@ -37,7 +37,6 @@ public RpcDispatcher() {
public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj) {
super(channel, l, l2);
- channel.addChannelListener(this);
this.server_obj=server_obj;
}
1  src/org/jgroups/blocks/RspCollector.java
View
@@ -10,4 +10,5 @@
void suspect(Address mbr);
void viewChange(View new_view);
void siteUnreachable(short site);
+ void transportClosed();
}
17 src/org/jgroups/blocks/UnicastRequest.java
View
@@ -163,6 +163,23 @@ public void viewChange(View new_view) {
checkCompletion(this);
}
+ public void transportClosed() {
+ lock.lock();
+ try {
+ if(done)
+ return;
+ if(result != null && !result.wasReceived())
+ result.setException(new IllegalStateException("transport was closed"));
+ done=true;
+ if(corr != null)
+ corr.done(req_id);
+ completed.signalAll();
+ }
+ finally {
+ lock.unlock();
+ }
+ checkCompletion(this);
+ }
/* -------------------- End of Interface RspCollector ----------------------------------- */
198 tests/junit-functional/org/jgroups/tests/MessageDispatcherRSVPTest.java
View
@@ -1,59 +1,43 @@
package org.jgroups.tests;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-
+import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
-import org.jgroups.ReceiverAdapter;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestOptions;
-import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
-import org.jgroups.protocols.DISCARD;
-import org.jgroups.protocols.MERGE2;
-import org.jgroups.protocols.PING;
-import org.jgroups.protocols.RSVP;
-import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.ProtocolStack;
-import org.jgroups.util.DefaultSocketFactory;
-import org.jgroups.util.DefaultThreadFactory;
-import org.jgroups.util.RspList;
-import org.jgroups.util.SocketFactory;
-import org.jgroups.util.ThreadFactory;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.TimeScheduler2;
-import org.jgroups.util.Util;
-import org.testng.Assert;
+import org.jgroups.util.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
- * Tests tthe {@link org.jgroups.protocols.RSVP} protocol
- * @author Bela Ban
+ * Tests the {@link org.jgroups.protocols.RSVP} protocol
+ * @author Dan Berindei
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
public class MessageDispatcherRSVPTest {
- private static final Log log = LogFactory.getLog(MessageDispatcherRSVPTest.class);
- protected static final int NUM=2; // number of members
- protected final JChannel[] channels=new JChannel[NUM];
+ protected static final int NUM=2; // number of members
+ protected final JChannel[] channels=new JChannel[NUM];
protected final MessageDispatcher[] dispatchers=new MessageDispatcher[NUM];
- protected MyDiagnosticsHandler handler;
- protected ThreadPoolExecutor oob_thread_pool;
- protected ThreadPoolExecutor thread_pool;
+ protected MyDiagnosticsHandler handler;
+ protected ThreadPoolExecutor oob_thread_pool;
+ protected ThreadPoolExecutor thread_pool;
@@ -79,12 +63,9 @@ void setUp() throws Exception {
thread_pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- MBeanServer server=Util.getMBeanServer();
-
System.out.print("Connecting channels: ");
for(int i=0; i < NUM; i++) {
SHARED_LOOPBACK shared_loopback=(SHARED_LOOPBACK)new SHARED_LOOPBACK().setValue("enable_bundling", false);
- // UDP shared_loopback=(UDP)new UDP().setValue("enable_bundling", false);
shared_loopback.setLoopback(false);
shared_loopback.setTimer(timer);
shared_loopback.setOOBThreadPool(oob_thread_pool);
@@ -98,17 +79,9 @@ void setUp() throws Exception {
new MERGE2().setValue("min_interval", 1000).setValue("max_interval", 3000),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("discard_delivered_msgs",true)
- .setValue("log_discard_msgs", false).setValue("log_not_found_msgs", false)
- .setValue("xmit_table_num_rows",5)
- .setValue("xmit_table_msgs_per_row", 10),
- // new UNICAST(),
- new UNICAST2().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300)
- .setValue("xmit_table_msgs_per_row",10)
- .setValue("conn_expiry_timeout", 10000)
- .setValue("stable_interval", 30000)
- .setValue("max_bytes", 50000),
- new RSVP().setValue("timeout", 10000),
- // new STABLE().setValue("max_bytes",500000).setValue("desired_avg_gossip", 60000),
+ .setValue("log_discard_msgs", false).setValue("log_not_found_msgs", false),
+ new UNICAST2().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300),
+ new RSVP().setValue("timeout", 10000).setValue("throw_exception_on_timeout", true),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",100)
.setValue("log_view_warnings",false)
@@ -116,11 +89,10 @@ void setUp() throws Exception {
.setValue("log_collect_msgs",false));
channels[i].setName(String.valueOf((i + 1)));
dispatchers[i]=new MessageDispatcher(channels[i], null, null);
- JmxConfigurator.registerChannel(channels[i], server, "channel-" + (i+1), "MessageDispatcherRSVPTest", true);
channels[i].connect("MessageDispatcherRSVPTest");
System.out.print(i + 1 + " ");
if(i == 0)
- Util.sleep(2000);
+ Util.sleep(1000);
}
Util.waitUntilAllChannelsHaveSameSize(30000, 1000, channels);
System.out.println("");
@@ -131,8 +103,6 @@ void tearDown() throws Exception {
for(int i=NUM-1; i >= 0; i--) {
ProtocolStack stack=channels[i].getProtocolStack();
String cluster_name=channels[i].getClusterName();
- if(channels[i].isOpen())
- JmxConfigurator.unregisterChannel(channels[i], Util.getMBeanServer(), "channel-" + (i+1),cluster_name);
stack.stopStack(cluster_name);
stack.destroy();
}
@@ -140,61 +110,104 @@ void tearDown() throws Exception {
}
+ /**
+ * First send a message, drop it (using DISCARD) and then close the channel. The caller invoking castMessage() should
+ * get an exception, as the channel was closed
+ */
public void testCancellationByClosingChannel() throws Exception {
- // test with a unicast message:
- short value=(short)Math.abs((short)Util.random(10000));
- Message msg=new Message(channels[1].getAddress(), null, value);
-
- DISCARD discard=(DISCARD)channels[0].getProtocolStack().findProtocol(DISCARD.class);
- discard.setUpDiscardRate(1);
-
- Thread closer=new Thread() {
- public void run() {
- Util.sleep(2000);
- System.out.println("closer closing channel");
- channels[0].close();
- }
- };
- closer.start();
-
- long nanosStart = System.nanoTime();
- RspList<Object> rsps = dispatchers[0].castMessage(Collections.singleton(channels[1].getAddress()), msg, RequestOptions.SYNC());
- log.debug("Received responses: " + rsps);
+ testCancellationByClosing(false, // multicast
+ new Closer(channels[0]));
+ }
- long nanosEnd = System.nanoTime();
- long seconds = TimeUnit.NANOSECONDS.toSeconds(nanosEnd - nanosStart);
- Assert.assertTrue(seconds < 5);
+ public void testCancellationByClosingChannelUnicast() throws Exception {
+ testCancellationByClosing(true, // unicast
+ new Closer(channels[0]));
}
+
+ /**
+ * Sends a message via the MessageDispatcher on a closed channel. This should immediately throw an exception.
+ */
public void testSendingMessageOnClosedChannel() throws Exception {
- channels[0].close();
+ // unicast
+ sendMessageOnClosedChannel(new Message(channels[1].getAddress(), "bla"));
- short value=(short)Math.abs((short)Util.random(10000));
- Message msg=new Message(channels[1].getAddress(), null, value);
+ // multicast
+ sendMessageOnClosedChannel(new Message(null,"bla"));
+ }
- long nanosStart = System.nanoTime();
- RspList<Object> rsps = dispatchers[0].castMessage(Collections.singleton(channels[1].getAddress()), msg, RequestOptions.SYNC());
- log.debug("Received responses: " + rsps);
+ public void testSendingMessageOnClosedChannelRSVP() throws Exception {
+ // unicast
+ Message msg=new Message(channels[1].getAddress(), null, "bla");
+ msg.setFlag(Message.Flag.RSVP);
+ sendMessageOnClosedChannel(msg);
- long nanosEnd = System.nanoTime();
- long seconds = TimeUnit.NANOSECONDS.toSeconds(nanosEnd - nanosStart);
- Assert.assertTrue(seconds < 2);
+ // multicast
+ msg=new Message(null, "bla");
+ msg.setFlag(Message.Flag.RSVP);
+ sendMessageOnClosedChannel(msg);
}
- public void testSendingMessageOnClosedChannelRSVP() throws Exception {
+ protected void testCancellationByClosing(boolean unicast, Thread closer) throws Exception {
+ DISCARD discard=(DISCARD)channels[0].getProtocolStack().findProtocol(DISCARD.class);
+ discard.setDiscardAll(true);
+
+ try {
+ Address target=unicast? channels[1].getAddress() : null;
+ Message msg=new Message(target, "bla");
+ msg.setFlag(Message.Flag.RSVP);
+ closer.start();
+ if(unicast) {
+ System.out.println("sending unicast message to " + target);
+ dispatchers[0].sendMessage(msg, RequestOptions.SYNC());
+ assert false: "sending the message on a closed channel should have thrown an exception";
+ }
+ else {
+ System.out.println("sending multicast message");
+ RspList<Object> rsps=dispatchers[0].castMessage(Collections.singleton(channels[1].getAddress()),msg,RequestOptions.SYNC());
+ System.out.println("rsps = " + rsps);
+ assert rsps.size() == 1;
+ Rsp<Object> rsp=rsps.iterator().next();
+ System.out.println("rsp = " + rsp);
+ assert rsp.hasException();
+ Throwable ex=rsp.getException();
+ assert ex instanceof IllegalStateException;
+ }
+ }
+ catch(IllegalStateException t) {
+ System.out.println("received \"" + t + "\" as expected");
+ }
+ }
+
+
+ protected void sendMessageOnClosedChannel(Message msg) throws Exception {
channels[0].close();
+ Address target=msg.getDest();
+ try {
+ if(target == null) { // multicast
+ dispatchers[0].castMessage(Collections.singleton(channels[1].getAddress()), msg, RequestOptions.SYNC());
+ }
+ else {
+ dispatchers[0].sendMessage(msg, RequestOptions.SYNC());
+ }
+ assert false: "sending the message on a closed channel should have thrown an exception";
+ }
+ catch(IllegalStateException t) {
+ System.out.println("received \"" + t + "\" as expected");
+ }
+ }
- short value=(short)Math.abs((short)Util.random(10000));
- Message msg=new Message(channels[1].getAddress(), null, value);
- msg.setFlag(Message.Flag.RSVP);
- long nanosStart = System.nanoTime();
- RspList<Object> rsps = dispatchers[0].castMessage(Collections.singleton(channels[1].getAddress()), msg, RequestOptions.SYNC());
- log.debug("Received responses: " + rsps);
+ protected static class Closer extends Thread {
+ protected final JChannel ch;
- long nanosEnd = System.nanoTime();
- long seconds = TimeUnit.NANOSECONDS.toSeconds(nanosEnd - nanosStart);
- Assert.assertTrue(seconds < 2);
+ public Closer(JChannel ch) {this.ch=ch;}
+
+ public void run() {
+ Util.sleep(2000);
+ System.out.println("closing channel");
+ Util.close(ch);
+ };
}
@@ -209,3 +222,4 @@ public void stop() {}
public void destroy() {super.stop();}
}
}
+;
Please sign in to comment.
Something went wrong with that request. Please try again.