Skip to content

Commit

Permalink
Made timer's thread pool and thread pool queue configurable: https://…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 15, 2016
1 parent 745a5a9 commit 0b26a05
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 31 deletions.
17 changes: 13 additions & 4 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -202,18 +202,24 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan
@Property(description="Type of timer to be used. The only valid value is \"new3\" (TimeScheduler3).")
protected String timer_type="new3";

@Property(name="timer_thread_pool.enabled",description="Enable or disable the thread pool inside the timer")
protected boolean timer_thread_pool_enabled=true;

@Property(name="timer.min_threads",description="Minimum thread pool size for the timer thread pool")
protected int timer_min_threads=2;
protected int timer_min_threads=1;

@Property(name="timer.max_threads",description="Max thread pool size for the timer thread pool")
protected int timer_max_threads=4;
protected int timer_max_threads=5;

@Property(name="timer.keep_alive_time", description="Timeout in ms to remove idle threads from the timer pool")
protected long timer_keep_alive_time=30000;

@Property(name="timer.queue_max_size", description="Max number of elements on a timer queue")
protected int timer_queue_max_size=100;

@Property(name="timer.queue_enabled", description="Use queue to for the timer thread pool")
protected boolean timer_queue_enabled=false;

@Property(name="timer.rejection_policy",description="Timer rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String timer_rejection_policy="abort"; // abort will spawn a new thread if the timer thread pool is full

Expand Down Expand Up @@ -1036,9 +1042,12 @@ diagnostics_ttl, log, getSocketFactory(), getThreadFactory(), diagnostics_passco
.transport(this);

if(timer == null) {
if(timer_type.equalsIgnoreCase("new3"))
if(timer_type.equalsIgnoreCase("new3")) {
BlockingQueue<Runnable> queue=timer_queue_enabled? new ArrayBlockingQueue<>(timer_queue_max_size) :
new SynchronousQueue<>();
timer=new TimeScheduler3(timer_thread_factory, timer_min_threads, timer_max_threads, timer_keep_alive_time,
timer_queue_max_size, timer_rejection_policy);
queue, timer_rejection_policy, timer_thread_pool_enabled);
}
else
throw new Exception("timer_type has to be \"new3\"");
}
Expand Down
73 changes: 46 additions & 27 deletions src/org/jgroups/util/TimeScheduler3.java
Expand Up @@ -8,6 +8,8 @@

import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;


