From 1b6a12a3994f5aeb127c1f464f5e2c16a3424e03 Mon Sep 17 00:00:00 2001 From: Whitney Sorenson Date: Tue, 25 Aug 2015 17:31:51 -0400 Subject: [PATCH] add an optional runId to attach to running tasks over the API --- .../SingularityPendingRequest.java | 15 +++++++- .../SingularityPendingRequestParent.java | 2 + .../singularity/SingularityPendingTask.java | 11 +++++- .../singularity/SingularityTaskIdHistory.java | 26 ++++++++----- SingularityService/singularity_test.sql | 3 +- .../singularity/data/history/HistoryJDBI.java | 6 +-- .../data/history/JDBIHistoryManager.java | 4 +- .../data/history/SingularityMappers.java | 2 +- .../data/history/TaskHistoryHelper.java | 8 ++-- .../SingularityCmdLineArgsMigration.java | 5 ++- .../SingularityPendingTaskIdMigration.java | 3 +- .../singularity/mesos/SingularityDriver.java | 4 +- .../singularity/resources/DeployResource.java | 4 +- .../resources/RequestResource.java | 15 +++++--- .../scheduler/SingularityScheduler.java | 2 +- .../SingularityHistoryPurgerTest.java | 37 +++++++++++++++++++ .../SingularitySchedulerTestBase.java | 4 +- .../SingularityMesosTaskBuilderTest.java | 7 +--- .../mesos/SingularityStartupTest.java | 2 +- .../scheduler/SingularitySchedulerTest.java | 20 +++++----- mysql/migrations.sql | 4 ++ 21 files changed, 129 insertions(+), 55 deletions(-) diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequest.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequest.java index 09ab160368..2f5c8c0c88 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequest.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequest.java @@ -19,20 +19,31 @@ public enum PendingType { private final PendingType pendingType; private final Optional user; private final List cmdLineArgsList; + private final Optional runId; public SingularityPendingRequest(String requestId, String deployId, long timestamp, PendingType pendingType) { - this(requestId, deployId, timestamp, Optional. absent(), pendingType, Collections. emptyList()); + this(requestId, deployId, timestamp, Optional. absent(), pendingType, Collections. emptyList(), Optional. absent()); + } + + public SingularityPendingRequest(String requestId, String deployId, long timestamp, Optional user, PendingType pendingType) { + this(requestId, deployId, timestamp, user, pendingType, Collections. emptyList(), Optional. absent()); } @JsonCreator public SingularityPendingRequest(@JsonProperty("requestId") String requestId, @JsonProperty("deployId") String deployId, @JsonProperty("timestamp") long timestamp, - @JsonProperty("user") Optional user, @JsonProperty("pendingType") PendingType pendingType, @JsonProperty("cmdLineArgsList") List cmdLineArgsList) { + @JsonProperty("user") Optional user, @JsonProperty("pendingType") PendingType pendingType, @JsonProperty("cmdLineArgsList") List cmdLineArgsList, + @JsonProperty("runId") Optional runId) { this.requestId = requestId; this.deployId = deployId; this.timestamp = timestamp; this.user = user; this.cmdLineArgsList = cmdLineArgsList; this.pendingType = pendingType; + this.runId = runId; + } + + public Optional getRunId() { + return runId; } public long getTimestamp() { diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequestParent.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequestParent.java index 207eb17465..1a4691d212 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequestParent.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingRequestParent.java @@ -5,6 +5,7 @@ import com.google.common.base.Optional; public class SingularityPendingRequestParent extends SingularityRequestParent { + private final SingularityPendingRequest pendingRequest; public static SingularityPendingRequestParent fromSingularityRequestParent(SingularityRequestParent singularityRequestParent, SingularityPendingRequest pendingRequest) { @@ -26,4 +27,5 @@ public String toString() { return "SingularityRequestParent [request=" + getRequest() + ", state=" + getState() + ", requestDeployState=" + getRequestDeployState() + ", activeDeploy=" + getActiveDeploy() + ", pendingDeploy=" + getPendingDeploy() + ", pendingDeployState=" + getPendingDeployState() + ", pendingRequest=" + pendingRequest + "]"; } + } diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingTask.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingTask.java index 158de16160..cfa75479b4 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingTask.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityPendingTask.java @@ -16,6 +16,7 @@ public class SingularityPendingTask { private final SingularityPendingTaskId pendingTaskId; private final List cmdLineArgsList; private final Optional user; + private final Optional runId; public static Predicate matchingRequest(final String requestId) { return new Predicate() { @@ -40,10 +41,12 @@ public boolean apply(@Nonnull SingularityPendingTask input) { } @JsonCreator - public SingularityPendingTask(@JsonProperty("pendingTaskId") SingularityPendingTaskId pendingTaskId, @JsonProperty("cmdLineArgsList") List cmdLineArgsList, @JsonProperty("user") Optional user) { + public SingularityPendingTask(@JsonProperty("pendingTaskId") SingularityPendingTaskId pendingTaskId, @JsonProperty("cmdLineArgsList") List cmdLineArgsList, + @JsonProperty("user") Optional user, @JsonProperty("runId") Optional runId) { this.pendingTaskId = pendingTaskId; this.user = user; this.cmdLineArgsList = JavaUtils.nonNullImmutable(cmdLineArgsList); + this.runId = runId; } @Override @@ -78,9 +81,13 @@ public List getCmdLineArgsList() { return cmdLineArgsList; } + public Optional getRunId() { + return runId; + } + @Override public String toString() { - return "SingularityPendingTask [pendingTaskId=" + pendingTaskId + ", user=" + user + ", cmdLineArgsList=" + cmdLineArgsList + "]"; + return "SingularityPendingTask [pendingTaskId=" + pendingTaskId + ", cmdLineArgsList=" + cmdLineArgsList + ", user=" + user + ", runId=" + runId + "]"; } } diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskIdHistory.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskIdHistory.java index 29277addf8..2efbf6dd75 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskIdHistory.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskIdHistory.java @@ -14,8 +14,9 @@ public class SingularityTaskIdHistory implements Comparable lastTaskState; + private final Optional runId; - public static SingularityTaskIdHistory fromTaskIdAndUpdates(SingularityTaskId taskId, List updates) { + public static SingularityTaskIdHistory fromTaskIdAndTaskAndUpdates(SingularityTaskId taskId, SingularityTask task, List updates) { ExtendedTaskState lastTaskState = null; long updatedAt = taskId.getStartedAt(); @@ -25,27 +26,29 @@ public static SingularityTaskIdHistory fromTaskIdAndUpdates(SingularityTaskId ta updatedAt = lastUpdate.getTimestamp(); } - return new SingularityTaskIdHistory(taskId, updatedAt, Optional.fromNullable(lastTaskState)); + return new SingularityTaskIdHistory(taskId, updatedAt, Optional.fromNullable(lastTaskState), task.getTaskRequest().getPendingTask().getRunId()); } @JsonCreator - public SingularityTaskIdHistory(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("updatedAt") long updatedAt, @JsonProperty("lastStatus") Optional lastTaskState) { + public SingularityTaskIdHistory(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("updatedAt") long updatedAt, + @JsonProperty("lastStatus") Optional lastTaskState, @JsonProperty("runId") Optional runId) { this.taskId = taskId; this.updatedAt = updatedAt; this.lastTaskState = lastTaskState; + this.runId = runId; } @Override public int compareTo(SingularityTaskIdHistory o) { return ComparisonChain.start() - .compare(o.getUpdatedAt(), updatedAt) - .compare(taskId.getId(), o.getTaskId().getId()) - .result(); + .compare(o.getUpdatedAt(), updatedAt) + .compare(taskId.getId(), o.getTaskId().getId()) + .result(); } @Override public int hashCode() { - return Objects.hashCode(taskId, updatedAt, lastTaskState); + return Objects.hashCode(taskId, updatedAt, lastTaskState, runId); } @Override @@ -60,7 +63,8 @@ public boolean equals(Object other) { SingularityTaskIdHistory that = (SingularityTaskIdHistory) other; return Objects.equal(this.taskId , that.taskId) && Objects.equal(this.updatedAt , that.updatedAt) - && Objects.equal(this.lastTaskState , that.lastTaskState); + && Objects.equal(this.lastTaskState , that.lastTaskState) + && Objects.equal(this.runId, that.runId); } public SingularityTaskId getTaskId() { @@ -75,9 +79,13 @@ public long getUpdatedAt() { return updatedAt; } + public Optional getRunId() { + return runId; + } + @Override public String toString() { - return "SingularityTaskIdHistory [taskId=" + taskId + ", updatedAt=" + updatedAt + ", lastTaskState=" + lastTaskState + "]"; + return "SingularityTaskIdHistory [taskId=" + taskId + ", updatedAt=" + updatedAt + ", lastTaskState=" + lastTaskState + ", runId=" + runId + "]"; } } diff --git a/SingularityService/singularity_test.sql b/SingularityService/singularity_test.sql index e5f6f06303..5bb6f8fdb4 100644 --- a/SingularityService/singularity_test.sql +++ b/SingularityService/singularity_test.sql @@ -23,5 +23,6 @@ CREATE TABLE taskHistory ( requestId VARCHAR(100) NOT NULL, updatedAt TIMESTAMP NOT NULL DEFAULT '1971-01-01 00:00:01', lastTaskStatus VARCHAR(25) NULL, + runId VARCHAR(100) NULL, bytes BLOB NOT NULL, -); \ No newline at end of file +); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/HistoryJDBI.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/HistoryJDBI.java index bd8de84f59..648774e538 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/HistoryJDBI.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/HistoryJDBI.java @@ -23,8 +23,8 @@ public interface HistoryJDBI { @SqlUpdate("INSERT INTO deployHistory (requestId, deployId, createdAt, user, deployStateAt, deployState, bytes) VALUES (:requestId, :deployId, :createdAt, :user, :deployStateAt, :deployState, :bytes)") void insertDeployHistory(@Bind("requestId") String requestId, @Bind("deployId") String deployId, @Bind("createdAt") Date createdAt, @Bind("user") String user, @Bind("deployStateAt") Date deployStateAt, @Bind("deployState") String deployState, @Bind("bytes") byte[] bytes); - @SqlUpdate("INSERT INTO taskHistory (requestId, taskId, bytes, updatedAt, lastTaskStatus) VALUES (:requestId, :taskId, :bytes, :updatedAt, :lastTaskStatus)") - void insertTaskHistory(@Bind("requestId") String requestId, @Bind("taskId") String taskId, @Bind("bytes") byte[] bytes, @Bind("updatedAt") Date updatedAt, @Bind("lastTaskStatus") String lastTaskStatus); + @SqlUpdate("INSERT INTO taskHistory (requestId, taskId, bytes, updatedAt, lastTaskStatus, runId) VALUES (:requestId, :taskId, :bytes, :updatedAt, :lastTaskStatus, :runId)") + void insertTaskHistory(@Bind("requestId") String requestId, @Bind("taskId") String taskId, @Bind("bytes") byte[] bytes, @Bind("updatedAt") Date updatedAt, @Bind("lastTaskStatus") String lastTaskStatus, @Bind("runId") String runId); @SqlQuery("SELECT bytes FROM taskHistory WHERE taskId = :taskId") byte[] getTaskHistoryForTask(@Bind("taskId") String taskId); @@ -35,7 +35,7 @@ public interface HistoryJDBI { @SqlQuery("SELECT requestId, deployId, createdAt, user, deployStateAt, deployState FROM deployHistory WHERE requestId = :requestId ORDER BY createdAt DESC LIMIT :limitStart, :limitCount") List getDeployHistoryForRequest(@Bind("requestId") String requestId, @Bind("limitStart") Integer limitStart, @Bind("limitCount") Integer limitCount); - @SqlQuery("SELECT taskId, requestId, updatedAt, lastTaskStatus FROM taskHistory WHERE requestId = :requestId ORDER BY updatedAt DESC LIMIT :limitStart, :limitCount") + @SqlQuery("SELECT taskId, requestId, updatedAt, lastTaskStatus, runId FROM taskHistory WHERE requestId = :requestId ORDER BY updatedAt DESC LIMIT :limitStart, :limitCount") List getTaskHistoryForRequest(@Bind("requestId") String requestId, @Bind("limitStart") Integer limitStart, @Bind("limitCount") Integer limitCount); @SqlQuery("SELECT request, createdAt, requestState, user FROM requestHistory WHERE requestId = :requestId ORDER BY createdAt LIMIT :limitStart, :limitCount") diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/JDBIHistoryManager.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/JDBIHistoryManager.java index d5a6a0cafa..5ff1ccd4b3 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/JDBIHistoryManager.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/JDBIHistoryManager.java @@ -93,7 +93,7 @@ public void saveTaskHistory(SingularityTaskHistory taskHistory) { return; } - SingularityTaskIdHistory taskIdHistory = SingularityTaskIdHistory.fromTaskIdAndUpdates(taskHistory.getTask().getTaskId(), taskHistory.getTaskUpdates()); + SingularityTaskIdHistory taskIdHistory = SingularityTaskIdHistory.fromTaskIdAndTaskAndUpdates(taskHistory.getTask().getTaskId(), taskHistory.getTask(), taskHistory.getTaskUpdates()); String lastTaskStatus = null; if (taskIdHistory.getLastTaskState().isPresent()) { @@ -101,7 +101,7 @@ public void saveTaskHistory(SingularityTaskHistory taskHistory) { } history.insertTaskHistory(taskIdHistory.getTaskId().getRequestId(), taskIdHistory.getTaskId().getId(), taskHistoryTranscoder.toBytes(taskHistory), new Date(taskIdHistory.getUpdatedAt()), - lastTaskStatus); + lastTaskStatus, taskHistory.getTask().getTaskRequest().getPendingTask().getRunId().orNull()); } @Override diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityMappers.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityMappers.java index 0ba33706aa..64c6b396d6 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityMappers.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityMappers.java @@ -115,7 +115,7 @@ public SingularityTaskIdHistory map(int index, ResultSet r, StatementContext ctx } } - return new SingularityTaskIdHistory(taskId, r.getTimestamp("updatedAt").getTime(), lastTaskState); + return new SingularityTaskIdHistory(taskId, r.getTimestamp("updatedAt").getTime(), lastTaskState, Optional.fromNullable(r.getString("runId"))); } catch (SingularityTranscoderException e) { throw new ResultSetException("Could not deserialize database result", e, ctx); } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/TaskHistoryHelper.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/TaskHistoryHelper.java index 970cbea634..de93ba22df 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/TaskHistoryHelper.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/TaskHistoryHelper.java @@ -29,14 +29,16 @@ public TaskHistoryHelper(TaskManager taskManager, HistoryManager historyManager) } public List getHistoriesFor(Collection taskIds) { - Map> map = taskManager.getTaskHistoryUpdates(taskIds); + Map tasks = taskManager.getTasks(taskIds); + Map> history = taskManager.getTaskHistoryUpdates(taskIds); List histories = Lists.newArrayListWithCapacity(taskIds.size()); for (SingularityTaskId taskId : taskIds) { - List historyUpdates = map.get(taskId); + List historyUpdates = history.get(taskId); + SingularityTask task = tasks.get(taskId); - histories.add(SingularityTaskIdHistory.fromTaskIdAndUpdates(taskId, historyUpdates)); + histories.add(SingularityTaskIdHistory.fromTaskIdAndTaskAndUpdates(taskId, task, historyUpdates)); } Collections.sort(histories); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityCmdLineArgsMigration.java b/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityCmdLineArgsMigration.java index c7c6546775..a75e336a36 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityCmdLineArgsMigration.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityCmdLineArgsMigration.java @@ -114,7 +114,8 @@ private void checkPendingRequests() { try { for (String pendingRequest : curator.getChildren().forPath(REQUEST_PENDING_PATH)) { SingularityPendingRequestPrevious previous = objectMapper.readValue(curator.getData().forPath(ZKPaths.makePath(REQUEST_PENDING_PATH, pendingRequest)), SingularityPendingRequestPrevious.class); - SingularityPendingRequest newRequest = new SingularityPendingRequest(previous.requestId, previous.deployId, previous.timestamp, previous.user, previous.pendingType, getCmdLineArgs(previous.cmdLineArgs)); + SingularityPendingRequest newRequest = new SingularityPendingRequest(previous.requestId, previous.deployId, previous.timestamp, previous.user, previous.pendingType, + getCmdLineArgs(previous.cmdLineArgs), Optional. absent()); LOG.info("Re-saving {}", newRequest); @@ -140,7 +141,7 @@ private void checkPendingTasks() { for (SingularityPendingTaskId pendingTaskId : taskManager.getPendingTaskIds()) { Optional cmdLineArgs = getCmdLineArgs(pendingTaskId); - SingularityCreateResult result = taskManager.savePendingTask(new SingularityPendingTask(pendingTaskId, getCmdLineArgs(cmdLineArgs), Optional. absent())); + SingularityCreateResult result = taskManager.savePendingTask(new SingularityPendingTask(pendingTaskId, getCmdLineArgs(cmdLineArgs), Optional. absent(), Optional. absent())); LOG.info("Saving {} ({}) {}", pendingTaskId, cmdLineArgs, result); } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityPendingTaskIdMigration.java b/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityPendingTaskIdMigration.java index 57d59fa9d4..89ccb68586 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityPendingTaskIdMigration.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/zkmigrations/SingularityPendingTaskIdMigration.java @@ -57,7 +57,8 @@ public void applyMigration() { Optional cmdLineArgs = getCmdLineArgs(pendingTaskId); - taskManager.savePendingTask(new SingularityPendingTask(newPendingTaskId, cmdLineArgs.isPresent() ? Collections.singletonList(cmdLineArgs.get()) : Collections. emptyList(), Optional. absent())); + taskManager.savePendingTask(new SingularityPendingTask(newPendingTaskId, cmdLineArgs.isPresent() ? Collections.singletonList(cmdLineArgs.get()) : + Collections. emptyList(), Optional. absent(), Optional. absent())); curator.delete().forPath(ZKPaths.makePath(PENDING_TASKS_ROOT, pendingTaskId)); } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityDriver.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityDriver.java index 1cbf141a9d..26ccc901f9 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityDriver.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityDriver.java @@ -64,9 +64,9 @@ public class SingularityDriver { this.scheduler = scheduler; if (configuration.isUseNativeCode()) { - this.driver = new MesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster()); + this.driver = new MesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster()); } else { - this.driver = new JesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster()); + this.driver = new JesosSchedulerDriver(scheduler, frameworkInfo, configuration.getMaster()); } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/resources/DeployResource.java b/SingularityService/src/main/java/com/hubspot/singularity/resources/DeployResource.java index 83a8130a78..226c19eb46 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/resources/DeployResource.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/resources/DeployResource.java @@ -4,8 +4,6 @@ import static com.hubspot.singularity.WebExceptions.checkConflict; import static com.hubspot.singularity.WebExceptions.checkNotNullBadRequest; -import java.util.Collections; - import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -108,7 +106,7 @@ public SingularityRequestParent deploy(@ApiParam(required=true) SingularityDeplo } if (request.isDeployable()) { - requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, deployMarker.getDeployId(), now, deployUser, PendingType.NEW_DEPLOY, Collections. emptyList())); + requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, deployMarker.getDeployId(), now, deployUser, PendingType.NEW_DEPLOY)); } return fillEntireRequest(requestWithState); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/resources/RequestResource.java b/SingularityService/src/main/java/com/hubspot/singularity/resources/RequestResource.java index 703e2c810d..a361b8c919 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/resources/RequestResource.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/resources/RequestResource.java @@ -5,9 +5,9 @@ import static com.hubspot.singularity.WebExceptions.checkConflict; import static com.hubspot.singularity.WebExceptions.checkNotNullBadRequest; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -208,8 +208,9 @@ public SingularityRequestParent bounce(@ApiParam("The request ID to bounce") @Pa @ApiResponses({ @ApiResponse(code=400, message="Singularity Request is not scheduled or one-off"), }) - public SingularityRequestParent scheduleImmediately(@ApiParam("The request ID to run") @PathParam("requestId") String requestId, + public SingularityPendingRequestParent scheduleImmediately(@ApiParam("The request ID to run") @PathParam("requestId") String requestId, @ApiParam("Username of the person requesting the execution") @QueryParam("user") Optional queryUser, + @ApiParam("Run ID to associate with this task. If not specified, one will be generated") @QueryParam("runId") Optional runId, @ApiParam("Additional command line arguments to append to the task") List commandLineArgs) { SingularityRequestWithState requestWithState = fetchRequestWithState(requestId); @@ -228,7 +229,11 @@ public SingularityRequestParent scheduleImmediately(@ApiParam("The request ID to throw badRequest("Can not request an immediate run of a non-scheduled / always running request (%s)", requestWithState.getRequest()); } - final SingularityPendingRequest pendingRequest = new SingularityPendingRequest(requestId, getAndCheckDeployId(requestId), System.currentTimeMillis(), queryUser, pendingType, commandLineArgs); + if (!runId.isPresent()) { + runId = Optional.of(UUID.randomUUID().toString()); + } + + final SingularityPendingRequest pendingRequest = new SingularityPendingRequest(requestId, getAndCheckDeployId(requestId), System.currentTimeMillis(), queryUser, pendingType, commandLineArgs, runId); SingularityCreateResult result = requestManager.addToPendingQueue(pendingRequest); @@ -295,7 +300,7 @@ public SingularityRequestParent unpause(@ApiParam("The request ID to unpause") @ requestManager.unpause(requestWithState.getRequest(), now, queryUser); if (maybeDeployId.isPresent() && !requestWithState.getRequest().isOneOff()) { - requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, queryUser, PendingType.UNPAUSED, Collections. emptyList())); + requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, queryUser, PendingType.UNPAUSED)); } return fillEntireRequest(new SingularityRequestWithState(requestWithState.getRequest(), RequestState.ACTIVE, now)); @@ -315,7 +320,7 @@ public SingularityRequestParent exitCooldown(@PathParam("requestId") String requ requestManager.exitCooldown(requestWithState.getRequest(), now, user); if (maybeDeployId.isPresent() && !requestWithState.getRequest().isOneOff()) { - requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, user, PendingType.IMMEDIATE, Collections.emptyList())); + requestManager.addToPendingQueue(new SingularityPendingRequest(requestId, maybeDeployId.get(), now, user, PendingType.IMMEDIATE)); } return fillEntireRequest(requestWithState); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java index ebf81b6bfb..c64df9e76a 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityScheduler.java @@ -589,7 +589,7 @@ private List getScheduledTaskIds(int numMissingInstances } newTasks.add(new SingularityPendingTask(new SingularityPendingTaskId(request.getId(), deployId, nextRunAt.get(), nextInstanceNumber, pendingRequest.getPendingType(), pendingRequest.getTimestamp()), - pendingRequest.getCmdLineArgsList(), pendingRequest.getUser())); + pendingRequest.getCmdLineArgsList(), pendingRequest.getUser(), pendingRequest.getRunId())); nextInstanceNumber++; } diff --git a/SingularityService/src/test/java/com/hubspot/singularity/SingularityHistoryPurgerTest.java b/SingularityService/src/test/java/com/hubspot/singularity/SingularityHistoryPurgerTest.java index 2579067979..bc1556bac9 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/SingularityHistoryPurgerTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/SingularityHistoryPurgerTest.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.mesos.Protos.TaskState; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -17,6 +18,7 @@ import com.hubspot.singularity.config.HistoryPurgingConfiguration; import com.hubspot.singularity.data.history.HistoryManager; import com.hubspot.singularity.data.history.SingularityHistoryPurger; +import com.hubspot.singularity.data.history.SingularityTaskHistoryPersister; import liquibase.Liquibase; import liquibase.database.Database; @@ -32,6 +34,9 @@ public class SingularityHistoryPurgerTest extends SingularitySchedulerTestBase { @Inject protected HistoryManager historyManager; + @Inject + protected SingularityTaskHistoryPersister taskHistoryPersister; + public SingularityHistoryPurgerTest() { super(true); } @@ -151,4 +156,36 @@ public void historyPurgerTest() { Assert.assertEquals(1, historyManager.getTaskHistoryForRequest(requestId, 0, 10).size()); } + @Test + public void testRunId() { + initScheduledRequest(); + initFirstDeploy(); + + String runId = "my-run-id"; + String user = "my-user"; + + SingularityPendingRequestParent parent = requestResource.scheduleImmediately(requestId, Optional.of(user), Optional.of(runId), Collections. emptyList()); + + Assert.assertEquals(runId, parent.getPendingRequest().getRunId().get()); + Assert.assertEquals(user, parent.getPendingRequest().getUser().get()); + + resourceOffers(); + + Assert.assertEquals(runId, taskManager.getActiveTasks().get(0).getTaskRequest().getPendingTask().getRunId().get()); + Assert.assertEquals(user, taskManager.getActiveTasks().get(0).getTaskRequest().getPendingTask().getUser().get()); + + SingularityTaskId taskId = taskManager.getActiveTaskIds().get(0); + + statusUpdate(taskManager.getTask(taskId).get(), TaskState.TASK_FINISHED); + + taskHistoryPersister.runActionOnPoll(); + + Assert.assertEquals(runId, historyManager.getTaskHistory(taskId.getId()).get().getTask().getTaskRequest().getPendingTask().getRunId().get()); + Assert.assertEquals(runId, historyManager.getTaskHistoryForRequest(requestId, 0, 10).get(0).getRunId().get()); + + parent = requestResource.scheduleImmediately(requestId, Optional. absent(), Optional. absent(), Collections. emptyList()); + + Assert.assertTrue(parent.getPendingRequest().getRunId().isPresent()); + } + } diff --git a/SingularityService/src/test/java/com/hubspot/singularity/SingularitySchedulerTestBase.java b/SingularityService/src/test/java/com/hubspot/singularity/SingularitySchedulerTestBase.java index 02fc0dc79e..28243d0fe9 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/SingularitySchedulerTestBase.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/SingularitySchedulerTestBase.java @@ -184,7 +184,7 @@ protected SingularityTask launchTask(SingularityRequest request, SingularityDepl protected SingularityPendingTask buildPendingTask(SingularityRequest request, SingularityDeploy deploy, long launchTime, int instanceNo) { SingularityPendingTaskId pendingTaskId = new SingularityPendingTaskId(request.getId(), deploy.getId(), launchTime, instanceNo, PendingType.IMMEDIATE, launchTime); - SingularityPendingTask pendingTask = new SingularityPendingTask(pendingTaskId, Collections. emptyList(), Optional. absent()); + SingularityPendingTask pendingTask = new SingularityPendingTask(pendingTaskId, Collections. emptyList(), Optional. absent(), Optional. absent()); return pendingTask; } @@ -351,7 +351,7 @@ protected SingularityPendingTask createAndSchedulePendingTask(String deployId) { SingularityPendingTaskId pendingTaskId = new SingularityPendingTaskId(requestId, deployId, System.currentTimeMillis() + TimeUnit.DAYS.toMillis(random.nextInt(3)), random.nextInt(10), PendingType.IMMEDIATE, System.currentTimeMillis()); - SingularityPendingTask pendingTask = new SingularityPendingTask(pendingTaskId, Collections. emptyList(), Optional. absent()); + SingularityPendingTask pendingTask = new SingularityPendingTask(pendingTaskId, Collections. emptyList(), Optional. absent(), Optional. absent()); taskManager.savePendingTask(pendingTask); diff --git a/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityMesosTaskBuilderTest.java b/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityMesosTaskBuilderTest.java index d60c6e54c6..9bcf6b0433 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityMesosTaskBuilderTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityMesosTaskBuilderTest.java @@ -10,10 +10,6 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicLong; -import com.hubspot.mesos.SingularityContainerType; -import com.hubspot.mesos.SingularityDockerNetworkType; -import com.hubspot.mesos.SingularityDockerVolumeMode; -import com.hubspot.singularity.config.SingularityConfiguration; import org.apache.mesos.Protos; import org.apache.mesos.Protos.ContainerInfo.Type; import org.apache.mesos.Protos.FrameworkID; @@ -59,7 +55,8 @@ public class SingularityMesosTaskBuilderTest { @Before public void createMocks() { - pendingTask = new SingularityPendingTask(new SingularityPendingTaskId("test", "1", 0, 1, PendingType.IMMEDIATE, 0), Collections. emptyList(), Optional. absent()); + pendingTask = new SingularityPendingTask(new SingularityPendingTaskId("test", "1", 0, 1, PendingType.IMMEDIATE, 0), Collections. emptyList(), + Optional. absent(), Optional. absent()); final SingularitySlaveAndRackHelper slaveAndRackHelper = mock(SingularitySlaveAndRackHelper.class); final ExecutorIdGenerator idGenerator = mock(ExecutorIdGenerator.class); diff --git a/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityStartupTest.java b/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityStartupTest.java index 7e9ac32058..141042bd12 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityStartupTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/mesos/SingularityStartupTest.java @@ -111,7 +111,7 @@ public void testScheduledTasksDontGetRescheduledDuringRun() { boolean caughtException = false; try { - requestResource.scheduleImmediately(requestId, Optional. absent(), null); + requestResource.scheduleImmediately(requestId, Optional. absent(), Optional. absent(), null); } catch (Exception e) { caughtException = true; } diff --git a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java index ef77aba8ee..c3c7a361d3 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java @@ -64,9 +64,9 @@ public void testSchedulerIsolatesPendingTasksBasedOnDeploy() { initFirstDeploy(); initSecondDeploy(); - SingularityPendingTask p1 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis(), 1, PendingType.ONEOFF, System.currentTimeMillis()), Collections. emptyList(), Optional. absent()); - SingularityPendingTask p2 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis(), 1, PendingType.TASK_DONE, System.currentTimeMillis()), Collections. emptyList(), Optional. absent()); - SingularityPendingTask p3 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, secondDeployId, System.currentTimeMillis(), 1, PendingType.TASK_DONE, System.currentTimeMillis()),Collections. emptyList(), Optional. absent()); + SingularityPendingTask p1 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis(), 1, PendingType.ONEOFF, System.currentTimeMillis()), Collections. emptyList(), Optional. absent(), Optional. absent()); + SingularityPendingTask p2 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis(), 1, PendingType.TASK_DONE, System.currentTimeMillis()), Collections. emptyList(), Optional. absent(), Optional. absent()); + SingularityPendingTask p3 = new SingularityPendingTask(new SingularityPendingTaskId(requestId, secondDeployId, System.currentTimeMillis(), 1, PendingType.TASK_DONE, System.currentTimeMillis()),Collections. emptyList(), Optional. absent(), Optional. absent()); taskManager.savePendingTask(p1); taskManager.savePendingTask(p2); @@ -135,16 +135,16 @@ public void testDeployClearsObsoleteScheduledTasks() { initSecondDeploy(); SingularityPendingTaskId taskIdOne = new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3), 1, PendingType.IMMEDIATE, System.currentTimeMillis()); - SingularityPendingTask taskOne = new SingularityPendingTask(taskIdOne, Collections. emptyList(), Optional. absent()); + SingularityPendingTask taskOne = new SingularityPendingTask(taskIdOne, Collections. emptyList(), Optional. absent(), Optional. absent()); SingularityPendingTaskId taskIdTwo = new SingularityPendingTaskId(requestId, firstDeployId, System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1), 2, PendingType.IMMEDIATE, System.currentTimeMillis()); - SingularityPendingTask taskTwo = new SingularityPendingTask(taskIdTwo, Collections. emptyList(), Optional. absent()); + SingularityPendingTask taskTwo = new SingularityPendingTask(taskIdTwo, Collections. emptyList(), Optional. absent(), Optional. absent()); SingularityPendingTaskId taskIdThree = new SingularityPendingTaskId(requestId, secondDeployId, System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3), 1, PendingType.IMMEDIATE, System.currentTimeMillis()); - SingularityPendingTask taskThree = new SingularityPendingTask(taskIdThree, Collections. emptyList(), Optional. absent()); + SingularityPendingTask taskThree = new SingularityPendingTask(taskIdThree, Collections. emptyList(), Optional. absent(), Optional. absent()); SingularityPendingTaskId taskIdFour = new SingularityPendingTaskId(requestId + "hi", firstDeployId, System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3), 5, PendingType.IMMEDIATE, System.currentTimeMillis()); - SingularityPendingTask taskFour = new SingularityPendingTask(taskIdFour,Collections. emptyList(), Optional. absent()); + SingularityPendingTask taskFour = new SingularityPendingTask(taskIdFour,Collections. emptyList(), Optional. absent(), Optional. absent()); taskManager.savePendingTask(taskOne); taskManager.savePendingTask(taskTwo); @@ -382,7 +382,7 @@ public void testOneOffsDontRunByThemselves() { Assert.assertTrue(requestManager.getPendingRequests().isEmpty()); - requestResource.scheduleImmediately(requestId, user, Collections. emptyList()); + requestResource.scheduleImmediately(requestId, user, Optional. absent(), Collections. emptyList()); resourceOffers(); @@ -394,7 +394,7 @@ public void testOneOffsDontRunByThemselves() { Assert.assertEquals(0, taskManager.getActiveTaskIds().size()); Assert.assertEquals(0, taskManager.getPendingTaskIds().size()); - requestResource.scheduleImmediately(requestId, user, Collections. emptyList()); + requestResource.scheduleImmediately(requestId, user, Optional. absent(), Collections. emptyList()); resourceOffers(); @@ -413,7 +413,7 @@ public void testOneOffsDontMoveDuringDecomission() { requestResource.submit(bldr.build(), Optional. absent()); deploy("d2"); - requestResource.scheduleImmediately(requestId, user, Collections. emptyList()); + requestResource.scheduleImmediately(requestId, user, Optional. absent(), Collections. emptyList()); validateTaskDoesntMoveDuringDecommission(); } diff --git a/mysql/migrations.sql b/mysql/migrations.sql index 5078e1c863..fe76e01262 100644 --- a/mysql/migrations.sql +++ b/mysql/migrations.sql @@ -69,3 +69,7 @@ ALTER TABLE `deployHistory` ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8; ALTER TABLE `requestHistory` ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8; ALTER TABLE `taskHistory` ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8; + +--changeset wsorenson:5 dbms:mysql +ALTER TABLE `taskHistory` ADD COLUMN runId VARCHAR(100) NULL; +