Workflow Docker Hashing.
Thibault Jeandet authored and mcovarr committed May 24, 2017
1 parent 81d5945 commit 4101132
Showing 61 changed files with 1,170 additions and 398 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -19,7 +19,7 @@ before_install:
- openssl aes-256-cbc -K "$encrypted_5ebd3ff04788_key" -iv "$encrypted_5ebd3ff04788_iv" -in src/bin/travis/resources/jesConf.tar.enc -out jesConf.tar -d || true
env:
global:
- CENTAUR_BRANCH=develop
- CENTAUR_BRANCH=cjl_cromwell_2094_2
matrix:
# Setting this variable twice will cause the 'script' section to run twice with the respective env var invoked
- BUILD_TYPE=sbt
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -50,6 +50,32 @@ database {
}
```

### Call Caching

Cromwell now supports call caching with floating Docker tags (e.g. `docker: "ubuntu:latest"`). Note it is still considered
a best practice to specify Docker images as hashes where possible, especially for production usages.

Within a single workflow Cromwell will attempt to resolve all floating tags to the same Docker hash, even if Cromwell is restarted
during the execution of a workflow. In call metadata the `docker` runtime attribute is now the same as the
value that actually appeared in the WDL:

```
"runtimeAttributes": {
"docker": "ubuntu:latest",
"failOnStderr": "false",
"continueOnReturnCode": "0"
}
```

Previous versions of Cromwell rewrote the `docker` value to the hash of the Docker image.

There is a new call-level metadata value `dockerImageUsed` which captures the hash of the Docker image actually used to
run the call:

```
"dockerImageUsed": "library/ubuntu@sha256:382452f82a8bbd34443b2c727650af46aced0f94a44463c62a9848133ecb1aa8"
```

### Docker

* The Docker section of the configuration has been slightly reworked
15 changes: 6 additions & 9 deletions README.md
@@ -1745,7 +1745,7 @@ Cromwell also accepts two [workflow option](#workflow-options) related to call c
* If call caching is enabled, but one wishes to run a workflow but not add any of the calls into the call cache when they finish, the `write_to_cache` option can be set to `false`. This value defaults to `true`.
* If call caching is enabled, but you don't want to check the cache for any `call` invocations, set the option `read_from_cache` to `false`. This value also defaults to `true`

> **Note:** If call caching is disabled, the to workflow options `read_from_cache` and `write_to_cache` will be ignored and the options will be treated as though they were 'false'.
> **Note:** If call caching is disabled, the workflow options `read_from_cache` and `write_to_cache` will be ignored and the options will be treated as though they were 'false'.
## Docker Tags

@@ -1766,14 +1766,11 @@ When Cromwell finds a job ready to be run, it will first look at its docker runt
* The job does specify a docker runtime attribute:
* The docker image uses a hash: All call caching settings apply normally
* The docker image uses a floating tag:
Call caching `reading` will be disabled for this job. Specifically, Cromwell will *not* attempt to find an entry in the cache for this job.
Additionally, cromwell will attempt to look up the hash of the image. Upon success, it will replace the user's docker value with the hash.
This mechanism ensures that as long as Cromwell is able to lookup the hash, the job is guaranteed to have run on the container with that hash.
The docker value with the hash used for the job will be reported in the runtime attributes section of the metadata.
If Cromwell fails to lookup the hash (unsupported registry, wrong credentials, ...) it will run the job with the user provided floating tag.
If call caching writing is turned on, Cromwell will still write the job in the cache database, using:
* the hash if the lookup succeeded
* the floating tag otherwise.
* Cromwell will attempt to look up the hash of the image. Upon success it will pass both the floating tag and this hash value to the backend.
* All backends currently included with Cromwell will utilize this hash value to run the job.
* Within a single workflow all floating tags will resolve to the same hash value even if Cromwell is restarted when the workflow is running.
* If Cromwell fails to lookup the hash (unsupported registry, wrong credentials, ...) it will run the job with the user provided floating tag.
* The actual Docker image (floating tag or hash) used for the job will be reported in the `dockerImageUsed` attribute of the call metadata.
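
The floating-tag behaviour described in the bullets above can be sketched roughly as follows. This is a minimal illustration, not Cromwell's implementation: `DockerRef`, `WorkflowDockerResolver`, `imageToRun` and the `lookup` function are hypothetical names, and the per-workflow cache here lives in memory, whereas this commit adds a `DOCKER_HASH_STORE_ENTRY` table that appears intended to let resolved hashes survive a restart.

```scala
// Hypothetical names throughout; a sketch under the assumptions stated above.
sealed trait DockerRef
final case class Hashed(image: String) extends DockerRef   // e.g. "ubuntu@sha256:..."
final case class Floating(tag: String) extends DockerRef   // e.g. "ubuntu:latest"

object DockerRef {
  // Assumption: a docker value containing "@sha256:" already refers to a hash.
  def parse(value: String): DockerRef =
    if (value.contains("@sha256:")) Hashed(value) else Floating(value)
}

// One resolver per workflow, so every call in that workflow sees the same hash for a tag.
final class WorkflowDockerResolver(lookup: String => Option[String]) {
  private val resolved = scala.collection.mutable.Map.empty[String, String]

  /** The image a backend would run: the hash when one is known, else the floating tag. */
  def imageToRun(dockerAttribute: Option[String]): Option[String] =
    dockerAttribute.map(DockerRef.parse).map {
      case Hashed(image) => image
      case Floating(tag) =>
        resolved.get(tag)
          .orElse {
            // First time this tag is seen in the workflow: try the registry lookup once.
            val hash = lookup(tag)
            hash.foreach(h => resolved.update(tag, h))
            hash
          }
          .getOrElse(tag) // Lookup failed: fall back to the user-provided floating tag.
    }
}
```

The value returned by `imageToRun` corresponds to what the bullets call `dockerImageUsed`; the `docker` runtime attribute itself stays as written in the WDL.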

### Docker Lookup

@@ -24,7 +24,7 @@ object BackendJobExecutionActor {
sealed trait BackendJobExecutionActorResponse extends BackendWorkflowLifecycleActorResponse

sealed trait BackendJobExecutionResponse extends BackendJobExecutionActorResponse { def jobKey: JobKey }
case class JobSucceededResponse(jobKey: BackendJobDescriptorKey, returnCode: Option[Int], jobOutputs: CallOutputs, jobDetritusFiles: Option[Map[String, Path]], executionEvents: Seq[ExecutionEvent]) extends BackendJobExecutionResponse
case class JobSucceededResponse(jobKey: BackendJobDescriptorKey, returnCode: Option[Int], jobOutputs: CallOutputs, jobDetritusFiles: Option[Map[String, Path]], executionEvents: Seq[ExecutionEvent], dockerImageUsed: Option[String]) extends BackendJobExecutionResponse
case class AbortedResponse(jobKey: BackendJobDescriptorKey) extends BackendJobExecutionResponse
sealed trait BackendJobFailedResponse extends BackendJobExecutionResponse { def throwable: Throwable; def returnCode: Option[Int] }
case class JobFailedNonRetryableResponse(jobKey: JobKey, throwable: Throwable, returnCode: Option[Int]) extends BackendJobFailedResponse
@@ -34,6 +34,8 @@ object AsyncBackendJobExecutionActor {

trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging =>

def dockerImageUsed: Option[String]

// The scala package object (scala/package.scala) contains a neat list of runtime errors that are always going to be fatal.
// We also consider any Error as fatal, and include the CromwellFatalExceptionMarker so we can mark our own fatal exceptions.
def isFatal(throwable: Throwable): Boolean = throwable match {
@@ -83,7 +85,7 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging =>
context.system.scheduler.scheduleOnce(pollBackOff.backoffMillis.millis, self, IssuePollRequest(handle))
()
case Finish(SuccessfulExecutionHandle(outputs, returnCode, jobDetritusFiles, executionEvents, _)) =>
completionPromise.success(JobSucceededResponse(jobDescriptor.key, Some(returnCode), outputs, Option(jobDetritusFiles), executionEvents))
completionPromise.success(JobSucceededResponse(jobDescriptor.key, Some(returnCode), outputs, Option(jobDetritusFiles), executionEvents, dockerImageUsed))
context.stop(self)
case Finish(FailedNonRetryableExecutionHandle(throwable, returnCode)) =>
completionPromise.success(JobFailedNonRetryableResponse(jobDescriptor.key, throwable, returnCode))
11 changes: 4 additions & 7 deletions backend/src/main/scala/cromwell/backend/backend.scala
@@ -2,7 +2,7 @@ package cromwell.backend

import com.typesafe.config.Config
import cromwell.core.WorkflowOptions.WorkflowOption
import cromwell.core.callcaching.CallCachingEligibility
import cromwell.core.callcaching.MaybeCallCachingEligible
import cromwell.core.labels.Labels
import cromwell.core.{CallKey, WorkflowId, WorkflowOptions}
import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse
@@ -28,7 +28,7 @@ case class BackendJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor,
key: BackendJobDescriptorKey,
runtimeAttributes: Map[LocallyQualifiedName, WdlValue],
inputDeclarations: EvaluatedTaskInputs,
callCachingEligibility: CallCachingEligibility,
maybeCallCachingEligible: MaybeCallCachingEligible,
prefetchedKvStoreEntries: Map[String, KvResponse]) {
val fullyQualifiedInputs = inputDeclarations map { case (declaration, value) => declaration.fullyQualifiedName -> value }
val call = key.call
@@ -67,11 +67,8 @@ case class BackendWorkflowDescriptor(id: WorkflowId,
*/
case class BackendConfigurationDescriptor(backendConfig: Config, globalConfig: Config) {

lazy val backendRuntimeConfig = backendConfig.hasPath("default-runtime-attributes") match {
case true => Option(backendConfig.getConfig("default-runtime-attributes"))
case false => None
}

lazy val backendRuntimeConfig = if (backendConfig.hasPath("default-runtime-attributes"))
Option(backendConfig.getConfig("default-runtime-attributes")) else None
}

final case class AttemptedLookupResult(name: String, value: Try[WdlValue]) {
@@ -136,7 +136,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
def succeedAndStop(returnCode: Option[Int], copiedJobOutputs: CallOutputs, detritusMap: DetritusMap) = {
import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter
serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), startMetadataKeyValues)
context.parent ! JobSucceededResponse(jobDescriptor.key, returnCode, copiedJobOutputs, Option(detritusMap), Seq.empty)
context.parent ! JobSucceededResponse(jobDescriptor.key, returnCode, copiedJobOutputs, Option(detritusMap), Seq.empty, None)
context stop self
stay()
}
12 changes: 6 additions & 6 deletions backend/src/test/scala/cromwell/backend/BackendSpec.scala
@@ -2,7 +2,7 @@ package cromwell.backend

import cromwell.backend.BackendJobExecutionActor.{BackendJobExecutionResponse, JobFailedNonRetryableResponse, JobFailedRetryableResponse, JobSucceededResponse}
import cromwell.backend.io.TestWorkflows._
import cromwell.core.callcaching.CallCachingEligible
import cromwell.core.callcaching.NoDocker
import cromwell.core.labels.Labels
import cromwell.core.{WorkflowId, WorkflowOptions}
import lenthall.exception.AggregatedException
@@ -55,7 +55,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito {
val inputDeclarations = call.evaluateTaskInputs(inputs, NoFunctions).get // .get is ok because this is a test
val evaluatedAttributes = RuntimeAttributeDefinition.evaluateRuntimeAttributes(call.task.runtimeAttributes, NoFunctions, inputDeclarations).get // .get is OK here because this is a test
val runtimeAttributes = RuntimeAttributeDefinition.addDefaultsToAttributes(runtimeAttributeDefinitions, options)(evaluatedAttributes)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, CallCachingEligible, Map.empty)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, NoDocker, Map.empty)
}

def jobDescriptorFromSingleCallWorkflow(wdl: WdlSource,
@@ -67,7 +67,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito {
val inputDeclarations = fqnMapToDeclarationMap(workflowDescriptor.knownValues)
val evaluatedAttributes = RuntimeAttributeDefinition.evaluateRuntimeAttributes(call.task.runtimeAttributes, NoFunctions, inputDeclarations).get // .get is OK here because this is a test
val runtimeAttributes = RuntimeAttributeDefinition.addDefaultsToAttributes(runtimeAttributeDefinitions, options)(evaluatedAttributes)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, CallCachingEligible, Map.empty)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, NoDocker, Map.empty)
}

def jobDescriptorFromSingleCallWorkflow(wdl: WdlSource,
@@ -81,12 +81,12 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito {
val inputDeclarations = fqnMapToDeclarationMap(workflowDescriptor.knownValues)
val evaluatedAttributes = RuntimeAttributeDefinition.evaluateRuntimeAttributes(call.task.runtimeAttributes, NoFunctions, inputDeclarations).get // .get is OK here because this is a test
val runtimeAttributes = RuntimeAttributeDefinition.addDefaultsToAttributes(runtimeAttributeDefinitions, options)(evaluatedAttributes)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, CallCachingEligible, Map.empty)
BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, NoDocker, Map.empty)
}

def assertResponse(executionResponse: BackendJobExecutionResponse, expectedResponse: BackendJobExecutionResponse) = {
(executionResponse, expectedResponse) match {
case (JobSucceededResponse(_, _, responseOutputs, _, _), JobSucceededResponse(_, _, expectedOutputs, _, _)) =>
case (JobSucceededResponse(_, _, responseOutputs, _, _, _), JobSucceededResponse(_, _, expectedOutputs, _, _, _)) =>
responseOutputs.size shouldBe expectedOutputs.size
responseOutputs foreach {
case (fqn, out) =>
@@ -123,7 +123,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito {

def firstJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor,
inputs: Map[String, WdlValue] = Map.empty) = {
BackendJobDescriptor(workflowDescriptor, firstJobDescriptorKey(workflowDescriptor), Map.empty, fqnMapToDeclarationMap(inputs), CallCachingEligible, Map.empty)
BackendJobDescriptor(workflowDescriptor, firstJobDescriptorKey(workflowDescriptor), Map.empty, fqnMapToDeclarationMap(inputs), NoDocker, Map.empty)
}
}


This file was deleted.

@@ -0,0 +1,19 @@
package cromwell.core.callcaching

sealed trait MaybeCallCachingEligible {
def dockerHash: Option[String]
}

sealed trait CallCachingEligible extends MaybeCallCachingEligible
sealed trait CallCachingIneligible extends MaybeCallCachingEligible

case object NoDocker extends CallCachingEligible {
override def dockerHash: Option[String] = None
}
case class DockerWithHash(dockerAttribute: String) extends CallCachingEligible {
override def dockerHash: Option[String] = Option(dockerAttribute)
}

case class FloatingDockerTagWithoutHash(dockerTag: String) extends CallCachingIneligible {
override def dockerHash: Option[String] = None
}
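
As a rough illustration of how calling code might consume this hierarchy, the sketch below pattern matches on it. The trait and case classes are copied from the new file above so the snippet compiles on its own; `EligibilityExample` and `describe` are hypothetical and not part of the commit.

```scala
// Copied from the new file in this commit so the snippet is self-contained.
sealed trait MaybeCallCachingEligible {
  def dockerHash: Option[String]
}

sealed trait CallCachingEligible extends MaybeCallCachingEligible
sealed trait CallCachingIneligible extends MaybeCallCachingEligible

case object NoDocker extends CallCachingEligible {
  override def dockerHash: Option[String] = None
}
case class DockerWithHash(dockerAttribute: String) extends CallCachingEligible {
  override def dockerHash: Option[String] = Option(dockerAttribute)
}
case class FloatingDockerTagWithoutHash(dockerTag: String) extends CallCachingIneligible {
  override def dockerHash: Option[String] = None
}

// Hypothetical helper, for illustration only.
object EligibilityExample {
  // Because the traits are sealed, the compiler can check that this match is exhaustive.
  def describe(eligibility: MaybeCallCachingEligible): String = eligibility match {
    case NoDocker                          => "eligible: no docker attribute"
    case DockerWithHash(image)             => s"eligible: $image"
    case FloatingDockerTagWithoutHash(tag) => s"ineligible: $tag has no resolved hash"
  }
}
```

Only `DockerWithHash` carries a hash; both `NoDocker` and `FloatingDockerTagWithoutHash` return `None` from `dockerHash`, so the subtype, not the hash alone, distinguishes an eligible call from an ineligible one.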
1 change: 1 addition & 0 deletions database/migration/src/main/resources/changelog.xml
@@ -64,6 +64,7 @@
<include file="changesets/failure_metadata_2.xml" relativeToChangelogFile="true" />
<include file="changesets/call_caching_aggregated_hashes.xml" relativeToChangelogFile="true" />
<include file="changesets/custom_label_entry.xml" relativeToChangelogFile="true" />
<include file="changesets/docker_hash_store.xml" relativeToChangelogFile="true" />
</databaseChangeLog>
<!--
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet author="tjeandet" id="DOCKER_HASH_STORE_ENTRY">
<comment>
Temporary storage area for docker hashes from workflows that are still in progress.
</comment>
<createTable tableName="DOCKER_HASH_STORE_ENTRY">
<column autoIncrement="true" name="DOCKER_HASH_STORE_ENTRY_ID" type="INT">
<constraints primaryKey="true" primaryKeyName="PK_DOCKER_HASH_STORE_ENTRY"/>
</column>
<column name="WORKFLOW_EXECUTION_UUID" type="VARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="DOCKER_TAG" type="VARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="DOCKER_HASH" type="VARCHAR(255)">
<constraints nullable="false"/>
</column>
</createTable>
<modifySql dbms="mysql">
<append value=" ENGINE=INNODB"/>
</modifySql>
</changeSet>

<changeSet author="tjeandet" id="docker_hash_store_uuid_unique_constraint">
<addUniqueConstraint
constraintName="UC_DOCKER_HASH_STORE_ENTRY_WEU_DT"
tableName="DOCKER_HASH_STORE_ENTRY"
columnNames="WORKFLOW_EXECUTION_UUID, DOCKER_TAG" />
</changeSet>

</databaseChangeLog>
@@ -0,0 +1,42 @@
package cromwell.database.slick

import cats.instances.future._
import cats.syntax.functor._
import cromwell.database.sql.DockerHashStoreSqlDatabase
import cromwell.database.sql.tables.DockerHashStoreEntry

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

trait DockerHashStoreSlickDatabase extends DockerHashStoreSqlDatabase {
this: SlickDatabase =>

import dataAccess.driver.api._

/**
* Adds a docker hash entry to the store.
*/
override def addDockerHashStoreEntry(dockerHashStoreEntry: DockerHashStoreEntry)
(implicit ec: ExecutionContext): Future[Unit] = {
val action = dataAccess.dockerHashStoreEntries += dockerHashStoreEntry
runTransaction(action) void
}

/**
* Retrieves docker hash entries for a workflow.
*
*/
override def queryDockerHashStoreEntries(workflowExecutionUuid: String)
(implicit ec: ExecutionContext): Future[Seq[DockerHashStoreEntry]] = {
val action = dataAccess.dockerHashStoreEntriesForWorkflowExecutionUuid(workflowExecutionUuid).result
runTransaction(action)
}

/**
* Deletes docker hash entries related to a workflow, returning the number of rows affected.
*/
override def removeDockerHashStoreEntries(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Int] = {
val action = dataAccess.dockerHashStoreEntriesForWorkflowExecutionUuid(workflowExecutionUuid).delete
runTransaction(action)
}
}
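
The semantics of the three store operations, together with the unique constraint on `WORKFLOW_EXECUTION_UUID, DOCKER_TAG` from the changeset, can be sketched with an in-memory stand-in. Everything here is hypothetical (`Entry`, `InMemoryDockerHashStore`, and the synchronous `Either`-based signatures); the real trait goes through Slick and returns `Future`s.

```scala
// Hypothetical in-memory stand-in; field names mirror the migration's columns.
final case class Entry(workflowExecutionUuid: String, dockerTag: String, dockerHash: String)

final class InMemoryDockerHashStore {
  // Keyed by (workflow UUID, tag), mirroring the unique constraint in the changeset.
  private var entries = Map.empty[(String, String), Entry]

  /** Adds an entry, rejecting a second hash for the same workflow and tag. */
  def add(entry: Entry): Either[String, Unit] = {
    val key = (entry.workflowExecutionUuid, entry.dockerTag)
    if (entries.contains(key)) Left(s"duplicate entry for $key")
    else {
      entries = entries.updated(key, entry)
      Right(())
    }
  }

  /** Returns every entry recorded for the given workflow. */
  def query(workflowExecutionUuid: String): Seq[Entry] =
    entries.values.filter(_.workflowExecutionUuid == workflowExecutionUuid).toSeq

  /** Removes all entries for the workflow and returns how many were removed. */
  def remove(workflowExecutionUuid: String): Int = {
    val (gone, kept) = entries.partition { case ((uuid, _), _) => uuid == workflowExecutionUuid }
    entries = kept
    gone.size
  }
}
```

Rejecting a duplicate `(workflow, tag)` pair is what keeps a floating tag pinned to a single hash for the lifetime of a workflow; removing the rows once the workflow finishes matches the changeset's description of the table as temporary storage.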
@@ -58,7 +58,8 @@ class SlickDatabase(override val originalDatabaseConfig: Config) extends SqlData
with JobStoreSlickDatabase
with CallCachingSlickDatabase
with SummaryStatusSlickDatabase
with SubWorkflowStoreSlickDatabase {
with SubWorkflowStoreSlickDatabase
with DockerHashStoreSlickDatabase {

override val urlKey = SlickDatabase.urlKey(originalDatabaseConfig)
private val slickConfig = DatabaseConfig.forConfig[JdbcProfile]("", databaseConfig)
@@ -17,7 +17,8 @@ class DataAccessComponent(val driver: JdbcProfile)
with WorkflowMetadataSummaryEntryComponent
with WorkflowStoreEntryComponent
with SubWorkflowStoreEntryComponent
with CustomLabelEntryComponent {
with CustomLabelEntryComponent
with DockerHashStoreEntryComponent {

import driver.api._

@@ -27,6 +28,7 @@ class DataAccessComponent(val driver: JdbcProfile)
callCachingHashEntries.schema ++
callCachingSimpletonEntries.schema ++
callCachingAggregationEntries.schema ++
dockerHashStoreEntries.schema ++
jobKeyValueEntries.schema ++
jobStoreEntries.schema ++
jobStoreSimpletonEntries.schema ++