/**
Expand All @@ -17,19 +19,19 @@
*/
public class TimeScheduler3 implements TimeScheduler, Runnable {
/** Thread pool used to execute the tasks */
protected final ThreadPoolExecutor pool;
protected final Executor pool;

/** DelayQueue with tasks being sorted according to execution times (next execution first) */
protected final BlockingQueue<Task> queue=new DelayQueue<>();
protected final BlockingQueue<Task> queue=new DelayQueue<>();

/** Thread which removes tasks ready to be executed from the queue and submits them to the pool for execution */
protected volatile Thread runner;
protected volatile Thread runner;

protected static final Log log=LogFactory.getLog(TimeScheduler3.class);
protected static final Log log=LogFactory.getLog(TimeScheduler3.class);

protected ThreadFactory timer_thread_factory;
protected ThreadFactory timer_thread_factory;

protected enum TaskType {dynamic, fixed_rate, fixed_delay}
protected enum TaskType {dynamic, fixed_rate, fixed_delay}


/**
Expand All @@ -45,27 +47,32 @@ public TimeScheduler3() {

public TimeScheduler3(ThreadFactory factory, int min_threads, int max_threads, long keep_alive_time, int max_queue_size,
String rejection_policy) {
this(factory, min_threads, max_threads, keep_alive_time, new ArrayBlockingQueue<>(max_queue_size), rejection_policy, true);
}

public TimeScheduler3(ThreadFactory factory, int min_threads, int max_threads, long keep_alive_time,
BlockingQueue<Runnable> queue, String rejection_policy, boolean thread_pool_enabled) {
timer_thread_factory=factory;
pool=new ThreadPoolExecutor(min_threads, max_threads,keep_alive_time, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(max_queue_size),
factory, Util.parseRejectionPolicy(rejection_policy));
pool=thread_pool_enabled?
new ThreadPoolExecutor(min_threads, max_threads,keep_alive_time, TimeUnit.MILLISECONDS,
queue, factory, Util.parseRejectionPolicy(rejection_policy))
: new DirectExecutor();
start();
}



public void setThreadFactory(ThreadFactory f) {pool.setThreadFactory(f);}
public int getMinThreads() {return pool.getCorePoolSize();}
public void setMinThreads(int size) {pool.setCorePoolSize(size);}
public int getMaxThreads() {return pool.getMaximumPoolSize();}
public void setMaxThreads(int size) {pool.setMaximumPoolSize(size);}
public long getKeepAliveTime() {return pool.getKeepAliveTime(TimeUnit.MILLISECONDS);}
public void setKeepAliveTime(long time) {pool.setKeepAliveTime(time, TimeUnit.MILLISECONDS);}
public int getCurrentThreads() {return pool.getPoolSize();}
public int getQueueSize() {return pool.getQueue().size();}
public void setThreadFactory(ThreadFactory f) {condSet((p) -> p.setThreadFactory(f));}
public int getMinThreads() {return condGet(ThreadPoolExecutor::getCorePoolSize, 0);}
public void setMinThreads(int size) {condSet(p -> p.setCorePoolSize(size));}
public int getMaxThreads() {return condGet(ThreadPoolExecutor::getMaximumPoolSize, 0);}
public void setMaxThreads(int size) {condSet(p -> p.setMaximumPoolSize(size));}
public long getKeepAliveTime() {return condGet(p -> p.getKeepAliveTime(TimeUnit.MILLISECONDS), 0L);}
public void setKeepAliveTime(long time) {condSet(p -> p.setKeepAliveTime(time, TimeUnit.MILLISECONDS));}
public int getCurrentThreads() {return condGet(ThreadPoolExecutor::getPoolSize, 0);}
public int getQueueSize() {return condGet(p -> p.getQueue().size(), 0);}
public int size() {return queue.size();}
public String toString() {return getClass().getSimpleName();}
public boolean isShutdown() {return pool.isShutdown();}
public boolean isShutdown() {return condGet(ThreadPoolExecutor::isShutdown, false);}


public String dumpTimerTasks() {
Expand Down Expand Up @@ -137,13 +144,16 @@ public void stop() {
}
queue.clear();

List<Runnable> remaining_tasks=pool.shutdownNow();
remaining_tasks.stream().filter(task -> task instanceof Future).forEach(task -> ((Future)task).cancel(true));
pool.getQueue().clear();
try {
pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e) {
if(pool instanceof ThreadPoolExecutor) {
ThreadPoolExecutor p=(ThreadPoolExecutor)pool;
List<Runnable> remaining_tasks=p.shutdownNow();
remaining_tasks.stream().filter(task -> task instanceof Future).forEach(task -> ((Future)task).cancel(true));
p.getQueue().clear();
try {
p.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e) {
}
}

// clears the threads list (https://issues.jboss.org/browse/JGRP-1971)
Expand Down Expand Up @@ -188,7 +198,16 @@ protected Future<?> doSchedule(Task task, long initial_delay) {
return add(task);
}

protected void condSet(Consumer<ThreadPoolExecutor> setter) {
if(pool instanceof ThreadPoolExecutor)
setter.accept((ThreadPoolExecutor)pool);
}

protected <T> T condGet(Function<ThreadPoolExecutor,T> getter, T default_value) {
if(pool instanceof ThreadPoolExecutor)
return getter.apply((ThreadPoolExecutor)pool);
return default_value;
}


protected void submitToPool(final Task entry) {
Expand Down

0 comments on commit 0b26a05

Please sign in to comment.