From f3012a704fb8a7c3ff776d17da96300744c39b63 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 8 Sep 2009 12:22:45 +0000 Subject: [PATCH] backported from head (https://jira.jboss.org/jira/browse/JGRP-1033) --- src/org/jgroups/stack/AckReceiverWindow.java | 50 +++- src/org/jgroups/stack/AckSenderWindow.java | 168 +++-------- .../jgroups/tests/AckSenderWindowTest.java | 275 +++++++++--------- 3 files changed, 217 insertions(+), 276 deletions(-) diff --git a/src/org/jgroups/stack/AckReceiverWindow.java b/src/org/jgroups/stack/AckReceiverWindow.java index d989c98b480..575deaf4f76 100644 --- a/src/org/jgroups/stack/AckReceiverWindow.java +++ b/src/org/jgroups/stack/AckReceiverWindow.java @@ -1,4 +1,3 @@ - package org.jgroups.stack; @@ -10,6 +9,7 @@ import java.util.Map; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; /** @@ -22,13 +22,15 @@ * a sorted set incurs overhead. * * @author Bela Ban - * @version $Id: AckReceiverWindow.java,v 1.25.2.4 2008/06/04 15:02:52 belaban Exp $ + * @version $Id: AckReceiverWindow.java,v 1.25.2.5 2009/09/08 12:22:45 belaban Exp $ */ public class AckReceiverWindow { long next_to_remove=0; final Map msgs=new HashMap(); // keys: seqnos (Long), values: Messages static final Log log=LogFactory.getLog(AckReceiverWindow.class); final ReentrantLock lock=new ReentrantLock(); + final AtomicBoolean processing=new AtomicBoolean(false); + public AckReceiverWindow(long initial_seqno) { this.next_to_remove=initial_seqno; @@ -38,6 +40,10 @@ public ReentrantLock getLock() { return lock; } + public AtomicBoolean getProcessing() { + return processing; + } + /** Adds a new message. Message cannot be null * @return True if the message was added, false if not (e.g. duplicate, message was already present) */ @@ -69,14 +75,34 @@ public boolean add(long seqno, Message msg) { * removed in order. */ public Message remove() { - Message retval; + Message retval=null; synchronized(msgs) { - retval=msgs.remove(next_to_remove); - if(retval != null) { - if(log.isTraceEnabled()) - log.trace("removed seqno=" + next_to_remove); - next_to_remove++; + long seqno=next_to_remove; + try { + retval=msgs.remove(seqno); + } + finally { + if(retval != null) + next_to_remove=++seqno; + } + } + return retval; + } + + public Message remove(AtomicBoolean processing) { + Message retval=null; + + synchronized(msgs) { + long seqno=next_to_remove; + try { + retval=msgs.remove(seqno); + } + finally { + if(retval != null) + next_to_remove=++seqno; + else + processing.set(false); } } return retval; @@ -92,8 +118,6 @@ public Message removeOOBMessage() { return null; } retval=msgs.remove(next_to_remove); - if(log.isTraceEnabled()) - log.trace("removed OOB message with seqno=" + next_to_remove); next_to_remove++; } } @@ -107,6 +131,12 @@ public boolean hasMessagesToRemove() { } } + public boolean smallerThanNextToRemove(long seqno) { + synchronized(msgs) { + return seqno < next_to_remove; + } + } + public void reset() { synchronized(msgs) { diff --git a/src/org/jgroups/stack/AckSenderWindow.java b/src/org/jgroups/stack/AckSenderWindow.java index 36c0978cdc3..d17b1ee8602 100644 --- a/src/org/jgroups/stack/AckSenderWindow.java +++ b/src/org/jgroups/stack/AckSenderWindow.java @@ -1,4 +1,4 @@ -// $Id: AckSenderWindow.java,v 1.27 2007/10/26 09:58:35 belaban Exp $ +// $Id: AckSenderWindow.java,v 1.27.2.1 2009/09/08 12:23:06 belaban Exp $ package org.jgroups.stack; @@ -8,8 +8,9 @@ import org.jgroups.Address; import org.jgroups.Message; import org.jgroups.util.TimeScheduler; -import org.jgroups.util.Util; +import java.util.Collections; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -26,10 +27,11 @@ */ public class AckSenderWindow implements Retransmitter.RetransmitCommand { RetransmitCommand retransmit_command = null; // called to request XMIT of msg - final ConcurrentMap msgs=new ConcurrentHashMap(); // keys: seqnos (Long), values: Messages + final ConcurrentMap msgs=new ConcurrentHashMap(); Interval interval=new StaticInterval(400,800,1200,1600); final Retransmitter retransmitter; static final Log log=LogFactory.getLog(AckSenderWindow.class); + long lowest=0; // lowest seqno, used by ack() public interface RetransmitCommand { @@ -37,28 +39,12 @@ public interface RetransmitCommand { } - /** - * Creates a new instance. Thre retransmission thread has to be started separately with - * start(). - * @param com If not null, its method retransmit() will be called when a message - * needs to be retransmitted (called by the Retransmitter). - */ public AckSenderWindow(RetransmitCommand com) { retransmit_command = com; - retransmitter = new Retransmitter(null, this); - retransmitter.setRetransmitTimeouts(interval); - } - - - public AckSenderWindow(RetransmitCommand com, Interval interval) { - retransmit_command = com; - this.interval = interval; - retransmitter = new Retransmitter(null, this); + retransmitter = new Retransmitter(null, this, null); retransmitter.setRetransmitTimeouts(interval); } - - public AckSenderWindow(RetransmitCommand com, Interval interval, TimeScheduler sched) { retransmit_command = com; this.interval = interval; @@ -98,16 +84,32 @@ public void add(long seqno, Message msg) { /** - * Removes the message from msgs, removing them also from retransmission. If - * sliding window protocol is used, and was queueing, check whether we can resume adding elements. - * Add all elements. If this goes above window_size, stop adding and back to queueing. Else - * set queueing to false. + * Removes all messages less than or equal to seqno from msgs, and cancels their retransmission. */ - public void ack(long seqno) { - msgs.remove(new Long(seqno)); - retransmitter.remove(seqno); + public synchronized void ack(long seqno) { + if(lowest == 0) { + Long tmp=getLowestSeqno(); + if(tmp != null) + lowest=tmp; + } + + for(long i=lowest; i <= seqno; i++) { + msgs.remove(i); + retransmitter.remove(i); + } + lowest=seqno +1; + } + + /** Returns the message with the lowest seqno */ + public Message getLowestMessage() { + Set keys=msgs.keySet(); + if(keys.isEmpty()) + return null; + Long seqno=Collections.min(keys); + return seqno != null? msgs.get(seqno) : null; } + public int size() { return msgs.size(); } @@ -115,7 +117,7 @@ public int size() { public String toString() { StringBuilder sb=new StringBuilder(); sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): "); - TreeSet keys=new TreeSet(msgs.keySet()); + TreeSet keys=new TreeSet(msgs.keySet()); if(!keys.isEmpty()) sb.append(keys.first()).append(" - ").append(keys.last()); else @@ -127,7 +129,7 @@ public String toString() { public String printDetails() { StringBuilder sb=new StringBuilder(); sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): "). - append(new TreeSet(msgs.keySet())); + append(new TreeSet(msgs.keySet())); return sb.toString(); } @@ -150,6 +152,10 @@ public void retransmit(long first_seqno, long last_seqno, Address sender) { + public Long getLowestSeqno() { + Set keys=msgs.keySet(); + return keys != null? Collections.min(keys) : null; + } /* ---------------------------------- Private methods --------------------------------------- */ @@ -183,110 +189,6 @@ public void retransmit(long seqno, Message msg) { } - public static void main(String[] args) { - Interval xmit_timeouts=new StaticInterval(1000, 2000, 3000, 4000); - AckSenderWindow win=new AckSenderWindow(new Dummy(), xmit_timeouts); - - - - final int NUM = 1000; - - for (int i = 1; i < NUM; i++) - win.add(i, new Message()); - System.out.println(win); - Util.sleep(5000); - - for (int i = 1; i < NUM; i++) { - if (i % 2 == 0) // ack the even seqnos - win.ack(i); - } - - System.out.println(win); - Util.sleep(4000); - - for (int i = 1; i < NUM; i++) { - if (i % 2 != 0) // ack the odd seqnos - win.ack(i); } - System.out.println(win); - - win.add(3, new Message()); - win.add(5, new Message()); - win.add(4, new Message()); - win.add(8, new Message()); - win.add(9, new Message()); - win.add(6, new Message()); - win.add(7, new Message()); - win.add(3, new Message()); - System.out.println(win); - - - try { - Thread.sleep(5000); - win.ack(5); - System.out.println("ack(5)"); - win.ack(4); - System.out.println("ack(4)"); - win.ack(6); - System.out.println("ack(6)"); - win.ack(7); - System.out.println("ack(7)"); - win.ack(8); - System.out.println("ack(8)"); - win.ack(6); - System.out.println("ack(6)"); - win.ack(9); - System.out.println("ack(9)"); - System.out.println(win); - - Thread.sleep(5000); - win.ack(3); - System.out.println("ack(3)"); - System.out.println(win); - - Thread.sleep(3000); - win.add(10, new Message()); - win.add(11, new Message()); - System.out.println(win); - Thread.sleep(3000); - win.ack(10); - System.out.println("ack(10)"); - win.ack(11); - System.out.println("ack(11)"); - System.out.println(win); - - win.add(12, new Message()); - win.add(13, new Message()); - win.add(14, new Message()); - win.add(15, new Message()); - win.add(16, new Message()); - System.out.println(win); - - Util.sleep(1000); - win.ack(12); - System.out.println("ack(12)"); - win.ack(13); - System.out.println("ack(13)"); - - win.ack(15); - System.out.println("ack(15)"); - System.out.println(win); - - Util.sleep(5000); - win.ack(16); - System.out.println("ack(16)"); - System.out.println(win); - - Util.sleep(1000); - - win.ack(14); - System.out.println("ack(14)"); - System.out.println(win); - } catch (Exception e) { - log.error(e); - } - } - -} diff --git a/tests/junit-functional/org/jgroups/tests/AckSenderWindowTest.java b/tests/junit-functional/org/jgroups/tests/AckSenderWindowTest.java index 7a2211ba474..7726f591997 100644 --- a/tests/junit-functional/org/jgroups/tests/AckSenderWindowTest.java +++ b/tests/junit-functional/org/jgroups/tests/AckSenderWindowTest.java @@ -1,8 +1,7 @@ -// $Id: AckSenderWindowTest.java,v 1.2.2.1 2008/01/22 10:01:33 belaban Exp $ +// $Id: AckSenderWindowTest.java,v 1.2.2.2 2009/09/08 12:23:22 belaban Exp $ package org.jgroups.tests; - import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; @@ -10,168 +9,178 @@ import org.jgroups.stack.AckSenderWindow; import org.jgroups.stack.StaticInterval; import org.jgroups.util.Util; +import org.jgroups.util.TimeScheduler; import java.util.HashMap; - - - /** * Test cases for AckSenderWindow * @author Bela Ban */ public class AckSenderWindowTest extends TestCase { AckSenderWindow win=null; - final int NUM_MSGS=100; - long[] xmit_timeouts={1000, 2000, 4000, 8000}; - double PERCENTAGE_OFF=1.3; // how much can expected xmit_timeout and real timeout differ to still be okay ? - HashMap msgs=new HashMap(); // keys=seqnos (Long), values=Entries - + final int NUM_MSGS=100; + long[] xmit_timeouts={1000, 2000, 4000, 8000}; + double PERCENTAGE_OFF=1.3; // how much can expected xmit_timeout and real timeout differ to still be okay ? + HashMap msgs=new HashMap(); // keys=seqnos (Long), values=Entries + protected TimeScheduler timer=null; + + + protected void setUp() throws Exception { + super.setUp(); + timer=new TimeScheduler(10); + } + + protected void tearDown() throws Exception { + timer.stop(); + super.tearDown(); + } class Entry { - long start_time=0; // time message was added - long first_xmit=0; // time between start_time and first_xmit should be ca. 1000ms - long second_xmit=0; // time between first_xmit and second_xmit should be ca. 2000ms - long third_xmit=0; // time between third_xmit and second_xmit should be ca. 4000ms - long fourth_xmit=0; // time between third_xmit and second_xmit should be ca. 8000ms - - Entry() { - start_time=System.currentTimeMillis(); - } - - /** Entry is correct if xmit timeouts are not more than 30% off the mark */ - boolean isCorrect(long seqno) { - long t; - long expected; - long diff, delta; - boolean off=false; - - t=first_xmit - start_time; - expected=xmit_timeouts[0]; - diff=Math.abs(expected - t); - delta=(long)(expected * PERCENTAGE_OFF); - if(diff >= delta) off=true; - - t=second_xmit - first_xmit; - expected=xmit_timeouts[1]; - diff=Math.abs(expected - t); - delta=(long)(expected * PERCENTAGE_OFF); - if(diff >= delta) off=true; - - t=third_xmit - second_xmit; - expected=xmit_timeouts[2]; - diff=Math.abs(expected - t); - delta=(long)(expected * PERCENTAGE_OFF); - if(diff >= delta) off=true; - - t=fourth_xmit - third_xmit; - expected=xmit_timeouts[3]; - diff=Math.abs(expected - t); - delta=(long)(expected * PERCENTAGE_OFF); - if(diff >= delta) off=true; - - if(off) { - System.err.println("#" + seqno + ": " + this + ": (" + "entry is more than " + - PERCENTAGE_OFF + " percentage off "); - return false; + long start_time=0; // time message was added + long first_xmit=0; // time between start_time and first_xmit should be ca. 1000ms + long second_xmit=0; // time between first_xmit and second_xmit should be ca. 2000ms + long third_xmit=0; // time between third_xmit and second_xmit should be ca. 4000ms + long fourth_xmit=0; // time between third_xmit and second_xmit should be ca. 8000ms + + Entry() { + start_time=System.currentTimeMillis(); } - return true; - } + /** + * Entry is correct if xmit timeouts are not more than 30% off the mark + */ + boolean isCorrect(long seqno) { + long t; + long expected; + long diff, delta; + boolean off=false; + + t=first_xmit - start_time; + expected=xmit_timeouts[0]; + diff=Math.abs(expected - t); + delta=(long)(expected * PERCENTAGE_OFF); + if(diff >= delta) off=true; + + t=second_xmit - first_xmit; + expected=xmit_timeouts[1]; + diff=Math.abs(expected - t); + delta=(long)(expected * PERCENTAGE_OFF); + if(diff >= delta) off=true; + + t=third_xmit - second_xmit; + expected=xmit_timeouts[2]; + diff=Math.abs(expected - t); + delta=(long)(expected * PERCENTAGE_OFF); + if(diff >= delta) off=true; + + t=fourth_xmit - third_xmit; + expected=xmit_timeouts[3]; + diff=Math.abs(expected - t); + delta=(long)(expected * PERCENTAGE_OFF); + if(diff >= delta) off=true; + + if(off) { + System.err.println("#" + seqno + ": " + this + ": (" + "entry is more than " + + PERCENTAGE_OFF + " percentage off "); + return false; + } + return true; + } - public String toString() { - StringBuilder sb=new StringBuilder(); - sb.append(first_xmit - start_time).append(", ").append(second_xmit-first_xmit).append(", "); - sb.append(third_xmit-second_xmit).append(", ").append(fourth_xmit-third_xmit); - return sb.toString(); - } + public String toString() { + StringBuilder sb=new StringBuilder(); + sb.append(first_xmit - start_time).append(", ").append(second_xmit - first_xmit).append(", "); + sb.append(third_xmit - second_xmit).append(", ").append(fourth_xmit - third_xmit); + return sb.toString(); + } } class MyRetransmitCommand implements AckSenderWindow.RetransmitCommand { - - public void retransmit(long seqno, Message msg) { - Entry entry=(Entry)msgs.get(new Long(seqno)); - - // System.out.println(" -- retransmit(" + seqno + ")"); - - if(entry != null) { - if(entry.first_xmit == 0) { - entry.first_xmit=System.currentTimeMillis(); - return; - } - - if(entry.second_xmit == 0) { - entry.second_xmit=System.currentTimeMillis(); - return; - } - - if(entry.third_xmit == 0) { - entry.third_xmit=System.currentTimeMillis(); - return; - } - - if(entry.fourth_xmit == 0) { - entry.fourth_xmit=System.currentTimeMillis(); - return; - } - } - } + + public void retransmit(long seqno, Message msg) { + Entry entry=(Entry)msgs.get(new Long(seqno)); + + // System.out.println(" -- retransmit(" + seqno + ")"); + + if(entry != null) { + if(entry.first_xmit == 0) { + entry.first_xmit=System.currentTimeMillis(); + return; + } + + if(entry.second_xmit == 0) { + entry.second_xmit=System.currentTimeMillis(); + return; + } + + if(entry.third_xmit == 0) { + entry.third_xmit=System.currentTimeMillis(); + return; + } + + if(entry.fourth_xmit == 0) { + entry.fourth_xmit=System.currentTimeMillis(); + return; + } + } + } } + public AckSenderWindowTest(String name) { + super(name); + } - public AckSenderWindowTest(String name) { super(name); } - - - - /** Tests whether retransmits are called at correct times for 1000 messages */ + /** + * Tests whether retransmits are called at correct times for 1000 messages + */ public void testRetransmits() { - Entry entry; - int num_non_correct_entries=0; - - win=new AckSenderWindow(new MyRetransmitCommand(), new StaticInterval(xmit_timeouts)); - - // 1. Send NUM_MSGS messages: - System.out.println("-- sending " + NUM_MSGS + " messages:"); - for(long i=0; i < NUM_MSGS; i++) { - msgs.put(new Long(i), new Entry()); - win.add(i, new Message()); - } - System.out.println("-- done"); - - // 2. Wait for at least 4 xmits/msg: total of 1000 + 2000 + 4000 + 8000ms = 15000ms; wait for 20000ms - System.out.println("-- waiting for 20 secs for all retransmits"); - Util.sleep(20000); - - // 3. Check whether all Entries have correct retransmission times - for(long i=0; i < NUM_MSGS; i++) { - entry=(Entry)msgs.get(new Long(i)); - if(!entry.isCorrect(i)) { - num_non_correct_entries++; - } - } - - if(num_non_correct_entries > 0) - System.err.println("Number of incorrect retransmission timeouts: " + num_non_correct_entries); - assertTrue(num_non_correct_entries == 0); - win.reset(); + Entry entry; + int num_non_correct_entries=0; + + win=new AckSenderWindow(new MyRetransmitCommand(), new StaticInterval(xmit_timeouts), timer); + + // 1. Send NUM_MSGS messages: + System.out.println("-- sending " + NUM_MSGS + " messages:"); + for(long i=0; i < NUM_MSGS; i++) { + msgs.put(new Long(i), new Entry()); + win.add(i, new Message()); + } + System.out.println("-- done"); + + // 2. Wait for at least 4 xmits/msg: total of 1000 + 2000 + 4000 + 8000ms = 15000ms; wait for 20000ms + System.out.println("-- waiting for 20 secs for all retransmits"); + Util.sleep(20000); + + // 3. Check whether all Entries have correct retransmission times + for(long i=0; i < NUM_MSGS; i++) { + entry=(Entry)msgs.get(new Long(i)); + if(!entry.isCorrect(i)) { + num_non_correct_entries++; + } + } + + if(num_non_correct_entries > 0) + System.err.println("Number of incorrect retransmission timeouts: " + num_non_correct_entries); + assertTrue(num_non_correct_entries == 0); + win.reset(); } - - - + + public static Test suite() { - TestSuite suite; - suite = new TestSuite(AckSenderWindowTest.class); - return(suite); + TestSuite suite; + suite=new TestSuite(AckSenderWindowTest.class); + return (suite); } public static void main(String[] args) { - String[] name = {AckSenderWindowTest.class.getName()}; - junit.textui.TestRunner.main(name); + String[] name={AckSenderWindowTest.class.getName()}; + junit.textui.TestRunner.main(name); } }