Skip to content

Commit

Permalink
Replace DispatchQueue and BookingQueue with HealthyThreadPool (#1035)
Browse files Browse the repository at this point in the history
* Update dispatchQuery to use min_cores

Sorting jobs only by priority causes a situation where low priority jobs can get starved by a constant flow of high priority jobs.
The new formula adds a modifier to the sorting rank to take into account the number of cores the job is requesting and also the number of days the job has been waiting in the queue.
Priority values over 200 will mostly override the formula and behave as priority-only scheduling.
sort = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days)

In addition, take layer_int_cores_min into account when filtering on folder_resource limits, to avoid allocating more cores than the folder limit allows.

(cherry picked from commit 566411aeeddc60983a30eabe121fd03263d05525)

* Revert "Update dispatchQuery to use min_cores"

This reverts commit 2eb4936

* Replace DispatchQueue and BookingQueue with HealthyThreadPool

Queues will no longer inherit from ThreadPoolExecutor; instead they will
manage an instance of HealthyThreadPool, which is a
ThreadPoolExecutor that handles health checks, termination and repeated
tasks. With this, the booking queue should be able to self-heal when threads get locked.

* Remove trackit reference

* Refactor HostReportQueue to use guava Cache

Use a guava cache to store only the last version of a HostReport per host.

* Configure HostReportQueue on opencue.properties

* Fix unit tests

* Fix unit tests

* This unit test is not actually testing anything useful

The test doesn't make sense with the new thread pool and will also
cause problems whenever a user changes a config property.

Co-authored-by: Roula O'Regan <roregan@imageworks.com>
  • Loading branch information
DiegoTavares and roulaoregan-spi committed Mar 29, 2022
1 parent 6da47ba commit 027d853
Show file tree
Hide file tree
Showing 37 changed files with 781 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,5 @@ public ServletRegistrationBean<JobLaunchServlet> jobLaunchServlet() {
b.setServlet(new JobLaunchServlet());
return b;
}

}

148 changes: 61 additions & 87 deletions cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,128 +19,102 @@

package com.imageworks.spcue.dispatcher;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import com.imageworks.spcue.util.CueUtil;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

