Skip to content

Commit

Permalink
fix offer match loop for less iterations, fix status update call in t…
Browse files Browse the repository at this point in the history
…ests to be synchronous
  • Loading branch information
ssalinas committed Feb 2, 2018
1 parent 12a4459 commit 7c3f8a6
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 43 deletions.
@@ -1,14 +1,13 @@
package com.hubspot.singularity.mesos;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -106,8 +105,9 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Offer> offers)
return Collections.emptyList();
}

final List<SingularityTaskRequestHolder> sortedTaskRequests = getSortedDueTaskRequests();
final int numDueTasks = sortedTaskRequests.size();
final List<SingularityTaskRequest> sortedTaskRequests = getSortedDueTaskRequests();
final List<SingularityTaskRequestHolder> sortedTaskRequestHolders = new ArrayList<>();
final int numDueTasks = sortedTaskRequestHolders.size();

final List<SingularityOfferHolder> offerHolders = offers.stream()
.collect(Collectors.groupingBy((o) -> o.getAgentId().getValue()))
Expand All @@ -127,44 +127,76 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Offer> offers)
})
.collect(Collectors.toList());

if (sortedTaskRequests.isEmpty()) {
return offerHolders;
}

Double smallestMemAsk = null;
Double smallestCpuAsk = null;
Double smallestDiskAsk = null;

for (SingularityTaskRequest taskRequest : getSortedDueTaskRequests()) {
SingularityTaskRequestHolder taskRequestHolder = new SingularityTaskRequestHolder(taskRequest, defaultResources, defaultCustomExecutorResources);
sortedTaskRequestHolders.add(taskRequestHolder);
if (smallestDiskAsk == null || taskRequestHolder.getTotalResources().getDiskMb() < smallestDiskAsk) {
smallestDiskAsk = taskRequestHolder.getTotalResources().getDiskMb();
}
if (smallestCpuAsk == null || taskRequestHolder.getTotalResources().getCpus() < smallestCpuAsk) {
smallestCpuAsk = taskRequestHolder.getTotalResources().getCpus();
}
if (smallestMemAsk == null || taskRequestHolder.getTotalResources().getMemoryMb() < smallestMemAsk) {
smallestMemAsk = taskRequestHolder.getTotalResources().getMemoryMb();
}
}

final Resources minResources = new Resources(smallestCpuAsk, smallestMemAsk, 0, smallestDiskAsk);

final Map<String, Map<String, Integer>> tasksPerOfferPerRequest = new HashMap<>();
final AtomicInteger tasksScheduled = new AtomicInteger(0);
final Map<String, SingularitySlaveUsage> currentSlaveUsagesBySlaveId = usageManager.getCurrentSlaveUsages(offerHolders.stream().map(SingularityOfferHolder::getSlaveId).collect(Collectors.toList()));
final List<SingularityOfferHolder> fullOffers = new ArrayList<>();

for (SingularityTaskRequestHolder taskRequestHolder : sortedTaskRequests) {
AtomicBoolean addedTask = new AtomicBoolean(false);
for (SingularityTaskRequestHolder taskRequestHolder : sortedTaskRequestHolders) {
lock.runWithRequestLock(() -> {
addedTask.set(false);
Map<SingularityOfferHolder, Double> scorePerOffer = new ConcurrentHashMap<>();
java.util.Optional<SingularityOfferHolder> bestOffer = offerHolders.parallelStream()
.filter((offerHolder) -> !(configuration.getMaxTasksPerOffer() > 0 && offerHolder.getAcceptedTasks().size() >= configuration.getMaxTasksPerOffer()))
.max(Comparator.comparingDouble((offerHolder) -> scorePerOffer.computeIfAbsent(offerHolder, calculateScore(currentSlaveUsagesBySlaveId, tasksPerOfferPerRequest, taskRequestHolder))));
if (bestOffer.isPresent()) {
Map<SingularityOfferHolder, Double> scorePerOffer = offerHolders
.parallelStream()
.filter((offerHolder) ->
!(configuration.getMaxTasksPerOffer() > 0 && offerHolder.getAcceptedTasks().size() >= configuration.getMaxTasksPerOffer())
&& !fullOffers.contains(offerHolder))
.collect(Collectors.toMap(
Function.identity(),
(offerHolder) -> calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferPerRequest, taskRequestHolder)
));
java.util.Optional<SingularityOfferHolder> bestOffer = scorePerOffer.keySet()
.stream()
.filter((offerHolder) -> scorePerOffer.get(offerHolder) > 0)
.max(Comparator.comparingDouble(scorePerOffer::get));

if (bestOffer.isPresent() && scorePerOffer.get(bestOffer.get()) > 0) {
LOG.info("Best offer {}/1 is on {}", scorePerOffer.get(bestOffer.get()), bestOffer.get().getSanitizedHost());
SingularityMesosTaskHolder taskHolder = acceptTask(bestOffer.get(), tasksPerOfferPerRequest, taskRequestHolder);
tasksScheduled.getAndIncrement();
bestOffer.get().addMatchedTask(taskHolder);
addedTask.set(true);

// If this offer doesn't have any of the minimum requested resources left, skip it for future iterations
if (!MesosUtils.doesOfferMatchResources(Optional.absent(), minResources, bestOffer.get().getCurrentResources(), Collections.emptyList())) {
fullOffers.add(bestOffer.get());
}
}
}, taskRequestHolder.getTaskRequest().getRequest().getId(), "checkOffers");
if (!addedTask.get()) {
break;
}
}

LOG.info("{} tasks scheduled, {} tasks remaining after examining {} offers", tasksScheduled, numDueTasks - tasksScheduled.get(), offers.size());

return offerHolders;
}

