Skip to content

Commit

Permalink
Allow for JES QPS to be tunable
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffjentry committed Nov 30, 2016
1 parent 2d6acee commit 593710e
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 23

* The `meta` and `parameter_meta` blocks are now valid within `workflow` blocks, not just `task`
* The JES backend configuration now has an option `genomicsApiQueriesPer100Seconds` to help tune the rate of batch polling against the JES servers
* Added an option `call-caching.invalidate-bad-cache-results` (default: `true`). If true, Cromwell will invalidate cached results which have failed to copy as part of a cache hit.
* Timing diagrams and metadata now receive more fine grained workflow states between submission and Running.
* Support for the Pair WDL type (e.g. `Pair[Int, File] floo = (3, "gs://blar/blaz/qlux.txt")`)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ backend {
config {
project = "my-project"
root = "gs://my-bucket"
genomicsApiQueriesPer100Seconds = 1000
.
.
.
Expand All @@ -1121,6 +1122,8 @@ backend {
}
```

If your project has API quotas other than the defaults, set the `genomicsApiQueriesPer100Seconds` value to the lesser of the `Queries per 100 seconds per user` and `Queries per 100 seconds` quotas. This value will be used to help tune Cromwell's rate of interaction with JES.

### Configuring Authentication

The `google` stanza in the Cromwell configuration file defines how to authenticate to Google. There are four different
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ backend {
# # Base bucket for workflow executions
# root = "gs://my-cromwell-workflows-bucket"
#
# # Set this to the lower of the two values "Queries per 100 seconds" and "Queries per 100 seconds per user" for
# # your project.
# #
# # Used to help determine maximum throughput to the google genomics API. Setting this value too low will
# # cause a drop in performance. Setting this value too high will cause QPS based locks from Google.
# # 1000 is the default "Queries per 100 seconds per user", 50000 is the default "Queries per 100 seconds"
# # See https://cloud.google.com/genomics/quotas for more information
# genomicsApiQueriesPer100Seconds = 1000
#
# # Polling for completion backs-off gradually for slower-running jobs.
# # This is the maximum polling interval (in seconds):
# maximum-polling-interval = 600
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ case class JesAttributes(project: String,
auths: JesAuths,
executionBucket: String,
endpointUrl: URL,
maxPollingInterval: Int)
maxPollingInterval: Int,
qps: Int)

object JesAttributes {

Expand Down Expand Up @@ -48,11 +49,14 @@ object JesAttributes {
val genomicsAuthName: ErrorOr[String] = backendConfig.validateString("genomics.auth")
val gcsFilesystemAuthName: ErrorOr[String] = backendConfig.validateString("filesystems.gcs.auth")

// 1000 per 100s is the default API limit
val qps = backendConfig.as[Option[Int]]("genomicsApiQueriesPer100Seconds").getOrElse(1000) / 100

(project |@| executionBucket |@| endpointUrl |@| genomicsAuthName |@| gcsFilesystemAuthName) map {
(_, _, _, _, _)
} flatMap { case (p, b, u, genomicsName, gcsName) =>
(googleConfig.auth(genomicsName) |@| googleConfig.auth(gcsName)) map { case (genomicsAuth, gcsAuth) =>
JesAttributes(p, computeServiceAccount, JesAuths(genomicsAuth, gcsAuth), b, u, maxPollingInterval)
JesAttributes(p, computeServiceAccount, JesAuths(genomicsAuth, gcsAuth), b, u, maxPollingInterval, qps)
}
} match {
case Valid(r) => r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ case class JesBackendLifecycleActorFactory(name: String, configurationDescriptor
initializationData.toJes.get.workflowPaths.workflowRoot
}

override def backendSingletonActorProps = Option(JesBackendSingletonActor.props())
override def backendSingletonActorProps = Option(JesBackendSingletonActor.props(jesConfiguration))

override lazy val fileHashingFunction: Option[FileHashingFunction] = Option(FileHashingFunction(JesBackendFileHashing.getCrc32c))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package cromwell.backend.impl.jes

import akka.actor.{Actor, ActorLogging, Props}
import cromwell.backend.impl.jes.statuspolling.{JesApiQueryManager}
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager.DoPoll

class JesBackendSingletonActor extends Actor with ActorLogging {
final case class JesBackendSingletonActor(jesConfiguration: JesConfiguration) extends Actor with ActorLogging {

val pollingActor = context.actorOf(JesApiQueryManager.props)
val pollingActor = context.actorOf(JesApiQueryManager.props(jesConfiguration))

override def receive = {
case poll: DoPoll =>
Expand All @@ -16,5 +16,5 @@ class JesBackendSingletonActor extends Actor with ActorLogging {
}

object JesBackendSingletonActor {
def props(): Props = Props(new JesBackendSingletonActor())
def props(jesConfiguration: JesConfiguration): Props = Props(JesBackendSingletonActor(jesConfiguration))
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ class JesConfiguration(val configurationDescriptor: BackendConfigurationDescript
val genomicsFactory = GenomicsFactory(googleConfig.applicationName, jesAuths.genomics, jesAttributes.endpointUrl)
val dockerCredentials = DockerConfiguration.build(configurationDescriptor.backendConfig).dockerCredentials map JesDockerCredentials.apply
val needAuthFileUpload = jesAuths.gcs.requiresAuthFile || dockerCredentials.isDefined
val qps = jesAttributes.qps
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cromwell.backend.impl.jes.statuspolling

import akka.actor.{Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated}
import cats.data.NonEmptyList
import cromwell.backend.impl.jes.Run
import cromwell.backend.impl.jes.{JesConfiguration, Run}
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager._

import scala.collection.immutable.Queue
Expand All @@ -11,15 +11,15 @@ import scala.collection.immutable.Queue
* Currently, just holds a set of JES status poll requests until a PollingActor pulls the work.
* TODO: Could eventually move all of the JES queries into a single work-pulling model.
*/
class JesApiQueryManager extends Actor with ActorLogging {
class JesApiQueryManager(val jesConfiguration: JesConfiguration) extends Actor with ActorLogging {

// workQueue is protected for the unit tests, not intended to be generally overridden
protected[statuspolling] var workQueue: Queue[JesStatusPollQuery] = Queue.empty
private var workInProgress: Map[ActorRef, JesPollingWorkBatch] = Map.empty

// If the statusPoller dies, we want to stop it and handle the termination ourselves.
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
private def statusPollerProps = JesPollingActor.props(self)
private def statusPollerProps = JesPollingActor.props(self, jesConfiguration)

// statusPoller is protected for the unit tests, not intended to be generally overridden
protected[statuspolling] var statusPoller: ActorRef = _
Expand Down Expand Up @@ -99,7 +99,7 @@ class JesApiQueryManager extends Actor with ActorLogging {

object JesApiQueryManager {

def props: Props = Props(new JesApiQueryManager)
def props(jesConfiguration: JesConfiguration): Props = Props(new JesApiQueryManager(jesConfiguration))

/**
* Poll the job represented by the Run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback
import com.google.api.client.googleapis.json.GoogleJsonError
import com.google.api.client.http.HttpHeaders
import com.google.api.services.genomics.model.Operation
import cromwell.backend.impl.jes.Run
import cromwell.backend.impl.jes.{JesConfiguration, Run}
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager.{JesPollingWorkBatch, JesStatusPollQuery, NoWorkToDo}
import cromwell.backend.impl.jes.statuspolling.JesPollingActor._

Expand All @@ -19,12 +19,11 @@ import scala.concurrent.duration._
/**
* Polls JES for status. Pipes the results back (so expect either a RunStatus or a akka.actor.Status.Failure).
*/
class JesPollingActor(pollingManager: ActorRef) extends Actor with ActorLogging {
class JesPollingActor(val pollingManager: ActorRef, val jesConfiguration: JesConfiguration) extends Actor with ActorLogging {
// The delay to leave between submitting successive batches, derived from the configured JES qps.
// Declared lazy, although the debug statement below forces its evaluation while the actor is constructed.
lazy val batchInterval = determineBatchInterval(jesConfiguration.qps)
// Akka's LoggingAdapter substitutes arguments at "{}" placeholders; a bare "%" is never replaced.
log.debug("JES batch polling interval is {}", batchInterval)

// We want to query at just under our fixed JES QPS limit of 20 per second. That should hopefully allow some room at the edges
// for things like new calls, etc.
val MaxBatchSize = 100
val BatchInterval = 5.5.seconds
self ! NoWorkToDo // Starts the check-for-work cycle

implicit val ec: ExecutionContext = context.dispatcher
Expand Down Expand Up @@ -109,13 +108,31 @@ class JesPollingActor(pollingManager: ActorRef) extends Actor with ActorLogging
* Warning: Only use this from inside a receive method.
*/
private def scheduleCheckForWork(): Unit = {
context.system.scheduler.scheduleOnce(BatchInterval) { pollingManager ! JesApiQueryManager.RequestJesPollingWork(MaxBatchSize) }
context.system.scheduler.scheduleOnce(batchInterval) { pollingManager ! JesApiQueryManager.RequestJesPollingWork(MaxBatchSize) }
()
}
}

object JesPollingActor {
def props(pollingManager: ActorRef) = Props(new JesPollingActor(pollingManager))
def props(pollingManager: ActorRef, jesConfiguration: JesConfiguration) = Props(new JesPollingActor(pollingManager, jesConfiguration))

// The Batch API limits us to 100 at a time
val MaxBatchSize = 100

/**
* Determines how long to wait between submitting successive batch requests to the Genomics API.
*
* A batch carries at most MaxBatchSize queries, so at `qps` queries per second the quota covers
* `qps / MaxBatchSize` full batches per second. The interval returned is 90% of the time one full batch
* "costs" at that rate, and never less than 1 second. The (still crude) delta was intended to provide some
* room at the edges for things like new calls, etc.
*
* NOTE(review): multiplying the interval by 0.9 makes it shorter, so full batches would be sent slightly
* faster than the quota rate, not at 90% of it. The behaviour is kept as is because the existing
* expectations depend on it; confirm which direction was intended.
*
* Forcing the minimum value to be 1 second, for now it seems unlikely to matter and it makes testing a bit
* easier
*
* @param qps the permitted number of Genomics API queries per second, must be positive
* @return the delay between batches, at least 1 second
* @throws IllegalArgumentException if `qps` is zero or negative
*/
def determineBatchInterval(qps: Int): FiniteDuration = {
// A qps of 0 would divide by zero below (an infinite, non-finite duration) and a negative qps would silently
// collapse to the 1 second floor, i.e. the fastest polling rate. Reject both with a clear message.
require(qps > 0, s"JES queries per second must be positive but was $qps")
val batchesPerSecond = qps / MaxBatchSize.toDouble // Force this to be floating point in case the value is < 1
val secondsPerFullBatch = 1 / batchesPerSecond
val interval = Math.max(secondsPerFullBatch * 0.9, 1)
interval.seconds
}

final case class JesPollFailed(e: GoogleJsonError, responseHeaders: HttpHeaders)
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ object JesTestConfig {

val JesBackendConfig = ConfigFactory.parseString(JesBackendConfigString)
val JesGlobalConfig = ConfigFactory.parseString(JesGlobalConfigString)
val JesBackendConfigurationDescriptor = new BackendConfigurationDescriptor(JesBackendConfig, JesGlobalConfig)
val JesBackendConfigurationDescriptor = BackendConfigurationDescriptor(JesBackendConfig, JesGlobalConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cromwell.backend.impl.jes.statuspolling

import akka.actor.{ActorRef, Props}
import akka.testkit.{TestActorRef, TestProbe}
import cromwell.backend.impl.jes.Run
import cromwell.backend.impl.jes.{JesConfiguration, Run}
import cromwell.core.TestKitSuite
import org.scalatest.{FlatSpecLike, Matchers}

Expand Down Expand Up @@ -108,7 +108,7 @@ object JesApiQueryManagerSpec {
/**
* This test class allows us to hook into the JesApiQueryManager's makeStatusPoller and provide our own TestProbes instead
*/
class TestJesApiQueryManager(statusPollerProbes: ActorRef*) extends JesApiQueryManager {
class TestJesApiQueryManager(jesConfiguration: JesConfiguration, statusPollerProbes: ActorRef*) extends JesApiQueryManager(jesConfiguration) {
var testProbes: Queue[ActorRef] = _
var testPollerCreations: Int = _

Expand Down Expand Up @@ -137,5 +137,8 @@ class TestJesApiQueryManager(statusPollerProbes: ActorRef*) extends JesApiQueryM
}

object TestJesApiQueryManager {
def props(statusPollers: ActorRef*): Props = Props(new TestJesApiQueryManager(statusPollers: _*))
import cromwell.backend.impl.jes.JesTestConfig.JesBackendConfigurationDescriptor
val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor)

def props(statusPollers: ActorRef*): Props = Props(new TestJesApiQueryManager(jesConfiguration, statusPollers: _*))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.google.api.client.googleapis.batch.BatchRequest
import com.google.api.client.googleapis.batch.json.JsonBatchCallback
import com.google.api.client.googleapis.json.GoogleJsonError
import com.google.api.services.genomics.model.Operation
import cromwell.backend.impl.jes.{Run, RunStatus}
import cromwell.backend.impl.jes.{JesConfiguration, Run, RunStatus}
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager.JesStatusPollQuery
import cromwell.backend.impl.jes.statuspolling.JesPollingActor.JesPollFailed
import cromwell.backend.impl.jes.statuspolling.TestJesPollingActor.{CallbackFailure, CallbackSuccess, JesBatchCallbackResponse}
Expand All @@ -29,9 +29,17 @@ class JesPollingActorSpec extends TestKitSuite("JesPollingActor") with FlatSpecL
implicit val DefaultPatienceConfig = PatienceConfig(TestExecutionTimeout)
val AwaitAlmostNothing = 30.milliseconds.dilated

import cromwell.backend.impl.jes.JesTestConfig.JesBackendConfigurationDescriptor
val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor)

var managerProbe: TestProbe = _
var jpActor: TestActorRef[TestJesPollingActor] = _

// Pins the arithmetic of JesPollingActor.determineBatchInterval with a MaxBatchSize of 100.
it should "correctly calculate batch intervals" in {
// 10 qps => 0.1 batches per second => 10 seconds per batch, scaled by 0.9 => 9 seconds
JesPollingActor.determineBatchInterval(10) shouldBe 9.seconds
// 100 qps => 1 batch per second => 0.9 seconds after scaling, which is raised to the 1 second minimum
JesPollingActor.determineBatchInterval(100) shouldBe 1.second
}

it should "query for work and wait for a reply" in {
managerProbe.expectMsgClass(max = TestExecutionTimeout, c = classOf[JesApiQueryManager.RequestJesPollingWork])
managerProbe.expectNoMsg(max = AwaitAlmostNothing)
Expand Down Expand Up @@ -77,7 +85,7 @@ class JesPollingActorSpec extends TestKitSuite("JesPollingActor") with FlatSpecL

before {
managerProbe = TestProbe()
jpActor = TestActorRef(TestJesPollingActor.props(managerProbe.ref), managerProbe.ref)
jpActor = TestActorRef(TestJesPollingActor.props(managerProbe.ref, jesConfiguration), managerProbe.ref)
}
}

Expand All @@ -94,8 +102,9 @@ object JesPollingActorSpec extends Mockito {
* - Mocks out the methods which actually call out to JES, and allows the callbacks to be triggered in a testable way
* - Also waits a **lot** less time before polls!
*/
class TestJesPollingActor(manager: ActorRef) extends JesPollingActor(manager) with Mockito {
override val BatchInterval = 10.milliseconds
class TestJesPollingActor(manager: ActorRef, jesConfiguration: JesConfiguration) extends JesPollingActor(manager, jesConfiguration) with Mockito {

override lazy val batchInterval = 10.milliseconds

var operationStatusResponses: Queue[RunStatus] = Queue.empty
var resultHandlers: Queue[JsonBatchCallback[Operation]] = Queue.empty
Expand Down Expand Up @@ -123,7 +132,7 @@ class TestJesPollingActor(manager: ActorRef) extends JesPollingActor(manager) wi
}

object TestJesPollingActor {
def props(manager: ActorRef) = Props(new TestJesPollingActor(manager))
def props(manager: ActorRef, jesConfiguration: JesConfiguration) = Props(new TestJesPollingActor(manager, jesConfiguration))

sealed trait JesBatchCallbackResponse
case object CallbackSuccess extends JesBatchCallbackResponse
Expand Down

0 comments on commit 593710e

Please sign in to comment.