Skip to content

Commit

Permalink
Implement the WES service-info endpoint (#4455)
Browse files Browse the repository at this point in the history
* Implement the WES service-info endpoint
  • Loading branch information
geoffjentry committed Dec 6, 2018
1 parent ec67d65 commit 92b9fda
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 51 deletions.
8 changes: 8 additions & 0 deletions build.sbt
Expand Up @@ -238,8 +238,16 @@ lazy val engine = project
.dependsOn(ossFileSystem)
.dependsOn(gcsFileSystem)
.dependsOn(sraFileSystem)
.dependsOn(awsS3FileSystem % "test->test")
.dependsOn(demoDosFileSystem % "test->test")
.dependsOn(httpFileSystem % "test->test")
.dependsOn(ftpFileSystem % "test->test")
.dependsOn(`cloud-nio-spi`)
.dependsOn(languageFactoryCore)
.dependsOn(cwlV1_0LanguageFactory % "test->test")
.dependsOn(wdlDraft2LanguageFactory % "test->test")
.dependsOn(wdlDraft3LanguageFactory % "test->test")
.dependsOn(wdlBiscayneLanguageFactory % "test->test")
.dependsOn(common % "test->test")
.dependsOn(core % "test->test")
.dependsOn(backend % "test->test")
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Expand Up @@ -478,3 +478,12 @@ load-control {
# How often each actor should update its perceived load
monitoring-frequency = 5 seconds
}

ga4gh {
wes {
# URLs for site-specific auth process as well as contact info for security issues.
# These aren't really compelling defaults, but there aren't really any compelling defaults
auth-instructions-url = "https://cromwell.readthedocs.io/en/stable/"
contact-info-url = "https://cromwell.readthedocs.io/en/stable/"
}
}
Expand Up @@ -36,6 +36,8 @@ class CromwellFileSystems(globalConfig: Config) {
constructorMap.unsafe("Failed to initialize Cromwell filesystems")
} else Map.empty

val supportedFileSystems: Iterable[String] = factoryBuilders.keys

// Generate the appropriate constructor and optional singleton instance for a filesystem
private def processFileSystem(key: String, fsConfig: ConfigObject): ErrorOr[(String, (Constructor[_], Option[AnyRef]))] = {
// This is the (optional) singleton instance shared by all factory instances
Expand Down
Expand Up @@ -3,6 +3,7 @@ package cromwell.engine.workflow.workflowstore
import java.time.OffsetDateTime

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.pipe
import cats.data.NonEmptyList
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.core._
Expand All @@ -18,6 +19,8 @@ final case class WorkflowStoreActor private(
extends Actor with ActorLogging with GracefulShutdownHelper {
import WorkflowStoreActor._

implicit val ec = context.dispatcher

lazy val workflowStoreSubmitActor: ActorRef = context.actorOf(
WorkflowStoreSubmitActor.props(
workflowStoreDatabase = workflowStore,
Expand Down Expand Up @@ -45,6 +48,11 @@ final case class WorkflowStoreActor private(
case cmd: WorkflowStoreActorSubmitCommand => workflowStoreSubmitActor forward cmd
case cmd: WorkflowStoreActorEngineCommand => workflowStoreEngineActor forward cmd
case cmd: WorkflowStoreWriteHeartbeatCommand => workflowStoreHeartbeatWriteActor forward cmd
case GetWorkflowStoreStats =>
// Retrieve the workflow store stats, convert the WorkflowStoreStates to WorkflowStates
val stats = workflowStore.stats.map(m => m.map(e => WorkflowState.withName(e._1.toString) -> e._2))
stats pipeTo sender
()
}
}

Expand All @@ -66,6 +74,8 @@ object WorkflowStoreActor {
final case class SubmitWorkflow(source: WorkflowSourceFilesCollection) extends WorkflowStoreActorSubmitCommand
final case class BatchSubmitWorkflows(sources: NonEmptyList[WorkflowSourceFilesCollection]) extends WorkflowStoreActorSubmitCommand

final case object GetWorkflowStoreStats

case class WorkflowStoreWriteHeartbeatCommand(workflowId: WorkflowId, submissionTime: OffsetDateTime)

def props(
Expand Down
@@ -0,0 +1,83 @@
package cromwell.webservice.routes.wes

import java.net.URL

import akka.actor.ActorRef
import akka.util.Timeout
import cromwell.core.filesystem.CromwellFileSystems
import cromwell.engine.language.CromwellLanguages
import cromwell.webservice.routes.CromwellApiService
import net.ceedubs.ficus.Ficus._
import com.typesafe.config.ConfigFactory
import spray.json.DefaultJsonProtocol
import akka.pattern.ask
import cromwell.core.WorkflowState
import cromwell.engine.workflow.workflowstore.WorkflowStoreActor.GetWorkflowStoreStats
import cromwell.webservice.routes.wes.WesState.WesState

import scala.concurrent.{ExecutionContext, Future}

object ServiceInfo {
private lazy val config = ConfigFactory.load()

// Which languages and which versions of those languages does this Cromwell support?
// CromwellLanguages.instance.languages is a Map[Language, Map[Version, Factory]]. We want Map[Language -> Version]
lazy val WorkflowTypeVersion = CromwellLanguages.instance.languages.map(x => x._1 -> x._2.allVersions.keys)

// Which versions of WES does this Cromwell support?
val SupportedWesVersions = List("1.0")

// Which filesystems does this Cromwell support? NB: It's a known flaw of spec that not all backends may support all FS
lazy val SupportedFilesystemProtocols = CromwellFileSystems.instance.supportedFileSystems

// In Cromwell terms, default workflow options. Viewing this as too much trouble to fill in unless someone complains
val DefaultWorkflowEngineParameters = List.empty[DefaultWorkflowEngineParameter]

// Which engines and which version(s) of those engines does this WES support. Kinda obvious for Cromwell
val WorkflowEngineVerisons = Map("Cromwell" -> CromwellApiService.cromwellVersion)

// URL which provides site specific authorization information. Doesn't really apply to Cromwell for now
lazy val AuthInstructionsUrl = config.as[URL]("ga4gh.wes.auth-instructions-url")
// URL which provides site specific contact information for security issues
lazy val ContactInfoUrl = config.as[URL]("ga4gh.wes.contact-info-url")

/*
Optional key/value pairs. Could potentially be added to the config but not going to bother until if/when someone asks for it
*/
val Tags = Map.empty[String, String]

/**
* Generate any runtime level information and create a response to the client
*/
def toWesResponse(workflowStoreActor: ActorRef)(implicit ec: ExecutionContext, timeout: Timeout): Future[WesStatusInfoResponse] = {
workflowStats(workflowStoreActor).map(stats =>
WesStatusInfoResponse(WorkflowTypeVersion,
SupportedWesVersions,
SupportedFilesystemProtocols,
WorkflowEngineVerisons,
DefaultWorkflowEngineParameters,
stats,
AuthInstructionsUrl.toString,
ContactInfoUrl.toString,
Tags)
)
}

/**
* Retrieve a map from state to count for all represented non-terminal workflow states
*/
private def workflowStats(workflowStoreActor: ActorRef)(implicit ec: ExecutionContext, timeout: Timeout): Future[Map[WesState, Int]] = {
workflowStoreActor.ask(GetWorkflowStoreStats)
.mapTo[Map[WorkflowState, Int]]
.map(m => m.map(e => WesState.fromCromwellStatus(e._1) -> e._2)) // Convert WorkflowState -> WesState
}
}

/*
Models the type DefaultWorkflowEngineParameter in the WES Swagger
*/
final case class DefaultWorkflowEngineParameter(name: String, `type`: String, default_value: String)

object DefaultWorkflowEngineParameter extends DefaultJsonProtocol {
implicit val DefaultWorkflowEngineParameterFormat = jsonFormat3(DefaultWorkflowEngineParameter.apply)
}
Expand Up @@ -9,12 +9,24 @@ final case class WesErrorResponse(msg: String, status_code: Int) extends WesResp
final case class WesRunId(run_id: String) extends WesResponse
final case class WesRunStatus(run_id: String, state: WesState) extends WesResponse

final case class WesStatusInfoResponse(workflow_type_version: Map[String, Iterable[String]],
supported_wes_versions: Iterable[String],
supported_filesystem_protocols: Iterable[String],
workflow_engine_versions: Map[String, String],
default_workflow_engine_parameters: Iterable[DefaultWorkflowEngineParameter],
system_state_counts: Map[WesState, Int],
auth_instructions_url: String,
contact_info_url: String,
tags: Map[String, String]) extends WesResponse

object WesResponseJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import WesStateJsonSupport._
import DefaultWorkflowEngineParameter.DefaultWorkflowEngineParameterFormat

implicit val WesResponseErrorFormat = jsonFormat2(WesErrorResponse)
implicit val WesResponseRunIdFormat = jsonFormat1(WesRunId)
implicit val WesResponseStatusFormat = jsonFormat2(WesRunStatus)
implicit val WesResponseStatusInfoFormat = jsonFormat9(WesStatusInfoResponse)

implicit object WesResponseFormat extends RootJsonFormat[WesResponse] {
import spray.json._
Expand All @@ -24,6 +36,7 @@ object WesResponseJsonSupport extends SprayJsonSupport with DefaultJsonProtocol
case r: WesRunId => r.toJson
case s: WesRunStatus => s.toJson
case e: WesErrorResponse => e.toJson
case i: WesStatusInfoResponse => i.toJson
}
}

Expand Down
Expand Up @@ -44,32 +44,39 @@ trait WesRouteSupport extends HttpInstrumentation {
val wesRoutes: Route =
instrumentRequest {
concat(
pathPrefix("ga4gh" / "wes" / "v1" / "runs") {
pathPrefix("ga4gh" / "wes" / "v1") {
concat(
path(Segment / "status") { possibleWorkflowId =>
val response = validateWorkflowId(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse])
// WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have
onComplete(response) {
case Success(s: StatusLookupResponse) =>
val wesState = WesState.fromCromwellStatus(s.status)
complete(WesRunStatus(s.workflowId.toString, wesState))
case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Success(m: MetadataServiceResponse) =>
// This should never happen, but ....
val error = new IllegalStateException("Unexpected response from Metadata service: " + m)
error.errorRequest(StatusCodes.InternalServerError)
case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError)
case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue))
}
path("service-info") {
complete(ServiceInfo.toWesResponse(workflowStoreActor))
},
path(Segment / "cancel") { possibleWorkflowId =>
post {
CromwellApiService.abortWorkflow(possibleWorkflowId,
workflowStoreActor,
workflowManagerActor,
successHandler = WesAbortSuccessHandler,
errorHandler = WesAbortErrorHandler)
}
pathPrefix("runs") {
concat(
path(Segment / "status") { possibleWorkflowId =>
val response = validateWorkflowId(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse])
// WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have
onComplete(response) {
case Success(s: StatusLookupResponse) =>
val wesState = WesState.fromCromwellStatus(s.status)
complete(WesRunStatus(s.workflowId.toString, wesState))
case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Success(m: MetadataServiceResponse) =>
// This should never happen, but ....
val error = new IllegalStateException("Unexpected response from Metadata service: " + m)
error.errorRequest(StatusCodes.InternalServerError)
case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError)
case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue))
}
},
path(Segment / "cancel") { possibleWorkflowId =>
post {
CromwellApiService.abortWorkflow(possibleWorkflowId,
workflowStoreActor,
workflowManagerActor,
successHandler = WesAbortSuccessHandler,
errorHandler = WesAbortErrorHandler)
}
}
)
}
)
}
Expand Down
Expand Up @@ -10,7 +10,7 @@ import cromwell.core._
import cromwell.core.abort.{WorkflowAbortFailureResponse, WorkflowAbortRequestedResponse, WorkflowAbortedResponse}
import cromwell.engine.workflow.WorkflowManagerActor
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.engine.workflow.workflowstore.WorkflowStoreActor.{AbortWorkflowCommand, BatchSubmitWorkflows, SubmitWorkflow, WorkflowOnHoldToSubmittedCommand}
import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._
import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{WorkflowOnHoldToSubmittedFailure, WorkflowOnHoldToSubmittedSuccess}
import cromwell.engine.workflow.workflowstore.WorkflowStoreSubmitActor.{WorkflowSubmittedToStore, WorkflowsBatchSubmittedToStore}
import cromwell.services.healthmonitor.HealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse, SubsystemStatus}
Expand Down Expand Up @@ -579,6 +579,7 @@ object CromwellApiServiceSpec {
case WorkflowId(_) => throw new Exception("Something untoward happened")
}
sender ! message
case GetWorkflowStoreStats => sender ! Map(WorkflowRunning -> 5, WorkflowSubmitted -> 3, WorkflowAborting -> 2)
}
}

Expand Down
@@ -0,0 +1,40 @@
package cromwell.webservice.routes.wes

import akka.actor.Props
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.util.Timeout
import cromwell.engine.language.{CromwellLanguages, LanguageConfiguration}
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.CromwellApiServiceSpec.MockWorkflowStoreActor
import org.scalatest.{AsyncFlatSpec, Matchers}

import scala.concurrent.duration._

class ServiceInfoSpec extends AsyncFlatSpec with ScalatestRouteTest with Matchers {
val actorRefFactory = system
implicit val ec = system.dispatcher
implicit val timeout: Timeout = 5.seconds

val workflowStoreActor = actorRefFactory.actorOf(Props(new MockWorkflowStoreActor()))

CromwellLanguages.initLanguages(LanguageConfiguration.AllLanguageEntries)

behavior of "ServiceInfo"

val expectedResponse = WesStatusInfoResponse(Map("CWL" -> Set("v1.0"), "WDL" -> Set("draft-2", "1.0", "biscayne")),
List("1.0"),
Set("ftp", "s3", "demo-dos", "gcs", "oss", "http"),
Map("Cromwell" -> CromwellApiService.cromwellVersion),
List(),
Map(WesState.Running -> 5, WesState.Queued -> 3, WesState.Canceling -> 2),
"https://cromwell.readthedocs.io/en/stable/",
"https://cromwell.readthedocs.io/en/stable/",
Map())


it should "should eventually build the right WesResponse" in {
ServiceInfo.toWesResponse(workflowStoreActor) map { r =>
assert(r == expectedResponse)
}
}
}
55 changes: 29 additions & 26 deletions wes2cromwell/src/main/scala/wes2cromwell/WesRunRoutes.scala
Expand Up @@ -40,39 +40,42 @@ trait WesRunRoutes extends RequestSupport {
lazy val runRoutes: Route =
optionalHeaderValue(extractAuthorizationHeader) { authHeader =>
val cromwellRequestHeaders = authHeader.toList

pathPrefix("ga4gh" / "wes" / "v1" / "runs") {
pathPrefix("ga4gh" / "wes" / "v1") {
concat(
pathEnd {
pathPrefix("runs") {
concat(
get {
parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) =>
completeCromwellResponse(wes2CromwellInterface.listRuns(pageSize, pageToken, cromwellRequestHeaders))
}
pathEnd {
concat(
get {
parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) =>
completeCromwellResponse(wes2CromwellInterface.listRuns(pageSize, pageToken, cromwellRequestHeaders))
}
},
post {
extractStrictRequest { request =>
extractSubmission() { submission =>
completeCromwellResponse(wes2CromwellInterface.runWorkflow(submission, cromwellRequestHeaders))
}
}
}
)
},
post {
extractStrictRequest { request =>
extractSubmission() { submission =>
completeCromwellResponse(wes2CromwellInterface.runWorkflow(submission, cromwellRequestHeaders))
path(Segment) { workflowId =>
concat(
get {
completeCromwellResponse(wes2CromwellInterface.runLog(workflowId, cromwellRequestHeaders))
},
delete {
completeCromwellResponse(wes2CromwellInterface.cancelRun(workflowId, cromwellRequestHeaders))
}
}
}
)
},
path(Segment) { workflowId =>
concat(
get {
completeCromwellResponse(wes2CromwellInterface.runLog(workflowId, cromwellRequestHeaders))
)
},
delete {
completeCromwellResponse(wes2CromwellInterface.cancelRun(workflowId, cromwellRequestHeaders))
path(Segment / "status") { workflowId =>
get {
completeCromwellResponse(wes2CromwellInterface.runStatus(workflowId, cromwellRequestHeaders))
}
}
)
},
path(Segment / "status") { workflowId =>
get {
completeCromwellResponse(wes2CromwellInterface.runStatus(workflowId, cromwellRequestHeaders))
}
}
)
}
Expand Down

0 comments on commit 92b9fda

Please sign in to comment.