New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better task balancing #1482

Merged
merged 73 commits into from Jun 8, 2017
Commits
Jump to file or symbol
Failed to load files and symbols.
+27 鈭30
Diff settings

Always

Just for now

Viewing a subset of changes. View all

refactor

  • Loading branch information...
darcatron committed Mar 30, 2017
commit a4c6800648d1282731292ba87f5dc30fad190f07
@@ -5,7 +5,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Singleton;
@@ -101,25 +100,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
this.slaveManager = slaveManager;
}
private Map<String, SingularityTaskRequestHolder> getDueTaskRequestHolders() {
final List<SingularityTaskRequest> taskRequests = taskPrioritizer.getSortedDueTasks(scheduler.getDueTasks());
for (SingularityTaskRequest taskRequest : taskRequests) {
LOG.trace("Task {} is due", taskRequest.getPendingTask().getPendingTaskId());
}
taskPrioritizer.removeTasksAffectedByPriorityFreeze(taskRequests);
final Map<String, SingularityTaskRequestHolder> taskRequestHolders = new HashMap<>(taskRequests.size());
for (SingularityTaskRequest taskRequest : taskRequests) {
taskRequestHolders.put(taskRequest.getPendingTask().getPendingTaskId().getId(), new SingularityTaskRequestHolder(taskRequest, defaultResources, defaultCustomExecutorResources));
}
return taskRequestHolders;
}
public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers, final Set<Protos.OfferID> acceptedOffers) {
public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers) {
boolean useTaskCredits = disasterManager.isTaskCreditEnabled();
int taskCredits = useTaskCredits ? disasterManager.getUpdatedCreditCount() : -1;
final SingularitySchedulerStateCache stateCache = stateCacheProvider.get();
@@ -170,6 +151,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
if (!scorePerOffer.isEmpty()) {
SingularityOfferHolder bestOffer = Collections.max(scorePerOffer.entrySet(), Map.Entry.comparingByValue()).getKey();
LOG.info("Best offer is {} with a score of {}/1", bestOffer, scorePerOffer.get(bestOffer));
SingularityTask task = acceptTask(bestOffer, stateCache, tasksPerOfferPerRequest, taskRequestHolder);
tasksScheduled++;
@@ -197,7 +179,28 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
return offerHolders;
}
//todo: improve logic here
public boolean isConnected() {
return schedulerDriverSupplier.get().isPresent();
}
private Map<String, SingularityTaskRequestHolder> getDueTaskRequestHolders() {
final List<SingularityTaskRequest> taskRequests = taskPrioritizer.getSortedDueTasks(scheduler.getDueTasks());
for (SingularityTaskRequest taskRequest : taskRequests) {
LOG.trace("Task {} is due", taskRequest.getPendingTask().getPendingTaskId());
}
taskPrioritizer.removeTasksAffectedByPriorityFreeze(taskRequests);
final Map<String, SingularityTaskRequestHolder> taskRequestHolders = new HashMap<>(taskRequests.size());
for (SingularityTaskRequest taskRequest : taskRequests) {
taskRequestHolders.put(taskRequest.getPendingTask().getPendingTaskId().getId(), new SingularityTaskRequestHolder(taskRequest, defaultResources, defaultCustomExecutorResources));
}
return taskRequestHolders;
}
private Map<String, Map<RequestType, Map<String, Integer>>> getUsagesPerRequestTypePerSlave() {
List<String> slavesWithUsage = usageManager.getSlavesWithUsage();
@@ -213,7 +216,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
if (!slave.isPresent() || !slave.get().getResources().isPresent() ||
!slave.get().getResources().get().getMemoryMegaBytes().isPresent() ||
!slave.get().getResources().get().getNumCpus().isPresent()) {
LOG.debug("Could not find slave or resources for slave {}", slaveId);
LOG.debug("Could not find slave or resources for slave {}, skipping", slaveId);
continue;
}
@@ -381,8 +384,4 @@ private boolean isTooManyInstancesForRequest(SingularityTaskRequest taskRequest,
return false;
}
public boolean isConnected() {
return schedulerDriverSupplier.get().isPresent();
}
}
@@ -90,7 +90,7 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
final Set<Protos.OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offers.size());
try {
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers, acceptedOffers);
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers);
for (SingularityOfferHolder offerHolder : offerHolders) {
if (!offerHolder.getAcceptedTasks().isEmpty()) {
@@ -10,13 +10,11 @@
import javax.inject.Singleton;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
@@ -60,7 +58,7 @@ public void runActionOnPoll() {
offers.add(cachedOffer.getOffer());
}
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers, Sets.<OfferID> newHashSet());
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers);
if (offerHolders.isEmpty()) {
return;
ProTip! Use n and p to navigate between commits in a pull request.