diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java index c8838f11c2..3beada3d5a 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java @@ -206,43 +206,11 @@ public Collection checkOffers(final Collection of List> scoringFutures = new ArrayList<>(); AtomicReference scoringException = new AtomicReference<>(null); for (SingularityOfferHolder offerHolder : offerHolders.values()) { - if (isOfferFull(offerHolder)) { - continue; - } - Optional maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(offerHolder.getSlaveId())); - - if (taskManager.getActiveTasks().stream() - .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp() - && t.getMesosTask().getSlaveId().getValue().equals(offerHolder.getSlaveId()))) { - Optional maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId()); - if (maybeSlave.isPresent()) { - usagePoller.getSlaveUsage(maybeSlave.get()); - } - continue; - } - -// if (maybeSlaveUsage.isPresent() && System.currentTimeMillis() - maybeSlaveUsage.get().getTimestamp() > configuration.getMaxSlaveUsageMetricAgeMs()) { -// Optional maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId()); -// if (maybeSlave.isPresent()) { -// usagePoller.getSlaveUsage(maybeSlave.get()); -// } -// continue; -// } - scoringFutures.add( - offerScoringSemaphore.call( - () -> CompletableFuture.runAsync(() -> { - try { - double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId())); - if (score != 0) { - scorePerOffer.put(offerHolder.getSlaveId(), score); - } - } catch (Throwable t) { - LOG.error("Uncaught exception while scoring offers", t); - scoringException.set(t); - } - }, - offerScoringExecutor - ))); + scoringFutures.add(offerScoringSemaphore.call(() -> + CompletableFuture.supplyAsync(() -> { + return buildScoringFuture(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder); + }, + offerScoringExecutor))); } CompletableFutures.allOf(scoringFutures).join(); @@ -270,6 +238,59 @@ public Collection checkOffers(final Collection of return offerHolders.values(); } + private Void buildScoringFuture( + Map offerHolders, + Map requestUtilizations, + List activeTaskIds, + Map currentSlaveUsagesBySlaveId, + Map tasksPerOfferHost, + SingularityTaskRequestHolder taskRequestHolder, + Map scorePerOffer, + List activeTaskIdsForRequest, + AtomicReference scoringException, + SingularityOfferHolder offerHolder) { + if (isOfferFull(offerHolder)) { + return null; + } + String slaveId = offerHolder.getSlaveId(); + Optional maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId)); + + if (taskManager.getActiveTasks().stream() + .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp() + && t.getMesosTask().getSlaveId().getValue().equals(slaveId))) { + Optional maybeSlave = slaveManager.getSlave(slaveId); + if (maybeSlave.isPresent()) { + usagePoller.getSlaveUsage(maybeSlave.get()) + .whenComplete((usage, throwable) -> { + if (throwable == null) { + currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores( + usage, + mesosConfiguration.getScoreUsingSystemLoad(), + getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()), + mesosConfiguration.getLoad5OverloadedThreshold(), + mesosConfiguration.getLoad1OverloadedThreshold(), + usage.getTimestamp() + )); + } else { + throw new RuntimeException(throwable); + } + }); + } + return null; + } + + try { + double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId())); + if (score != 0) { + scorePerOffer.put(slaveId, score); + } + } catch (Throwable t) { + LOG.error("Uncaught exception while scoring offers", t); + scoringException.set(t); + } + return null; + } + private MaxProbableUsage getMaxProbableUsageForSlave(List activeTaskIds, Map requestUtilizations, String sanitizedHostname) { double cpu = 0; double memBytes = 0; diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java index 6d1f6d7fcf..e9894db9fb 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java @@ -68,7 +68,7 @@ public class SingularityUsagePoller extends SingularityLeaderOnlyPoller { private final DeployManager deployManager; private final TaskManager taskManager; - private final AsyncSemaphore usageCollectionSemaphore; + private final AsyncSemaphore usageCollectionSemaphore; private final ExecutorService usageExecutor; private final ConcurrentHashMap requestLocks; @@ -112,12 +112,12 @@ public void runActionOnPoll() { Map> overLoadedHosts = new ConcurrentHashMap<>(); - List> usageFutures = new ArrayList<>(); + List> usageFutures = new ArrayList<>(); usageHelper.getSlavesToTrackUsageFor().forEach((slave) -> { usageFutures.add(usageCollectionSemaphore.call(() -> - CompletableFuture.runAsync(() -> { - collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable, + CompletableFuture.supplyAsync(() -> { + return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable); }, usageExecutor) )); @@ -136,10 +136,10 @@ public void runActionOnPoll() { } } - public CompletableFuture getSlaveUsage(SingularitySlave slave) { + public CompletableFuture getSlaveUsage(SingularitySlave slave) { return usageCollectionSemaphore.call(() -> - CompletableFuture.runAsync(() -> { - collectSlaveUsage( + CompletableFuture.supplyAsync(() -> { + return collectSlaveUsage( slave, System.currentTimeMillis(), new ConcurrentHashMap<>(), @@ -165,7 +165,7 @@ public void runWithRequestLock(Runnable function, String requestId) { } } - private void collectSlaveUsage(SingularitySlave slave, + private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave, long now, Map utilizationPerRequestId, Map previousUtilizations, @@ -334,11 +334,13 @@ private void collectSlaveUsage(SingularitySlave slave, LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage); usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage); + return slaveUsage; } catch (Throwable t) { String message = String.format("Could not get slave usage for host %s", slave.getHost()); LOG.error(message, t); exceptionNotifier.notify(message, t); } + return null; // TODO: is this really okay? } private boolean isEligibleForShuffle(SingularityTaskId task) {