Skip to content
Browse files

- Removed SeqnoTable/Test

- No need to order messages via seqnoTable, as they are already ordered (https://issues.jboss.org/browse/JGRP-1461)
  • Loading branch information...
1 parent d4bf6a0 commit a6e726f6de360373cc785d8bd6ea32a39a718203 @belaban committed May 10, 2012
View
103 src/org/jgroups/protocols/SEQUENCER.java
@@ -6,37 +6,37 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.stack.Protocol;
-import org.jgroups.util.SeqnoTable;
import org.jgroups.util.Util;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
- * Implementation of total order protocol using a sequencer. Consult doc/design/SEQUENCER.txt for details
+ * Implementation of total order protocol using a sequencer.
+ * Consult <a href="https://github.com/belaban/JGroups/blob/master/doc/design/SEQUENCER.txt">SEQUENCER.txt</a> for details
* @author Bela Ban
*/
@MBean(description="Implementation of total order protocol using a sequencer")
public class SEQUENCER extends Protocol {
private Address local_addr=null, coord=null;
private final Collection<Address> members=new ArrayList<Address>();
private volatile boolean is_coord=false;
- private AtomicLong seqno=new AtomicLong(0);
+ private AtomicLong seqno=new AtomicLong(1);
/** Maintains messages forwarded to the coord which which no ack has been received yet.
- * Needs to be sorted so we resend them in the right order
+ * Needs to be sorted so we resend them in the right order
*/
- private final Map<Long,byte[]> forward_table=new TreeMap<Long,byte[]>();
+ private final Map<Long,byte[]> forward_table=new TreeMap<Long,byte[]>();
- /** Map<Address, seqno>: maintains the highest seqnos seen for a given member */
- private final SeqnoTable received_table=new SeqnoTable(0);
- private long forwarded_msgs=0;
- private long bcast_msgs=0;
- private long received_forwards=0;
- private long received_bcasts=0;
+ protected long forwarded_msgs=0;
+ protected long bcast_msgs=0;
+ protected long received_forwards=0;
+ protected long received_bcasts=0;
+ protected long delivered_bcasts=0;
@ManagedAttribute
public boolean isCoordinator() {return is_coord;}
@@ -51,28 +51,32 @@
@ManagedAttribute
public long getReceivedBroadcasts() {return received_bcasts;}
+ @ManagedAttribute(description="Number of messages in the forward-table")
+ public int getForwardTableSize() {return forward_table.size();}
+
@ManagedOperation
public void resetStats() {
- forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=0L;
+ forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=delivered_bcasts=0L;
}
@ManagedOperation
public Map<String,Object> dumpStats() {
- Map<String,Object> m=super.dumpStats();
- m.put("forwarded", new Long(forwarded_msgs));
- m.put("broadcast", new Long(bcast_msgs));
- m.put("received_forwards", new Long(received_forwards));
- m.put("received_bcasts", new Long(received_bcasts));
+ Map<String,Object> m=super.dumpStats();
+ m.put("forwarded", forwarded_msgs);
+ m.put("broadcast", bcast_msgs);
+ m.put("received_forwards", received_forwards);
+ m.put("received_bcasts", received_bcasts);
+ m.put("delivered_bcasts", delivered_bcasts);
return m;
}
@ManagedOperation
public String printStats() {
return dumpStats().toString();
- }
+ }
+
-
public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
@@ -88,8 +92,6 @@ public Object down(Event evt) {
broadcast(msg, false); // don't copy, just use the message passed as argument
}
else {
- // SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, next_seqno);
- // msg.putHeader(this.id, hdr);
forwardToCoord(msg, next_seqno);
}
return null; // don't pass down
@@ -180,16 +182,14 @@ private void handleViewChange(View v) {
if(coord_changed) {
resendMessagesInForwardTable(); // maybe optimize in the future: broadcast directly if coord
}
- // remove left members from received_table
- received_table.retainAll(mbrs);
}
private void handleSuspect(Address suspected_mbr) {
boolean coord_changed=false;
if(suspected_mbr == null)
return;
-
+
synchronized(this) {
List<Address> non_suspected_mbrs=new ArrayList<Address>(members);
non_suspected_mbrs.remove(suspected_mbr);
@@ -225,9 +225,8 @@ private void resendMessagesInForwardTable() {
SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, key);
forward_msg.putHeader(this.id, hdr);
- if (log.isTraceEnabled()) {
- log.trace("resending msg " + local_addr + "::" + key + " to coord (" + coord + ")");
- }
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": resending " + local_addr + "::" + key + " to coord " + coord);
down_prot.down(new Event(Event.MSG, forward_msg));
}
}
@@ -237,7 +236,7 @@ private void forwardToCoord(final Message msg, long seqno) {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
if(log.isTraceEnabled())
- log.trace("forwarding msg " + msg + " (seqno " + seqno + ") to coord (" + coord + ")");
+ log.trace(local_addr + ": forwarding " + local_addr + "::" + seqno + " to coord " + coord);
byte[] marshalled_msg;
try {
@@ -270,7 +269,7 @@ private void broadcast(final Message msg, boolean copy) {
}
if(log.isTraceEnabled())
- log.trace("broadcasting msg " + bcast_msg + " (seqno " + hdr.getSeqno() + ")");
+ log.trace(local_addr + ": broadcasting " + hdr.getOriginalSender() + "::" + hdr.getSeqno());
down_prot.down(new Event(Event.MSG, bcast_msg));
bcast_msgs++;
@@ -285,13 +284,17 @@ private void unwrapAndDeliver(final Message msg) {
SequencerHeader hdr=(SequencerHeader)msg.getHeader(this.id);
Message msg_to_deliver=(Message)Util.objectFromByteBuffer(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
long msg_seqno=hdr.getSeqno();
- if(!canDeliver(msg_to_deliver.getSrc(), msg_seqno))
- return;
+ Address sender=msg_to_deliver.getSrc();
+ if(sender.equals(local_addr)) {
+ synchronized(forward_table) {
+ forward_table.remove(msg_seqno);
+ }
+ }
if(log.isTraceEnabled())
- log.trace("delivering msg " + msg_to_deliver + " (seqno " + msg_seqno +
- "), original sender " + msg_to_deliver.getSrc());
+ log.trace(local_addr + ": delivering " + hdr.getOriginalSender() + "::" + msg_seqno);
up_prot.up(new Event(Event.MSG, msg_to_deliver));
+ delivered_bcasts++;
}
catch(Exception e) {
log.error("failure unmarshalling buffer", e);
@@ -303,37 +306,23 @@ private void deliver(Message msg, Event evt, SequencerHeader hdr) {
Address sender=msg.getSrc();
if(sender == null) {
if(log.isErrorEnabled())
- log.error("sender is null, cannot deliver msg " + msg);
+ log.error(local_addr + ": sender is null, cannot deliver " + sender + "::" + hdr.getSeqno());
return;
}
long msg_seqno=hdr.getSeqno();
- if(!canDeliver(sender, msg_seqno))
- return;
- if(log.isTraceEnabled())
- log.trace("delivering msg " + msg + " (seqno " + msg_seqno + "), sender " + sender);
- up_prot.up(evt);
- }
-
-
- private boolean canDeliver(Address sender, long seqno) {
- // this is the ack for the message sent by myself
if(sender.equals(local_addr)) {
synchronized(forward_table) {
- forward_table.remove(seqno);
+ forward_table.remove(msg_seqno);
}
}
-
- // if msg was already delivered, discard it
- boolean added=received_table.add(sender, seqno);
- if(!added) {
- if(log.isWarnEnabled())
- log.warn("seqno (" + sender + "::" + seqno + " has already been received " +
- "(highest received=" + received_table.getHighestReceived(sender) +
- "); discarding duplicate message");
- }
- return added;
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": delivering " + sender + "::" + msg_seqno);
+ up_prot.up(evt);
+ delivered_bcasts++;
}
+
+
/* ----------------------------- End of Private Methods -------------------------------- */
@@ -383,7 +372,7 @@ private final String printType() {
}
}
-
+
public void writeTo(DataOutput out) throws Exception {
out.writeByte(type);
Util.writeStreamable(tag, out);
View
112 src/org/jgroups/util/SeqnoTable.java
@@ -1,112 +0,0 @@
-package org.jgroups.util;
-
-import org.jgroups.Address;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Maintains the highest received and highest delivered seqno per member
- * @author Bela Ban
- */
-public class SeqnoTable {
- private long next_to_receive=0;
- private final ConcurrentMap<Address, Entry> map=Util.createConcurrentMap();
-
-
- public SeqnoTable(long next_to_receive) {
- this.next_to_receive=next_to_receive;
- }
-
- public long getHighestReceived(Address member) {
- Entry entry=map.get(member);
- return entry != null? entry.getHighestReceived() : -1;
- }
-
- public long getNextToReceive(Address member) {
- Entry entry=map.get(member);
- return entry != null? entry.getNextToReceive() : -1;
- }
-
- public boolean add(Address member, long seqno) {
- Entry entry=map.get(member);
- if(entry == null) {
- entry=new Entry(next_to_receive);
- Entry entry2=map.putIfAbsent(member, entry);
- if(entry2 != null)
- entry=entry2;
- }
- // now entry is not null
- return entry.add(seqno);
- }
-
- public void remove(Address member) {
- map.remove(member);
- }
-
- public boolean retainAll(Collection<Address> members) {
- return map.keySet().retainAll(members);
- }
-
- public void clear() {
- map.clear();
- }
-
- public String toString() {
- return map.toString();
- }
-
-
- private static class Entry {
- long highest_received;
- long next_to_receive;
- final Set<Long> seqnos=new HashSet<Long>();
-
- private Entry(long initial_seqno) {
- this.next_to_receive=this.highest_received=initial_seqno;
- }
-
- public synchronized long getHighestReceived() {
- return highest_received;
- }
-
- public synchronized long getNextToReceive() {
- return next_to_receive;
- }
-
- public synchronized boolean add(long seqno) {
- try {
- if(seqno == next_to_receive) {
- next_to_receive++;
- while(true) {
- if(seqnos.remove(next_to_receive)) {
- next_to_receive++;
- }
- else
- break;
- }
- return true;
- }
-
- if(seqno < next_to_receive)
- return false;
-
- // seqno > next_to_receive
- return seqnos.add(seqno);
- }
- finally {
- highest_received=Math.max(highest_received, seqno);
- }
- }
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- sb.append(next_to_receive).append(" - ").append(highest_received);
- if(!seqnos.isEmpty())
- sb.append(" ").append(seqnos);
- return sb.toString();
- }
- }
-}
View
145 tests/junit-functional/org/jgroups/tests/SeqnoTableTest.java
@@ -1,145 +0,0 @@
-
-package org.jgroups.tests;
-
-
-import org.jgroups.Address;
-import org.jgroups.Global;
-import org.jgroups.util.SeqnoTable;
-import org.jgroups.util.Util;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.net.UnknownHostException;
-
-
-
-@Test(groups=Global.FUNCTIONAL)
-public class SeqnoTableTest {
- private static Address MBR=null;
-
- @BeforeClass
- private static void init() throws UnknownHostException {
- MBR=Util.createRandomAddress();
- }
-
-
- public static void testInit() {
- SeqnoTable tab=new SeqnoTable(0);
- tab.add(MBR, 0);
- Assert.assertEquals(0, tab.getHighestReceived(MBR));
- Assert.assertEquals(1, tab.getNextToReceive(MBR));
-
- tab.clear();
- tab=new SeqnoTable(50);
- tab.add(MBR, 50);
- Assert.assertEquals(50, tab.getHighestReceived(MBR));
- Assert.assertEquals(51, tab.getNextToReceive(MBR));
- }
-
-
- public static void testAdd() {
- SeqnoTable tab=new SeqnoTable(0);
- tab.add(MBR, 0);
- tab.add(MBR, 1);
- tab.add(MBR, 2);
- Assert.assertEquals(2, tab.getHighestReceived(MBR));
- Assert.assertEquals(3, tab.getNextToReceive(MBR));
- }
-
-
- public static void testAddWithGaps() {
- SeqnoTable tab=new SeqnoTable(0);
- boolean rc=tab.add(MBR, 0);
- assert rc;
- rc=tab.add(MBR, 1);
- assert rc;
- rc=tab.add(MBR, 2);
- assert rc;
- rc=tab.add(MBR, 4);
- assert rc;
- rc=tab.add(MBR, 5);
- assert rc;
- System.out.println("tab: " + tab);
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(3, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 3);
- assert rc;
- System.out.println("tab: " + tab);
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(6, tab.getNextToReceive(MBR));
- }
-
-
- public static void testAddWithGaps2() {
- SeqnoTable tab=new SeqnoTable(0);
- boolean rc=tab.add(MBR, 5);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(0, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 4);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(0, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 3);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(0, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 2);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(0, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 1);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(0, tab.getNextToReceive(MBR));
-
- rc=tab.add(MBR, 0);
- System.out.println("tab: " + tab);
- assert rc;
- Assert.assertEquals(5, tab.getHighestReceived(MBR));
- Assert.assertEquals(6, tab.getNextToReceive(MBR));
-
- }
-
-
- public static void testInsertionOfDuplicates() {
- SeqnoTable tab=new SeqnoTable(0);
- boolean rc=tab.add(MBR, 0);
- assert rc;
- rc=tab.add(MBR, 0);
- assert !(rc);
-
- rc=tab.add(MBR, 1);
- assert rc;
- rc=tab.add(MBR, 2);
- assert rc;
- rc=tab.add(MBR, 4);
- assert rc;
- rc=tab.add(MBR, 5);
- assert rc;
- System.out.println("tab: " + tab);
-
- rc=tab.add(MBR, 2);
- assert !rc;
-
- rc=tab.add(MBR, 3);
- assert rc;
-
- rc=tab.add(MBR, 3);
- assert !rc;
- }
-
-
-
-}
View
21 tests/junit/org/jgroups/tests/SequencerOrderTest.java
@@ -87,11 +87,19 @@ public void testBroadcastSequence() throws Exception {
sender.join(20000);
System.out.println("Ok, senders have completed");
+
+ for(int i=0; i < 10; i++) {
+ if(r1.size() == EXPECTED_MSGS && r2.size() == EXPECTED_MSGS && r3.size() == EXPECTED_MSGS)
+ break;
+ Util.sleep(1000);
+ }
+
+
final List<String> l1=r1.getMsgs();
final List<String> l2=r2.getMsgs();
final List<String> l3=r3.getMsgs();
- System.out.println("-- verifying messages on A and B");
+ System.out.println("-- verifying messages on A, B and C");
verifyNumberOfMessages(EXPECTED_MSGS, l1, l2, l3);
verifySameOrder(EXPECTED_MSGS, l1, l2, l3);
}
@@ -104,7 +112,7 @@ private static void insertShuffle(JChannel... channels) throws Exception {
shuffle.setMaxSize(10);
shuffle.setMaxTime(1000);
ch.getProtocolStack().insertProtocol(shuffle, ProtocolStack.BELOW, NAKACK.class);
- shuffle.init(); // gets the timer
+ shuffle.init(); // starts the timer
}
}
@@ -127,7 +135,8 @@ private static void verifyNumberOfMessages(int num_msgs, List<String> ... lists)
System.out.println("list #" + (i+1) + ": " + lists[i]);
for(int i=0; i < lists.length; i++)
- assert lists[i].size() == num_msgs : "list #" + (i+1) + " should have " + num_msgs + " elements";
+ assert lists[i].size() == num_msgs : "list #" + (i+1) + " should have " + num_msgs +
+ " elements, but has " + lists[i].size() + " elements";
System.out.println("OK, all lists have the same size (" + num_msgs + ")\n");
}
@@ -149,8 +158,8 @@ private static void verifySameOrder(int expected_msgs, List<String> ... lists) t
}
private static class Sender extends Thread {
- final int num_msgs;
- final JChannel[] channels;
+ final int num_msgs;
+ final JChannel[] channels;
final AtomicInteger num;
public Sender(int num_msgs, AtomicInteger num, JChannel ... channels) {
@@ -184,6 +193,8 @@ private MyReceiver(String name) {
return msgs;
}
+ public int size() {return msgs.size();}
+
public void receive(Message msg) {
String val=(String)msg.getObject();
if(val != null) {

0 comments on commit a6e726f

Please sign in to comment.
Something went wrong with that request. Please try again.