Skip to content

Commit

Permalink
First pass of workflow type and version. This takes parameters on RES…
Browse files Browse the repository at this point in the history
…T endpoints, roundtrips to the workflow store,

and publishes to metadata.  Currently hardcodes type and version for the command line.
  • Loading branch information
mcovarr committed Jun 8, 2017
1 parent 4487a51 commit eca0710
Show file tree
Hide file tree
Showing 25 changed files with 408 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ before_install:
- openssl aes-256-cbc -K "$encrypted_5ebd3ff04788_key" -iv "$encrypted_5ebd3ff04788_iv" -in src/bin/travis/resources/jesConf.tar.enc -out jesConf.tar -d || true
env:
global:
- CENTAUR_BRANCH=develop
- CENTAUR_BRANCH=mlc_workflow_type
matrix:
# Setting this variable twice will cause the 'script' section to run twice with the respective env var invoked
- BUILD_TYPE=sbt
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ object WorkflowMetadataKeys {
val SubmissionSection_Inputs = "inputs"
val SubmissionSection_Options = "options"
val SubmissionSection_Imports = "imports"
val SubmissionSection_WorkflowType = "workflowType"
val SubmissionSection_WorkflowTypeVersion = "workflowTypeVersion"

val Labels = "labels"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,45 @@ sealed trait WorkflowSourceFilesCollection {
def inputsJson: WdlJson
def workflowOptionsJson: WorkflowOptionsJson
def labelsJson: WdlJson

def workflowType: Option[WorkflowType]
def workflowTypeVersion: Option[WorkflowTypeVersion]

def importsZipFileOption: Option[Array[Byte]] = this match {
case _: WorkflowSourceFilesWithoutImports => None
case WorkflowSourceFilesWithDependenciesZip(_, _, _, _, importsZip) => Option(importsZip) // i.e. Some(importsZip) if our wiring is correct
case w: WorkflowSourceFilesWithDependenciesZip => Option(w.importsZip) // i.e. Some(importsZip) if our wiring is correct
}

def copyOptions(workflowOptions: WorkflowOptionsJson) = this match {
case w: WorkflowSourceFilesWithoutImports => WorkflowSourceFilesWithoutImports(
wdlSource = w.wdlSource,
inputsJson = w.inputsJson,
workflowOptionsJson = workflowOptions,
labelsJson = w.labelsJson)

case w: WorkflowSourceFilesWithDependenciesZip => WorkflowSourceFilesWithDependenciesZip(
wdlSource = w.wdlSource,
inputsJson = w.inputsJson,
workflowOptionsJson = workflowOptions,
labelsJson = w.labelsJson,
importsZip = w.importsZip)
case w: WorkflowSourceFilesWithoutImports => w.copy(workflowOptionsJson = workflowOptions)
case w: WorkflowSourceFilesWithDependenciesZip => w.copy(workflowOptionsJson = workflowOptions)
}
}

object WorkflowSourceFilesCollection {
def apply(wdlSource: WdlSource,
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WdlJson,
workflowOptionsJson: WorkflowOptionsJson,
labelsJson: WdlJson,
importsFile: Option[Array[Byte]]): WorkflowSourceFilesCollection = importsFile match {
case Some(imports) => WorkflowSourceFilesWithDependenciesZip(wdlSource, inputsJson, workflowOptionsJson, labelsJson, imports)
case None => WorkflowSourceFilesWithoutImports(wdlSource, inputsJson, workflowOptionsJson, labelsJson)
case Some(imports) =>
WorkflowSourceFilesWithDependenciesZip(wdlSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson, imports)
case None =>
WorkflowSourceFilesWithoutImports(wdlSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson)
}
}

final case class WorkflowSourceFilesWithoutImports(wdlSource: WdlSource,
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WdlJson,
workflowOptionsJson: WorkflowOptionsJson,
labelsJson: WdlJson) extends WorkflowSourceFilesCollection

final case class WorkflowSourceFilesWithDependenciesZip(wdlSource: WdlSource,
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WdlJson,
workflowOptionsJson: WorkflowOptionsJson,
labelsJson: WdlJson,
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/cromwell/core/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package object core {
type FullyQualifiedName = String
type WorkflowOutputs = Map[FullyQualifiedName, JobOutput]
type WorkflowOptionsJson = String
type WorkflowType = String
type WorkflowTypeVersion = String
type CallOutputs = Map[LocallyQualifiedName, JobOutput]
type HostInputs = Map[String, WdlValue]
type EvaluatedRuntimeAttributes = Map[String, WdlValue]
Expand Down
16 changes: 14 additions & 2 deletions core/src/test/scala/cromwell/util/SampleWdl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@ import scala.language.postfixOps

trait SampleWdl extends TestFileUtil {
def wdlSource(runtime: String = ""): WdlSource
def asWorkflowSources(runtime: String = "", workflowOptions: String = "{}", labels: String = "{}") =
WorkflowSourceFilesWithoutImports(wdlSource = wdlSource(runtime), inputsJson = wdlJson, workflowOptionsJson = workflowOptions, labelsJson = labels)
def asWorkflowSources(runtime: String = "",
workflowOptions: String = "{}",
labels: String = "{}",
workflowType: Option[String] = Option("WDL"),
workflowTypeVersion: Option[String] = None) = {
WorkflowSourceFilesWithoutImports(
wdlSource = wdlSource(runtime),
inputsJson = wdlJson,
workflowOptionsJson = workflowOptions,
labelsJson = labels,
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion)
}

val rawInputs: WorkflowRawInputs

def name = getClass.getSimpleName.stripSuffix("$")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto

val sourceBodyParts = Map(
"wdlSource" -> Option(workflowSubmission.wdl),
"workflowType" -> workflowSubmission.workflowType,
"workflowTypeVersion" -> workflowSubmission.workflowTypeVersion,
"workflowInputs" -> workflowSubmission.inputsJson,
"workflowOptions" -> insertSecrets(workflowSubmission.options, workflowSubmission.refreshToken),
"customLabels" -> Option(workflowSubmission.customLabels.toJson.toString)
Expand All @@ -64,7 +66,15 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto
val requestEntity = requestEntityForSubmit(workflow)

// Make a set of submissions that represent the batch (so we can zip with the results later):
val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission(workflow.wdl, Option(inputs), workflow.options, workflow.customLabels, workflow.zippedImports, workflow.refreshToken))
val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission(
wdl = workflow.wdl,
workflowType = workflow.workflowType,
workflowTypeVersion = workflow.workflowTypeVersion,
inputsJson = Option(inputs),
options = workflow.options,
customLabels = workflow.customLabels,
zippedImports = workflow.zippedImports,
refreshToken = workflow.refreshToken))

makeRequest[List[CromwellStatus]](HttpRequest(HttpMethods.POST, batchSubmitEndpoint, List.empty[HttpHeader], requestEntity)) map { statuses =>
val zipped = submissionSet.zip(statuses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import better.files.File

sealed trait WorkflowSubmission {
val wdl: String
val workflowType: Option[String]
val workflowTypeVersion: Option[String]
val inputsJson: Option[String]
val options: Option[String]
val customLabels: Option[List[Label]]
Expand All @@ -12,18 +14,22 @@ sealed trait WorkflowSubmission {
}

final case class WorkflowSingleSubmission(wdl: String,
workflowType: Option[String],
workflowTypeVersion: Option[String],
inputsJson: Option[String],
options: Option[String],
customLabels: Option[List[Label]],
zippedImports: Option[File],
refreshToken: Option[String]) extends WorkflowSubmission

final case class WorkflowBatchSubmission(wdl: String,
inputsBatch: List[String],
options: Option[String],
customLabels: Option[List[Label]],
zippedImports: Option[File],
refreshToken: Option[String]) extends WorkflowSubmission {
workflowType: Option[String],
workflowTypeVersion: Option[String],
inputsBatch: List[String],
options: Option[String],
customLabels: Option[List[Label]],
zippedImports: Option[File],
refreshToken: Option[String]) extends WorkflowSubmission {

override val inputsJson: Option[String] = Option(inputsBatch.mkString(start="[", sep=",", end="]"))
override val inputsJson: Option[String] = Option(inputsBatch.mkString(start = "[", sep = ",", end = "]"))
}
1 change: 1 addition & 0 deletions database/migration/src/main/resources/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<include file="changesets/call_caching_aggregated_hashes.xml" relativeToChangelogFile="true" />
<include file="changesets/custom_label_entry.xml" relativeToChangelogFile="true" />
<include file="changesets/docker_hash_store.xml" relativeToChangelogFile="true" />
<include file="changesets/workflow_store_type_and_version.xml" relativeToChangelogFile="true" />
</databaseChangeLog>
<!--
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet author="mcovarr" id="workflow-store-type-and-version">
<addColumn tableName="WORKFLOW_STORE_ENTRY">
<column name="WORKFLOW_TYPE" type="VARCHAR(30)">
<constraints nullable="true"/>
</column>
<column name="WORKFLOW_TYPE_VERSION" type="VARCHAR(255)">
<constraints nullable="true"/>
</column>
</addColumn>
</changeSet>

<changeSet author="mcovarr" id="not-null-workflow-store-type">
<addNotNullConstraint columnDataType="VARCHAR(30)"
columnName="WORKFLOW_TYPE"
defaultNullValue="WDL"
tableName="WORKFLOW_STORE_ENTRY"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ trait WorkflowStoreEntryComponent {

def workflowExecutionUuid = column[String]("WORKFLOW_EXECUTION_UUID")

def workflowType = column[String]("WORKFLOW_TYPE", O.Length(30))

def workflowTypeVersion = column[Option[String]]("WORKFLOW_TYPE_VERSION")

def workflowDefinition = column[Option[Clob]]("WORKFLOW_DEFINITION")

def workflowInputs = column[Option[Clob]]("WORKFLOW_INPUTS")
Expand All @@ -29,7 +33,7 @@ trait WorkflowStoreEntryComponent {

def importsZip = column[Option[Blob]]("IMPORTS_ZIP")

override def * = (workflowExecutionUuid, workflowDefinition, workflowInputs, workflowOptions, workflowState,
override def * = (workflowExecutionUuid, workflowDefinition, workflowType, workflowTypeVersion, workflowInputs, workflowOptions, workflowState,
submissionTime, importsZip, customLabels, workflowStoreEntryId.?) <> (WorkflowStoreEntry.tupled, WorkflowStoreEntry.unapply)

def ucWorkflowStoreEntryWeu = index("UC_WORKFLOW_STORE_ENTRY_WEU", workflowExecutionUuid, unique = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ case class WorkflowStoreEntry
(
workflowExecutionUuid: String,
workflowDefinition: Option[Clob],
workflowType: String,
workflowTypeVersion: Option[String],
workflowInputs: Option[Clob],
workflowOptions: Option[Clob],
workflowState: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
pathBuilders: List[PathBuilder]): ErrorOr[EngineWorkflowDescriptor] = {
val namespaceValidation = validateNamespace(sourceFiles)
val labelsValidation = validateLabels(sourceFiles.labelsJson)

(namespaceValidation |@| labelsValidation).tupled flatMap { case (namespace, labels) =>
pushWfNameMetadataService(namespace.workflow.unqualifiedName)
publishLabelsToMetadata(id, namespace.workflow.unqualifiedName, labels)
Expand Down Expand Up @@ -249,10 +249,9 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
val backendAssignmentsValidation = validateBackendAssignments(namespace.taskCalls, workflowOptions, defaultBackendName)
val callCachingModeValidation = validateCallCachingMode(workflowOptions, conf)

(rawInputsValidation |@| failureModeValidation |@| backendAssignmentsValidation |@| callCachingModeValidation) map {
(_, _, _, _)
} flatMap { case (rawInputs, failureMode, backendAssignments, callCachingMode) =>
buildWorkflowDescriptor(id, namespace, rawInputs, backendAssignments, workflowOptions, labels, failureMode, pathBuilders, callCachingMode)
(rawInputsValidation |@| failureModeValidation |@| backendAssignmentsValidation |@| callCachingModeValidation).tupled flatMap {
case (rawInputs, failureMode, backendAssignments, callCachingMode) =>
buildWorkflowDescriptor(id, namespace, rawInputs, backendAssignments, workflowOptions, labels, failureMode, pathBuilders, callCachingMode)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf

private def fromWorkflowStoreEntry(workflowStoreEntry: WorkflowStoreEntry): WorkflowToStart = {
val sources = WorkflowSourceFilesCollection(
workflowStoreEntry.workflowDefinition.toRawString,
workflowStoreEntry.workflowInputs.toRawString,
workflowStoreEntry.workflowOptions.toRawString,
workflowStoreEntry.customLabels.toRawString,
workflowStoreEntry.importsZip.toBytesOption
wdlSource = workflowStoreEntry.workflowDefinition.toRawString,
workflowType = Option(workflowStoreEntry.workflowType),
workflowTypeVersion = workflowStoreEntry.workflowTypeVersion,
inputsJson = workflowStoreEntry.workflowInputs.toRawString,
workflowOptionsJson = workflowStoreEntry.workflowOptions.toRawString,
labelsJson = workflowStoreEntry.customLabels.toRawString,
importsFile = workflowStoreEntry.importsZip.toBytesOption
)
WorkflowToStart(
WorkflowId.fromString(workflowStoreEntry.workflowExecutionUuid),
Expand All @@ -74,6 +76,9 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf
WorkflowStoreEntry(
workflowExecutionUuid = WorkflowId.randomId().toString,
workflowDefinition = workflowSourceFiles.wdlSource.toClobOption,
// TODO ensure safety
workflowType = workflowSourceFiles.workflowType.get,
workflowTypeVersion = workflowSourceFiles.workflowTypeVersion,
workflowInputs = workflowSourceFiles.inputsJson.toClobOption,
workflowOptions = workflowSourceFiles.workflowOptionsJson.toClobOption,
customLabels = workflowSourceFiles.labelsJson.toClob(default = nonEmptyJsonString),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ final case class WorkflowStoreSubmitActor(store: WorkflowStore, serviceRegistryA
*/
private def registerSubmissionWithMetadataService(id: WorkflowId, originalSourceFiles: WorkflowSourceFilesCollection): Unit = {
processSource(_.clearEncryptedValues)(originalSourceFiles) foreach { sourceFiles =>
val submissionEvents = List(
val submissionEvents: List[MetadataEvent] = List(
MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionTime), MetadataValue(OffsetDateTime.now.toString)),
MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Inputs)),
MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Outputs)),
Expand All @@ -87,7 +87,13 @@ final case class WorkflowStoreSubmitActor(store: WorkflowStore, serviceRegistryA
MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionSection, WorkflowMetadataKeys.SubmissionSection_Options), MetadataValue(sourceFiles.workflowOptionsJson))
)

serviceRegistryActor ! PutMetadataAction(submissionEvents)
// Don't publish metadata for either workflow type or workflow type version if not defined.
val workflowTypeAndVersionEvents: List[Option[MetadataEvent]] = List(
sourceFiles.workflowType map { wt => MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionSection, WorkflowMetadataKeys.SubmissionSection_WorkflowType), MetadataValue(wt)) },
sourceFiles.workflowTypeVersion map { wtv => MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionSection, WorkflowMetadataKeys.SubmissionSection_WorkflowTypeVersion), MetadataValue(wtv)) }
)

serviceRegistryActor ! PutMetadataAction(submissionEvents ++ workflowTypeAndVersionEvents.flatten)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,15 @@ class CromwellApiHandler(requestHandlerActor: ActorRef) extends Actor with Workf
context.parent ! RequestComplete((StatusCodes.Created, WorkflowSubmitResponse(id.toString, WorkflowSubmitted.toString)))

case ApiHandlerWorkflowSubmitBatch(sources) => requestHandlerActor !
WorkflowStoreActor.BatchSubmitWorkflows(sources.map(x => WorkflowSourceFilesCollection(x.wdlSource, x.inputsJson, x.workflowOptionsJson, x.labelsJson, x.importsZipFileOption)))
WorkflowStoreActor.BatchSubmitWorkflows(sources.map(w =>
WorkflowSourceFilesCollection(
wdlSource = w.wdlSource,
workflowType = w.workflowType,
workflowTypeVersion = w.workflowTypeVersion,
inputsJson = w.inputsJson,
workflowOptionsJson = w.workflowOptionsJson,
labelsJson = w.labelsJson,
importsFile = w.importsZipFileOption)))


case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(ids) =>
Expand Down

0 comments on commit eca0710

Please sign in to comment.