Skip to content

Commit

Permalink
added ability to set thread creation/release handler (http://jira.jbo…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 26, 2008
1 parent 5d6def2 commit 2387088
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 57 deletions.
91 changes: 42 additions & 49 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -44,7 +44,7 @@
* The {@link #receive(Address, Address, byte[], int, int)} method must
* be called by subclasses when a unicast or multicast message has been received.
* @author Bela Ban
* @version $Id: TP.java,v 1.160.2.22 2008/05/23 08:19:20 belaban Exp $
* @version $Id: TP.java,v 1.160.2.23 2008/05/26 09:14:41 belaban Exp $
*/
public abstract class TP extends Protocol {

Expand Down Expand Up @@ -639,6 +639,35 @@ public void init() throws Exception {

verifyRejectionPolicy(oob_thread_pool_rejection_policy);
verifyRejectionPolicy(thread_pool_rejection_policy);

// ========================================== OOB thread pool ==============================

if(oob_thread_pool_enabled) {
if(oob_thread_pool_queue_enabled)
oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(oob_thread_pool_queue_max_size);
else
oob_thread_pool_queue=new SynchronousQueue<Runnable>();
oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time,
oob_thread_pool_rejection_policy, oob_thread_pool_queue, oob_thread_factory);
}
else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
oob_thread_pool=new DirectExecutor();
}

// ====================================== Regular thread pool ===========================

if(thread_pool_enabled) {
if(thread_pool_queue_enabled)
thread_pool_queue=new LinkedBlockingQueue<Runnable>(thread_pool_queue_max_size);
else
thread_pool_queue=new SynchronousQueue<Runnable>();
thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time,
thread_pool_rejection_policy, thread_pool_queue, default_thread_factory);
}
else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
thread_pool=new DirectExecutor();
}

if(persistent_ports){
pm = new PortsManager(pm_expiry_time,persistent_ports_file);
}
Expand Down Expand Up @@ -669,6 +698,17 @@ public void destroy() {
log.error("failed stopping the timer", e);
}
}

// 3. Stop the thread pools
if(oob_thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool(oob_thread_pool);
oob_thread_pool=null;
}

if(thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool(thread_pool);
thread_pool=null;
}
}
}

Expand All @@ -690,41 +730,6 @@ public void start() throws Exception {
incoming_packet_handler.start();
}


// ========================================== OOB thread pool ==============================
// create a ThreadPoolExecutor for the unmarshaller thread pool
if(oob_thread_pool == null) { // only create if not yet set (e.g. by a user)
if(oob_thread_pool_enabled) {
if(oob_thread_pool_queue_enabled)
oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(oob_thread_pool_queue_max_size);
else
oob_thread_pool_queue=new SynchronousQueue<Runnable>();
oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time,
oob_thread_pool_rejection_policy, oob_thread_pool_queue, oob_thread_factory);
}
else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
oob_thread_pool=new DirectExecutor();
}
}


// ====================================== Regular thread pool ===========================
// create a ThreadPoolExecutor for the unmarshaller thread pool
if(thread_pool == null) { // only create if not yet set (e.g.by a user)
if(thread_pool_enabled) {
if(thread_pool_queue_enabled)
thread_pool_queue=new LinkedBlockingQueue<Runnable>(thread_pool_queue_max_size);
else
thread_pool_queue=new SynchronousQueue<Runnable>();
thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time,
thread_pool_rejection_policy, thread_pool_queue, default_thread_factory);
}
else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
thread_pool=new DirectExecutor();
}
}


if(loopback && !use_concurrent_stack) {
incoming_msg_queue=new Queue();
incoming_msg_handler=new IncomingMessageHandler();
Expand Down Expand Up @@ -754,18 +759,6 @@ public void stop() {
// 2. Stop the incoming message handler
if(incoming_msg_handler != null)
incoming_msg_handler.stop();

// 3. Stop the thread pools

if(oob_thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool(oob_thread_pool);
oob_thread_pool=null;
}

if(thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool(thread_pool);
thread_pool=null;
}
}


Expand Down Expand Up @@ -1661,7 +1654,7 @@ protected void handleConfigEvent(Map<String,Object> map) {
protected static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, String rejection_policy,
BlockingQueue<Runnable> queue, final ThreadFactory factory) {

ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue);
ThreadPoolExecutor pool=new ThreadManagerThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue);
pool.setThreadFactory(factory);

