From 92b9fda58d62ba928840411dfd1d2e176847e6fc Mon Sep 17 00:00:00 2001 From: Jeff Gentry Date: Thu, 6 Dec 2018 11:45:31 -0500 Subject: [PATCH] Implement the WES service-info endpoint (#4455) * Implement the WES service-info endpoint --- build.sbt | 8 ++ core/src/main/resources/reference.conf | 9 ++ .../core/filesystem/CromwellFileSystems.scala | 2 + .../workflowstore/WorkflowStoreActor.scala | 10 +++ .../webservice/routes/wes/ServiceInfo.scala | 83 +++++++++++++++++++ .../webservice/routes/wes/WesResponse.scala | 13 +++ .../routes/wes/WesRouteSupport.scala | 55 ++++++------ .../routes/CromwellApiServiceSpec.scala | 3 +- .../routes/wes/ServiceInfoSpec.scala | 40 +++++++++ .../scala/wes2cromwell/WesRunRoutes.scala | 55 ++++++------ 10 files changed, 227 insertions(+), 51 deletions(-) create mode 100644 engine/src/main/scala/cromwell/webservice/routes/wes/ServiceInfo.scala create mode 100644 engine/src/test/scala/cromwell/webservice/routes/wes/ServiceInfoSpec.scala diff --git a/build.sbt b/build.sbt index fa4a43734bb..f1dd725c6b8 100644 --- a/build.sbt +++ b/build.sbt @@ -238,8 +238,16 @@ lazy val engine = project .dependsOn(ossFileSystem) .dependsOn(gcsFileSystem) .dependsOn(sraFileSystem) + .dependsOn(awsS3FileSystem % "test->test") + .dependsOn(demoDosFileSystem % "test->test") + .dependsOn(httpFileSystem % "test->test") + .dependsOn(ftpFileSystem % "test->test") .dependsOn(`cloud-nio-spi`) .dependsOn(languageFactoryCore) + .dependsOn(cwlV1_0LanguageFactory % "test->test") + .dependsOn(wdlDraft2LanguageFactory % "test->test") + .dependsOn(wdlDraft3LanguageFactory % "test->test") + .dependsOn(wdlBiscayneLanguageFactory % "test->test") .dependsOn(common % "test->test") .dependsOn(core % "test->test") .dependsOn(backend % "test->test") diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 65e27f7f6ba..409d9223c19 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -478,3 +478,12 @@ load-control { # How often each actor should update its perceived load monitoring-frequency = 5 seconds } + +ga4gh { + wes { + # URLs for site-specific auth process as well as contact info for security issues. + # These aren't really compelling defaults, but there aren't really any compelling defaults + auth-instructions-url = "https://cromwell.readthedocs.io/en/stable/" + contact-info-url = "https://cromwell.readthedocs.io/en/stable/" + } +} diff --git a/core/src/main/scala/cromwell/core/filesystem/CromwellFileSystems.scala b/core/src/main/scala/cromwell/core/filesystem/CromwellFileSystems.scala index a31b53f2161..41c8d52b58c 100644 --- a/core/src/main/scala/cromwell/core/filesystem/CromwellFileSystems.scala +++ b/core/src/main/scala/cromwell/core/filesystem/CromwellFileSystems.scala @@ -36,6 +36,8 @@ class CromwellFileSystems(globalConfig: Config) { constructorMap.unsafe("Failed to initialize Cromwell filesystems") } else Map.empty + val supportedFileSystems: Iterable[String] = factoryBuilders.keys + // Generate the appropriate constructor and optional singleton instance for a filesystem private def processFileSystem(key: String, fsConfig: ConfigObject): ErrorOr[(String, (Constructor[_], Option[AnyRef]))] = { // This is the (optional) singleton instance shared by all factory instances diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala index 1d4c8fdd632..9d5659e75e3 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala @@ -3,6 +3,7 @@ package cromwell.engine.workflow.workflowstore import java.time.OffsetDateTime import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.pattern.pipe import cats.data.NonEmptyList import cromwell.core.Dispatcher.EngineDispatcher import cromwell.core._ @@ -18,6 +19,8 @@ final case class WorkflowStoreActor private( extends Actor with ActorLogging with GracefulShutdownHelper { import WorkflowStoreActor._ + implicit val ec = context.dispatcher + lazy val workflowStoreSubmitActor: ActorRef = context.actorOf( WorkflowStoreSubmitActor.props( workflowStoreDatabase = workflowStore, @@ -45,6 +48,11 @@ final case class WorkflowStoreActor private( case cmd: WorkflowStoreActorSubmitCommand => workflowStoreSubmitActor forward cmd case cmd: WorkflowStoreActorEngineCommand => workflowStoreEngineActor forward cmd case cmd: WorkflowStoreWriteHeartbeatCommand => workflowStoreHeartbeatWriteActor forward cmd + case GetWorkflowStoreStats => + // Retrieve the workflow store stats, convert the WorkflowStoreStates to WorkflowStates + val stats = workflowStore.stats.map(m => m.map(e => WorkflowState.withName(e._1.toString) -> e._2)) + stats pipeTo sender + () } } @@ -66,6 +74,8 @@ object WorkflowStoreActor { final case class SubmitWorkflow(source: WorkflowSourceFilesCollection) extends WorkflowStoreActorSubmitCommand final case class BatchSubmitWorkflows(sources: NonEmptyList[WorkflowSourceFilesCollection]) extends WorkflowStoreActorSubmitCommand + final case object GetWorkflowStoreStats + case class WorkflowStoreWriteHeartbeatCommand(workflowId: WorkflowId, submissionTime: OffsetDateTime) def props( diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/ServiceInfo.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/ServiceInfo.scala new file mode 100644 index 00000000000..3901c8dc67b --- /dev/null +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/ServiceInfo.scala @@ -0,0 +1,83 @@ +package cromwell.webservice.routes.wes + +import java.net.URL + +import akka.actor.ActorRef +import akka.util.Timeout +import cromwell.core.filesystem.CromwellFileSystems +import cromwell.engine.language.CromwellLanguages +import cromwell.webservice.routes.CromwellApiService +import net.ceedubs.ficus.Ficus._ +import com.typesafe.config.ConfigFactory +import spray.json.DefaultJsonProtocol +import akka.pattern.ask +import cromwell.core.WorkflowState +import cromwell.engine.workflow.workflowstore.WorkflowStoreActor.GetWorkflowStoreStats +import cromwell.webservice.routes.wes.WesState.WesState + +import scala.concurrent.{ExecutionContext, Future} + +object ServiceInfo { + private lazy val config = ConfigFactory.load() + + // Which languages and which versions of those languages does this Cromwell support? + // CromwellLanguages.instance.languages is a Map[Language, Map[Version, Factory]]. We want Map[Language -> Version] + lazy val WorkflowTypeVersion = CromwellLanguages.instance.languages.map(x => x._1 -> x._2.allVersions.keys) + + // Which versions of WES does this Cromwell support? + val SupportedWesVersions = List("1.0") + + // Which filesystems does this Cromwell support? NB: It's a known flaw of spec that not all backends may support all FS + lazy val SupportedFilesystemProtocols = CromwellFileSystems.instance.supportedFileSystems + + // In Cromwell terms, default workflow options. Viewing this as too much trouble to fill in unless someone complains + val DefaultWorkflowEngineParameters = List.empty[DefaultWorkflowEngineParameter] + + // Which engines and which version(s) of those engines does this WES support. Kinda obvious for Cromwell + val WorkflowEngineVerisons = Map("Cromwell" -> CromwellApiService.cromwellVersion) + + // URL which provides site specific authorization information. Doesn't really apply to Cromwell for now + lazy val AuthInstructionsUrl = config.as[URL]("ga4gh.wes.auth-instructions-url") + // URL which provides site specific contact information for security issues + lazy val ContactInfoUrl = config.as[URL]("ga4gh.wes.contact-info-url") + + /* + Optional key/value pairs. Could potentially be added to the config but not going to bother until if/when someone asks for it + */ + val Tags = Map.empty[String, String] + + /** + * Generate any runtime level information and create a response to the client + */ + def toWesResponse(workflowStoreActor: ActorRef)(implicit ec: ExecutionContext, timeout: Timeout): Future[WesStatusInfoResponse] = { + workflowStats(workflowStoreActor).map(stats => + WesStatusInfoResponse(WorkflowTypeVersion, + SupportedWesVersions, + SupportedFilesystemProtocols, + WorkflowEngineVerisons, + DefaultWorkflowEngineParameters, + stats, + AuthInstructionsUrl.toString, + ContactInfoUrl.toString, + Tags) + ) + } + + /** + * Retrieve a map from state to count for all represented non-terminal workflow states + */ + private def workflowStats(workflowStoreActor: ActorRef)(implicit ec: ExecutionContext, timeout: Timeout): Future[Map[WesState, Int]] = { + workflowStoreActor.ask(GetWorkflowStoreStats) + .mapTo[Map[WorkflowState, Int]] + .map(m => m.map(e => WesState.fromCromwellStatus(e._1) -> e._2)) // Convert WorkflowState -> WesState + } +} + +/* + Models the type DefaultWorkflowEngineParameter in the WES Swagger + */ +final case class DefaultWorkflowEngineParameter(name: String, `type`: String, default_value: String) + +object DefaultWorkflowEngineParameter extends DefaultJsonProtocol { + implicit val DefaultWorkflowEngineParameterFormat = jsonFormat3(DefaultWorkflowEngineParameter.apply) +} diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesResponse.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesResponse.scala index 805d2c662f8..ef3746f72cf 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesResponse.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesResponse.scala @@ -9,12 +9,24 @@ final case class WesErrorResponse(msg: String, status_code: Int) extends WesResp final case class WesRunId(run_id: String) extends WesResponse final case class WesRunStatus(run_id: String, state: WesState) extends WesResponse +final case class WesStatusInfoResponse(workflow_type_version: Map[String, Iterable[String]], + supported_wes_versions: Iterable[String], + supported_filesystem_protocols: Iterable[String], + workflow_engine_versions: Map[String, String], + default_workflow_engine_parameters: Iterable[DefaultWorkflowEngineParameter], + system_state_counts: Map[WesState, Int], + auth_instructions_url: String, + contact_info_url: String, + tags: Map[String, String]) extends WesResponse + object WesResponseJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { import WesStateJsonSupport._ + import DefaultWorkflowEngineParameter.DefaultWorkflowEngineParameterFormat implicit val WesResponseErrorFormat = jsonFormat2(WesErrorResponse) implicit val WesResponseRunIdFormat = jsonFormat1(WesRunId) implicit val WesResponseStatusFormat = jsonFormat2(WesRunStatus) + implicit val WesResponseStatusInfoFormat = jsonFormat9(WesStatusInfoResponse) implicit object WesResponseFormat extends RootJsonFormat[WesResponse] { import spray.json._ @@ -24,6 +36,7 @@ object WesResponseJsonSupport extends SprayJsonSupport with DefaultJsonProtocol case r: WesRunId => r.toJson case s: WesRunStatus => s.toJson case e: WesErrorResponse => e.toJson + case i: WesStatusInfoResponse => i.toJson } } diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala index a351be9e0fc..ab856a34962 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala @@ -44,32 +44,39 @@ trait WesRouteSupport extends HttpInstrumentation { val wesRoutes: Route = instrumentRequest { concat( - pathPrefix("ga4gh" / "wes" / "v1" / "runs") { + pathPrefix("ga4gh" / "wes" / "v1") { concat( - path(Segment / "status") { possibleWorkflowId => - val response = validateWorkflowId(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]) - // WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have - onComplete(response) { - case Success(s: StatusLookupResponse) => - val wesState = WesState.fromCromwellStatus(s.status) - complete(WesRunStatus(s.workflowId.toString, wesState)) - case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError) - case Success(m: MetadataServiceResponse) => - // This should never happen, but .... - val error = new IllegalStateException("Unexpected response from Metadata service: " + m) - error.errorRequest(StatusCodes.InternalServerError) - case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError) - case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) - } + path("service-info") { + complete(ServiceInfo.toWesResponse(workflowStoreActor)) }, - path(Segment / "cancel") { possibleWorkflowId => - post { - CromwellApiService.abortWorkflow(possibleWorkflowId, - workflowStoreActor, - workflowManagerActor, - successHandler = WesAbortSuccessHandler, - errorHandler = WesAbortErrorHandler) - } + pathPrefix("runs") { + concat( + path(Segment / "status") { possibleWorkflowId => + val response = validateWorkflowId(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]) + // WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have + onComplete(response) { + case Success(s: StatusLookupResponse) => + val wesState = WesState.fromCromwellStatus(s.status) + complete(WesRunStatus(s.workflowId.toString, wesState)) + case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError) + case Success(m: MetadataServiceResponse) => + // This should never happen, but .... + val error = new IllegalStateException("Unexpected response from Metadata service: " + m) + error.errorRequest(StatusCodes.InternalServerError) + case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError) + case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) + } + }, + path(Segment / "cancel") { possibleWorkflowId => + post { + CromwellApiService.abortWorkflow(possibleWorkflowId, + workflowStoreActor, + workflowManagerActor, + successHandler = WesAbortSuccessHandler, + errorHandler = WesAbortErrorHandler) + } + } + ) } ) } diff --git a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala index 054eb7f874b..2fda5a2c4be 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala @@ -10,7 +10,7 @@ import cromwell.core._ import cromwell.core.abort.{WorkflowAbortFailureResponse, WorkflowAbortRequestedResponse, WorkflowAbortedResponse} import cromwell.engine.workflow.WorkflowManagerActor import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException -import cromwell.engine.workflow.workflowstore.WorkflowStoreActor.{AbortWorkflowCommand, BatchSubmitWorkflows, SubmitWorkflow, WorkflowOnHoldToSubmittedCommand} +import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._ import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{WorkflowOnHoldToSubmittedFailure, WorkflowOnHoldToSubmittedSuccess} import cromwell.engine.workflow.workflowstore.WorkflowStoreSubmitActor.{WorkflowSubmittedToStore, WorkflowsBatchSubmittedToStore} import cromwell.services.healthmonitor.HealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse, SubsystemStatus} @@ -579,6 +579,7 @@ object CromwellApiServiceSpec { case WorkflowId(_) => throw new Exception("Something untoward happened") } sender ! message + case GetWorkflowStoreStats => sender ! Map(WorkflowRunning -> 5, WorkflowSubmitted -> 3, WorkflowAborting -> 2) } } diff --git a/engine/src/test/scala/cromwell/webservice/routes/wes/ServiceInfoSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/wes/ServiceInfoSpec.scala new file mode 100644 index 00000000000..61cb66298c6 --- /dev/null +++ b/engine/src/test/scala/cromwell/webservice/routes/wes/ServiceInfoSpec.scala @@ -0,0 +1,40 @@ +package cromwell.webservice.routes.wes + +import akka.actor.Props +import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.util.Timeout +import cromwell.engine.language.{CromwellLanguages, LanguageConfiguration} +import cromwell.webservice.routes.CromwellApiService +import cromwell.webservice.routes.CromwellApiServiceSpec.MockWorkflowStoreActor +import org.scalatest.{AsyncFlatSpec, Matchers} + +import scala.concurrent.duration._ + +class ServiceInfoSpec extends AsyncFlatSpec with ScalatestRouteTest with Matchers { + val actorRefFactory = system + implicit val ec = system.dispatcher + implicit val timeout: Timeout = 5.seconds + + val workflowStoreActor = actorRefFactory.actorOf(Props(new MockWorkflowStoreActor())) + + CromwellLanguages.initLanguages(LanguageConfiguration.AllLanguageEntries) + + behavior of "ServiceInfo" + + val expectedResponse = WesStatusInfoResponse(Map("CWL" -> Set("v1.0"), "WDL" -> Set("draft-2", "1.0", "biscayne")), + List("1.0"), + Set("ftp", "s3", "demo-dos", "gcs", "oss", "http"), + Map("Cromwell" -> CromwellApiService.cromwellVersion), + List(), + Map(WesState.Running -> 5, WesState.Queued -> 3, WesState.Canceling -> 2), + "https://cromwell.readthedocs.io/en/stable/", + "https://cromwell.readthedocs.io/en/stable/", + Map()) + + + it should "should eventually build the right WesResponse" in { + ServiceInfo.toWesResponse(workflowStoreActor) map { r => + assert(r == expectedResponse) + } + } +} diff --git a/wes2cromwell/src/main/scala/wes2cromwell/WesRunRoutes.scala b/wes2cromwell/src/main/scala/wes2cromwell/WesRunRoutes.scala index 1e1a98117ef..ac60d227141 100644 --- a/wes2cromwell/src/main/scala/wes2cromwell/WesRunRoutes.scala +++ b/wes2cromwell/src/main/scala/wes2cromwell/WesRunRoutes.scala @@ -40,39 +40,42 @@ trait WesRunRoutes extends RequestSupport { lazy val runRoutes: Route = optionalHeaderValue(extractAuthorizationHeader) { authHeader => val cromwellRequestHeaders = authHeader.toList - - pathPrefix("ga4gh" / "wes" / "v1" / "runs") { + pathPrefix("ga4gh" / "wes" / "v1") { concat( - pathEnd { + pathPrefix("runs") { concat( - get { - parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) => - completeCromwellResponse(wes2CromwellInterface.listRuns(pageSize, pageToken, cromwellRequestHeaders)) - } + pathEnd { + concat( + get { + parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) => + completeCromwellResponse(wes2CromwellInterface.listRuns(pageSize, pageToken, cromwellRequestHeaders)) + } + }, + post { + extractStrictRequest { request => + extractSubmission() { submission => + completeCromwellResponse(wes2CromwellInterface.runWorkflow(submission, cromwellRequestHeaders)) + } + } + } + ) }, - post { - extractStrictRequest { request => - extractSubmission() { submission => - completeCromwellResponse(wes2CromwellInterface.runWorkflow(submission, cromwellRequestHeaders)) + path(Segment) { workflowId => + concat( + get { + completeCromwellResponse(wes2CromwellInterface.runLog(workflowId, cromwellRequestHeaders)) + }, + delete { + completeCromwellResponse(wes2CromwellInterface.cancelRun(workflowId, cromwellRequestHeaders)) } - } - } - ) - }, - path(Segment) { workflowId => - concat( - get { - completeCromwellResponse(wes2CromwellInterface.runLog(workflowId, cromwellRequestHeaders)) + ) }, - delete { - completeCromwellResponse(wes2CromwellInterface.cancelRun(workflowId, cromwellRequestHeaders)) + path(Segment / "status") { workflowId => + get { + completeCromwellResponse(wes2CromwellInterface.runStatus(workflowId, cromwellRequestHeaders)) + } } ) - }, - path(Segment / "status") { workflowId => - get { - completeCromwellResponse(wes2CromwellInterface.runStatus(workflowId, cromwellRequestHeaders)) - } } ) }