Merge pull request #1711 from HubSpot/rebalance
Take system usage into account when scoring offers
ssalinas committed Feb 15, 2018
2 parents bd4d2ff + fa38fbe commit 500c283
Showing 14 changed files with 594 additions and 42 deletions.

[Large diffs are not rendered by default.]

@@ -26,6 +26,13 @@ public enum ResourceUsageType {
private final Map<ResourceUsageType, Number> longRunningTasksUsage;
private final int numTasks;
private final long timestamp;
private final double systemMemTotalBytes;
private final double systemMemFreeBytes;
private final double systemLoad1Min;
private final double systemLoad5Min;
private final double systemLoad15Min;
private final double slaveDiskUsed;
private final double slaveDiskTotal;

@JsonCreator
public SingularitySlaveUsage(@JsonProperty("cpusUsed") double cpusUsed,
@@ -39,7 +46,14 @@ public SingularitySlaveUsage(@JsonProperty("cpusUsed") double cpusUsed,
@JsonProperty("diskMbTotal") Optional<Long> diskMbTotal,
@JsonProperty("longRunningTasksUsage") Map<ResourceUsageType, Number> longRunningTasksUsage,
@JsonProperty("numTasks") int numTasks,
@JsonProperty("timestamp") long timestamp) {
@JsonProperty("timestamp") long timestamp,
@JsonProperty("systemMemTotalBytes") double systemMemTotalBytes,
@JsonProperty("systemMemFreeBytes") double systemMemFreeBytes,
@JsonProperty("systemLoad1Min") double systemLoad1Min,
@JsonProperty("systemLoad5Min") double systemLoad5Min,
@JsonProperty("systemLoad15Min") double systemLoad15Min,
@JsonProperty("slaveDiskUsed") double slaveDiskUsed,
@JsonProperty("slaveDiskTotal") double slaveDiskTotal) {
this.cpusUsed = cpusUsed;
this.cpusReserved = cpusReserved;
this.cpusTotal = cpusTotal;
@@ -52,6 +66,13 @@ public SingularitySlaveUsage(@JsonProperty("cpusUsed") double cpusUsed,
this.longRunningTasksUsage = longRunningTasksUsage;
this.numTasks = numTasks;
this.timestamp = timestamp;
this.systemMemTotalBytes = systemMemTotalBytes;
this.systemMemFreeBytes = systemMemFreeBytes;
this.systemLoad1Min = systemLoad1Min;
this.systemLoad5Min = systemLoad5Min;
this.systemLoad15Min = systemLoad15Min;
this.slaveDiskUsed = slaveDiskUsed;
this.slaveDiskTotal = slaveDiskTotal;
}

public double getCpusUsed() {
@@ -110,17 +131,56 @@ public long getTimestamp() {
return timestamp;
}

public double getSystemMemTotalBytes() {
return systemMemTotalBytes;
}

public double getSystemMemFreeBytes() {
return systemMemFreeBytes;
}

public double getSystemLoad1Min() {
return systemLoad1Min;
}

public double getSystemLoad5Min() {
return systemLoad5Min;
}

public double getSystemLoad15Min() {
return systemLoad15Min;
}

public double getSlaveDiskUsed() {
return slaveDiskUsed;
}

public double getSlaveDiskTotal() {
return slaveDiskTotal;
}

@Override
public String toString() {
return "SingularitySlaveUsage [memoryBytesUsed=" + memoryBytesUsed +
", memoryMbReserved=" + memoryMbReserved +
", memoryMbTotal=" + memoryMbTotal +
", cpusUsed=" + cpusUsed +
return "SingularitySlaveUsage{" +
"cpusUsed=" + cpusUsed +
", cpusReserved=" + cpusReserved +
", cpusTotal=" + cpusTotal +
", numTasks=" + numTasks +
", memoryBytesUsed=" + memoryBytesUsed +
", memoryMbReserved=" + memoryMbReserved +
", memoryMbTotal=" + memoryMbTotal +
", diskBytesUsed=" + diskBytesUsed +
", diskMbReserved=" + diskMbReserved +
", diskMbTotal=" + diskMbTotal +
", longRunningTasksUsage=" + longRunningTasksUsage +
", numTasks=" + numTasks +
", timestamp=" + timestamp +
"]";
", systemMemTotalBytes=" + systemMemTotalBytes +
", systemMemFreeBytes=" + systemMemFreeBytes +
", systemLoad1Min=" + systemLoad1Min +
", systemLoad5Min=" + systemLoad5Min +
", systemLoad15Min=" + systemLoad15Min +
", slaveDiskUsed=" + slaveDiskUsed +
", slaveDiskTotal=" + slaveDiskTotal +
'}';
}
}
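
The new fields expose raw OS-level metrics (bytes and load averages) rather than normalized ratios, leaving normalization to callers. A minimal sketch, assuming a populated usage instance, of deriving the fractions the offer scheduler works from:

// Sketch only: usage is any populated SingularitySlaveUsage.
double memUsedFraction = (usage.getSystemMemTotalBytes() - usage.getSystemMemFreeBytes())
    / usage.getSystemMemTotalBytes();                                // OS memory in use, 0..1
double diskUsedFraction = usage.getSlaveDiskUsed() / usage.getSlaveDiskTotal(); // slave disk in use, 0..1
double load15 = usage.getSystemLoad15Min();                          // raw 15-minute load average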
@@ -20,7 +20,14 @@ public SingularitySlaveUsageWithId(SingularitySlaveUsage usage, String slaveId)
usage.getDiskMbTotal(),
usage.getLongRunningTasksUsage(),
usage.getNumTasks(),
usage.getTimestamp()
usage.getTimestamp(),
usage.getSystemMemTotalBytes(),
usage.getSystemMemFreeBytes(),
usage.getSystemLoad1Min(),
usage.getSystemLoad5Min(),
usage.getSystemLoad15Min(),
usage.getSlaveDiskUsed(),
usage.getSlaveDiskTotal()
);
this.slaveId = slaveId;
}
@@ -0,0 +1,5 @@
package com.hubspot.singularity;

public enum SingularityUsageScoringStrategy {
SPREAD_TASK_USAGE, SPREAD_SYSTEM_USAGE
}
@@ -4,14 +4,19 @@

import com.hubspot.mesos.json.MesosMasterMetricsSnapshotObject;
import com.hubspot.mesos.json.MesosMasterStateObject;
import com.hubspot.mesos.json.MesosSlaveMetricsSnapshotObject;
import com.hubspot.mesos.json.MesosSlaveStateObject;
import com.hubspot.mesos.json.MesosTaskMonitorObject;

public interface MesosClient {

public String getMasterUri(String hostnameAndPort);

public String getMetricsSnapshotUri(String hostnameAndPort);
default String getMetricsSnapshotUri(String hostnameAndPort) {
return getMasterMetricsSnapshotUri(hostnameAndPort);
}

public String getMasterMetricsSnapshotUri(String hostnameAndPort);

public static class MesosClientException extends RuntimeException {
private static final long serialVersionUID = 1L;
@@ -30,6 +35,8 @@ public MesosClientException(String message, Throwable cause) {

public MesosMasterMetricsSnapshotObject getMasterMetricsSnapshot(String uri);

public MesosSlaveMetricsSnapshotObject getSlaveMetricsSnapshot(String uri);

public String getSlaveUri(String hostname);

public MesosSlaveStateObject getSlaveState(String uri);
@@ -15,6 +15,7 @@
import com.hubspot.mesos.JavaUtils;
import com.hubspot.mesos.json.MesosMasterMetricsSnapshotObject;
import com.hubspot.mesos.json.MesosMasterStateObject;
import com.hubspot.mesos.json.MesosSlaveMetricsSnapshotObject;
import com.hubspot.mesos.json.MesosSlaveStateObject;
import com.hubspot.mesos.json.MesosTaskMonitorObject;

@@ -28,7 +29,8 @@ public class SingularityMesosClient implements MesosClient {
private static final String MASTER_STATE_FORMAT = "http://%s/master/state";
private static final String MESOS_SLAVE_JSON_URL = "http://%s:5051/slave(1)/state";
private static final String MESOS_SLAVE_STATISTICS_URL = "http://%s:5051/monitor/statistics";
private static final String MESOS_METRICS_SNAPSHOT_URL = "http://%s/metrics/snapshot";
private static final String MESOS_MASTER_METRICS_SNAPSHOT_URL = "http://%s/metrics/snapshot";
private static final String MESOS_SLAVE_METRICS_SNAPSHOT_URL = "http://%s:5051/metrics/snapshot";

private static final TypeReference<List<MesosTaskMonitorObject>> TASK_MONITOR_TYPE_REFERENCE = new TypeReference<List<MesosTaskMonitorObject>>() {};

@@ -45,8 +47,8 @@ public String getMasterUri(String hostnameAndPort) {
}

@Override
public String getMetricsSnapshotUri(String hostnameAndPort) {
return String.format(MESOS_METRICS_SNAPSHOT_URL, hostnameAndPort);
public String getMasterMetricsSnapshotUri(String hostnameAndPort) {
return String.format(MESOS_MASTER_METRICS_SNAPSHOT_URL, hostnameAndPort);
}

private HttpResponse getFromMesos(String uri) {
@@ -91,6 +93,11 @@ public MesosMasterMetricsSnapshotObject getMasterMetricsSnapshot(String uri) {
return getFromMesos(uri, MesosMasterMetricsSnapshotObject.class);
}

@Override
public MesosSlaveMetricsSnapshotObject getSlaveMetricsSnapshot(String hostname) {
return getFromMesos(String.format(MESOS_SLAVE_METRICS_SNAPSHOT_URL, hostname), MesosSlaveMetricsSnapshotObject.class);
}

@Override
public String getSlaveUri(String hostname) {
return String.format(MESOS_SLAVE_JSON_URL, hostname);
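
Note the interface declares getSlaveMetricsSnapshot(String uri) while this implementation takes a bare hostname and expands it to http://<hostname>:5051/metrics/snapshot itself. A usage sketch, assuming an injected MesosClient and a hypothetical hostname:

// Sketch: the hostname is formatted into the slave metrics snapshot URL internally.
MesosSlaveMetricsSnapshotObject snapshot = mesosClient.getSlaveMetricsSnapshot("agent-01.example.com");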
@@ -4,6 +4,7 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.common.base.Optional;
import com.hubspot.singularity.SingularityUsageScoringStrategy;

@JsonIgnoreProperties( ignoreUnknown = true )
public class MesosConfiguration {
@@ -55,6 +56,7 @@ public class MesosConfiguration {
private int statusUpdateConcurrencyLimit = 500;
private int maxStatusUpdateQueueSize = 5000;
private int offersConcurrencyLimit = 100;
private SingularityUsageScoringStrategy scoringStrategy = SingularityUsageScoringStrategy.SPREAD_TASK_USAGE;

public int getMaxNumInstancesPerRequest() {
return maxNumInstancesPerRequest;
@@ -271,4 +273,8 @@ public int getOffersConcurrencyLimit() {
public void setOffersConcurrencyLimit(int offersConcurrencyLimit) {
this.offersConcurrencyLimit = offersConcurrencyLimit;
}

public SingularityUsageScoringStrategy getScoringStrategy() {
return scoringStrategy;
}
}
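
scoringStrategy defaults to SPREAD_TASK_USAGE, so existing installs keep reserved-resource scoring unless an operator opts in. A sketch of reading the knob downstream, assuming a SingularityConfiguration in scope as in the offer scheduler below:

// Sketch: branch on the configured strategy (default SPREAD_TASK_USAGE).
SingularityUsageScoringStrategy strategy = configuration.getMesosConfiguration().getScoringStrategy();
boolean useSystemMetrics = (strategy == SingularityUsageScoringStrategy.SPREAD_SYSTEM_USAGE);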
@@ -188,19 +188,20 @@ public Optional<SingularityClusterUtilization> getClusterUtilization() {
return getData(USAGE_SUMMARY_PATH, clusterUtilizationTranscoder);
}

public Map<String, SingularitySlaveUsage> getCurrentSlaveUsages(List<String> slaveIds) {
public List<SingularitySlaveUsageWithId> getCurrentSlaveUsages(List<String> slaveIds) {
List<String> paths = new ArrayList<>(slaveIds.size());
for (String slaveId : slaveIds) {
paths.add(getCurrentSlaveUsagePath(slaveId));
}

return getAsyncWithPath("getAllCurrentSlaveUsage", paths, slaveUsageTranscoder);
return getAsyncWithPath("getAllCurrentSlaveUsage", paths, slaveUsageTranscoder)
.entrySet().stream()
.map((entry) -> new SingularitySlaveUsageWithId(entry.getValue(), getSlaveIdFromCurrentUsagePath(entry.getKey())))
.collect(Collectors.toList());
}

public List<SingularitySlaveUsageWithId> getAllCurrentSlaveUsage() {
return getCurrentSlaveUsages(getSlavesWithUsage()).entrySet().stream()
.map((entry) -> new SingularitySlaveUsageWithId(entry.getValue(), getSlaveIdFromCurrentUsagePath(entry.getKey())))
.collect(Collectors.toList());
return getCurrentSlaveUsages(getSlavesWithUsage());
}

public List<Long> getSlaveUsageTimestamps(String slaveId) {
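
getCurrentSlaveUsages now pairs each usage with its slave ID before returning, so getAllCurrentSlaveUsage and other callers no longer have to parse IDs back out of ZooKeeper paths. A usage sketch, assuming an injected usageManager and an SLF4J-style LOG:

// Sketch: fetch current usage for a set of slaves, IDs included.
List<SingularitySlaveUsageWithId> usages = usageManager.getCurrentSlaveUsages(slaveIds);
usages.forEach((u) -> LOG.debug("{} is running {} tasks", u.getSlaveId(), u.getNumTasks()));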
@@ -27,6 +27,7 @@
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularitySlaveUsage.ResourceUsageType;
import com.hubspot.singularity.SingularitySlaveUsageWithId;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTaskRequest;
@@ -184,12 +185,14 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
.stream()
.map(SingularityOfferHolder::getSlaveId)
.collect(Collectors.toList()))
.entrySet().parallelStream()
.parallelStream()
.collect(Collectors.toMap(
Map.Entry::getKey,
(e) -> this.buildSlaveUsageWithScores(e.getValue())
SingularitySlaveUsageWithId::getSlaveId,
this::buildSlaveUsageWithScores
));

LOG.trace("Found slave usages {}", currentSlaveUsagesBySlaveId);

for (SingularityTaskRequestHolder taskRequestHolder : sortedTaskRequestHolders) {
lock.runWithRequestLock(() -> {
Map<String, Integer> tasksPerOffer = new ConcurrentHashMap<>();
@@ -228,18 +231,37 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
SingularitySlaveUsageWithCalculatedScores buildSlaveUsageWithScores(SingularitySlaveUsage slaveUsage) {
if (SingularitySlaveUsageWithCalculatedScores.missingUsageData(slaveUsage)) {
return new SingularitySlaveUsageWithCalculatedScores(slaveUsage, true, 0, 0, 0, 0, 0, 0, 0);
} else {
double longRunningCpusUsedScore = slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.CPU_USED).doubleValue() / slaveUsage.getCpusTotal().get();
double longRunningMemUsedScore = ((double) slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.MEMORY_BYTES_USED).longValue() / slaveUsage.getMemoryBytesTotal().get());
double longRunningDiskUsedScore = ((double) slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.DISK_BYTES_USED).longValue() / slaveUsage.getDiskBytesTotal().get());
double cpusFreeScore = 1 - (slaveUsage.getCpusReserved() / slaveUsage.getCpusTotal().get());
double memFreeScore = 1 - ((double) slaveUsage.getMemoryMbReserved() / slaveUsage.getMemoryMbTotal().get());
double diskFreeScore = 1 - ((double) slaveUsage.getDiskMbReserved() / slaveUsage.getDiskMbTotal().get());
return new SingularitySlaveUsageWithCalculatedScores(
slaveUsage, false, longRunningCpusUsedScore, longRunningMemUsedScore, longRunningDiskUsedScore, cpusFreeScore, memFreeScore, diskFreeScore,
scoreLongRunningTask(longRunningMemUsedScore, memFreeScore, longRunningCpusUsedScore, cpusFreeScore, longRunningDiskUsedScore, diskFreeScore)
);
}
double longRunningCpusUsedScore = slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.CPU_USED).doubleValue() / slaveUsage.getCpusTotal().get();
double longRunningMemUsedScore = ((double) slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.MEMORY_BYTES_USED).longValue() / slaveUsage.getMemoryBytesTotal().get());
double longRunningDiskUsedScore = ((double) slaveUsage.getLongRunningTasksUsage().get(ResourceUsageType.DISK_BYTES_USED).longValue() / slaveUsage.getDiskBytesTotal().get());
switch (configuration.getMesosConfiguration().getScoringStrategy()) {
case SPREAD_TASK_USAGE:
double cpusFreeScore = 1 - (slaveUsage.getCpusReserved() / slaveUsage.getCpusTotal().get());
double memFreeScore = 1 - ((double) slaveUsage.getMemoryMbReserved() / slaveUsage.getMemoryMbTotal().get());
double diskFreeScore = 1 - ((double) slaveUsage.getDiskMbReserved() / slaveUsage.getDiskMbTotal().get());
return new SingularitySlaveUsageWithCalculatedScores(
slaveUsage, false, longRunningCpusUsedScore, longRunningMemUsedScore, longRunningDiskUsedScore, cpusFreeScore, memFreeScore, diskFreeScore,
scoreLongRunningTask(longRunningMemUsedScore, memFreeScore, longRunningCpusUsedScore, cpusFreeScore, longRunningDiskUsedScore, diskFreeScore)
);
case SPREAD_SYSTEM_USAGE:
default:
double systemCpuFreeScore = Math.max(0, 1 - slaveUsage.getSystemLoad15Min());
double systemMemFreeScore = 1 - (slaveUsage.getSystemMemTotalBytes() - slaveUsage.getSystemMemFreeBytes()) / slaveUsage.getSystemMemTotalBytes();
double systemDiskFreeScore = 1 - (slaveUsage.getSlaveDiskUsed() / slaveUsage.getSlaveDiskTotal());
return new SingularitySlaveUsageWithCalculatedScores(
slaveUsage,
false,
longRunningCpusUsedScore,
longRunningMemUsedScore,
longRunningDiskUsedScore,
systemCpuFreeScore,
systemMemFreeScore,
systemDiskFreeScore,
scoreLongRunningTask(longRunningMemUsedScore, systemMemFreeScore, longRunningCpusUsedScore, systemCpuFreeScore, longRunningDiskUsedScore, systemDiskFreeScore)
);
}

}

private boolean isOfferFull(SingularityOfferHolder offerHolder) {
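
A worked example of the SPREAD_SYSTEM_USAGE branch with illustrative numbers: a slave reporting a 15-minute load average of 0.4, 16 GB of memory with 4 GB free, and 200 GB of a 1 TB disk used. Note the load score uses the raw load average, so it bottoms out at zero once load reaches 1.0 regardless of core count:

// Illustrative values only; formulas as in buildSlaveUsageWithScores above.
double systemCpuFreeScore  = Math.max(0, 1 - 0.4);        // 0.60
double systemMemFreeScore  = 1 - (16e9 - 4e9) / 16e9;     // 0.25 (12 of 16 GB in use)
double systemDiskFreeScore = 1 - (200e9 / 1000e9);        // 0.80
// SPREAD_TASK_USAGE would instead use reservations, e.g. 1 - cpusReserved / cpusTotal.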
@@ -239,10 +239,16 @@ public void resourceOffers(List<Offer> offers) {
LOG.error("Received fatal error while handling offers - will decline all available offers", t);

mesosSchedulerClient.decline(offersToCheck.stream()
.filter((o) -> !acceptedOffers.contains(o.getId()))
.filter((o) -> !acceptedOffers.contains(o.getId()) && !cachedOffers.containsKey(o.getId().getValue()))
.map(Offer::getId)
.collect(Collectors.toList()));

offersToCheck.forEach((o) -> {
if (cachedOffers.containsKey(o.getId().getValue())) {
offerCache.returnOffer(cachedOffers.get(o.getId().getValue()));
}
});

throw t;
}

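
The fatal-error path now distinguishes offers served from the offer cache from fresh ones: only fresh, unaccepted offers are declined back to the master, while cached offers are returned to the cache for reuse, since declining them would leave stale cache entries. A condensed sketch of the cleanup loop, names as in the diff:

// Sketch: on a fatal error, hand cache-served offers back to the cache
// rather than declining them out from under it.
offersToCheck.forEach((o) -> {
  if (cachedOffers.containsKey(o.getId().getValue())) {
    offerCache.returnOffer(cachedOffers.get(o.getId().getValue()));
  }
});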
@@ -76,6 +76,7 @@
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.TaskRequestManager;
import com.hubspot.singularity.helpers.RFC5545Schedule;
import com.hubspot.singularity.mesos.SingularitySchedulerLock;
import com.hubspot.singularity.smtp.SingularityMailer;

@Singleton
@@ -85,22 +86,20 @@ public class SingularityScheduler {

private final SingularityConfiguration configuration;
private final SingularityCooldown cooldown;

private final TaskManager taskManager;
private final RequestManager requestManager;
private final TaskRequestManager taskRequestManager;
private final DeployManager deployManager;

private final SlaveManager slaveManager;
private final RackManager rackManager;

private final SingularityMailer mailer;

private final SingularityLeaderCache leaderCache;
private final SingularitySchedulerLock lock;

@Inject
public SingularityScheduler(TaskRequestManager taskRequestManager, SingularityConfiguration configuration, SingularityCooldown cooldown, DeployManager deployManager,
TaskManager taskManager, RequestManager requestManager, SlaveManager slaveManager, RackManager rackManager, SingularityMailer mailer, SingularityLeaderCache leaderCache) {
TaskManager taskManager, RequestManager requestManager, SlaveManager slaveManager, RackManager rackManager, SingularityMailer mailer,
SingularityLeaderCache leaderCache, SingularitySchedulerLock lock) {
this.taskRequestManager = taskRequestManager;
this.configuration = configuration;
this.deployManager = deployManager;
@@ -111,6 +110,7 @@ public SingularityScheduler(TaskRequestManager taskRequestManager, SingularityCo
this.mailer = mailer;
this.cooldown = cooldown;
this.leaderCache = leaderCache;
this.lock = lock;
}

private void cleanupTaskDueToDecomission(final Map<String, Optional<String>> requestIdsToUserToReschedule, final Set<SingularityTaskId> matchingTaskIds, SingularityTask task,
@@ -238,7 +238,10 @@ public void drainPendingQueue() {

deployKeyToPendingRequests.entrySet().parallelStream()
.forEach((deployKeyToPendingRequest) -> {
handlePendingRequestsForDeployKey(obsoleteRequests, heldForScheduledActiveTask, totalNewScheduledTasks, deployKeyToPendingRequest.getKey(), deployKeyToPendingRequest.getValue());
lock.runWithRequestLock(
() -> handlePendingRequestsForDeployKey(obsoleteRequests, heldForScheduledActiveTask, totalNewScheduledTasks, deployKeyToPendingRequest.getKey(), deployKeyToPendingRequest.getValue()),
deployKeyToPendingRequest.getKey().getRequestId(),
String.format("%s#%s", getClass().getSimpleName(), "drainPendingQueue"));
});

LOG.info("Scheduled {} new tasks ({} obsolete requests, {} held) in {}", totalNewScheduledTasks.get(), obsoleteRequests.get(), heldForScheduledActiveTask.get(), JavaUtils.duration(start));
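
drainPendingQueue still fans out across deploy keys in parallel, but each key's work now runs under the per-request scheduler lock, so it cannot race with offer processing for the same request. The lock takes the work to run, the request ID that scopes the lock, and a caller tag for diagnostics; a sketch with a hypothetical deployKey and handler:

// Sketch: deployKey and handlePending are hypothetical stand-ins.
lock.runWithRequestLock(
    () -> handlePending(deployKey),                      // work to run under the lock
    deployKey.getRequestId(),                            // lock scope: a single request
    String.format("%s#%s", getClass().getSimpleName(), "drainPendingQueue")); // diagnostic tag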
