Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 5, 2007
1 parent ea22333 commit 6427cb1
Showing 1 changed file with 47 additions and 109 deletions.
156 changes: 47 additions & 109 deletions tests/junit/org/jgroups/tests/FlushTest.java
@@ -1,38 +1,24 @@
package org.jgroups.tests;

import java.io.*;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.jgroups.Address;
import org.jgroups.BlockEvent;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.Event;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.GetStateEvent;
import org.jgroups.JChannel;
import org.jgroups.JChannelFactory;
import org.jgroups.Message;
import org.jgroups.SetStateEvent;
import org.jgroups.UnblockEvent;
import org.jgroups.View;
import org.jgroups.*;
import org.jgroups.mux.MuxChannel;
import org.jgroups.protocols.DISCARD;
import org.jgroups.util.Digest;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;

import java.io.*;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


/**
* Tests the FLUSH protocol, requires flush-udp.xml in ./conf to be present and configured to use FLUSH
* @author Bela Ban
* @version $Id: FlushTest.java,v 1.48 2007/07/05 11:27:34 belaban Exp $
* @version $Id: FlushTest.java,v 1.49 2007/07/05 11:46:24 belaban Exp $
*/
public class FlushTest extends ChannelTestBase
{
Expand All @@ -46,7 +32,8 @@ public FlushTest(String name)
super(name);
}

JChannel c1, c2, c3, c4;
JChannel c1, c2, c3, c4;
private MyReceiver a, b, c;

static final String CONFIG = "flush-udp.xml";

