Skip to content

Commit

Permalink
- fixed incorrect ordering in SEQUENCER (http://jira.jboss.com/jira/b…
Browse files Browse the repository at this point in the history
…rowse/JGRP-607)

- Added SeqnoTable and SeqnoTableTest
- Added unit test for JGRP-607
  • Loading branch information
belaban committed Oct 25, 2007
1 parent 6c70ed2 commit e557773
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 20 deletions.
31 changes: 11 additions & 20 deletions src/org/jgroups/protocols/SEQUENCER.java
@@ -1,9 +1,9 @@

package org.jgroups.protocols;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.SeqnoTable;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

Expand All @@ -14,7 +14,7 @@
/**
* Implementation of total order protocol using a sequencer. Consult doc/design/SEQUENCER.txt for details
* @author Bela Ban
* @version $Id: SEQUENCER.java,v 1.11.2.1 2007/04/27 08:03:52 belaban Exp $
* @version $Id: SEQUENCER.java,v 1.11.2.2 2007/10/25 08:11:51 belaban Exp $
*/
public class SEQUENCER extends Protocol {
private Address local_addr=null, coord=null;
Expand All @@ -26,7 +26,7 @@ public class SEQUENCER extends Protocol {
private final Map forward_table=new TreeMap();

/** Map<Address, seqno>: maintains the highest seqnos seen for a given member */
private final ConcurrentHashMap received_table=new ConcurrentHashMap();
private final SeqnoTable received_table=new SeqnoTable(0);

private long forwarded_msgs=0;
private long bcast_msgs=0;
Expand Down Expand Up @@ -167,13 +167,7 @@ private void handleViewChange(View v) {
resendMessagesInForwardTable(); // maybe optimize in the future: broadcast directly if coord
}
// remove left members from received_table
int size=received_table.size();
Set keys=received_table.keySet();
keys.retainAll(members);
if(keys.size() != size) {
if(log.isTraceEnabled())
log.trace("adjusted received_table, keys are " + keys);
}
received_table.retainAll(members);
}

/**
Expand Down Expand Up @@ -236,17 +230,14 @@ private void deliver(Message msg, SequencerHeader hdr) {
}

// if msg was already delivered, discard it
Long highest_seqno_seen=(Long)received_table.get(original_sender);
if(highest_seqno_seen != null) {
if(highest_seqno_seen.longValue() >= msg_seqno) {
if(log.isWarnEnabled())
log.warn("message seqno (" + original_sender + "::" + msg_seqno + " has already " +
"been received (highest received=" + highest_seqno_seen + "); discarding duplicate message");
return;
}
boolean added=received_table.add(original_sender, msg_seqno);
if(!added) {
if(log.isWarnEnabled())
log.warn("seqno (" + original_sender + "::" + msg_seqno + " has already been received " +
"(highest received=" + received_table.getHighestReceived(original_sender) +
"); discarding duplicate message");
return;
}
// update the table with the new seqno
received_table.put(original_sender, new Long(msg_seqno));

// pass a copy of the message up the stack
Message tmp=msg.copy(true);
Expand Down
113 changes: 113 additions & 0 deletions src/org/jgroups/util/SeqnoTable.java
@@ -0,0 +1,113 @@
package org.jgroups.util;

import org.jgroups.Address;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Maintains the highest received and highest delivered seqno per member
* @author Bela Ban
* @version $Id: SeqnoTable.java,v 1.1.2.2 2007/10/25 08:11:53 belaban Exp $
*/
public class SeqnoTable {
private long next_to_receive=0;
private final ConcurrentMap<Address,Entry> map=new ConcurrentHashMap<Address,Entry>();


public SeqnoTable(long next_to_receive) {
this.next_to_receive=next_to_receive;
}

public long getHighestReceived(Address member) {
Entry entry=map.get(member);
return entry != null? entry.getHighestReceived() : -1;
}

public long getNextToReceive(Address member) {
Entry entry=map.get(member);
return entry != null? entry.getNextToReceive() : -1;
}

public boolean add(Address member, long seqno) {
Entry entry=map.get(member);
if(entry == null) {
entry=new Entry(next_to_receive);
map.putIfAbsent(member, entry);
}
// now entry is not null
return entry.add(seqno);
}

public void remove(Address member) {
map.remove(member);
}

public boolean retainAll(Collection<Address> members) {
return map.keySet().retainAll(members);
}

public void clear() {
map.clear();
}

public String toString() {
return map.toString();
}


private static class Entry {
long highest_received;
long next_to_receive;
final Set<Long> seqnos=new HashSet<Long>();

private Entry(long initial_seqno) {
this.next_to_receive=this.highest_received=initial_seqno;
}

public synchronized long getHighestReceived() {
return highest_received;
}

public synchronized long getNextToReceive() {
return next_to_receive;
}

public synchronized boolean add(long seqno) {
try {
if(seqno == next_to_receive) {
next_to_receive++;
while(true) {
if(seqnos.remove(next_to_receive)) {
next_to_receive++;
}
else
break;
}
return true;
}

if(seqno < next_to_receive)
return false;

// seqno > next_to_receive
seqnos.add(seqno);
return true;
}
finally {
highest_received=Math.max(highest_received, seqno);
}
}

public String toString() {
StringBuilder sb=new StringBuilder();
sb.append(next_to_receive).append(" - ").append(highest_received);
if(!seqnos.isEmpty())
sb.append(" ").append(seqnos);
return sb.toString();
}
}
}
162 changes: 162 additions & 0 deletions tests/junit/org/jgroups/tests/SeqnoTableTest.java
@@ -0,0 +1,162 @@

package org.jgroups.tests;


import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.Address;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.SeqnoTable;

import java.net.UnknownHostException;


public class SeqnoTableTest extends TestCase {
SeqnoTable tab;
private static Address MBR;

static {
try {
MBR=new IpAddress("127.0.0.1", 5555);
}
catch(UnknownHostException e) {
throw new RuntimeException(e);
}
}

public SeqnoTableTest(String name) {
super(name);
}

public void setUp() throws Exception {
super.setUp();
}

protected void tearDown() throws Exception {
super.tearDown();
tab.clear();
}

public void testInit() {
tab=new SeqnoTable(0);
tab.add(MBR, 0);
assertEquals(0, tab.getHighestReceived(MBR));
assertEquals(1, tab.getNextToReceive(MBR));

tab.clear();
tab=new SeqnoTable(50);
tab.add(MBR, 50);
assertEquals(50, tab.getHighestReceived(MBR));
assertEquals(51, tab.getNextToReceive(MBR));
}

public void testAdd() {
tab=new SeqnoTable(0);
tab.add(MBR, 0);
tab.add(MBR, 1);
tab.add(MBR, 2);
assertEquals(2, tab.getHighestReceived(MBR));
assertEquals(3, tab.getNextToReceive(MBR));
}

public void testAddWithGaps() {
tab=new SeqnoTable(0);
boolean rc=tab.add(MBR, 0);
assertTrue(rc);
rc=tab.add(MBR, 1);
assertTrue(rc);
rc=tab.add(MBR, 2);
assertTrue(rc);
rc=tab.add(MBR, 4);
assertTrue(rc);
rc=tab.add(MBR, 5);
assertTrue(rc);
System.out.println("tab: " + tab);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(3, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 3);
assertTrue(rc);
System.out.println("tab: " + tab);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(6, tab.getNextToReceive(MBR));
}

public void testAddWithGaps2() {
tab=new SeqnoTable(0);
boolean rc=tab.add(MBR, 5);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(0, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 4);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(0, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 3);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(0, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 2);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(0, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 1);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(0, tab.getNextToReceive(MBR));

rc=tab.add(MBR, 0);
System.out.println("tab: " + tab);
assertTrue(rc);
assertEquals(5, tab.getHighestReceived(MBR));
assertEquals(6, tab.getNextToReceive(MBR));

}

public void testInsertionOfDuplicates() {
tab=new SeqnoTable(0);
boolean rc=tab.add(MBR, 0);
assertTrue(rc);
rc=tab.add(MBR, 0);
assertFalse(rc);

rc=tab.add(MBR, 1);
assertTrue(rc);
rc=tab.add(MBR, 2);
assertTrue(rc);
rc=tab.add(MBR, 4);
assertTrue(rc);
rc=tab.add(MBR, 5);
assertTrue(rc);
System.out.println("tab: " + tab);

rc=tab.add(MBR, 2);
assertFalse(rc);

rc=tab.add(MBR, 3);
assertTrue(rc);

rc=tab.add(MBR, 3);
assertFalse(rc);
}



public static Test suite() {
return new TestSuite(SeqnoTableTest.class);
}

public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

0 comments on commit e557773

Please sign in to comment.