New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better task balancing #1482

Merged
merged 73 commits into from Jun 8, 2017
Commits
Jump to file or symbol
Failed to load files and symbols.
+167 鈭42
Diff settings

Always

Just for now

Viewing a subset of changes. View all

calculate min offer score based on cluster state

  • Loading branch information...
darcatron committed May 23, 2017
commit 75cfd41778a44b2a64a9a6b9c6cc184e7970392e
@@ -0,0 +1,57 @@
package com.hubspot.singularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class SingularityClusterUtilization {
private long totalMemBytesUsed;
private long totalMemBytesAvailable;
private double totalCpuUsed;
private double totalCpuAvailable;
private final long timestamp;
@JsonCreator
public SingularityClusterUtilization(@JsonProperty("totalMemBytesUsed") long totalMemBytesUsed,
@JsonProperty("totalMemBytesAvailable") long totalMemBytesAvailable,
@JsonProperty("totalCpuUsed") double totalCpuUsed,
@JsonProperty("totalCpuAvailable") double totalCpuAvailable,
@JsonProperty("timestamp") long timestamp) {
this.totalMemBytesUsed = totalMemBytesUsed;
this.totalMemBytesAvailable = totalMemBytesAvailable;
this.totalCpuUsed = totalCpuUsed;
this.totalCpuAvailable = totalCpuAvailable;
this.timestamp = timestamp;
}
public long getTotalMemBytesUsed() {
return totalMemBytesUsed;
}
public long getTotalMemBytesAvailable() {
return totalMemBytesAvailable;
}
public double getTotalCpuUsed() {
return totalCpuUsed;
}
public double getTotalCpuAvailable() {
return totalCpuAvailable;
}
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "SingularityClusterUtilization [" +
"totalMemBytesUsed=" + totalMemBytesUsed +
", totalMemBytesAvailable=" + totalMemBytesAvailable +
", totalCpuUsed=" + totalCpuUsed +
", totalCpuAvailable=" + totalCpuAvailable +
", timestamp=" + timestamp +
"]";
}
}
@@ -198,8 +198,6 @@
private int maxTasksPerOfferPerRequest = 0;
private double minOfferScore = 0.40;
private int maxOfferAttemptsPerTask = 0;
private long maxMillisPastDuePerTask = TimeUnit.MINUTES.toMillis(5);
@@ -692,10 +690,6 @@ public int getMaxTasksPerOfferPerRequest() {
return maxTasksPerOfferPerRequest;
}
public double getMinOfferScore() {
return minOfferScore;
}
public int getMaxOfferAttemptsPerTask() {
return maxOfferAttemptsPerTask;
}
@@ -1091,11 +1085,6 @@ public void setMaxTasksPerOfferPerRequest(int maxTasksPerOfferPerRequest) {
this.maxTasksPerOfferPerRequest = maxTasksPerOfferPerRequest;
}
public SingularityConfiguration setMinOfferScore(double minOfferScore) {
this.minOfferScore = minOfferScore;
return this;
}
public SingularityConfiguration setMaxOfferAttemptsPerTask(int maxOfferAttemptsPerTask) {
this.maxOfferAttemptsPerTask = maxOfferAttemptsPerTask;
return this;
@@ -13,6 +13,7 @@
import com.codahale.metrics.MetricRegistry;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.singularity.SingularityClusterUtilization;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityDeleteResult;
import com.hubspot.singularity.SingularitySlaveUsage;
@@ -31,22 +32,30 @@
private static final String SLAVE_PATH = ROOT_PATH + "/slaves";
private static final String TASK_PATH = ROOT_PATH + "/tasks";
private static final String USAGE_SUMMARY_PATH = ROOT_PATH + "/summary";

This comment has been minimized.

@darcatron

darcatron May 23, 2017

Contributor

@ssalinas can you take a look at this piece? Just wanna make sure I got this set up right

@darcatron

darcatron May 23, 2017

Contributor

@ssalinas can you take a look at this piece? Just wanna make sure I got this set up right

private static final String USAGE_HISTORY_PATH_KEY = "history";
private static final String CURRENT_USAGE_NODE_KEY = "CURRENT";
private final Transcoder<SingularitySlaveUsage> slaveUsageTranscoder;
private final Transcoder<SingularityTaskUsage> taskUsageTranscoder;
private final Transcoder<SingularityTaskCurrentUsage> taskCurrentUsageTranscoder;
private final Transcoder<SingularityClusterUtilization> clusterUtilizationTranscoder;
@Inject
public UsageManager(CuratorFramework curator, SingularityConfiguration configuration, MetricRegistry metricRegistry, Transcoder<SingularitySlaveUsage> slaveUsageTranscoder,
Transcoder<SingularityTaskUsage> taskUsageTranscoder, Transcoder<SingularityTaskCurrentUsage> taskCurrentUsageTranscoder) {
public UsageManager(CuratorFramework curator,
SingularityConfiguration configuration,
MetricRegistry metricRegistry,
Transcoder<SingularitySlaveUsage> slaveUsageTranscoder,
Transcoder<SingularityTaskUsage> taskUsageTranscoder,
Transcoder<SingularityTaskCurrentUsage> taskCurrentUsageTranscoder,
Transcoder<SingularityClusterUtilization> clusterUtilizationTranscoder) {
super(curator, configuration, metricRegistry);
this.slaveUsageTranscoder = slaveUsageTranscoder;
this.taskUsageTranscoder = taskUsageTranscoder;
this.taskCurrentUsageTranscoder = taskCurrentUsageTranscoder;
this.clusterUtilizationTranscoder = clusterUtilizationTranscoder;
}
public List<String> getSlavesWithUsage() {
@@ -95,6 +104,10 @@ private String getCurrentSlaveUsagePath(String slaveId) {
return ZKPaths.makePath(getSlaveUsagePath(slaveId), CURRENT_USAGE_NODE_KEY);
}
private String getSpecificClusterUtilizationPath(long timestamp) {

This comment has been minimized.

@ssalinas

ssalinas May 23, 2017

Member

why SpecificClusterUtilization instead of just getUsageSummaryPath or something like that?

@ssalinas

ssalinas May 23, 2017

Member

why SpecificClusterUtilization instead of just getUsageSummaryPath or something like that?

This comment has been minimized.

@darcatron

darcatron May 23, 2017

Contributor

the "specific" keyword was the pattern the other items were using so I kept that since the timestamp specified which one to grab. I can drop the historical data and then rename it

@darcatron

darcatron May 23, 2017

Contributor

the "specific" keyword was the pattern the other items were using so I kept that since the timestamp specified which one to grab. I can drop the historical data and then rename it

return ZKPaths.makePath(USAGE_SUMMARY_PATH, Long.toString(timestamp));
}
private String getCurrentTaskUsagePath(String taskId) {
return ZKPaths.makePath(getTaskUsagePath(taskId), CURRENT_USAGE_NODE_KEY);
}
@@ -107,6 +120,10 @@ public SingularityDeleteResult deleteTaskUsage(String taskId) {
return delete(getTaskUsagePath(taskId));
}
public SingularityDeleteResult deleteSpecificClusterUtilization(long timestamp) {
return delete(getSpecificClusterUtilizationPath(timestamp));
}
public SingularityDeleteResult deleteSpecificSlaveUsage(String slaveId, long timestamp) {
return delete(getSpecificSlaveUsagePath(slaveId, timestamp));
}
@@ -123,6 +140,10 @@ public SingularityCreateResult saveSpecificTaskUsage(String taskId, SingularityT
return save(getSpecificTaskUsagePath(taskId, usage.getTimestamp()), usage, taskUsageTranscoder);
}
public SingularityCreateResult saveSpecificClusterUtilization(SingularityClusterUtilization utilization) {
return save(getSpecificClusterUtilizationPath(utilization.getTimestamp()) , utilization, clusterUtilizationTranscoder);

This comment has been minimized.

@ssalinas

ssalinas May 23, 2017

Member

is there any point at which we would want a history of these? If we don't need a history of summaries, we might as well save the data to the summary path (without timestamp) and just overwrite when there is new data.

@ssalinas

ssalinas May 23, 2017

Member

is there any point at which we would want a history of these? If we don't need a history of summaries, we might as well save the data to the summary path (without timestamp) and just overwrite when there is new data.

This comment has been minimized.

@darcatron

darcatron May 23, 2017

Contributor

I didn't see a reason to at this point. Saw the others were saving them so it seemed okay since we only save up to 5 points. I think it's safe to just have the one point. Keeps it simpler

@darcatron

darcatron May 23, 2017

Contributor

I didn't see a reason to at this point. Saw the others were saving them so it seemed okay since we only save up to 5 points. I think it's safe to just have the one point. Keeps it simpler

}
public SingularityCreateResult saveSpecificSlaveUsageAndSetCurrent(String slaveId, SingularitySlaveUsage usage) {
set(getCurrentSlaveUsagePath(slaveId), usage, slaveUsageTranscoder);
return save(getSpecificSlaveUsagePath(slaveId, usage.getTimestamp()), usage, slaveUsageTranscoder);
@@ -139,7 +160,7 @@ public int compare(SingularitySlaveUsage o1, SingularitySlaveUsage o2) {
public List<SingularitySlaveUsage> getSlaveUsage(String slaveId) {
List<SingularitySlaveUsage> children = getAsyncChildren(getSlaveUsageHistoryPath(slaveId), slaveUsageTranscoder);
Collections.sort(children, SLAVE_USAGE_COMPARATOR_TIMESTAMP_ASC);
children.sort(SLAVE_USAGE_COMPARATOR_TIMESTAMP_ASC);
return children;
}
@@ -154,7 +175,23 @@ public int compare(SingularityTaskUsage o1, SingularityTaskUsage o2) {
public List<SingularityTaskUsage> getTaskUsage(String taskId) {
List<SingularityTaskUsage> children = getAsyncChildren(getTaskUsageHistoryPath(taskId), taskUsageTranscoder);
Collections.sort(children, TASK_USAGE_COMPARATOR_TIMESTAMP_ASC);
children.sort(TASK_USAGE_COMPARATOR_TIMESTAMP_ASC);
return children;
}
private static final Comparator<SingularityClusterUtilization> CLUSTER_UTILIZATION_COMPARATOR_TIMESTAMP_ASC = new Comparator<SingularityClusterUtilization>() {
@Override
public int compare(SingularityClusterUtilization o1, SingularityClusterUtilization o2) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
}
};
public List<SingularityClusterUtilization> getClusterUtilization() {
List<SingularityClusterUtilization> children = getAsyncChildren(USAGE_SUMMARY_PATH, clusterUtilizationTranscoder);
children.sort(CLUSTER_UTILIZATION_COMPARATOR_TIMESTAMP_ASC);
return children;
}
@@ -4,6 +4,7 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.hubspot.singularity.SingularityClusterUtilization;
import com.hubspot.singularity.SingularityDeploy;
import com.hubspot.singularity.SingularityDeployHistory;
import com.hubspot.singularity.SingularityDeployKey;
@@ -100,6 +101,7 @@ public void configure(final Binder binder) {
bindTranscoder(binder).asJson(SingularityUserSettings.class);
bindTranscoder(binder).asJson(SingularitySlaveUsage.class);
bindTranscoder(binder).asJson(SingularityTaskUsage.class);
bindTranscoder(binder).asJson(SingularityClusterUtilization.class);
bindTranscoder(binder).asJson(SingularityTaskCurrentUsage.class);
bindTranscoder(binder).asCompressedJson(SingularityDeployHistory.class);
@@ -24,6 +24,7 @@
import com.hubspot.mesos.MesosUtils;
import com.hubspot.mesos.Resources;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.SingularityClusterUtilization;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularitySlaveUsage.ResourceUsageType;
@@ -134,7 +135,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
SingularityTaskRequestHolder taskRequestHolder = iterator.next();
Map<SingularityOfferHolder, Double> scorePerOffer = new HashMap<>();
double minScore = minScore(taskRequestHolder.getTaskRequest(), offerMatchAttemptsPerTask, System.currentTimeMillis());
double minScore = minScore(taskRequestHolder.getTaskRequest(), offerMatchAttemptsPerTask, getLatestClusterUtilization(), System.currentTimeMillis());
LOG.trace("Minimum score {} for task {}", minScore, taskRequestHolder.getTaskRequest().getPendingTask().getPendingTaskId().getId());
@@ -191,6 +192,11 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
return offerHolders;
}
private Optional<SingularityClusterUtilization> getLatestClusterUtilization() {
List<SingularityClusterUtilization> clusterUtilizations = usageManager.getClusterUtilization();
return clusterUtilizations.isEmpty() ? Optional.absent() : Optional.of(clusterUtilizations.get(clusterUtilizations.size() - 1));
}
private double getNormalizedWeight(ResourceUsageType type) {
double freeCpuWeight = configuration.getFreeCpuWeightForOffer();
double freeMemWeight = configuration.getFreeMemWeightForOffer();
@@ -357,8 +363,17 @@ private double calculateScore(double longRunningMemUsedScore, double memFreeScor
}
@VisibleForTesting
double minScore(SingularityTaskRequest taskRequest, Map<String, Integer> offerMatchAttemptsPerTask, long now) {
double minScore = configuration.getMinOfferScore();
double minScore(SingularityTaskRequest taskRequest, Map<String, Integer> offerMatchAttemptsPerTask, Optional<SingularityClusterUtilization> maybeUtilization, long now) {
if (!maybeUtilization.isPresent()) {
return 0.00;
}
SingularityClusterUtilization utilization = maybeUtilization.get();
double memScore = (1 - (utilization.getTotalMemBytesUsed() / (double) utilization.getTotalMemBytesAvailable())) * configuration.getFreeMemWeightForOffer();
double cpuScore = (1 - (utilization.getTotalCpuUsed() / utilization.getTotalCpuAvailable())) * configuration.getFreeCpuWeightForOffer();
double tolerance = 0.30;
double minScore = memScore + cpuScore - tolerance;
minScore -= offerMatchAttemptsPerTask.getOrDefault(taskRequest.getPendingTask().getPendingTaskId().getId(), 0) / getMaxOfferAttemptsPerTask();
minScore -= millisPastDue(taskRequest, now) / (double) configuration.getMaxMillisPastDuePerTask();
@@ -13,6 +13,7 @@
import com.hubspot.mesos.client.MesosClient;
import com.hubspot.mesos.json.MesosTaskMonitorObject;
import com.hubspot.singularity.InvalidSingularityTaskIdException;
import com.hubspot.singularity.SingularityClusterUtilization;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityRequestWithState;
import com.hubspot.singularity.SingularitySlave;
@@ -66,8 +67,8 @@
@Override
public void runActionOnPoll() {
final long now = System.currentTimeMillis();
double totalMemBytesUsed = 0.00;
double totalMemBytesAvailable = 0.00;
long totalMemBytesUsed = 0;
long totalMemBytesAvailable = 0;
double totalCpuUsed = 0.00;
double totalCpuAvailable = 0.00;
@@ -158,7 +159,7 @@ public void runActionOnPoll() {
exceptionNotifier.notify(message, e);
}
setMinScore(totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable);
saveClusterUtilization(totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, now);
}
}
@@ -190,10 +191,11 @@ private void updateLongRunningTasksUsage(Map<ResourceUsageType, Number> longRunn
longRunningTasksUsage.compute(ResourceUsageType.CPU_USED, (k, v) -> (v == null) ? cpuUsed : v.doubleValue() + cpuUsed);
}
private void setMinScore(double totalMemBytesUsed, double totalMemBytesAvailable, double totalCpuUsed, double totalCpuAvailable) {
double tolerance = 3.00;
double memScore = (1 - (totalMemBytesUsed / totalMemBytesAvailable)) * configuration.getFreeMemWeightForOffer();
double cpuScore = (1 - (totalCpuUsed / totalCpuAvailable)) * configuration.getFreeCpuWeightForOffer();
configuration.setMinOfferScore((memScore + cpuScore) / tolerance);
private void saveClusterUtilization(long totalMemBytesUsed, long totalMemBytesAvailable, double totalCpuUsed, double totalCpuAvailable, long now) {
List<SingularityClusterUtilization> utilizations = usageManager.getClusterUtilization();
if (utilizations.size() + 1 > configuration.getNumUsageToKeep()) {
usageManager.deleteSpecificClusterUtilization(utilizations.get(0).getTimestamp());
}
usageManager.saveSpecificClusterUtilization(new SingularityClusterUtilization(totalMemBytesUsed, totalMemBytesAvailable, totalCpuUsed, totalCpuAvailable, now));
}
}
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.