Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine offers to schedule tasks more efficiently #1561

Merged
merged 18 commits into from Jun 23, 2017
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions SingularityBase/src/main/java/com/hubspot/mesos/MesosUtils.java
Expand Up @@ -300,6 +300,17 @@ public static boolean doesOfferMatchResources(Optional<String> requiredRole, Res
return true;
}

public static boolean allResourceCountsNonNegative(List<Resource> resources) {
return doesOfferMatchResources(
Optional.absent(),
new Resources(0, 0, 0, 0),
resources,
Collections.emptyList() // TODO: does something special need to be done here for ports?
// TODO: No, because individual ports will have already been subtracted by the time we call this.
// TODO: we just need to check that number of ports is nonnegative
);
}

public static boolean isTaskDone(TaskState state) {
return state == TaskState.TASK_FAILED || state == TaskState.TASK_LOST || state == TaskState.TASK_KILLED || state == TaskState.TASK_FINISHED;
}
Expand Down Expand Up @@ -395,6 +406,34 @@ public static List<Resource> subtractResources(List<Resource> resources, List<Re
return remaining;
}

public static List<Resource> combineResources(List<List<Resource>> resourcesList) {
List<Resource> resources = new ArrayList<>();
for (List<Resource> resourcesToAdd : resourcesList) {
for (Resource resource : resourcesToAdd) {
Optional<Resource> matched = getMatchingResource(resource, resources);
if (!matched.isPresent()) {
resources.add(resource);
} else {
int index = resources.indexOf(matched.get());
Resource.Builder resourceBuilder = resource.toBuilder().clone();
if (resource.hasScalar()) {
resourceBuilder.setScalar(resource.toBuilder().getScalarBuilder().setValue(resource.getScalar().getValue() + matched.get().getScalar().getValue()).build());
resources.set(index, resourceBuilder.build());
} else if (resource.hasRanges()) {
Ranges.Builder newRanges = Ranges.newBuilder();
resource.getRanges().getRangeList().forEach(newRanges::addRange);
matched.get().getRanges().getRangeList().forEach(newRanges::addRange);
resourceBuilder.setRanges(newRanges);
resources.set(index, resourceBuilder.build());
} else {
throw new IllegalStateException(String.format("Can't subtract non-scalar or range resources %s", formatForLogging(resource)));
}
}
}
}
return resources;
}

public static Resources buildResourcesFromMesosResourceList(List<Resource> resources) {
return new Resources(getNumCpus(resources, Optional.<String>absent()), getMemory(resources, Optional.<String>absent()), getNumPorts(resources), getDisk(resources, Optional.<String>absent()));
}
Expand Down
@@ -1,41 +1,67 @@
package com.hubspot.singularity;

import java.util.Collections;
import java.util.List;

import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.hubspot.mesos.MesosUtils;
import com.wordnik.swagger.annotations.ApiModelProperty;

