From 339dd4491295a2a12d30b06126826667c76ab90c Mon Sep 17 00:00:00 2001 From: Paul Schoenfelder Date: Tue, 24 Jul 2018 13:49:46 -0400 Subject: [PATCH] PR changes --- .../mesos/SingularityMesosOfferScheduler.java | 30 ++++++++---- ...ularitySlaveUsageWithCalculatedScores.java | 4 -- .../scheduler/SingularityUsagePoller.java | 48 ++++++------------- 3 files changed, 37 insertions(+), 45 deletions(-) diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java index 3beada3d5a..11da1b23ac 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java @@ -10,6 +10,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -22,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.hubspot.mesos.Resources; @@ -208,7 +210,7 @@ public Collection checkOffers(final Collection of for (SingularityOfferHolder offerHolder : offerHolders.values()) { scoringFutures.add(offerScoringSemaphore.call(() -> CompletableFuture.supplyAsync(() -> { - return buildScoringFuture(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder); + return calculateScore(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder); }, offerScoringExecutor))); } @@ -238,7 +240,7 @@ public Collection checkOffers(final Collection of return offerHolders.values(); } - private Void buildScoringFuture( + private Void calculateScore( Map offerHolders, Map requestUtilizations, List activeTaskIds, @@ -256,27 +258,39 @@ private Void buildScoringFuture( Optional maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId)); if (taskManager.getActiveTasks().stream() - .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp() + .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getSlaveUsage().getTimestamp() && t.getMesosTask().getSlaveId().getValue().equals(slaveId))) { Optional maybeSlave = slaveManager.getSlave(slaveId); if (maybeSlave.isPresent()) { - usagePoller.getSlaveUsage(maybeSlave.get()) + CompletableFuture.supplyAsync(() -> + usagePoller.collectSlaveUsage( + maybeSlave.get(), + System.currentTimeMillis(), + new ConcurrentHashMap<>(), + usageManager.getRequestUtilizations(), + new ConcurrentHashMap<>(), + new AtomicLong(), + new AtomicLong(), + new AtomicDouble(), + new AtomicDouble(), + new AtomicLong(), + new AtomicLong()), + offerScoringExecutor) .whenComplete((usage, throwable) -> { - if (throwable == null) { + if (throwable == null && usage.isPresent()) { currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores( - usage, + usage.get(), mesosConfiguration.getScoreUsingSystemLoad(), getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()), mesosConfiguration.getLoad5OverloadedThreshold(), mesosConfiguration.getLoad1OverloadedThreshold(), - usage.getTimestamp() + usage.get().getTimestamp() )); } else { throw new RuntimeException(throwable); } }); } - return null; } try { diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularitySlaveUsageWithCalculatedScores.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularitySlaveUsageWithCalculatedScores.java index b634fa068a..0c8ae4cef8 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularitySlaveUsageWithCalculatedScores.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularitySlaveUsageWithCalculatedScores.java @@ -129,10 +129,6 @@ SingularitySlaveUsage getSlaveUsage() { return diskInUseScore; } - long getTimestamp() { - return timestamp; - } - void addEstimatedCpuUsage(double estimatedAddedCpus) { this.estimatedAddedCpusUsage += estimatedAddedCpus; } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java index e9894db9fb..9c9449f344 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUsagePoller.java @@ -118,7 +118,7 @@ public void runActionOnPoll() { usageFutures.add(usageCollectionSemaphore.call(() -> CompletableFuture.supplyAsync(() -> { return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable, - totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable); + totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable).get(); }, usageExecutor) )); }); @@ -136,25 +136,6 @@ public void runActionOnPoll() { } } - public CompletableFuture getSlaveUsage(SingularitySlave slave) { - return usageCollectionSemaphore.call(() -> - CompletableFuture.supplyAsync(() -> { - return collectSlaveUsage( - slave, - System.currentTimeMillis(), - new ConcurrentHashMap<>(), - usageManager.getRequestUtilizations(), - new ConcurrentHashMap<>(), - new AtomicLong(), - new AtomicLong(), - new AtomicDouble(), - new AtomicDouble(), - new AtomicLong(), - new AtomicLong()); - }, usageExecutor) - ); - } - public void runWithRequestLock(Runnable function, String requestId) { ReentrantLock lock = requestLocks.computeIfAbsent(requestId, (r) -> new ReentrantLock()); lock.lock(); @@ -165,17 +146,18 @@ public void runWithRequestLock(Runnable function, String requestId) { } } - private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave, - long now, - Map utilizationPerRequestId, - Map previousUtilizations, - Map> overLoadedHosts, - AtomicLong totalMemBytesUsed, - AtomicLong totalMemBytesAvailable, - AtomicDouble totalCpuUsed, - AtomicDouble totalCpuAvailable, - AtomicLong totalDiskBytesUsed, - AtomicLong totalDiskBytesAvailable) { + public Optional collectSlaveUsage( + SingularitySlave slave, + long now, + Map utilizationPerRequestId, + Map previousUtilizations, + Map> overLoadedHosts, + AtomicLong totalMemBytesUsed, + AtomicLong totalMemBytesAvailable, + AtomicDouble totalCpuUsed, + AtomicDouble totalCpuAvailable, + AtomicLong totalDiskBytesUsed, + AtomicLong totalDiskBytesAvailable) { Optional memoryMbTotal = Optional.absent(); Optional cpusTotal = Optional.absent(); Optional diskMbTotal = Optional.absent(); @@ -334,13 +316,13 @@ private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave, LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage); usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage); - return slaveUsage; + return Optional.of(slaveUsage); } catch (Throwable t) { String message = String.format("Could not get slave usage for host %s", slave.getHost()); LOG.error(message, t); exceptionNotifier.notify(message, t); } - return null; // TODO: is this really okay? + return Optional.absent(); } private boolean isEligibleForShuffle(SingularityTaskId task) {