Better task balancing #1482

Merged · 73 commits · Jun 8, 2017
+79 −77
Viewing a subset of changes.
move usage by request type collection to usage poller

darcatron committed Apr 4, 2017
commit a855e57251169a565b161647e101823a71e32f4e
@@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Singleton;
@@ -14,28 +15,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hubspot.mesos.MesosUtils;
import com.hubspot.mesos.Resources;
import com.hubspot.mesos.json.MesosResourcesObject;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularitySlave;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularitySlaveUsageWithId;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskCurrentUsage;
import com.hubspot.singularity.SingularityTaskCurrentUsageWithId;
import com.hubspot.singularity.SingularityTaskIdHolder;
import com.hubspot.singularity.SingularityTaskRequest;
import com.hubspot.singularity.SlaveMatchState;
import com.hubspot.singularity.config.CustomExecutorConfiguration;
import com.hubspot.singularity.config.MesosConfiguration;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.UsageManager;
import com.hubspot.singularity.scheduler.SingularityScheduler;
@@ -46,11 +41,6 @@
private static final Logger LOG = LoggerFactory.getLogger(SingularityMesosOfferScheduler.class);
private static final String CPU_USED = "cpusUsed";
private static final String CPU_TOTAL = "cpusTotal";
private static final String MEMORY_USED = "memoryRssBytes";
private static final String MEMORY_TOTAL = "memoryRssBytesTotal";
private final Resources defaultResources;
private final Resources defaultCustomExecutorResources;
private final TaskManager taskManager;
@@ -63,7 +53,6 @@
private final SingularityTaskSizeOptimizer taskSizeOptimizer;
private final DisasterManager disasterManager;
private final UsageManager usageManager;
private final SlaveManager slaveManager;
private final Provider<SingularitySchedulerStateCache> stateCacheProvider;
private final SchedulerDriverSupplier schedulerDriverSupplier;
@@ -82,7 +71,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
Provider<SingularitySchedulerStateCache> stateCacheProvider,
SchedulerDriverSupplier schedulerDriverSupplier,
DisasterManager disasterManager,
UsageManager usageManager, SlaveManager slaveManager) {
UsageManager usageManager) {
this.defaultResources = new Resources(mesosConfiguration.getDefaultCpus(), mesosConfiguration.getDefaultMemory(), 0, mesosConfiguration.getDefaultDisk());
this.defaultCustomExecutorResources = new Resources(customExecutorConfiguration.getNumCpus(), customExecutorConfiguration.getMemoryMb(), 0, customExecutorConfiguration.getDiskMb());
this.taskManager = taskManager;
@@ -97,7 +86,6 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
this.schedulerDriverSupplier = schedulerDriverSupplier;
this.taskPrioritizer = taskPrioritizer;
this.usageManager = usageManager;
this.slaveManager = slaveManager;
}
public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers) {
@@ -126,6 +114,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
boolean addedTaskInLastLoop = true;
int tasksScheduled = 0;
final List<SingularitySlaveUsageWithId> currentSlaveUsages = usageManager.getCurrentSlaveUsages(offerHolders.stream().map(o -> o.getOffer().getSlaveId().getValue()).collect(Collectors.toList()));
while (!pendingTaskIdToTaskRequest.isEmpty() && addedTaskInLastLoop && canScheduleAdditionalTasks(taskCredits)) {
addedTaskInLastLoop = false;
@@ -141,7 +130,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
continue;
}
double score = score(offerHolder, stateCache, tasksPerOfferPerRequest, taskRequestHolder, getUsagesPerRequestTypePerSlave());
double score = score(offerHolder, stateCache, tasksPerOfferPerRequest, taskRequestHolder, getSlaveUsage(currentSlaveUsages, offerHolder.getOffer().getSlaveId().getValue()));

ssalinas (Member), Apr 20, 2017:

for clarity, maybe something like 'hostScore' here? The score is for the particular slave, not necessarily about the offer

darcatron (Contributor), Apr 20, 2017:

I'm not sure about the naming here. We do look at the slave's utilization to score the offer, but we are still scoring the offer itself, since offers aren't uniquely 1:1 with a slave (e.g. there can be 2 offers for the same slave).

The slave utilization weight will be the same for all offers on the same slave, but the offer resources will differ per offer. So it seems to me that we're scoring the offer in this class rather than the slave itself.

if (score > 0) {
// todo: can short circuit here if score is high enough
scorePerOffer.put(offerHolder, score);

darcatron (Contributor), Mar 30, 2017:

Thought we might want to have a value that's definitely good enough to just accept instead of continuing to evaluate
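A minimal sketch of the short circuit the TODO and comment describe, assuming a hypothetical threshold (the name minScoreToAccept and its value are illustrative, not part of this PR):

double minScoreToAccept = 0.90; // illustrative "good enough" threshold, not from this PR

if (score > 0) {
  scorePerOffer.put(offerHolder, score);
  if (score >= minScoreToAccept) {
    break; // accept this offer immediately instead of scoring the rest
  }
}

The break assumes this sits inside the loop over offer holders, as in checkOffers above.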

@@ -201,54 +190,18 @@ public boolean isConnected() {
return taskRequestHolders;
}
private Map<String, Map<RequestType, Map<String, Integer>>> getUsagesPerRequestTypePerSlave() {
List<String> slavesWithUsage = usageManager.getSlavesWithUsage();
Map<String, Map<RequestType, List<SingularityTask>>> tasksPerRequestTypePerSlave = taskManager.getActiveTasks().stream()
.filter(t -> slavesWithUsage.contains(t.getOffer().getSlaveId().getValue()))
.collect(Collectors.groupingBy(t -> t.getOffer().getSlaveId().getValue(),
Collectors.groupingBy(t -> t.getTaskRequest().getRequest().getRequestType())));
Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave = new HashMap<>();
for (String slaveId : tasksPerRequestTypePerSlave.keySet()) {
final Optional<SingularitySlave> slave = slaveManager.getObject(slaveId);
if (!slave.isPresent() || !slave.get().getResources().isPresent() ||
!slave.get().getResources().get().getMemoryMegaBytes().isPresent() ||
!slave.get().getResources().get().getNumCpus().isPresent()) {
LOG.debug("Could not find slave or resources for slave {}, skipping", slaveId);
continue;
}
final MesosResourcesObject resources = slave.get().getResources().get();
int totalMem = resources.getMemoryMegaBytes().get();
int totalCpu = resources.getNumCpus().get();
Map<RequestType, Map<String, Integer>> usagesPerRequestType = new HashMap<>();
for (RequestType type : RequestType.values()) {
List<SingularityTaskCurrentUsageWithId> usages = usageManager.getTaskCurrentUsages(tasksPerRequestTypePerSlave.get(slaveId).get(type).stream().map(SingularityTaskIdHolder::getTaskId).collect(Collectors.toList()));
long memUsed = usages.stream().mapToLong(SingularityTaskCurrentUsage::getMemoryRssBytes).sum();
double cpuUsed = usages.stream().mapToDouble(SingularityTaskCurrentUsage::getCpusUsed).sum();
usagesPerRequestType.put(type, ImmutableMap.of(
    MEMORY_USED, ((int) memUsed),
    MEMORY_TOTAL, totalMem,
    CPU_USED, ((int) cpuUsed),
    CPU_TOTAL, totalCpu));
}
usagesPerRequestTypePerSlave.put(slaveId, usagesPerRequestType);
}
return usagesPerRequestTypePerSlave;
}
private boolean canScheduleAdditionalTasks(int taskCredits) {
return taskCredits == -1 || taskCredits > 0;
}
private Optional<SingularitySlaveUsageWithId> getSlaveUsage(List<SingularitySlaveUsageWithId> slaveUsages, String slaveId) {
List<SingularitySlaveUsageWithId> filteredSlaveUsages = slaveUsages.stream().filter(u -> u.getSlaveId().equals(slaveId)).collect(Collectors.toList());
return filteredSlaveUsages.size() == 1 ? Optional.of(filteredSlaveUsages.get(0)) : Optional.empty();
}
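As an aside, the lookup above could also be expressed with findFirst, assuming at most one usage entry exists per slave id (a sketch, not part of the diff):

private Optional<SingularitySlaveUsageWithId> getSlaveUsage(List<SingularitySlaveUsageWithId> slaveUsages, String slaveId) {
  // Equivalent when slave ids are unique in the list; avoids building
  // an intermediate filtered list just to check its size.
  return slaveUsages.stream()
      .filter(u -> u.getSlaveId().equals(slaveId))
      .findFirst();
}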
private double score(SingularityOfferHolder offerHolder, SingularitySchedulerStateCache stateCache, Map<String, Map<String, Integer>> tasksPerOfferPerRequest,
SingularityTaskRequestHolder taskRequestHolder, Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave) {
SingularityTaskRequestHolder taskRequestHolder, Optional<SingularitySlaveUsageWithId> maybeSlaveUsage) {
final Offer offer = offerHolder.getOffer();
final String offerId = offer.getId().getValue();
@@ -280,7 +233,7 @@ private double score(SingularityOfferHolder offerHolder, SingularitySchedulerSta
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
if (matchesResources && slaveMatchState.isMatchAllowed()) {
return score(offer, taskRequest, usagesPerRequestTypePerSlave);
return score(offer, taskRequest, maybeSlaveUsage);
} else {
offerHolder.addRejectedTask(pendingTaskId);
@@ -293,7 +246,7 @@ private double score(SingularityOfferHolder offerHolder, SingularitySchedulerSta
return 0;
}
private double score(Offer offer, SingularityTaskRequest taskRequest, Map<String, Map<RequestType, Map<String, Integer>>> usagesPerRequestTypePerSlave) {
private double score(Offer offer, SingularityTaskRequest taskRequest, Optional<SingularitySlaveUsageWithId> maybeSlaveUsage) {
double requestTypeCpuWeight = 0.20;

ssalinas (Member), Apr 20, 2017:

Let's make these configurable, maybe another object in the configuration yaml?

darcatron (Contributor), Apr 20, 2017:

Yup, I was in progress on this (now committed), but I kept the fields under SingularityConfiguration since I saw a lot of other stuff in there as well (e.g. caching). We could pull it into an OfferConfiguration file if you think that'd be better for organization

double requestTypeMemWeight = 0.30;
double freeCpuWeight = 0.20;
@@ -302,19 +255,20 @@ private double score(Offer offer, SingularityTaskRequest taskRequest, Map<String
double defaultScoreForMissingUsage = 0.10;
String slaveId = offer.getSlaveId().getValue();
if (!usagesPerRequestTypePerSlave.containsKey(slaveId)) {
LOG.info("Offer {} has no usage data. Will default to {}", offer.getId(), defaultScoreForMissingUsage);
if (!maybeSlaveUsage.isPresent() || !maybeSlaveUsage.get().getCpuTotal().isPresent() || !maybeSlaveUsage.get().getMemoryTotal().isPresent()) {
LOG.info("Slave {} has no total usage data. Will default to {}", slaveId, defaultScoreForMissingUsage);
return defaultScoreForMissingUsage;
}
RequestType requestType = taskRequest.getRequest().getRequestType();
Map<String, Integer> usagePerResource = usagesPerRequestTypePerSlave.get(slaveId).get(requestType);
SingularitySlaveUsageWithId slaveUsage = maybeSlaveUsage.get();
Map<RequestType, Map<String, Number>> usagesPerRequestType = slaveUsage.getUsagePerRequestType();
Map<String, Number> usagePerResource = usagesPerRequestType.get(taskRequest.getRequest().getRequestType());
score += requestTypeCpuWeight * (1 - (usagePerResource.get(CPU_USED) / usagePerResource.get(CPU_TOTAL)));
score += requestTypeMemWeight * (1 - (usagePerResource.get(MEMORY_USED) / usagePerResource.get(MEMORY_TOTAL)));
score += requestTypeCpuWeight * (1 - (usagePerResource.get(SingularitySlaveUsage.CPU_USED).doubleValue() / slaveUsage.getCpuTotal().get()));
score += requestTypeMemWeight * (1 - (usagePerResource.get(SingularitySlaveUsage.MEMORY_USED).longValue() / slaveUsage.getMemoryTotal().get()));
score += freeCpuWeight * (MesosUtils.getNumCpus(offer) / usagePerResource.get(CPU_TOTAL));
score += freeMemWeight * (MesosUtils.getMemory(offer) / usagePerResource.get(MEMORY_TOTAL));
score += freeCpuWeight * (MesosUtils.getNumCpus(offer) / slaveUsage.getCpuTotal().get());
score += freeMemWeight * (MesosUtils.getMemory(offer) / slaveUsage.getMemoryTotal().get());
return score;
}
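To make the weighting concrete, here is a worked example of the new score computation, using the hard-coded weights above (the ones discussed as candidates for configuration) and illustrative usage numbers (all figures hypothetical):

// Illustrative slave: 8 cpus / 16384 MB total; this request type is
// currently using 2 cpus / 4096 MB; the offer advertises 4 cpus / 8192 MB.
double score = 0;
score += 0.20 * (1 - (2.0 / 8));        // requestTypeCpuWeight: 0.20 * 0.75 = 0.15
score += 0.30 * (1 - (4096.0 / 16384)); // requestTypeMemWeight: 0.30 * 0.75 = 0.225
score += 0.20 * (4.0 / 8);              // freeCpuWeight:        0.20 * 0.50 = 0.10
score += 0.30 * (8192.0 / 16384);       // freeMemWeight:        0.30 * 0.50 = 0.15
// total score: 0.625

Lower per-request-type utilization and more free resources on the slave both push the score up, which is what lets the scheduler prefer less loaded hosts.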
@@ -350,7 +304,7 @@ private void addRequestToMapByOfferId(Map<String, Map<String, Integer>> tasksPer
tasksPerOfferPerRequest.get(offerId).put(requestId, 0);
}
} else {
tasksPerOfferPerRequest.put(offerId, new HashMap<String, Integer>());
tasksPerOfferPerRequest.put(offerId, new HashMap<>());
tasksPerOfferPerRequest.get(offerId).put(requestId, 1);
}
}
@@ -364,10 +318,7 @@ private boolean tooManyTasksPerOfferForRequest(Map<String, Map<String, Integer>>
}
int maxPerOfferPerRequest = taskRequest.getRequest().getMaxTasksPerOffer().or(configuration.getMaxTasksPerOfferPerRequest());
if (!(maxPerOfferPerRequest > 0)) {
return false;
}
return tasksPerOfferPerRequest.get(offerId).get(taskRequest.getRequest().getId()) > maxPerOfferPerRequest;
return maxPerOfferPerRequest > 0 && tasksPerOfferPerRequest.get(offerId).get(taskRequest.getRequest().getId()) > maxPerOfferPerRequest;
}
private boolean isTooManyInstancesForRequest(SingularityTaskRequest taskRequest, SingularitySchedulerStateCache stateCache) {
@@ -1,19 +1,26 @@
package com.hubspot.singularity.scheduler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.hubspot.mesos.client.MesosClient;
import com.hubspot.mesos.json.MesosTaskMonitorObject;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.SingularitySlave;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularityTaskCurrentUsage;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTaskUsage;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.UsageManager;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
@@ -26,23 +33,33 @@
private final UsageManager usageManager;
private final SingularityUsageHelper usageHelper;
private final SingularityExceptionNotifier exceptionNotifier;
private final RequestManager requestManager;
@Inject
SingularityUsagePoller(SingularityConfiguration configuration, SingularityUsageHelper usageHelper, UsageManager usageManager, MesosClient mesosClient, SingularityExceptionNotifier exceptionNotifier) {
SingularityUsagePoller(SingularityConfiguration configuration,
SingularityUsageHelper usageHelper,
UsageManager usageManager,
MesosClient mesosClient,
SingularityExceptionNotifier exceptionNotifier,
RequestManager requestManager) {
super(configuration.getCheckUsageEveryMillis(), TimeUnit.MILLISECONDS);
this.configuration = configuration;
this.usageHelper = usageHelper;
this.mesosClient = mesosClient;
this.usageManager = usageManager;
this.exceptionNotifier = exceptionNotifier;
this.requestManager = requestManager;
}
@Override
public void runActionOnPoll() {
final long now = System.currentTimeMillis();
Map<RequestType, Map<String, Number>> usagesPerRequestType = new HashMap<>();

ssalinas (Member), Apr 20, 2017:

wouldn't we want this to be per-slave, not overall?

darcatron (Contributor), Apr 20, 2017:

This should be per slave. This poller loops through each slave and creates a new SingularitySlaveUsage with the stats for that slave (see the sketch after this hunk).

for (SingularitySlave slave : usageHelper.getSlavesToTrackUsageFor()) {
Optional<Long> memoryTotal = Optional.empty();
Optional<Double> cpuTotal = Optional.empty();
long memoryBytesUsed = 0;
double cpusUsed = 0;
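A sketch of the per-slave scoping darcatron describes above: constructing the map inside the slave loop so one slave's usage cannot accumulate into the next (structure assumed from the surrounding diff):

for (SingularitySlave slave : usageHelper.getSlavesToTrackUsageFor()) {
  // Fresh map per slave; the totals collected below are for this slave only.
  Map<RequestType, Map<String, Number>> usagesPerRequestType = new HashMap<>();
  // ... sum each task's memory/cpu into usagesPerRequestType,
  //     then store it on this slave's SingularitySlaveUsage ...
}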
@@ -68,6 +85,7 @@ public void runActionOnPoll() {
double taskCpusUsed = ((usage.getCpuSeconds() - lastUsage.getCpuSeconds()) / (usage.getTimestamp() - lastUsage.getTimestamp()));
updateUsagesPerRequestType(usagesPerRequestType, getRequestType(taskUsage), usage.getMemoryRssBytes(), taskCpusUsed);
SingularityTaskCurrentUsage currentUsage = new SingularityTaskCurrentUsage(usage.getMemoryRssBytes(), now, taskCpusUsed);
usageManager.saveCurrentTaskUsage(taskId, currentUsage);
@@ -76,7 +94,16 @@ public void runActionOnPoll() {
}
}
SingularitySlaveUsage slaveUsage = new SingularitySlaveUsage(memoryBytesUsed, now, cpusUsed, allTaskUsage.size());
if (!slave.getResources().isPresent() ||
!slave.getResources().get().getMemoryMegaBytes().isPresent() ||
!slave.getResources().get().getNumCpus().isPresent()) {
LOG.debug("Could not find slave or resources for slave {}", slave.getId());
} else {
memoryTotal = Optional.of(slave.getResources().get().getMemoryMegaBytes().get().longValue());
cpuTotal = Optional.of(slave.getResources().get().getNumCpus().get().doubleValue());
}
SingularitySlaveUsage slaveUsage = new SingularitySlaveUsage(memoryBytesUsed, now, cpusUsed, allTaskUsage.size(), memoryTotal, cpuTotal, usagesPerRequestType);
List<Long> slaveTimestamps = usageManager.getSlaveUsageTimestamps(slave.getId());
if (slaveTimestamps.size() + 1 > configuration.getNumUsageToKeep()) {
usageManager.deleteSpecificSlaveUsage(slave.getId(), slaveTimestamps.get(0));
@@ -98,4 +125,28 @@ private SingularityTaskUsage getUsage(MesosTaskMonitorObject taskUsage) {
return new SingularityTaskUsage(taskUsage.getStatistics().getMemRssBytes(), taskUsage.getStatistics().getTimestamp(), cpuSeconds);
}
private RequestType getRequestType(MesosTaskMonitorObject task) {
//todo: check optional.get()
return requestManager.getRequest(SingularityTaskId.valueOf(task.getSource()).getRequestId()).get().getRequest().getRequestType();
}
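A sketch of the Optional check the TODO asks for, assuming getRequest returns a java.util.Optional of a request wrapper (if it is Guava's Optional here, transform plays the same role) and that the request may have been deleted since the task started:

private Optional<RequestType> getRequestType(MesosTaskMonitorObject task) {
  String requestId = SingularityTaskId.valueOf(task.getSource()).getRequestId();
  // Absent if the request was deleted; callers must handle the empty
  // case instead of relying on an unconditional get().
  return requestManager.getRequest(requestId)
      .map(requestWithState -> requestWithState.getRequest().getRequestType());
}

Callers would then skip usage bookkeeping for tasks whose request no longer exists rather than throwing.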
private void updateUsagesPerRequestType(Map<RequestType, Map<String, Number>> usagePerRequestType, RequestType type, long memUsed, double cpuUsed) {
if (usagePerRequestType.containsKey(type)) {
long oldMemUsed = 0L;
double oldCpuUsed = 0;
if (usagePerRequestType.get(type).containsKey(SingularitySlaveUsage.MEMORY_USED)) {
oldMemUsed = usagePerRequestType.get(type).get(SingularitySlaveUsage.MEMORY_USED).longValue();
}
if (usagePerRequestType.get(type).containsKey(SingularitySlaveUsage.CPU_USED)) {
oldCpuUsed = usagePerRequestType.get(type).get(SingularitySlaveUsage.CPU_USED).doubleValue();
}
usagePerRequestType.get(type).put(SingularitySlaveUsage.MEMORY_USED, oldMemUsed + memUsed);
usagePerRequestType.get(type).put(SingularitySlaveUsage.CPU_USED, oldCpuUsed + cpuUsed);
} else {
// use a mutable map holding both keys so the totals can be updated in place on later tasks
Map<String, Number> usage = new HashMap<>();
usage.put(SingularitySlaveUsage.MEMORY_USED, memUsed);
usage.put(SingularitySlaveUsage.CPU_USED, cpuUsed);
usagePerRequestType.put(type, usage);
}
}
}
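For comparison, the whole accumulation in updateUsagesPerRequestType can be written with computeIfAbsent and merge; a sketch assuming mutable inner maps:

private void updateUsagesPerRequestType(Map<RequestType, Map<String, Number>> usagePerRequestType, RequestType type, long memUsed, double cpuUsed) {
  Map<String, Number> usage = usagePerRequestType.computeIfAbsent(type, t -> new HashMap<>());
  // merge() seeds the value on the first task of this type, then adds to the running total
  usage.merge(SingularitySlaveUsage.MEMORY_USED, memUsed, (a, b) -> a.longValue() + b.longValue());
  usage.merge(SingularitySlaveUsage.CPU_USED, cpuUsed, (a, b) -> a.doubleValue() + b.doubleValue());
}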