Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pschoenfelder committed Jul 24, 2018
1 parent e610cf1 commit 339dd44
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 45 deletions.
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
Expand All @@ -22,6 +23,7 @@


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.hubspot.mesos.Resources; import com.hubspot.mesos.Resources;
Expand Down Expand Up @@ -208,7 +210,7 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
for (SingularityOfferHolder offerHolder : offerHolders.values()) { for (SingularityOfferHolder offerHolder : offerHolders.values()) {
scoringFutures.add(offerScoringSemaphore.call(() -> scoringFutures.add(offerScoringSemaphore.call(() ->
CompletableFuture.supplyAsync(() -> { CompletableFuture.supplyAsync(() -> {
return buildScoringFuture(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder); return calculateScore(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder);
}, },
offerScoringExecutor))); offerScoringExecutor)));
} }
Expand Down Expand Up @@ -238,7 +240,7 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
return offerHolders.values(); return offerHolders.values();
} }


private Void buildScoringFuture( private Void calculateScore(
Map<String, SingularityOfferHolder> offerHolders, Map<String, SingularityOfferHolder> offerHolders,
Map<String, RequestUtilization> requestUtilizations, Map<String, RequestUtilization> requestUtilizations,
List<SingularityTaskId> activeTaskIds, List<SingularityTaskId> activeTaskIds,
Expand All @@ -256,27 +258,39 @@ private Void buildScoringFuture(
Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId)); Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId));


if (taskManager.getActiveTasks().stream() if (taskManager.getActiveTasks().stream()
.anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp() .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getSlaveUsage().getTimestamp()
&& t.getMesosTask().getSlaveId().getValue().equals(slaveId))) { && t.getMesosTask().getSlaveId().getValue().equals(slaveId))) {
Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(slaveId); Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(slaveId);
if (maybeSlave.isPresent()) { if (maybeSlave.isPresent()) {
usagePoller.getSlaveUsage(maybeSlave.get()) CompletableFuture.supplyAsync(() ->
usagePoller.collectSlaveUsage(
maybeSlave.get(),
System.currentTimeMillis(),
new ConcurrentHashMap<>(),
usageManager.getRequestUtilizations(),
new ConcurrentHashMap<>(),
new AtomicLong(),
new AtomicLong(),
new AtomicDouble(),
new AtomicDouble(),
new AtomicLong(),
new AtomicLong()),
offerScoringExecutor)
.whenComplete((usage, throwable) -> { .whenComplete((usage, throwable) -> {
if (throwable == null) { if (throwable == null && usage.isPresent()) {
currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores( currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores(
usage, usage.get(),
mesosConfiguration.getScoreUsingSystemLoad(), mesosConfiguration.getScoreUsingSystemLoad(),
getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()), getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()),
mesosConfiguration.getLoad5OverloadedThreshold(), mesosConfiguration.getLoad5OverloadedThreshold(),
mesosConfiguration.getLoad1OverloadedThreshold(), mesosConfiguration.getLoad1OverloadedThreshold(),
usage.getTimestamp() usage.get().getTimestamp()
)); ));
} else { } else {
throw new RuntimeException(throwable); throw new RuntimeException(throwable);
} }
}); });
} }
return null;
} }


try { try {
Expand Down
Expand Up @@ -129,10 +129,6 @@ SingularitySlaveUsage getSlaveUsage() {
return diskInUseScore; return diskInUseScore;
} }


long getTimestamp() {
return timestamp;
}

void addEstimatedCpuUsage(double estimatedAddedCpus) { void addEstimatedCpuUsage(double estimatedAddedCpus) {
this.estimatedAddedCpusUsage += estimatedAddedCpus; this.estimatedAddedCpusUsage += estimatedAddedCpus;
} }
Expand Down
Expand Up @@ -118,7 +118,7 @@ public void runActionOnPoll() {
usageFutures.add(usageCollectionSemaphore.call(() -> usageFutures.add(usageCollectionSemaphore.call(() ->
CompletableFuture.supplyAsync(() -> { CompletableFuture.supplyAsync(() -> {
return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable, return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable); totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable).get();
}, usageExecutor) }, usageExecutor)
)); ));
}); });
Expand All @@ -136,25 +136,6 @@ public void runActionOnPoll() {
} }
} }


public CompletableFuture<SingularitySlaveUsage> getSlaveUsage(SingularitySlave slave) {
return usageCollectionSemaphore.call(() ->
CompletableFuture.supplyAsync(() -> {
return collectSlaveUsage(
slave,
System.currentTimeMillis(),
new ConcurrentHashMap<>(),
usageManager.getRequestUtilizations(),
new ConcurrentHashMap<>(),
new AtomicLong(),
new AtomicLong(),
new AtomicDouble(),
new AtomicDouble(),
new AtomicLong(),
new AtomicLong());
}, usageExecutor)
);
}

public void runWithRequestLock(Runnable function, String requestId) { public void runWithRequestLock(Runnable function, String requestId) {
ReentrantLock lock = requestLocks.computeIfAbsent(requestId, (r) -> new ReentrantLock()); ReentrantLock lock = requestLocks.computeIfAbsent(requestId, (r) -> new ReentrantLock());
lock.lock(); lock.lock();
Expand All @@ -165,17 +146,18 @@ public void runWithRequestLock(Runnable function, String requestId) {
} }
} }


private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave, public Optional<SingularitySlaveUsage> collectSlaveUsage(
long now, SingularitySlave slave,
Map<String, RequestUtilization> utilizationPerRequestId, long now,
Map<String, RequestUtilization> previousUtilizations, Map<String, RequestUtilization> utilizationPerRequestId,
Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts, Map<String, RequestUtilization> previousUtilizations,
AtomicLong totalMemBytesUsed, Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts,
AtomicLong totalMemBytesAvailable, AtomicLong totalMemBytesUsed,
AtomicDouble totalCpuUsed, AtomicLong totalMemBytesAvailable,
AtomicDouble totalCpuAvailable, AtomicDouble totalCpuUsed,
AtomicLong totalDiskBytesUsed, AtomicDouble totalCpuAvailable,
AtomicLong totalDiskBytesAvailable) { AtomicLong totalDiskBytesUsed,
AtomicLong totalDiskBytesAvailable) {
Optional<Long> memoryMbTotal = Optional.absent(); Optional<Long> memoryMbTotal = Optional.absent();
Optional<Double> cpusTotal = Optional.absent(); Optional<Double> cpusTotal = Optional.absent();
Optional<Long> diskMbTotal = Optional.absent(); Optional<Long> diskMbTotal = Optional.absent();
Expand Down Expand Up @@ -334,13 +316,13 @@ private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave,


LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage); LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage);
usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage); usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage);
return slaveUsage; return Optional.of(slaveUsage);
} catch (Throwable t) { } catch (Throwable t) {
String message = String.format("Could not get slave usage for host %s", slave.getHost()); String message = String.format("Could not get slave usage for host %s", slave.getHost());
LOG.error(message, t); LOG.error(message, t);
exceptionNotifier.notify(message, t); exceptionNotifier.notify(message, t);
} }
return null; // TODO: is this really okay? return Optional.absent();
} }


private boolean isEligibleForShuffle(SingularityTaskId task) { private boolean isEligibleForShuffle(SingularityTaskId task) {
Expand Down

0 comments on commit 339dd44

Please sign in to comment.