Skip to content

Commit

Permalink
WX-1217 Workflow completion callback (#7213)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Llanwarne <cjllanwarne@users.noreply.github.com>
  • Loading branch information
jgainerdewar and cjllanwarne committed Sep 22, 2023
1 parent 5456e40 commit 660f6e3
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 29 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ WDL `size` engine function now works for HTTP files.
Cromwell can now send logs to Azure Application Insights. To enable, set environment
variable `APPLICATIONINSIGHTS_INSTRUMENTATIONKEY` to your account's key. [See here for information.](https://learn.microsoft.com/en-us/azure/azure-monitor/app/sdk-connection-string)

### Workflow Completion Callback

Cromwell can be configured to send a POST request to a specified URL when a workflow completes. The request body
includes the workflow id, terminal state, and (if applicable) final outputs or error message.
See `WorkflowCallback` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information.

## 85 Release Notes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case object AzureCredentials {
new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)

def getAccessToken(identityClientId: Option[String]): ErrorOr[String] = {
def getAccessToken(identityClientId: Option[String] = None): ErrorOr[String] = {
val credentials = identityClientId.foldLeft(defaultCredentialBuilder) {
(builder, clientId) => builder.managedIdentityClientId(clientId)
}.build()
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,19 @@ ga4gh {
contact-info-url = "https://cromwell.readthedocs.io/en/stable/"
}
}

workflow-state-callback {
enabled: false
## The number of threads to allocate for performing callbacks
# num-threads: 5
# endpoint: "http://example.com/foo" # Can be overridden in workflow options
# auth.azure: true
## Users can override default retry behavior if desired
# request-backoff {
# min: "3 seconds"
# max: "5 minutes"
# multiplier: 1.1
# }
# max-retries = 10

}
1 change: 1 addition & 0 deletions core/src/main/scala/cromwell/core/WorkflowOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object WorkflowOptions {
case object WorkflowFailureMode extends WorkflowOption("workflow_failure_mode")
case object UseReferenceDisks extends WorkflowOption("use_reference_disks")
case object MemoryRetryMultiplier extends WorkflowOption("memory_retry_multiplier")
case object WorkflowCallbackUri extends WorkflowOption("workflow_callback_uri")

private lazy val WorkflowOptionsConf = ConfigFactory.load.getConfig("workflow-options")
private lazy val EncryptedFields: Seq[String] = WorkflowOptionsConf.getStringList("encrypted-fields").asScala.toList
Expand Down
58 changes: 58 additions & 0 deletions docs/cromwell_features/WorkflowCallback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
The workflow callback is a simple way to integrate Cromwell with an external system. When each workflow reaches a terminal
state, Cromwell will attempt to POST a message to a provided URL (see below for schema of this message).
Messages are sent for root workflows only, not subworkflows. Callback status information, including success or failure,
will be recorded in workflow metadata with keys containing `workflowCallback`.

### Configuration

This feature will only be used if enabled via config. All config items except `enabled` are optional.

```
workflow-state-callback {
enabled: true
num-threads: 5
endpoint: "http://example.com"
auth.azure: true
request-backoff {
min: "3 seconds",
max: "5 minutes",
multiplier: 1.1
}
max-retries = 10
}
```

* `enabled`: This boolean controls whether a callback will be attempted or not.
* `num-threads`: The number of threads Cromwell will allocate for performing callbacks.
* `endpoint`: This is the default URL to send the message to. If this is unset, and no URL is set in workflow options, no callback will be sent.
* `auth.azure`: If true, and if Cromwell is running in an Azure environment, Cromwell will include an auth header with bearer token generated from local Azure credentials.
* `request-backoff` and `max-retries`: Include these to override the default retry behavior (default behavior shown here).

### Workflow Options

You may choose to override the `endpoint` set in config by including this workflow option:
```
{
"workflow_callback_uri": "http://mywebsite.com"
}
```

### Callback schema

Below is an example of a callback request body.

```
{
"workflowId": "00001111-2222-3333-4444-555566667777",
"state": "Succeeded",
"outputs": {
"task1.out": 5,
"task2.out": "/some/file.txt"
}
}
```

* `workflowId`: The UUID of the workflow
* `state`: The terminal state of the workflow. The list of possible values is: `Succeeded`, `Failed`, `Aborted`
* `outputs`: The final outputs of the workflow, as would be returned from the `api/workflows/{version}/{id}/outputs` endpoint. Expected to be empty when the workflow is not successful..
* `failures`: A list of strings describing the workflow's failures. Expected to be empty if the workflow did not fail.
60 changes: 38 additions & 22 deletions engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.engine.workflow

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import com.typesafe.config.Config
Expand All @@ -23,6 +22,7 @@ import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion}
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand
import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse}
import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor}
import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor
Expand Down Expand Up @@ -149,7 +149,7 @@ object WorkflowActor {
initializationData: AllBackendInitializationData,
lastStateReached: StateCheckpoint,
effectiveStartableState: StartableState,
workflowFinalOutputs: Set[WomValue] = Set.empty,
workflowFinalOutputs: Option[CallOutputs] = None,
workflowAllOutputs: Set[WomValue] = Set.empty,
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty,
failedInitializationAttempts: Int = 0)
Expand All @@ -176,6 +176,7 @@ object WorkflowActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand All @@ -199,6 +200,7 @@ object WorkflowActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
Expand Down Expand Up @@ -226,6 +228,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
ioActor: ActorRef,
override val serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand Down Expand Up @@ -534,26 +537,27 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here.
}

