Skip to content

Commit

Permalink
changed FCTest to use SHARED_LOOPBACK instead of Simulator
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Apr 12, 2011
1 parent b8b6c07 commit 6dac307
Showing 1 changed file with 26 additions and 39 deletions.
65 changes: 26 additions & 39 deletions tests/junit-functional/org/jgroups/tests/FCTest.java
Expand Up @@ -2,26 +2,23 @@
package org.jgroups.tests; package org.jgroups.tests;


import org.jgroups.*; import org.jgroups.*;
import org.jgroups.debug.Simulator; import org.jgroups.protocols.*;
import org.jgroups.protocols.FC; import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.FRAG2; import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.stack.IpAddress; import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util; import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.util.Vector;



/** /**
* Tests the flow control (FC) protocol * Tests the flow control (FC) protocol
* @author Bela Ban * @author Bela Ban
*/ */
@Test(groups=Global.FUNCTIONAL,sequential=true) @Test(groups=Global.FUNCTIONAL,sequential=true)
public class FCTest { public class FCTest {
Simulator s=null; JChannel ch;
static final int SIZE=1000; // bytes static final int SIZE=1000; // bytes
static final int NUM_MSGS=100000; static final int NUM_MSGS=100000;
static final int PRINT=NUM_MSGS / 10; static final int PRINT=NUM_MSGS / 10;
Expand All @@ -30,48 +27,38 @@ public class FCTest {


@BeforeMethod @BeforeMethod
void setUp() throws Exception { void setUp() throws Exception {
IpAddress a1=new IpAddress(1111); ch=Util.createChannel(new SHARED_LOOPBACK().setValue("thread_pool_rejection_policy", "run").setValue("loopback", true),
Vector<Address> members=new Vector<Address>(); new PING(),
members.add(a1); new NAKACK().setValue("use_mcast_xmit", false),
View v=new View(a1, 1, members); new UNICAST2(),
s=new Simulator(); new STABLE().setValue("max_bytes", 50000),
s.setLocalAddress(a1); new GMS().setValue("print_local_addr", false),
s.setView(v); new FC().setValue("min_credits", 1000).setValue("max_credits", 10000).setValue("max_block_time", 1000),
s.addMember(a1); new FRAG2());
FC fc=new FC(); ch.connect("FCTest");
fc.setMinCredits(1000);
fc.setMaxCredits(10000);
fc.setMaxBlockTime(1000);
FRAG2 frag=new FRAG2();
frag.setFragSize(8000);
Protocol[] stack=new Protocol[]{frag, fc};
s.setProtocolStack(stack);
s.start();
} }


@AfterMethod @AfterMethod
void tearDown() throws Exception { void tearDown() throws Exception {
s.stop(); Util.close(ch);
} }




@Test(groups=Global.FUNCTIONAL) public void testReceptionOfAllMessages() throws ChannelNotConnectedException, ChannelClosedException {
public void testReceptionOfAllMessages() {
int num_received=0; int num_received=0;
Receiver r=new Receiver(); Receiver r=new Receiver();
s.setReceiver(r); ch.setReceiver(r);
for(int i=1; i <= NUM_MSGS; i++) { for(int i=1; i <= NUM_MSGS; i++) {
Message msg=new Message(null, null, createPayload(SIZE)); Message msg=new Message(null, null, createPayload(SIZE));
Event evt=new Event(Event.MSG, msg); ch.send(msg);
s.send(evt);
if(i % PRINT == 0) if(i % PRINT == 0)
System.out.println("==> " + i); System.out.println("==> " + i);
} }
int num_tries=10; int num_tries=10;
while(num_tries > 0) { while(num_tries > 0) {
Util.sleep(1000); Util.sleep(1000);
num_received=r.getNumberOfReceivedMessages(); num_received=r.getNumberOfReceivedMessages();
System.out.println("-- num received=" + num_received + ", stats:\n" + s.dumpStats()); System.out.println("-- num received=" + num_received);
if(num_received >= NUM_MSGS) if(num_received >= NUM_MSGS)
break; break;
num_tries--; num_tries--;
Expand All @@ -81,6 +68,8 @@ public void testReceptionOfAllMessages() {








private static byte[] createPayload(int size) { private static byte[] createPayload(int size) {
byte[] retval=new byte[size]; byte[] retval=new byte[size];
for(int i=0; i < size; i++) for(int i=0; i < size; i++)
Expand All @@ -89,15 +78,13 @@ private static byte[] createPayload(int size) {
} }




static class Receiver implements Simulator.Receiver { static class Receiver extends ReceiverAdapter {
int num_mgs_received=0; int num_mgs_received=0;


public void receive(Event evt) { public void receive(Message msg) {
if(evt.getType() == Event.MSG) { num_mgs_received++;
num_mgs_received++; if(num_mgs_received % PRINT == 0)
if(num_mgs_received % PRINT == 0) System.out.println("<== " + num_mgs_received);
System.out.println("<== " + num_mgs_received);
}
} }


public int getNumberOfReceivedMessages() { public int getNumberOfReceivedMessages() {
Expand Down

0 comments on commit 6dac307

Please sign in to comment.