diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 5b4f5508a82..5f13a192de7 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -44,7 +44,7 @@ * The {@link #receive(Address, Address, byte[], int, int)} method must * be called by subclasses when a unicast or multicast message has been received. * @author Bela Ban - * @version $Id: TP.java,v 1.160.2.22 2008/05/23 08:19:20 belaban Exp $ + * @version $Id: TP.java,v 1.160.2.23 2008/05/26 09:14:41 belaban Exp $ */ public abstract class TP extends Protocol { @@ -639,6 +639,35 @@ public void init() throws Exception { verifyRejectionPolicy(oob_thread_pool_rejection_policy); verifyRejectionPolicy(thread_pool_rejection_policy); + + // ========================================== OOB thread pool ============================== + + if(oob_thread_pool_enabled) { + if(oob_thread_pool_queue_enabled) + oob_thread_pool_queue=new LinkedBlockingQueue(oob_thread_pool_queue_max_size); + else + oob_thread_pool_queue=new SynchronousQueue(); + oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time, + oob_thread_pool_rejection_policy, oob_thread_pool_queue, oob_thread_factory); + } + else { // otherwise use the caller's thread to unmarshal the byte buffer into a message + oob_thread_pool=new DirectExecutor(); + } + + // ====================================== Regular thread pool =========================== + + if(thread_pool_enabled) { + if(thread_pool_queue_enabled) + thread_pool_queue=new LinkedBlockingQueue(thread_pool_queue_max_size); + else + thread_pool_queue=new SynchronousQueue(); + thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time, + thread_pool_rejection_policy, thread_pool_queue, default_thread_factory); + } + else { // otherwise use the caller's thread to unmarshal the byte buffer into a message + thread_pool=new DirectExecutor(); + } + if(persistent_ports){ pm = new PortsManager(pm_expiry_time,persistent_ports_file); } @@ -669,6 +698,17 @@ public void destroy() { log.error("failed stopping the timer", e); } } + + // 3. Stop the thread pools + if(oob_thread_pool instanceof ThreadPoolExecutor) { + shutdownThreadPool(oob_thread_pool); + oob_thread_pool=null; + } + + if(thread_pool instanceof ThreadPoolExecutor) { + shutdownThreadPool(thread_pool); + thread_pool=null; + } } } @@ -690,41 +730,6 @@ public void start() throws Exception { incoming_packet_handler.start(); } - - // ========================================== OOB thread pool ============================== - // create a ThreadPoolExecutor for the unmarshaller thread pool - if(oob_thread_pool == null) { // only create if not yet set (e.g. by a user) - if(oob_thread_pool_enabled) { - if(oob_thread_pool_queue_enabled) - oob_thread_pool_queue=new LinkedBlockingQueue(oob_thread_pool_queue_max_size); - else - oob_thread_pool_queue=new SynchronousQueue(); - oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time, - oob_thread_pool_rejection_policy, oob_thread_pool_queue, oob_thread_factory); - } - else { // otherwise use the caller's thread to unmarshal the byte buffer into a message - oob_thread_pool=new DirectExecutor(); - } - } - - - // ====================================== Regular thread pool =========================== - // create a ThreadPoolExecutor for the unmarshaller thread pool - if(thread_pool == null) { // only create if not yet set (e.g.by a user) - if(thread_pool_enabled) { - if(thread_pool_queue_enabled) - thread_pool_queue=new LinkedBlockingQueue(thread_pool_queue_max_size); - else - thread_pool_queue=new SynchronousQueue(); - thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time, - thread_pool_rejection_policy, thread_pool_queue, default_thread_factory); - } - else { // otherwise use the caller's thread to unmarshal the byte buffer into a message - thread_pool=new DirectExecutor(); - } - } - - if(loopback && !use_concurrent_stack) { incoming_msg_queue=new Queue(); incoming_msg_handler=new IncomingMessageHandler(); @@ -754,18 +759,6 @@ public void stop() { // 2. Stop the incoming message handler if(incoming_msg_handler != null) incoming_msg_handler.stop(); - - // 3. Stop the thread pools - - if(oob_thread_pool instanceof ThreadPoolExecutor) { - shutdownThreadPool(oob_thread_pool); - oob_thread_pool=null; - } - - if(thread_pool instanceof ThreadPoolExecutor) { - shutdownThreadPool(thread_pool); - thread_pool=null; - } } @@ -1661,7 +1654,7 @@ protected void handleConfigEvent(Map map) { protected static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, String rejection_policy, BlockingQueue queue, final ThreadFactory factory) { - ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue); + ThreadPoolExecutor pool=new ThreadManagerThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue); pool.setThreadFactory(factory); //default diff --git a/src/org/jgroups/util/DefaultThreadFactory.java b/src/org/jgroups/util/DefaultThreadFactory.java index a4fae498a1a..c41ff640559 100644 --- a/src/org/jgroups/util/DefaultThreadFactory.java +++ b/src/org/jgroups/util/DefaultThreadFactory.java @@ -7,9 +7,9 @@ * THREAD-5,MyCluster,192.168.1.5:63754 or THREAD,MyCluster,192.168.1.5:63754 * @author Vladimir Blagojevic * @author Bela Ban - * @version $Id: DefaultThreadFactory.java,v 1.3.2.3 2008/05/23 05:30:34 belaban Exp $ + * @version $Id: DefaultThreadFactory.java,v 1.3.2.4 2008/05/26 09:14:38 belaban Exp $ */ -public class DefaultThreadFactory implements ExtendedThreadFactory { +public class DefaultThreadFactory implements ExtendedThreadFactory, ThreadManager { protected final ThreadGroup group; protected final String baseName; protected final boolean createDaemons; @@ -20,6 +20,7 @@ public class DefaultThreadFactory implements ExtendedThreadFactory { protected boolean includeLocalAddress=false; protected String clusterName=null; protected String address=null; + protected ThreadDecorator threadDecorator=null; public DefaultThreadFactory(ThreadGroup group, String baseName, boolean createDaemons) { @@ -52,6 +53,14 @@ public void setAddress(String address){ this.address=address; } + public ThreadDecorator getThreadDecorator() { + return threadDecorator; + } + + public void setThreadDecorator(ThreadDecorator threadDecorator) { + this.threadDecorator = threadDecorator; + } + public Thread newThread(Runnable r, String name) { return newThread(group, r, name); } @@ -68,6 +77,8 @@ protected Thread newThread(ThreadGroup group, Runnable r, String name, String ad Thread retval=new Thread(group, r, name); retval.setDaemon(createDaemons); renameThread(retval, address, cluster_name); + if(threadDecorator != null) + threadDecorator.threadCreated(retval); return retval; } @@ -115,4 +126,5 @@ public void renameThread(Thread thread) { + } diff --git a/src/org/jgroups/util/ThreadDecorator.java b/src/org/jgroups/util/ThreadDecorator.java new file mode 100644 index 00000000000..d6b408229ed --- /dev/null +++ b/src/org/jgroups/util/ThreadDecorator.java @@ -0,0 +1,23 @@ +package org.jgroups.util; + +/** + * An object that can alter the state of a thread when it receives a callback from a {@link ThreadManager} notifying + * it that the thread has been created or released from use. + * + * @author Brian Stansberry + * @version $Id: ThreadDecorator.java,v 1.1.2.1 2008/05/26 09:14:40 belaban Exp $ + */ +public interface ThreadDecorator { + /** + * Notification that thread has just been created. + * @param thread the thread + */ + void threadCreated(Thread thread); + + /** + * Notification that thread has just been released from use + * (e.g. returned to a thread pool after executing a task). + * @param thread the thread + */ + void threadReleased(Thread thread); +} diff --git a/src/org/jgroups/util/ThreadManager.java b/src/org/jgroups/util/ThreadManager.java new file mode 100644 index 00000000000..61ce7f7f946 --- /dev/null +++ b/src/org/jgroups/util/ThreadManager.java @@ -0,0 +1,22 @@ +package org.jgroups.util; + +/** + * An object that manages threads and provides callbacks to a + * {@link ThreadDecorator} to allow it to alter their state. + * + * @author Brian Stansberry + * @version $Id: ThreadManager.java,v 1.1.2.1 2008/05/26 09:14:40 belaban Exp $ + */ +public interface ThreadManager { + /** + * Gets the ThreadDecorator associated with this manager. + * @return the ThreadDecorator, or null if there is none. + */ + ThreadDecorator getThreadDecorator(); + + /** + * Sets the ThreadDecorator associated this manager should use. + * @param decorator the ThreadDecorator, or null. + */ + void setThreadDecorator(ThreadDecorator decorator); +} diff --git a/src/org/jgroups/util/ThreadManagerThreadPoolExecutor.java b/src/org/jgroups/util/ThreadManagerThreadPoolExecutor.java new file mode 100644 index 00000000000..53075cb1637 --- /dev/null +++ b/src/org/jgroups/util/ThreadManagerThreadPoolExecutor.java @@ -0,0 +1,58 @@ +package org.jgroups.util; + +import java.util.concurrent.*; +import java.util.concurrent.ThreadFactory; + +/** + * ThreadPoolExecutor subclass that implements @{link ThreadManager}. + * @author Brian Stansberry + * @version $Id: ThreadManagerThreadPoolExecutor.java,v 1.1.2.1 2008/05/26 09:14:39 belaban Exp $ + */ +public class ThreadManagerThreadPoolExecutor extends ThreadPoolExecutor implements ThreadManager { + private ThreadDecorator decorator; + + public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public ThreadDecorator getThreadDecorator() { + return decorator; + } + + public void setThreadDecorator(ThreadDecorator decorator) { + this.decorator=decorator; + } + + /** + * Invokes {@link ThreadDecorator#threadReleased(Thread)} on the current thread. + *

+ * {@inheritDoc} + */ + @Override + protected void afterExecute(Runnable r, Throwable t) { + try { + super.afterExecute(r, t); + } + finally { + if(decorator != null) + decorator.threadReleased(Thread.currentThread()); + } + } + +} diff --git a/src/org/jgroups/util/TimeScheduler.java b/src/org/jgroups/util/TimeScheduler.java index a60a1a2be56..304b3491cd6 100644 --- a/src/org/jgroups/util/TimeScheduler.java +++ b/src/org/jgroups/util/TimeScheduler.java @@ -39,9 +39,9 @@ * added tasks will not restart it: start() has to be called to * restart the scheduler. * @author Bela Ban - * @version $Id: TimeScheduler.java,v 1.23.4.2 2008/05/22 13:23:11 belaban Exp $ + * @version $Id: TimeScheduler.java,v 1.23.4.3 2008/05/26 09:14:40 belaban Exp $ */ -public class TimeScheduler extends ScheduledThreadPoolExecutor { +public class TimeScheduler extends ScheduledThreadPoolExecutor implements ThreadManager { /** The interface that submitted tasks must implement */ public interface Task extends Runnable { @@ -72,10 +72,9 @@ public interface Task extends Runnable { } } + private ThreadDecorator threadDecorator=null; - - - /** + /** * Create a scheduler that executes tasks in dynamically adjustable intervals */ public TimeScheduler() { @@ -96,7 +95,13 @@ public TimeScheduler(int corePoolSize) { setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(getRejectedExecutionHandler())); } + public ThreadDecorator getThreadDecorator() { + return threadDecorator; + } + public void setThreadDecorator(ThreadDecorator threadDecorator) { + this.threadDecorator=threadDecorator; + } public String dumpTaskQueue() { return getQueue().toString(); @@ -176,7 +181,22 @@ public void stop() throws InterruptedException { } - private class TaskWrapper implements Runnable, ScheduledFuture { + + + @Override + protected void afterExecute(Runnable r, Throwable t) + { + try { + super.afterExecute(r, t); + } + finally { + if(threadDecorator != null) + threadDecorator.threadReleased(Thread.currentThread()); + } + } + + + private class TaskWrapper implements Runnable, ScheduledFuture { private final Task task; private ScheduledFuture future; // cannot be null ! private boolean cancelled=false;