Skip to content
Permalink
Browse files
adding a ThreadPoolExecutor with a waterMark to control the threading…
… behavior
  • Loading branch information
Supun Kamburugamuva committed Jan 17, 2011
1 parent 7af6a8c commit 22f6a490f44cdb29ac857c0d801a5f9556a32878
Showing 7 changed files with 724 additions and 7 deletions.
@@ -19,11 +19,13 @@

package org.apache.axis2.transport.base.threads;

import org.apache.axis2.transport.base.threads.watermark.DefaultWaterMarkQueue;
import org.apache.axis2.transport.base.threads.watermark.WaterMarkExecutor;
import org.apache.axis2.transport.base.threads.watermark.WaterMarkQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
@@ -33,7 +35,7 @@ public class NativeWorkerPool implements WorkerPool {
static final Log log = LogFactory.getLog(NativeWorkerPool.class);

private final ThreadPoolExecutor executor;
private final LinkedBlockingQueue<Runnable> blockingQueue;
private final BlockingQueue<Runnable> blockingQueue;

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName, String threadGroupId) {
@@ -45,10 +47,123 @@ public NativeWorkerPool(int core, int max, int keepAlive,
(queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queueLength));
executor = new ThreadPoolExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
core, max, keepAlive,
TimeUnit.SECONDS,
blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
}

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName,
String threadGroupId, BlockingQueue<Runnable> queue) {

if (log.isDebugEnabled()) {
log.debug("Using native util.concurrent package..");
}

if (queue == null) {
blockingQueue =
(queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queueLength));
} else {
blockingQueue = queue;
}

executor = new ThreadPoolExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
}

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName,
String threadGroupId, BlockingQueue<Runnable> queue,
RejectedExecutionHandler rejectedExecutionHandler) {

if (log.isDebugEnabled()) {
log.debug("Using native util.concurrent package..");
}

if (queue == null) {
blockingQueue =
(queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queueLength));
} else {
blockingQueue = queue;
}

executor = new ThreadPoolExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
rejectedExecutionHandler);
}

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, int waterMark, String threadGroupName,
String threadGroupId) {

if (log.isDebugEnabled()) {
log.debug("Using native util.concurrent package..");
}


blockingQueue =
(queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
: new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));

executor = new WaterMarkExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
(WaterMarkQueue<Runnable>) blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
}

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, int waterMark, String threadGroupName,
String threadGroupId, WaterMarkQueue<Runnable> queue) {

if (log.isDebugEnabled()) {
log.debug("Using native util.concurrent package..");
}

if (queue == null) {
blockingQueue =
(queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
: new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
} else {
blockingQueue = queue;
}

executor = new WaterMarkExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
(WaterMarkQueue<Runnable>) blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
}

public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, int waterMark, String threadGroupName,
String threadGroupId,
RejectedExecutionHandler rejectedExecutionHandler) {

if (log.isDebugEnabled()) {
log.debug("Using native util.concurrent package..");
}


blockingQueue =
(queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
: new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));

executor = new WaterMarkExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
(WaterMarkQueue<Runnable>) blockingQueue,
new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
rejectedExecutionHandler);
}

public void execute(final Runnable task) {
@@ -19,6 +19,8 @@

package org.apache.axis2.transport.base.threads;

import java.util.concurrent.BlockingQueue;

/**
* Worker pool factory.
* For the moment this always creates {@link NativeWorkerPool} instances since
@@ -27,8 +29,25 @@
public class WorkerPoolFactory {

public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName, String threadGroupId) {
int queueLength, String threadGroupName,
String threadGroupId) {
return new NativeWorkerPool(
core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
}

public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
int queueLength, int waterMark, String threadGroupName,
String threadGroupId) {
return new NativeWorkerPool(core, max, keepAlive,
queueLength, waterMark, threadGroupName,
threadGroupId);
}

public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName,
String threadGroupId, BlockingQueue<Runnable> queue) {
return new NativeWorkerPool(core, max, keepAlive,
queueLength, threadGroupName,
threadGroupId, queue);
}
}

0 comments on commit 22f6a49

Please sign in to comment.