Permalink
Browse files

Using a BoundedHashMap in ForwardQueue (https://issues.jboss.org/brow…

  • Loading branch information...
1 parent 7832007 commit 49ba2863978fbe072bb5d3917e787a4c3208d28d Bela Ban committed Mar 14, 2013
Showing with 9 additions and 23 deletions.
  1. +9 −23 src/org/jgroups/util/ForwardQueue.java
@@ -6,10 +6,11 @@
import org.jgroups.logging.Log;
import org.jgroups.stack.Protocol;
-import java.util.*;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -26,9 +27,6 @@
protected Address local_addr;
- /** The address to send messages to (e.g. the coordinator in {@link org.jgroups.protocols.FORWARD_TO_COORD}) */
- // protected volatile Address target;
-
/** Maintains messages forwarded to the target which which no ack has been received yet.
* Needs to be sorted so we can resend them in the right order */
protected final NavigableMap<Long,Message> forward_table=new ConcurrentSkipListMap<Long,Message>();
@@ -46,7 +44,7 @@
protected final AtomicInteger in_flight_sends=new AtomicInteger(0);
// Maintains received seqnos, so we can weed out dupes
- protected final ConcurrentMap<Address,NavigableSet<Long>> delivery_table=Util.createConcurrentMap();
+ protected final ConcurrentMap<Address,BoundedHashMap<Long,Long>> delivery_table=Util.createConcurrentMap();
protected volatile Flusher flusher;
@@ -82,7 +80,7 @@ public ForwardQueue(Log log) {
/** Total size of all queues of the delivery table */
public int deliveryTableSize() {
int retval=0;
- for(Set<Long> val: delivery_table.values())
+ for(BoundedHashMap<Long,Long> val: delivery_table.values())
retval+=val.size();
return retval;
}
@@ -236,26 +234,14 @@ protected void flushMessagesInForwardTable(Address target) {
* Note that this method is never called concurrently for the same sender.
*/
protected boolean canDeliver(Address sender, long seqno) {
- NavigableSet<Long> seqno_set=delivery_table.get(sender);
+ BoundedHashMap<Long,Long> seqno_set=delivery_table.get(sender);
if(seqno_set == null) {
- seqno_set=new ConcurrentSkipListSet<Long>();
- NavigableSet<Long> existing=delivery_table.put(sender,seqno_set);
+ seqno_set=new BoundedHashMap<Long,Long>(delivery_table_max_size);
+ BoundedHashMap<Long,Long> existing=delivery_table.put(sender,seqno_set);
if(existing != null)
seqno_set=existing;
}
-
- boolean added=seqno_set.add(seqno);
- int size=seqno_set.size();
- if(size > delivery_table_max_size) {
- // trim the seqno_set to delivery_table_max_size elements by removing the first N seqnos
-
- // iteration: very bad !!!
- for(int i=0; i < size - delivery_table_max_size; i++) {
- if(seqno_set.pollFirst() == null)
- break;
- }
- }
- return added;
+ return seqno_set.add(seqno, seqno);
}
protected void block() {

0 comments on commit 49ba286

Please sign in to comment.