Permalink
Browse files

first draft of new compaction mode - haven't tested yet

  • Loading branch information...
1 parent aa3753f commit fac65a29a7123667cf3ffad1529766e4238e1a77 Bela Ban committed Dec 15, 2010
@@ -479,8 +479,7 @@ public String printRetransmitTableSizes() {
for(Map.Entry<Address,NakReceiverWindow> entry: xmit_table.entrySet()) {
NakReceiverWindow win=entry.getValue();
sb.append(entry.getKey() + ": ").append(win.getRetransmiTableSize())
- .append(" (capacity=" + win.getRetransmitTableCapacity() + ", purges=" + win.getRetransmitTablePurges() +
- ", resizes=" + win.getRetransmitTableResizings() + "\n");
+ .append(" (capacity=" + win.getRetransmitTableCapacity() + ")");
}
return sb.toString();
}
@@ -208,10 +208,6 @@ private void setSmoothedLossRate() {
public int getRetransmitTableCapacity() {return xmit_table.capacity();}
- public int getRetransmitTableResizings() {return xmit_table.getResizings();}
-
- public int getRetransmitTablePurges() {return xmit_table.getPurges();}
-
/**
* Adds a message according to its seqno (sequence number).
@@ -23,9 +23,22 @@
/** The first seqno, at matrix[0][0] */
protected long offset;
- protected volatile int size=0;
- protected volatile int num_resizes=0;
- protected volatile int num_purges=0;
+ protected int size=0;
+
+ /** The highest seqno purged */
+ protected long highest_seqno_purged;
+
+ /** The highest seqno in the table */
+ protected long highest_seqno;
+
+ /** Time (in ms) after which a compaction should take place. 0 disables compaction */
+ protected long max_compaction_time=2 * 60 * 1000L;
+
+ /** The time when the last compaction took place. If a {@link #compact(long)} takes place and sees that the
+ * last compaction is more than max_compaction_time ms ago, a compaction will take place */
+ protected long last_compaction_timestamp=0;
+
+
/** Returns the total capacity in the matrix */
@@ -34,10 +47,6 @@
/** Returns the numbers of messages in the table */
public int size() {return size;}
- public int getResizings() {return num_resizes;}
-
- public int getPurges() {return num_purges;}
-
public boolean isEmpty() {return size <= 0;}
@@ -54,7 +63,7 @@ public RetransmitTable(int num_rows, int msgs_per_row, long offset, double resiz
this.num_rows=num_rows;
this.msgs_per_row=msgs_per_row;
this.resize_factor=resize_factor;
- this.offset=offset;
+ this.offset=this.highest_seqno_purged=this.highest_seqno=offset;
matrix=new Message[num_rows][];
if(resize_factor <= 1)
throw new IllegalArgumentException("resize_factor needs to be > 1");
@@ -79,14 +88,18 @@ public boolean put(long seqno, Message msg) {
*/
public Message putIfAbsent(long seqno, Message msg) {
int row_index=computeRow(seqno);
- if(row_index >= matrix.length)
- resize(row_index +1);
+ if(row_index >= matrix.length) {
+ resize(seqno);
+ row_index=computeRow(seqno);
+ }
Message[] row=getRow(row_index);
int index=computeIndex(seqno);
Message existing_msg=row[index];
if(existing_msg == null) {
row[index]=msg;
size++;
+ if(seqno > highest_seqno)
+ highest_seqno=seqno;
return null;
}
else
@@ -126,8 +139,8 @@ public Message remove(long seqno) {
* it is not used anymore after returning */
public void clear() {
matrix=new Message[num_rows][];
- size=num_resizes=num_purges=0;
- offset=1;
+ size=0;
+ offset=highest_seqno_purged=highest_seqno=1;
}
@@ -137,18 +150,103 @@ public void clear() {
* of individually removing them with remove().
* @param seqno
*/
+// public void purge(long seqno) {
+// long diff=seqno - offset;
+// if(diff < msgs_per_row)
+// return;
+// int num_rows_to_remove=(int)(diff / msgs_per_row);
+// System.arraycopy(matrix, num_rows_to_remove, matrix, 0, matrix.length - num_rows_to_remove);
+// for(int i=matrix.length - num_rows_to_remove; i < matrix.length; i++)
+// matrix[i]=null;
+//
+// offset+=(num_rows_to_remove * msgs_per_row);
+// size=computeSize();
+// num_purges++;
+// }
+
+
+ /**
+ * Removes all messages less than or equal to seqno from the table. Does this by nulling entire rows in the matrix
+ * and nulling all elements < index(seqno) of the first row that cannot be removed
+ * @param seqno
+ */
public void purge(long seqno) {
- long diff=seqno - offset;
- if(diff < msgs_per_row)
- return;
- int num_rows_to_remove=(int)(diff / msgs_per_row);
- System.arraycopy(matrix, num_rows_to_remove, matrix, 0, matrix.length - num_rows_to_remove);
- for(int i=matrix.length - num_rows_to_remove; i < matrix.length; i++)
+ int num_rows_to_remove=(int)(seqno - offset / msgs_per_row);
+ for(int i=0; i < num_rows_to_remove; i++) // Null all rows which can be fully removed
matrix[i]=null;
- offset+=(num_rows_to_remove * msgs_per_row);
+ int row_index=computeRow(seqno);
+ if(row_index < 0 || row_index >= matrix.length)
+ return;
+
+ Message[] row=matrix[row_index];
+ if(row != null) {
+ int index=computeIndex(seqno);
+ for(int i=0; i <= index; i++) // null all messages up to and including seqno in the given row
+ row[i]=null;
+ }
+ size=computeSize();
+ if(seqno > highest_seqno_purged)
+ highest_seqno_purged=seqno;
+
+ // see if compaction should be triggered
+ if(max_compaction_time <= 0)
+ return;
+
+ long current_time=System.currentTimeMillis();
+ if(last_compaction_timestamp > 0) {
+ if(current_time - last_compaction_timestamp >= max_compaction_time) {
+ compact();
+ last_compaction_timestamp=current_time;
+ }
+ }
+ else
+ last_compaction_timestamp=current_time;
+ }
+
+
+
+ /** Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes
+ * a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place) */
+ protected void resize(long seqno) {
+ int num_rows_to_purge=(int)((highest_seqno_purged - offset) / msgs_per_row);
+ int row_index=computeRow(seqno) - num_rows_to_purge;
+
+ int new_size=Math.max(row_index +1, matrix.length);
+
+ if(new_size > matrix.length) {
+ Message[][] new_matrix=new Message[new_size][];
+ System.arraycopy(matrix, num_rows_to_purge, new_matrix, 0, matrix.length - num_rows_to_purge);
+ matrix=new_matrix;
+ }
+ else if(num_rows_to_purge > 0) {
+ System.arraycopy(matrix, num_rows_to_purge, matrix, 0, matrix.length - num_rows_to_purge);
+ }
+
+ offset+=(num_rows_to_purge * msgs_per_row);
size=computeSize();
- num_purges++;
+ }
+
+ /**
+ * Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The
+ * capacity of the matrix should be size * resize_factor
+ */
+ public void compact() {
+ int num_rows_to_purge=(int)((highest_seqno_purged - offset) / msgs_per_row);
+
+ int num_unused_rows=(int)((highest_seqno - highest_seqno_purged) / msgs_per_row);
+
+ int new_size=(int)((matrix.length - num_rows_to_purge - num_unused_rows) * resize_factor);
+
+ int alt_new_size=(int)(size * resize_factor); // ???
+
+ assert new_size == alt_new_size;
+
+ if(new_size < matrix.length) {
+ Message[][] new_matrix=new Message[new_size][];
+ System.arraycopy(matrix, num_rows_to_purge, new_matrix, 0, matrix.length - num_rows_to_purge - num_unused_rows);
+ matrix=new_matrix;
+ }
}
@@ -248,13 +346,15 @@ public String dumpMatrix() {
}
/** Resizes the matrix to the new size */
- protected void resize(int new_capacity) {
- int new_size=(int)Math.max(new_capacity, matrix.length * resize_factor);
- Message[][] new_matrix=new Message[new_size][];
- System.arraycopy(matrix, 0, new_matrix, 0, matrix.length);
- matrix=new_matrix;
- num_resizes++;
- }
+// protected void resize(int new_capacity) {
+// int new_size=(int)Math.max(new_capacity, matrix.length * resize_factor);
+// Message[][] new_matrix=new Message[new_size][];
+// System.arraycopy(matrix, 0, new_matrix, 0, matrix.length);
+// matrix=new_matrix;
+// num_resizes++;
+// }
+
+

0 comments on commit fac65a2

Please sign in to comment.