Skip to content

Commit

Permalink
Merge pull request #420 from broadinstitute/jt_GAWB-662
Browse files Browse the repository at this point in the history
GAWB-662 new workflow queue status endpoint
  • Loading branch information
jmthibault79 committed May 13, 2016
2 parents cd25e24 + 8dc2890 commit 823628c
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 1 deletion.
22 changes: 22 additions & 0 deletions src/main/resources/swagger/rawls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,28 @@ paths:
- openid
- email
- profile
'/api/submissions/queueStatus':
get:
responses:
'200':
description: Successful Request
schema:
type: object
description: Map[String,Int]
'500':
description: Rawls Internal Error
schema:
$ref: '#/definitions/ErrorReport'
description: List workflow counts by queueing state
tags:
- submissions
summary: workflow queue status
operationId: workflowQueueStatus
security:
- authorization:
- openid
- email
- profile
'/api/workspaces/{workspaceNamespace}/{workspaceName}/entities/{entityType}/{entityName}/rename':
post:
responses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ trait WorkflowComponent {
findWorkflowsBySubmissionId(submissionId).filter(_.status inSet(statuses.map(_.toString))).result
}

def countWorkflowsByQueueStatus: ReadAction[Map[String, Int]] = {
val groupedSeq = findQueuedAndRunningWorkflows.groupBy(_.status).map { case (status, recs) => (status, recs.length) }.result
groupedSeq.map(_.toMap)
}

def listWorkflowRecsForSubmission(submissionId: UUID): ReadAction[Seq[WorkflowRecord]] = {
findWorkflowsBySubmissionId(submissionId).result
}
Expand Down Expand Up @@ -322,6 +327,10 @@ trait WorkflowComponent {
(workflowFailureQuery filter (_.submissionId === submissionId))
}

def findQueuedAndRunningWorkflows: WorkflowQueryType = {
filter(rec => rec.status inSetBind((WorkflowStatuses.queuedStatuses ++ WorkflowStatuses.runningStatuses) map { _.toString }))
}

