Skip to content

Commit

Permalink
Allow configuration of the number of threads created by Netty. #2691
Browse files Browse the repository at this point in the history
  • Loading branch information
bantonsson committed Nov 9, 2012
1 parent 0f105ac commit 5fa9ba1
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
30 changes: 30 additions & 0 deletions akka-remote/src/main/resources/reference.conf
Expand Up @@ -151,6 +151,36 @@ akka {

# (O) Maximum time window that a client should try to reconnect for
reconnection-time-window = 600s

# (I&O) Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
pool-size-min = 2

# The pool size factor is used to determine thread pool size
# using the following formula: ceil(available processors * factor).
# Resulting size is then bounded by the pool-size-min and
# pool-size-max values.
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 8
}

# (I&O) Used to configure the number of I/O worker threads on client sockets
client-socket-worker-pool {
# Min number of threads to cap factor-based number to
pool-size-min = 2

# The pool size factor is used to determine thread pool size
# using the following formula: ceil(available processors * factor).
# Resulting size is then bounded by the pool-size-min and
# pool-size-max values.
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 8
}
}

# The dispatcher used for the system actor "network-event-sender"
Expand Down
Expand Up @@ -61,7 +61,8 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor

val clientChannelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(system.threadFactory),
Executors.newCachedThreadPool(system.threadFactory))
Executors.newCachedThreadPool(system.threadFactory),
settings.ClientSocketWorkerPoolSize)

private val remoteClients = new HashMap[Address, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
Expand Down
3 changes: 2 additions & 1 deletion akka-remote/src/main/scala/akka/remote/netty/Server.scala
Expand Up @@ -29,7 +29,8 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) {

private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(netty.system.threadFactory),
Executors.newCachedThreadPool(netty.system.threadFactory))
Executors.newCachedThreadPool(netty.system.threadFactory),
settings.ServerSocketWorkerPoolSize)

private val executionHandler = new ExecutionHandler(netty.executor)

Expand Down
11 changes: 11 additions & 0 deletions akka-remote/src/main/scala/akka/remote/netty/Settings.scala
Expand Up @@ -8,6 +8,7 @@ import akka.util.Duration
import java.util.concurrent.TimeUnit._
import java.net.InetAddress
import akka.config.ConfigurationException
import akka.dispatch.ThreadPoolConfig

@deprecated("Will become private[akka] in 2.1, this is not user-api", "2.0.2")
class NettySettings(config: Config, val systemName: String) {
Expand Down Expand Up @@ -70,4 +71,14 @@ class NettySettings(config: Config, val systemName: String) {
case sz sz
}

private def computeWPS(config: Config): Int =
ThreadPoolConfig.scaledPoolSize(
config.getInt("pool-size-min"),
config.getDouble("pool-size-factor"),
config.getInt("pool-size-max"))

val ServerSocketWorkerPoolSize = computeWPS(config.getConfig("server-socket-worker-pool"))

val ClientSocketWorkerPoolSize = computeWPS(config.getConfig("client-socket-worker-pool"))

}
25 changes: 25 additions & 0 deletions akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
Expand Up @@ -57,7 +57,32 @@ class RemoteConfigSpec extends AkkaSpec(
WriteTimeout must be(10 seconds)
AllTimeout must be(0 millis)
ReconnectionTimeWindow must be(10 minutes)
ServerSocketWorkerPoolSize must be >= (2)
ServerSocketWorkerPoolSize must be <= (8)
ClientSocketWorkerPoolSize must be >= (2)
ClientSocketWorkerPoolSize must be <= (8)
}

"contain correct configuration values in reference.conf" in {
val c = system.asInstanceOf[ExtendedActorSystem].
provider.asInstanceOf[RemoteActorRefProvider].
remoteSettings.config.getConfig("akka.remote.netty")

// server-socket-worker-pool
{
val pool = c.getConfig("server-socket-worker-pool")
pool.getInt("pool-size-min") must equal(2)
pool.getDouble("pool-size-factor") must equal(1.0)
pool.getInt("pool-size-max") must equal(8)
}

// client-socket-worker-pool
{
val pool = c.getConfig("client-socket-worker-pool")
pool.getInt("pool-size-min") must equal(2)
pool.getDouble("pool-size-factor") must equal(1.0)
pool.getInt("pool-size-max") must equal(8)
}
}
}
}

0 comments on commit 5fa9ba1

Please sign in to comment.