/
RetransmitTable.java
246 lines (207 loc) · 7.77 KB
/
RetransmitTable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
package org.jgroups.util;
import org.jgroups.Message;
/**
* A store for messages to be retransmitted or delivered. Used on sender and receiver side, as a replacement for
* HashMap. RetransmitTable should use less memory than HashMap, as HashMap.Entry has 4 fields, plus arrays for storage.
* <p/>
* RetransmitTable maintains a matrix (an array of arrays) of messages. Messages are stored in the matrix by mapping
* their seqno to an index. E.g. when we have 10 rows of 1000 messages each, and first_seqno is 3000, then a message with
* seqno=5600, will be stored in the 3rd row, at index 600.
* <p/>
* Rows are removed when all messages in that row have been received.<p/>
* This class in not synchronized; the caller has to make sure access to it is synchronized
* @author Bela Ban
*/
public class RetransmitTable {
protected final int num_rows;
protected final int msgs_per_row;
protected final long original_offset;
protected Message[][] matrix;
/** The first seqno, at matrix[0][0] */
protected long offset;
protected volatile int size=0;
public RetransmitTable() {
this(10, 50000, 0);
}
public RetransmitTable(int num_rows, int msgs_per_row, long offset) {
this.num_rows=num_rows;
this.msgs_per_row=msgs_per_row;
this.offset=this.original_offset=offset;
matrix=new Message[num_rows][];
}
/**
* Adds a new message to the index computed as a function of seqno
* @param seqno
* @param msg
* @return True if the element at the computed index was null, else false
*/
public boolean put(long seqno, Message msg) {
return putIfAbsent(seqno, msg) == null;
}
/**
* Adds a message if the element at the given index is null. Returns null if no message existed at the given index,
* else returns the existing message and doesn't set the element.
* @param seqno
* @param msg
* @return The existing message, or null if there wasn't any
*/
public Message putIfAbsent(long seqno, Message msg) {
int[] row_and_index=computeRowAndIndex(seqno);
Message[] row=getRow(row_and_index[0]);
Message existing_msg=row[row_and_index[1]];
if(existing_msg == null) {
row[row_and_index[1]]=msg;
size++;
return null;
}
else
return existing_msg;
}
public Message get(long seqno) {
int[] row_and_index=computeRowAndIndex(seqno);
Message[] row=getRow(row_and_index[0]);
return row[row_and_index[1]];
}
/** Removes the message with seqno from the table, nulls the index */
public Message remove(long seqno) { // todo: purge if we can !
int[] row_and_index=computeRowAndIndex(seqno);
Message[] row=getRow(row_and_index[0]);
Message existing_msg=row[row_and_index[1]];
if(existing_msg != null) {
row[row_and_index[1]]=null;
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
}
return existing_msg;
}
public void clear() {
matrix=new Message[num_rows][];
size=0;
offset=original_offset;
}
/**
* Removes all messages less than or equal to seqno from the table. Adjusts offset and moves rows down by the
* number of removed rows. This method should be used when a number of messages can be removed at once, instead
* of individually removing them with remove().
* @param seqno
*/
public void purge(long seqno) {
long diff=seqno - offset;
if(diff < msgs_per_row)
return;
int num_rows_to_remove=(int)(diff / msgs_per_row);
System.arraycopy(matrix, num_rows_to_remove, matrix, 0, matrix.length - num_rows_to_remove);
for(int i=matrix.length - num_rows_to_remove; i < matrix.length; i++)
matrix[i]=null;
offset+=(num_rows_to_remove * msgs_per_row); // todo: is this correct ?
size=computeSize();
}
@Deprecated
public boolean containsKey(long seqno) {return get(seqno) != null;}
/** Returns the total capacity in the matrix */
public int capacity() {return matrix.length * msgs_per_row;}
/** Returns the numbers of messages in the table */
public int size() {return size;}
public boolean isEmpty() {return size <= 0;}
/** A more expensive way to compute the size, done by iterating through the entire table and adding up non-null values */
public int computeSize() {
int retval=0;
for(int i=0; i < matrix.length; i++) {
Message[] row=matrix[i];
if(row == null)
continue;
for(int j=0; j < row.length; j++) {
if(row[j] != null)
retval++;
}
}
return retval;
}
/** Returns the number of null elements in range [start .. end] */
public int getNullMessages(long start, long end) {
int retval=0;
for(long i=start; i <= end; i++) {
int[] row_and_index=computeRowAndIndex(i);
Message[] row=matrix[row_and_index[0]];
if(row != null && row[row_and_index[1]] == null)
retval++;
}
return retval;
}
public String toString() {
StringBuilder sb=new StringBuilder();
sb.append("size=" + size + ", capacity=" + capacity());
return sb.toString();
}
/** Dumps the seqnos in the table as a list */
public String dump() {
StringBuilder sb=new StringBuilder();
boolean first=true;
for(int i=0; i < matrix.length; i++) {
Message[] row=matrix[i];
if(row == null)
continue;
for(int j=0; j < row.length; j++) {
if(row[j] != null) {
long seqno=offset + (i * msgs_per_row) + j;
if(first)
first=false;
else
sb.append(", ");
sb.append(seqno);
}
}
}
return sb.toString();
}
/** Dumps the non-null in the table in a pseudo graphic way */
public String dumpMatrix() {
StringBuilder sb=new StringBuilder();
for(int i=0; i < matrix.length; i++) {
Message[] row=matrix[i];
sb.append(i + ": ");
if(row == null) {
sb.append("\n");
continue;
}
for(int j=0; j < row.length; j++) {
if(row[j] != null)
sb.append("* ");
else
sb.append(" ");
}
sb.append("\n");
}
return sb.toString();
}
/**
* Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
* @param index
* @return A row
*/
protected Message[] getRow(int index) {
if(index >= matrix.length)
resize(index +1);
Message[] row=matrix[index];
if(row == null) {
row=new Message[msgs_per_row];
matrix[index]=row;
}
return row;
}
/** Resizes the matrix to the new size */
protected void resize(int new_capacity) {
Message[][] new_matrix=new Message[new_capacity][];
System.arraycopy(matrix, 0, new_matrix, 0, matrix.length);
matrix=new_matrix;
}
/** Computes and returns the row index and the index within that row for seqno */
protected int[] computeRowAndIndex(long seqno) {
assert seqno >= offset;
int[] retval=new int[2];
int row_index=(int)(((seqno- offset) / msgs_per_row));
int index=(int)(seqno - offset) % msgs_per_row;
retval[0]=row_index;
retval[1]=index;
return retval;
}
}