Better task balancing #1482

Merged
merged 73 commits on Jun 8, 2017
+117 −84
score offers and choose best score

darcatron committed Mar 30, 2017
commit 6601fb1d8b7f54e2b5b23f666e09fdb5a42d7b6f
@@ -11,17 +11,18 @@
import javax.inject.Singleton;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hubspot.mesos.MesosUtils;
import com.hubspot.mesos.Resources;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskCurrentUsage;
import com.hubspot.singularity.SingularityTaskCurrentUsageWithId;
@@ -43,6 +44,11 @@
private static final Logger LOG = LoggerFactory.getLogger(SingularityMesosOfferScheduler.class);
private static final String CPU_USED = "cpusUsed";
private static final String CPU_TOTAL = "cpusTotal";
private static final String MEMORY_USED = "memoryRssBytes";
private static final String MEMORY_TOTAL = "memoryRssBytesTotal";
private final Resources defaultResources;
private final Resources defaultCustomExecutorResources;
private final TaskManager taskManager;
@@ -137,46 +143,45 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
int tasksScheduled = 0;
Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave = getUsagesPerRequestTypePerSlave();
while (!pendingTaskIdToTaskRequest.isEmpty() && addedTaskInLastLoop && canScheduleAdditionalTasks(taskCredits)) {
addedTaskInLastLoop = false;
Collections.shuffle(offerHolders); // todo: remove this since we don't need to be choosing at random anymore
for (SingularityTaskRequestHolder taskRequestHolder : pendingTaskIdToTaskRequest.values()) {
Map<SingularityOfferHolder, Double> scorePerOffer = new HashMap<>();
// todo
// for each task
// for each offer
// get a score for each of the pending tasks for that offer
// score()
// if offer doesn't satisfy the task's needs, don't include it at all (not even with a 0 score so score > 0 offers only)
// accept offer for best score (existing accept code)
for (SingularityOfferHolder offerHolder : offerHolders) {
if (configuration.getMaxTasksPerOffer() > 0 && offerHolder.getAcceptedTasks().size() >= configuration.getMaxTasksPerOffer()) {
LOG.trace("Offer {} is full ({}) - skipping", offerHolder.getOffer(), offerHolder.getAcceptedTasks().size());
continue;
}
for (SingularityOfferHolder offerHolder : offerHolders) {
if (configuration.getMaxTasksPerOffer() > 0 && offerHolder.getAcceptedTasks().size() >= configuration.getMaxTasksPerOffer()) {
LOG.trace("Offer {} is full ({}) - skipping", offerHolder.getOffer(), offerHolder.getAcceptedTasks().size());
continue;
double score = score(offerHolder, stateCache, tasksPerOfferPerRequest, taskRequestHolder, getUsagesPerRequestTypePerSlave());
if (score > 0) {
// todo: can short circuit here if score is high enough
scorePerOffer.put(offerHolder, score);

@darcatron

darcatron Mar 30, 2017

Contributor

Thought we might want to have a value that's definitely good enough to just accept instead of continuing to evaluate.
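
A minimal sketch of that short circuit, reusing the identifiers from this diff; the minScoreToAcceptImmediately threshold is hypothetical and not part of this change, and the full-offer (maxTasksPerOffer) check from the loop above is omitted for brevity:

// Sketch only: accept the first offer whose score clears a "good enough" threshold
// instead of continuing to score the remaining offers for this task.
double minScoreToAcceptImmediately = 0.90; // hypothetical value

for (SingularityOfferHolder offerHolder : offerHolders) {
  double score = score(offerHolder, stateCache, tasksPerOfferPerRequest, taskRequestHolder, usagesPerRequestTypePerSlave);
  if (score >= minScoreToAcceptImmediately) {
    scorePerOffer.clear();               // earlier, lower scores no longer matter
    scorePerOffer.put(offerHolder, score);
    break;                               // short circuit: stop evaluating offers
  }
  if (score > 0) {
    scorePerOffer.put(offerHolder, score);
  }
}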

}
}
Optional<SingularityTask> accepted = match(pendingTaskIdToTaskRequest.values(), stateCache, offerHolder, tasksPerOfferPerRequest);
if (accepted.isPresent()) {
if (!scorePerOffer.isEmpty()) {
SingularityOfferHolder bestOffer = Collections.max(scorePerOffer.entrySet(), Map.Entry.comparingByValue()).getKey();
LOG.info("Best offer is {} with a score of {}/1", bestOffer, scorePerOffer.get(bestOffer));
SingularityTask task = acceptTask(bestOffer, stateCache, tasksPerOfferPerRequest, taskRequestHolder);
tasksScheduled++;
if (useTaskCredits) {
taskCredits--;
LOG.debug("Remaining task credits: {}", taskCredits);
}
offerHolder.addMatchedTask(accepted.get());
bestOffer.addMatchedTask(task);
addedTaskInLastLoop = true;
pendingTaskIdToTaskRequest.remove(accepted.get().getTaskRequest().getPendingTask().getPendingTaskId().getId());
pendingTaskIdToTaskRequest.remove(task.getTaskRequest().getPendingTask().getPendingTaskId().getId());
if (useTaskCredits && taskCredits == 0) {
LOG.info("Used all available task credits, not scheduling any more tasks");
break;
}
}
if (pendingTaskIdToTaskRequest.isEmpty()) {
break;
}
}
}
@@ -192,8 +197,6 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
//todo: improve logic here
private Map<String, Map<RequestType, Map<String, Integer>>> getUsagesPerRequestTypePerSlave() {
List<String> slavesWithUsage = usageManager.getSlavesWithUsage();
final String cpus = "cpusUsed";
final String memory = "memoryRssBytes";
Map<String, Map<RequestType, List<SingularityTask>>> tasksPerRequestTypePerSlave = taskManager.getActiveTasks().stream()
.filter(t -> slavesWithUsage.contains(t.getOffer().getSlaveId().getValue()))
@@ -210,11 +213,13 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
for (RequestType type : RequestType.values()) {
List<SingularityTaskCurrentUsageWithId> usages = usageManager.getTaskCurrentUsages(tasksPerRequestTypePerSlave.get(slaveId).get(type).stream().map(SingularityTaskIdHolder::getTaskId).collect(Collectors.toList()));
long memUsed = usages.stream().mapToLong(SingularityTaskCurrentUsage::getMemoryRssBytes).sum() / totalMem;
double cpuUsed = usages.stream().mapToDouble(SingularityTaskCurrentUsage::getCpusUsed).sum() / totalCpu;
long memUsed = usages.stream().mapToLong(SingularityTaskCurrentUsage::getMemoryRssBytes).sum();
double cpuUsed = usages.stream().mapToDouble(SingularityTaskCurrentUsage::getCpusUsed).sum();
usagesPerRequestType.put(type, ImmutableMap.of(memory, ((int) memUsed)));
usagesPerRequestType.put(type, ImmutableMap.of(cpus, ((int) cpuUsed)));
usagesPerRequestType.put(type, ImmutableMap.of(MEMORY_USED, ((int) memUsed)));
usagesPerRequestType.put(type, ImmutableMap.of(MEMORY_TOTAL, totalMem));
usagesPerRequestType.put(type, ImmutableMap.of(CPU_USED, ((int) cpuUsed)));
usagesPerRequestType.put(type, ImmutableMap.of(CPU_TOTAL, totalCpu));
}
usagesPerRequestTypePerSlave.put(slaveId, usagesPerRequestType);
@@ -227,78 +232,92 @@ private boolean canScheduleAdditionalTasks(int taskCredits) {
return taskCredits == -1 || taskCredits > 0;
}
// todo: rename to score() and return an int
private Optional<SingularityTask> match(Collection<SingularityTaskRequestHolder> taskRequests, SingularitySchedulerStateCache stateCache, SingularityOfferHolder offerHolder,
Map<String, Map<String, Integer>> tasksPerOfferPerRequest) {
private double score(SingularityOfferHolder offerHolder, SingularitySchedulerStateCache stateCache, Map<String, Map<String, Integer>> tasksPerOfferPerRequest,
SingularityTaskRequestHolder taskRequestHolder, Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave) {
final String offerId = offerHolder.getOffer().getId().getValue();
// todo: remove the loop
for (SingularityTaskRequestHolder taskRequestHolder : taskRequests) {
final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
final Offer offer = offerHolder.getOffer();
final String offerId = offer.getId().getValue();
final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
final SingularityPendingTaskId pendingTaskId = taskRequest.getPendingTask().getPendingTaskId();
if (offerHolder.hasRejectedPendingTaskAlready(taskRequest.getPendingTask().getPendingTaskId())) {
continue;
}
if (offerHolder.hasRejectedPendingTaskAlready(pendingTaskId)) {
return 0;
}
if (tooManyTasksPerOfferForRequest(tasksPerOfferPerRequest, offerId, taskRequestHolder.getTaskRequest())) {
LOG.debug("Skipping task request for request id {}, too many tasks already scheduled using offer {}", taskRequest.getRequest().getId(), offerId);
continue;
}
if (tooManyTasksPerOfferForRequest(tasksPerOfferPerRequest, offerId, taskRequestHolder.getTaskRequest())) {
LOG.debug("Skipping task request for request id {}, too many tasks already scheduled using offer {}", taskRequest.getRequest().getId(), offerId);
return 0;
}
if (taskRequest.getRequest().getRequestType() == RequestType.ON_DEMAND) {
int maxActiveOnDemandTasks = taskRequest.getRequest().getInstances().or(configuration.getMaxActiveOnDemandTasksPerRequest());
if (maxActiveOnDemandTasks > 0) {
int activeTasksForRequest = stateCache.getActiveTaskIdsForRequest(taskRequest.getRequest().getId()).size();
if (activeTasksForRequest >= maxActiveOnDemandTasks) {
LOG.debug("Skipping pending task {}, already running {} instances for request {} (max is {})", taskRequest.getPendingTask().getPendingTaskId(), activeTasksForRequest);
continue;
}
}
}
if (isTooManyInstancesForRequest(taskRequest, stateCache)) {
LOG.debug("Skipping pending task {}, too many instances already running", pendingTaskId);
return 0;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Attempting to match task {} resources {} with required role '{}' ({} for task + {} for executor) with remaining offer resources {}",
pendingTaskId, taskRequestHolder.getTotalResources(), taskRequest.getRequest().getRequiredRole().or("*"),
taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources(), offerHolder.getCurrentResources());
}
final boolean matchesResources = MesosUtils.doesOfferMatchResources(taskRequest.getRequest().getRequiredRole(),
taskRequestHolder.getTotalResources(), offerHolder.getCurrentResources(), taskRequestHolder.getRequestedPorts());
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
if (matchesResources && slaveMatchState.isMatchAllowed()) {
return score(offer, taskRequest, usagesPerRequestTypePerSlave);
} else {
offerHolder.addRejectedTask(pendingTaskId);
if (LOG.isTraceEnabled()) {
LOG.trace("Attempting to match task {} resources {} with required role '{}' ({} for task + {} for executor) with remaining offer resources {}", taskRequest.getPendingTask().getPendingTaskId(),
taskRequestHolder.getTotalResources(), taskRequest.getRequest().getRequiredRole().or("*"), taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources(),
offerHolder.getCurrentResources());
LOG.trace("Ignoring offer {} with roles {} on {} for task {}; matched resources: {}, slave match state: {}", offerId,
MesosUtils.getRoles(offer), offer.getHostname(), pendingTaskId, matchesResources, slaveMatchState);
}
}
final boolean matchesResources = MesosUtils.doesOfferMatchResources(taskRequest.getRequest().getRequiredRole(), taskRequestHolder.getTotalResources(), offerHolder.getCurrentResources(),
taskRequestHolder.getRequestedPorts());
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
return 0;
}
if (matchesResources && slaveMatchState.isMatchAllowed()) {
// todo: move to separate method
// have this calculate and return a score
final SingularityTask task = mesosTaskBuilder.buildTask(offerHolder.getOffer(), offerHolder.getCurrentResources(), taskRequest, taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources());
private double score(Offer offer, SingularityTaskRequest taskRequest, Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave) {
double requestTypeCpuWeight = 0.20;

@ssalinas

ssalinas Apr 20, 2017

Member

Let's make these configurable, maybe another object in the configuration yaml?

@darcatron

darcatron Apr 20, 2017

Contributor

Yup, I was in progress on this (now committed), but I kept the fields under SingularityConfiguration since I saw a lot of other stuff in there as well (e.g. caching). We could pull it into an OfferConfiguration file if you think that'd be better for organization.
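
A minimal sketch of what those configurable weights could look like; the class and field names are illustrative, not the final shape of the change, and the defaults mirror the hardcoded values in this commit:

// Hypothetical nested configuration object for the scoring weights.
// Actual names and placement in the PR may differ.
public class OfferScoringConfiguration {
  private double requestTypeCpuWeight = 0.20;
  private double requestTypeMemWeight = 0.20;
  private double freeCpuWeight = 0.20;
  private double freeMemWeight = 0.30;

  public double getRequestTypeCpuWeight() { return requestTypeCpuWeight; }
  public double getRequestTypeMemWeight() { return requestTypeMemWeight; }
  public double getFreeCpuWeight() { return freeCpuWeight; }
  public double getFreeMemWeight() { return freeMemWeight; }
}

Exposed via a getter on SingularityConfiguration, a block like this could then be set from the configuration yaml.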

double requestTypeMemWeight = 0.20;
double freeCpuWeight = 0.20;
double freeMemWeight = 0.30;
double score = 0;
final SingularityTask zkTask = taskSizeOptimizer.getSizeOptimizedTask(task);
String slaveId = offer.getSlaveId().getValue();
RequestType requestType = taskRequest.getRequest().getRequestType();
Map<String, Integer> usagePerResource = usagesPerRequestTypePerSlave.get(slaveId).get(requestType);
if (LOG.isTraceEnabled()) {
LOG.trace("Accepted and built task {}", zkTask);
}
score += requestTypeCpuWeight * (1 - (usagePerResource.get(CPU_USED) / usagePerResource.get(CPU_TOTAL)));
score += requestTypeMemWeight * (1 - (usagePerResource.get(MEMORY_USED) / usagePerResource.get(MEMORY_TOTAL)));
LOG.info("Launching task {} slot on slave {} ({})", task.getTaskId(), offerHolder.getOffer().getSlaveId().getValue(), offerHolder.getOffer().getHostname());
score += freeCpuWeight * (1 - (MesosUtils.getNumCpus(offer) / usagePerResource.get(CPU_TOTAL)));
score += freeMemWeight * (1 - (MesosUtils.getMemory(offer) / usagePerResource.get(MEMORY_TOTAL)));
taskManager.createTaskAndDeletePendingTask(zkTask);
return score;
}
stateCache.getActiveTaskIds().add(task.getTaskId());
stateCache.getActiveTaskIdsForRequest(task.getTaskRequest().getRequest().getId()).add(task.getTaskId());
addRequestToMapByOfferId(tasksPerOfferPerRequest, offerId, taskRequest.getRequest().getId());
stateCache.getScheduledTasks().remove(taskRequest.getPendingTask());
private SingularityTask acceptTask(SingularityOfferHolder offerHolder, SingularitySchedulerStateCache stateCache, Map<String, Map<String, Integer>> tasksPerOfferPerRequest, SingularityTaskRequestHolder taskRequestHolder) {
final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
final SingularityTask task = mesosTaskBuilder.buildTask(offerHolder.getOffer(), offerHolder.getCurrentResources(), taskRequest, taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources());
return Optional.of(task);
} else {
offerHolder.addRejectedTask(taskRequest.getPendingTask().getPendingTaskId());
final SingularityTask zkTask = taskSizeOptimizer.getSizeOptimizedTask(task);
if (LOG.isTraceEnabled()) {
LOG.trace("Ignoring offer {} with roles {} on {} for task {}; matched resources: {}, slave match state: {}", offerHolder.getOffer().getId().getValue(),
MesosUtils.getRoles(offerHolder.getOffer()), offerHolder.getOffer().getHostname(), taskRequest.getPendingTask().getPendingTaskId(), matchesResources, slaveMatchState);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Accepted and built task {}", zkTask);
}
// todo: have this return 0;
return Optional.absent();
LOG.info("Launching task {} slot on slave {} ({})", task.getTaskId(), offerHolder.getOffer().getSlaveId().getValue(), offerHolder.getOffer().getHostname());
taskManager.createTaskAndDeletePendingTask(zkTask);
stateCache.getActiveTaskIds().add(task.getTaskId());
stateCache.getActiveTaskIdsForRequest(task.getTaskRequest().getRequest().getId()).add(task.getTaskId());
addRequestToMapByOfferId(tasksPerOfferPerRequest, offerHolder.getOffer().getId().getValue(), taskRequest.getRequest().getId());
stateCache.getScheduledTasks().remove(taskRequest.getPendingTask());
return task;
}
private void addRequestToMapByOfferId(Map<String, Map<String, Integer>> tasksPerOfferPerRequest, String offerId, String requestId) {
@@ -330,8 +349,22 @@ private boolean tooManyTasksPerOfferForRequest(Map<String, Map<String, Integer>>
return tasksPerOfferPerRequest.get(offerId).get(taskRequest.getRequest().getId()) > maxPerOfferPerRequest;
}
private boolean isTooManyInstancesForRequest(SingularityTaskRequest taskRequest, SingularitySchedulerStateCache stateCache) {
if (taskRequest.getRequest().getRequestType() == RequestType.ON_DEMAND) {
int maxActiveOnDemandTasks = taskRequest.getRequest().getInstances().or(configuration.getMaxActiveOnDemandTasksPerRequest());
if (maxActiveOnDemandTasks > 0) {
int activeTasksForRequest = stateCache.getActiveTaskIdsForRequest(taskRequest.getRequest().getId()).size();
LOG.debug("Running {} instances for request {}. Max is {}", activeTasksForRequest, taskRequest.getRequest().getId(), maxActiveOnDemandTasks);
if (activeTasksForRequest >= maxActiveOnDemandTasks) {
return true;
}
}
}
return false;
}
public boolean isConnected() {
return schedulerDriverSupplier.get().isPresent();
}
}
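
To get a rough feel for the new weighted score, here is a small standalone example that mirrors the arithmetic in score(Offer, ...) above using the weights from this commit; the usage and offer numbers are made up, and doubles are used throughout where the committed code divides Integer values:

public class ScoreExample {
  public static void main(String[] args) {
    // Weights as hardcoded in this commit (they sum to 0.90 here, not 1.0).
    double requestTypeCpuWeight = 0.20;
    double requestTypeMemWeight = 0.20;
    double freeCpuWeight = 0.20;
    double freeMemWeight = 0.30;

    // Made-up numbers: tasks of this request type currently use 2 of 8 cpus and
    // 4 of 16 memory units on the slave; the offer advertises 3 cpus and 6 memory units.
    double cpusUsed = 2, cpusTotal = 8;
    double memUsed = 4, memTotal = 16;
    double offerCpus = 3, offerMem = 6;

    double score = 0;
    score += requestTypeCpuWeight * (1 - (cpusUsed / cpusTotal)); // 0.20 * 0.750 = 0.1500
    score += requestTypeMemWeight * (1 - (memUsed / memTotal));   // 0.20 * 0.750 = 0.1500
    score += freeCpuWeight * (1 - (offerCpus / cpusTotal));       // 0.20 * 0.625 = 0.1250
    score += freeMemWeight * (1 - (offerMem / memTotal));         // 0.30 * 0.625 = 0.1875
    System.out.println(score); // ~0.6125
  }
}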