private Function<SingularityOfferHolder, Double> calculateScore(Map<String, SingularitySlaveUsage> currentSlaveUsagesBySlaveId, Map<String, Map<String, Integer>> tasksPerOfferPerRequest, SingularityTaskRequestHolder taskRequestHolder) {
return (offerHolder) -> {
Optional<SingularitySlaveUsage> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(offerHolder.getSlaveId()));
double score = score(offerHolder, tasksPerOfferPerRequest, taskRequestHolder, maybeSlaveUsage);
LOG.warn("Scored {} | Task {} | Offer - mem {} - cpu {} | Slave {} | maybeSlaveUsage - {}", score, taskRequestHolder.getTaskRequest().getPendingTask().getPendingTaskId().getId(),
MesosUtils.getMemory(offerHolder.getCurrentResources(), Optional.absent()), MesosUtils.getNumCpus(offerHolder.getCurrentResources(), Optional.absent()), offerHolder.getHostname(), maybeSlaveUsage);
return score;
};
/**
 * Scores a single offer against a single pending task by delegating to {@code score(...)}.
 *
 * @param offerHolder the offer (and its remaining resources) being considered
 * @param currentSlaveUsagesBySlaveId usage snapshots keyed by slave id; a slave may be absent
 *        if no usage has been collected for it
 * @param tasksPerOfferPerRequest running count of tasks already matched per offer per request
 * @param taskRequestHolder the due task being placed
 * @return the computed score for this (offer, task) pair — higher is better; callers treat
 *         a score of 0 as "no match" (see the {@code > 0} filter at the call site)
 */
private double calculateScore(SingularityOfferHolder offerHolder, Map<String, SingularitySlaveUsage> currentSlaveUsagesBySlaveId, Map<String, Map<String, Integer>> tasksPerOfferPerRequest, SingularityTaskRequestHolder taskRequestHolder) {
// Guava Optional (fromNullable/absent) — usage may be missing for slaves not yet polled.
Optional<SingularitySlaveUsage> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(offerHolder.getSlaveId()));
double score = score(offerHolder, tasksPerOfferPerRequest, taskRequestHolder, maybeSlaveUsage);
// trace level: this fires once per (task, offer) pair, so it is very chatty at scale
LOG.trace("Scored {} | Task {} | Offer - mem {} - cpu {} | Slave {} | maybeSlaveUsage - {}", score, taskRequestHolder.getTaskRequest().getPendingTask().getPendingTaskId().getId(),
MesosUtils.getMemory(offerHolder.getCurrentResources(), Optional.absent()), MesosUtils.getNumCpus(offerHolder.getCurrentResources(), Optional.absent()), offerHolder.getHostname(), maybeSlaveUsage);
return score;
}

private double getNormalizedWeight(ResourceUsageType type) {
Expand Down Expand Up @@ -194,16 +226,14 @@ private double getNormalizedWeight(ResourceUsageType type) {
}
}

