Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@ public enum PendingType {
private final PendingType pendingType;
private final Optional<String> user;
private final List<String> cmdLineArgsList;
private final Optional<String> runId;

public SingularityPendingRequest(String requestId, String deployId, long timestamp, PendingType pendingType) {
this(requestId, deployId, timestamp, Optional.<String> absent(), pendingType, Collections.<String> emptyList());
this(requestId, deployId, timestamp, Optional.<String> absent(), pendingType, Collections.<String> emptyList(), Optional.<String> absent());
}

public SingularityPendingRequest(String requestId, String deployId, long timestamp, Optional<String> user, PendingType pendingType) {
this(requestId, deployId, timestamp, user, pendingType, Collections.<String> emptyList(), Optional.<String> absent());
}

@JsonCreator
public SingularityPendingRequest(@JsonProperty("requestId") String requestId, @JsonProperty("deployId") String deployId, @JsonProperty("timestamp") long timestamp,
@JsonProperty("user") Optional<String> user, @JsonProperty("pendingType") PendingType pendingType, @JsonProperty("cmdLineArgsList") List<String> cmdLineArgsList) {
@JsonProperty("user") Optional<String> user, @JsonProperty("pendingType") PendingType pendingType, @JsonProperty("cmdLineArgsList") List<String> cmdLineArgsList,
@JsonProperty("runId") Optional<String> runId) {
this.requestId = requestId;
this.deployId = deployId;
this.timestamp = timestamp;
this.user = user;
this.cmdLineArgsList = cmdLineArgsList;
this.pendingType = pendingType;
this.runId = runId;
}

public Optional<String> getRunId() {
return runId;
}

public long getTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.base.Optional;

public class SingularityPendingRequestParent extends SingularityRequestParent {

private final SingularityPendingRequest pendingRequest;

public static SingularityPendingRequestParent fromSingularityRequestParent(SingularityRequestParent singularityRequestParent, SingularityPendingRequest pendingRequest) {
Expand All @@ -26,4 +27,5 @@ public String toString() {
return "SingularityRequestParent [request=" + getRequest() + ", state=" + getState() + ", requestDeployState=" + getRequestDeployState() + ", activeDeploy=" + getActiveDeploy() + ", pendingDeploy=" + getPendingDeploy() + ", pendingDeployState="
+ getPendingDeployState() + ", pendingRequest=" + pendingRequest + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class SingularityPendingTask {
private final SingularityPendingTaskId pendingTaskId;
private final List<String> cmdLineArgsList;
private final Optional<String> user;
private final Optional<String> runId;

public static Predicate<SingularityPendingTask> matchingRequest(final String requestId) {
return new Predicate<SingularityPendingTask>() {
Expand All @@ -40,10 +41,12 @@ public boolean apply(@Nonnull SingularityPendingTask input) {
}

@JsonCreator
public SingularityPendingTask(@JsonProperty("pendingTaskId") SingularityPendingTaskId pendingTaskId, @JsonProperty("cmdLineArgsList") List<String> cmdLineArgsList, @JsonProperty("user") Optional<String> user) {
public SingularityPendingTask(@JsonProperty("pendingTaskId") SingularityPendingTaskId pendingTaskId, @JsonProperty("cmdLineArgsList") List<String> cmdLineArgsList,
@JsonProperty("user") Optional<String> user, @JsonProperty("runId") Optional<String> runId) {
this.pendingTaskId = pendingTaskId;
this.user = user;
this.cmdLineArgsList = JavaUtils.nonNullImmutable(cmdLineArgsList);
this.runId = runId;
}

@Override
Expand Down Expand Up @@ -78,9 +81,13 @@ public List<String> getCmdLineArgsList() {
return cmdLineArgsList;
}

public Optional<String> getRunId() {
return runId;
}

@Override
public String toString() {
return "SingularityPendingTask [pendingTaskId=" + pendingTaskId + ", user=" + user + ", cmdLineArgsList=" + cmdLineArgsList + "]";
return "SingularityPendingTask [pendingTaskId=" + pendingTaskId + ", cmdLineArgsList=" + cmdLineArgsList + ", user=" + user + ", runId=" + runId + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public class SingularityTaskIdHistory implements Comparable<SingularityTaskIdHis
private final SingularityTaskId taskId;
private final long updatedAt;
private final Optional<ExtendedTaskState> lastTaskState;
private final Optional<String> runId;

public static SingularityTaskIdHistory fromTaskIdAndUpdates(SingularityTaskId taskId, List<SingularityTaskHistoryUpdate> updates) {
public static SingularityTaskIdHistory fromTaskIdAndTaskAndUpdates(SingularityTaskId taskId, SingularityTask task, List<SingularityTaskHistoryUpdate> updates) {
ExtendedTaskState lastTaskState = null;
long updatedAt = taskId.getStartedAt();

Expand All @@ -25,27 +26,29 @@ public static SingularityTaskIdHistory fromTaskIdAndUpdates(SingularityTaskId ta
updatedAt = lastUpdate.getTimestamp();
}

return new SingularityTaskIdHistory(taskId, updatedAt, Optional.fromNullable(lastTaskState));
return new SingularityTaskIdHistory(taskId, updatedAt, Optional.fromNullable(lastTaskState), task.getTaskRequest().getPendingTask().getRunId());
}

@JsonCreator
public SingularityTaskIdHistory(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("updatedAt") long updatedAt, @JsonProperty("lastStatus") Optional<ExtendedTaskState> lastTaskState) {
public SingularityTaskIdHistory(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("updatedAt") long updatedAt,
@JsonProperty("lastStatus") Optional<ExtendedTaskState> lastTaskState, @JsonProperty("runId") Optional<String> runId) {
this.taskId = taskId;
this.updatedAt = updatedAt;
this.lastTaskState = lastTaskState;
this.runId = runId;
}

@Override
public int compareTo(SingularityTaskIdHistory o) {
return ComparisonChain.start()
.compare(o.getUpdatedAt(), updatedAt)
.compare(taskId.getId(), o.getTaskId().getId())
.result();
.compare(o.getUpdatedAt(), updatedAt)
.compare(taskId.getId(), o.getTaskId().getId())
.result();
}

@Override
public int hashCode() {
return Objects.hashCode(taskId, updatedAt, lastTaskState);
return Objects.hashCode(taskId, updatedAt, lastTaskState, runId);
}

@Override
Expand All @@ -60,7 +63,8 @@ public boolean equals(Object other) {
SingularityTaskIdHistory that = (SingularityTaskIdHistory) other;
return Objects.equal(this.taskId , that.taskId)
&& Objects.equal(this.updatedAt , that.updatedAt)
&& Objects.equal(this.lastTaskState , that.lastTaskState);
&& Objects.equal(this.lastTaskState , that.lastTaskState)
&& Objects.equal(this.runId, that.runId);
}

public SingularityTaskId getTaskId() {
Expand All @@ -75,9 +79,13 @@ public long getUpdatedAt() {
return updatedAt;
}

public Optional<String> getRunId() {
return runId;
}

@Override
public String toString() {
return "SingularityTaskIdHistory [taskId=" + taskId + ", updatedAt=" + updatedAt + ", lastTaskState=" + lastTaskState + "]";
return "SingularityTaskIdHistory [taskId=" + taskId + ", updatedAt=" + updatedAt + ", lastTaskState=" + lastTaskState + ", runId=" + runId + "]";
}

}
3 changes: 2 additions & 1 deletion SingularityService/singularity_test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ CREATE TABLE taskHistory (
requestId VARCHAR(100) NOT NULL,
updatedAt TIMESTAMP NOT NULL DEFAULT '1971-01-01 00:00:01',
lastTaskStatus VARCHAR(25) NULL,
runId VARCHAR(100) NULL,
bytes BLOB NOT NULL,
);
);
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public interface HistoryJDBI {
@SqlUpdate("INSERT INTO deployHistory (requestId, deployId, createdAt, user, deployStateAt, deployState, bytes) VALUES (:requestId, :deployId, :createdAt, :user, :deployStateAt, :deployState, :bytes)")
void insertDeployHistory(@Bind("requestId") String requestId, @Bind("deployId") String deployId, @Bind("createdAt") Date createdAt, @Bind("user") String user, @Bind("deployStateAt") Date deployStateAt, @Bind("deployState") String deployState, @Bind("bytes") byte[] bytes);

@SqlUpdate("INSERT INTO taskHistory (requestId, taskId, bytes, updatedAt, lastTaskStatus) VALUES (:requestId, :taskId, :bytes, :updatedAt, :lastTaskStatus)")
void insertTaskHistory(@Bind("requestId") String requestId, @Bind("taskId") String taskId, @Bind("bytes") byte[] bytes, @Bind("updatedAt") Date updatedAt, @Bind("lastTaskStatus") String lastTaskStatus);
@SqlUpdate("INSERT INTO taskHistory (requestId, taskId, bytes, updatedAt, lastTaskStatus, runId) VALUES (:requestId, :taskId, :bytes, :updatedAt, :lastTaskStatus, :runId)")
void insertTaskHistory(@Bind("requestId") String requestId, @Bind("taskId") String taskId, @Bind("bytes") byte[] bytes, @Bind("updatedAt") Date updatedAt, @Bind("lastTaskStatus") String lastTaskStatus, @Bind("runId") String runId);

@SqlQuery("SELECT bytes FROM taskHistory WHERE taskId = :taskId")
byte[] getTaskHistoryForTask(@Bind("taskId") String taskId);
Expand All @@ -35,7 +35,7 @@ public interface HistoryJDBI {
@SqlQuery("SELECT requestId, deployId, createdAt, user, deployStateAt, deployState FROM deployHistory WHERE requestId = :requestId ORDER BY createdAt DESC LIMIT :limitStart, :limitCount")
List<SingularityDeployHistory> getDeployHistoryForRequest(@Bind("requestId") String requestId, @Bind("limitStart") Integer limitStart, @Bind("limitCount") Integer limitCount);

@SqlQuery("SELECT taskId, requestId, updatedAt, lastTaskStatus FROM taskHistory WHERE requestId = :requestId ORDER BY updatedAt DESC LIMIT :limitStart, :limitCount")
@SqlQuery("SELECT taskId, requestId, updatedAt, lastTaskStatus, runId FROM taskHistory WHERE requestId = :requestId ORDER BY updatedAt DESC LIMIT :limitStart, :limitCount")
List<SingularityTaskIdHistory> getTaskHistoryForRequest(@Bind("requestId") String requestId, @Bind("limitStart") Integer limitStart, @Bind("limitCount") Integer limitCount);

@SqlQuery("SELECT request, createdAt, requestState, user FROM requestHistory WHERE requestId = :requestId ORDER BY createdAt <orderDirection> LIMIT :limitStart, :limitCount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ public void saveTaskHistory(SingularityTaskHistory taskHistory) {
return;
}

SingularityTaskIdHistory taskIdHistory = SingularityTaskIdHistory.fromTaskIdAndUpdates(taskHistory.getTask().getTaskId(), taskHistory.getTaskUpdates());
SingularityTaskIdHistory taskIdHistory = SingularityTaskIdHistory.fromTaskIdAndTaskAndUpdates(taskHistory.getTask().getTaskId(), taskHistory.getTask(), taskHistory.getTaskUpdates());

String lastTaskStatus = null;
if (taskIdHistory.getLastTaskState().isPresent()) {
lastTaskStatus = taskIdHistory.getLastTaskState().get().name();
}

history.insertTaskHistory(taskIdHistory.getTaskId().getRequestId(), taskIdHistory.getTaskId().getId(), taskHistoryTranscoder.toBytes(taskHistory), new Date(taskIdHistory.getUpdatedAt()),
lastTaskStatus);
lastTaskStatus, taskHistory.getTask().getTaskRequest().getPendingTask().getRunId().orNull());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public SingularityTaskIdHistory map(int index, ResultSet r, StatementContext ctx
}
}

return new SingularityTaskIdHistory(taskId, r.getTimestamp("updatedAt").getTime(), lastTaskState);
return new SingularityTaskIdHistory(taskId, r.getTimestamp("updatedAt").getTime(), lastTaskState, Optional.fromNullable(r.getString("runId")));
} catch (SingularityTranscoderException e) {
throw new ResultSetException("Could not deserialize database result", e, ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public TaskHistoryHelper(TaskManager taskManager, HistoryManager historyManager)
}

public List<SingularityTaskIdHistory> getHistoriesFor(Collection<SingularityTaskId> taskIds) {
Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> map = taskManager.getTaskHistoryUpdates(taskIds);
Map<SingularityTaskId, SingularityTask> tasks = taskManager.getTasks(taskIds);
Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> history = taskManager.getTaskHistoryUpdates(taskIds);

List<SingularityTaskIdHistory> histories = Lists.newArrayListWithCapacity(taskIds.size());

for (SingularityTaskId taskId : taskIds) {
List<SingularityTaskHistoryUpdate> historyUpdates = map.get(taskId);
List<SingularityTaskHistoryUpdate> historyUpdates = history.get(taskId);
SingularityTask task = tasks.get(taskId);

histories.add(SingularityTaskIdHistory.fromTaskIdAndUpdates(taskId, historyUpdates));
histories.add(SingularityTaskIdHistory.fromTaskIdAndTaskAndUpdates(taskId, task, historyUpdates));
}

Collections.sort(histories);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private void checkPendingRequests() {
try {
for (String pendingRequest : curator.getChildren().forPath(REQUEST_PENDING_PATH)) {
SingularityPendingRequestPrevious previous = objectMapper.readValue(curator.getData().forPath(ZKPaths.makePath(REQUEST_PENDING_PATH, pendingRequest)), SingularityPendingRequestPrevious.class);
SingularityPendingRequest newRequest = new SingularityPendingRequest(previous.requestId, previous.deployId, previous.timestamp, previous.user, previous.pendingType, getCmdLineArgs(previous.cmdLineArgs));
SingularityPendingRequest newRequest = new SingularityPendingRequest(previous.requestId, previous.deployId, previous.timestamp, previous.user, previous.pendingType,
getCmdLineArgs(previous.cmdLineArgs), Optional.<String> absent());

LOG.info("Re-saving {}", newRequest);

Expand All @@ -140,7 +141,7 @@ private void checkPendingTasks() {
for (SingularityPendingTaskId pendingTaskId : taskManager.getPendingTaskIds()) {
Optional<String> cmdLineArgs = getCmdLineArgs(pendingTaskId);

SingularityCreateResult result = taskManager.savePendingTask(new SingularityPendingTask(pendingTaskId, getCmdLineArgs(cmdLineArgs), Optional.<String> absent()));
SingularityCreateResult result = taskManager.savePendingTask(new SingularityPendingTask(pendingTaskId, getCmdLineArgs(cmdLineArgs), Optional.<String> absent(), Optional.<String> absent()));

LOG.info("Saving {} ({}) {}", pendingTaskId, cmdLineArgs, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void applyMigration() {

Optional<String> cmdLineArgs = getCmdLineArgs(pendingTaskId);

taskManager.savePendingTask(new SingularityPendingTask(newPendingTaskId, cmdLineArgs.isPresent() ? Collections.singletonList(cmdLineArgs.get()) : Collections.<String> emptyList(), Optional.<String> absent()));
taskManager.savePendingTask(new SingularityPendingTask(newPendingTaskId, cmdLineArgs.isPresent() ? Collections.singletonList(cmdLineArgs.get()) :
Collections.<String> emptyList(), Optional.<String> absent(), Optional.<String> absent()));

curator.delete().forPath(ZKPaths.makePath(PENDING_TASKS_ROOT, pendingTaskId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public class SingularityDriver {
this.scheduler = scheduler;

if (configuration.isUseNativeCode()) {
this.driver = new MesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster());
this.driver = new MesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster());
} else {
this.driver = new JesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster());
this.driver = new JesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import static com.hubspot.singularity.WebExceptions.checkConflict;
import static com.hubspot.singularity.WebExceptions.checkNotNullBadRequest;

import java.util.Collections;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -108,7 +106,7 @@ public SingularityRequestParent deploy(@ApiParam(required=true) SingularityDeplo
}

if (request.isDeployable()) {
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, deployMarker.getDeployId(), now, deployUser, PendingType.NEW_DEPLOY, Collections.<String> emptyList()));
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, deployMarker.getDeployId(), now, deployUser, PendingType.NEW_DEPLOY));
}

return fillEntireRequest(requestWithState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import static com.hubspot.singularity.WebExceptions.checkConflict;
import static com.hubspot.singularity.WebExceptions.checkNotNullBadRequest;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -208,8 +208,9 @@ public SingularityRequestParent bounce(@ApiParam("The request ID to bounce") @Pa
@ApiResponses({
@ApiResponse(code=400, message="Singularity Request is not scheduled or one-off"),
})
public SingularityRequestParent scheduleImmediately(@ApiParam("The request ID to run") @PathParam("requestId") String requestId,
public SingularityPendingRequestParent scheduleImmediately(@ApiParam("The request ID to run") @PathParam("requestId") String requestId,
@ApiParam("Username of the person requesting the execution") @QueryParam("user") Optional<String> queryUser,
@ApiParam("Run ID to associate with this task. If not specified, one will be generated") @QueryParam("runId") Optional<String> runId,
@ApiParam("Additional command line arguments to append to the task") List<String> commandLineArgs) {
SingularityRequestWithState requestWithState = fetchRequestWithState(requestId);

Expand All @@ -228,7 +229,11 @@ public SingularityRequestParent scheduleImmediately(@ApiParam("The request ID to
throw badRequest("Can not request an immediate run of a non-scheduled / always running request (%s)", requestWithState.getRequest());
}

final SingularityPendingRequest pendingRequest = new SingularityPendingRequest(requestId, getAndCheckDeployId(requestId), System.currentTimeMillis(), queryUser, pendingType, commandLineArgs);
if (!runId.isPresent()) {
runId = Optional.of(UUID.randomUUID().toString());
}

final SingularityPendingRequest pendingRequest = new SingularityPendingRequest(requestId, getAndCheckDeployId(requestId), System.currentTimeMillis(), queryUser, pendingType, commandLineArgs, runId);

SingularityCreateResult result = requestManager.addToPendingQueue(pendingRequest);

Expand Down Expand Up @@ -295,7 +300,7 @@ public SingularityRequestParent unpause(@ApiParam("The request ID to unpause") @
requestManager.unpause(requestWithState.getRequest(), now, queryUser);

if (maybeDeployId.isPresent() && !requestWithState.getRequest().isOneOff()) {
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, queryUser, PendingType.UNPAUSED, Collections.<String> emptyList()));
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, queryUser, PendingType.UNPAUSED));
}

return fillEntireRequest(new SingularityRequestWithState(requestWithState.getRequest(), RequestState.ACTIVE, now));
Expand All @@ -315,7 +320,7 @@ public SingularityRequestParent exitCooldown(@PathParam("requestId") String requ
requestManager.exitCooldown(requestWithState.getRequest(), now, user);

if (maybeDeployId.isPresent() && !requestWithState.getRequest().isOneOff()) {
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, user, PendingType.IMMEDIATE, Collections.<String>emptyList()));
requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, user, PendingType.IMMEDIATE));
}

return fillEntireRequest(requestWithState);
Expand Down
Loading