Skip to content

Commit

Permalink
Move to array blocking queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Amareshwari Sriramadasu committed Mar 24, 2016
1 parent fb186e7 commit 69dcc5b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 1 deletion.
Expand Up @@ -892,6 +892,25 @@ public static String getWSFilterImplConfKey(String filterName) {
*/
public static final long DEFAULT_ESTIMATE_TIMEOUT_MILLIS = 300000L; // 5 minutes


/**
* Key used to get minimum number of threads in the estimate thread pool
*/
public static final String ESTIMATE_POOL_MIN_THREADS = SERVER_PFX + "estimate.pool.min.threads";
public static final int DEFAULT_ESTIMATE_POOL_MIN_THREADS = 3;

/**
* Key used to get maximum number of threads in the estimate thread pool
*/
public static final String ESTIMATE_POOL_MAX_THREADS = SERVER_PFX + "estimate.pool.max.threads";
public static final int DEFAULT_ESTIMATE_POOL_MAX_THREADS = 100;

/**
* Key used to get keep alive time for threads in the estimate thread pool
*/
public static final String ESTIMATE_POOL_KEEP_ALIVE_MILLIS = SERVER_PFX + "estimate.pool.keepalive.millis";
public static final int DEFAULT_ESTIMATE_POOL_KEEP_ALIVE_MILLIS = 60000; // 1 minute

public static final String QUERY_PHASE1_REWRITERS = SERVER_PFX + "query.phase1.rewriters";

/**
Expand Down
Expand Up @@ -91,6 +91,7 @@ public AsyncEventListener(int poolSize, long timeOutSeconds, final boolean isDae
.build();
// fixed pool with min and max equal to poolSize
processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory);
processor.allowCoreThreadTimeOut(true);
}

/**
Expand Down
Expand Up @@ -1254,6 +1254,13 @@ public synchronized void start() {
}

private void startEstimatePool() {
int minPoolSize = conf.getInt(ESTIMATE_POOL_MIN_THREADS,
DEFAULT_ESTIMATE_POOL_MIN_THREADS);
int maxPoolSize = conf.getInt(ESTIMATE_POOL_MAX_THREADS,
DEFAULT_ESTIMATE_POOL_MAX_THREADS);
int keepAlive = conf.getInt(ESTIMATE_POOL_KEEP_ALIVE_MILLIS,
DEFAULT_ESTIMATE_POOL_KEEP_ALIVE_MILLIS);

final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
final AtomicInteger thId = new AtomicInteger();
// We are creating our own thread factory, just so that we can override thread name for easy debugging
Expand All @@ -1267,7 +1274,12 @@ public Thread newThread(Runnable r) {
};

log.debug("starting estimate pool");
this.estimatePool = Executors.newCachedThreadPool(threadFactory);

ThreadPoolExecutor estimatePool = new ThreadPoolExecutor(minPoolSize, maxPoolSize, keepAlive, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(minPoolSize), threadFactory);
estimatePool.allowCoreThreadTimeOut(true);
estimatePool.prestartCoreThread();
this.estimatePool = estimatePool;
}

private static final String REWRITE_GAUGE = "CUBE_REWRITE";
Expand Down
20 changes: 20 additions & 0 deletions lens-server/src/main/resources/lensserver-default.xml
Expand Up @@ -748,6 +748,26 @@
</description>
</property>

<property>
<name>lens.server.estimate.pool.min.threads</name>
<value>3</value>
<description>Minimum number of threads in the estimate thread pool</description>
</property>

<property>
<name>lens.server.estimate.pool.max.threads</name>
<value>100</value>
<description>Maximum number of threads in the estimate thread pool</description>
</property>

<property>
<name>lens.server.estimate.pool.keepalive.millis</name>
<value>60000</value>
<description>Thread keep alive time in milliseconds for the estimate thread pool.
If there are no estimate requests for this period,then cached threads are released from the pool.
</description>
</property>

<property>
<name>lens.server.session.expiry.service.interval.secs</name>
<value>3600</value>
Expand Down

0 comments on commit 69dcc5b

Please sign in to comment.