Permalink
Browse files

ns

  • Loading branch information...
1 parent 434760c commit 0f7af0c66fbdffe40501bf205c61c92cfd5fe6f4 @belaban committed Mar 7, 2013
View
4 src/org/jgroups/JChannel.java
@@ -232,8 +232,8 @@ public void setProtocolStack(ProtocolStack stack) {
public void enableStats(boolean stats) {this.stats=stats;}
- @Deprecated @ManagedAttribute public boolean isOpen() {return !(state == State.CLOSED);}
- @Deprecated @ManagedAttribute public boolean isConnected() {return state == State.CONNECTED;}
+ @ManagedAttribute public boolean isOpen() {return !(state == State.CLOSED);}
+ @ManagedAttribute public boolean isConnected() {return state == State.CONNECTED;}
@ManagedOperation
public void resetStats() {sent_msgs=received_msgs=sent_bytes=received_bytes=0;}
View
12 src/org/jgroups/protocols/PRIO.java
@@ -8,7 +8,6 @@
import org.jgroups.util.MessageBatch;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
/**
@@ -118,15 +117,14 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
- for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
- Message message=it.next();
- if(message.isFlagSet(Message.Flag.OOB))
+ for(Message msg: batch) {
+ if(msg.isFlagSet(Message.Flag.OOB))
continue;
- PrioHeader hdr=(PrioHeader)message.getHeader(id);
+ PrioHeader hdr=(PrioHeader)msg.getHeader(id);
if(hdr != null) {
log.trace( "Adding priority message " + hdr.getPriority() + " to UP queue" );
- upMessageQueue.add( new PriorityMessage( new Event(Event.MSG, message), hdr.getPriority() ) );
- it.remove(); // sent up by UpMessageThread; we don't need to send it up, too
+ upMessageQueue.add( new PriorityMessage( new Event(Event.MSG, msg), hdr.getPriority() ) );
+ batch.remove(msg); // sent up by UpMessageThread; we don't need to send it up, too
}
}
View
321 tests/junit/org/jgroups/tests/CloseTest.java
@@ -15,280 +15,207 @@
* Demos the creation of a channel and subsequent connection and closing. Demo application should exit (no
* more threads running)
*/
-@Test(groups=Global.STACK_DEPENDENT, sequential=false)
+@Test(groups=Global.STACK_DEPENDENT,sequential=true)
public class CloseTest extends ChannelTestBase {
- private final ThreadLocal<JChannel> ch=new ThreadLocal<JChannel>(),
- channel1=new ThreadLocal<JChannel>(),
- c1=new ThreadLocal<JChannel>(),
- c2=new ThreadLocal<JChannel>(),
- c3=new ThreadLocal<JChannel>();
-
-
- @AfterMethod
- void tearDown() throws Exception {
- closeChannel(ch);
- closeChannel(channel1);
- closeChannel(c1);
- closeChannel(c2);
- closeChannel(c3);
- }
+ protected JChannel a, b, c;
- protected boolean useBlocking() {
- return false;
- }
+ @AfterMethod void tearDown() throws Exception {Util.close(c,b,a);}
+ protected boolean useBlocking() {return false;}
- private static void closeChannel(ThreadLocal<JChannel> local) {
- Channel c=local.get();
- if(c != null && (c.isOpen() || c.isConnected())) {
- c.close();
- }
- local.set(null);
- }
public void testDoubleClose() throws Exception {
- System.out.println("-- creating channel1 --");
- channel1.set(createChannel(true));
- System.out.println("-- connecting channel1 --");
- channel1.get().connect(getUniqueClusterName("CloseTest.testDoubleClose"));
-
- assertTrue("channel open", channel1.get().isOpen());
- assertTrue("channel connected", channel1.get().isConnected());
-
- System.out.println("-- closing channel1 --");
- channel1.get().close();
- System.out.println("-- closing channel1 (again) --");
- channel1.get().close();
- assertFalse("channel not connected", channel1.get().isConnected());
+ a=createChannel(true, 1, "A");
+ a.connect("CloseTest.testDoubleClose");
+ assert a.isOpen();
+ assert a.isConnected();
+ Util.close(a);
+ Util.close(a);
+ assert !a.isConnected();
}
public void testCreationAndClose() throws Exception {
- System.out.println("-- creating channel1 --");
- ch.set(createChannel(true));
- ch.get().connect(getUniqueClusterName("CloseTest.testCreationAndClose"));
- assertTrue("channel open", ch.get().isOpen());
- assertTrue("channel connected", ch.get().isConnected());
- ch.get().close();
- assertFalse("channel not connected", ch.get().isConnected());
- ch.get().close();
+ a=createChannel(true, 1, "A");
+ a.connect("CloseTest.testCreationAndClose");
+ assert a.isOpen();
+ Util.close(a);
+ assert !a.isConnected();
}
public void testViewChangeReceptionOnChannelCloseByParticipant() throws Exception {
- Address a1, a2;
List<Address> members;
- MyReceiver r1=new MyReceiver(), r2=new MyReceiver();
+ MyReceiver r1=new MyReceiver(), r2=new MyReceiver();
+ Address a_addr, b_addr;
- c1.set(createChannel(true));
- c1.get().setReceiver(r1);
- System.out.println("-- connecting c1");
+ a=createChannel(true, 2, "A");
+ a.setReceiver(r1);
final String GROUP="CloseTest.testViewChangeReceptionOnChannelCloseByParticipant";
- c1.get().connect(GROUP);
- Util.sleep(500); // time to receive its own view
- System.out.println("c1: " + r1.getViews());
- a1=c1.get().getAddress();
- c2.set(createChannel(c1.get()));
- c2.get().setReceiver(r2);
- System.out.println("-- connecting c2");
+ a.connect(GROUP);
+ System.out.println("A: " + r1.getViews());
+ b=createChannel(a, "B");
+ b.setReceiver(r2);
r1.clearViews();
- c2.get().connect(GROUP);
- Util.sleep(500); // time to receive its own view
- a2=c2.get().getAddress();
- System.out.println("c2: " + r2.getViews());
-
- System.out.println("-- closing c2");
- c2.get().close();
- Util.sleep(500);
+ b.connect(GROUP);
+ Util.waitUntilAllChannelsHaveSameSize(10000,1000,a,b);
+ a_addr=a.getAddress();
+ b_addr=b.getAddress();
+
+ Util.close(b);
+ Util.waitUntilAllChannelsHaveSameSize(5000, 500, a);
View v=r1.getViews().get(0);
members=v.getMembers();
System.out.println("-- first view of c1: " + v);
Assert.assertEquals(2, members.size());
- assertTrue(members.contains(a1));
- assertTrue(members.contains(a2));
+ assertTrue(members.contains(a_addr));
+ assertTrue(members.contains(b_addr));
v=r1.getViews().get(1);
members=v.getMembers();
System.out.println("-- second view of c1: " + v);
assert 1 == members.size();
- assert members.contains(a1);
- assert !members.contains(a2);
+ assert members.contains(a_addr);
+ assert !members.contains(b_addr);
}
public void testViewChangeReceptionOnChannelCloseByCoordinator() throws Exception {
- Address a1, a2;
List<Address> members;
- MyReceiver r1=new MyReceiver(), r2=new MyReceiver();
-
- final String GROUP=getUniqueClusterName("CloseTest.testViewChangeReceptionOnChannelCloseByCoordinator");
- c1.set(createChannel(true));
- c1.get().setReceiver(r1);
- c1.get().connect(GROUP);
- Util.sleep(500); // time to receive its own view
- a1=c1.get().getAddress();
- c2.set(createChannel(c1.get()));
- c2.get().setReceiver(r2);
- c2.get().connect(GROUP);
- Util.sleep(500); // time to receive its own view
- a2=c2.get().getAddress();
+ MyReceiver r1=new MyReceiver(), r2=new MyReceiver();
+ Address a_addr, b_addr;
+
+ final String GROUP="CloseTest.testViewChangeReceptionOnChannelCloseByCoordinator";
+ a=createChannel(true, 2, "A");
+ a.setReceiver(r1);
+ a.connect(GROUP);
+ b=createChannel(a, "B");
+ b.setReceiver(r2);
+ b.connect(GROUP);
+ Util.waitUntilAllChannelsHaveSameSize(10000, 1000, a, b);
+ a_addr=a.getAddress();
+ b_addr=b.getAddress();
View v=r2.getViews().get(0);
members=v.getMembers();
assert 2 == members.size();
- assert members.contains(a2);
+ assert members.contains(a.getAddress());
r2.clearViews();
- c1.get().close();
- Util.sleep(1000);
+ Util.close(b);
+ Util.waitUntilAllChannelsHaveSameSize(5000, 500, a);
- v=r2.getViews().get(0);
+ v=r1.getViews().get(r1.getViews().size() -1);
members=v.getMembers();
assert 1 == members.size();
- assert !members.contains(a1);
- assert members.contains(a2);
+ assert members.contains(a_addr);
+ assert !members.contains(b_addr);
}
public void testConnectDisconnectConnectCloseSequence() throws Exception {
- System.out.println("-- creating channel --");
- ch.set(createChannel(true));
+ a=createChannel(true, 1, "A");
- System.out.println("-- connecting channel to CloseTest--");
- ch.get().connect("CloseTest.testConnectDisconnectConnectCloseSequence-CloseTest");
- System.out.println("view is " + ch.get().getView());
+ a.connect("CloseTest.testConnectDisconnectConnectCloseSequence-CloseTest");
+ System.out.println("view is " + a.getView());
System.out.println("-- disconnecting channel --");
- ch.get().disconnect();
+ a.disconnect();
- Util.sleep(500);
System.out.println("-- connecting channel to OtherGroup --");
- ch.get().connect("CloseTest.testConnectDisconnectConnectCloseSequence-OtherGroup");
- System.out.println("view is " + ch.get().getView());
-
- System.out.println("-- closing channel --");
- ch.get().close();
+ a.connect("CloseTest.testConnectDisconnectConnectCloseSequence-OtherGroup");
+ System.out.println("view is " + a.getView());
}
public void testConnectCloseSequenceWith2Members() throws Exception {
- System.out.println("-- creating channel --");
- ch.set(createChannel(true));
- System.out.println("-- connecting channel --");
- final String GROUP=getUniqueClusterName("CloseTest.testConnectCloseSequenceWith2Members");
- ch.get().connect(GROUP);
- System.out.println("view is " + ch.get().getView());
-
- System.out.println("-- creating channel1 --");
- channel1.set(createChannel(ch.get()));
- System.out.println("-- connecting channel1 --");
- channel1.get().connect(GROUP);
- System.out.println("view is " + channel1.get().getView());
-
- System.out.println("-- closing channel1 --");
- channel1.get().close();
-
- Util.sleep(2000);
- System.out.println("-- closing channel --");
- ch.get().close();
+ a=createChannel(true, 2, "A");
+ final String GROUP="CloseTest.testConnectCloseSequenceWith2Members";
+ a.connect(GROUP);
+
+ b=createChannel(a, "B");
+ b.connect(GROUP);
+ Util.waitUntilAllChannelsHaveSameSize(10000,1000,a,b);
+ System.out.println("view is " + b.getView());
}
public void testCreationAndClose2() throws Exception {
- System.out.println("-- creating channel2 --");
- ch.set(createChannel(true));
- System.out.println("-- connecting channel2 --");
- ch.get().connect(getUniqueClusterName("CloseTest.testCreationAndClose2"));
- System.out.println("-- closing channel --");
- ch.get().close();
+ a=createChannel(true, 1, "A");
+ a.connect("CloseTest.testCreationAndClose2");
}
public void testClosedChannel() throws Exception {
- System.out.println("-- creating channel --");
- ch.set(createChannel(true));
- System.out.println("-- connecting channel --");
- ch.get().connect(getUniqueClusterName("CloseTest.testClosedChannel"));
- System.out.println("-- closing channel --");
- ch.get().close();
+ a=createChannel(true, 1, "A");
+ a.connect("CloseTest.testClosedChannel");
+ Util.close(a);
Util.sleep(2000);
-
try {
- ch.get().connect(getUniqueClusterName("CloseTest.testClosedChannel"));
+ a.connect("CloseTest.testClosedChannel");
assert false;
}
catch(IllegalStateException ex) {
- assertTrue(true);
}
}
public void testMultipleConnectsAndDisconnects() throws Exception {
- c1.set(createChannel(true));
- assertTrue(c1.get().isOpen());
- assertFalse(c1.get().isConnected());
- final String GROUP=getUniqueClusterName("CloseTest.testMultipleConnectsAndDisconnects");
- c1.get().connect(GROUP);
- System.out.println("view after c1.connect(): " + c1.get().getView());
- assertTrue(c1.get().isOpen());
- assertTrue(c1.get().isConnected());
- assertServiceAndClusterView(c1.get(), 1);
-
- c2.set(createChannel(c1.get()));
- assertTrue(c2.get().isOpen());
- assertFalse(c2.get().isConnected());
-
- c2.get().connect(GROUP);
- System.out.println("view after c2.connect(): " + c2.get().getView());
- assertTrue(c2.get().isOpen());
- assertTrue(c2.get().isConnected());
- assertServiceAndClusterView(c2.get(), 2);
- Util.sleep(500);
- assertServiceAndClusterView(c1.get(), 2);
-
- c2.get().disconnect();
- System.out.println("view after c2.disconnect(): " + c2.get().getView());
- assertTrue(c2.get().isOpen());
- assertFalse(c2.get().isConnected());
- Util.sleep(500);
- assertServiceAndClusterView(c1.get(), 1);
-
- c2.get().connect(GROUP);
- System.out.println("view after c2.connect(): " + c2.get().getView());
- assertTrue(c2.get().isOpen());
- assertTrue(c2.get().isConnected());
- assertServiceAndClusterView(c2.get(), 2);
- Util.sleep(300);
- assertServiceAndClusterView(c1.get(), 2);
+ a=createChannel(true, 3, "A");
+ assert a.isOpen();
+ assert !a.isConnected();
+ final String GROUP="CloseTest.testMultipleConnectsAndDisconnects";
+ a.connect(GROUP);
+ assert a.isConnected();
+ assertView(a, 1);
+
+ b=createChannel(a, "B");
+ assert b.isOpen();
+ assert !b.isConnected();
+
+ b.connect(GROUP);
+ assert b.isConnected();
+ Util.waitUntilAllChannelsHaveSameSize(10000, 1000, a, b);
+ assertView(b, 2);
+ assertView(a, 2);
+
+ b.disconnect();
+ assert b.isOpen();
+ assert !b.isConnected();
+ Util.waitUntilAllChannelsHaveSameSize(5000, 500, a);
+ assertView(a, 1);
+
+ b.connect(GROUP);
+ assert b.isConnected();
+ Util.waitUntilAllChannelsHaveSameSize(10000, 1000, a, b);
+ assertView(b, 2);
+ assertView(a, 2);
// Now see what happens if we reconnect the first channel
- c3.set(createChannel(c1.get()));
- assertTrue(c3.get().isOpen());
- assertFalse(c3.get().isConnected());
- assertServiceAndClusterView(c1.get(), 2);
- assertServiceAndClusterView(c2.get(), 2);
-
- c1.get().disconnect();
- Util.sleep(1000);
- assertTrue(c1.get().isOpen());
- assertFalse(c1.get().isConnected());
- assertServiceAndClusterView(c2.get(), 1);
- assertTrue(c3.get().isOpen());
- assertFalse(c3.get().isConnected());
-
- c1.get().connect(GROUP);
- System.out.println("view after c1.connect(): " + c1.get().getView());
- assertTrue(c1.get().isOpen());
- assertTrue(c1.get().isConnected());
- assertServiceAndClusterView(c1.get(), 2);
- Util.sleep(500);
- assertServiceAndClusterView(c2.get(), 2);
- assertTrue(c3.get().isOpen());
- assertFalse(c3.get().isConnected());
+ c=createChannel(a, "C");
+ assert c.isOpen();
+ assert !c.isConnected();
+ assertView(a, 2);
+ assertView(b, 2);
+
+ a.disconnect();
+ assert a.isOpen();
+ assert !a.isConnected();
+ Util.waitUntilAllChannelsHaveSameSize(5000, 500, b);
+ assertView(b, 1);
+ assert c.isOpen();
+ assert !c.isConnected();
+
+ a.connect(GROUP);
+ assert a.isOpen();
+ assert a.isConnected();
+ Util.waitUntilAllChannelsHaveSameSize(5000, 500, a, b);
+ assertView(a, 2);
+ assertView(b,2);
}
- private static void assertServiceAndClusterView(Channel ch, int num) {
+ private static void assertView(Channel ch, int num) {
View view=ch.getView();
String msg="view=" + view;
assertNotNull(view);
View
7 tests/junit/org/jgroups/tests/PrioTest.java
@@ -57,7 +57,10 @@ public void testPrioritizedMessages() throws Exception {
barrier.await(); // starts the senders
for(PrioSender sender: senders)
- sender.join();
+ sender.join(10000);
+ for(PrioSender sender: senders)
+ if(sender.isAlive())
+ System.err.println("sender " + sender + " is still alive");
List<Integer> list1=r1.getMsgs(), list2=r2.getMsgs();
for(int i=0; i < 20; i++) {
@@ -132,7 +135,7 @@ public PrioSender(JChannel ch, byte prio, CyclicBarrier barrier) {
}
public void run() {
- Message msg=new Message(null, null, new Integer(prio));
+ Message msg=new Message(null, null,(int)prio);
PrioHeader hdr=new PrioHeader(prio);
msg.putHeader(PRIO_ID, hdr);
try {

0 comments on commit 0f7af0c

Please sign in to comment.