/
NakReceiverWindow.java
606 lines (498 loc) · 20.5 KB
/
NakReceiverWindow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
package org.jgroups.stack;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.RetransmitTable;
import org.jgroups.util.TimeScheduler;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Keeps track of messages according to their sequence numbers. Allows
* messages to be added out of order, and with gaps between sequence numbers.
* Method <code>remove()</code> removes the message whose sequence number is
* <code>highest_delivered + 1</code> (<code>highest_delivered</code> is then
* incremented), or it returns null if that message has not been received yet.
* <p>
* When there is a gap upon adding a message, its seqno will be added to the
* Retransmitter, which (using a timer) requests retransmissions of missing
* messages and keeps on trying until the message has been received, or the
* member who sent the message is suspected.
*
* There are 3 variables which keep track of messages:
* <ul>
* <li>low: lowest seqno, modified on stable(). On stable(seqno), we purge all msgs up to and including seqno
* <li>highest_delivered: the highest delivered seqno, updated on remove(). The next message to be removed is highest_delivered + 1
* <li>highest_received: the highest received message, updated on add (if a new message is added, not updated e.g.
* if a missing msg was received)
* </ul>
* <p/>
* Note that the first seqno expected is 1. This design is described in doc/design.NAKACK.txt
* <p/>
* Example:
* 1,2,3,5,6,8: low=1, highest_delivered=2 (or 3, depending on whether remove() was called !), highest_received=8
*
* @author Bela Ban
*/
public class NakReceiverWindow {
public interface Listener {
void missingMessageReceived(long seqno, Address original_sender);
void messageGapDetected(long from, long to, Address src);
}
private final ReadWriteLock lock=new ReentrantReadWriteLock();
Address local_addr=null;
private volatile boolean running=true;
/** Lowest seqno, modified on stable(). On stable(), we purge msgs [low digest.highest_delivered] */
@GuardedBy("lock")
private long low=0;
/** The highest delivered seqno, updated on remove(). The next message to be removed is highest_delivered + 1 */
@GuardedBy("lock")
private long highest_delivered=0;
/** The highest received message, updated on add (if a new message is added, not updated e.g. if a missing msg
* was received) */
@GuardedBy("lock")
private long highest_received=0;
/**
* ConcurrentMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers
*/
private final RetransmitTable xmit_table;
/**
* Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
* messages are removed from NakReceiverWindow.xmit_table and moved to NakReceiverWindow.delivered_msgs, where
* they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
* received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
* around, and don't need to wait for garbage collection to remove them.
*/
private boolean discard_delivered_msgs=false;
private final AtomicBoolean processing=new AtomicBoolean(false);
/** if not set, no retransmitter thread will be started. Useful if
* protocols do their own retransmission (e.g PBCAST) */
private Retransmitter retransmitter=null;
private Listener listener=null;
protected static final Log log=LogFactory.getLog(NakReceiverWindow.class);
/** The highest stable() seqno received */
long highest_stability_seqno=0;
/** The loss rate (70% of the new value and 30% of the old value) */
private double smoothed_loss_rate=0.0;
/**
* Creates a new instance with the given retransmit command
*
* @param sender The sender associated with this instance
* @param cmd The command used to retransmit a missing message, will
* be invoked by the table. If null, the retransmit thread will not be started
* @param highest_delivered_seqno The next seqno to remove is highest_delivered_seqno +1
* @param lowest_seqno The low seqno purged
* @param sched the external scheduler to use for retransmission
* requests of missing msgs. If it's not provided or is null, an internal
*/
public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long highest_delivered_seqno,
long lowest_seqno, TimeScheduler sched) {
this(null, sender, cmd, highest_delivered_seqno, lowest_seqno, sched);
}
public NakReceiverWindow(Address local_addr, Address sender, Retransmitter.RetransmitCommand cmd,
long highest_delivered_seqno, long lowest_seqno, TimeScheduler sched) {
this(local_addr, sender, cmd, highest_delivered_seqno, lowest_seqno, sched, true);
}
public NakReceiverWindow(Address local_addr, Address sender, Retransmitter.RetransmitCommand cmd,
long highest_delivered_seqno, long lowest_seqno, TimeScheduler sched,
boolean use_range_based_retransmitter) {
this.local_addr=local_addr;
highest_delivered=highest_delivered_seqno;
highest_received=highest_delivered;
low=Math.min(lowest_seqno, highest_delivered);
if(sched == null)
throw new IllegalStateException("timer has to be provided and cannot be null");
if(cmd != null)
retransmitter=use_range_based_retransmitter?
new RangeBasedRetransmitter(sender, cmd, sched) :
new DefaultRetransmitter(sender, cmd, sched);
xmit_table=new RetransmitTable(5, 10000, low);
}
/**
* Creates a new instance with the given retransmit command
*
* @param sender The sender associated with this instance
* @param cmd The command used to retransmit a missing message, will
* be invoked by the table. If null, the retransmit thread will not be started
* @param highest_delivered_seqno The next seqno to remove is highest_delivered_seqno +1
* @param sched the external scheduler to use for retransmission
* requests of missing msgs. If it's not provided or is null, an internal
*/
public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long highest_delivered_seqno, TimeScheduler sched) {
this(sender, cmd, highest_delivered_seqno, 0, sched);
}
public AtomicBoolean getProcessing() {
return processing;
}
public void setRetransmitTimeouts(Interval timeouts) {
retransmitter.setRetransmitTimeouts(timeouts);
}
public void setDiscardDeliveredMessages(boolean flag) {
this.discard_delivered_msgs=flag;
}
@Deprecated
public int getMaxXmitBufSize() {
return 0;
}
@Deprecated
public void setMaxXmitBufSize(int max_xmit_buf_size) {
}
public void setListener(Listener l) {
this.listener=l;
}
public int getPendingXmits() {
return retransmitter!= null? retransmitter.size() : 0;
}
/**
* Returns the loss rate, which is defined as the number of pending retransmission requests / the total number of
* messages in xmit_table
* @return The loss rate
*/
public double getLossRate() {
int total_msgs=size();
int pending_xmits=getPendingXmits();
if(pending_xmits == 0 || total_msgs == 0)
return 0.0;
return pending_xmits / (double)total_msgs;
}
public double getSmoothedLossRate() {
return smoothed_loss_rate;
}
/** Set the new smoothed_loss_rate value to 70% of the new value and 30% of the old value */
private void setSmoothedLossRate() {
double new_loss_rate=getLossRate();
if(smoothed_loss_rate == 0) {
smoothed_loss_rate=new_loss_rate;
}
else {
smoothed_loss_rate=smoothed_loss_rate * .3 + new_loss_rate * .7;
}
}
public int getRetransmiTableSize() {return xmit_table.size();}
public int getRetransmitTableCapacity() {return xmit_table.capacity();}
/**
* Adds a message according to its seqno (sequence number).
* <p>
* There are 4 cases where messages are added:
* <ol>
* <li>seqno is the next to be expected seqno: added to map
* <li>seqno is <= highest_delivered: discard as we've already delivered it
* <li>seqno is smaller than the next expected seqno: missing message, add it
* <li>seqno is greater than the next expected seqno: add it to map and fill the gaps with null messages
* for retransmission. Add the seqno to the retransmitter too
* </ol>
* @return True if the message was added successfully, false otherwise (e.g. duplicate message)
*/
public boolean add(final long seqno, final Message msg) {
long old_next, next_to_add;
int num_xmits=0;
lock.writeLock().lock();
try {
if(!running)
return false;
next_to_add=highest_received +1;
old_next=next_to_add;
// Case #1: we received the expected seqno: most common path
if(seqno == next_to_add) {
xmit_table.put(seqno, msg);
return true;
}
// Case #2: we received a message that has already been delivered: discard it
if(seqno <= highest_delivered) {
if(log.isTraceEnabled())
log.trace("seqno " + seqno + " is smaller than " + next_to_add + "); discarding message");
return false;
}
// Case #3: we finally received a missing message. Case #2 handled seqno <= highest_delivered, so this
// seqno *must* be between highest_delivered and next_to_add
if(seqno < next_to_add) {
Message existing=xmit_table.putIfAbsent(seqno, msg);
if(existing != null)
return false; // key/value was present
num_xmits=retransmitter.remove(seqno);
if(log.isTraceEnabled())
log.trace(new StringBuilder("added missing msg ").append(msg.getSrc()).append('#').append(seqno));
return true;
}
// Case #4: we received a seqno higher than expected: add to Retransmitter
if(seqno > next_to_add) {
xmit_table.put(seqno, msg);
retransmitter.add(old_next, seqno -1); // BUT: add only null messages to xmitter
if(listener != null) {
try {listener.messageGapDetected(next_to_add, seqno, msg.getSrc());} catch(Throwable t) {}
}
return true;
}
}
finally {
highest_received=Math.max(highest_received, seqno);
lock.writeLock().unlock();
}
if(listener != null && num_xmits > 0) {
try {listener.missingMessageReceived(seqno, msg.getSrc());} catch(Throwable t) {}
}
return true;
}
public Message remove() {
return remove(true);
}
public Message remove(boolean acquire_lock) {
Message retval;
if(acquire_lock)
lock.writeLock().lock();
try {
long next_to_remove=highest_delivered +1;
retval=xmit_table.get(next_to_remove);
if(retval != null) { // message exists and is ready for delivery
if(discard_delivered_msgs) {
Address sender=retval.getSrc();
if(!local_addr.equals(sender)) { // don't remove if we sent the message !
xmit_table.remove(next_to_remove);
}
}
highest_delivered=next_to_remove;
return retval;
}
return null;
}
finally {
if(acquire_lock)
lock.writeLock().unlock();
}
}
/**
* Removes as many messages as possible
* @return List<Message> A list of messages, or null if no available messages were found
*/
public List<Message> removeMany(final AtomicBoolean processing) {
return removeMany(processing, 0);
}
public List<Message> removeMany(final AtomicBoolean processing, int max_results) {
return removeMany(processing, false, max_results);
}
/**
* Removes as many messages as possible
* @param discard_own_msgs Removes messages from xmit_table even if we sent it
* @param max_results Max number of messages to remove in one batch
* @return List<Message> A list of messages, or null if no available messages were found
*/
public List<Message> removeMany(final AtomicBoolean processing, boolean discard_own_msgs, int max_results) {
List<Message> retval=null;
int num_results=0;
lock.writeLock().lock();
try {
while(true) {
long next_to_remove=highest_delivered +1;
Message msg=xmit_table.get(next_to_remove);
if(msg != null) { // message exists and is ready for delivery
if(discard_delivered_msgs) {
Address sender=msg.getSrc();
if(discard_own_msgs || !local_addr.equals(sender)) { // don't remove if we sent the message !
xmit_table.remove(next_to_remove);
}
}
highest_delivered=next_to_remove;
if(retval == null)
retval=new LinkedList<Message>();
retval.add(msg);
if(max_results <= 0 || ++num_results < max_results)
continue;
}
if((retval == null || retval.isEmpty()) && processing != null)
processing.set(false);
return retval;
}
}
finally {
lock.writeLock().unlock();
}
}
/**
* Delete all messages <= seqno (they are stable, that is, have been received at all members).
* Stop when a number > seqno is encountered (all messages are ordered on seqnos).
*/
public void stable(long seqno) {
lock.writeLock().lock();
try {
if(seqno > highest_delivered) {
if(log.isWarnEnabled())
log.warn("seqno " + seqno + " is > highest_delivered (" + highest_delivered + ";) ignoring stability message");
return;
}
// we need to remove all seqnos *including* seqno
if(!xmit_table.isEmpty())
xmit_table.purge(seqno);
// remove all seqnos below seqno from retransmission
for(long i=low; i <= seqno; i++) {
retransmitter.remove(i);
}
highest_stability_seqno=Math.max(highest_stability_seqno, seqno);
low=Math.max(low, seqno);
}
finally {
lock.writeLock().unlock();
}
}
/**
* Destroys the NakReceiverWindow. After this method returns, no new messages can be added and a new
* NakReceiverWindow should be used instead. Note that messages can still be <em>removed</em> though.
*/
public void destroy() {
lock.writeLock().lock();
try {
running=false;
retransmitter.reset();
xmit_table.clear();
low=0;
highest_delivered=0; // next (=first) to deliver will be 1
highest_received=0;
highest_stability_seqno=0;
}
finally {
lock.writeLock().unlock();
}
}
/** Returns the lowest, highest delivered and highest received seqnos */
public long[] getDigest() {
lock.readLock().lock();
try {
long[] retval=new long[3];
retval[0]=low;
retval[1]=highest_delivered;
retval[2]=highest_received;
return retval;
}
finally {
lock.readLock().unlock();
}
}
/**
* @return the lowest sequence number of a message that has been
* delivered or is a candidate for delivery (by the next call to
* <code>remove()</code>)
*/
public long getLowestSeen() {
lock.readLock().lock();
try {
return low;
}
finally {
lock.readLock().unlock();
}
}
/** Returns the highest sequence number of a message <em>consumed</em> by the application (by <code>remove()</code>).
* Note that this is different from the highest <em>deliverable</em> seqno. E.g. in 23,24,26,27,29, the highest
* <em>delivered</em> message may be 22, whereas the highest <em>deliverable</em> message may be 24 !
* @return the highest sequence number of a message consumed by the
* application (by <code>remove()</code>)
*/
public long getHighestDelivered() {
lock.readLock().lock();
try {
return highest_delivered;
}
finally {
lock.readLock().unlock();
}
}
public long setHighestDelivered(long new_val) {
lock.writeLock().lock();
try {
long retval=highest_delivered;
highest_delivered=new_val;
return retval;
}
finally {
lock.writeLock().unlock();
}
}
/**
* Returns the highest sequence number received so far (which may be
* higher than the highest seqno <em>delivered</em> so far; e.g., for
* 1,2,3,5,6 it would be 6.
*
* @see NakReceiverWindow#getHighestDelivered
*/
public long getHighestReceived() {
lock.readLock().lock();
try {
return highest_received;
}
finally {
lock.readLock().unlock();
}
}
/**
* Returns the message from xmit_table
* @param seqno
* @return Message from xmit_table
*/
public Message get(long seqno) {
lock.readLock().lock();
try {
return xmit_table.get(seqno);
}
finally {
lock.readLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try {
return xmit_table.size();
}
finally {
lock.readLock().unlock();
}
}
public String toString() {
lock.readLock().lock();
try {
return printMessages();
}
finally {
lock.readLock().unlock();
}
}
/**
* Prints xmit_table. Requires read lock to be present
* @return String
*/
protected String printMessages() {
StringBuilder sb=new StringBuilder();
lock.readLock().lock();
try {
sb.append('[').append(low).append(" : ").append(highest_delivered).append(" (").append(highest_received).append(")");
if(xmit_table != null && !xmit_table.isEmpty()) {
int non_received=xmit_table.getNullMessages(highest_received);
sb.append(" (size=").append(xmit_table.size()).append(", missing=").append(non_received).
append(", highest stability=").append(highest_stability_seqno).append(')');
}
sb.append(']');
return sb.toString();
}
finally {
lock.readLock().unlock();
}
}
public String printLossRate() {
StringBuilder sb=new StringBuilder();
int num_missing=getPendingXmits();
int num_received=size();
int total=num_missing + num_received;
sb.append("total=").append(total).append(" (received=").append(num_received).append(", missing=")
.append(num_missing).append("), loss rate=").append(getLossRate())
.append(", smoothed loss rate=").append(smoothed_loss_rate);
return sb.toString();
}
public String printRetransmitStats() {
return retransmitter instanceof RangeBasedRetransmitter? ((RangeBasedRetransmitter)retransmitter).printStats() : "n/a";
}
/* ------------------------------- Private Methods -------------------------------------- */
/* --------------------------- End of Private Methods ----------------------------------- */
}