Skip to content

Commit

Permalink
Rework async stuff per PR
Browse files Browse the repository at this point in the history
  • Loading branch information
pschoenfelder committed Jul 19, 2018
1 parent 3a7a125 commit e610cf1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 45 deletions.
Expand Up @@ -206,43 +206,11 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
List<CompletableFuture<Void>> scoringFutures = new ArrayList<>(); List<CompletableFuture<Void>> scoringFutures = new ArrayList<>();
AtomicReference<Throwable> scoringException = new AtomicReference<>(null); AtomicReference<Throwable> scoringException = new AtomicReference<>(null);
for (SingularityOfferHolder offerHolder : offerHolders.values()) { for (SingularityOfferHolder offerHolder : offerHolders.values()) {
if (isOfferFull(offerHolder)) { scoringFutures.add(offerScoringSemaphore.call(() ->
continue; CompletableFuture.supplyAsync(() -> {
} return buildScoringFuture(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder);
Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(offerHolder.getSlaveId())); },

offerScoringExecutor)));
if (taskManager.getActiveTasks().stream()
.anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp()
&& t.getMesosTask().getSlaveId().getValue().equals(offerHolder.getSlaveId()))) {
Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId());
if (maybeSlave.isPresent()) {
usagePoller.getSlaveUsage(maybeSlave.get());
}
continue;
}

// if (maybeSlaveUsage.isPresent() && System.currentTimeMillis() - maybeSlaveUsage.get().getTimestamp() > configuration.getMaxSlaveUsageMetricAgeMs()) {
// Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId());
// if (maybeSlave.isPresent()) {
// usagePoller.getSlaveUsage(maybeSlave.get());
// }
// continue;
// }
scoringFutures.add(
offerScoringSemaphore.call(
() -> CompletableFuture.runAsync(() -> {
try {
double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
if (score != 0) {
scorePerOffer.put(offerHolder.getSlaveId(), score);
}
} catch (Throwable t) {
LOG.error("Uncaught exception while scoring offers", t);
scoringException.set(t);
}
},
offerScoringExecutor
)));
} }


CompletableFutures.allOf(scoringFutures).join(); CompletableFutures.allOf(scoringFutures).join();
Expand Down Expand Up @@ -270,6 +238,59 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
return offerHolders.values(); return offerHolders.values();
} }


private Void buildScoringFuture(
Map<String, SingularityOfferHolder> offerHolders,
Map<String, RequestUtilization> requestUtilizations,
List<SingularityTaskId> activeTaskIds,
Map<String, SingularitySlaveUsageWithCalculatedScores> currentSlaveUsagesBySlaveId,
Map<String, Integer> tasksPerOfferHost,
SingularityTaskRequestHolder taskRequestHolder,
Map<String, Double> scorePerOffer,
List<SingularityTaskId> activeTaskIdsForRequest,
AtomicReference<Throwable> scoringException,
SingularityOfferHolder offerHolder) {
if (isOfferFull(offerHolder)) {
return null;
}
String slaveId = offerHolder.getSlaveId();
Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId));

if (taskManager.getActiveTasks().stream()
.anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp()
&& t.getMesosTask().getSlaveId().getValue().equals(slaveId))) {
Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(slaveId);
if (maybeSlave.isPresent()) {
usagePoller.getSlaveUsage(maybeSlave.get())
.whenComplete((usage, throwable) -> {
if (throwable == null) {
currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores(
usage,
mesosConfiguration.getScoreUsingSystemLoad(),
getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()),
mesosConfiguration.getLoad5OverloadedThreshold(),
mesosConfiguration.getLoad1OverloadedThreshold(),
usage.getTimestamp()
));
} else {
throw new RuntimeException(throwable);
}
});
}
return null;
}

try {
double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
if (score != 0) {
scorePerOffer.put(slaveId, score);
}
} catch (Throwable t) {
LOG.error("Uncaught exception while scoring offers", t);
scoringException.set(t);
}
return null;
}

