Skip to content

Commit

Permalink
- Messages to self are now batched (https://issues.jboss.org/browse/J…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jan 4, 2017
1 parent c8a0b76 commit 5445e59
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 32 deletions.
11 changes: 7 additions & 4 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -1136,7 +1136,9 @@ protected void loopback(Message msg, final boolean multicast) {


// changed to fix http://jira.jboss.com/jira/browse/JGRP-506 // changed to fix http://jira.jboss.com/jira/browse/JGRP-506
boolean internal=msg.isFlagSet(Message.Flag.INTERNAL); boolean internal=msg.isFlagSet(Message.Flag.INTERNAL);
submitToThreadPool(() -> passMessageUp(copy, null, false, multicast, false), internal); boolean oob=msg.isFlagSet(Message.Flag.OOB);
// submitToThreadPool(() -> passMessageUp(copy, null, false, multicast, false), internal);
msg_processing_policy.loopback(msg, oob, internal);
} }


protected void _send(Message msg, Address dest) { protected void _send(Message msg, Address dest) {
Expand Down Expand Up @@ -1179,6 +1181,10 @@ public void passMessageUp(Message msg, byte[] cluster_name, boolean perform_clus


if(up_prot == null) if(up_prot == null)
return; return;

if(multicast && discard_own_mcast && local_addr != null && local_addr.equals(msg.getSrc()))
return;

// Discard if message's cluster name is not the same as our cluster name // Discard if message's cluster name is not the same as our cluster name
if(perform_cluster_name_matching && this.cluster_name != null && !this.cluster_name.equals(cluster_name)) { if(perform_cluster_name_matching && this.cluster_name != null && !this.cluster_name.equals(cluster_name)) {
if(log_discard_msgs && log.isWarnEnabled()) { if(log_discard_msgs && log.isWarnEnabled()) {
Expand All @@ -1192,9 +1198,6 @@ public void passMessageUp(Message msg, byte[] cluster_name, boolean perform_clus
} }
return; return;
} }

if(multicast && discard_own_mcast && local_addr != null && local_addr.equals(msg.getSrc()))
return;
up_prot.up(msg); up_prot.up(msg);
} }


Expand Down
23 changes: 23 additions & 0 deletions src/org/jgroups/stack/MessageProcessingPolicy.java
Expand Up @@ -22,7 +22,30 @@ default void reset() {}
/** Called before the transport is destroyed */ /** Called before the transport is destroyed */
default void destroy() {} default void destroy() {}


/**
* Process a message that was not received from the transport but from above (e.g. the channel or a protocol), and
* needs to be looped back up because (1) the destination address is null (every multicast message is looped back)
* or (2) the destination address is the sender's address (unicast message to self).<br/>
* A message that is looped back can bypass cluster name matching.
* @param msg the message to be looped back up the stack.
* @param oob true if the message is an OOB message
* @param internal true if the message is internal
*/
void loopback(Message msg, boolean oob, boolean internal);

/**
* Process a message received from the transport
* @param msg the message
* @param oob true if the message is an OOB message
* @param internal true if the message is internal
*/
void process(Message msg, boolean oob, boolean internal); void process(Message msg, boolean oob, boolean internal);


/**
* Process a batch received from the transport
* @param batch the batch
* @param oob true if the batch contains only OOB messages
* @param internal true if the batch contains only internal messages (or internal and OOB messages)
*/
void process(MessageBatch batch, boolean oob, boolean internal); void process(MessageBatch batch, boolean oob, boolean internal);
} }
33 changes: 24 additions & 9 deletions src/org/jgroups/util/MaxOneThreadPerSender.java
Expand Up @@ -44,13 +44,22 @@ public void init(TP transport) {
resize=max_buffer_size == 0; resize=max_buffer_size == 0;
} }


public void loopback(Message msg, boolean oob, boolean internal) {
if(oob || internal) {
super.loopback(msg, oob, internal);
return;
}
MessageTable table=msg.dest() == null? mcasts : ucasts;
table.process(msg, true);
}

