/
AckSenderWindow.java
194 lines (146 loc) · 6.37 KB
/
AckSenderWindow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// $Id: AckSenderWindow.java,v 1.27.2.1 2009/09/08 12:23:06 belaban Exp $
package org.jgroups.stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.util.TimeScheduler;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* ACK-based sliding window for a sender. Messages are added to the window keyed by seqno.
* When an ACK is received, the corresponding message is removed. The Retransmitter
* continuously iterates over the entries in the hashmap, retransmitting messages based on their
* creation time and an (increasing) timeout. When there are no more messages left in the
* retransmission table, the thread terminates. It will be re-activated when a new entry is added
* to the retransmission table.
* @author Bela Ban
*/
public class AckSenderWindow implements Retransmitter.RetransmitCommand {
    /** Callback invoked to request retransmission (XMIT) of a single message; may be null. */
    RetransmitCommand retransmit_command = null;

    /** Unacknowledged messages, keyed by sequence number. */
    final ConcurrentMap<Long,Message> msgs = new ConcurrentHashMap<Long,Message>();

    /** Retransmit timeouts handed to the retransmitter (presumably milliseconds - confirm against Interval). */
    Interval interval = new StaticInterval(400, 800, 1200, 1600);

    /** Tracks the seqnos awaiting an ack and calls back into {@link #retransmit(long, long, Address)}. */
    final Retransmitter retransmitter;

    static final Log log = LogFactory.getLog(AckSenderWindow.class);

    /**
     * Lowest seqno that may still be unacknowledged; {@link #ack(long)} starts removing from here.
     * The value 0 means "unknown": it is recomputed from {@link #msgs} on the next ack.
     * Read and written only while holding this object's monitor.
     */
    long lowest = 0;


    /** Callback used by this window to have a single buffered message sent again. */
    public interface RetransmitCommand {
        /**
         * @param seqno sequence number of the message to resend
         * @param msg   the buffered message stored under {@code seqno}
         */
        void retransmit(long seqno, Message msg);
    }


    /**
     * Creates a window with the default retransmit timeouts. The retransmitter is created with
     * neither a sender address nor a scheduler (both passed as null).
     *
     * @param com callback used to resend messages; may be null, in which case nothing is resent
     */
    public AckSenderWindow(RetransmitCommand com) {
        retransmit_command = com;
        retransmitter = new Retransmitter(null, this, null);
        retransmitter.setRetransmitTimeouts(interval);
    }

    /**
     * Creates a window with caller-supplied retransmit timeouts and scheduler.
     *
     * @param com      callback used to resend messages; may be null
     * @param interval retransmit timeouts passed on to the retransmitter
     * @param sched    scheduler passed on to the retransmitter
     */
    public AckSenderWindow(RetransmitCommand com, Interval interval, TimeScheduler sched) {
        retransmit_command = com;
        this.interval = interval;
        retransmitter = new Retransmitter(null, this, sched);
        retransmitter.setRetransmitTimeouts(interval);
    }

    /**
     * Creates a window with caller-supplied retransmit timeouts, scheduler and sender address.
     *
     * @param com      callback used to resend messages; may be null
     * @param interval retransmit timeouts passed on to the retransmitter
     * @param sched    scheduler passed on to the retransmitter
     * @param sender   address passed on to the retransmitter
     */
    public AckSenderWindow(RetransmitCommand com, Interval interval, TimeScheduler sched, Address sender) {
        retransmit_command = com;
        this.interval = interval;
        retransmitter = new Retransmitter(sender, this, sched);
        retransmitter.setRetransmitTimeouts(interval);
    }


    /**
     * Drops all buffered messages, resets the retransmitter and forgets the lowest
     * unacknowledged seqno, so that a reused window starts from a clean state.
     */
    public void reset() {
        msgs.clear();

        // Deliberately called without holding this window's monitor: per the original note
        // (Bela, Jan 15 2003) Retransmitter.reset()/add()/remove() do their own synchronization.
        retransmitter.reset();

        // 'lowest' is guarded by this object's monitor (see ack()). Without this, a stale value
        // would make ack() skip any lower seqnos added after the reset.
        synchronized(this) {
            lowest = 0;
        }
    }


    /**
     * Adds a message to the retransmission table. The message is stored under {@code seqno}
     * unless an entry for that seqno already exists (the existing entry is kept), and
     * {@code seqno} is registered with the retransmitter in either case.
     *
     * @param seqno sequence number of the message
     * @param msg   the message to buffer until it is acknowledged
     */
    public void add(long seqno, Message msg) {
        msgs.putIfAbsent(seqno, msg);
        retransmitter.add(seqno, seqno);
    }


