Skip to content

Commit

Permalink
Batch writes of call caching related data.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed May 3, 2017
1 parent 98e0e6e commit bc5389d
Show file tree
Hide file tree
Showing 26 changed files with 189 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.core.actor

import akka.actor.ActorRef
import cats.data.NonEmptyVector
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -48,4 +49,6 @@ object BatchingDbWriter {
case object DbWriteComplete extends BatchingDbWriterMessage
case object FlushBatchToDb extends BatchingDbWriterMessage
case object ScheduledFlushToDb extends BatchingDbWriterMessage

/**
  * Pairs a command with an actor reference.
  *
  * @param command the command payload, of caller-chosen type `C`
  * @param replyTo actor reference carried alongside the command; presumably the actor to respond to once the
  *                command has been processed -- confirm against the users of this class
  * @tparam C the type of the wrapped command
  */
case class CommandAndReplyTo[C](
  command: C,
  replyTo: ActorRef
)
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cromwell.database.slick

import cats.data.NonEmptyList
import cats.instances.list._
import cats.instances.tuple._
import cats.syntax.foldable._
import cromwell.database.sql._
import cromwell.database.sql.joins.CallCachingJoin
import cromwell.database.sql.tables.CallCachingEntry
import cromwell.database.sql.tables._

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -12,16 +15,39 @@ trait CallCachingSlickDatabase extends CallCachingSqlDatabase {

import dataAccess.driver.api._

override def addCallCaching(callCachingJoin: CallCachingJoin)
override def addCallCaching(joins: Seq[CallCachingJoin], batchSize: Int)
(implicit ec: ExecutionContext): Future[Unit] = {

// Construct parallel lists of parent entries, hashes, simpletons, and detritus from `CallCachingJoin`s.
val (entries, hashes, simpletons, detritus) = joins.toList.foldMap { j =>
(List(j.callCachingEntry), List(j.callCachingHashEntries), List(j.callCachingSimpletonEntries), List(j.callCachingDetritusEntries)) }

// Use the supplied `assigner` function to assign parent entry row IDs into the parallel `Seq` of children entities.
def assignEntryIdsToChildren[C](ids: Seq[Int], groupingsOfChildren: Seq[Seq[C]], assigner: (Int, C) => C): Seq[C] = {
  // `ids` and `groupingsOfChildren` are parallel sequences: the i-th ID is the parent of the i-th grouping.
  // `zip` silently truncates to the shorter of its inputs, so a length mismatch would drop child rows without
  // any error. Fail loudly instead so the caller can abort rather than persist incomplete data.
  require(
    ids.size == groupingsOfChildren.size,
    s"Expected one parent entry ID per grouping of children but got ${ids.size} IDs for ${groupingsOfChildren.size} groupings"
  )

  // Stamp each child with its parent's ID and flatten the groupings into a single sequence, preserving order.
  ids.zip(groupingsOfChildren).flatMap { case (parentId, children) =>
    children.map(child => assigner(parentId, child))
  }
}

// Batch insert entities into the appropriate `Table`.
def batchInsert[E, T <: Table[E]](entries: Seq[E], tableQuery: TableQuery[T]): DBIO[_] = {
  // Split the rows into chunks of at most `batchSize` (captured from the enclosing method), build one `++=`
  // insert action per chunk, and fold the per-chunk actions into a single action with `DBIO.sequence`.
  DBIO.sequence(entries.grouped(batchSize).map { chunk => tableQuery ++= chunk })
}

// Functions to assign call cache entry IDs into child hash entry, simpleton, and detritus rows.
def hashAssigner(id: Int, hash: CallCachingHashEntry): CallCachingHashEntry = {
  // Copy of the hash row with its parent call caching entry ID set to `id`.
  hash.copy(callCachingEntryId = Option(id))
}
def simpletonAssigner(id: Int, simpleton: CallCachingSimpletonEntry): CallCachingSimpletonEntry = {
  // Copy of the simpleton row with its parent call caching entry ID set to `id`.
  simpleton.copy(callCachingEntryId = Option(id))
}
def detritusAssigner(id: Int, detritus: CallCachingDetritusEntry): CallCachingDetritusEntry = {
  // Copy of the detritus row with its parent call caching entry ID set to `id`.
  detritus.copy(callCachingEntryId = Option(id))
}

val action = for {
callCachingEntryId <- dataAccess.callCachingEntryIdsAutoInc += callCachingJoin.callCachingEntry
_ <- dataAccess.callCachingHashEntryIdsAutoInc ++= callCachingJoin.callCachingHashEntries.
map(_.copy(callCachingEntryId = Option(callCachingEntryId)))
_ <- dataAccess.callCachingSimpletonEntryIdsAutoInc ++= callCachingJoin.callCachingSimpletonEntries.
map(_.copy(callCachingEntryId = Option(callCachingEntryId)))
_ <- dataAccess.callCachingDetritusEntryIdsAutoInc ++= callCachingJoin.callCachingDetritusEntries.
map(_.copy(callCachingEntryId = Option(callCachingEntryId)))
entryIds <- dataAccess.callCachingEntryIdsAutoInc ++= entries

hashEntries = assignEntryIdsToChildren(entryIds, hashes, hashAssigner)
_ <- batchInsert(hashEntries, dataAccess.callCachingHashEntries)

simpletonEntries = assignEntryIdsToChildren(entryIds, simpletons, simpletonAssigner)
_ <- batchInsert(simpletonEntries, dataAccess.callCachingSimpletonEntries)

detritusEntries = assignEntryIdsToChildren(entryIds, detritus, detritusAssigner)
_ <- batchInsert(detritusEntries, dataAccess.callCachingDetritusEntries)
} yield ()
runTransaction(action)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait CallCachingDetritusEntryComponent {
index("UC_CALL_CACHING_DETRITUS_ENTRY_CCEI_DK", (callCachingEntryId, detritusKey), unique = true)
}

protected val callCachingDetritusEntries = TableQuery[CallCachingDetritusEntries]
val callCachingDetritusEntries = TableQuery[CallCachingDetritusEntries]

// Insert query over the detritus table that hands back the `callCachingDetritusEntryId` column of inserted rows.
val callCachingDetritusEntryIdsAutoInc =
  callCachingDetritusEntries.returning(callCachingDetritusEntries.map(_.callCachingDetritusEntryId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait CallCachingHashEntryComponent {
index("UC_CALL_CACHING_HASH_ENTRY_CCEI_HK", (callCachingEntryId, hashKey), unique = true)
}

protected val callCachingHashEntries = TableQuery[CallCachingHashEntries]
val callCachingHashEntries = TableQuery[CallCachingHashEntries]

// Insert query over the hash table that hands back the `callCachingHashEntryId` column of inserted rows.
val callCachingHashEntryIdsAutoInc =
  callCachingHashEntries.returning(callCachingHashEntries.map(_.callCachingHashEntryId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait CallCachingSimpletonEntryComponent {
index("UC_CALL_CACHING_SIMPLETON_ENTRY_CCEI_SK", (callCachingEntryId, simpletonKey), unique = true)
}

protected val callCachingSimpletonEntries = TableQuery[CallCachingSimpletonEntries]
val callCachingSimpletonEntries = TableQuery[CallCachingSimpletonEntries]

// Insert query over the simpleton table that hands back the `callCachingSimpletonEntryId` column of inserted rows.
val callCachingSimpletonEntryIdsAutoInc =
  callCachingSimpletonEntries.returning(callCachingSimpletonEntries.map(_.callCachingSimpletonEntryId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cromwell.database.sql.tables.CallCachingEntry
import scala.concurrent.{ExecutionContext, Future}

trait CallCachingSqlDatabase {
def addCallCaching(callCachingJoin: CallCachingJoin)(implicit ec: ExecutionContext): Future[Unit]
def addCallCaching(joins: Seq[CallCachingJoin], batchSize: Int)(implicit ec: ExecutionContext): Future[Unit]

def queryCallCachingEntryIds(hashKeyHashValues: NonEmptyList[(String, String)])
(implicit ec: ExecutionContext): Future[Seq[Int]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,13 @@ object WorkflowActor {
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
serverMode: Boolean): Props = {
Props(new WorkflowActor(workflowId, startMode, wdlSource, conf, ioActor, serviceRegistryActor, workflowLogCopyRouter,
jobStoreActor, subWorkflowStoreActor, callCacheReadActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, serverMode)).withDispatcher(EngineDispatcher)
jobStoreActor, subWorkflowStoreActor, callCacheReadActor, callCacheWriteActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, serverMode)).withDispatcher(EngineDispatcher)
}
}

Expand All @@ -164,6 +165,7 @@ class WorkflowActor(val workflowId: WorkflowId,
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
Expand Down Expand Up @@ -216,6 +218,7 @@ class WorkflowActor(val workflowId: WorkflowId,
jobStoreActor,
subWorkflowStoreActor,
callCacheReadActor,
callCacheWriteActor,
dockerHashActor,
jobTokenDispenserActor,
backendSingletonCollection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ object WorkflowManagerActor {
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
abortJobsOnTerminate: Boolean,
serverMode: Boolean): Props = {
val params = WorkflowManagerActorParams(ConfigFactory.load, workflowStore, ioActor, serviceRegistryActor,
workflowLogCopyRouter, jobStoreActor, subWorkflowStoreActor, callCacheReadActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection,
workflowLogCopyRouter, jobStoreActor, subWorkflowStoreActor, callCacheReadActor, callCacheWriteActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection,
abortJobsOnTerminate, serverMode)
Props(new WorkflowManagerActor(params)).withDispatcher(EngineDispatcher)
}
Expand Down Expand Up @@ -95,6 +96,7 @@ case class WorkflowManagerActorParams(config: Config,
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
Expand Down Expand Up @@ -290,7 +292,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
}

val wfProps = WorkflowActor.props(workflowId, startMode, workflow.sources, config, params.ioActor, params.serviceRegistryActor,
params.workflowLogCopyRouter, params.jobStoreActor, params.subWorkflowStoreActor, params.callCacheReadActor,
params.workflowLogCopyRouter, params.jobStoreActor, params.subWorkflowStoreActor, params.callCacheReadActor, params.callCacheWriteActor,
params.dockerHashActor, params.jobTokenDispenserActor,
params.backendSingletonCollection, params.serverMode)
val wfActor = context.actorOf(wfProps, name = s"WorkflowActor-$workflowId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import wdl4s.TaskOutput

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
import CallCacheWriteActor._

class EngineJobExecutionActor(replyTo: ActorRef,
jobDescriptorKey: BackendJobDescriptorKey,
Expand All @@ -38,6 +39,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
ioActor: ActorRef,
jobStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonActor: Option[ActorRef],
Expand Down Expand Up @@ -447,12 +449,6 @@ class EngineJobExecutionActor(replyTo: ActorRef,
s"$workflowIdForLogging-BackendCacheHitCopyingActor-$jobTag-${cacheResultId.id}"
}

protected def createSaveCacheResultsActor(hashes: CallCacheHashes, success: JobSucceededResponse): Unit = {
val callCache = new CallCache(SingletonServicesStore.databaseInterface)
context.actorOf(CallCacheWriteActor.props(callCache, workflowIdForLogging, hashes, success), s"CallCacheWriteActor-$tag")
()
}

private def invalidateCacheHitAndTransition(cacheId: CallCachingEntryId, data: ResponsePendingData, reason: Throwable) = {
val invalidationRequired = effectiveCallCachingMode match {
case CallCachingOff => throw new RuntimeException("Should not be calling invalidateCacheHit if call caching is off!") // Very unexpected. Fail out of this bad-state EJEA.
Expand All @@ -474,7 +470,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
}

private def saveCacheResults(hashes: CallCacheHashes, data: SucceededResponseData) = {
createSaveCacheResultsActor(hashes, data.successResponse)
callCacheWriteActor ! SaveCallCacheHashes(workflowIdForLogging, hashes, data)
val updatedData = data.copy(hashes = Option(Success(hashes)))
goto(UpdatingCallCache) using updatedData
}
Expand Down Expand Up @@ -546,6 +542,7 @@ object EngineJobExecutionActor {
ioActor: ActorRef,
jobStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonActor: Option[ActorRef],
Expand All @@ -562,6 +559,7 @@ object EngineJobExecutionActor {
ioActor = ioActor,
jobStoreActor = jobStoreActor,
callCacheReadActor = callCacheReadActor,
callCacheWriteActor = callCacheWriteActor,
dockerHashActor = dockerHashActor,
jobTokenDispenserActor = jobTokenDispenserActor,
backendSingletonActor = backendSingletonActor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SubWorkflowExecutionActor(key: SubWorkflowKey,
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
Expand Down Expand Up @@ -151,6 +152,7 @@ class SubWorkflowExecutionActor(key: SubWorkflowKey,
jobStoreActor,
subWorkflowStoreActor,
callCacheReadActor,
callCacheWriteActor,
dockerHashActor,
jobTokenDispenserActor,
backendSingletonCollection,
Expand Down Expand Up @@ -255,6 +257,7 @@ object SubWorkflowExecutionActor {
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
Expand All @@ -269,6 +272,7 @@ object SubWorkflowExecutionActor {
jobStoreActor,
subWorkflowStoreActor,
callCacheReadActor,
callCacheWriteActor,
dockerHashActor,
jobTokenDispenserActor,
backendSingletonCollection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ case class WorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor,
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
Expand Down Expand Up @@ -473,7 +474,7 @@ case class WorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor,
val backendSingleton = backendSingletonCollection.backendSingletonActors(backendName)
val ejeaProps = EngineJobExecutionActor.props(
self, jobKey, data, factory, initializationData.get(backendName), restarting, serviceRegistryActor, ioActor,
jobStoreActor, callCacheReadActor, dockerHashActor, jobTokenDispenserActor, backendSingleton, backendName, workflowDescriptor.callCachingMode)
jobStoreActor, callCacheReadActor, callCacheWriteActor, dockerHashActor, jobTokenDispenserActor, backendSingleton, backendName, workflowDescriptor.callCachingMode)
val ejeaRef = context.actorOf(ejeaProps, ejeaName)
context watch ejeaRef
pushNewCallMetadata(jobKey, Option(backendName))
Expand All @@ -490,7 +491,7 @@ case class WorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor,
private def processRunnableSubWorkflow(key: SubWorkflowKey, data: WorkflowExecutionActorData): Try[WorkflowExecutionDiff] = {
val sweaRef = context.actorOf(
SubWorkflowExecutionActor.props(key, data, backendFactories, ioActor, serviceRegistryActor, jobStoreActor, subWorkflowStoreActor,
callCacheReadActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, initializationData, restarting),
callCacheReadActor, callCacheWriteActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, initializationData, restarting),
s"SubWorkflowExecutionActor-${key.tag}"
)

Expand Down Expand Up @@ -769,13 +770,14 @@ object WorkflowExecutionActor {
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
callCacheWriteActor: ActorRef,
dockerHashActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
initializationData: AllBackendInitializationData,
restarting: Boolean): Props = {
Props(WorkflowExecutionActor(workflowDescriptor, ioActor, serviceRegistryActor, jobStoreActor, subWorkflowStoreActor,
callCacheReadActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, initializationData, restarting)).withDispatcher(EngineDispatcher)
callCacheReadActor, callCacheWriteActor, dockerHashActor, jobTokenDispenserActor, backendSingletonCollection, initializationData, restarting)).withDispatcher(EngineDispatcher)
}

implicit class EnhancedWorkflowOutputs(val outputs: Map[LocallyQualifiedName, WdlValue]) extends AnyVal {
Expand Down

0 comments on commit bc5389d

Please sign in to comment.