Permalink
Browse files

draft of RetransmissionTable

  • Loading branch information...
1 parent 95feacc commit beb465680bc13f4b6965697a65629c5c864d13b2 Bela Ban committed Dec 7, 2010
@@ -0,0 +1,113 @@
+package org.jgroups.util;
+
+import org.jgroups.Message;
+
+/**
+ * A store for messages to be retransmitted or delivered. Used on sender and receiver side, as a replacement for
+ * HashMap. RetransmitTable should use less memory than HashMap, as HashMap.Entry has 4 fields, plus arrays for storage.
+ * <p/>
+ * RetransmitTable maintains a matrix (an array of arrays) of messages. Messages are stored in the matrix by mapping
+ * their seqno to an index. E.g. when we have 10 rows of 1000 messages each, and first_seqno is 3000, then a message with
+ * seqno=5600, will be stored in the 3rd row, at index 600.
+ * <p/>
+ * Rows are removed when all messages in that row have been received.<p/>
+ * This class in not synchronized; the caller has to make sure access to it is synchronized
+ * @author Bela Ban
+ */
+public class RetransmitTable {
+ protected int msgs_per_row;
+
+ protected Message[][] matrix;
+
+ /** The first seqno, at matrix[0][0] */
+ protected int offset;
+
+ protected volatile int size=0;
+
+
+ public RetransmitTable() {
+ this(10, 50000);
+ }
+
+ public RetransmitTable(int num_rows, int msgs_per_row) {
+ this.msgs_per_row=msgs_per_row;
+ matrix=new Message[num_rows][];
+ }
+
+ /**
+ * Adds a new message to the index computed as a function of seqno
+ * @param seqno
+ * @param msg
+ * @return True if the element at the computed index was null, else false
+ */
+ public boolean add(long seqno, Message msg) {
+ int[] row_and_index=computeRowAndIndex(seqno);
+ if(row_and_index == null)
+ return false;
+ Message[] row=getRow(row_and_index[0]);
+ if(row[row_and_index[1]] == null) {
+ row[row_and_index[1]]=msg;
+ size++;
+ return true;
+ }
+ return false;
+ }
+
+ public Message get(long seqno) {
+ int[] row_and_index=computeRowAndIndex(seqno);
+ if(row_and_index == null)
+ return null;
+ Message[] row=getRow(row_and_index[0]);
+ return row[row_and_index[1]]; // todo: what do we do when we have an index out of bound exception ?
+ }
+
+ /** Returns the total capacity in the matrix */
+ public int capacity() {
+ return matrix.length * msgs_per_row;
+ }
+
+ /** Returns the numbers of messages in the table */
+ public int size() {
+ return size;
+ }
+
+
+ /**
+ * Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
+ * @param index
+ * @return A row
+ */
+ protected Message[] getRow(int index) {
+ if(index >= matrix.length)
+ resize(index +1);
+ Message[] row=matrix[index];
+ if(row == null) {
+ row=new Message[msgs_per_row];
+ matrix[index]=row;
+ }
+ return row;
+ }
+
+ /** Resizes the matrix to the new size */
+ protected void resize(int new_capacity) {
+ Message[][] new_matrix=new Message[new_capacity][];
+ System.arraycopy(matrix, 0, new_matrix, 0, matrix.length);
+ matrix=new_matrix;
+ }
+
+ /** Computes and returns the row index and the index within that row for seqno */
+ protected int[] computeRowAndIndex(long seqno) {
+ if(seqno < offset)
+ return null;
+ int[] retval=new int[2];
+ int row_index=(int)(((seqno- offset) / msgs_per_row));
+ int index=(int)(seqno - offset) % msgs_per_row;
+ retval[0]=row_index;
+ retval[1]=index;
+ return retval;
+ }
+
+
+
+}
+
@@ -0,0 +1,67 @@
+package org.jgroups.tests;
+
+import org.jgroups.Global;
+import org.jgroups.Message;
+import org.jgroups.util.RetransmitTable;
+import org.testng.annotations.Test;
+
+/** Tests {@link org.jgroups.util.RetransmitTable}
+ * @author Bela Ban
+ */
+@Test(groups=Global.FUNCTIONAL,sequential=false)
+public class RetransmitTableTest {
+ static final Message MSG=new Message(null, null, "test");
+
+ public static void testCreation() {
+ RetransmitTable table=new RetransmitTable(3, 10);
+ assert table.size() == 0;
+ assert table.get(15) == null;
+ }
+
+
+ public static void testAddition() {
+ RetransmitTable table=new RetransmitTable(3, 10);
+ addAndGet(table, 0, "0");
+ addAndGet(table, 1, "1");
+ addAndGet(table, 5, "5");
+ addAndGet(table, 9, "9");
+ addAndGet(table, 10, "10");
+ addAndGet(table, 11, "11");
+ addAndGet(table, 19, "19");
+ addAndGet(table, 20, "20");
+ addAndGet(table, 29, "29");
+ System.out.println("table: " + table);
+ assert table.size() == 9;
+ assert table.capacity() == 30;
+ }
+
+ public static void testDuplicateAddition() {
+ RetransmitTable table=new RetransmitTable(3, 10);
+ addAndGet(table, 0, "0");
+ addAndGet(table, 1, "1");
+ addAndGet(table, 5, "5");
+ addAndGet(table, 9, "9");
+ addAndGet(table, 10, "10");
+
+ assert !table.add(5, new Message());
+ assert table.get(5).getObject().equals("5");
+ assert table.size() == 5;
+ }
+
+ public static void testResize() {
+ RetransmitTable table=new RetransmitTable(3, 10);
+ assert table.capacity() == 30;
+ addAndGet(table, 30, "30");
+ addAndGet(table, 35, "35");
+ assert table.capacity() == 40;
+ }
+
+
+ protected static void addAndGet(RetransmitTable table, long seqno, String message) {
+ boolean added=table.add(seqno, new Message(null, null, message));
+ assert added;
+ Message msg=table.get(seqno);
+ assert msg != null && msg.getObject().equals(message);
+ }
+
+}

0 comments on commit beb4656

Please sign in to comment.