public class SingularityTask extends SingularityTaskIdHolder {

private final SingularityTaskRequest taskRequest;
private final Offer offer;
private final List<Offer> offers;
private final TaskInfo mesosTask;
private final Optional<String> rackId;

@Deprecated
public SingularityTask(SingularityTaskRequest taskRequest, SingularityTaskId taskId, Offer offer, TaskInfo task, Optional<String> rackId) {
this(taskRequest, taskId, null, Collections.singletonList(offer), task, rackId);
}

public SingularityTask(SingularityTaskRequest taskRequest, SingularityTaskId taskId, List<Offer> offers, TaskInfo task, Optional<String> rackId) {
this(taskRequest, taskId, null, offers, task, rackId);
}

@JsonCreator
public SingularityTask(@JsonProperty("taskRequest") SingularityTaskRequest taskRequest, @JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("offer") Offer offer,
@JsonProperty("mesosTask") TaskInfo task, @JsonProperty("rackId") Optional<String> rackId) {
public SingularityTask(@JsonProperty("taskRequest") SingularityTaskRequest taskRequest, @JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("offer") Offer offer, @JsonProperty("offers") List<Offer> offers,
@JsonProperty("mesosTask") TaskInfo task, @JsonProperty("rackId") Optional<String> rackId) {
super(taskId);
Preconditions.checkArgument(offer != null || offers != null, "Must specify at least one of offer / offers");
this.taskRequest = taskRequest;
this.offer = offer;
this.mesosTask = task;
this.rackId = rackId;
if (offers != null) {
this.offers = offers;
} else {
this.offers = Collections.singletonList(offer);
}
}

public SingularityTaskRequest getTaskRequest() {
return taskRequest;
}

/*
* Use getOffers instead. getOffer will currently return the first offer in getOffers
*/
@Deprecated
@ApiModelProperty(hidden=true)
public Offer getOffer() {
return offer;
return offers.get(0);
}

@ApiModelProperty(hidden=true)
public List<Offer> getOffers() {
return offers;
}

@ApiModelProperty(hidden=true)
Expand All @@ -57,11 +83,21 @@ public Optional<Long> getPortByIndex(int index) {
}
}

@JsonIgnore
public SlaveID getSlaveId() {
return offers.get(0).getSlaveId();
}

@JsonIgnore
public String getHostname() {
return offers.get(0).getHostname();
}

@Override
public String toString() {
return "SingularityTask{" +
"taskRequest=" + taskRequest +
", offer=" + MesosUtils.formatForLogging(offer) +
", offer=" + MesosUtils.formatForLogging(offers) +
", mesosTask=" + MesosUtils.formatForLogging(mesosTask) +
", rackId=" + rackId +
'}';
Expand Down
Expand Up @@ -470,7 +470,7 @@ public List<SingularityTask> getTasksOnSlave(Collection<SingularityTaskId> activ
for (SingularityTaskId activeTaskId : activeTaskIds) {
if (activeTaskId.getSanitizedHost().equals(sanitizedHost)) {
Optional<SingularityTask> maybeTask = getTask(activeTaskId);
if (maybeTask.isPresent() && slave.getId().equals(maybeTask.get().getOffer().getSlaveId().getValue())) {
if (maybeTask.isPresent() && slave.getId().equals(maybeTask.get().getSlaveId().getValue())) {
tasks.add(maybeTask.get());
}
}
Expand Down Expand Up @@ -906,7 +906,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws
}

saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(task.getTaskId(), now, ExtendedTaskState.TASK_LAUNCHED, Optional.of(msg), Optional.<String>absent()));
saveLastActiveTaskStatus(new SingularityTaskStatusHolder(task.getTaskId(), Optional.<TaskStatus>absent(), now, serverId, Optional.of(task.getOffer().getSlaveId().getValue())));
saveLastActiveTaskStatus(new SingularityTaskStatusHolder(task.getTaskId(), Optional.<TaskStatus>absent(), now, serverId, Optional.of(task.getSlaveId().getValue())));

try {
final String path = getTaskPath(task.getTaskId());
Expand Down
Expand Up @@ -46,7 +46,7 @@ public void applyMigration() {

taskStatus = Optional.of(TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.getId()))
.setSlaveId(task.get().getOffer().getSlaveId())
.setSlaveId(task.get().getSlaveId())
.setState(update.getTaskState().toTaskState().get())
.build());

Expand Down
Expand Up @@ -189,7 +189,7 @@ private List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String
final Optional<Long> maybeLoadBalancerPort = task.getPortByIndex(task.getTaskRequest().getDeploy().getLoadBalancerPortIndex().or(0));

if (maybeLoadBalancerPort.isPresent()) {
String upstream = String.format("%s:%d", task.getOffer().getHostname(), maybeLoadBalancerPort.get());
String upstream = String.format("%s:%d", task.getHostname(), maybeLoadBalancerPort.get());
Optional<String> group = loadBalancerUpstreamGroup;

if (taskLabelForLoadBalancerUpstreamGroup.isPresent()) {
Expand Down
Expand Up @@ -73,7 +73,7 @@ private Optional<MesosExecutorObject> findExecutor(SingularityTaskId taskId, Lis
private void loadDirectoryAndContainer(SingularityTask task) {
final long start = System.currentTimeMillis();

final String slaveUri = mesosClient.getSlaveUri(task.getOffer().getHostname());
final String slaveUri = mesosClient.getSlaveUri(task.getHostname());

LOG.info("Fetching slave data to find log directory and container id for task {} from uri {}", task.getTaskId(), slaveUri);

Expand Down