Permalink
Browse files

- Removed SeqnoTable and SeqnoTableTest

- No need to order messages as they are already ordered by virtue of the coord sending them FIFO (https://issues.jboss.org/browse/JGRP-1461)
  • Loading branch information...
1 parent 58de897 commit d3486c10599b23dc12eb265a68ea6e491bbb501c @belaban committed May 10, 2012
@@ -6,10 +6,10 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.stack.Protocol;
-import org.jgroups.util.SeqnoTable;
import org.jgroups.util.Util;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,20 +24,19 @@
private Address local_addr=null, coord=null;
private final Collection<Address> members=new ArrayList<Address>();
private volatile boolean is_coord=false;
- private AtomicLong seqno=new AtomicLong(0);
+ private AtomicLong seqno=new AtomicLong(1);
/** Maintains messages forwarded to the coord which which no ack has been received yet.
- * Needs to be sorted so we resend them in the right order
+ * Needs to be sorted so we resend them in the right order
*/
- private final Map<Long,byte[]> forward_table=new TreeMap<Long,byte[]>();
+ private final Map<Long,byte[]> forward_table=new TreeMap<Long,byte[]>();
- /** Map<Address, seqno>: maintains the highest seqnos seen for a given member */
- private final SeqnoTable received_table=new SeqnoTable();
- private long forwarded_msgs=0;
- private long bcast_msgs=0;
- private long received_forwards=0;
- private long received_bcasts=0;
+ protected long forwarded_msgs=0;
+ protected long bcast_msgs=0;
+ protected long received_forwards=0;
+ protected long received_bcasts=0;
+ protected long delivered_bcasts=0;
@ManagedAttribute
public boolean isCoordinator() {return is_coord;}
@@ -52,18 +51,22 @@
@ManagedAttribute
public long getReceivedBroadcasts() {return received_bcasts;}
+ @ManagedAttribute(description="Number of messages in the forward-table")
+ public int getForwardTableSize() {return forward_table.size();}
+
@ManagedOperation
public void resetStats() {
- forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=0L;
+ forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=delivered_bcasts=0L;
}
@ManagedOperation
public Map<String,Object> dumpStats() {
Map<String,Object> m=super.dumpStats();
- m.put("forwarded", new Long(forwarded_msgs));
- m.put("broadcast", new Long(bcast_msgs));
- m.put("received_forwards", new Long(received_forwards));
- m.put("received_bcasts", new Long(received_bcasts));
+ m.put("forwarded", forwarded_msgs);
+ m.put("broadcast", bcast_msgs);
+ m.put("received_forwards", received_forwards);
+ m.put("received_bcasts", received_bcasts);
+ m.put("delivered_bcasts", delivered_bcasts);
return m;
}
@@ -89,8 +92,6 @@ public Object down(Event evt) {
broadcast(msg, false); // don't copy, just use the message passed as argument
}
else {
- // SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, next_seqno);
- // msg.putHeader(this.id, hdr);
forwardToCoord(msg, next_seqno);
}
return null; // don't pass down
@@ -181,8 +182,6 @@ private void handleViewChange(View v) {
if(coord_changed) {
resendMessagesInForwardTable(); // maybe optimize in the future: broadcast directly if coord
}
- // remove left members from received_table
- received_table.retainAll(mbrs);
}
private void handleSuspect(Address suspected_mbr) {
@@ -226,9 +225,8 @@ private void resendMessagesInForwardTable() {
SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, key);
forward_msg.putHeader(this.id, hdr);
- if (log.isTraceEnabled()) {
- log.trace("resending msg " + local_addr + "::" + key + " to coord (" + coord + ")");
- }
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": resending " + local_addr + "::" + key + " to coord " + coord);
down_prot.down(new Event(Event.MSG, forward_msg));
}
}
@@ -238,7 +236,7 @@ private void forwardToCoord(final Message msg, long seqno) {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
if(log.isTraceEnabled())
- log.trace("forwarding msg " + msg + " (seqno " + seqno + ") to coord (" + coord + ")");
+ log.trace(local_addr + ": forwarding " + local_addr + "::" + seqno + " to coord " + coord);
byte[] marshalled_msg;
try {
@@ -271,7 +269,7 @@ private void broadcast(final Message msg, boolean copy) {
}
if(log.isTraceEnabled())
- log.trace("broadcasting msg " + bcast_msg + " (seqno " + hdr.getSeqno() + ")");
+ log.trace(local_addr + ": broadcasting " + hdr.getOriginalSender() + "::" + hdr.getSeqno());
down_prot.down(new Event(Event.MSG, bcast_msg));
bcast_msgs++;
@@ -286,13 +284,17 @@ private void unwrapAndDeliver(final Message msg) {
SequencerHeader hdr=(SequencerHeader)msg.getHeader(this.id);
Message msg_to_deliver=(Message)Util.objectFromByteBuffer(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
long msg_seqno=hdr.getSeqno();
- if(!canDeliver(msg_to_deliver.getSrc(), msg_seqno))
- return;
+ Address sender=msg_to_deliver.getSrc();
+ if(sender.equals(local_addr)) {
+ synchronized(forward_table) {
+ forward_table.remove(msg_seqno);
+ }
+ }
if(log.isTraceEnabled())
- log.trace("delivering msg " + msg_to_deliver + " (seqno " + msg_seqno +
- "), original sender " + msg_to_deliver.getSrc());
+ log.trace(local_addr + ": delivering " + hdr.getOriginalSender() + "::" + msg_seqno);
up_prot.up(new Event(Event.MSG, msg_to_deliver));
+ delivered_bcasts++;
}
catch(Exception e) {
log.error("failure unmarshalling buffer", e);
@@ -304,37 +306,23 @@ private void deliver(Message msg, Event evt, SequencerHeader hdr) {
Address sender=msg.getSrc();
if(sender == null) {
if(log.isErrorEnabled())
- log.error("sender is null, cannot deliver msg " + msg);
+ log.error(local_addr + ": sender is null, cannot deliver " + sender + "::" + hdr.getSeqno());
return;
}
long msg_seqno=hdr.getSeqno();
- if(!canDeliver(sender, msg_seqno))
- return;
- if(log.isTraceEnabled())
- log.trace("delivering msg " + msg + " (seqno " + msg_seqno + "), sender " + sender);
- up_prot.up(evt);
- }
-
-
- private boolean canDeliver(Address sender, long seqno) {
- // this is the ack for the message sent by myself
if(sender.equals(local_addr)) {
synchronized(forward_table) {
- forward_table.remove(seqno);
+ forward_table.remove(msg_seqno);
}
}
-
- // if msg was already delivered, discard it
- boolean added=received_table.add(sender, seqno);
- if(!added) {
- if(log.isWarnEnabled())
- log.warn("seqno (" + sender + "::" + seqno + " has already been received " +
- "(highest received=" + received_table.getHighestReceived(sender) +
- "); discarding duplicate message");
- }
- return added;
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": delivering " + sender + "::" + msg_seqno);
+ up_prot.up(evt);
+ delivered_bcasts++;
}
+
+
/* ----------------------------- End of Private Methods -------------------------------- */
@@ -1,106 +0,0 @@
-package org.jgroups.util;
-
-import org.jgroups.Address;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Maintains the highest received and highest delivered seqno per member
- * @author Bela Ban
- */
-public class SeqnoTable {
- private final ConcurrentMap<Address, Entry> map=Util.createConcurrentMap();
-
- public long getHighestReceived(Address member) {
- Entry entry=map.get(member);
- return entry != null? entry.getHighestReceived() : -1;
- }
-
- public long getNextToReceive(Address member) {
- Entry entry=map.get(member);
- return entry != null? entry.getNextToReceive() : -1;
- }
-
- public boolean add(Address member, long seqno) {
- Entry entry=map.get(member);
- if(entry == null) {
- entry=new Entry(seqno);
- Entry entry2=map.putIfAbsent(member, entry);
- if(entry2 != null)
- entry=entry2;
- }
- // now entry is not null
- return entry.add(seqno);
- }
-
- public void remove(Address member) {
- map.remove(member);
- }
-
- public boolean retainAll(Collection<Address> members) {
- return map.keySet().retainAll(members);
- }
-
- public void clear() {
- map.clear();
- }
-
- public String toString() {
- return map.toString();
- }
-
-
- private static class Entry {
- long highest_received;
- long next_to_receive;
- final Set<Long> seqnos=new HashSet<Long>();
-
- private Entry(long initial_seqno) {
- this.next_to_receive=this.highest_received=initial_seqno;
- }
-
- public synchronized long getHighestReceived() {
- return highest_received;
- }
-
- public synchronized long getNextToReceive() {
- return next_to_receive;
- }
-
- public synchronized boolean add(long seqno) {
- try {
- if(seqno == next_to_receive) {
- next_to_receive++;
- while(true) {
- if(seqnos.remove(next_to_receive)) {
- next_to_receive++;
- }
- else
- break;
- }
- return true;
- }
-
- if(seqno < next_to_receive)
- return false;
-
- // seqno > next_to_receive
- return seqnos.add(seqno);
- }
- finally {
- highest_received=Math.max(highest_received, seqno);
- }
- }
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- sb.append(next_to_receive).append(" - ").append(highest_received);
- if(!seqnos.isEmpty())
- sb.append(" ").append(seqnos);
- return sb.toString();
- }
- }
-}
Oops, something went wrong. Retry.

7 comments on commit d3486c1

@dimbleby

This fix goes further than I'd expected. What now prevents messages that are re-sent from being delivered twice?

(SEQUENCER currently re-sends messages at change of coordinator, and per JGRP-1449 I even have a pull request outstanding that adds a re-transmit task).

@belaban
Owner

We're maintaining the next message to be delivered and increment it on reception of a matching message, so this should suppress duplicates. Can you think of a use case where this doesn't work ?

@dimbleby

I must be missing something (or we're at cross-purposes).

I don't see any replacement for the code that checked whether the sequence number on an upwards message (in either unwrapAndDeliver() or deliver()) is one that has already been received.

If that's true, then the case that ought to be broken is at a change of coordinator, where the old coordinator had managed to deliver a broadcast message to some members but not others. Then the original sender will call resendMessagesInForwardTable(), and there's nothing to stop some members from receiving the same message twice.

@belaban
Owner

Oops, you're right !
However, the seqnos could be delivered in the order of 5,7,6 if sent by multiple threads, so we cannot keep track of the lowest seqno... Perhaps a simple set of the last N seqnos should be added (similar to SeqnoTable, but simpler), which stores the last N seqnos (N is configurable). We only need to check for dupes, not for order, anyway...

@belaban
Owner

OK, so I think I finally fixed this. Can you take a look and see if this works for you ?

@dimbleby

Thanks - for this and all the other fixes too. I think that the latest change looks good.

I'll leave some tests running for a while. It'll take a while before I'm completely happy that I can't hit more problems; but if you don't hear more from me on this then that'll mean that all is going well. (No news is good news!)

@belaban
Owner

OK. I hope to look at your other 2 SEQUENCER related issues soon, too

Please sign in to comment.