/
AckReceiverWindow.java
171 lines (143 loc) · 5.08 KB
/
AckReceiverWindow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package org.jgroups.stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Counterpart of AckSenderWindow. Simple FIFO buffer.
* Every message received is ACK'ed (even duplicates) and added to a hashmap
* keyed by seqno. The next seqno to be received is stored in <code>next_to_remove</code>. When a message with
* a seqno less than next_to_remove is received, it will be discarded. The <code>remove()</code> method removes
* and returns a message whose seqno is equal to next_to_remove, or null if not found.<br>
* Change May 28 2002 (bela): replaced TreeSet with HashMap. Keys do not need to be sorted, and adding a key to
* a sorted set incurs overhead.
*
* @author Bela Ban
* @version $Id: AckReceiverWindow.java,v 1.25.2.5 2009/09/08 12:22:45 belaban Exp $
*/
public class AckReceiverWindow {
long next_to_remove=0;
final Map<Long,Message> msgs=new HashMap<Long,Message>(); // keys: seqnos (Long), values: Messages
static final Log log=LogFactory.getLog(AckReceiverWindow.class);
final ReentrantLock lock=new ReentrantLock();
final AtomicBoolean processing=new AtomicBoolean(false);
public AckReceiverWindow(long initial_seqno) {
this.next_to_remove=initial_seqno;
}
public ReentrantLock getLock() {
return lock;
}
public AtomicBoolean getProcessing() {
return processing;
}
/** Adds a new message. Message cannot be null
* @return True if the message was added, false if not (e.g. duplicate, message was already present)
*/
public boolean add(long seqno, Message msg) {
if(msg == null)
throw new IllegalArgumentException("msg must be non-null");
synchronized(msgs) {
if(seqno < next_to_remove) {
if(log.isTraceEnabled())
log.trace("discarded msg with seqno=" + seqno + " (next msg to receive is " + next_to_remove + ')');
return false;
}
if(!msgs.containsKey(seqno)) {
msgs.put(seqno, msg);
return true;
}
else {
if(log.isTraceEnabled())
log.trace("seqno " + seqno + " already received - dropping it");
return false;
}
}
}
/**
* Removes a message whose seqno is equal to <code>next_to_remove</code>, increments the latter.
* Returns message that was removed, or null, if no message can be removed. Messages are thus
* removed in order.
*/
public Message remove() {
Message retval=null;
synchronized(msgs) {
long seqno=next_to_remove;
try {
retval=msgs.remove(seqno);
}
finally {
if(retval != null)
next_to_remove=++seqno;
}
}
return retval;
}
public Message remove(AtomicBoolean processing) {
Message retval=null;
synchronized(msgs) {
long seqno=next_to_remove;
try {
retval=msgs.remove(seqno);
}
finally {
if(retval != null)
next_to_remove=++seqno;
else
processing.set(false);
}
}
return retval;
}
public Message removeOOBMessage() {
Message retval;
synchronized(msgs) {
retval=msgs.get(next_to_remove);
if(retval != null) {
if(!retval.isFlagSet(Message.OOB)) {
return null;
}
retval=msgs.remove(next_to_remove);
next_to_remove++;
}
}
return retval;
}
public boolean hasMessagesToRemove() {
synchronized(msgs) {
return msgs.containsKey(next_to_remove);
}
}
public boolean smallerThanNextToRemove(long seqno) {
synchronized(msgs) {
return seqno < next_to_remove;
}
}
public void reset() {
synchronized(msgs) {
msgs.clear();
}
}
public int size() {
return msgs.size();
}
public String toString() {
StringBuilder sb=new StringBuilder();
sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")");
TreeSet<Long> s=new TreeSet<Long>(msgs.keySet());
if(!s.isEmpty()) {
sb.append(" [").append(s.first()).append(" - ").append(s.last()).append("]");
sb.append(": ").append(s);
}
return sb.toString();
}
public String printDetails() {
StringBuilder sb=new StringBuilder();
sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")").
append(", msgs=" ).append(new TreeSet<Long>(msgs.keySet()));
return sb.toString();
}
}