Skip to content

Commit

Permalink
Add thread pool properties (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
splhack committed Dec 14, 2021
1 parent cc6edd8 commit 7e05c4e
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import com.imageworks.spcue.util.CueUtil;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

public class BookingQueue extends ThreadPoolExecutor {

private static final Logger logger = Logger.getLogger(BookingQueue.class);

private static final int INITIAL_QUEUE_SIZE = 1000;

private static final int THREADS_MINIMUM = 6;
private static final int THREADS_MAXIMUM = 6;
private static final int THREADS_KEEP_ALIVE_SECONDS = 10;

private int queueCapacity;
private int baseSleepTimeMillis = 400;
private AtomicBoolean isShutdown = new AtomicBoolean(false);

Expand All @@ -50,11 +51,25 @@ public class BookingQueue extends ThreadPoolExecutor {
.weakValues()
.build();

public BookingQueue(int sleepTimeMs) {
super(THREADS_MINIMUM, THREADS_MAXIMUM, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE));
private BookingQueue(int corePoolSize, int maxPoolSize, int queueCapacity, int sleepTimeMs) {
super(corePoolSize, maxPoolSize, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity));
this.queueCapacity = queueCapacity;
this.baseSleepTimeMillis = sleepTimeMs;
this.setRejectedExecutionHandler(rejectCounter);
logger.info("BookingQueue" +
" core:" + getCorePoolSize() +
" max:" + getMaximumPoolSize() +
" capacity:" + queueCapacity +
" sleepTimeMs:" + sleepTimeMs);
}

@Autowired
public BookingQueue(Environment env, String propertyKeyPrefix, int sleepTimeMs) {
this(CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"),
sleepTimeMs);
}

public void execute(DispatchBookHost r) {
Expand All @@ -71,6 +86,10 @@ public long getRejectedTaskCount() {
return rejectCounter.getRejectCount();
}

public int getQueueCapacity() {
return queueCapacity;
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("clearing out booking queue: " + this.getQueue().size());
Expand All @@ -87,7 +106,7 @@ public void shutdown() {
public int sleepTime() {
if (!isShutdown.get()) {
int sleep = (int) (baseSleepTimeMillis - (((this.getQueue().size () /
(float) INITIAL_QUEUE_SIZE) * baseSleepTimeMillis)) * 2);
(float) queueCapacity) * baseSleepTimeMillis)) * 2);
if (sleep < 0) {
sleep = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,37 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

import com.imageworks.spcue.dispatcher.commands.DispatchHandleHostReport;
import com.imageworks.spcue.util.CueUtil;

public class HostReportQueue extends ThreadPoolExecutor {

private static final Logger logger = Logger.getLogger(HostReportQueue.class);

private static final int THREAD_POOL_SIZE_INITIAL = 6;
private static final int THREAD_POOL_SIZE_MAX = 8;
private static final int QUEUE_SIZE_INITIAL = 1000;

private QueueRejectCounter rejectCounter = new QueueRejectCounter();
private AtomicBoolean isShutdown = new AtomicBoolean(false);
private int queueCapacity;

public HostReportQueue() {
super(THREAD_POOL_SIZE_INITIAL, THREAD_POOL_SIZE_MAX, 10 , TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(QUEUE_SIZE_INITIAL));
private HostReportQueue(String name, int corePoolSize, int maxPoolSize, int queueCapacity) {
super(corePoolSize, maxPoolSize, 10 , TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity));
this.queueCapacity = queueCapacity;
this.setRejectedExecutionHandler(rejectCounter);
logger.info(name +
" core:" + getCorePoolSize() +
" max:" + getMaximumPoolSize() +
" capacity:" + queueCapacity);
}

@Autowired
public HostReportQueue(Environment env, String name, String propertyKeyPrefix) {
this(name,
CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"));
}

public void execute(DispatchHandleHostReport r) {
Expand All @@ -60,6 +72,10 @@ public long getRejectedTaskCount() {
return rejectCounter.getRejectCount();
}

public int getQueueCapacity() {
return queueCapacity;
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("Shutting down report pool, currently " + this.getActiveCount() + " active threads.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

/*
* Copyright Contributors to the OpenCue Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



package com.imageworks.spcue.dispatcher;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.imageworks.spcue.util.CueUtil;

/**
* Wrapper class of spring ThreadPoolTaskExecutor to initialize with the thread pool properties.
*/
public class ThreadPoolTaskExecutorWrapper extends ThreadPoolTaskExecutor {

private static final Logger logger = Logger.getLogger(ThreadPoolTaskExecutorWrapper.class);
private static final long serialVersionUID = -2977068663355369141L;

private int queueCapacity;

private ThreadPoolTaskExecutorWrapper(String name, int corePoolSize, int maxPoolSize,
int queueCapacity) {
super();
this.setMaxPoolSize(maxPoolSize);
this.setCorePoolSize(corePoolSize);
this.setQueueCapacity(queueCapacity);
this.queueCapacity = queueCapacity;
logger.info(name +
" core:" + getCorePoolSize() +
" max:" + getMaxPoolSize() +
" capacity:" + queueCapacity);
}

@Autowired
public ThreadPoolTaskExecutorWrapper(Environment env, String name, String propertyKeyPrefix) {
this(name,
CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"));
}

public int getQueueCapacity() {
return queueCapacity;
}
}
15 changes: 14 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/util/CueUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.mail.util.ByteArrayDataSource;

import org.apache.log4j.Logger;
import org.springframework.core.env.Environment;

import com.imageworks.spcue.LayerInterface;
import com.imageworks.spcue.SpcueRuntimeException;
Expand Down Expand Up @@ -351,5 +352,17 @@ public static List<Integer> normalizeFrameRange(FrameSet frameSet, int chunkSize
return Collections.unmodifiableList(
new ArrayList<Integer>(result));
}
}

/**
* Get "{prefix}.{key}" property int value
*
* @param env
* @param prefix Example "dispatcher.report_queue"
* @param key Example "core_pool_size"
*/
public static int getIntProperty(Environment env, String prefix, String key)
throws IllegalStateException {
Integer value = env.getRequiredProperty(prefix + "." + key, Integer.class);
return value.intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,34 @@
</constructor-arg>
</bean>

<bean id="launchQueue" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="1" />
<property name="queueCapacity" value="100" />
</bean>

<bean id="dispatchPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="4" />
<property name="queueCapacity" value="500" />
<bean id="launchQueue" class="com.imageworks.spcue.dispatcher.ThreadPoolTaskExecutorWrapper">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>LaunchQueue</value>
</constructor-arg>
<constructor-arg index="2" type="java.lang.String">
<value>dispatcher.launch_queue</value>
</constructor-arg>
</bean>

<bean id="killPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="4" />
<property name="queueCapacity" value="500" />
<bean id="dispatchPool" class="com.imageworks.spcue.dispatcher.ThreadPoolTaskExecutorWrapper">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>DispatchPool</value>
</constructor-arg>
<constructor-arg index="2" type="java.lang.String">
<value>dispatcher.dispatch_pool</value>
</constructor-arg>
</bean>

<bean id="managePool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="8" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="250" />
<bean id="managePool" class="com.imageworks.spcue.dispatcher.ThreadPoolTaskExecutorWrapper">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>ManagePool</value>
</constructor-arg>
<constructor-arg index="2" type="java.lang.String">
<value>dispatcher.manage_pool</value>
</constructor-arg>
</bean>

<bean id="dispatchQueue" class="com.imageworks.spcue.dispatcher.DispatchQueue" destroy-method="shutdown">
Expand All @@ -92,11 +98,31 @@
<property name="dispatchPool" ref="managePool" />
</bean>

<bean id="reportQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown"/>
<bean id="killQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown"/>
<bean id="reportQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>ReportQueue</value>
</constructor-arg>
<constructor-arg index="2" type="java.lang.String">
<value>dispatcher.report_queue</value>
</constructor-arg>
</bean>
<bean id="killQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>KillQueue</value>
</constructor-arg>
<constructor-arg index="2" type="java.lang.String">
<value>dispatcher.kill_queue</value>
</constructor-arg>
</bean>

<bean id="bookingQueue" class="com.imageworks.spcue.dispatcher.BookingQueue" destroy-method="shutdown">
<constructor-arg index="0" type="int">
<!-- arg0: env -->
<constructor-arg index="1" type="java.lang.String">
<value>dispatcher.booking_queue</value>
</constructor-arg>
<constructor-arg index="2" type="int">
<value>300</value>
</constructor-arg>
</bean>
Expand Down
42 changes: 42 additions & 0 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,48 @@ dispatcher.job_frame_dispatch_max=8
# Maximum number of frames to dispatch from a host at one time.
dispatcher.host_frame_dispatch_max=12

# Number of threads to keep in the pool for launching job.
dispatcher.launch_queue.core_pool_size=1
# Maximum number of threads to allow in the pool for launching job.
dispatcher.launch_queue.max_pool_size=1
# Queue capacity for launching job.
dispatcher.launch_queue.queue_capacity=100

# Number of threads to keep in the pool for various operation.
dispatcher.dispatch_pool.core_pool_size=4
# Maximum number of threads to allow in the pool for various operation.
dispatcher.dispatch_pool.max_pool_size=4
# Queue capacity for various operation.
dispatcher.dispatch_pool.queue_capacity=500

# Number of threads to keep in the pool for management operation.
dispatcher.manage_pool.core_pool_size=8
# Maximum number of threads to allow in the pool for management operation.
dispatcher.manage_pool.max_pool_size=8
# Queue capacity for management operation.
dispatcher.manage_pool.queue_capacity=250

# Number of threads to keep in the pool for handling Host Report.
dispatcher.report_queue.core_pool_size=6
# Maximum number of threads to allow in the pool for handling Host Report.
dispatcher.report_queue.max_pool_size=8
# Queue capacity for handling Host Report.
dispatcher.report_queue.queue_capacity=1000

# Number of threads to keep in the pool for kill frame operation.
dispatcher.kill_queue.core_pool_size=6
# Maximum number of threads to allow in the pool for kill frame operation.
dispatcher.kill_queue.max_pool_size=8
# Queue capacity for kill frame operation.
dispatcher.kill_queue.queue_capacity=1000

# Number of threads to keep in the pool for booking.
dispatcher.booking_queue.core_pool_size=6
# Maximum number of threads to allow in the pool for booking.
dispatcher.booking_queue.max_pool_size=6
# Queue capacity for booking.
dispatcher.booking_queue.queue_capacity=1000

# Jobs will be archived to the history tables after being completed for this long.
history.archive_jobs_cutoff_hours=72

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class TestBookingQueue extends AbstractTransactionalJUnit4SpringContextTe
@Resource
HostManager hostManager;

@Resource
BookingQueue bookingQueue;

private static final String HOSTNAME = "beta";

@Before
Expand Down Expand Up @@ -90,11 +93,10 @@ public void testBookingQueue() {
host1.idleCores = 500;
DispatchHost host2 = hostDao.findDispatchHost(HOSTNAME);
DispatchHost host3 = hostDao.findDispatchHost(HOSTNAME);
BookingQueue queue = new BookingQueue(1000);

queue.execute(new DispatchBookHost(host2,dispatcher));
queue.execute(new DispatchBookHost(host3,dispatcher));
queue.execute(new DispatchBookHost(host1,dispatcher));
bookingQueue.execute(new DispatchBookHost(host2,dispatcher));
bookingQueue.execute(new DispatchBookHost(host3,dispatcher));
bookingQueue.execute(new DispatchBookHost(host1,dispatcher));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit 7e05c4e

Please sign in to comment.