diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 02c7b5c24d..9f3c15a093 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -275,8 +275,10 @@ public T setLevel(String level) { public void setMessageProcessingPolicy(String policy) { if(policy == null) return; + msg_processing_policy=policy.startsWith("submit")? new SubmitToThreadPool() : - policy.startsWith("max")? new MaxOneThreadPerSender() : null; + policy.startsWith("max")? new MaxOneThreadPerSender() : + policy.startsWith("direct")? new PassRegularMessagesUpDirectly() : null; try { if(msg_processing_policy == null) { Class clazz=(Class)Util.loadClass(policy, getClass()); @@ -290,6 +292,7 @@ public void setMessageProcessingPolicy(String policy) { } } + /* --------------------------------------------- JMX ---------------------------------------------- */ @Component(name="msg_stats") protected final MsgStats msg_stats=new MsgStats(); diff --git a/src/org/jgroups/util/PassRegularMessagesUpDirectly.java b/src/org/jgroups/util/PassRegularMessagesUpDirectly.java new file mode 100644 index 0000000000..9927c6f904 --- /dev/null +++ b/src/org/jgroups/util/PassRegularMessagesUpDirectly.java @@ -0,0 +1,38 @@ +package org.jgroups.util; + +import org.jgroups.Message; + +/** + * {@link org.jgroups.stack.MessageProcessingPolicy} which passes regular messages and message batches up directly + * (on the same thread), but passes OOB messages to the thread pool. + * @author Bela Ban + * @since 5.2.14 + */ +public class PassRegularMessagesUpDirectly extends SubmitToThreadPool { + + @Override + public boolean loopback(Message msg, boolean oob) { + if(oob) + return super.loopback(msg, oob); + tp.passMessageUp(msg, null, false, msg.getDest() == null, false); + return true; + } + + @Override + public boolean process(Message msg, boolean oob) { + if(oob) + return super.process(msg, oob); + SingleMessageHandler smh=new SingleMessageHandler(msg); + smh.run(); + return true; + } + + @Override + public boolean process(MessageBatch batch, boolean oob) { + if(oob) + return super.process(batch, oob); + BatchHandler bh=new BatchHandler(batch); + bh.run(); + return true; + } +}