public void process(Message msg, boolean oob, boolean internal) { public void process(Message msg, boolean oob, boolean internal) {
if(oob || internal) { if(oob || internal) {
super.process(msg, oob, internal); super.process(msg, oob, internal);
return; return;
} }
MessageTable table=msg.dest() == null? mcasts : ucasts; MessageTable table=msg.dest() == null? mcasts : ucasts;
table.process(msg); table.process(msg, false);
} }


public void process(MessageBatch batch, boolean oob, boolean internal) { public void process(MessageBatch batch, boolean oob, boolean internal) {
Expand Down Expand Up @@ -86,9 +95,9 @@ protected Entry get(final Address dest, final Address sender) {
return entry; return entry;
} }


protected void process(Message msg) { protected void process(Message msg, boolean loopback) {
Address dest=msg.dest(), sender=msg.src(); Address dest=msg.dest(), sender=msg.src();
get(dest, sender).process(msg); get(dest, sender).process(msg, loopback);
} }


protected void process(MessageBatch batch) { protected void process(MessageBatch batch) {
Expand Down Expand Up @@ -130,15 +139,15 @@ public Entry reset() {
} }




protected void process(Message msg) { protected void process(Message msg, boolean loopback) {
if(!allowedToSubmitToThreadPool(msg)) { if(!allowedToSubmitToThreadPool(msg)) {
queued_msgs.increment(); queued_msgs.increment();
return; return;
} }


submitted_msgs.increment(); submitted_msgs.increment();
// running is true, we didn't queue msg and need to submit a task to the thread pool // running is true, we didn't queue msg and need to submit a task to the thread pool
submit(new BatchHandlerLoop(batch_creator.apply(16).add(msg), this)); submit(new BatchHandlerLoop(batch_creator.apply(16).add(msg), this, loopback));
} }


protected void process(MessageBatch batch) { protected void process(MessageBatch batch) {
Expand All @@ -149,10 +158,10 @@ protected void process(MessageBatch batch) {


submitted_batches.increment(); submitted_batches.increment();
// running is true, we didn't queue msg and need to submit a task to the thread pool // running is true, we didn't queue msg and need to submit a task to the thread pool
submit(new BatchHandlerLoop(batch_creator.apply(batch.size()).add(batch), this)); submit(new BatchHandlerLoop(batch_creator.apply(batch.size()).add(batch), this, false));
} }


protected void submit(BatchHandlerLoop batch_handler) { protected void submit(Runnable batch_handler) {
// running is true, we didn't queue msg and need to submit a task to the thread pool // running is true, we didn't queue msg and need to submit a task to the thread pool
try { try {
if(!tp.submitToThreadPool(batch_handler, false)) if(!tp.submitToThreadPool(batch_handler, false))
Expand Down Expand Up @@ -231,11 +240,13 @@ public String toString() {




protected class BatchHandlerLoop extends BatchHandler { protected class BatchHandlerLoop extends BatchHandler {
protected final Entry entry; protected final Entry entry;
protected final boolean loopback;


public BatchHandlerLoop(MessageBatch batch, Entry entry) { public BatchHandlerLoop(MessageBatch batch, Entry entry, boolean loopback) {
super(batch); super(batch);
this.entry=entry; this.entry=entry;
this.loopback=loopback;
} }


public void run() { public void run() {
Expand All @@ -250,5 +261,9 @@ public void run() {
while(entry.workAvailable(this.batch)); // transfers msgs from entry.batch --> this.batch while(entry.workAvailable(this.batch)); // transfers msgs from entry.batch --> this.batch
// worker termination: workAvailable() already set running=false // worker termination: workAvailable() already set running=false
} }

@Override protected void passBatchUp() {
tp.passBatchUp(batch, !loopback, !loopback);
}
} }
} }
7 changes: 7 additions & 0 deletions src/org/jgroups/util/SubmitToThreadPool.java
Expand Up @@ -26,6 +26,9 @@ public void init(TP transport) {
this.log=tp.getLog(); this.log=tp.getLog();
} }


public void loopback(Message msg, boolean oob, boolean internal) {
tp.submitToThreadPool(() -> tp.passMessageUp(msg, null, false, msg.dest() == null,false), internal);
}


public void process(Message msg, boolean oob, boolean internal) { public void process(Message msg, boolean oob, boolean internal) {
tp.submitToThreadPool(new SingleMessageHandler(msg), internal); tp.submitToThreadPool(new SingleMessageHandler(msg), internal);
Expand Down Expand Up @@ -130,6 +133,10 @@ else if(batch.getMode() == MessageBatch.Mode.INTERNAL)
msg_stats.incrNumBytesReceived(batch.length()); msg_stats.incrNumBytesReceived(batch.length());
tp.avgBatchSize().add(batch_size); tp.avgBatchSize().add(batch_size);
} }
passBatchUp();
}

protected void passBatchUp() {
tp.passBatchUp(batch, true, true); tp.passBatchUp(batch, true, true);
} }
} }
Expand Down
Expand Up @@ -2,6 +2,7 @@


import org.jgroups.*; import org.jgroups.*;
import org.jgroups.protocols.TP; import org.jgroups.protocols.TP;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Promise; import org.jgroups.util.Promise;
import org.jgroups.util.Util; import org.jgroups.util.Util;
import org.testng.Assert; import org.testng.Assert;
Expand All @@ -12,15 +13,15 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


/** /**
* Tests unicasts to self (loopback of transport protocol) * Tests unicast and multicast messages to self (loopback of transport protocol)
* @author Richard Achmatowicz 12 May 2008 * @author Richard Achmatowicz 12 May 2008
* @author Bela Ban Dec 31 2003 * @author Bela Ban Dec 31 2003
*/ */
@Test(groups=Global.STACK_DEPENDENT,singleThreaded=true) @Test(groups=Global.STACK_DEPENDENT,singleThreaded=true)
public class UnicastLoopbackTest extends ChannelTestBase { public class LoopbackTest extends ChannelTestBase {
JChannel channel=null; protected JChannel channel;


@BeforeMethod protected void setUp() throws Exception {channel=createChannel(true, 1);} @BeforeMethod protected void setUp() throws Exception {channel=createChannel(true, 1).name("A");}
@AfterMethod protected void tearDown() throws Exception {Util.close(channel);} @AfterMethod protected void tearDown() throws Exception {Util.close(channel);}




Expand All @@ -30,34 +31,52 @@ public class UnicastLoopbackTest extends ChannelTestBase {
* (ii) all messages are correctly received * (ii) all messages are correctly received
*/ */
public void testUnicastMsgsWithLoopback() throws Exception { public void testUnicastMsgsWithLoopback() throws Exception {
final long TIMEOUT = 60 * 1000; sendMessagesWithLoopback(true);
}

/**
* Tests that when MULTICAST messages are sent to self, the following conditions hold:
* (i) no messages touch the network
* (ii) all messages are correctly received
*/
public void testMulticastMsgsWithLoopback() throws Exception {
sendMessagesWithLoopback(false);
}

protected void sendMessagesWithLoopback(boolean unicast) throws Exception {
final long TIMEOUT = 60_0000;
final int NUM=1000; final int NUM=1000;
long num_msgs_sent_before = 0 ; long num_msgs_sent_before = 0;
long num_msgs_sent_after = 0 ; long num_msgs_sent_after = 0;


Promise<Boolean> promise = new Promise<>() ; Promise<Boolean> promise = new Promise<>() ;
MyReceiver receiver = new MyReceiver(NUM, promise) ; MyReceiver receiver = new MyReceiver(NUM, promise) ;
channel.setReceiver(receiver) ; channel.setReceiver(receiver) ;
channel.connect("UnicastLoopbackTest") ; channel.connect("UnicastLoopbackTest") ;


Address local_addr=channel.getAddress(); int largest_thread_pool_size=channel.getProtocolStack().getTransport().getThreadPoolSizeLargest();

num_msgs_sent_before = getNumMessagesSentViaNetwork(channel) ; num_msgs_sent_before = getNumMessagesSentViaNetwork(channel) ;


// send NUM UNICAST messages to ourself // send NUM messages to dest
Address dest=unicast? channel.getAddress() : null;
for(int i=1; i <= NUM; i++) { for(int i=1; i <= NUM; i++) {
channel.send(new Message(local_addr, i)); channel.send(new Message(dest, i));
if(i % 100 == 0) if(i % 100 == 0)
System.out.println("-- sent " + i); System.out.printf("-- [%s] sent %d\n", Thread.currentThread().getName(), i);
} }


num_msgs_sent_after = getNumMessagesSentViaNetwork(channel) ; num_msgs_sent_after = getNumMessagesSentViaNetwork(channel) ;


System.out.printf("\nlargest pool size before: %d after: %d\n", largest_thread_pool_size,
largest_thread_pool_size=channel.getProtocolStack().getTransport().getThreadPoolSizeLargest());

// when sending msgs to self, messages should not touch the network // when sending msgs to self, messages should not touch the network
System.out.println("num msgs before: " + num_msgs_sent_before + ", num msgs after: " + num_msgs_sent_after); System.out.println("num msgs before: " + num_msgs_sent_before + ", num msgs after: " + num_msgs_sent_after);
assert num_msgs_sent_before <= num_msgs_sent_after; assert num_msgs_sent_before <= num_msgs_sent_after;
assert num_msgs_sent_after < NUM/10; if(unicast)

assert num_msgs_sent_after < NUM/10;
else
assert num_msgs_sent_after <= NUM; // max of NUM single messages; probably some batches were sent
try { try {
// wait for all messages to be received // wait for all messages to be received
promise.getResultWithTimeout(TIMEOUT) ; promise.getResultWithTimeout(TIMEOUT) ;
Expand All @@ -66,7 +85,6 @@ public void testUnicastMsgsWithLoopback() throws Exception {
// timeout exception occurred // timeout exception occurred
Assert.fail("Test timed out before all messages were received; received " + receiver.getNumMsgsReceived()) ; Assert.fail("Test timed out before all messages were received; received " + receiver.getNumMsgsReceived()) ;
} }

} }




Expand All @@ -91,7 +109,6 @@ private static long getNumMessagesSentViaNetwork(JChannel ch) throws Exception {
* then sets a promise. * then sets a promise.
*/ */
private static class MyReceiver extends ReceiverAdapter { private static class MyReceiver extends ReceiverAdapter {

private final int numExpected ; private final int numExpected ;
private int numReceived; private int numReceived;
private final Promise<Boolean> p ; private final Promise<Boolean> p ;
Expand All @@ -104,17 +121,30 @@ public MyReceiver(int numExpected, Promise<Boolean> p) {


// when we receive a Message, we update the count of messages received // when we receive a Message, we update the count of messages received
public void receive(Message msg) { public void receive(Message msg) {

Integer num=msg.getObject();
Integer num=(Integer)msg.getObject();
numReceived++; numReceived++;
if(num != null && num % 100 == 0) if(num != null && num % 100 == 0)
System.out.println("-- received " + num); System.out.printf("-- [%s] received %d\n", Thread.currentThread().getName(), num);


// if we have received NUM messages, set the result // if we have received NUM messages, set the result
if (numReceived >= numExpected) if (numReceived >= numExpected)
p.setResult(Boolean.TRUE) ; p.setResult(Boolean.TRUE) ;
} }


public void receive(MessageBatch batch) {
int size=batch.size();
numReceived+=size;
// System.out.printf("received batch of %d msgs, total: %d\n", size, numReceived);
for(Message msg: batch) {
Integer num=msg != null? msg.getObject() : null;
if(num != null && num % 100 == 0)
System.out.printf("-- [%s] received %d\n", Thread.currentThread().getName(), num);
}

if(numReceived >= numExpected)
p.setResult(Boolean.TRUE);
}

public int getNumMsgsReceived() { public int getNumMsgsReceived() {
return numReceived ; return numReceived ;
} }
Expand Down

0 comments on commit 5445e59

Please sign in to comment.