Permalink
Browse files

Fixed tests

  • Loading branch information...
Bela Ban
Bela Ban committed Feb 6, 2012
1 parent dc32ed1 commit 3d1448f64d1d782c2ad13f5a0c3f00523ef3bc8a
@@ -23,16 +23,16 @@
@Test(dataProvider="timerCreator")
public void testExpiration(TimeScheduler timer) {
- AgeOutCache<Integer> cache=new AgeOutCache<Integer>(timer, 500L,
+ AgeOutCache<Integer> cache=new AgeOutCache<Integer>(timer, 2000L,
new AgeOutCache.Handler<Integer>() {
public void expired(Integer key) {
System.out.println(key + " expired");
}
});
for(int i = 1; i <= 5; i++)
cache.add(i);
- System.out.println("cache:\n" + cache);
int size=cache.size();
+ System.out.println("cache:\n" + cache);
assert size == 5 : "size is " + size;
for(int i=0; i < 30; i++) {
@@ -16,10 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -35,12 +32,13 @@
final Address self=Util.createRandomAddress();
final Address sender=Util.createRandomAddress();
- final CyclicBarrier barrier=new CyclicBarrier(NUM_THREADS +1);
+ protected CountDownLatch latch;
NakReceiverWindow win;
@BeforeMethod
void init() {
+ latch=new CountDownLatch(1);
win=new NakReceiverWindow(self, new Retransmitter.RetransmitCommand() {
public void retransmit(long first_seqno, long last_seqno, Address sender) {
}
@@ -66,16 +64,15 @@ public void testConcurrentInsertions() throws BrokenBarrierException, Interrupte
successful_adds.put((long)i, new AtomicInteger(0));
for(int i=0; i < senders.length; i++) {
- senders[i]=new Sender(NUM_MSGS, win, sender, barrier, successful_adds);
+ senders[i]=new Sender(NUM_MSGS, win, sender, latch, successful_adds);
senders[i].start();
}
- Util.sleep(2000);
System.out.println("Concurrently inserting " + NUM_MSGS + " messages with " + NUM_THREADS + " threads");
- barrier.await();
+ latch.countDown();
for(int i=0; i < senders.length; i++)
- senders[i].join(20000);
+ senders[i].join();
System.out.println("OK: " + NUM_MSGS + " were added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
Set<Long> keys=successful_adds.keySet();
@@ -104,16 +101,15 @@ public void testConcurrentRandomInsertions() throws BrokenBarrierException, Inte
successful_adds.put((long)i, new AtomicInteger(0));
for(int i=0; i < senders.length; i++) {
- senders[i]=new RandomSender(NUM_MSGS, win, sender, barrier, successful_adds);
+ senders[i]=new RandomSender(NUM_MSGS, win, sender, latch, successful_adds);
senders[i].start();
}
- Util.sleep(2000);
System.out.println("Concurrently inserting " + NUM_MSGS + " messages with " + NUM_THREADS + " threads");
- barrier.await();
+ latch.countDown();
for(int i=0; i < senders.length; i++)
- senders[i].join(20000);
+ senders[i].join();
System.out.println("OK: " + NUM_MSGS + " were added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
Set<Long> keys=successful_adds.keySet();
@@ -142,16 +138,15 @@ public void testConcurrentInsertionOfSameSeqno() throws BrokenBarrierException,
successful_adds.put((long)i, new AtomicInteger(0));
for(int i=0; i < senders.length; i++) {
- senders[i]=new SameSeqnoSender(NUM_MSGS, win, sender, barrier, successful_adds);
+ senders[i]=new SameSeqnoSender(NUM_MSGS, win, sender, latch, successful_adds);
senders[i].start();
}
- Util.sleep(2000);
System.out.println("Concurrently inserting 1 message with " + NUM_THREADS + " threads");
- barrier.await();
+ latch.countDown();
for(int i=0; i < senders.length; i++)
- senders[i].join(20000);
+ senders[i].join();
System.out.println("OK: 1 message was added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
Set<Long> keys=successful_adds.keySet();
@@ -171,14 +166,14 @@ public void testConcurrentInsertionOfSameSeqno() throws BrokenBarrierException,
final int num;
final NakReceiverWindow win;
final Address sender;
- final CyclicBarrier barrier;
+ final CountDownLatch latch;
final ConcurrentMap<Long,AtomicInteger> map;
- public Sender(int num, NakReceiverWindow win, Address sender, CyclicBarrier barrier, ConcurrentMap<Long, AtomicInteger> map) {
+ public Sender(int num, NakReceiverWindow win, Address sender, CountDownLatch latch, ConcurrentMap<Long, AtomicInteger> map) {
this.num=num;
this.win=win;
this.sender=sender;
- this.barrier=barrier;
+ this.latch=latch;
this.map=map;
}
@@ -196,14 +191,14 @@ protected void add(long seqno) {
boolean added=win.add(seqno, msg);
if(added) {
- AtomicInteger val=map.get((long)seqno);
+ AtomicInteger val=map.get(seqno);
val.incrementAndGet();
}
}
protected void waitForBarrier() {
try {
- barrier.await();
+ latch.await();
}
catch(Exception e) {
e.printStackTrace();
@@ -213,8 +208,8 @@ protected void waitForBarrier() {
static class RandomSender extends Sender {
- public RandomSender(int num, NakReceiverWindow win, Address sender, CyclicBarrier barrier, ConcurrentMap<Long, AtomicInteger> map) {
- super(num, win, sender, barrier, map);
+ public RandomSender(int num, NakReceiverWindow win, Address sender, CountDownLatch latch, ConcurrentMap<Long, AtomicInteger> map) {
+ super(num, win, sender, latch, map);
}
public void run() {
@@ -237,8 +232,8 @@ public void run() {
*/
static class SameSeqnoSender extends Sender {
- public SameSeqnoSender(int num, NakReceiverWindow win, Address sender, CyclicBarrier barrier, ConcurrentMap<Long, AtomicInteger> map) {
- super(num, win, sender, barrier, map);
+ public SameSeqnoSender(int num, NakReceiverWindow win, Address sender, CountDownLatch latch, ConcurrentMap<Long, AtomicInteger> map) {
+ super(num, win, sender, latch, map);
}
public void run() {
@@ -31,8 +31,7 @@
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
public class RSVPTest {
- static final int NUM=5; // number of members
-
+ protected static final int NUM=5; // number of members
protected final JChannel[] channels=new JChannel[NUM];
protected final MyReceiver[] receivers=new MyReceiver[NUM];
protected MyDiagnosticsHandler handler;
@@ -79,6 +78,7 @@ void setUp() throws Exception {
new DISCARD(),
new PING().setValue("timeout",1000).setValue("num_initial_members",NUM)
.setValue("force_sending_discovery_rsps", true),
+ new MERGE2().setValue("min_interval", 1000).setValue("max_interval", 3000),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("discard_delivered_msgs",true)
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false)
@@ -103,7 +103,10 @@ void setUp() throws Exception {
JmxConfigurator.registerChannel(channels[i], server, "channel-" + (i+1), "RSVPTest", true);
channels[i].connect("RSVPTest");
System.out.print(i + 1 + " ");
+ if(i == 0)
+ Util.sleep(2000);
}
+ Util.waitUntilAllChannelsHaveSameSize(30000, 1000, channels);
System.out.println("");
}

0 comments on commit 3d1448f

Please sign in to comment.