/*
the marshal and unmarshal methods
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,9 @@ trait RawlsEnumeration[T <: RawlsEnumeration[T]] { self: T =>
}

object WorkflowStatuses {
val terminalStatuses: Seq[WorkflowStatus] = Seq(Failed, Succeeded, Aborted, Unknown)
val queuedStatuses: Seq[WorkflowStatus] = Seq(Queued, Launching)
val runningStatuses: Seq[WorkflowStatus] = Seq(Submitted, Running, Aborting)
val terminalStatuses: Seq[WorkflowStatus] = Seq(Failed, Succeeded, Aborted, Unknown)

sealed trait WorkflowStatus extends RawlsEnumeration[WorkflowStatus] {
def isDone = {
Expand All @@ -304,6 +305,8 @@ object WorkflowStatuses {

def withName(name: String): WorkflowStatus = {
name match {
case "Queued" => Queued
case "Launching" => Launching
case "Submitted" => Submitted
case "Running" => Running
case "Failed" => Failed
Expand All @@ -315,6 +318,8 @@ object WorkflowStatuses {
}
}

case object Queued extends WorkflowStatus
case object Launching extends WorkflowStatus
case object Submitted extends WorkflowStatus
case object Running extends WorkflowStatus
case object Failed extends WorkflowStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ trait SubmissionApiService extends HttpService with PerRequestCreator with UserI
requestContext => perRequest(requestContext, WorkspaceService.props(workspaceServiceConstructor, userInfo),
WorkspaceService.GetWorkflowOutputs(WorkspaceName(workspaceNamespace, workspaceName), submissionId, workflowId))
}
} ~
path("submissions" / "queueStatus") {
get {
requestContext => perRequest(requestContext,
WorkspaceService.props(workspaceServiceConstructor, userInfo),
WorkspaceService.WorkflowQueueStatus)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ object WorkspaceService {
case class AbortSubmission(workspaceName: WorkspaceName, submissionId: String) extends WorkspaceServiceMessage
case class GetWorkflowOutputs(workspaceName: WorkspaceName, submissionId: String, workflowId: String) extends WorkspaceServiceMessage
case class GetWorkflowMetadata(workspaceName: WorkspaceName, submissionId: String, workflowId: String) extends WorkspaceServiceMessage
case object WorkflowQueueStatus extends WorkspaceServiceMessage

case object AdminListAllActiveSubmissions extends WorkspaceServiceMessage
case class AdminAbortSubmission(workspaceNamespace: String, workspaceName: String, submissionId: String) extends WorkspaceServiceMessage
Expand Down Expand Up @@ -158,6 +159,7 @@ class WorkspaceService(protected val userInfo: UserInfo, dataSource: SlickDataSo
case AbortSubmission(workspaceName, submissionId) => pipe(abortSubmission(workspaceName, submissionId)) to sender
case GetWorkflowOutputs(workspaceName, submissionId, workflowId) => pipe(workflowOutputs(workspaceName, submissionId, workflowId)) to sender
case GetWorkflowMetadata(workspaceName, submissionId, workflowId) => pipe(workflowMetadata(workspaceName, submissionId, workflowId)) to sender
case WorkflowQueueStatus => pipe(workflowQueueStatus()) to sender

case AdminListAllActiveSubmissions => asAdmin { listAllActiveSubmissions() } pipeTo sender
case AdminAbortSubmission(workspaceNamespace,workspaceName,submissionId) => pipe(adminAbortSubmission(WorkspaceName(workspaceNamespace,workspaceName),submissionId)) to sender
Expand Down Expand Up @@ -1158,6 +1160,12 @@ class WorkspaceService(protected val userInfo: UserInfo, dataSource: SlickDataSo
}
}

def workflowQueueStatus() = {
dataSource.inTransaction { dataAccess =>
dataAccess.workflowQuery.countWorkflowsByQueueStatus.map(RequestComplete(StatusCodes.OK, _))
}
}

def listAllActiveSubmissions() = {
dataSource.inTransaction { dataAccess =>
dataAccess.submissionQuery.listAllActiveSubmissions().map(RequestComplete(StatusCodes.OK, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,57 @@ class SubmissionApiServiceSpec extends ApiServiceSpec {
}
}
}

it should "return 200 when checking the queue status" in withTestDataApiServices { services =>

val existingSubmittedWorkflowCount = 12
val existingWorkflowCounts = Map("Submitted" -> existingSubmittedWorkflowCount)

Get("/submissions/queueStatus") ~>
sealRoute(services.submissionRoutes) ~>
check {
assertResult(StatusCodes.OK) {status}
assertResult(existingWorkflowCounts) {
responseAs[Map[String, Int]]
}
}

val newWorkflows = Map(
WorkflowStatuses.Queued -> 1,
WorkflowStatuses.Launching -> 2,
WorkflowStatuses.Submitted -> 4,
WorkflowStatuses.Running -> 8,
WorkflowStatuses.Failed -> 16,
WorkflowStatuses.Succeeded -> 32,
WorkflowStatuses.Aborting -> 64,
WorkflowStatuses.Aborted -> 128,
WorkflowStatuses.Unknown -> 256
)

val newWorkflowCounts = Map(
"Queued" -> 1,
"Launching" -> 2,
"Submitted" -> (4 + existingSubmittedWorkflowCount),
"Running" -> 8,
"Aborting" -> 64
)

withWorkspaceContext(testData.workspace) { context =>
newWorkflows foreach { case (status, count) =>
for (i <- 1 to count) {
val wf = Workflow(s"workflow${i}_of_$count", status, testDate, None, testData.inputResolutions)
runAndWait(workflowQuery.save(context, UUID.fromString(testData.submissionUpdateEntity.submissionId), wf))
}
}
}

Get("/submissions/queueStatus") ~>
sealRoute(services.submissionRoutes) ~>
check {
assertResult(StatusCodes.OK) {status}
assertResult(newWorkflowCounts) {
responseAs[Map[String, Int]]
}
}
}
}

0 comments on commit 823628c

Please sign in to comment.