Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add balanced mode to dispatch query #1103

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,31 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, DispatchHost ho
int limit);

/**
* Return whether FIFO scheduling is enabled or not in the same priority for unittest.
* Return Scheduling Mode selected
*
* @return
*/
boolean getFifoSchedulingEnabled();
SchedulingMode getSchedulingMode();

/**
* Set whether FIFO scheduling is enabled or not in the same priority for unittest.
* Set Scheduling Mode.
*
* @param fifoSchedulingEnabled
* @param schedulingMode
*/
void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled);
void setSchedulingMode(SchedulingMode schedulingMode);

/**
* - PRIORITY_ONLY: Sort by priority only
* - FIFO: Whether or not to enable FIFO scheduling in the same priority.
* - BALANCED: Use a rank formula that takes into account time waiting, and number
* of cores required: rank = priority + (100 * (1 - (job.cores/job.int_min_cores))) + age in days
*/
enum SchedulingMode {
PRIORITY_ONLY,
FIFO,
BALANCED
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

public class DispatchQuery {

public static final String FIND_JOBS_BY_SHOW =
"/* FIND_JOBS_BY_SHOW */ " +
public static final String FIND_JOBS_BY_SHOW_PRIORITY_MODE =
"/* FIND_JOBS_BY_SHOW_PRIORITY_MODE */ " +
"SELECT pk_job, int_priority, rank FROM ( " +
"SELECT " +
"ROW_NUMBER() OVER (ORDER BY job_resource.int_priority DESC) AS rank, " +
Expand Down Expand Up @@ -101,16 +101,91 @@ public class DispatchQuery {
") " +
") AS t1 WHERE rank < ?";

// sort = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days) */
public static final String FIND_JOBS_BY_SHOW_BALANCED_MODE =
"/* FIND_JOBS_BY_SHOW_BALANCED_MODE */ " +
"SELECT pk_job, int_priority, rank FROM ( " +
"SELECT " +
"ROW_NUMBER() OVER (ORDER BY int_priority DESC) AS rank, " +
"pk_job, " +
"int_priority " +
"FROM ( " +
"SELECT DISTINCT " +
"job.pk_job as pk_job, " +
"CAST( " +
"job_resource.int_priority + ( " +
"100 * (CASE WHEN job_resource.int_min_cores <= 0 THEN 0 " +
"ELSE " +
"CASE WHEN job_resource.int_cores > job_resource.int_min_cores THEN 0 " +
"ELSE 1 - job_resource.int_cores/job_resource.int_min_cores " +
"END " +
"END) " +
") + ( " +
"(DATE_PART('days', NOW()) - DATE_PART('days', job.ts_updated)) " +
") as INT) as int_priority " +
"FROM " +
"job , " +
"job_resource , " +
"folder , " +
"folder_resource, " +
"point , " +
"layer , " +
"layer_stat , " +
"host " +
"WHERE " +
"job.pk_job = job_resource.pk_job " +
"AND job.pk_folder = folder.pk_folder " +
"AND folder.pk_folder = folder_resource.pk_folder " +
"AND folder.pk_dept = point.pk_dept " +
"AND folder.pk_show = point.pk_show " +
"AND job.pk_job = layer.pk_job " +
"AND job_resource.pk_job = job.pk_job " +
"AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN layer_stat.pk_layer ELSE NULL END) = layer.pk_layer " +
"AND " +
"(" +
"folder_resource.int_max_cores = -1 " +
"OR " +
"folder_resource.int_cores + layer.int_cores_min < folder_resource.int_max_cores " +
") " +
"AND job.str_state = 'PENDING' " +
"AND job.b_paused = false " +
"AND job.pk_show = ? " +
"AND job.pk_facility = ? " +
"AND " +
"(" +
"job.str_os IS NULL OR job.str_os = '' " +
"OR " +
"job.str_os = ? " +
") " +
"AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " +
"AND layer.int_cores_min <= ? " +
"AND layer.int_mem_min <= ? " +
"AND (CASE WHEN layer.b_threadable = true THEN 1 ELSE 0 END) >= ? " +
"AND layer.int_gpus_min <= ? " +
"AND layer.int_gpu_mem_min BETWEEN ? AND ? " +
"AND job_resource.int_cores + layer.int_cores_min <= job_resource.int_max_cores " +
"AND host.str_tags ~* ('(?x)' || layer.str_tags) " +
"AND host.str_name = ? " +
") AS t1 ) AS t2 WHERE rank < ?";


public static final String FIND_JOBS_BY_GROUP =
FIND_JOBS_BY_SHOW
public static final String FIND_JOBS_BY_GROUP_PRIORITY_MODE =
FIND_JOBS_BY_SHOW_PRIORITY_MODE
.replace(
"FIND_JOBS_BY_SHOW",
"FIND_JOBS_BY_GROUP")
.replace(
"AND job.pk_show = ? ",
"AND job.pk_folder = ? ");

public static final String FIND_JOBS_BY_GROUP_BALANCED_MODE =
FIND_JOBS_BY_SHOW_BALANCED_MODE
.replace(
"FIND_JOBS_BY_SHOW",
"FIND_JOBS_BY_GROUP")
.replace(
"AND job.pk_show = ? ",
"AND job.pk_folder = ? ");

private static final String replaceQueryForFifo(String query) {
return query
Expand All @@ -125,8 +200,8 @@ private static final String replaceQueryForFifo(String query) {
"WHERE rank < ? ORDER BY rank");
}

public static final String FIND_JOBS_FIFO_BY_SHOW = replaceQueryForFifo(FIND_JOBS_BY_SHOW);
public static final String FIND_JOBS_FIFO_BY_GROUP = replaceQueryForFifo(FIND_JOBS_BY_GROUP);
public static final String FIND_JOBS_BY_SHOW_FIFO_MODE = replaceQueryForFifo(FIND_JOBS_BY_SHOW_PRIORITY_MODE);
public static final String FIND_JOBS_BY_GROUP_FIFO_MODE = replaceQueryForFifo(FIND_JOBS_BY_GROUP_PRIORITY_MODE);

/**
* Dispatch a host in local booking mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,7 @@
import com.imageworks.spcue.grpc.host.ThreadMode;
import com.imageworks.spcue.util.CueUtil;

import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_JOB_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_JOB_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_LAYER_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_DISPATCH_FRAME_BY_LAYER_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_LOCAL;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_SHOWS;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_UNDER_PROCED_JOB_BY_FACILITY;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.HIGHER_PRIORITY_JOB_BY_FACILITY_EXISTS;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.*;


/**
Expand Down Expand Up @@ -130,24 +115,24 @@ public List<SortableShow> getShows() {
new ConcurrentHashMap<String, ShowCache>();

/**
* Whether or not to enable FIFO scheduling in the same priority.
* Choose between different scheduling strategies
*/
private boolean fifoSchedulingEnabled;
private SchedulingMode schedulingMode;

@Autowired
public DispatcherDaoJdbc(Environment env) {
fifoSchedulingEnabled = env.getProperty(
"dispatcher.fifo_scheduling_enabled", Boolean.class, false);
this.schedulingMode = SchedulingMode.valueOf(env.getProperty(
"dispatcher.scheduling_mode", String.class, "PRIORITY_ONLY"));
}

@Override
public boolean getFifoSchedulingEnabled() {
return fifoSchedulingEnabled;
public SchedulingMode getSchedulingMode() {
return schedulingMode;
}

@Override
public void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled) {
this.fifoSchedulingEnabled = fifoSchedulingEnabled;
public void setSchedulingMode(SchedulingMode schedulingMode) {
this.schedulingMode = schedulingMode;
}

/**
Expand Down Expand Up @@ -211,7 +196,7 @@ private List<String> findDispatchJobs(DispatchHost host, int numJobs, boolean sh
}

result.addAll(getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
findByShowQuery(),
PKJOB_MAPPER,
s.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand All @@ -233,6 +218,24 @@ private List<String> findDispatchJobs(DispatchHost host, int numJobs, boolean sh

}

private String findByShowQuery() {
switch (schedulingMode) {
case PRIORITY_ONLY: return FIND_JOBS_BY_SHOW_PRIORITY_MODE;
case FIFO: return FIND_JOBS_BY_SHOW_FIFO_MODE;
case BALANCED: return FIND_JOBS_BY_SHOW_BALANCED_MODE;
default: return FIND_JOBS_BY_SHOW_PRIORITY_MODE;
}
}

private String findByGroupQuery() {
switch (schedulingMode) {
case PRIORITY_ONLY: return FIND_JOBS_BY_GROUP_PRIORITY_MODE;
case FIFO: return FIND_JOBS_BY_GROUP_FIFO_MODE;
case BALANCED: return FIND_JOBS_BY_GROUP_BALANCED_MODE;
default: return FIND_JOBS_BY_GROUP_PRIORITY_MODE;
}
}

@Override
public List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
return findDispatchJobs(host, numJobs, true);
Expand All @@ -246,7 +249,7 @@ public List<String> findDispatchJobs(DispatchHost host, int numJobs) {
@Override
public List<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_GROUP : FIND_JOBS_BY_GROUP,
findByGroupQuery(),
PKJOB_MAPPER,
g.getGroupId(),host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand Down Expand Up @@ -406,7 +409,7 @@ public boolean higherPriorityJobExists(JobDetail baseJob, VirtualProc proc) {
public List<String> findDispatchJobs(DispatchHost host,
ShowInterface show, int numJobs) {
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
findByShowQuery(),
PKJOB_MAPPER,
show.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand Down
9 changes: 7 additions & 2 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ dispatcher.frame_query_max=20
dispatcher.job_frame_dispatch_max=8
# Maximum number of frames to dispatch from a host at one time.
dispatcher.host_frame_dispatch_max=12
# Whether or not to enable FIFO scheduling in the same priority.
dispatcher.fifo_scheduling_enabled=false
# Choose between different scheduling strategies:
# - PRIORITY_ONLY: Sort by priority only
# - FIFO: Whether or not to enable FIFO scheduling in the same priority.
# - BALANCED: Use a rank formula that takes into account time waiting, and number
# of cores required: rank = priority + (100 * (1 - (job.cores/job.int_min_cores))) + age in days
# layer limiting is also disabled in this mode for performance reasons
dispatcher.scheduling_mode=PRIORITY_ONLY

# Number of threads to keep in the pool for launching job.
dispatcher.launch_queue.core_pool_size=1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ private void launchJobs(int count) throws Exception {

@Before
public void launchJob() {
dispatcherDao.setFifoSchedulingEnabled(true);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.FIFO);

dispatcher.setTestMode(true);
jobLauncher.testMode = true;
}

@After
public void resetFifoScheduling() {
dispatcherDao.setFifoSchedulingEnabled(false);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);
}

@Before
Expand Down Expand Up @@ -171,11 +171,11 @@ public void createHost() {
@Transactional
@Rollback(true)
public void testFifoSchedulingEnabled() {
assertTrue(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setFifoSchedulingEnabled(false);
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setFifoSchedulingEnabled(true);
assertTrue(dispatcherDao.getFifoSchedulingEnabled());
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.FIFO);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.PRIORITY_ONLY);
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.FIFO);
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.FIFO);
}

@Test
Expand Down Expand Up @@ -213,8 +213,7 @@ public void testPortionSorted() throws Exception {
@Transactional
@Rollback(true)
public void testFifoSchedulingDisabled() throws Exception {
dispatcherDao.setFifoSchedulingEnabled(false);
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
dispatcherDao.setSchedulingMode(DispatcherDao.SchedulingMode.PRIORITY_ONLY);

int count = 10;
launchJobs(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,6 @@ public void testHigherPriorityJobExistsMaxProcBound() {
@Transactional
@Rollback(true)
public void testFifoSchedulingEnabled() {
assertFalse(dispatcherDao.getFifoSchedulingEnabled());
assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.PRIORITY_ONLY);
}
}