private List<SingularityTaskRequestHolder> getSortedDueTaskRequests() {
private List<SingularityTaskRequest> getSortedDueTaskRequests() {
final List<SingularityTaskRequest> taskRequests = taskPrioritizer.getSortedDueTasks(scheduler.getDueTasks());

taskRequests.forEach((taskRequest) -> LOG.trace("Task {} is due", taskRequest.getPendingTask().getPendingTaskId()));

taskPrioritizer.removeTasksAffectedByPriorityFreeze(taskRequests);

return taskRequests.stream()
.map((taskRequest) -> new SingularityTaskRequestHolder(taskRequest, defaultResources, defaultCustomExecutorResources))
.collect(Collectors.toList());
return taskRequests;
}

private double score(SingularityOfferHolder offerHolder, Map<String, Map<String, Integer>> tasksPerOfferHostPerRequest,
Expand Down
@@ -1,6 +1,7 @@
package com.hubspot.singularity.mesos;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.mesos.v1.Protos.AgentID;
import org.apache.mesos.v1.Protos.InverseOffer;
Expand Down Expand Up @@ -86,7 +87,7 @@ public enum SchedulerState {
*
* @param update Contains info about the current tasks status
*/
public abstract void statusUpdate(TaskStatus update);
public abstract CompletableFuture<Boolean> statusUpdate(TaskStatus update);

/**
* Received when a custom message generated by the executor is forwarded by
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -247,13 +248,13 @@ public void rescindInverseOffer(OfferID offerId) {
}

@Override
public void statusUpdate(TaskStatus status) {
public CompletableFuture<Boolean> statusUpdate(TaskStatus status) {
if (!isRunning()) {
LOG.info("Scheduler is in state {}, queueing an update {} - {} queued updates so far", state.name(), status, queuedUpdates.size());
queuedUpdates.add(status);
return;
return CompletableFuture.completedFuture(false);
}
handleStatusUpdateAsync(status);
return handleStatusUpdateAsync(status);
}

@Override
Expand Down Expand Up @@ -439,9 +440,9 @@ public SchedulerState getState() {
return state;
}

private void handleStatusUpdateAsync(TaskStatus status) {
private CompletableFuture<Boolean> handleStatusUpdateAsync(TaskStatus status) {
long start = System.currentTimeMillis();
statusUpdateHandler.processStatusUpdateAsync(status)
return statusUpdateHandler.processStatusUpdateAsync(status)
.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
exceptionNotifier.notify(String.format("Scheduler threw an uncaught exception (%s)", throwable.getMessage()), throwable);
Expand Down
Expand Up @@ -4,6 +4,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -194,6 +195,28 @@ public List<Protos.Offer> getOffers() {
return offers;
}

/**
 * Equality is based on the offer holder's identity fields (roles, rack, slave, hostname,
 * and attribute maps). The held offers/accepted tasks are deliberately excluded —
 * NOTE(review): presumably so holders for the same slave compare equal regardless of
 * matching progress; confirm against usage in the offer-matching loop.
 */
@Override
public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  if (!(obj instanceof SingularityOfferHolder)) {
    return false;
  }
  final SingularityOfferHolder other = (SingularityOfferHolder) obj;
  return Objects.equals(roles, other.roles)
      && Objects.equals(rackId, other.rackId)
      && Objects.equals(slaveId, other.slaveId)
      && Objects.equals(hostname, other.hostname)
      && Objects.equals(textAttributes, other.textAttributes)
      && Objects.equals(reservedSlaveAttributes, other.reservedSlaveAttributes);
}

@Override
// Hashes exactly the identity fields compared in equals(), preserving the
// equals/hashCode contract (equal holders always produce equal hashes).
public int hashCode() {
return Objects.hash(roles, rackId, slaveId, hostname, textAttributes, reservedSlaveAttributes);
}

@Override
public String toString() {
return "SingularityOfferHolder{" +
Expand Down
Expand Up @@ -54,7 +54,7 @@ public void statusUpdate(@PathParam("taskId") String taskId, @PathParam("taskSta
scheduler.statusUpdate(TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId))
.setState(TaskState.valueOf(taskState))
.build());
.build()).join();
}

@POST
Expand Down
Expand Up @@ -618,7 +618,9 @@ private void processTaskCleanupsForRequest(String requestId, List<SingularityTas
final Multiset<SingularityDeployKey> incrementalCleaningTasks = HashMultiset.create(cleanupTasks.size());
final Set<SingularityDeployKey> isBouncing = new HashSet<>(cleanupTasks.size());
final List<String> taskIdsForDeletedRequest = new ArrayList<>();
boolean isRequestDeleting = false;

// TODO - Better check for deleting request state
final Set<SingularityTaskId> cleaningTasks = new HashSet<>(cleanupTasks.size());
for (SingularityTaskCleanup cleanupTask : cleanupTasks) {
cleaningTasks.add(cleanupTask.getTaskId());
Expand All @@ -630,6 +632,7 @@ private void processTaskCleanupsForRequest(String requestId, List<SingularityTas
}
if (cleanupTask.getCleanupType() == TaskCleanupType.REQUEST_DELETING) {
taskIdsForDeletedRequest.add(cleanupTask.getTaskId().getId());
isRequestDeleting = true;
}
}

Expand All @@ -652,7 +655,7 @@ private void processTaskCleanupsForRequest(String requestId, List<SingularityTas
isBouncing.remove(SingularityDeployKey.fromTaskId(taskId));
}

cleanupRequestIfNoRemainingTasks(cleanupTask, taskIdsForDeletedRequest);
cleanupRequestIfNoRemainingTasks(cleanupTask, taskIdsForDeletedRequest, isRequestDeleting);
}

for (SingularityDeployKey bounceSucceeded : isBouncing) {
Expand All @@ -668,12 +671,12 @@ private void processTaskCleanupsForRequest(String requestId, List<SingularityTas

}

private void cleanupRequestIfNoRemainingTasks(SingularityTaskCleanup cleanupTask, List<String> taskIdsForDeletedRequest) {
private void cleanupRequestIfNoRemainingTasks(SingularityTaskCleanup cleanupTask, List<String> taskIdsForDeletedRequest, boolean isRequestDeleting) {
String requestId = cleanupTask.getTaskId().getRequestId();

taskIdsForDeletedRequest.remove(cleanupTask.getTaskId().getId());
if (taskIdsForDeletedRequest.isEmpty()) {
LOG.info("All tasks for requestId {} are now killed, re-enqueueing request cleanup", requestId);
if (taskIdsForDeletedRequest.isEmpty() && isRequestDeleting) {
LOG.warn("All tasks for requestId {} are now killed, re-enqueueing request cleanup", requestId);
requestManager.createCleanupRequest(
new SingularityRequestCleanup(
cleanupTask.getUser(), RequestCleanupType.DELETING, System.currentTimeMillis(),
Expand Down
Expand Up @@ -15,7 +15,7 @@
public class SingularityCuratorTestBase {

@Rule
public Timeout globalTimeout = Timeout.seconds(30); // 30 seconds max for each @Test method
public Timeout globalTimeout = Timeout.seconds(120); // 120 seconds max for each @Test method

@Inject
protected CuratorFramework cf;
Expand Down
Expand Up @@ -358,6 +358,7 @@ public void testTaskDestroy() {
killKilledTasks();

Assert.assertEquals(2, taskManager.getNumActiveTasks());
System.out.println(requestManager.getCleanupRequests());
Assert.assertEquals(0, requestManager.getCleanupRequests().size());
Assert.assertEquals(RequestState.ACTIVE, requestManager.getRequest(requestId).get().getState());
}
Expand Down Expand Up @@ -1512,7 +1513,7 @@ public void testTaskOddities() {
.setState(TaskState.TASK_RUNNING);

// should not throw exception:
sms.statusUpdate(bldr.build());
sms.statusUpdate(bldr.build()).join();

initRequest();
initFirstDeploy();
Expand Down
Expand Up @@ -359,7 +359,7 @@ protected void statusUpdate(SingularityTask task, TaskState state, Optional<Long
bldr.setTimestamp(timestamp.get() / 1000);
}

sms.statusUpdate(bldr.build());
sms.statusUpdate(bldr.build()).join();
}

protected void statusUpdate(SingularityTask task, TaskState state) {
Expand Down
Expand Up @@ -95,7 +95,7 @@ public SingularityTestModule(boolean useDbTests) throws Exception {
rootLogger.setLevel(Level.toLevel(System.getProperty("singularity.test.log.level", "WARN")));

Logger hsLogger = context.getLogger("com.hubspot");
hsLogger.setLevel(Level.toLevel(System.getProperty("singularity.test.log.level.for.com.hubspot", "ERROR")));
hsLogger.setLevel(Level.toLevel(System.getProperty("singularity.test.log.level.for.com.hubspot", "WARN")));

this.ts = new TestingServer();
}
Expand Down

0 comments on commit 7c3f8a6

Please sign in to comment.