Permalink
Browse files

first version that scored a new best on perf.Test !

  • Loading branch information...
1 parent c1aaa9e commit f8ca3075f225ccf77e8de1d8d0d8cb5b451c84e6 Bela Ban committed Dec 7, 2010
Showing with 40 additions and 20 deletions.
  1. +8 −16 src/org/jgroups/stack/NakReceiverWindow.java
  2. +32 −4 src/org/jgroups/util/RetransmitTable.java
@@ -8,12 +8,11 @@
import org.jgroups.annotations.GuardedBy;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
+import org.jgroups.util.RetransmitTable;
import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -77,7 +76,7 @@
/**
* ConcurrentMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers
*/
- private final Map<Long,Message> xmit_table=Util.createHashMap();
+ private final RetransmitTable xmit_table=new RetransmitTable(20, 50000, 1);
/**
* Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
@@ -260,9 +259,9 @@ public boolean add(final long seqno, final Message msg) {
// Case #3: we finally received a missing message. Case #2 handled seqno <= highest_delivered, so this
// seqno *must* be between highest_delivered and next_to_add
if(seqno < next_to_add) {
- if(xmit_table.containsKey(seqno))
+ Message existing=xmit_table.putIfAbsent(seqno, msg);
+ if(existing != null)
return false; // key/value was present
- xmit_table.put(seqno, msg); // only set message if not yet received (bela July 23 2003)
num_xmits=retransmitter.remove(seqno);
if(log.isTraceEnabled())
log.trace(new StringBuilder("added missing msg ").append(msg.getSrc()).append('#').append(seqno));
@@ -396,11 +395,9 @@ public void stable(long seqno) {
}
// we need to remove all seqnos *including* seqno
- if(!xmit_table.isEmpty()) {
- for(long i=low; i <= seqno; i++) {
- xmit_table.remove(i);
- }
- }
+ if(!xmit_table.isEmpty())
+ xmit_table.purge(seqno);
+
// remove all seqnos below seqno from retransmission
for(long i=low; i <= seqno; i++) {
retransmitter.remove(i);
@@ -565,12 +562,7 @@ protected String printMessages() {
try {
sb.append('[').append(low).append(" : ").append(highest_delivered).append(" (").append(highest_received).append(")");
if(xmit_table != null && !xmit_table.isEmpty()) {
- int non_received=0;
-
- for(Map.Entry<Long,Message> entry: xmit_table.entrySet()) {
- if(entry.getValue() == null)
- non_received++;
- }
+ int non_received=xmit_table.getNullMessages(highest_delivered, highest_received);
sb.append(" (size=").append(xmit_table.size()).append(", missing=").append(non_received).
append(", highest stability=").append(highest_stability_seqno).append(')');
}
@@ -15,7 +15,9 @@
* @author Bela Ban
*/
public class RetransmitTable {
- protected int msgs_per_row;
+ protected final int num_rows;
+ protected final int msgs_per_row;
+ protected final long original_offset;
protected Message[][] matrix;
@@ -30,8 +32,9 @@ public RetransmitTable() {
}
public RetransmitTable(int num_rows, int msgs_per_row, long offset) {
+ this.num_rows=num_rows;
this.msgs_per_row=msgs_per_row;
- this.offset=offset;
+ this.offset=this.original_offset=offset;
matrix=new Message[num_rows][];
}
@@ -45,7 +48,13 @@ public boolean put(long seqno, Message msg) {
return putIfAbsent(seqno, msg) == null;
}
-
+ /**
+ * Adds a message if the element at the given index is null. Returns null if no message existed at the given index,
+ * else returns the existing message and doesn't set the element.
+ * @param seqno
+ * @param msg
+ * @return The existing message, or null if there wasn't any
+ */
public Message putIfAbsent(long seqno, Message msg) {
int[] row_and_index=computeRowAndIndex(seqno);
Message[] row=getRow(row_and_index[0]);
@@ -66,7 +75,8 @@ public Message get(long seqno) {
}
- public Message remove(long seqno) {
+ /** Removes the message with seqno from the table, nulls the index */
+ public Message remove(long seqno) { // todo: purge if we can !
int[] row_and_index=computeRowAndIndex(seqno);
Message[] row=getRow(row_and_index[0]);
Message existing_msg=row[row_and_index[1]];
@@ -77,6 +87,12 @@ public Message remove(long seqno) {
return existing_msg;
}
+ public void clear() {
+ matrix=new Message[num_rows][];
+ size=0;
+ offset=original_offset;
+ }
+
/**
* Removes all messages less than or equal to seqno from the table. Adjusts offset and moves rows down by the
@@ -124,6 +140,18 @@ public int computeSize() {
return retval;
}
+ /** Returns the number of null elements in range [start .. end] */
+ public int getNullMessages(long start, long end) {
+ int retval=0;
+ for(long i=start; i <= end; i++) {
+ int[] row_and_index=computeRowAndIndex(i);
+ Message[] row=matrix[row_and_index[0]];
+ if(row != null && row[row_and_index[1]] == null)
+ retval++;
+ }
+ return retval;
+ }
+
public String toString() {
StringBuilder sb=new StringBuilder();

0 comments on commit f8ca307

Please sign in to comment.