Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Forward-table now stores messages instead of marshalled messages, which reduces memory as we store the same ref here and in UNICAST{2,3}
- Replaced ConcurrentSkipList in delivery_table with LinkedHashMap: canDelivery() now takes ca. 175 ms compared to 22 seconds with MPerf and 300000 messages
- Coordinator now forwards messages to itself rather than directly sending them, this prevents starvation of messages from other members
- Not using nulls as values for BoundedHashMap; otherwise we cannot find out (on put()) if the null return value means the value didn't exist before, or it was null
  • Loading branch information
belaban committed Mar 5, 2013
1 parent a75a0b3 commit a6808f2
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 74 deletions.
104 changes: 59 additions & 45 deletions src/org/jgroups/protocols/SEQUENCER.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.BoundedHashMap;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;
Expand All @@ -16,10 +17,8 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand All @@ -44,7 +43,7 @@ public class SEQUENCER extends Protocol {
/** Maintains messages forwarded to the coord which which no ack has been received yet.
* Needs to be sorted so we resend them in the right order
*/
protected final NavigableMap<Long,byte[]> forward_table=new ConcurrentSkipListMap<Long,byte[]>();
protected final NavigableMap<Long,Message> forward_table=new ConcurrentSkipListMap<Long,Message>();

protected final Lock send_lock=new ReentrantLock();

Expand All @@ -62,7 +61,7 @@ public class SEQUENCER extends Protocol {
protected final AtomicInteger in_flight_sends=new AtomicInteger(0);

// Maintains received seqnos, so we can weed out dupes
protected final ConcurrentMap<Address,NavigableSet<Long>> delivery_table=Util.createConcurrentMap();
protected final ConcurrentMap<Address,BoundedHashMap<Long,Long>> delivery_table=Util.createConcurrentMap();

protected volatile Flusher flusher;

Expand Down Expand Up @@ -144,7 +143,7 @@ public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
if(msg.getDest() != null || msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
if(msg.getDest() != null || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
break;

if(msg.getSrc() == null)
Expand All @@ -160,14 +159,13 @@ public Object down(Event evt) {
try {
SequencerHeader hdr=new SequencerHeader(is_coord? SequencerHeader.BCAST : SequencerHeader.WRAPPED_BCAST, next_seqno);
msg.putHeader(this.id, hdr);
if(is_coord)
broadcast(msg, false, msg.getSrc(), next_seqno, false); // don't copy, just use the message passed as argument
else {
byte[] marshalled_msg=Util.objectToByteBuffer(msg);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "]: forwarding " + local_addr + "::" + seqno + " to coord " + coord);
forwardToCoord(marshalled_msg, next_seqno);
}
if(log.isTraceEnabled())
log.trace("[" + local_addr + "]: forwarding " + local_addr + "::" + seqno + " to coord " + coord);

// We always forward messages to the coordinator, even if we're the coordinator. Having the coord
// send its messages directly led to starvation of messages from other members. MPerf perf went up
// from 20MB/sec/node to 50MB/sec/node with this change !
forwardToCoord(next_seqno, msg);
}
catch(Exception ex) {
log.error("failed sending message", ex);
Expand Down Expand Up @@ -202,7 +200,7 @@ public Object up(Event evt) {
switch(evt.getType()) {
case Event.MSG:
msg=(Message)evt.getArg();
if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
if(msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
break;
hdr=(SequencerHeader)msg.getHeader(this.id);
if(hdr == null)
Expand Down Expand Up @@ -255,7 +253,7 @@ public Object up(Event evt) {

public void up(MessageBatch batch) {
for(Message msg: batch) {
if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB) || msg.getHeader(id) == null)
if(msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB) || msg.getHeader(id) == null)
continue;
batch.remove(msg);

Expand Down Expand Up @@ -345,9 +343,17 @@ private void handleTmpView(View v) {
*/
protected void flushMessagesInForwardTable() {
if(is_coord) {
for(Map.Entry<Long,byte[]> entry: forward_table.entrySet()) {
for(Map.Entry<Long,Message> entry: forward_table.entrySet()) {
Long key=entry.getKey();
byte[] val=entry.getValue();
Message msg=entry.getValue();
byte[] val;
try {
val=Util.objectToByteBuffer(msg);
}
catch(Exception e) {
log.error("flushing (broadcasting) failed", e);
continue;
}

SequencerHeader hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key);
Message forward_msg=new Message(null, val).putHeader(this.id, hdr);
Expand All @@ -370,9 +376,18 @@ protected void flushMessagesInForwardTable() {
// ==> By resending 3 until it is received, then resending 4 until it is received, we make sure this won't happen
// (see https://issues.jboss.org/browse/JGRP-1449)
while(flushing && running && !forward_table.isEmpty()) {
Map.Entry<Long,byte[]> entry=forward_table.firstEntry();
Map.Entry<Long,Message> entry=forward_table.firstEntry();
final Long key=entry.getKey();
byte[] val=entry.getValue();
Message msg=entry.getValue();
byte[] val;

try {
val=Util.objectToByteBuffer(msg);
}
catch(Exception e) {
log.error("flushing (broadcasting) failed", e);
continue;
}

while(flushing && running && !forward_table.isEmpty()) {
SequencerHeader hdr=new SequencerHeader(SequencerHeader.FLUSH, key);
Expand All @@ -389,24 +404,29 @@ protected void flushMessagesInForwardTable() {
}


protected void forwardToCoord(final byte[] marshalled_msg, long seqno) {
protected void forwardToCoord(long seqno, Message msg) {
if(is_coord) {
forward(msg, seqno, false);
return;
}

if(!running || flushing) {
forward_table.put(seqno, marshalled_msg);
forward_table.put(seqno, msg);
return;
}

if(!ack_mode) {
forward_table.put(seqno, marshalled_msg);
forward(marshalled_msg, seqno, false);
forward_table.put(seqno, msg);
forward(msg, seqno, false);
return;
}

send_lock.lock();
try {
forward_table.put(seqno, marshalled_msg);
forward_table.put(seqno, msg);
while(running && !flushing) {
ack_promise.reset();
forward(marshalled_msg, seqno, true);
forward(msg, seqno, true);
if(!ack_mode || !running || flushing)
break;
Long ack=ack_promise.getResult(500);
Expand All @@ -419,15 +439,20 @@ protected void forwardToCoord(final byte[] marshalled_msg, long seqno) {
}
}

protected void forward(final byte[] marshalled_msg, long seqno, boolean flush) {
protected void forward(final Message msg, long seqno, boolean flush) {
Address target=coord;
if(target == null)
return;
byte type=flush? SequencerHeader.FLUSH : SequencerHeader.FORWARD;
SequencerHeader hdr=new SequencerHeader(type, seqno);
Message forward_msg=new Message(target, marshalled_msg).putHeader(this.id,hdr);
down_prot.down(new Event(Event.MSG, forward_msg));
forwarded_msgs++;
try {
SequencerHeader hdr=new SequencerHeader(type, seqno);
Message forward_msg=new Message(target, Util.objectToByteBuffer(msg)).putHeader(this.id,hdr);
down_prot.down(new Event(Event.MSG, forward_msg));
forwarded_msgs++;
}
catch(Exception ex) {
log.error("failed forwarding message to " + msg.getDest(), ex);
}
}

protected void broadcast(final Message msg, boolean copy, Address original_sender, long seqno, boolean resend) {
Expand Down Expand Up @@ -508,24 +533,14 @@ protected void deliver(Message msg, Event evt, SequencerHeader hdr) {
* the coordinator.
*/
protected boolean canDeliver(Address sender, long seqno) {
NavigableSet<Long> seqno_set=delivery_table.get(sender);
BoundedHashMap<Long,Long> seqno_set=delivery_table.get(sender);
if(seqno_set == null) {
seqno_set=new ConcurrentSkipListSet<Long>();
NavigableSet<Long> existing=delivery_table.put(sender,seqno_set);
seqno_set=new BoundedHashMap<Long,Long>(delivery_table_max_size);
BoundedHashMap<Long,Long> existing=delivery_table.put(sender,seqno_set);
if(existing != null)
seqno_set=existing;
}

boolean added=seqno_set.add(seqno);
int size=seqno_set.size();
if(size > delivery_table_max_size) {
// trim the seqno_set to delivery_table_max_size elements by removing the first N seqnos
for(int i=0; i < size - delivery_table_max_size; i++) {
if(seqno_set.pollFirst() == null)
break;
}
}
return added;
return seqno_set.add(seqno, seqno);
}

protected void block() {
Expand Down Expand Up @@ -605,7 +620,6 @@ public void run() {




public static class SequencerHeader extends Header {
protected static final byte FORWARD = 1;
protected static final byte FLUSH = 2;
Expand Down
13 changes: 12 additions & 1 deletion src/org/jgroups/protocols/SHUFFLE.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ public void init() throws Exception {
timer=getTransport().getTimer();
}

public void stop() {
super.stop();
stopTask();
}

public void destroy() {
super.destroy();
stopTask();
}

public Object up(Event evt) {
if(!up)
return up_prot.up(evt);
Expand Down Expand Up @@ -149,7 +159,8 @@ private void shuffleAndSendMessages() {
if(!up_msgs.isEmpty()) {
Collections.shuffle(up_msgs);
for(Message msg: up_msgs)
up_prot.up(new Event(Event.MSG, msg));
if(up_prot != null)
up_prot.up(new Event(Event.MSG, msg));
up_msgs.clear();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public Object up(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
if(msg.getDest() == null || msg.isFlagSet(Message.NO_RELIABILITY)) // only handle unicast messages
if(msg.getDest() == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) // only handle unicast messages
break; // pass up

Header hdr=(Header)msg.getHeader(this.id);
Expand Down Expand Up @@ -450,7 +450,7 @@ public Object down(Event evt) {
Address dst=msg.getDest();

/* only handle unicast messages */
if (dst == null || msg.isFlagSet(Message.NO_RELIABILITY))
if (dst == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY))
break;

if(!running) {
Expand Down
26 changes: 26 additions & 0 deletions src/org/jgroups/util/BoundedHashMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.jgroups.util;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* Bounded linked hashmap; used by SEQUENCER (and probably soon) FORWARD_TO_COORD
* @author Bela Ban
* @since 3.3
*/
public class BoundedHashMap<K,V> extends LinkedHashMap<K,V> {
private static final long serialVersionUID=-5368387761328082187L;
protected final int max_size;

public BoundedHashMap(int max_size) {
this.max_size=max_size;
}

protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
return size() > max_size;
}

public boolean add(K key, V val) {
return super.put(key, val) == null;
}
}
3 changes: 2 additions & 1 deletion src/org/jgroups/util/BoundedList.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

/**
* A bounded subclass of LinkedList, oldest elements are removed once max capacity is exceeded. Note that this
* class is not synchronized (like LinkedList).
* class is not synchronized (like LinkedList). Don't use this for high performance, as size() has a linear cost. But
* in most scenarios, this class is used for maintaining a history, e.g. of digests or views, so perf is not critical.
* @author Bela Ban Nov 20, 2003
*/
public class BoundedList<T> extends ConcurrentLinkedQueue<T> {
Expand Down
47 changes: 22 additions & 25 deletions tests/junit/org/jgroups/tests/SequencerOrderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,49 @@
*/
@Test(groups=Global.STACK_INDEPENDENT,sequential=true)
public class SequencerOrderTest {
private JChannel c1, c2, c3;
private MyReceiver r1, r2, r3;
static final String GROUP="SequencerOrderTest";
static final int NUM_MSGS=50; // messages per thread
static final int NUM_THREADS=10;
static final int EXPECTED_MSGS=NUM_MSGS * NUM_THREADS;
static final String props="sequencer.xml";
private Sender[] senders=new Sender[NUM_THREADS];
private JChannel a, b, c;
private MyReceiver r1, r2, r3;
static final String GROUP="SequencerOrderTest";
static final int NUM_MSGS=50; // messages per thread
static final int NUM_THREADS=10;
static final int EXPECTED_MSGS=NUM_MSGS * NUM_THREADS;
static final String props="sequencer.xml";
private final Sender[] senders=new Sender[NUM_THREADS];
protected final AtomicInteger num=new AtomicInteger(0);


@BeforeMethod
void setUp() throws Exception {
c1=new JChannel(props);
c1.setName("A");
c1.connect(GROUP);
a=new JChannel(props).name("A");
a.connect(GROUP);
r1=new MyReceiver("A");
c1.setReceiver(r1);
a.setReceiver(r1);

c2=new JChannel(props);
c2.setName("B");
c2.connect(GROUP);
b=new JChannel(props).name("B");
b.connect(GROUP);
r2=new MyReceiver("B");
c2.setReceiver(r2);
b.setReceiver(r2);

c3=new JChannel(props);
c3.setName("C");
c3.connect(GROUP);
c=new JChannel(props).name("C");
c.connect(GROUP);
r3=new MyReceiver("C");
c3.setReceiver(r3);
c.setReceiver(r3);

Util.waitUntilAllChannelsHaveSameSize(10000, 1000, c1,c2,c3);
Util.waitUntilAllChannelsHaveSameSize(10000, 1000,a,b,c);

for(int i=0; i < senders.length; i++)
senders[i]=new Sender(NUM_MSGS, num, c1, c2, c3);
senders[i]=new Sender(NUM_MSGS, num,a,b,c);
}

@AfterMethod
void tearDown() throws Exception {
removeSHUFFLE(c3,c2,c1);
Util.close(c3, c2, c1);
removeSHUFFLE(c,b,a);
Util.close(c,b,a);
}

@Test @SuppressWarnings("unchecked")
public void testBroadcastSequence() throws Exception {
insertShuffle(c1, c2, c3);
insertShuffle(a,b,c);

// use concurrent senders to send messages to the group
System.out.println("Starting " + senders.length + " sender threads (each sends " + NUM_MSGS + " messages)");
Expand Down

0 comments on commit a6808f2

Please sign in to comment.