Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cuebot reserve all cores #1313

Open
wants to merge 75 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
48758fb
fix: only skip cores when the number is strictly under the requiremen…
KernAttila May 31, 2023
8f80240
fix: only skip memory when the amount is strictly under the requireme…
KernAttila May 31, 2023
7a3917d
feat: handle negative cores request
KernAttila May 31, 2023
da57b2e
fix: missing semicolon
KernAttila May 31, 2023
78fb71d
fix: error unclosed string literal
KernAttila May 31, 2023
fd4a040
feat: handle negative frame cores requirement in a public function, t…
KernAttila May 31, 2023
263b248
fix: handle negative cores requirement during dispatch
KernAttila May 31, 2023
00f9111
fix: handle negative cores requirement during reservation
KernAttila May 31, 2023
5c9dcf4
fix: handle negative cores requirement during report
KernAttila May 31, 2023
fd17206
feat: handle negative frame cores requirement in a public function, t…
KernAttila May 31, 2023
627274a
dev: add debug to find out how cores are reserved with negative value…
KernAttila May 31, 2023
7e8c48e
fix: cores are counted by 100
KernAttila May 31, 2023
e5a77fe
fix: do not set minimum if below threshold. we now accept negative va…
KernAttila May 31, 2023
ef2b627
fix: threadable only if core != 100
KernAttila May 31, 2023
902d30a
fix: test if minimumCores is between 0 and 100 (excluded) to mark it …
KernAttila May 31, 2023
f82485e
dbg: do not override minimumCores
KernAttila Jun 1, 2023
c3d2c24
fix: syntax error, missing semicolon
KernAttila Jun 1, 2023
3a793b7
fix: handle negative cores during db insertion, in determineMinimumCo…
KernAttila Jun 1, 2023
7e10f64
fix: syntax error, missing semicolon
KernAttila Jun 1, 2023
145d14e
log: add debug logs to determine threadable during db insertion.
KernAttila Jun 1, 2023
7ca2918
fix: syntax error, did not pay attention
KernAttila Jun 1, 2023
7c3f61c
fix: missing logger
KernAttila Jun 1, 2023
3b94240
doc: fix typo
KernAttila Jun 1, 2023
9baba0b
feat: handle negative core requests (+ remove debug logs)
KernAttila Jun 1, 2023
19521d9
feat: handle negative core requests (+ remove debug logs)
KernAttila Jun 1, 2023
2cd6945
fix: do not default to 100 cores, return false when the processed req…
KernAttila Jun 1, 2023
ca89e05
fix: same for local dispatch
KernAttila Jun 1, 2023
b442fe7
doc: remove all logs
KernAttila Jun 1, 2023
33c2952
fis: better management of negative cores
KernAttila Jun 1, 2023
866d738
feat: handle zero/negative cores request when launching frame
KernAttila Jun 1, 2023
b96dbe1
feat!: make layer threadable when requested minCores is <= 0
KernAttila Jun 1, 2023
7309701
fix: matchNumbersOnly() handle negative values
KernAttila Jun 1, 2023
21794dc
add tst for negative numbers in validator
KernAttila Jun 1, 2023
06b7f8c
doc: can assign negative core values in show filters
KernAttila Jun 1, 2023
adf5909
fix: avoid double booking with negative requests
KernAttila Jun 2, 2023
2067abf
doc: add debugs to find where we default to service core requirement.
KernAttila Jun 2, 2023
a392a3d
doc: add debug
KernAttila Jun 2, 2023
1dfd79b
dbg: default to zero but continue
KernAttila Jun 2, 2023
de62e36
dev: modify trigger function for test
KernAttila Jun 2, 2023
939bfe6
fix: return early
KernAttila Jun 5, 2023
d64039b
fix: avoid double booking with negative requests
KernAttila Jun 5, 2023
4442530
fix: forgot to change parameter name
KernAttila Jun 5, 2023
a2aa631
fix: logger use proper class and fix core -> threads
KernAttila Jun 5, 2023
c4fc553
dbg: do not assign minimum
KernAttila Jun 5, 2023
e7022cc
dbg: add debug info when not dispatching
KernAttila Jun 5, 2023
2634735
dbg: add debug info when setting corePoints to layer
KernAttila Jun 5, 2023
f93754b
dbg: debug when determining threadable
KernAttila Jun 5, 2023
1c79a55
fix: syntax
KernAttila Jun 5, 2023
dd44a7a
fix: syntax
KernAttila Jun 5, 2023
2081bf4
fix: syntax
KernAttila Jun 5, 2023
00e2999
dbg: add frame summary before dispatch
KernAttila Jun 5, 2023
7f3e24c
dbg: log requested minCores
KernAttila Jun 5, 2023
29282bd
dbg: debug when allocating cores
KernAttila Jun 5, 2023
8a0f001
dbg: add debug info when dispatching
KernAttila Jun 5, 2023
85bab38
dbg, change variable to display
KernAttila Jun 5, 2023
2ccd2e4
dbg: add logs and do not use getCoreSpan()
KernAttila Jun 5, 2023
1f24058
doc: remove debug and keep getCoreSpan()
KernAttila Jun 5, 2023
359ed53
dbg: do not accept negative cores when the host is used.
KernAttila Jun 5, 2023
cc4744b
fix: handle negative and zero cores requests early
KernAttila Jun 5, 2023
1f0d7d9
feat: add canHandleNegativeCoresRequirement()
KernAttila Jun 5, 2023
5ff094a
fix: early break without throwing exception
KernAttila Jun 5, 2023
de80d1d
fix: typo
KernAttila Jun 5, 2023
191bdca
fix: do not throw exceptions, pass a state instead
KernAttila Jun 5, 2023
f17d7ae
fix: typo
KernAttila Jun 5, 2023
06195b7
fix: use porper function to retrieve host name
KernAttila Jun 5, 2023
9b57fe9
fix: no namespace
KernAttila Jun 5, 2023
8ff11e3
fix: no namespace
KernAttila Jun 5, 2023
79b6d11
config: lower thresholds, jobs are not distributed to all hosts
KernAttila Jun 5, 2023
247327e
pref: set frame_query_max=2
KernAttila Jun 7, 2023
a97b4c0
feat: accept dash character in tags (-)
KernAttila Jun 9, 2023
cc0f3a9
feat: Added function and property `canHandleNegativeCoresRequest`
KernAttila Aug 25, 2023
9eddb03
fix: line too long
KernAttila Aug 26, 2023
1a93ca1
revert: no need to do this anymore, cuebot handles calculation before…
KernAttila Aug 26, 2023
962ce55
revert: no need to handle negative cores in rqd, cuebot handles the c…
KernAttila Aug 26, 2023
014b91b
Merge branch 'master' of https://github.com/Wolf-Pipeline/OpenCue int…
KernAttila Sep 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import com.imageworks.spcue.grpc.host.LockState;
import com.imageworks.spcue.util.CueUtil;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class DispatchHost extends Entity
implements HostInterface, FacilityInterface, ResourceContainer {

private static final Logger logger = LogManager.getLogger(DispatchHost.class);

public String facilityId;
public String allocationId;
public LockState lockState;
Expand Down Expand Up @@ -76,12 +81,52 @@ public String getFacilityId() {
return facilityId;
}

public boolean canHandleNegativeCoresRequest(int requestedCores) {
// Request is positive, no need to test further.
if (requestedCores > 0) {
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
// All cores are available, validate the request.
if (cores == idleCores) {
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
// Some or all cores are busy, avoid booking again.
logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores.");
return false;
}

public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
if (requestedCores <=0 && idleCores < cores) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
// Book all cores minus the request
int totalCores = idleCores + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCores + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {

minCores = handleNegativeCoresRequirement(minCores);
if (idleCores < minCores) {
return false;
}
if (minCores <= 0) {
return false;
}
else if (idleMemory < minMemory) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.imageworks.spcue.dispatcher.ResourceContainer;
import com.imageworks.spcue.grpc.renderpartition.RenderPartitionType;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/**
* Contains information about local desktop cores a user has
* assigned to the given job.
Expand All @@ -33,6 +36,8 @@
public class LocalHostAssignment extends Entity
implements ResourceContainer {

private static final Logger logger = LogManager.getLogger(LocalHostAssignment.class);

private int idleCoreUnits;
private long idleMemory;
private int idleGpuUnits;
Expand Down Expand Up @@ -62,12 +67,36 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu
this.maxGpuMemory = maxGpuMemory;
}

public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
if (requestedCores <=0 && idleCoreUnits < threads) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
// Book all cores minus the request
int totalCores = idleCoreUnits + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCoreUnits + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {

minCores = handleNegativeCoresRequirement(minCores);
if (idleCoreUnits < minCores) {
return false;
}
if (minCores <= 0) {
return false;
}
else if (idleMemory < minMemory) {
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions cuebot/src/main/java/com/imageworks/spcue/SortableShow.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public boolean isSkipped(String tags, long cores, long memory) {
try {
if (failed.containsKey(tags)) {
long [] mark = failed.get(tags);
if (cores <= mark[0]) {
logger.info("skipped due to not enough cores " + cores + " <= " + mark[0]);
if (cores < mark[0]) {
logger.info("skipped due to not enough cores " + cores + " < " + mark[0]);
return true;
}
else if (memory <= mark[1]) {
logger.info("skipped due to not enough memory " + memory + " <= " + mark[1]);
else if (memory < mark[1]) {
logger.info("skipped due to not enough memory " + memory + " < " + mark[1]);
return true;
}
}
Expand Down
18 changes: 17 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.grpc.host.ThreadMode;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class VirtualProc extends FrameEntity implements ProcInterface {

private static final Logger logger = LogManager.getLogger(VirtualProc.class);

public String hostId;
public String allocationId;
public String frameId;
public String hostName;
public String os;
public byte[] childProcesses;

public boolean canHandleNegativeCoresRequest;
public int coresReserved;
public long memoryReserved;
public long memoryUsed;
Expand Down Expand Up @@ -111,7 +117,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame) {
proc.coresReserved = proc.coresReserved + host.strandedCores;
}

if (proc.coresReserved >= 100) {
proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);

if (proc.coresReserved == 0) {
logger.debug("Reserving all cores");
proc.coresReserved = host.cores;
}
else if (proc.coresReserved < 0) {
logger.debug("Reserving all cores minus " + proc.coresReserved);
proc.coresReserved = host.cores + proc.coresReserved;
}
else if (proc.coresReserved >= 100) {

int originalCores = proc.coresReserved;

Expand Down
27 changes: 15 additions & 12 deletions cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface LayerDao {
public List<LayerDetail> getLayerDetails(JobInterface job);

/**
* Returns true if supplied layer is compelte.
* Returns true if supplied layer is complete.
*
* @param layer
* @return boolean
Expand All @@ -82,7 +82,7 @@ public interface LayerDao {
void insertLayerDetail(LayerDetail l);

/**
* gets a layer detail from an object that implments layer
* gets a layer detail from an object that implements layer
*
* @param layer
* @return LayerDetail
Expand Down Expand Up @@ -167,7 +167,7 @@ public interface LayerDao {
void updateLayerTags(LayerInterface layer, Set<String> tags);

/**
* Insert a key/valye pair into the layer environment
* Insert a key/value pair into the layer environment
*
* @param layer
* @param key
Expand Down Expand Up @@ -292,7 +292,7 @@ public interface LayerDao {

/**
* Update all layers of the set type in the specified job
* with the new min cores requirement.
* with the new min gpu requirement.
*
* @param job
* @param gpus
Expand All @@ -304,17 +304,16 @@ public interface LayerDao {
* Update a layer's max cores value, which limits how
* much threading can go on.
*
* @param job
* @param cores
* @param type
* @param layer
* @param threadable
*/
void updateThreadable(LayerInterface layer, boolean threadable);

/**
* Update a layer's timeout value, which limits how
* much the frame can run on a host.
*
* @param job
* @param layer
* @param timeout
*/
void updateTimeout(LayerInterface layer, int timeout);
Expand All @@ -323,8 +322,8 @@ public interface LayerDao {
* Update a layer's LLU timeout value, which limits how
* much the frame can run on a host without updates in the log file.
*
* @param job
* @param timeout
* @param layer
* @param timeout_llu
*/
void updateTimeoutLLU(LayerInterface layer, int timeout_llu);

Expand All @@ -341,7 +340,7 @@ public interface LayerDao {

/**
* Appends a tag to the current set of tags. If the tag
* already exists than nothing happens.
* already exists then nothing happens.
*
* @param layer
* @param val
Expand All @@ -363,8 +362,9 @@ public interface LayerDao {
* Update layer usage with processor time usage.
* This happens when the proc has completed or failed some work.
*
* @param proc
* @param layer
* @param newState
* @param exitStatus
*/
void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus);

Expand All @@ -387,6 +387,9 @@ public interface LayerDao {

/**
* Enable/disable memory optimizer.
*
* @param layer
* @param state
*/
void enableMemoryOptimizer(LayerInterface layer, boolean state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@
import com.imageworks.spcue.util.CueUtil;
import com.imageworks.spcue.util.SqlUtil;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class);
private static final String INSERT_OUTPUT_PATH =
"INSERT INTO " +
"layer_output " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,16 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {
" on job " + job.getName());

for (DispatchFrame frame: frames) {

VirtualProc proc = VirtualProc.build(host, frame);

if (host.idleCores < frame.minCores ||
if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) {
logger.debug("Cannot dispatch job, host is busy.");
break;
}
if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) ||
host.idleMemory < frame.minMemory ||
host.idleGpus < frame.minGpus ||
host.idleGpuMemory < frame.minGpuMemory) {
logger.debug("Cannot dispatch, insufficient resources.");
break;
}

Expand All @@ -281,6 +284,8 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {

boolean success = new DispatchFrameTemplate(proc, job, frame, false) {
public void wrapDispatchFrame() {
logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " +
proc.coresReserved + " coresReserved");
dispatch(frame, proc);
dispatchSummary(proc, frame, "Booking");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public void handleHostReport(HostReport report, boolean isBoot) {
bookingManager.removeInactiveLocalHostAssignment(lca);
}
}

if (host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
int cores_to_reserve = host.handleNegativeCoresRequirement(Dispatcher.CORE_POINTS_RESERVED_MIN);
if (cores_to_reserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
msg = String.format("%s doesn't have enough idle cores, %d needs %d",
host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public JobDetail createJob(BuildableJob buildableJob) {
}
}

if (layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN;
}

Expand Down
12 changes: 8 additions & 4 deletions cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public class JobSpec {
public JobSpec() {
}

public static final String NAME_REGEX = "^([\\w\\.]{3,})$";
public static final String NAME_REGEX = "^([\\w\\.-]{3,})$";

public static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX);

Expand Down Expand Up @@ -612,12 +612,16 @@ private void determineMinimumCores(Element layerTag, LayerDetail layer) {
int corePoints = layer.minimumCores;

if (cores.contains(".")) {
corePoints = (int) (Double.valueOf(cores) * 100 + .5);
if (cores.contains("-")) {
corePoints = (int) (Double.valueOf(cores) * 100 - .5);
} else {
corePoints = (int) (Double.valueOf(cores) * 100 + .5);
}
} else {
corePoints = Integer.valueOf(cores);
}

if (corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) {
if (corePoints > 0 && corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) {
corePoints = Dispatcher.CORE_POINTS_RESERVED_DEFAULT;
}

Expand Down Expand Up @@ -651,7 +655,7 @@ private void determineChunkSize(Element layerTag, LayerDetail layer) {
*/
private void determineThreadable(Element layerTag, LayerDetail layer) {
// Must have at least 1 core to thread.
if (layer.minimumCores < 100) {
if (layer.minimumCores > 0 && layer.minimumCores < 100) {
layer.isThreadable = false;
}
else if (layerTag.getChildTextTrim("threadable") != null) {
Expand Down
2 changes: 1 addition & 1 deletion cuegui/cuegui/FilterDialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def createAction(self):
"Create Action",
"What value should this property be set to?",
0,
0,
-8, # Minimum core value can be <=0, booking all cores minus this value.
50000,
2)
value = float(value)
Expand Down
2 changes: 1 addition & 1 deletion cuesubmit/cuesubmit/Submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def buildLayer(layerData, command, lastLayer=None):
@type lastLayer: outline.layer.Layer
@param lastLayer: layer that this new layer should be dependent on if dependType is set.
"""
threadable = float(layerData.cores) >= 2
threadable = float(layerData.cores) >= 2 or float(layerData.cores) <= 0
layer = outline.modules.shell.Shell(
layerData.name, command=command.split(), chunk=layerData.chunk,
threads=float(layerData.cores), range=str(layerData.layerRange), threadable=threadable)
Expand Down
Loading
Loading