    /**
     * Removes all messages <em>less than or equal</em> to seqno from <code>msgs</code>, and cancels
     * their retransmission. Acks that are older than one already processed are ignored.
     *
     * @param seqno highest sequence number being acknowledged (cumulative ack)
     */
    public synchronized void ack(long seqno) {
        if(lowest == 0) {
            // Lower bound not known yet: derive it from the buffered messages (null if none).
            Long lowest_buffered = getLowestSeqno();
            if(lowest_buffered != null)
                lowest = lowest_buffered;
        }

        for(long i = lowest; i <= seqno; i++) {
            msgs.remove(i);
            retransmitter.remove(i);
        }

        // Never move the lower bound backwards: a stale or duplicate ack would otherwise make
        // the next ack walk over a range that has already been cleaned up.
        lowest = Math.max(lowest, seqno + 1);
    }


    /**
     * Returns the message with the lowest seqno.
     *
     * @return the buffered message with the smallest seqno, or null if the window is empty (or the
     *         message was removed concurrently between lookup and retrieval)
     */
    public Message getLowestMessage() {
        Long seqno = getLowestSeqno();
        return seqno != null ? msgs.get(seqno) : null;
    }


    /** @return the number of messages currently buffered */
    public int size() {
        return msgs.size();
    }


    /** Returns a summary: message count, retransmitter size, and the lowest/highest buffered seqno. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): ");

        // Sorted snapshot, so first()/last() stay consistent even if msgs changes meanwhile.
        TreeSet<Long> keys = new TreeSet<Long>(msgs.keySet());
        if(keys.isEmpty())
            sb.append("[]");
        else
            sb.append(keys.first()).append(" - ").append(keys.last());
        return sb.toString();
    }


    /** Like {@link #toString()}, but lists every buffered seqno in ascending order. */
    public String printDetails() {
        StringBuilder sb = new StringBuilder();
        sb.append(msgs.size()).append(" msgs (").append(retransmitter.size()).append(" to retransmit): ")
          .append(new TreeSet<Long>(msgs.keySet()));
        return sb.toString();
    }


    /* -------------------------------- Retransmitter.RetransmitCommand interface ------------------- */

    /**
     * Called by the retransmitter: resends every message still buffered in the inclusive range
     * {@code [first_seqno, last_seqno]} through the configured {@link RetransmitCommand}.
     * Seqnos that are no longer buffered are skipped; does nothing if no command is set.
     *
     * @param first_seqno first sequence number to resend
     * @param last_seqno  last sequence number to resend (inclusive)
     * @param sender      only used in the trace message
     */
    public void retransmit(long first_seqno, long last_seqno, Address sender) {
        // Read the field once so the null check and the calls below see the same reference.
        RetransmitCommand cmd = retransmit_command;
        if(cmd == null)
            return;

        if(log.isTraceEnabled())
            log.trace(new StringBuilder("retransmitting messages ").append(first_seqno).
                    append(" - ").append(last_seqno).append(" from ").append(sender));

        for(long i = first_seqno; i <= last_seqno; i++) {
            Message msg = msgs.get(i); // find the message to retransmit
            if(msg != null)
                cmd.retransmit(i, msg);
        }
    }

    /* ----------------------------- End of Retransmitter.RetransmitCommand interface ---------------- */


    /**
     * Returns the lowest seqno currently buffered.
     *
     * @return the smallest key in <code>msgs</code>, or null if no message is buffered
     */
    public Long getLowestSeqno() {
        // A single pass over the key view. Unlike isEmpty() followed by Collections.min(), this
        // cannot throw NoSuchElementException when the map is empty or is emptied concurrently.
        Long min = null;
        for(Long key: msgs.keySet()) {
            if(min == null || key.compareTo(min) < 0)
                min = key;
        }
        return min;
    }


    /* ---------------------------------- Private methods --------------------------------------- */

    /* ------------------------------ End of Private methods ------------------------------------ */


    /** Struct used to store message alongside with its seqno in the message queue */
    static class Entry {
        final long seqno;
        final Message msg;

        Entry(long seqno, Message msg) {
            this.seqno = seqno;
            this.msg = msg;
        }
    }


    /** Minimal {@link RetransmitCommand} that only logs the seqno and records the call time. */
    static class Dummy implements RetransmitCommand {
        static final long last_xmit_req = 0;

        /** Time (System.currentTimeMillis()) of the most recent retransmit() call. */
        long curr_time;

        public void retransmit(long seqno, Message msg) {
            if(log.isDebugEnabled()) log.debug("seqno=" + seqno);
            curr_time = System.currentTimeMillis();
        }
    }
}