Permalink
Browse files

added purge() method

  • Loading branch information...
1 parent beb4656 commit 74c8e738d67f3f6dfc6e1325fee29d7cef5a79ba Bela Ban committed Dec 7, 2010
@@ -20,17 +20,18 @@
protected Message[][] matrix;
/** The first seqno, at matrix[0][0] */
- protected int offset;
+ protected long offset;
protected volatile int size=0;
public RetransmitTable() {
- this(10, 50000);
+ this(10, 50000, 0);
}
- public RetransmitTable(int num_rows, int msgs_per_row) {
+ public RetransmitTable(int num_rows, int msgs_per_row, long offset) {
this.msgs_per_row=msgs_per_row;
+ this.offset=offset;
matrix=new Message[num_rows][];
}
@@ -40,38 +41,141 @@ public RetransmitTable(int num_rows, int msgs_per_row) {
* @param msg
* @return True if the element at the computed index was null, else false
*/
- public boolean add(long seqno, Message msg) {
+ public boolean put(long seqno, Message msg) {
+ return putIfAbsent(seqno, msg) == null;
+ }
+
+
+ public Message putIfAbsent(long seqno, Message msg) {
int[] row_and_index=computeRowAndIndex(seqno);
- if(row_and_index == null)
- return false;
Message[] row=getRow(row_and_index[0]);
- if(row[row_and_index[1]] == null) {
+ Message existing_msg=row[row_and_index[1]];
+ if(existing_msg == null) {
row[row_and_index[1]]=msg;
size++;
- return true;
+ return null;
}
- return false;
+ else
+ return existing_msg;
}
public Message get(long seqno) {
int[] row_and_index=computeRowAndIndex(seqno);
- if(row_and_index == null)
- return null;
Message[] row=getRow(row_and_index[0]);
- return row[row_and_index[1]]; // todo: what do we do when we have an index out of bound exception ?
+ return row[row_and_index[1]];
}
- /** Returns the total capacity in the matrix */
- public int capacity() {
- return matrix.length * msgs_per_row;
+
+ public Message remove(long seqno) {
+ int[] row_and_index=computeRowAndIndex(seqno);
+ Message[] row=getRow(row_and_index[0]);
+ Message existing_msg=row[row_and_index[1]];
+ if(existing_msg != null) {
+ row[row_and_index[1]]=null;
+ size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
+ }
+ return existing_msg;
}
+
+ /**
+ * Removes all messages less than or equal to seqno from the table. Adjusts offset and moves rows down by the
+ * number of removed rows. This method should be used when a number of messages can be removed at once, instead
+ * of individually removing them with remove().
+ * @param seqno
+ */
+ public void purge(long seqno) {
+ long diff=seqno - offset;
+ if(diff < msgs_per_row)
+ return;
+ int num_rows_to_remove=(int)(diff / msgs_per_row);
+ System.arraycopy(matrix, num_rows_to_remove, matrix, 0, matrix.length - num_rows_to_remove);
+ for(int i=matrix.length - num_rows_to_remove; i < matrix.length; i++)
+ matrix[i]=null;
+
+ offset+=(num_rows_to_remove * msgs_per_row); // todo: is this correct ?
+ size=computeSize();
+ }
+
+
+ @Deprecated
+ public boolean containsKey(long seqno) {return get(seqno) != null;}
+
+ /** Returns the total capacity in the matrix */
+ public int capacity() {return matrix.length * msgs_per_row;}
+
/** Returns the numbers of messages in the table */
- public int size() {
- return size;
+ public int size() {return size;}
+
+ public boolean isEmpty() {return size <= 0;}
+
+ /** A more expensive way to compute the size, done by iterating through the entire table and adding up non-null values */
+ public int computeSize() {
+ int retval=0;
+ for(int i=0; i < matrix.length; i++) {
+ Message[] row=matrix[i];
+ if(row == null)
+ continue;
+ for(int j=0; j < row.length; j++) {
+ if(row[j] != null)
+ retval++;
+ }
+ }
+ return retval;
}
+ public String toString() {
+ StringBuilder sb=new StringBuilder();
+ sb.append("size=" + size + ", capacity=" + capacity());
+ return sb.toString();
+ }
+
+ /** Dumps the seqnos in the table as a list */
+ public String dump() {
+ StringBuilder sb=new StringBuilder();
+ boolean first=true;
+ for(int i=0; i < matrix.length; i++) {
+ Message[] row=matrix[i];
+ if(row == null)
+ continue;
+ for(int j=0; j < row.length; j++) {
+ if(row[j] != null) {
+ long seqno=offset + (i * msgs_per_row) + j;
+ if(first)
+ first=false;
+ else
+ sb.append(", ");
+ sb.append(seqno);
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Dumps the non-null in the table in a pseudo graphic way */
+ public String dumpMatrix() {
+ StringBuilder sb=new StringBuilder();
+ for(int i=0; i < matrix.length; i++) {
+ Message[] row=matrix[i];
+ sb.append(i + ": ");
+ if(row == null) {
+ sb.append("\n");
+ continue;
+ }
+ for(int j=0; j < row.length; j++) {
+ if(row[j] != null)
+ sb.append("* ");
+ else
+ sb.append(" ");
+ }
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+
+
/**
* Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
* @param index
@@ -97,8 +201,7 @@ protected void resize(int new_capacity) {
/** Computes and returns the row index and the index within that row for seqno */
protected int[] computeRowAndIndex(long seqno) {
- if(seqno < offset)
- return null;
+ assert seqno >= offset;
int[] retval=new int[2];
int row_index=(int)(((seqno- offset) / msgs_per_row));
int index=(int)(seqno - offset) % msgs_per_row;
@@ -107,6 +210,8 @@ protected void resize(int new_capacity) {
return retval;
}
+
+
}
@@ -13,14 +13,15 @@
static final Message MSG=new Message(null, null, "test");
public static void testCreation() {
- RetransmitTable table=new RetransmitTable(3, 10);
- assert table.size() == 0;
+ RetransmitTable table=new RetransmitTable(3, 10, 0);
+ int size=table.size();
+ assert size == 0;
assert table.get(15) == null;
}
public static void testAddition() {
- RetransmitTable table=new RetransmitTable(3, 10);
+ RetransmitTable table=new RetransmitTable(3, 10, 0);
addAndGet(table, 0, "0");
addAndGet(table, 1, "1");
addAndGet(table, 5, "5");
@@ -30,35 +31,93 @@ public static void testAddition() {
addAndGet(table, 19, "19");
addAndGet(table, 20, "20");
addAndGet(table, 29, "29");
- System.out.println("table: " + table);
+ System.out.println("table: " + table.dump());
assert table.size() == 9;
+ assert table.size() == table.computeSize();
assert table.capacity() == 30;
}
+
+ public static void testAdditionWithOffset() {
+ RetransmitTable table=new RetransmitTable(3, 10, 100);
+ addAndGet(table, 100, "100");
+ addAndGet(table, 101, "101");
+ addAndGet(table, 105, "105");
+ addAndGet(table, 109, "109");
+ addAndGet(table, 110, "110");
+ addAndGet(table, 111, "111");
+ addAndGet(table, 119, "119");
+ addAndGet(table, 120, "120");
+ addAndGet(table, 129, "129");
+ System.out.println("table: " + table.dump());
+ assert table.size() == 9;
+ assert table.capacity() == 30;
+ }
+
+
public static void testDuplicateAddition() {
- RetransmitTable table=new RetransmitTable(3, 10);
+ RetransmitTable table=new RetransmitTable(3, 10, 0);
addAndGet(table, 0, "0");
addAndGet(table, 1, "1");
addAndGet(table, 5, "5");
addAndGet(table, 9, "9");
addAndGet(table, 10, "10");
- assert !table.add(5, new Message());
+ assert !table.put(5, new Message());
assert table.get(5).getObject().equals("5");
assert table.size() == 5;
}
+
+ public static void testDumpMatrix() {
+ RetransmitTable table=new RetransmitTable(3, 10, 1);
+ long[] seqnos={1,3,5,7,9,12,14,16,18,20,21,22,23,24};
+ for(long seqno: seqnos)
+ table.put(seqno, MSG);
+ System.out.println("matrix:\n" + table.dumpMatrix());
+ }
+
+
+ public static void testMassAddition() {
+ RetransmitTable table=new RetransmitTable(3, 10, 0);
+ final int NUM_MSGS=10005;
+ final Message MSG=new Message(null, null, "hello world");
+ for(int i=0; i < NUM_MSGS; i++)
+ table.put(i, MSG);
+ System.out.println("table = " + table);
+ assert table.size() == NUM_MSGS;
+ assert table.capacity() == 10010;
+ }
+
public static void testResize() {
- RetransmitTable table=new RetransmitTable(3, 10);
+ RetransmitTable table=new RetransmitTable(3, 10, 0);
assert table.capacity() == 30;
addAndGet(table, 30, "30");
addAndGet(table, 35, "35");
assert table.capacity() == 40;
}
+ public static void testPurge() {
+ RetransmitTable table=new RetransmitTable(5, 10, 0);
+ for(long seqno=0; seqno <= 20; seqno++)
+ table.put(seqno, MSG);
+ table.put(40, MSG); table.put(45, MSG);
+ assert table.size() == 23;
+
+ for(long seqno=0; seqno <= 15; seqno++)
+ table.remove(seqno);
+
+ System.out.println("table: " + table.dump());
+ table.purge(15);
+ System.out.println("table: " + table.dump());
+
+ assert table.size() == 7;
+ }
+
+
protected static void addAndGet(RetransmitTable table, long seqno, String message) {
- boolean added=table.add(seqno, new Message(null, null, message));
+ boolean added=table.put(seqno, new Message(null, null, message));
assert added;
Message msg=table.get(seqno);
assert msg != null && msg.getObject().equals(message);

0 comments on commit 74c8e73

Please sign in to comment.