Skip to content

Commit

Permalink
Merge 021d6a6 into ba7dd4c
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Aug 12, 2016
2 parents ba7dd4c + 021d6a6 commit c193a2b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
29 changes: 26 additions & 3 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
kanaloa {

#Default settings
#Default settings for all dispatchers
default-dispatcher {
workTimeout = 1m

Expand All @@ -11,7 +11,7 @@ kanaloa {
updateInterval = 1s

# When backend fails the work, the dispatcher can retry it for this many times.
workRetry = 0
workRetry = 0

workerPool {

Expand All @@ -31,7 +31,7 @@ kanaloa {
logRouteeRetrievalError = true

# whether or not to shutdown the whole dispatcher when all workers died
shutdownOnAllWorkerDeath = true
shutdownOnAllWorkerDeath = false

# default timeout for shutingdown
defaultShutdownTimeout = 30s
Expand Down Expand Up @@ -183,6 +183,29 @@ kanaloa {

}

#Default settings for pulling dispatchers
default-pulling-dispatcher {
workerPool {
shutdownOnAllWorkerDeath = true
}

# for pulling timeout might mean work lost, so it should be more careful.
circuitBreaker {
openDurationBase = 30s
timeoutCountThreshold = 1
}

backPressure {
enabled = off
}

autothrottle {
downsizeAfterUnderUtilization = 30s
}
}



# Your dispatchers config goes here
dispatchers {

Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/kanaloa/reactive/dispatcher/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ object Dispatcher {
.withFallback(referenceConfig)
}

def defaultDispatcherConfig(config: Config = kanaloaConfig()): Config =
config.as[Config]("default-dispatcher")
def defaultDispatcherConfig(
config: Config = kanaloaConfig(),
defaultConfigName: Option[String] = None
): Config = {
val rootDefaultCfg = config.as[Config]("default-dispatcher")
defaultConfigName.fold(rootDefaultCfg)(config.as[Config](_).withFallback(rootDefaultCfg))
}

def defaultDispatcherSettings(config: Config = kanaloaConfig()): Dispatcher.Settings =
toDispatcherSettings(defaultDispatcherConfig(config))
Expand All @@ -112,9 +117,9 @@ object Dispatcher {
settings componentCfg.atPath("root").as[Option[SettingT]]("root") if enabled
} yield settings

def readConfig(dispatcherName: String, rootConfig: Config)(implicit system: ActorSystem): (Settings, Option[Reporter]) = {
def readConfig(dispatcherName: String, rootConfig: Config, defaultConfigName: Option[String] = None)(implicit system: ActorSystem): (Settings, Option[Reporter]) = {
val cfg = kanaloaConfig(rootConfig)
val dispatcherCfg = cfg.as[Option[Config]]("dispatchers." + dispatcherName).getOrElse(ConfigFactory.empty).withFallback(defaultDispatcherConfig(cfg))
val dispatcherCfg = cfg.as[Option[Config]]("dispatchers." + dispatcherName).getOrElse(ConfigFactory.empty).withFallback(defaultDispatcherConfig(cfg, defaultConfigName))

val settings = toDispatcherSettings(dispatcherCfg)

Expand Down Expand Up @@ -203,7 +208,7 @@ object PullingDispatcher {
sendResultsTo: Option[ActorRef] = None,
rootConfig: Config = ConfigFactory.load()
)(resultChecker: ResultChecker)(implicit system: ActorSystem) = {
val (settings, reporter) = Dispatcher.readConfig(name, rootConfig)
val (settings, reporter) = Dispatcher.readConfig(name, rootConfig, Some("default-pulling-dispatcher"))
//for pulling dispatchers because only a new idle worker triggers a pull of work, there maybe cases where there are two idle workers but the system should be deemed as fully utilized.
val metricsCollector = MetricsCollector(reporter, settings.performanceSamplerSettings)
val toBackend = implicitly[BackendAdaptor[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,16 @@ class DispatcherSpec extends SpecWithActorSystem with OptionValues {
val (settings, reporter) = Dispatcher.readConfig("example", ConfigFactory.empty)
settings.workRetry === 0
settings.autothrottle shouldBe defined
settings.workerPool.shutdownOnAllWorkerDeath shouldBe false
reporter shouldBe empty
}

"use a specific default settings" in {
val (settings, _) = Dispatcher.readConfig("example", ConfigFactory.empty, Some("default-pulling-dispatcher"))
settings.workerPool.shutdownOnAllWorkerDeath shouldBe true
settings.autothrottle shouldBe defined
}

"use default-dispatcher settings when dispatcher name is missing in the dispatchers section" in {
val cfgStr =
"""
Expand Down

0 comments on commit c193a2b

Please sign in to comment.