// Copy/Delete workflow logs
if (WorkflowLogger.isEnabled) {
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions
val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that even if the MaterializeWorkflowDescriptor fails, the workflow options can still be
* accessed outside of the EngineWorkflowDescriptor. Used for both copying workflow log and sending workflow callbacks.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions

val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}

val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}

// Copy/Delete workflow logs
if (WorkflowLogger.isEnabled) {
workflowOptions.get(FinalWorkflowLogDir).toOption match {
case Some(destinationDir) =>
pathBuilders
Expand All @@ -566,6 +570,18 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
}
}

// Attempt to perform workflow completion callback
workflowCallbackActor.foreach { wca =>
val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption
wca ! PerformCallbackCommand(
workflowId,
callbackUri,
terminalState.workflowState,
stateData.workflowFinalOutputs.getOrElse(CallOutputs.empty),
nextStateData.lastStateReached.failures.toList.flatMap(_.map(_.getMessage))
)
}

// We can't transition from within another transition function, but we can instruct ourselves to with a message:
self ! AwaitMetadataIntegrity

Expand Down Expand Up @@ -627,7 +643,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
rootWorkflowId = rootWorkflowId,
rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(),
rootAndSubworkflowIds = data.rootAndSubworkflowIds,
workflowFinalOutputs = data.workflowFinalOutputs,
workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty),
workflowAllOutputs = data.workflowAllOutputs,
pathBuilders = data.workflowDescriptor.get.pathBuilders,
serviceRegistryActor = serviceRegistryActor,
Expand Down Expand Up @@ -689,7 +705,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
finalizationActor ! StartFinalizationCommand
goto(FinalizingWorkflowState) using data.copy(
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures),
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet,
workflowFinalOutputs = Option(workflowFinalOutputs),
workflowAllOutputs = workflowAllOutputs,
rootAndSubworkflowIds = rootAndSubworkflowIds
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ object WorkflowManagerActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand All @@ -75,6 +76,7 @@ object WorkflowManagerActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
Expand Down Expand Up @@ -124,6 +126,7 @@ case class WorkflowManagerActorParams(config: Config,
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand Down Expand Up @@ -313,6 +316,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
ioActor = params.ioActor,
serviceRegistryActor = params.serviceRegistryActor,
workflowLogCopyRouter = params.workflowLogCopyRouter,
workflowCallbackActor = params.workflowCallbackActor,
jobStoreActor = params.jobStoreActor,
subWorkflowStoreActor = params.subWorkflowStoreActor,
callCacheReadActor = params.callCacheReadActor,
Expand Down

0 comments on commit 660f6e3

Please sign in to comment.