public class BookingQueue extends ThreadPoolExecutor {

private static final Logger logger = Logger.getLogger(BookingQueue.class);

private static final int THREADS_KEEP_ALIVE_SECONDS = 10;
public class BookingQueue {

private int queueCapacity;
private int baseSleepTimeMillis = 400;
private AtomicBoolean isShutdown = new AtomicBoolean(false);
private final int healthThreshold;
private final int minUnhealthyPeriodMin;
private final int queueCapacity;
private final int corePoolSize;
private final int maxPoolSize;
private static final int BASE_SLEEP_TIME_MILLIS = 300;

private QueueRejectCounter rejectCounter = new QueueRejectCounter();
private static final Logger logger = Logger.getLogger("HEALTH");
private HealthyThreadPool healthyThreadPool;

private Cache<String, DispatchBookHost> bookingCache = CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
// Invalidate entries that got executed by the threadpool and lost their reference
.weakValues()
.build();

private BookingQueue(int corePoolSize, int maxPoolSize, int queueCapacity, int sleepTimeMs) {
super(corePoolSize, maxPoolSize, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity));
public BookingQueue(int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity,
int corePoolSize, int maxPoolSize) {
this.healthThreshold = healthThreshold;
this.minUnhealthyPeriodMin = minUnhealthyPeriodMin;
this.queueCapacity = queueCapacity;
this.baseSleepTimeMillis = sleepTimeMs;
this.setRejectedExecutionHandler(rejectCounter);
logger.info("BookingQueue" +
" core:" + getCorePoolSize() +
" max:" + getMaximumPoolSize() +
" capacity:" + queueCapacity +
" sleepTimeMs:" + sleepTimeMs);
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
initThreadPool();
}

@Autowired
public BookingQueue(Environment env, String propertyKeyPrefix, int sleepTimeMs) {
this(CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"),
sleepTimeMs);
/**
 * Builds a new {@code HealthyThreadPool} named "BookingQueue" from the values this
 * queue was constructed with and installs it as the backing pool, replacing
 * whatever instance was referenced before.
 */
public void initThreadPool() {
    final HealthyThreadPool freshPool = new HealthyThreadPool(
            "BookingQueue",
            healthThreshold, minUnhealthyPeriodMin,
            queueCapacity,
            corePoolSize, maxPoolSize,
            BASE_SLEEP_TIME_MILLIS);
    this.healthyThreadPool = freshPool;
}

public void execute(DispatchBookHost r) {
if (isShutdown.get()) {
return;
}
if (bookingCache.getIfPresent(r.getKey()) == null){
bookingCache.put(r.getKey(), r);
super.execute(r);
/**
 * Asks the backing pool whether it is healthy and, when it is not, replaces it
 * with a freshly initialised pool.
 *
 * @return {@code true} when the check completed (the pool was healthy or has been
 *         re-created); {@code false} when the check was interrupted
 */
public boolean isHealthy() {
    try {
        if (!healthyThreadPool.isHealthyOrShutdown()) {
            logger.warn("BookingQueue: Unhealthy queue terminated, starting a new one");
            initThreadPool();
        }
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the thread's interrupt status;
        // restore it so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        // TODO: evaluate crashing the whole Spring Boot context here
        // to force a container restart cycle
        logger.error("Failed to restart BookingThreadPool", e);
        return false;
    }

    return true;
}

/**
 * Hands a keyed task over to the backing pool.
 *
 * @param r the task to submit
 */
public void execute(KeyRunnable r) {
    final HealthyThreadPool pool = this.healthyThreadPool;
    pool.execute(r);
}

public long getRejectedTaskCount() {
return rejectCounter.getRejectCount();
return healthyThreadPool.getRejectedTaskCount();
}

/**
 * @return the queue capacity this instance was configured with
 */
public int getQueueCapacity() {
    return this.queueCapacity;
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("clearing out booking queue: " + this.getQueue().size());
this.getQueue().clear();
}
healthyThreadPool.shutdown();
}

/**
 * @return the current size of the backing pool's queue
 */
public int getSize() {
    final int queued = healthyThreadPool.getQueue().size();
    return queued;
}

/**
* Lowers the sleep time as the queue grows.
*
* @return
*/
public int sleepTime() {
if (!isShutdown.get()) {
int sleep = (int) (baseSleepTimeMillis - (((this.getQueue().size () /
(float) queueCapacity) * baseSleepTimeMillis)) * 2);
if (sleep < 0) {
sleep = 0;
}
return sleep;
} else {
return 0;
}
/**
 * @return the remaining capacity reported by the backing pool's queue
 */
public int getRemainingCapacity() {
    final int remaining = healthyThreadPool.getQueue().remainingCapacity();
    return remaining;
}

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (isShutdown()) {
this.remove(r);
} else {
try {
Thread.sleep(sleepTime());
} catch (InterruptedException e) {
logger.info("booking queue was interrupted.");
Thread.currentThread().interrupt();
}
}
/**
 * @return the active count reported by the backing pool
 */
public int getActiveCount() {
    final int active = healthyThreadPool.getActiveCount();
    return active;
}

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
/**
 * @return the completed-task count reported by the backing pool
 */
public long getCompletedTaskCount() {
    final long completed = healthyThreadPool.getCompletedTaskCount();
    return completed;
}

// Invalidate cache to avoid having to wait for GC to mark processed entries collectible
DispatchBookHost h = (DispatchBookHost)r;
bookingCache.invalidate(h.getKey());
/**
 * @return the configured core pool size, widened to {@code long}
 */
public long getCorePoolSize() {
    final long coreSize = this.corePoolSize;
    return coreSize;
}

if (sleepTime() < 100) {
logger.info("BookingQueue cleanup executed.");
getQueue().clear();
}
/**
 * @return the configured maximum pool size, widened to {@code long}
 */
public long getMaximumPoolSize() {
    final long maxSize = this.maxPoolSize;
    return maxSize;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -23,93 +23,84 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;

public class DispatchQueue {

private TaskExecutor dispatchPool;
private ThreadPoolTaskExecutor _dispatchPool;
private String name = "Default";
private AtomicBoolean isShutdown = new AtomicBoolean(false);

private final AtomicLong tasksRun = new AtomicLong(0);
private final AtomicLong tasksRejected = new AtomicLong(0);
private int healthThreshold;
private int minUnhealthyPeriodMin;
private int queueCapacity;
private int corePoolSize;
private int maxPoolSize;

private static final Logger logger = Logger.getLogger(DispatchQueue.class);

public DispatchQueue() {}
private static final Logger logger = Logger.getLogger("HEALTH");
private String name = "Default";
private HealthyThreadPool healthyDispatchPool;

public DispatchQueue(String name) {
/**
 * Creates a dispatch queue and immediately initialises its backing thread pool.
 *
 * @param name                  name of the queue, also passed to the backing pool
 * @param healthThreshold       health threshold forwarded to the backing pool
 * @param minUnhealthyPeriodMin minimum unhealthy period forwarded to the backing pool
 * @param queueCapacity         queue capacity forwarded to the backing pool
 * @param corePoolSize          core pool size forwarded to the backing pool
 * @param maxPoolSize           maximum pool size forwarded to the backing pool
 */
public DispatchQueue(String name, int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity,
        int corePoolSize, int maxPoolSize) {
    // Pool sizing.
    this.corePoolSize = corePoolSize;
    this.maxPoolSize = maxPoolSize;
    this.queueCapacity = queueCapacity;
    // Health-check settings.
    this.healthThreshold = healthThreshold;
    this.minUnhealthyPeriodMin = minUnhealthyPeriodMin;
    this.name = name;
    // Must run last: it reads every field assigned above.
    initThreadPool();
}

public void execute(Runnable r) {
/**
 * Builds a new {@code HealthyThreadPool} carrying this queue's name and configured
 * values, and installs it as the backing pool, replacing whatever instance was
 * referenced before.
 */
public void initThreadPool() {
    final HealthyThreadPool freshPool = new HealthyThreadPool(
            name,
            healthThreshold, minUnhealthyPeriodMin,
            queueCapacity,
            corePoolSize, maxPoolSize);
    this.healthyDispatchPool = freshPool;
}

public boolean isHealthy() {
try {
if (!isShutdown.get()) {
this.dispatchPool.execute(r);
tasksRun.addAndGet(1);
if (!healthyDispatchPool.isHealthyOrShutdown()) {
logger.warn("DispatchQueue_" + name + ": Unhealthy queue terminated, starting a new one");
initThreadPool();
}
} catch (Exception e) {
long rejection = tasksRejected.addAndGet(1);
logger.warn("Warning, dispatch queue - [" + name + "] rejected, " + e);
throw new DispatchQueueTaskRejectionException(
"Warning, dispatch queue [" + name + " rejected task #"
+ rejection);
} catch (InterruptedException e) {
// TODO: evaluate crashing the whole springbook context here
// to force a container restart cycle
logger.error("DispatchQueue_" + name + ":Failed to restart DispatchThreadPool", e);
return false;
}
}

public int getMaxPoolSize() {
return _dispatchPool.getMaxPoolSize();
return true;
}

public int getActiveThreadCount() {
return _dispatchPool.getActiveCount();
/**
 * Hands a keyed task over to the backing dispatch pool.
 *
 * @param r the task to submit
 */
public void execute(KeyRunnable r) {
    final HealthyThreadPool pool = this.healthyDispatchPool;
    pool.execute(r);
}

public int getWaitingCount() {
return _dispatchPool.getThreadPoolExecutor().getQueue().size();
/**
 * @return the rejected-task count reported by the backing dispatch pool
 */
public long getRejectedTaskCount() {
    final long rejected = healthyDispatchPool.getRejectedTaskCount();
    return rejected;
}

public int getRemainingCapacity() {
return _dispatchPool.getThreadPoolExecutor().getQueue().remainingCapacity();
/**
 * Delegates shutdown to the backing dispatch pool.
 */
public void shutdown() {
    final HealthyThreadPool pool = this.healthyDispatchPool;
    pool.shutdown();
}

public long getTotalDispatched() {
return tasksRun.get();
/**
 * @return the current size of the backing dispatch pool's queue
 */
public int getSize() {
    final int queued = healthyDispatchPool.getQueue().size();
    return queued;
}

public long getTotalRejected() {
return tasksRejected.get();
/**
 * @return the remaining capacity reported by the backing dispatch pool's queue
 */
public int getRemainingCapacity() {
    final int remaining = healthyDispatchPool.getQueue().remainingCapacity();
    return remaining;
}

public TaskExecutor getDispatchPool() {
return dispatchPool;
/**
 * @return the active count reported by the backing dispatch pool
 */
public int getActiveCount() {
    final int active = healthyDispatchPool.getActiveCount();
    return active;
}

public void setDispatchPool(TaskExecutor dispatchPool) {
this.dispatchPool = dispatchPool;
this._dispatchPool = (ThreadPoolTaskExecutor) dispatchPool;
/**
 * @return the completed-task count reported by the backing dispatch pool
 */
public long getCompletedTaskCount() {
    final long completed = healthyDispatchPool.getCompletedTaskCount();
    return completed;
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("Shutting down thread pool " + name + ", currently "
+ getActiveThreadCount() + " active threads.");
final long startTime = System.currentTimeMillis();
while (getWaitingCount() != 0 && getActiveThreadCount() != 0) {
try {
if (System.currentTimeMillis() - startTime > 10000) {
throw new InterruptedException(name
+ " thread pool failed to shutdown properly");
}
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.imageworks.spcue.VirtualProc;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import com.imageworks.spcue.dispatcher.commands.DispatchNextFrame;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;
import com.imageworks.spcue.grpc.host.LockState;
import com.imageworks.spcue.grpc.job.FrameExitStatus;
import com.imageworks.spcue.grpc.job.FrameState;
Expand Down Expand Up @@ -158,10 +159,11 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);

final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();
if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand All @@ -182,7 +184,7 @@ public void run() {
* properties.
*/
if (redirectManager.hasRedirect(proc)) {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand All @@ -195,7 +197,7 @@ public void run() {
});
}
else {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand Down
Loading

0 comments on commit 027d853

Please sign in to comment.