Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ public class SingularityConfiguration extends Configuration {
// If cpuHardLimit is specified and a task is requesting a base cpu of > cpuHardLimit, that task's new hard limit is requested cpus * cpuHardLimitScaleFactor
private double cpuHardLimitScaleFactor = 1.25;

private Map<String, String> preemptibleTasksOnlyMachineAttributes = Collections.emptyMap();

private long preemptibleTaskMaxExpectedRuntimeMs = 900000; // 15 minutes

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
Expand Down Expand Up @@ -1670,4 +1674,20 @@ public SingularityConfiguration setCpuHardLimitScaleFactor(double cpuHardLimitSc
this.cpuHardLimitScaleFactor = cpuHardLimitScaleFactor;
return this;
}

public Map<String, String> getPreemptibleTasksOnlyMachineAttributes() {
return preemptibleTasksOnlyMachineAttributes;
}

public void setPreemptibleTasksOnlyMachineAttributes(Map<String, String> preemptibleTasksOnlyMachineAttributes) {
this.preemptibleTasksOnlyMachineAttributes = preemptibleTasksOnlyMachineAttributes;
}

public long getPreemptibleTaskMaxExpectedRuntimeMs() {
return preemptibleTaskMaxExpectedRuntimeMs;
}

public void setPreemptibleTaskMaxExpectedRuntimeMs(long preemptibleTaskMaxExpectedRuntimeMs) {
this.preemptibleTaskMaxExpectedRuntimeMs = preemptibleTaskMaxExpectedRuntimeMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ private double score(SingularityOfferHolder offerHolder, Map<String, Integer> ta
if (!matchesResources) {
return 0;
}

final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, activeTaskIdsForRequest);
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, activeTaskIdsForRequest, isPreemptibleTask(taskRequest));

if (slaveMatchState.isMatchAllowed()) {
return score(offerHolder.getHostname(), taskRequest, maybeSlaveUsage);
Expand All @@ -387,6 +386,19 @@ private double score(SingularityOfferHolder offerHolder, Map<String, Integer> ta
return 0;
}

private boolean isPreemptibleTask(SingularityTaskRequest taskRequest) {
// A long running task can be replaced + killed easily
if (taskRequest.getRequest().getRequestType().isLongRunning()) {
return true;
}

// A short, non-long-running task
Optional<SingularityDeployStatistics> deployStatistics = deployManager.getDeployStatistics(taskRequest.getRequest().getId(), taskRequest.getDeploy().getId());
return deployStatistics.isPresent()
&& deployStatistics.get().getAverageRuntimeMillis().isPresent()
&& deployStatistics.get().getAverageRuntimeMillis().get() < configuration.getPreemptibleTaskMaxExpectedRuntimeMs();
}

@VisibleForTesting
double score(String hostname, SingularityTaskRequest taskRequest, Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage) {
if (!maybeSlaveUsage.isPresent() || maybeSlaveUsage.get().isMissingUsageData()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class SingularitySlaveAndRackManager {
this.leaderCache = leaderCache;
}

SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest, List<SingularityTaskId> activeTaskIdsForRequest) {
SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest, List<SingularityTaskId> activeTaskIdsForRequest, boolean isPreemptibleTask) {
final String host = offerHolder.getHostname();
final String rackId = offerHolder.getRackId();
final String slaveId = offerHolder.getSlaveId();
Expand Down Expand Up @@ -113,7 +113,7 @@ SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTa
}
}

if (!isSlaveAttributesMatch(offerHolder, taskRequest)) {
if (!isSlaveAttributesMatch(offerHolder, taskRequest, isPreemptibleTask)) {
return SlaveMatchState.SLAVE_ATTRIBUTES_DO_NOT_MATCH;
}

Expand Down Expand Up @@ -241,7 +241,7 @@ SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTa
return SlaveMatchState.OK;
}

private boolean isSlaveAttributesMatch(SingularityOfferHolder offer, SingularityTaskRequest taskRequest) {
private boolean isSlaveAttributesMatch(SingularityOfferHolder offer, SingularityTaskRequest taskRequest, boolean isPreemptibleTask) {
if (offer.hasReservedSlaveAttributes()) {
Map<String, String> reservedSlaveAttributes = offer.getReservedSlaveAttributes();

Expand All @@ -260,6 +260,14 @@ private boolean isSlaveAttributesMatch(SingularityOfferHolder offer, Singularity
}
}

if (!configuration.getPreemptibleTasksOnlyMachineAttributes().isEmpty()) {
if (slaveAndRackHelper.hasRequiredAttributes(offer.getTextAttributes(), configuration.getPreemptibleTasksOnlyMachineAttributes())
&& !isPreemptibleTask) {
LOG.debug("Host {} is reserved for preemptible tasks", offer.getHostname());
return false;
}
}

if (taskRequest.getRequest().getRequiredSlaveAttributes().isPresent()
&& !slaveAndRackHelper.hasRequiredAttributes(offer.getTextAttributes(), taskRequest.getRequest().getRequiredSlaveAttributes().get())) {
LOG.trace("Task requires slave with attributes {}, (slave attributes are {})", taskRequest.getRequest().getRequiredSlaveAttributes().get(), offer.getTextAttributes());
Expand Down