Better task balancing #1482

Merged
merged 73 commits on Jun 8, 2017
+77 −5
get usage per request type

darcatron committed Mar 30, 2017
commit 534b899aa072cfbeab34ad9dab730c60db60255f
@@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Singleton;
@@ -14,20 +15,26 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hubspot.mesos.MesosUtils;
import com.hubspot.mesos.Resources;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskCurrentUsage;
import com.hubspot.singularity.SingularityTaskCurrentUsageWithId;
import com.hubspot.singularity.SingularityTaskIdHolder;
import com.hubspot.singularity.SingularityTaskRequest;
import com.hubspot.singularity.SlaveMatchState;
import com.hubspot.singularity.config.CustomExecutorConfiguration;
import com.hubspot.singularity.config.MesosConfiguration;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.UsageManager;
import com.hubspot.singularity.scheduler.SingularityScheduler;
import com.hubspot.singularity.scheduler.SingularitySchedulerStateCache;
@@ -47,15 +54,27 @@
private final SingularitySlaveAndRackHelper slaveAndRackHelper;
private final SingularityTaskSizeOptimizer taskSizeOptimizer;
private final DisasterManager disasterManager;
private final UsageManager usageManager;
private final SlaveManager slaveManager;
private final Provider<SingularitySchedulerStateCache> stateCacheProvider;
private final SchedulerDriverSupplier schedulerDriverSupplier;
@Inject
public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration, CustomExecutorConfiguration customExecutorConfiguration, TaskManager taskManager, SingularityMesosTaskPrioritizer taskPrioritizer,
SingularityScheduler scheduler, SingularityConfiguration configuration, SingularityMesosTaskBuilder mesosTaskBuilder,
SingularitySlaveAndRackManager slaveAndRackManager, SingularityTaskSizeOptimizer taskSizeOptimizer, SingularitySlaveAndRackHelper slaveAndRackHelper,
Provider<SingularitySchedulerStateCache> stateCacheProvider, SchedulerDriverSupplier schedulerDriverSupplier, DisasterManager disasterManager) {
public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
CustomExecutorConfiguration customExecutorConfiguration,
TaskManager taskManager,
SingularityMesosTaskPrioritizer taskPrioritizer,
SingularityScheduler scheduler,
SingularityConfiguration configuration,
SingularityMesosTaskBuilder mesosTaskBuilder,
SingularitySlaveAndRackManager slaveAndRackManager,
SingularityTaskSizeOptimizer taskSizeOptimizer,
SingularitySlaveAndRackHelper slaveAndRackHelper,
Provider<SingularitySchedulerStateCache> stateCacheProvider,
SchedulerDriverSupplier schedulerDriverSupplier,
DisasterManager disasterManager,
UsageManager usageManager, SlaveManager slaveManager) {
this.defaultResources = new Resources(mesosConfiguration.getDefaultCpus(), mesosConfiguration.getDefaultMemory(), 0, mesosConfiguration.getDefaultDisk());
this.defaultCustomExecutorResources = new Resources(customExecutorConfiguration.getNumCpus(), customExecutorConfiguration.getMemoryMb(), 0, customExecutorConfiguration.getDiskMb());
this.taskManager = taskManager;
@@ -69,6 +88,8 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration, Cus
this.disasterManager = disasterManager;
this.schedulerDriverSupplier = schedulerDriverSupplier;
this.taskPrioritizer = taskPrioritizer;
this.usageManager = usageManager;
this.slaveManager = slaveManager;
}
private Map<String, SingularityTaskRequestHolder> getDueTaskRequestHolders() {
@@ -116,9 +137,20 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration, Cus
int tasksScheduled = 0;
Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave = getUsagesPerRequestTypePerSlave();
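// shape: slaveId -> request type -> resource name ("cpusUsed" / "memoryRssBytes") -> usage relative to the slave's total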
while (!pendingTaskIdToTaskRequest.isEmpty() && addedTaskInLastLoop && canScheduleAdditionalTasks(taskCredits)) {
addedTaskInLastLoop = false;
Collections.shuffle(offerHolders);
Collections.shuffle(offerHolders); // todo: remove this since we don't need to be choosing at random anymore
// todo
// for each task
// for each offer
// get a score for each of the pending tasks for that offer
// score()
// if an offer doesn't satisfy the task's needs, exclude it entirely (only offers with score > 0 are considered)
// accept offer for best score (existing accept code; see the sketch after this method)
for (SingularityOfferHolder offerHolder : offerHolders) {
if (configuration.getMaxTasksPerOffer() > 0 && offerHolder.getAcceptedTasks().size() >= configuration.getMaxTasksPerOffer()) {
@@ -157,13 +189,50 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration, Cus
return offerHolders;
}
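// A minimal sketch of the best-score pass the todos above describe. Hypothetical:
// score() and assignByBestScore() do not exist in this commit, and the accept step
// below stands in for the existing accept code.
private void assignByBestScore(Collection<SingularityTaskRequestHolder> pendingTasks, List<SingularityOfferHolder> offerHolders, SingularitySchedulerStateCache stateCache) {
  for (SingularityTaskRequestHolder taskRequestHolder : pendingTasks) {
    SingularityOfferHolder bestOffer = null;
    double bestScore = 0;
    for (SingularityOfferHolder offerHolder : offerHolders) {
      double offerScore = score(taskRequestHolder, stateCache, offerHolder); // 0 means the offer can't satisfy the task
      if (offerScore > bestScore) {
        bestScore = offerScore;
        bestOffer = offerHolder;
      }
    }
    if (bestOffer != null) {
      // accept the offer with the best score (the existing accept code would run here)
    }
  }
}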
// todo: improve logic here
private Map<String, Map<RequestType, Map<String, Integer>>> getUsagesPerRequestTypePerSlave() {
List<String> slavesWithUsage = usageManager.getSlavesWithUsage();
final String cpus = "cpusUsed";
final String memory = "memoryRssBytes";
Map<String, Map<RequestType, List<SingularityTask>>> tasksPerRequestTypePerSlave = taskManager.getActiveTasks().stream()
.filter(t -> slavesWithUsage.contains(t.getOffer().getSlaveId().getValue()))
.collect(Collectors.groupingBy(t -> t.getOffer().getSlaveId().getValue(),
Collectors.groupingBy(t -> t.getTaskRequest().getRequest().getRequestType())));
Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave = new HashMap<>();
for (String slaveId : tasksPerRequestTypePerSlave.keySet()) {
// todo: fix the unchecked Optional.get() (see the sketch after this method)
int totalCpu = slaveManager.getObject(slaveId).get().getResources().get().getNumCpus().get();
int totalMem = slaveManager.getObject(slaveId).get().getResources().get().getMemoryMegaBytes().get();
Map<RequestType, Map<String, Integer>> usagesPerRequestType = new HashMap<>();
for (RequestType type : RequestType.values()) {
List<SingularityTaskCurrentUsageWithId> usages = usageManager.getTaskCurrentUsages(tasksPerRequestTypePerSlave.get(slaveId).getOrDefault(type, Collections.emptyList()).stream().map(SingularityTaskIdHolder::getTaskId).collect(Collectors.toList()));
long memUsed = usages.stream().mapToLong(SingularityTaskCurrentUsage::getMemoryRssBytes).sum() / totalMem;
double cpuUsed = usages.stream().mapToDouble(SingularityTaskCurrentUsage::getCpusUsed).sum() / totalCpu;
// note: memoryRssBytes is in bytes while the slave total is in megabytes; normalizing units is part of the "improve logic" todo above
usagesPerRequestType.put(type, ImmutableMap.of(memory, ((int) memUsed), cpus, ((int) cpuUsed)));
}
usagesPerRequestTypePerSlave.put(slaveId, usagesPerRequestType);
}
return usagesPerRequestTypePerSlave;
}
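// A sketch of one way to handle the unchecked Optional.get() flagged above (not
// part of this commit; getTotalCpus is a hypothetical helper, and the return types
// follow the accessor chain used above): skip slaves whose resource totals were
// never recorded instead of risking an exception.
private Optional<Integer> getTotalCpus(String slaveId) {
  Optional<SingularitySlave> slave = slaveManager.getObject(slaveId);
  if (!slave.isPresent() || !slave.get().getResources().isPresent()) {
    return Optional.absent(); // no totals recorded for this slave
  }
  return slave.get().getResources().get().getNumCpus();
}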
private boolean canScheduleAdditionalTasks(int taskCredits) {
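// taskCredits of -1 means credits are disabled, i.e. no cap on how many tasks can be scheduled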
return taskCredits == -1 || taskCredits > 0;
}
// todo: rename to score() and return an int (see the sketch after this method)
private Optional<SingularityTask> match(Collection<SingularityTaskRequestHolder> taskRequests, SingularitySchedulerStateCache stateCache, SingularityOfferHolder offerHolder,
Map<String, Map<String, Integer>> tasksPerOfferPerRequest) {
final String offerId = offerHolder.getOffer().getId().getValue();
// todo: remove the loop
for (SingularityTaskRequestHolder taskRequestHolder : taskRequests) {
final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
@@ -198,6 +267,8 @@ private boolean canScheduleAdditionalTasks(int taskCredits) {
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
if (matchesResources && slaveMatchState.isMatchAllowed()) {
// todo: move to separate method
// have this calculate and return a score
final SingularityTask task = mesosTaskBuilder.buildTask(offerHolder.getOffer(), offerHolder.getCurrentResources(), taskRequest, taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources());
final SingularityTask zkTask = taskSizeOptimizer.getSizeOptimizedTask(task);
@@ -226,6 +297,7 @@ private boolean canScheduleAdditionalTasks(int taskCredits) {
}
}
// todo: have this return 0;
return Optional.absent();
}
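// Sketch of the score() shape the todos above point toward (hypothetical; not in
// this commit): evaluate a single (task, offer) pair and return a number, where 0
// means the offer cannot satisfy the task and higher is better.
private double score(SingularityTaskRequestHolder taskRequestHolder, SingularitySchedulerStateCache stateCache, SingularityOfferHolder offerHolder) {
  final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
  final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
  if (!slaveMatchState.isMatchAllowed()) {
    return 0; // excluded entirely, per the todo: only offers with score > 0 are considered
  }
  // a fuller version would also check resource fit (matchesResources above) and
  // weight in the per-request-type usage from getUsagesPerRequestTypePerSlave();
  // returning a flat 1 here is just a placeholder assumption
  return 1;
}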