Expand Down Expand Up @@ -404,27 +391,11 @@ public void testBlockingWithStateTransferAndMultipleServiceMuxChannel()
* </ul>
*/
public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {
c1=createChannel();
c2=createChannel();
c3=createChannel();
MyReceiver a=new MyReceiver(c1, "A"), b=new MyReceiver(c2, "B"), c=new MyReceiver(c3, "C");
c1.setReceiver(a);
c2.setReceiver(b);
c3.setReceiver(c);

c1.connect("x");
c2.connect("x");
c3.connect("x");

View v=c3.getView();
assertEquals("view: " + v, 3, v.size());
createMembers();

insertDISCARD(c2, c3.getLocalAddress());

System.out.println("\nDigests before C sends any messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests before C sends any messages:");

// now C sends 5 messages:
System.out.println("\nC sending 5 messages; B will ignore them, but A and C will receive them");
Expand All @@ -433,10 +404,7 @@ public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {
}
Util.sleep(1000); // until al messages have been received, this is asynchronous so we need to wait a bit

System.out.println("\nDigests after C sent 5 messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests after C sent 5 messages:");

// check C (must have received its own messages)
Map<Address, List<Integer>> map=c.getMsgs();
Expand Down Expand Up @@ -467,6 +435,7 @@ public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {

// wait until view {A,B} has been installed
int cnt=1000;
View v;
while((v=c1.getView()) != null && cnt > 0) {
cnt--;
if(v.size() == 4)
Expand All @@ -477,10 +446,7 @@ public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {
assert v != null;
assertEquals(4, v.size());

System.out.println("\nDigests after D joined (FLUSH protocol should have updated B with C's messages)");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests after D joined (FLUSH protocol should have updated B with C's messages)");

// check B (should have received all 5 of C's messages, through B as part of the flush phase)
map=b.getMsgs();
Expand All @@ -502,27 +468,11 @@ public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {
* </ul>
*/
public void testReconciliationFlushTriggeredByManualFlush() throws Exception {
c1=createChannel();
c2=createChannel();
c3=createChannel();
MyReceiver a=new MyReceiver(c1, "A"), b=new MyReceiver(c2, "B"), c=new MyReceiver(c3, "C");
c1.setReceiver(a);
c2.setReceiver(b);
c3.setReceiver(c);

c1.connect("x");
c2.connect("x");
c3.connect("x");

View v=c3.getView();
assertEquals("view: " + v, 3, v.size());
createMembers();

insertDISCARD(c2, c3.getLocalAddress());

System.out.println("\nDigests before C sends any messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests before C sends any messages:");

// now C sends 5 messages:
System.out.println("\nC sending 5 messages; B will ignore them, but A and C will receive them");
Expand All @@ -531,10 +481,7 @@ public void testReconciliationFlushTriggeredByManualFlush() throws Exception {
}
Util.sleep(1000); // until al messages have been received, this is asynchronous so we need to wait a bit

System.out.println("\nDigests after C sent 5 messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests after C sent 5 messages:");

// check C (must have received its own messages)
Map<Address, List<Integer>> map=c.getMsgs();
Expand Down Expand Up @@ -589,27 +536,11 @@ public void testReconciliationFlushTriggeredByManualFlush() throws Exception {
* </ul>
*/
public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception {
c1=createChannel();
c2=createChannel();
c3=createChannel();
MyReceiver a=new MyReceiver(c1, "A"), b=new MyReceiver(c2, "B"), c=new MyReceiver(c3, "C");
c1.setReceiver(a);
c2.setReceiver(b);
c3.setReceiver(c);

c1.connect("x");
c2.connect("x");
c3.connect("x");

View v=c3.getView();
assertEquals("view: " + v, 3, v.size());
createMembers();

insertDISCARD(c2, c3.getLocalAddress());

System.out.println("\nDigests before C sends any messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests before C sends any messages:");

// now C sends 5 messages:
System.out.println("\nC sending 5 messages; B will ignore them, but A and C will receive them");
Expand All @@ -618,11 +549,7 @@ public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception
}
Util.sleep(1000); // until al messages have been received, this is asynchronous so we need to wait a bit


System.out.println("\nDigests after C sent 5 messages:");
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
printDigests("\nDigests after C sent 5 messages:");

// check C (must have received its own messages)
Map<Address, List<Integer>> map=c.getMsgs();
Expand Down Expand Up @@ -655,6 +582,7 @@ public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception

// wait until view {A,B} has been installed
int cnt=1000;
View v;
while((v=c1.getView()) != null && cnt > 0) {
cnt--;
if(v.size() == 2)
Expand All @@ -678,6 +606,32 @@ public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception
}


private void createMembers() throws ChannelException {
c1=createChannel();
c2=createChannel();
c3=createChannel();
a=new MyReceiver(c1, "A");
b=new MyReceiver(c2, "B");
c=new MyReceiver(c3, "C");
c1.setReceiver(a);
c2.setReceiver(b);
c3.setReceiver(c);
c1.connect("x");
c2.connect("x");
c3.connect("x");

View v=c3.getView();
assertEquals("view: " + v, 3, v.size());
}


private void printDigests(String message) {
System.out.println(message);
System.out.println("A: " + c1.downcall(Event.GET_DIGEST_EVT));
System.out.println("B: " + c2.downcall(Event.GET_DIGEST_EVT));
System.out.println("C: " + c3.downcall(Event.GET_DIGEST_EVT));
}

private static void insertDISCARD(JChannel ch, Address exclude) throws Exception {
Properties prop=new Properties();
prop.setProperty("excludeitself", "true"); // don't discard messages to self
Expand Down Expand Up @@ -726,23 +680,7 @@ public void viewAccepted(View new_view) {
}


private static String printDigests(List<Digest> digests) {
StringBuilder sb=new StringBuilder();
for(Digest digest: digests) {
sb.append(digest).append("\n");
}
return sb.toString();
}

private List<Digest> getDigests(List<FlushTestReceiver> list) {
List<Digest> retval=new ArrayList<Digest>();
for (FlushTestReceiver receiver : list)
{
Digest d = (Digest) receiver.getChannel().downcall(new Event(Event.GET_DIGEST));
retval.add(d);
}
return retval;
}



Expand Down

0 comments on commit 6427cb1

Please sign in to comment.