Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 8, 2009
1 parent 669612e commit f3012a7
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 276 deletions.
50 changes: 40 additions & 10 deletions src/org/jgroups/stack/AckReceiverWindow.java
@@ -1,4 +1,3 @@

package org.jgroups.stack;


Expand All @@ -10,6 +9,7 @@
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;


/**
Expand All @@ -22,13 +22,15 @@
* a sorted set incurs overhead.
*
* @author Bela Ban
* @version $Id: AckReceiverWindow.java,v 1.25.2.4 2008/06/04 15:02:52 belaban Exp $
* @version $Id: AckReceiverWindow.java,v 1.25.2.5 2009/09/08 12:22:45 belaban Exp $
*/
public class AckReceiverWindow {
long next_to_remove=0;
final Map<Long,Message> msgs=new HashMap<Long,Message>(); // keys: seqnos (Long), values: Messages
static final Log log=LogFactory.getLog(AckReceiverWindow.class);
final ReentrantLock lock=new ReentrantLock();
final AtomicBoolean processing=new AtomicBoolean(false);


public AckReceiverWindow(long initial_seqno) {
this.next_to_remove=initial_seqno;
Expand All @@ -38,6 +40,10 @@ public ReentrantLock getLock() {
return lock;
}

public AtomicBoolean getProcessing() {
return processing;
}

/** Adds a new message. Message cannot be null
* @return True if the message was added, false if not (e.g. duplicate, message was already present)
*/
Expand Down Expand Up @@ -69,14 +75,34 @@ public boolean add(long seqno, Message msg) {
* removed in order.
*/
public Message remove() {
Message retval;
Message retval=null;

synchronized(msgs) {
retval=msgs.remove(next_to_remove);
if(retval != null) {
if(log.isTraceEnabled())
log.trace("removed seqno=" + next_to_remove);
next_to_remove++;
long seqno=next_to_remove;
try {
retval=msgs.remove(seqno);
}
finally {
if(retval != null)
next_to_remove=++seqno;
}
}
return retval;
}

public Message remove(AtomicBoolean processing) {
Message retval=null;

synchronized(msgs) {
long seqno=next_to_remove;
try {
retval=msgs.remove(seqno);
}
finally {
if(retval != null)
next_to_remove=++seqno;
else
processing.set(false);
}
}
return retval;
Expand All @@ -92,8 +118,6 @@ public Message removeOOBMessage() {
return null;
}
retval=msgs.remove(next_to_remove);
if(log.isTraceEnabled())
log.trace("removed OOB message with seqno=" + next_to_remove);
next_to_remove++;
}
}
Expand All @@ -107,6 +131,12 @@ public boolean hasMessagesToRemove() {
}
}

public boolean smallerThanNextToRemove(long seqno) {
synchronized(msgs) {
return seqno < next_to_remove;
}
}


public void reset() {
synchronized(msgs) {
Expand Down
168 changes: 35 additions & 133 deletions src/org/jgroups/stack/AckSenderWindow.java
@@ -1,4 +1,4 @@
// $Id: AckSenderWindow.java,v 1.27 2007/10/26 09:58:35 belaban Exp $
// $Id: AckSenderWindow.java,v 1.27.2.1 2009/09/08 12:23:06 belaban Exp $

package org.jgroups.stack;

Expand All @@ -8,8 +8,9 @@
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -26,39 +27,24 @@
*/
public class AckSenderWindow implements Retransmitter.RetransmitCommand {
RetransmitCommand retransmit_command = null; // called to request XMIT of msg
final ConcurrentMap<Long,Message> msgs=new ConcurrentHashMap(); // keys: seqnos (Long), values: Messages
final ConcurrentMap<Long,Message> msgs=new ConcurrentHashMap<Long,Message>();
Interval interval=new StaticInterval(400,800,1200,1600);
final Retransmitter retransmitter;
static final Log log=LogFactory.getLog(AckSenderWindow.class);
long lowest=0; // lowest seqno, used by ack()


public interface RetransmitCommand {
void retransmit(long seqno, Message msg);
}


/**
* Creates a new instance. Thre retransmission thread has to be started separately with
* <code>start()</code>.
* @param com If not null, its method <code>retransmit()</code> will be called when a message
* needs to be retransmitted (called by the Retransmitter).
*/
public AckSenderWindow(RetransmitCommand com) {
retransmit_command = com;
retransmitter = new Retransmitter(null, this);
retransmitter.setRetransmitTimeouts(interval);
}


public AckSenderWindow(RetransmitCommand com, Interval interval) {
retransmit_command = com;
this.interval = interval;
retransmitter = new Retransmitter(null, this);
retransmitter = new Retransmitter(null, this, null);
retransmitter.setRetransmitTimeouts(interval);
}



public AckSenderWindow(RetransmitCommand com, Interval interval, TimeScheduler sched) {
retransmit_command = com;
this.interval = interval;
Expand Down Expand Up @@ -98,24 +84,40 @@ public void add(long seqno, Message msg) {


/**
* Removes the message from <code>msgs</code>, removing them also from retransmission. If
* sliding window protocol is used, and was queueing, check whether we can resume adding elements.
* Add all elements. If this goes above window_size, stop adding and back to queueing. Else
* set queueing to false.
* Removes all messages <em>less than or equal</em> to seqno from <code>msgs</code>, and cancels their retransmission.
*/
public void ack(long seqno) {
msgs.remove(new Long(seqno));
retransmitter.remove(seqno);
public synchronized void ack(long seqno) {
if(lowest == 0) {
Long tmp=getLowestSeqno();
if(tmp != null)
lowest=tmp;
}

for(long i=lowest; i <= seqno; i++) {
msgs.remove(i);
retransmitter.remove(i);
}
lowest=seqno +1;
}

/** Returns the message with the lowest seqno */
public Message getLowestMessage() {
Set<Long> keys=msgs.keySet();
if(keys.isEmpty())
return null;
Long seqno=Collections.min(keys);
return seqno != null? msgs.get(seqno) : null;
}


public int size() {
return msgs.size();
}

public String toString() {
StringBuilder sb=new StringBuilder();
sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): ");
TreeSet keys=new TreeSet(msgs.keySet());
TreeSet<Long> keys=new TreeSet<Long>(msgs.keySet());
if(!keys.isEmpty())
sb.append(keys.first()).append(" - ").append(keys.last());
else
Expand All @@ -127,7 +129,7 @@ public String toString() {
public String printDetails() {
StringBuilder sb=new StringBuilder();
sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): ").
append(new TreeSet(msgs.keySet()));
append(new TreeSet<Long>(msgs.keySet()));
return sb.toString();
}

Expand All @@ -150,6 +152,10 @@ public void retransmit(long first_seqno, long last_seqno, Address sender) {



public Long getLowestSeqno() {
Set<Long> keys=msgs.keySet();
return keys != null? Collections.min(keys) : null;
}


/* ---------------------------------- Private methods --------------------------------------- */
Expand Down Expand Up @@ -183,110 +189,6 @@ public void retransmit(long seqno, Message msg) {
}


public static void main(String[] args) {
Interval xmit_timeouts=new StaticInterval(1000, 2000, 3000, 4000);
AckSenderWindow win=new AckSenderWindow(new Dummy(), xmit_timeouts);



final int NUM = 1000;

for (int i = 1; i < NUM; i++)
win.add(i, new Message());


System.out.println(win);
Util.sleep(5000);

for (int i = 1; i < NUM; i++) {
if (i % 2 == 0) // ack the even seqnos
win.ack(i);
}

System.out.println(win);
Util.sleep(4000);

for (int i = 1; i < NUM; i++) {
if (i % 2 != 0) // ack the odd seqnos
win.ack(i);
}
System.out.println(win);

win.add(3, new Message());
win.add(5, new Message());
win.add(4, new Message());
win.add(8, new Message());
win.add(9, new Message());
win.add(6, new Message());
win.add(7, new Message());
win.add(3, new Message());
System.out.println(win);


try {
Thread.sleep(5000);
win.ack(5);
System.out.println("ack(5)");
win.ack(4);
System.out.println("ack(4)");
win.ack(6);
System.out.println("ack(6)");
win.ack(7);
System.out.println("ack(7)");
win.ack(8);
System.out.println("ack(8)");
win.ack(6);
System.out.println("ack(6)");
win.ack(9);
System.out.println("ack(9)");
System.out.println(win);

Thread.sleep(5000);
win.ack(3);
System.out.println("ack(3)");
System.out.println(win);

Thread.sleep(3000);
win.add(10, new Message());
win.add(11, new Message());
System.out.println(win);
Thread.sleep(3000);
win.ack(10);
System.out.println("ack(10)");
win.ack(11);
System.out.println("ack(11)");
System.out.println(win);

win.add(12, new Message());
win.add(13, new Message());
win.add(14, new Message());
win.add(15, new Message());
win.add(16, new Message());
System.out.println(win);

Util.sleep(1000);
win.ack(12);
System.out.println("ack(12)");
win.ack(13);
System.out.println("ack(13)");

win.ack(15);
System.out.println("ack(15)");
System.out.println(win);

Util.sleep(5000);
win.ack(16);
System.out.println("ack(16)");
System.out.println(win);

Util.sleep(1000);

win.ack(14);
System.out.println("ack(14)");
System.out.println(win);
} catch (Exception e) {
log.error(e);
}
}

}

0 comments on commit f3012a7

Please sign in to comment.