Permalink
Browse files

- Added internal thread pool to TP (https://issues.jboss.org/browse/J…

  • Loading branch information...
Bela Ban
Bela Ban committed Feb 28, 2013
1 parent 3611d3c commit 23c84aab75aa3cedae6dd5b6336243df067d0c7b
Showing with 137 additions and 34 deletions.
  1. +1 −1 src/org/jgroups/Message.java
  2. +124 −30 src/org/jgroups/protocols/TP.java
  3. +12 −3 src/org/jgroups/util/MessageBatch.java
@@ -64,7 +64,7 @@
NO_TOTAL_ORDER((short)(1 << 5)), // bypass total order (e.g. SEQUENCER)
NO_RELAY( (short)(1 << 6)), // bypass relaying (RELAY)
RSVP( (short)(1 << 7)), // ack of a multicast (https://issues.jboss.org/browse/JGRP-1389)
- INTERNAL( (short)(1 << 8));
+ INTERNAL( (short)(1 << 8)); // for internal use by JGroups only, don't use !
final short value;
Flag(short value) {this.value=value;}
@@ -158,33 +158,59 @@
@Property(name="oob_thread_pool.queue_enabled", description="Use queue to enqueue incoming OOB messages")
protected boolean oob_thread_pool_queue_enabled=true;
- @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages. Default is 500")
+ @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages")
protected int oob_thread_pool_queue_max_size=500;
@Property(name="oob_thread_pool.rejection_policy",
- description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run. Default is Discard")
- String oob_thread_pool_rejection_policy="discard";
+ description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
+ protected String oob_thread_pool_rejection_policy="discard";
protected int thread_pool_min_threads=2;
protected int thread_pool_max_threads=10;
protected long thread_pool_keep_alive_time=30000;
- @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages. Default true")
+ @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages")
protected boolean thread_pool_enabled=true;
- @Property(name="thread_pool.queue_enabled", description="Use queue to enqueue incoming regular messages. Default is true")
+ @Property(name="thread_pool.queue_enabled", description="Queue to enqueue incoming regular messages")
protected boolean thread_pool_queue_enabled=true;
- @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages. Default is 500")
+ @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages")
protected int thread_pool_queue_max_size=500;
@Property(name="thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String thread_pool_rejection_policy="Discard";
+
+ @Property(name="internal_thread_pool.enabled",description="Switch for enabling thread pool for internal messages",
+ writable=false)
+ protected boolean internal_thread_pool_enabled=true;
+
+ @Property(name="internal_thread_pool.min_threads",description="Minimum thread pool size for the internal thread pool")
+ protected int internal_thread_pool_min_threads=2;
+
+ @Property(name="internal_thread_pool.max_threads",description="Maximum thread pool size for the internal thread pool")
+ protected int internal_thread_pool_max_threads=4;
+
+ @Property(name="internal_thread_pool.keep_alive_time", description="Timeout in ms to remove idle threads from the internal pool")
+ protected long internal_thread_pool_keep_alive_time=30000;
+
+ @Property(name="internal_thread_pool.queue_enabled", description="Queue to enqueue incoming internal messages")
+ protected boolean internal_thread_pool_queue_enabled=true;
+
+ @Property(name="internal_thread_pool.queue_max_size",description="Maximum queue size for incoming internal messages")
+ protected int internal_thread_pool_queue_max_size=500;
+
+ @Property(name="internal_thread_pool.rejection_policy",
+ description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
+ protected String internal_thread_pool_rejection_policy="discard";
+
+
+
@Property(description="Type of timer to be used. Valid values are \"old\" (DefaultTimeScheduler, used up to 2.10), " +
"\"new\" or \"new2\" (TimeScheduler2), \"new3\" (TimeScheduler3) and \"wheel\". Note that this property " +
"might disappear in future releases, if one of the 3 timers is chosen as default timer")
@@ -443,10 +469,13 @@ public int getTimerQueueSize() {
protected String channel_name=null;
@ManagedAttribute(description="Number of OOB messages received")
- protected long num_oob_msgs_received=0;
+ protected long num_oob_msgs_received;
@ManagedAttribute(description="Number of regular messages received")
- protected long num_incoming_msgs_received=0;
+ protected long num_incoming_msgs_received;
+
+ @ManagedAttribute(description="Number of internal messages received")
+ protected long num_internal_msgs_received;
@ManagedAttribute(description="Class of the timer implementation")
public String getTimerClass() {
@@ -498,10 +527,10 @@ public void clearDifferentVersionCache() {
protected Executor oob_thread_pool;
/** Factory which is used by oob_thread_pool */
- protected ThreadFactory oob_thread_factory=null;
+ protected ThreadFactory oob_thread_factory;
/** Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true */
- protected BlockingQueue<Runnable> oob_thread_pool_queue=null;
+ protected BlockingQueue<Runnable> oob_thread_pool_queue;
// ================================== Regular thread pool ======================
@@ -510,10 +539,18 @@ public void clearDifferentVersionCache() {
protected Executor thread_pool;
/** Factory which is used by oob_thread_pool */
- protected ThreadFactory default_thread_factory=null;
+ protected ThreadFactory default_thread_factory;
+
+ /** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
+ protected BlockingQueue<Runnable> thread_pool_queue;
+
+ // ================================== Internal thread pool ======================
+
+ /** The thread pool which handles JGroups internal messages (Flag.INTERNAL)*/
+ protected Executor internal_thread_pool;
/** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
- protected BlockingQueue<Runnable> thread_pool_queue=null;
+ protected BlockingQueue<Runnable> internal_thread_pool_queue;
// ================================== Timer thread pool =========================
protected TimeScheduler timer;
@@ -608,7 +645,7 @@ public String toString() {
public void resetStats() {
num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
- num_oob_msgs_received=num_incoming_msgs_received=0;
+ num_oob_msgs_received=num_incoming_msgs_received=num_internal_msgs_received=0;
}
public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
@@ -800,6 +837,26 @@ public int getRegularMaxQueueSize() {
return thread_pool_queue_max_size;
}
+
+ @ManagedAttribute(description="Current number of threads in the internal thread pool")
+ public int getInternalPoolSize() {
+ return internal_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)internal_thread_pool).getPoolSize() : 0;
+ }
+
+ public long getInternalMessages() {
+ return num_internal_msgs_received;
+ }
+
+ @ManagedAttribute(description="Number of messages in the internal thread pool's queue")
+ public int getInternalQueueSize() {
+ return internal_thread_pool_queue != null? internal_thread_pool_queue.size() : 0;
+ }
+
+ public int getInternalMaxQueueSize() {
+ return internal_thread_pool_queue_max_size;
+ }
+
+
@ManagedAttribute(name="timer_tasks",description="Number of timer tasks queued up for execution")
public int getNumTimerTasks() {
return timer != null? timer.size() : -1;
@@ -939,6 +996,7 @@ else if(timer_type.equalsIgnoreCase("wheel")) {
Util.verifyRejectionPolicy(oob_thread_pool_rejection_policy);
Util.verifyRejectionPolicy(thread_pool_rejection_policy);
+ Util.verifyRejectionPolicy(internal_thread_pool_rejection_policy);
// ========================================== OOB thread pool ==============================
@@ -974,6 +1032,23 @@ else if(timer_type.equalsIgnoreCase("wheel")) {
}
}
+
+ // ========================================== Internal thread pool ==============================
+
+ if(internal_thread_pool == null
+ || (internal_thread_pool instanceof ThreadPoolExecutor && ((ThreadPoolExecutor)internal_thread_pool).isShutdown())) {
+ if(internal_thread_pool_enabled) {
+ if(internal_thread_pool_queue_enabled)
+ internal_thread_pool_queue=new LinkedBlockingQueue<Runnable>(internal_thread_pool_queue_max_size);
+ else
+ internal_thread_pool_queue=new SynchronousQueue<Runnable>();
+ internal_thread_pool=createThreadPool(internal_thread_pool_min_threads, internal_thread_pool_max_threads, internal_thread_pool_keep_alive_time,
+ internal_thread_pool_rejection_policy, internal_thread_pool_queue, oob_thread_factory);
+ }
+ // if the internal thread pool is disabled, we won't create it (not even a DirectExecutor)
+ }
+
+
Map<String, Object> m=new HashMap<String, Object>(2);
if(bind_addr != null)
m.put("bind_addr", bind_addr);
@@ -1019,6 +1094,9 @@ public void destroy() {
if(thread_pool instanceof ThreadPoolExecutor)
shutdownThreadPool(thread_pool);
+
+ if(internal_thread_pool instanceof ThreadPoolExecutor)
+ shutdownThreadPool(internal_thread_pool);
}
/**
@@ -1187,7 +1265,8 @@ public Object down(Event evt) {
final String cluster_name=hdr.channel_name;
// changed to fix http://jira.jboss.com/jira/browse/JGRP-506
- Executor pool=msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool;
+ Executor pool=msg.isFlagSet(Message.Flag.INTERNAL) && internal_thread_pool != null? internal_thread_pool
+ : msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool;
pool.execute(new Runnable() {
public void run() {
passMessageUp(copy, cluster_name, false, multicast, false);
@@ -1336,24 +1415,33 @@ protected void receive(Address sender, byte[] data, int offset, int length) {
if(is_message_list) { // used if message bundling is enabled
final MessageBatch[] batches=readMessageBatch(dis, multicast);
- final MessageBatch batch=batches[0], oob_batch=batches[1];
+ final MessageBatch batch=batches[0], oob_batch=batches[1], internal_batch=batches[2];
if(oob_batch != null) {
num_oob_msgs_received+=oob_batch.size();
oob_thread_pool.execute(new BatchHandler(oob_batch));
}
-
if(batch != null) {
num_incoming_msgs_received+=batch.size();
thread_pool.execute(new BatchHandler(batch));
}
+ if(internal_batch != null) {
+ num_internal_msgs_received+=internal_batch.size();
+ Executor pool=internal_thread_pool != null? internal_thread_pool : oob_thread_pool;
+ pool.execute(new BatchHandler(batch));
+ }
}
else {
Message msg=readMessage(dis);
- boolean is_oob=msg.isFlagSet(Message.Flag.OOB);
- if(is_oob) num_oob_msgs_received++;
- else num_incoming_msgs_received++;
- Executor pool=is_oob? oob_thread_pool : thread_pool;
+ if(msg.isFlagSet(Message.Flag.INTERNAL))
+ num_internal_msgs_received++;
+ else if(msg.isFlagSet(Message.Flag.OOB))
+ num_oob_msgs_received++;
+ else
+ num_incoming_msgs_received++;
+
+ Executor pool=msg.isFlagSet(Message.Flag.INTERNAL) && internal_thread_pool != null? internal_thread_pool
+ : msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool;
TpHeader hdr=(TpHeader)msg.getHeader(id);
String cluster_name=hdr.channel_name;
pool.execute(new MyHandler(msg, cluster_name, multicast));
@@ -1617,13 +1705,14 @@ public static void writeMessageList(Address dest, Address src, String cluster_na
}
/**
- * Reads a list of messages into 2 MessageBatches: a regular one and an OOB one
+ * Reads a list of messages into 3 MessageBatches: a regular, an OOB and an internal one
* @param in
- * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1 (either can be null)
+ * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1
+ * and the internal at index 2 (either can be null)
* @throws Exception
*/
public static MessageBatch[] readMessageBatch(DataInputStream in, boolean multicast) throws Exception {
- MessageBatch[] mbs=new MessageBatch[2];
+ MessageBatch[] batches=new MessageBatch[3]; // [0]: reg, [1]: OOB, [2]: internal
Address dest=Util.readAddress(in);
Address src=Util.readAddress(in);
String cluster_name=Util.readString(in);
@@ -1636,17 +1725,22 @@ public static void writeMessageList(Address dest, Address src, String cluster_na
if(msg.getSrc() == null)
msg.setSrc(src);
if(msg.isFlagSet(Message.Flag.OOB)) {
- if(mbs[1] == null)
- mbs[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len);
- mbs[1].add(msg);
+ if(batches[1] == null)
+ batches[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len);
+ batches[1].add(msg);
+ }
+ else if(msg.isFlagSet(Message.Flag.INTERNAL)) {
+ if(batches[2] == null)
+ batches[2]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.INTERNAL, len);
+ batches[2].add(msg);
}
else {
- if(mbs[0] == null)
- mbs[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len);
- mbs[0].add(msg);
+ if(batches[0] == null)
+ batches[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len);
+ batches[0].add(msg);
}
}
- return mbs;
+ return batches;
}
@@ -45,15 +45,24 @@ public MessageBatch(int capacity) {
public MessageBatch(Collection<Message> msgs) {
messages=new Message[msgs.size()];
- int num_reg=0, num_oob=0;
+ int num_reg=0, num_oob=0, num_internal=0;
for(Message msg: msgs) {
messages[index++]=msg;
if(msg.isFlagSet(Message.Flag.OOB))
num_oob++;
+ else if(msg.isFlagSet(Message.Flag.INTERNAL))
+ num_internal++;
else
num_reg++;
}
- mode=num_oob == 0? Mode.REG : num_reg == 0? Mode.OOB : Mode.MIXED;
+ if(num_internal > 0 && num_oob == 0 && num_reg == 0)
+ mode=Mode.INTERNAL;
+ else if(num_oob > 0 && num_internal == 0 && num_reg == 0)
+ mode=Mode.OOB;
+ else if(num_reg > 0 && num_oob == 0 && num_internal == 0)
+ mode=Mode.REG;
+ else
+ mode=Mode.MIXED;
}
public MessageBatch(Address dest, Address sender, String cluster_name, boolean multicast, Collection<Message> msgs) {
@@ -259,7 +268,7 @@ protected void resize() {
T visit(final Message msg, final MessageBatch batch);
}
- public enum Mode {OOB, REG, MIXED}
+ public enum Mode {OOB, REG, INTERNAL, MIXED}
/** Iterates over <em>non-null</em> elements of a batch, skipping null elements */

0 comments on commit 23c84aa

Please sign in to comment.