Skip to content

Commit

Permalink
Using same retransmission code in UNICAST2 as in NAKACK2 (https://iss…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 21, 2012
1 parent a9862ff commit 97ab658
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions src/org/jgroups/protocols/UNICAST2.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,29 @@ public class UNICAST2 extends Protocol implements AgeOutCache.Handler<Address> {
protected final ConcurrentMap<Address,ReceiverEntry> recv_table=Util.createConcurrentMap();

/** RetransmitTask running every xmit_interval ms */
protected Future<?> xmit_task;
protected Future<?> xmit_task;
/** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539) */
protected final Map<Address,Long> xmit_task_map=new HashMap<Address,Long>();

protected final ReentrantLock recv_table_lock=new ReentrantLock();
protected final ReentrantLock recv_table_lock=new ReentrantLock();

protected volatile List<Address> members=new ArrayList<Address>(11);
protected volatile List<Address> members=new ArrayList<Address>(11);

protected Address local_addr=null;
protected Address local_addr=null;

protected TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow)
protected TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow)

protected volatile boolean running=false;
protected volatile boolean running=false;

protected short last_conn_id=0;
protected short last_conn_id=0;

protected long max_retransmit_time=60 * 1000L;
protected long max_retransmit_time=60 * 1000L;

protected AgeOutCache<Address> cache=null;
protected AgeOutCache<Address> cache=null;

protected Future<?> stable_task_future=null; // bcasts periodic STABLE message (added to timer below)
protected Future<?> stable_task_future=null; // bcasts periodic STABLE message (added to timer below)

protected Future<?> connection_reaper; // closes idle connections
protected Future<?> connection_reaper; // closes idle connections


@Deprecated
Expand Down Expand Up @@ -1231,25 +1233,44 @@ public void run() {
protected class RetransmitTask implements Runnable {

public void run() {
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
Address target=entry.getKey(); // target to send retransmit requests to
ReceiverEntry val=entry.getValue();
Table<Message> buf=val != null? val.received_msgs : null;
if(buf != null && buf.getNumMissing() > 0) {
SeqnoList missing=buf.getMissing();
if(missing != null) {
// Just a double-check to avoid unneeded retransmissions: messages might have been added to or
// removed from the table after calling getMissing(), and so we remove all
// seqnos <= the highest delivered seqno from the retransmit list
missing.remove(buf.getHighestDelivered());
if(missing.size() > 0)
triggerXmit();
}
}

@ManagedOperation(description="Triggers the retransmission task, asking all senders for missing messages")
public void triggerXmit() {
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
Address target=entry.getKey(); // target to send retransmit requests to
ReceiverEntry val=entry.getValue();
Table<Message> buf=val != null? val.received_msgs : null;
if(buf != null && buf.getNumMissing() > 0) {
SeqnoList missing=buf.getMissing();
if(missing != null) {
// Just a double-check to avoid unneeded retransmissions: messages might have been added to or
// removed from the table after calling getMissing(), and so we remove all
// seqnos <= the highest delivered seqno from the retransmit list
missing.remove(buf.getHighestDelivered());
//if(missing.size() > 0)
// retransmit(missing, target);
if(missing.size() > 0) {
long highest=missing.getLast();
Long prev_seqno=xmit_task_map.get(target);
if(prev_seqno == null) {
xmit_task_map.put(target, highest); // no retransmission
}
else {
missing.removeHigherThan(prev_seqno);
if(highest > prev_seqno)
xmit_task_map.put(target, highest);
retransmit(missing, target);
}
}
else
xmit_task_map.remove(target); // no current gaps for target
}
}
}
}



}

0 comments on commit 97ab658

Please sign in to comment.