private MaxProbableUsage getMaxProbableUsageForSlave(List<SingularityTaskId> activeTaskIds, Map<String, RequestUtilization> requestUtilizations, String sanitizedHostname) { private MaxProbableUsage getMaxProbableUsageForSlave(List<SingularityTaskId> activeTaskIds, Map<String, RequestUtilization> requestUtilizations, String sanitizedHostname) {
double cpu = 0; double cpu = 0;
double memBytes = 0; double memBytes = 0;
Expand Down
Expand Up @@ -68,7 +68,7 @@ public class SingularityUsagePoller extends SingularityLeaderOnlyPoller {
private final DeployManager deployManager; private final DeployManager deployManager;
private final TaskManager taskManager; private final TaskManager taskManager;


private final AsyncSemaphore<Void> usageCollectionSemaphore; private final AsyncSemaphore<SingularitySlaveUsage> usageCollectionSemaphore;
private final ExecutorService usageExecutor; private final ExecutorService usageExecutor;
private final ConcurrentHashMap<String, ReentrantLock> requestLocks; private final ConcurrentHashMap<String, ReentrantLock> requestLocks;


Expand Down Expand Up @@ -112,12 +112,12 @@ public void runActionOnPoll() {


Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts = new ConcurrentHashMap<>(); Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts = new ConcurrentHashMap<>();


List<CompletableFuture<Void>> usageFutures = new ArrayList<>(); List<CompletableFuture<SingularitySlaveUsage>> usageFutures = new ArrayList<>();


usageHelper.getSlavesToTrackUsageFor().forEach((slave) -> { usageHelper.getSlavesToTrackUsageFor().forEach((slave) -> {
usageFutures.add(usageCollectionSemaphore.call(() -> usageFutures.add(usageCollectionSemaphore.call(() ->
CompletableFuture.runAsync(() -> { CompletableFuture.supplyAsync(() -> {
collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable, return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable); totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable);
}, usageExecutor) }, usageExecutor)
)); ));
Expand All @@ -136,10 +136,10 @@ public void runActionOnPoll() {
} }
} }


public CompletableFuture<Void> getSlaveUsage(SingularitySlave slave) { public CompletableFuture<SingularitySlaveUsage> getSlaveUsage(SingularitySlave slave) {
return usageCollectionSemaphore.call(() -> return usageCollectionSemaphore.call(() ->
CompletableFuture.runAsync(() -> { CompletableFuture.supplyAsync(() -> {
collectSlaveUsage( return collectSlaveUsage(
slave, slave,
System.currentTimeMillis(), System.currentTimeMillis(),
new ConcurrentHashMap<>(), new ConcurrentHashMap<>(),
Expand All @@ -165,7 +165,7 @@ public void runWithRequestLock(Runnable function, String requestId) {
} }
} }


private void collectSlaveUsage(SingularitySlave slave, private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave,
long now, long now,
Map<String, RequestUtilization> utilizationPerRequestId, Map<String, RequestUtilization> utilizationPerRequestId,
Map<String, RequestUtilization> previousUtilizations, Map<String, RequestUtilization> previousUtilizations,
Expand Down Expand Up @@ -334,11 +334,13 @@ private void collectSlaveUsage(SingularitySlave slave,


LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage); LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage);
usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage); usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage);
return slaveUsage;
} catch (Throwable t) { } catch (Throwable t) {
String message = String.format("Could not get slave usage for host %s", slave.getHost()); String message = String.format("Could not get slave usage for host %s", slave.getHost());
LOG.error(message, t); LOG.error(message, t);
exceptionNotifier.notify(message, t); exceptionNotifier.notify(message, t);
} }
return null; // TODO: is this really okay?
} }


private boolean isEligibleForShuffle(SingularityTaskId task) { private boolean isEligibleForShuffle(SingularityTaskId task) {
Expand Down

0 comments on commit e610cf1

Please sign in to comment.