From de01267622295534ed09269e10e2b13e7fbe6c2c Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Fri, 26 Feb 2010 09:02:03 +0000 Subject: [PATCH] ns --- src/org/jgroups/stack/AckReceiverWindow2.java | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 src/org/jgroups/stack/AckReceiverWindow2.java diff --git a/src/org/jgroups/stack/AckReceiverWindow2.java b/src/org/jgroups/stack/AckReceiverWindow2.java new file mode 100644 index 00000000000..dd0b22a732b --- /dev/null +++ b/src/org/jgroups/stack/AckReceiverWindow2.java @@ -0,0 +1,216 @@ +package org.jgroups.stack; + + +import org.jgroups.Message; +import org.jgroups.util.Tuple; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; + + +/** + * Counterpart of AckSenderWindow. Simple FIFO buffer. + * Every message received is ACK'ed (even duplicates) and added to a hashmap + * keyed by seqno. The next seqno to be received is stored in next_to_remove. When a message with + * a seqno less than next_to_remove is received, it will be discarded. The remove() method removes + * and returns a message whose seqno is equal to next_to_remove, or null if not found.
+ * Change May 28 2002 (bela): replaced TreeSet with HashMap. Keys do not need to be sorted, and adding a key to + * a sorted set incurs overhead. + * + * @author Bela Ban + * @version $Id: AckReceiverWindow2.java,v 1.1 2010/02/26 09:02:03 belaban Exp $ + */ +public class AckReceiverWindow2 { + private final AtomicLong next_to_remove; + private final AtomicBoolean processing=new AtomicBoolean(false); + private final ConcurrentMap segments=new ConcurrentHashMap(); + private final int segment_capacity; + private long highest_segment_created=0; + + static final Message TOMBSTONE=new Message(true); + + + public AckReceiverWindow2(long initial_seqno, int segment_capacity) { + next_to_remove=new AtomicLong(initial_seqno); + this.segment_capacity=segment_capacity; + long index=next_to_remove.get() / segment_capacity; + long first_seqno=(next_to_remove.get() / segment_capacity) * segment_capacity; + this.segments.put(index, new Segment(first_seqno, segment_capacity)); + Segment initial_segment=findOrCreateSegment(next_to_remove.get()); + for(long i=0; i < next_to_remove.get(); i++) { + initial_segment.add(i, TOMBSTONE); + } + } + + public AtomicBoolean getProcessing() { + return processing; + } + + + + /** Adds a new message. Message cannot be null + * @return True if the message was added, false if not (e.g. duplicate, message was already present) + */ + public boolean add(long seqno, Message msg) { + return add2(seqno, msg) == 1; + } + + + /** + * Adds a message if not yet received + * @param seqno + * @param msg + * @return -1 if not added because seqno < next_to_remove, 0 if not added because already present, + * 1 if added successfully + */ + public byte add2(long seqno, Message msg) { + Segment segment=findOrCreateSegment(seqno); + if(segment == null) + return -1; + return segment.add(seqno, msg); + } + + + /** + * Removes a message whose seqno is equal to next_to_remove, increments the latter. Returns message + * that was removed, or null, if no message can be removed. Messages are thus removed in order. + */ + public Message remove() { + long next=next_to_remove.get(); + Segment segment=findOrCreateSegment(next); + if(segment == null) + return null; + Message retval=segment.remove(next); + if(retval != null) + next_to_remove.compareAndSet(next, next +1); + return retval; + } + + + + /** + * Removes as many messages as possible (in sequence, without gaps) + * @param max Max number of messages to be removed + * @return Tuple,Long>: a tuple of the message list and the highest seqno removed + */ + public Tuple,Long> removeMany(int max) { + List list=new LinkedList(); // we remove msgs.size() messages *max* + Tuple,Long> retval=new Tuple,Long>(list, 0L); + + return null; + } + + + public Message removeOOBMessage() { + return null; + } + + /** + * Removes as many OOB messages as possible and return the highest seqno + * @return the highest seqno or -1 if no OOB message was found + */ + public long removeOOBMessages() { + return -1; + } + + + public boolean hasMessagesToRemove() { + return false; + } + + + public void reset() { + } + + public int size() { + return 0; + } + + public String toString() { + StringBuilder sb=new StringBuilder(); + + return sb.toString(); + } + + + public String printDetails() { + StringBuilder sb=new StringBuilder(); + + return sb.toString(); + } + + private Segment findOrCreateSegment(long seqno) { + long index=seqno / segment_capacity; + if(index > highest_segment_created) { + long start_seqno=seqno / segment_capacity * segment_capacity; + Segment segment=new Segment(start_seqno, segment_capacity); + Segment tmp=segments.putIfAbsent(index, segment); + if(tmp != null) // segment already exists + segment=tmp; + else + highest_segment_created=index; + return segment; + } + + return segments.get(index); + } + + + private static class Segment { + final long start_index; // e.g. 5000. Then seqno 5100 would be at index 100 + final int capacity; + final AtomicReferenceArray array; + final AtomicInteger count=new AtomicInteger(0); // counts the numbers of non-empty elements (also tombstones) + + public Segment(long start_index, int capacity) { + this.start_index=start_index; + this.capacity=capacity; + this.array=new AtomicReferenceArray(capacity); + } + + public byte add(long seqno, Message msg) { + int index=index(seqno); + if(index < 0) + return -1; + boolean success=array.compareAndSet(index, null, msg); + if(success) { + count.incrementAndGet(); + return 1; + } + else + return 0; + } + + public Message remove(long seqno) { + int index=index(seqno); + Message retval=array.get(index); + if(retval != null && array.compareAndSet(index, retval, TOMBSTONE)) + return retval; + return null; + } + + public String toString() { + return start_index + " - " + (start_index + capacity -1) + " (" + count + " elements set)"; + } + + private int index(long seqno) { + if(seqno < start_index) + return -1; + + int index=(int)(seqno - start_index); + if(index < 0 || index >= capacity) { + // todo: replace with returning -1 + throw new IndexOutOfBoundsException("index=" + index + ", start_index=" + start_index + ", seqno=" + seqno); + } + return index; + } + + } + +} \ No newline at end of file