Skip to content

Commit

Permalink
changed to use Receiver interface
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 23, 2005
1 parent 0abd40b commit 33cdace
Showing 1 changed file with 57 additions and 47 deletions.
104 changes: 57 additions & 47 deletions tests/junit/org/jgroups/tests/DiscardTest.java
@@ -1,22 +1,20 @@
// $Id: DiscardTest.java,v 1.5 2005/12/08 14:41:48 belaban Exp $
// $Id: DiscardTest.java,v 1.6 2005/12/23 18:27:04 belaban Exp $

package org.jgroups.tests;


import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.*;
import org.jgroups.util.Promise;


/**
* Tests the NAKACK (retransmission) and STABLE (garbage collection) protocols
* by discarding 10% of all network-bound messages
* @author Bela Ban
* @version $Id: DiscardTest.java,v 1.5 2005/12/08 14:41:48 belaban Exp $
* @version $Id: DiscardTest.java,v 1.6 2005/12/23 18:27:04 belaban Exp $
*/
public class DiscardTest extends TestCase {
JChannel ch1, ch2;
Expand All @@ -26,12 +24,20 @@ public class DiscardTest extends TestCase {
final long NUM_MSGS=10000;
final int MSG_SIZE=1000;
private static final String GROUP="DiscardTestGroup";
final Promise ch1_all_received=new Promise();
final Promise ch2_all_received=new Promise();


public DiscardTest(String name) {
super(name);
}

protected void setUp() throws Exception {
super.setUp();
ch1_all_received.reset();
ch2_all_received.reset();
}

public void testDiscardProperties() throws Exception {
_testLosslessReception(discard_props);
}
Expand All @@ -47,28 +53,21 @@ public void _testLosslessReception(String props) throws Exception {
System.setProperty("bind.address", "127.0.0.1");

ch1=new JChannel(props);
ch1.setReceiver(new MyReceiver(ch1_all_received, NUM_MSGS, "ch1"));
ch2=new JChannel(props);
ch2.setReceiver(new MyReceiver(ch2_all_received, NUM_MSGS, "ch2"));

ch1.connect(GROUP);
ch1_addr=ch1.getLocalAddress();
ch2.connect(GROUP);
ch2_addr=ch2.getLocalAddress();

View v=(View)ch1.receive(2000);
assertEquals(v.size(), 1);
assertTrue(v.getMembers().contains(ch1_addr));

v=(View)ch1.receive(20000);
assertEquals(v.size(), 2);
assertTrue(v.getMembers().contains(ch1_addr));
assertTrue(v.getMembers().contains(ch2_addr));

v=(View)ch2.receive(5000);
View v=ch2.getView();
System.out.println("**** ch2's view: " + v);
assertEquals(2, v.size());
assertTrue(v.getMembers().contains(ch1_addr));
assertTrue(v.getMembers().contains(ch2_addr));

System.out.println("View ch1=" + ch1.getView());
System.out.println("View ch2=" + ch2.getView());
System.out.println("sending " + NUM_MSGS + " 1K messages to all members (including myself)");
start=System.currentTimeMillis();
for(int i=0; i < NUM_MSGS; i++) {
Expand All @@ -78,37 +77,15 @@ public void _testLosslessReception(String props) throws Exception {
System.out.println("-- sent " + i + " messages");
}

System.out.println("receiving messages on ch1");
long received_msgs=0;
Object obj;
while(true) {
obj=ch1.receive(30000);
if(obj instanceof Message) {
received_msgs++;
if(received_msgs % 1000 == 0)
System.out.println("-- received " + received_msgs + " on ch1");
if(received_msgs >= NUM_MSGS) {
System.out.println("SUCCESS: received all " + NUM_MSGS + " messages on ch1");
break;
}
}
}
System.out.println("-- waiting for ch1 and ch2 to receive " + NUM_MSGS + " messages");
Long num_msgs;
num_msgs=(Long)ch1_all_received.getResult();
System.out.println("-- received " + num_msgs + " messages on ch1");

System.out.println("receiving messages on ch2");
received_msgs=0;
while(true) {
obj=ch2.receive(5000);
if(obj instanceof Message) {
received_msgs++;
if(received_msgs % 1000 == 0)
System.out.println("-- received " + received_msgs + " on ch2");
if(received_msgs >= NUM_MSGS) {
System.out.println("SUCCESS: received all " + NUM_MSGS + " messages on ch2");
break;
}
}
}
num_msgs=(Long)ch2_all_received.getResult();
stop=System.currentTimeMillis();
System.out.println("-- received " + num_msgs + " messages on ch2");

long diff=stop-start;
double msgs_sec=NUM_MSGS / (diff / 1000.0);
System.out.println("== Sent and received " + NUM_MSGS + " in " + diff + "ms, " +
Expand All @@ -119,6 +96,39 @@ public void _testLosslessReception(String props) throws Exception {
}


class MyReceiver extends ReceiverAdapter {
final Promise p;
final long num_msgs_expected;
long num_msgs=0;
String channel_name;
boolean operational=true;

public MyReceiver(final Promise p, final long num_msgs_expected, String channel_name) {
this.p=p;
this.num_msgs_expected=num_msgs_expected;
this.channel_name=channel_name;
}

public void receive(Message msg) {
if(!operational)
return;
num_msgs++;

if(num_msgs > 0 && num_msgs % 1000 == 0)
System.out.println("-- received " + num_msgs + " on " + channel_name);

if(num_msgs >= num_msgs_expected) {
System.out.println("SUCCESS: received all " + num_msgs_expected + " messages on " + channel_name);
operational=false;
p.setResult(new Long(num_msgs));
}
}

public void viewAccepted(View new_view) {
System.out.println("-- view (" + channel_name + "): " + new_view);
}
}


private Message createMessage(int size) {
byte[] buf=new byte[size];
Expand Down

0 comments on commit 33cdace

Please sign in to comment.