Permalink
Browse files

Merged branch JGRP-1133.xmit_table.compact onto JGRP-1133.xmit_table

  • Loading branch information...
2 parents aa3753f + dfc44e5 commit 4ff1a45b95a7753cd36c1fafb8fb88b1b00cebf4 Bela Ban committed Dec 20, 2010
@@ -53,11 +53,29 @@
protected long max_bytes=10000000;
@Property(description="Max number of milliseconds before a stability message is sent to the sender(s)")
- protected long stable_interval=10000L;
+ protected long stable_interval=60000L;
@Property(description="Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid")
protected int max_stable_msgs=5;
+ @Property(description="Number of rows of the matrix in the retransmission table (only for experts)",writable=false)
+ int xmit_table_num_rows=5;
+
+ @Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). " +
+ "The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row",writable=false)
+ int xmit_table_msgs_per_row=10000;
+
+ @Property(description="Resize factor of the matrix in the retransmission table (only for experts)",writable=false)
+ double xmit_table_resize_factor=1.2;
+
+ @Property(description="Number of milliseconds after which the matrix in the retransmission table " +
+ "is compacted (only for experts)",writable=false)
+ long xmit_table_max_compaction_time=10 * 60 * 1000;
+
+ @Property(description="If enabled, the removal of a message from the retransmission table causes an " +
+ "automatic purge (only for experts)",writable=false)
+ boolean xmit_table_automatic_purging=true;
+
/* --------------------------------------------- JMX ---------------------------------------------- */
@@ -185,6 +203,38 @@ public int getNumberOfMessagesInReceiveWindows() {
return num;
}
+
+ @ManagedOperation(description="Returns the sizes of all NakReceiverWindow.RetransmitTables")
+ public String printRetransmitTableSizes() {
+ StringBuilder sb=new StringBuilder();
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ sb.append(entry.getKey() + ": ").append(win.getRetransmiTableSize())
+ .append(" (capacity=" + win.getRetransmitTableCapacity())
+ .append(", fill factor=" + win.getRetransmitTableFillFactor() + "%)\n");
+ }
+ return sb.toString();
+ }
+
+
+ @ManagedOperation(description="Compacts the retransmission tables")
+ public void compact() {
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ win.compact();
+ }
+ }
+
+ @ManagedOperation(description="Purges highes delivered messages and compacts the retransmission tables")
+ public void purgeAndCompact() {
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue().received_msgs;
+ win.stable(win.getHighestDelivered());
+ win.compact();
+ }
+ }
+
+
public void resetStats() {
num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_xmits=0;
}
@@ -218,6 +268,8 @@ public void init() throws Exception {
super.init();
if(max_stable_msgs < 1)
throw new IllegalArgumentException("max_stable_msgs ( " + max_stable_msgs + ") must be > 0");
+ if(max_bytes <= 0)
+ throw new IllegalArgumentException("max_bytes has to be > 0");
}
public void start() throws Exception {
@@ -448,6 +500,14 @@ protected void sendStableMessage(Address dest, long low, long high) {
log.trace(sb.toString());
}
down_prot.down(new Event(Event.MSG, stable_msg));
+
+ ReceiverEntry entry=recv_table.get(dest);
+ NakReceiverWindow win=entry != null? entry.received_msgs : null;
+ if(win != null) {
+ //System.out.println("[" + local_addr + "] stable(" + dest + ", hd=" + win.getHighestDelivered() + "): " +
+ // "win: " + win);
+ win.stable(win.getHighestDelivered());
+ }
}
@@ -585,7 +645,7 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
num_msgs_received++;
num_bytes_received+=msg.getLength();
- if(added && max_bytes > 0) {
+ if(added) {
int bytes=entry.received_bytes.addAndGet(msg.getLength());
if(bytes >= max_bytes) {
entry.received_bytes_lock.lock();
@@ -650,7 +710,10 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
private ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, long conn_id) {
- NakReceiverWindow win=new NakReceiverWindow(sender, this, seqno-1, seqno-1, timer);
+ NakReceiverWindow win=new NakReceiverWindow(sender, this, seqno-1, seqno-1, timer, true,
+ xmit_table_num_rows, xmit_table_msgs_per_row,
+ xmit_table_resize_factor, xmit_table_max_compaction_time,
+ xmit_table_automatic_purging);
ReceiverEntry entry=new ReceiverEntry(win, conn_id, max_stable_msgs);
ReceiverEntry entry2=recv_table.putIfAbsent(sender, entry);
if(entry2 != null)
@@ -136,6 +136,20 @@
@Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
private boolean log_not_found_msgs=true;
+ @Property(description="Number of rows of the matrix in the retransmission table (only for experts)",writable=false)
+ int xmit_table_num_rows=5;
+
+ @Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). " +
+ "The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row",writable=false)
+ int xmit_table_msgs_per_row=10000;
+
+ @Property(description="Resize factor of the matrix in the retransmission table (only for experts)",writable=false)
+ double xmit_table_resize_factor=1.2;
+
+ @Property(description="Number of milliseconds after which the matrix in the retransmission table " +
+ "is compacted (only for experts)",writable=false)
+ long xmit_table_max_compaction_time=10 * 60 * 1000;
+
/* -------------------------------------------------- JMX ---------------------------------------------------------- */
@@ -479,13 +493,22 @@ public String printRetransmitTableSizes() {
for(Map.Entry<Address,NakReceiverWindow> entry: xmit_table.entrySet()) {
NakReceiverWindow win=entry.getValue();
sb.append(entry.getKey() + ": ").append(win.getRetransmiTableSize())
- .append(" (capacity=" + win.getRetransmitTableCapacity() + ", purges=" + win.getRetransmitTablePurges() +
- ", resizes=" + win.getRetransmitTableResizings() + "\n");
+ .append(" (capacity=" + win.getRetransmitTableCapacity())
+ .append(", fill factor=" + win.getRetransmitTableFillFactor() + "%)\n");
}
return sb.toString();
}
+ @ManagedOperation(description="Compacts the retransmission tables")
+ public void compact() {
+ for(Map.Entry<Address,NakReceiverWindow> entry: xmit_table.entrySet()) {
+ NakReceiverWindow win=entry.getValue();
+ win.compact();
+ }
+ }
+
+
@ManagedAttribute
public double getAverageLossRate() {
double retval=0.0;
@@ -501,16 +524,16 @@ public double getAverageLossRate() {
@ManagedAttribute
public double getAverageSmoothedLossRate() {
- double retval=0.0;
- int count=0;
- if(xmit_table.isEmpty())
- return 0.0;
- for(NakReceiverWindow win: xmit_table.values()) {
- retval+=win.getSmoothedLossRate();
- count++;
- }
- return retval / (double)count;
+ double retval=0.0;
+ int count=0;
+ if(xmit_table.isEmpty())
+ return 0.0;
+ for(NakReceiverWindow win: xmit_table.values()) {
+ retval+=win.getSmoothedLossRate();
+ count++;
}
+ return retval / (double)count;
+ }
public Vector<Integer> providedUpServices() {
@@ -1288,7 +1311,9 @@ private void setDigest(Digest digest, boolean merge) {
private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno, long lowest_seqno) {
- NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, lowest_seqno, timer, true);
+ NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, lowest_seqno, timer, true,
+ xmit_table_num_rows, xmit_table_msgs_per_row,
+ xmit_table_resize_factor, xmit_table_max_compaction_time, false);
if(use_stats_for_retransmission) {
win.setRetransmitTimeouts(new ActualInterval(sender));
@@ -115,6 +115,16 @@ public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, lo
public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd,
long highest_delivered_seqno, long lowest_seqno, TimeScheduler sched,
boolean use_range_based_retransmitter) {
+ this(sender, cmd, highest_delivered_seqno, lowest_seqno, sched, use_range_based_retransmitter,
+ 5, 10000, 1.2, 5 * 60 * 1000, false);
+ }
+
+
+ public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd,
+ long highest_delivered_seqno, long lowest_seqno, TimeScheduler sched,
+ boolean use_range_based_retransmitter,
+ int num_rows, int msgs_per_row, double resize_factor, long max_compaction_time,
+ boolean automatic_purging) {
highest_delivered=highest_delivered_seqno;
highest_received=highest_delivered;
low=Math.min(lowest_seqno, highest_delivered);
@@ -125,7 +135,7 @@ public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd,
new RangeBasedRetransmitter(sender, cmd, sched) :
new DefaultRetransmitter(sender, cmd, sched);
- xmit_table=new RetransmitTable(5, 10000, low);
+ xmit_table=new RetransmitTable(num_rows, msgs_per_row, low, resize_factor, max_compaction_time, automatic_purging);
}
@@ -208,9 +218,11 @@ private void setSmoothedLossRate() {
public int getRetransmitTableCapacity() {return xmit_table.capacity();}
- public int getRetransmitTableResizings() {return xmit_table.getResizings();}
+ public double getRetransmitTableFillFactor() {return xmit_table.getFillFactor();}
- public int getRetransmitTablePurges() {return xmit_table.getPurges();}
+ public void compact() {
+ xmit_table.compact();
+ }
/**
@@ -373,8 +385,7 @@ public void stable(long seqno) {
}
// we need to remove all seqnos *including* seqno
- if(!xmit_table.isEmpty())
- xmit_table.purge(seqno);
+ xmit_table.purge(seqno);
// remove all seqnos below seqno from retransmission
for(long i=low; i <= seqno; i++) {
Oops, something went wrong.

0 comments on commit 4ff1a45

Please sign in to comment.