Permalink
Browse files

Merge branch JGRP-1133.xmit_table onto JGRP-1133

  • Loading branch information...
2 parents 95feacc + 4ff1a45 commit 3c51ce95e16d3b2420b9fdf4a1e7aaa9c95adef5 @belaban committed Dec 20, 2010
@@ -5,6 +5,7 @@
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
+import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Unsupported;
import org.jgroups.stack.Protocol;
@@ -29,6 +30,7 @@ public String toString() {
* Example of a protocol layer. Contains no real functionality, can be used as a template.
*/
@Unsupported
+@MBean(description="Sample protocol")
public class EXAMPLE extends Protocol {
final Vector<Address> members=new Vector<Address>();
@@ -53,11 +53,29 @@
protected long max_bytes=10000000;
@Property(description="Max number of milliseconds before a stability message is sent to the sender(s)")
- protected long stable_interval=10000L;
+ protected long stable_interval=60000L;
@Property(description="Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid")
protected int max_stable_msgs=5;
+ @Property(description="Number of rows of the matrix in the retransmission table (only for experts)",writable=false)
+ int xmit_table_num_rows=5;
+
+ @Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). " +
+ "The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row",writable=false)
+ int xmit_table_msgs_per_row=10000;
+
+ @Property(description="Resize factor of the matrix in the retransmission table (only for experts)",writable=false)
+ double xmit_table_resize_factor=1.2;
+
+ @Property(description="Number of milliseconds after which the matrix in the retransmission table " +
+ "is compacted (only for experts)",writable=false)
+ long xmit_table_max_compaction_time=10 * 60 * 1000;
+
+ @Property(description="If enabled, the removal of a message from the retransmission table causes an " +
+ "automatic purge (only for experts)",writable=false)
+ boolean xmit_table_automatic_purging=true;
+
/* --------------------------------------------- JMX ---------------------------------------------- */
@@ -185,6 +203,38 @@ public int getNumberOfMessagesInReceiveWindows() {
return num;
}
+
+ @ManagedOperation(description="Returns the sizes of all NakReceiverWindow.RetransmitTables")
+ public String printRetransmitTableSizes() {
+ StringBuilder sb=new StringBuilder();
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ sb.append(entry.getKey() + ": ").append(win.getRetransmiTableSize())
+ .append(" (capacity=" + win.getRetransmitTableCapacity())
+ .append(", fill factor=" + win.getRetransmitTableFillFactor() + "%)\n");
+ }
+ return sb.toString();
+ }
+
+
+ @ManagedOperation(description="Compacts the retransmission tables")
+ public void compact() {
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ win.compact();
+ }
+ }
+
+ @ManagedOperation(description="Purges highes delivered messages and compacts the retransmission tables")
+ public void purgeAndCompact() {
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ win.stable(win.getHighestDelivered());
+ win.compact();
+ }
+ }
+
+
public void resetStats() {
num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_xmits=0;
}
@@ -218,6 +268,8 @@ public void init() throws Exception {
super.init();
if(max_stable_msgs < 1)
throw new IllegalArgumentException("max_stable_msgs ( " + max_stable_msgs + ") must be > 0");
+ if(max_bytes <= 0)
+ throw new IllegalArgumentException("max_bytes has to be > 0");
}
public void start() throws Exception {
@@ -448,6 +500,14 @@ protected void sendStableMessage(Address dest, long low, long high) {
log.trace(sb.toString());
}
down_prot.down(new Event(Event.MSG, stable_msg));
+
+ ReceiverEntry entry=recv_table.get(dest);
+ NakReceiverWindow win=entry != null? entry.received_msgs : null;
+ if(win != null) {
+ //System.out.println("[" + local_addr + "] stable(" + dest + ", hd=" + win.getHighestDelivered() + "): " +
+ // "win: " + win);
+ win.stable(win.getHighestDelivered());
+ }
}
@@ -585,7 +645,7 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
num_msgs_received++;
num_bytes_received+=msg.getLength();
- if(added && max_bytes > 0) {
+ if(added) {
int bytes=entry.received_bytes.addAndGet(msg.getLength());
if(bytes >= max_bytes) {
entry.received_bytes_lock.lock();
@@ -650,8 +710,10 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
private ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, long conn_id) {
- NakReceiverWindow win=new NakReceiverWindow(local_addr, sender, this, seqno-1, seqno-1, timer);
- win.setDiscardDeliveredMessages(true);
+ NakReceiverWindow win=new NakReceiverWindow(sender, this, seqno-1, seqno-1, timer, true,
+ xmit_table_num_rows, xmit_table_msgs_per_row,
+ xmit_table_resize_factor, xmit_table_max_compaction_time,
+ xmit_table_automatic_purging);
ReceiverEntry entry=new ReceiverEntry(win, conn_id, max_stable_msgs);
ReceiverEntry entry2=recv_table.putIfAbsent(sender, entry);
if(entry2 != null)
@@ -136,6 +136,20 @@
@Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
private boolean log_not_found_msgs=true;
+ @Property(description="Number of rows of the matrix in the retransmission table (only for experts)",writable=false)
+ int xmit_table_num_rows=5;
+
+ @Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). " +
+ "The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row",writable=false)
+ int xmit_table_msgs_per_row=10000;
+
+ @Property(description="Resize factor of the matrix in the retransmission table (only for experts)",writable=false)
+ double xmit_table_resize_factor=1.2;
+
+ @Property(description="Number of milliseconds after which the matrix in the retransmission table " +
+ "is compacted (only for experts)",writable=false)
+ long xmit_table_max_compaction_time=10 * 60 * 1000;
+
/* -------------------------------------------------- JMX ---------------------------------------------------------- */
@@ -473,6 +487,28 @@ public String printLossRates() {
return sb.toString();
}
+ @ManagedOperation(description="Returns the sizes of all NakReceiverWindow.RetransmitTables")
+ public String printRetransmitTableSizes() {
+ StringBuilder sb=new StringBuilder();
+ for(Map.Entry<Address,NakReceiverWindow> entry: xmit_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue();
+ sb.append(entry.getKey() + ": ").append(win.getRetransmiTableSize())
+ .append(" (capacity=" + win.getRetransmitTableCapacity())
+ .append(", fill factor=" + win.getRetransmitTableFillFactor() + "%)\n");
+ }
+ return sb.toString();
+ }
+
+
+ @ManagedOperation(description="Compacts the retransmission tables")
+ public void compact() {
+ for(Map.Entry<Address,NakReceiverWindow> entry: xmit_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue();
+ win.compact();
+ }
+ }
+
+
@ManagedAttribute
public double getAverageLossRate() {
double retval=0.0;
@@ -488,16 +524,16 @@ public double getAverageLossRate() {
@ManagedAttribute
public double getAverageSmoothedLossRate() {
- double retval=0.0;
- int count=0;
- if(xmit_table.isEmpty())
- return 0.0;
- for(NakReceiverWindow win: xmit_table.values()) {
- retval+=win.getSmoothedLossRate();
- count++;
- }
- return retval / (double)count;
+ double retval=0.0;
+ int count=0;
+ if(xmit_table.isEmpty())
+ return 0.0;
+ for(NakReceiverWindow win: xmit_table.values()) {
+ retval+=win.getSmoothedLossRate();
+ count++;
}
+ return retval / (double)count;
+ }
public Vector<Integer> providedUpServices() {
@@ -815,11 +851,12 @@ private void handleMessage(Message msg, NakAckHeader hdr) {
return;
}
+ boolean remove_msgs=discard_delivered_msgs && !loopback;
boolean released_processing=false;
try {
while(true) {
// we're removing a msg and set processing to false (if null) *atomically* (wrt to add())
- List<Message> msgs=win.removeMany(processing, max_msg_batch_size);
+ List<Message> msgs=win.removeMany(processing, remove_msgs, max_msg_batch_size);
if(msgs == null || msgs.isEmpty()) {
released_processing=true;
return;
@@ -1082,10 +1119,15 @@ private void rebroadcastMessages() {
if(my_entry == null)
continue;
their_high=their_entry.getHighest();
- my_high=my_entry.getHighest();
+
+ // Cannot ask for 0 to be retransmitted because the first seqno in NAKACK and UNICAST(2) is always 1 !
+ // Also, we need to ask for retransmission of my_high+1, because we already *have* my_high, and don't
+ // need it, so the retransmission range is [my_high+1 .. their_high]: *exclude* my_high, but *include*
+ // their_high
+ my_high=Math.max(1, my_entry.getHighest() +1);
if(their_high > my_high) {
if(log.isTraceEnabled())
- log.trace("sending XMIT request to " + sender + " for messages " + my_high + " - " + their_high);
+ log.trace("[" + local_addr + "] fetching " + my_high + "-" + their_high + " from " + sender);
retransmit(my_high, their_high, sender, true); // use multicast to send retransmit request
xmitted=true;
}
@@ -1197,8 +1239,8 @@ private void overwriteDigest(Digest digest) {
win.setHighestDelivered(highest_delivered_seqno);
continue; // don't destroy my own window
}
- win.destroy(); // stops retransmission
xmit_table.remove(sender);
+ win.destroy(); // stops retransmission
}
win=createNakReceiverWindow(sender, highest_delivered_seqno, low_seqno);
xmit_table.put(sender, win);
@@ -1243,8 +1285,8 @@ private void setDigest(Digest digest, boolean merge) {
|| win.getHighestDelivered() >= highest_delivered_seqno) // my seqno is >= digest's seqno for sender
continue;
- win.destroy(); // stops retransmission
xmit_table.remove(sender);
+ win.destroy(); // stops retransmission
if(sender.equals(local_addr)) { // Adjust the seqno: https://jira.jboss.org/browse/JGRP-1251
seqno_lock.lock();
try {
@@ -1269,8 +1311,9 @@ private void setDigest(Digest digest, boolean merge) {
private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno, long lowest_seqno) {
- NakReceiverWindow win=new NakReceiverWindow(local_addr, sender, this, initial_seqno, lowest_seqno, timer,
- use_range_based_retransmitter);
+ NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, lowest_seqno, timer, true,
+ xmit_table_num_rows, xmit_table_msgs_per_row,
+ xmit_table_resize_factor, xmit_table_max_compaction_time, false);
if(use_stats_for_retransmission) {
win.setRetransmitTimeouts(new ActualInterval(sender));
Oops, something went wrong. Retry.

0 comments on commit 3c51ce9

Please sign in to comment.