//default
Expand Down
16 changes: 14 additions & 2 deletions src/org/jgroups/util/DefaultThreadFactory.java
Expand Up @@ -7,9 +7,9 @@
* THREAD-5,MyCluster,192.168.1.5:63754 or THREAD,MyCluster,192.168.1.5:63754
* @author Vladimir Blagojevic
* @author Bela Ban
* @version $Id: DefaultThreadFactory.java,v 1.3.2.3 2008/05/23 05:30:34 belaban Exp $
* @version $Id: DefaultThreadFactory.java,v 1.3.2.4 2008/05/26 09:14:38 belaban Exp $
*/
public class DefaultThreadFactory implements ExtendedThreadFactory {
public class DefaultThreadFactory implements ExtendedThreadFactory, ThreadManager {
protected final ThreadGroup group;
protected final String baseName;
protected final boolean createDaemons;
Expand All @@ -20,6 +20,7 @@ public class DefaultThreadFactory implements ExtendedThreadFactory {
protected boolean includeLocalAddress=false;
protected String clusterName=null;
protected String address=null;
protected ThreadDecorator threadDecorator=null;


public DefaultThreadFactory(ThreadGroup group, String baseName, boolean createDaemons) {
Expand Down Expand Up @@ -52,6 +53,14 @@ public void setAddress(String address){
this.address=address;
}

public ThreadDecorator getThreadDecorator() {
return threadDecorator;
}

public void setThreadDecorator(ThreadDecorator threadDecorator) {
this.threadDecorator = threadDecorator;
}

public Thread newThread(Runnable r, String name) {
return newThread(group, r, name);
}
Expand All @@ -68,6 +77,8 @@ protected Thread newThread(ThreadGroup group, Runnable r, String name, String ad
Thread retval=new Thread(group, r, name);
retval.setDaemon(createDaemons);
renameThread(retval, address, cluster_name);
if(threadDecorator != null)
threadDecorator.threadCreated(retval);
return retval;
}

Expand Down Expand Up @@ -115,4 +126,5 @@ public void renameThread(Thread thread) {




}
23 changes: 23 additions & 0 deletions src/org/jgroups/util/ThreadDecorator.java
@@ -0,0 +1,23 @@
package org.jgroups.util;

/**
* An object that can alter the state of a thread when it receives a callback from a {@link ThreadManager} notifying
* it that the thread has been created or released from use.
*
* @author Brian Stansberry
* @version $Id: ThreadDecorator.java,v 1.1.2.1 2008/05/26 09:14:40 belaban Exp $
*/
public interface ThreadDecorator {
/**
* Notification that <code>thread</code> has just been created.
* @param thread the thread
*/
void threadCreated(Thread thread);

/**
* Notification that <code>thread</code> has just been released from use
* (e.g. returned to a thread pool after executing a task).
* @param thread the thread
*/
void threadReleased(Thread thread);
}
22 changes: 22 additions & 0 deletions src/org/jgroups/util/ThreadManager.java
@@ -0,0 +1,22 @@
package org.jgroups.util;

/**
* An object that manages threads and provides callbacks to a
* {@link ThreadDecorator} to allow it to alter their state.
*
* @author Brian Stansberry
* @version $Id: ThreadManager.java,v 1.1.2.1 2008/05/26 09:14:40 belaban Exp $
*/
public interface ThreadManager {
/**
* Gets the ThreadDecorator associated with this manager.
* @return the ThreadDecorator, or <code>null</code> if there is none.
*/
ThreadDecorator getThreadDecorator();

/**
* Sets the ThreadDecorator associated this manager should use.
* @param decorator the ThreadDecorator, or <code>null</code>.
*/
void setThreadDecorator(ThreadDecorator decorator);
}
58 changes: 58 additions & 0 deletions src/org/jgroups/util/ThreadManagerThreadPoolExecutor.java
@@ -0,0 +1,58 @@
package org.jgroups.util;

import java.util.concurrent.*;
import java.util.concurrent.ThreadFactory;

/**
* ThreadPoolExecutor subclass that implements @{link ThreadManager}.
* @author Brian Stansberry
* @version $Id: ThreadManagerThreadPoolExecutor.java,v 1.1.2.1 2008/05/26 09:14:39 belaban Exp $
*/
public class ThreadManagerThreadPoolExecutor extends ThreadPoolExecutor implements ThreadManager {
private ThreadDecorator decorator;

public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public ThreadManagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

public ThreadDecorator getThreadDecorator() {
return decorator;
}

public void setThreadDecorator(ThreadDecorator decorator) {
this.decorator=decorator;
}

/**
* Invokes {@link ThreadDecorator#threadReleased(Thread)} on the current thread.
* <p/>
* {@inheritDoc}
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
super.afterExecute(r, t);
}
finally {
if(decorator != null)
decorator.threadReleased(Thread.currentThread());
}
}

}
32 changes: 26 additions & 6 deletions src/org/jgroups/util/TimeScheduler.java
Expand Up @@ -39,9 +39,9 @@
* added tasks will not restart it: <tt>start()</tt> has to be called to
* restart the scheduler.
* @author Bela Ban
* @version $Id: TimeScheduler.java,v 1.23.4.2 2008/05/22 13:23:11 belaban Exp $
* @version $Id: TimeScheduler.java,v 1.23.4.3 2008/05/26 09:14:40 belaban Exp $
*/
public class TimeScheduler extends ScheduledThreadPoolExecutor {
public class TimeScheduler extends ScheduledThreadPoolExecutor implements ThreadManager {

/** The interface that submitted tasks must implement */
public interface Task extends Runnable {
Expand Down Expand Up @@ -72,10 +72,9 @@ public interface Task extends Runnable {
}
}

private ThreadDecorator threadDecorator=null;



/**
/**
* Create a scheduler that executes tasks in dynamically adjustable intervals
*/
public TimeScheduler() {
Expand All @@ -96,7 +95,13 @@ public TimeScheduler(int corePoolSize) {
setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(getRejectedExecutionHandler()));
}

public ThreadDecorator getThreadDecorator() {
return threadDecorator;
}

public void setThreadDecorator(ThreadDecorator threadDecorator) {
this.threadDecorator=threadDecorator;
}

public String dumpTaskQueue() {
return getQueue().toString();
Expand Down Expand Up @@ -176,7 +181,22 @@ public void stop() throws InterruptedException {
}


private class TaskWrapper<V> implements Runnable, ScheduledFuture<V> {


@Override
protected void afterExecute(Runnable r, Throwable t)
{
try {
super.afterExecute(r, t);
}
finally {
if(threadDecorator != null)
threadDecorator.threadReleased(Thread.currentThread());
}
}


private class TaskWrapper<V> implements Runnable, ScheduledFuture<V> {
private final Task task;
private ScheduledFuture<?> future; // cannot be null !
private boolean cancelled=false;
Expand Down

0 comments on commit 2387088

Please sign in to comment.