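This post shows how to tune akka-actor performance on a single machine so that it makes the fullest possible use of that machine's hardware.
Processors have long since entered the multi-core era.
So one key to fully using a single machine's hardware is to make full use of threads and complete tasks asynchronously,
that is, to use Akka to build a non-blocking, asynchronous, message-driven system.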
All the code discussed below is available in this GitHub repository:
https://github.com/deanzz/akka-dispatcher-test
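A Dispatcher is an execution context. Every task in an actor is handed to a Dispatcher to run, and the Dispatcher decouples how a task is executed from when it runs.
For now, you can simply think of a Dispatcher as a thread pool that executes tasks.
In code, ActorSystem.dispatcher returns the default dispatcher (default-dispatcher in the configuration file),
and ActorSystem.dispatchers.lookup returns a custom dispatcher defined in the configuration file.
scala.concurrent.ExecutionContext is the execution context for Scala Futures. You can treat a Dispatcher and an ExecutionContext as the same thing, because Dispatcher extends ExecutionContext.
So whenever a Future needs an ExecutionContext, you can pass it a dispatcher directly.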
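To make the idea of "a dispatcher is an ExecutionContext you hand to a Future" concrete, here is a rough sketch that uses only the Scala standard library. `NamedContexts` is a hypothetical registry, not an Akka API. It maps names to thread pools, loosely mirroring `dispatchers.lookup`, and each pool is passed explicitly as a Future's ExecutionContext. Real Akka code would pass `context.system.dispatcher` or the result of `context.system.dispatchers.lookup(...)` instead.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical analogue of ActorSystem.dispatchers.lookup built on the standard library:
// each name maps to its own fixed-size thread pool wrapped as an ExecutionContext.
object NamedContexts {
  private def namedPool(prefix: String, size: Int): ExecutionContextExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(size, factory))
  }

  private val registry: Map[String, ExecutionContextExecutorService] = Map(
    "default-dispatcher"     -> namedPool("default-dispatcher", 4),
    "blocking-io-dispatcher" -> namedPool("blocking-io-dispatcher", 8)
  )

  // Throws NoSuchElementException for an unknown name.
  def lookup(name: String): ExecutionContext = registry(name)

  // Runs a Future on the named context (passed explicitly) and reports which thread ran it.
  def threadNameOn(name: String): String = {
    val f = Future(Thread.currentThread().getName)(lookup(name))
    Await.result(f, 5.seconds)
  }
}
```

For example, `NamedContexts.threadNameOn("blocking-io-dispatcher")` returns a name of the form `blocking-io-dispatcher-N`. The per-dispatcher thread names in the logs later in this article follow the same idea.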
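A Router is Akka's abstraction for load balancing and routing. The pool routers used here create several actor instances (routees) that share the work.
There are many kinds of Router. Today we use two of them: RoundRobinPool and BalancingPool.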
RoundRobinPool:
This strategy sends messages to each routee in the pool in turn, over and over.
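The round-robin selection order itself is simple. This sketch uses a hypothetical `RoundRobin` class, not Akka's routing logic, to show only that order. It is deliberately not thread-safe, whereas a real router has to cope with concurrent senders.

```scala
// Minimal round-robin selector: routee k receives messages k, k + n, k + 2n, ...
// Not thread-safe; it only illustrates the selection order.
final class RoundRobin[A](routees: IndexedSeq[A]) {
  require(routees.nonEmpty, "need at least one routee")
  private var next = 0

  def select(): A = {
    val chosen = routees(next)
    next = (next + 1) % routees.size
    chosen
  }
}

object RoundRobinDemo {
  // Index of the routee that each of `messages` consecutive messages is sent to.
  def assign(routees: Int, messages: Int): Seq[Int] = {
    val rr = new RoundRobin((0 until routees).toIndexedSeq)
    (0 until messages).map(_ => rr.select())
  }
}
```

`RoundRobinDemo.assign(3, 7)` yields 0, 1, 2, 0, 1, 2, 0. The order ignores how busy each routee is, so a routee stuck on a slow message still receives its full share.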
BalancingPool:
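BalancingPool is a somewhat special strategy. It can only be used with local actors. All routees share a single mailbox, and whichever actor is idle takes the next task from it. This keeps every actor busy, which is why it is often the preferred choice for local actor pools.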
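The shared-mailbox behaviour can be sketched with plain threads and one concurrent queue. This is a simplified model, not how BalancingPool is implemented internally. Each worker polls the same queue whenever it is free, so one slow task ties up only one worker while the others drain the rest. Task durations are plain sleeps chosen for illustration.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the shared-mailbox idea: all workers poll ONE queue and take the
// next task as soon as they are idle.
object SharedMailbox {
  /** Runs tasks (sleep durations in ms) on `workers` threads; returns tasks handled per worker. */
  def process(tasks: Seq[Long], workers: Int): Seq[Int] = {
    val queue = new ConcurrentLinkedQueue[java.lang.Long]()
    tasks.foreach(t => queue.add(java.lang.Long.valueOf(t)))
    val handled = Array.fill(workers)(new AtomicInteger(0))
    val finished = new CountDownLatch(workers)
    for (w <- 0 until workers) {
      new Thread(new Runnable {
        def run(): Unit = {
          var task = queue.poll()
          while (task != null) {
            Thread.sleep(task.longValue) // simulate the work
            handled(w).incrementAndGet()
            task = queue.poll()          // idle again: take the next task
          }
          finished.countDown()
        }
      }).start()
    }
    finished.await() // wait until the shared queue is drained
    handled.map(_.get).toSeq
  }
}
```

With tasks `Seq(300L, 10L, 10L, 10L, 10L, 10L)` on two workers, whichever worker picks up the 300 ms task handles only that one, while the other worker processes the five short tasks in the meantime.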
Pipe is a way of passing messages. It takes the result of a Future and sends it to the given actor reference,
for example: pipe(future) to sender()
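Here is a standard-library sketch of what `pipe(future) to recipient` does. `PipeSketch`, its `pipeTo`, and `PipedFailure` are hypothetical names, and the recipient is modelled as a plain function standing in for an ActorRef's `!`. Two facts about Akka's real `pipe` are worth keeping in mind. First, on failure it sends `akka.actor.Status.Failure(e)`, a role `PipedFailure` imitates here. Second, `pipe(future) to sender()` is safe because `sender()` is evaluated immediately inside `receive`, whereas calling `sender()` inside a Future callback is not.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object PipeSketch {
  // Plays the role that akka.actor.Status.Failure plays for real actors.
  final case class PipedFailure(cause: Throwable)

  // Forwards the Future's outcome to the recipient once it completes.
  def pipeTo[T](future: Future[T], recipient: Any => Unit)(implicit ec: ExecutionContext): Unit =
    future.onComplete {
      case Success(value) => recipient(value)           // success: forward the value
      case Failure(e)     => recipient(PipedFailure(e)) // failure: forward a wrapped error
    }

  // Usage: a blocking queue plays the recipient's mailbox.
  def demo(): (Any, Any) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val mailbox = new LinkedBlockingQueue[Any]()
    pipeTo(Future(21 * 2), (m: Any) => mailbox.put(m))
    val first = mailbox.poll(5, TimeUnit.SECONDS)
    pipeTo(Future[Int](throw new IllegalStateException("db down")), (m: Any) => mailbox.put(m))
    val second = mailbox.poll(5, TimeUnit.SECONDS)
    (first, second)
  }
}
```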
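Let's start our optimization journey with a badly written akka-actor example.
Suppose that in a single flow we want to do the following work:
- Query the database once to fetch data, and display the result (a blocking IO task)
- Run a computationally heavy algorithm twice, and display the results (a CPU-intensive task)
- Run 40 in-memory queries, and display the results (a non-blocking IO task)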
We will test 50 such flows, so the blocking IO task runs 50 times, the CPU-intensive task 100 times, and the non-blocking IO task 2000 times.
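These counts can be checked with a little arithmetic. The sketch also estimates how much work the benchmark represents if nothing overlaps. It uses the durations stated elsewhere in this article: findByKey blocks for about 10 s, compute(100) keeps the CPU busy for about 5 s, and each in-memory query in NonBlockingJobActor sleeps for 20 ms. The `Workload` object is purely illustrative.

```scala
// Worked arithmetic for the benchmark workload, using the durations stated in the article.
object Workload {
  val flows = 50
  val blockingPerFlow = 1
  val cpuPerFlow = 2
  val inMemoryPerFlow = 40

  val blockingMillis = 10000L // findByKey
  val cpuMillis = 5000L       // compute(100)
  val inMemoryMillis = 20L    // Thread.sleep(20) per in-memory query

  // (blocking IO tasks, CPU tasks, in-memory queries) across all flows
  def counts: (Int, Int, Int) =
    (flows * blockingPerFlow, flows * cpuPerFlow, flows * inMemoryPerFlow)

  // Total work in ms if every task ran back to back on a single thread
  def serialMillis: Long = {
    val (b, c, m) = counts
    b * blockingMillis + c * cpuMillis + m * inMemoryMillis
  }
}
```

Run serially, the workload is about 1,040,000 ms, roughly 17 minutes of work (500 s blocking + 500 s CPU + 40 s in-memory). That figure is a useful backdrop for the execution times measured below.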
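Based on these requirements, we wrote the following classes:
- BlockingJobActor: the worker actor. On receiving a NewJob message, it performs the three kinds of work described above
- NonBlockingJobActor: the actor that performs in-memory queries. On receiving a NonBlockingJobReq message, it displays the string passed in and replies with it upper-cased
- BlockingDao: simulates a database query. Calling findByKey blocks for 10 seconds to simulate waiting for the query result
- BlockingCPUWorker: simulates running the algorithm. Calling compute keeps the CPU at full load for 5 seconds
- Launcher: the program entry point, where we switch between the different optimizations
- TimerActor: an actor that measures the program's execution time
- conf/blocking.conf: the Akka configuration file
See the blocking package in the code for details.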
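If you want to experiment without the repository, the two simulated dependencies can be approximated as below. `BlockingDaoSketch` and `BlockingCPUWorkerSketch` are hypothetical stand-ins, not the repository's code. The durations are parameters so that the sketch runs quickly, and `compute` here takes no argument, because the meaning of the original `compute(100)` parameter is not described in this article. The key difference is that one blocks the thread while idle, and the other keeps the thread, and a core, busy.

```scala
// Hypothetical stand-in for BlockingDao: blocks the calling thread, like a synchronous JDBC call.
final class BlockingDaoSketch(latencyMillis: Long) {
  def findByKey(key: String): String = {
    Thread.sleep(latencyMillis) // the thread waits but does no work
    s"db result is $key"
  }
}

// Hypothetical stand-in for BlockingCPUWorker: spins the CPU for roughly busyMillis.
final class BlockingCPUWorkerSketch(busyMillis: Long) {
  // Returns how many loop iterations ran before the deadline.
  def compute(): Long = {
    val deadline = System.nanoTime() + busyMillis * 1000000L
    var iterations = 0L
    while (System.nanoTime() < deadline) iterations += 1
    iterations
  }
}
```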
- BlockingJobActor
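Following ordinary logic, it can be written in the simplest synchronous, sequential way:
first call dao.findByKey(info) to query the database, and display the result;
then call cpuWorker.compute(100) a few times to run the heavy algorithm, and display the results;
finally request some in-memory query results, and display them.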
override def receive: Receive = {
  case NewJob(info) =>
    // some blocking IO operation
    val res = dao.findByKey(info)
    // some non-blocking IO operation depending on the blocking IO result
    nonBlockingActor ! NonBlockingJobReq(res)
    timerActor ! Finish
    // some high cpu operation
    (0 until cpuTaskCount).foreach { _ =>
      val r = cpuWorker.compute(100)
      // some non-blocking IO operation depending on the cpu work result
      nonBlockingActor ! NonBlockingJobReq(r.toString)
    }
    // some non-blocking IO operation independent of the blocking IO result
    (0 until nonBlockingTaskCount).foreach { _ =>
      nonBlockingActor ! NonBlockingJobReq("independent of any result")
    }
  case NonBlockingJobResp(info) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, NonBlockingJobResp($info)")
    timerActor ! Finish
}
- NonBlockingJobActor
The logic of the non-blocking in-memory query task.
override def receive: Receive = {
  case NonBlockingJobReq(info) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, NonBlockingJobReq($info)")
    Thread.sleep(20) // simulates a 20 ms in-memory query
    sender() ! NonBlockingJobResp(info.toUpperCase)
}
- blocking.conf
This configures the default dispatcher with 10 threads (parallelism-min and parallelism-max are both 10).
akka.actor {
  default-dispatcher {
    # Must be one of the following
    # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
    # MessageDispatcherConfigurator with a public constructor with
    # both com.typesafe.config.Config parameter and
    # akka.dispatch.DispatcherPrerequisites parameters.
    # PinnedDispatcher must be used together with executor=thread-pool-executor.
    type = "Dispatcher"
    executor = "fork-join-executor"
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 10
      # The parallelism factor is used to determine thread pool size using the
      # following formula: ceil(available processors * factor). Resulting size
      # is then bounded by the parallelism-min and parallelism-max values.
      parallelism-factor = 5.0
      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 10
    }
    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 20
  }
}
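The thread count of a fork-join-executor follows the formula in the config comments: ceil(available processors × parallelism-factor), bounded by parallelism-min and parallelism-max. The small function below just evaluates that formula. It is a sketch of the documented rule, not Akka's source. It shows why this configuration yields 10 threads on any machine, and what the values used later in optimizationV4.conf would give on different core counts.

```scala
// Evaluates the sizing rule quoted in the config comments:
// ceil(processors * factor), then clamped to [min, max].
object ForkJoinSizing {
  def parallelism(processors: Int, factor: Double, min: Int, max: Int): Int = {
    val raw = math.ceil(processors * factor).toInt
    math.min(math.max(raw, min), max)
  }

  // Same rule applied to the machine this runs on.
  def onThisMachine(factor: Double, min: Int, max: Int): Int =
    parallelism(Runtime.getRuntime.availableProcessors, factor, min, max)
}
```

With min = max = 10, the factor no longer matters: 4 cores or 64 cores both give 10 threads.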
Execution time: did not finish
Logs:
17:23:14: d-akka.actor.default-dispatcher-4, start findByKey(blocking-job)
17:23:24: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(db result is blocking-job)
17:23:24: d-akka.actor.default-dispatcher-4, start compute(100)
17:23:29: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(66263807)
17:23:29: d-akka.actor.default-dispatcher-4, start compute(100)
17:23:34: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(69022445)
17:23:34: d-akka.actor.default-dispatcher-4, start findByKey(blocking-job)
17:23:34: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
...
17:23:35: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
17:23:44: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(db result is blocking-job)
17:23:44: d-akka.actor.default-dispatcher-4, start compute(100)
17:23:49: d-akka.actor.default-dispatcher-4, start compute(100)
17:23:49: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(67621455)
17:23:54: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(67664446)
17:23:54: d-akka.actor.default-dispatcher-4, start findByKey(blocking-job)
17:23:54: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
...
17:23:55: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
17:24:04: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is blocking-job)
...
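[Figure: thread usage]
Legend: yellow means waiting (shown both when a thread is genuinely waiting and while Thread.sleep simulates the blocking database query), green means running, red means blocked.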
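The logs and the thread usage show that, apart from d-scheduler-1 (Akka's internal scheduler thread used for sending messages),
only three threads do any work: d-akka.actor.default-dispatcher-2, d-akka.actor.default-dispatcher-3 and d-akka.actor.default-dispatcher-4.
d-akka.actor.default-dispatcher-2 and -3 handle the in-memory queries. These non-blocking IO tasks mostly wait for the synchronous work to hand them something to do, so these threads are usually in the waiting state.
d-akka.actor.default-dispatcher-4 handles both the database queries and the algorithm. The database query is a blocking IO task, so the thread waits while it runs; the algorithm is CPU-intensive, so the thread is running while it executes.
BlockingJobActor is written in a completely synchronous style, and an actor processes its messages one at a time. As a result, all the time-consuming work runs sequentially on a single thread, and the remaining 7 threads (10 configured minus the 3 in use) are wasted. Latency is therefore high and throughput low.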
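The natural fix for this poor synchronous design is asynchrony: run both the database query and the algorithm inside Futures.
We refactor BlockingJobActor into OptimizationV1Actor. For now, the Futures simply use default-dispatcher as their execution context, and the Akka configuration is unchanged.
See the optimizationV1 package in the code for details.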
- OptimizationV1Actor
implicit val executionContext = context.system.dispatcher

override def receive: Receive = {
  case NewJob(info) =>
    // some blocking IO operation
    Future(dao.findByKey(info)).onComplete {
      case Success(res) =>
        // some non-blocking IO operation depend on blocking IO result
        nonBlockingActor ! NonBlockingJobReq(res)
      case Failure(e) =>
        e.printStackTrace()
        println(e.toString)
    }
    // some high cpu work
    (0 until cpuTaskCount).foreach { _ =>
      Future(cpuWorker.compute(100)).onComplete {
        case Success(r) =>
          println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, ComputeResult($r)")
          // some non-blocking IO operation depend on cpu work result
          nonBlockingActor ! NonBlockingJobReq(r.toString)
        case Failure(e) =>
          e.printStackTrace()
          println(e.toString)
      }
    }
    // some non-blocking IO operation independent of blocking IO result
    (0 until nonBlockingTaskCount).foreach { _ =>
      nonBlockingActor ! NonBlockingJobReq("independent of any result")
    }
  case NonBlockingJobResp(info) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, NonBlockingJobResp($info)")
    timerActor ! Finish
}
Execution time: about 136 seconds
Logs:
19:11:54: d-akka.actor.default-dispatcher-9, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-10, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-5, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-7, NonBlockingJobReq(independent of any result)
19:11:54: d-akka.actor.default-dispatcher-6, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-8, start findByKey(optimizationV1-job)
19:11:54: d-akka.actor.default-dispatcher-4, start findByKey(optimizationV1-job)
19:11:54: d-akka.actor.default-dispatcher-3, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-11, start findByKey(optimizationV1-job)
19:11:54: d-akka.actor.default-dispatcher-7, NonBlockingJobReq(independent of any result)
19:11:54: d-akka.actor.default-dispatcher-2, start compute(100)
19:11:54: d-akka.actor.default-dispatcher-7, NonBlockingJobReq(independent of any result)
...
19:11:55: d-akka.actor.default-dispatcher-7, NonBlockingJobReq(independent of any result)
19:11:55: d-akka.actor.default-dispatcher-7, start findByKey(optimizationV1-job)
19:11:59: d-akka.actor.default-dispatcher-9, NonBlockingJobReq(independent of any result)
19:11:59: d-akka.actor.default-dispatcher-6, start compute(100)
19:11:59: d-akka.actor.default-dispatcher-10, ComputeResult(36675535)
19:11:59: d-akka.actor.default-dispatcher-10, start compute(100)
19:11:59: d-akka.actor.default-dispatcher-3, ComputeResult(36280636)
19:11:59: d-akka.actor.default-dispatcher-3, start findByKey(optimizationV1-job)
19:11:59: d-akka.actor.default-dispatcher-5, ComputeResult(37208591)
19:11:59: d-akka.actor.default-dispatcher-5, ComputeResult(36462217)
19:11:59: d-akka.actor.default-dispatcher-5, ComputeResult(36296118)
19:11:59: d-akka.actor.default-dispatcher-5, start compute(100)
19:11:59: d-akka.actor.default-dispatcher-9, NonBlockingJobReq(independent of any result)
19:11:59: d-akka.actor.default-dispatcher-2, start compute(100)
19:11:59: d-akka.actor.default-dispatcher-9, NonBlockingJobReq(independent of any result)
...
19:12:00: d-akka.actor.default-dispatcher-9, NonBlockingJobReq(independent of any result)
19:12:00: d-akka.actor.default-dispatcher-9, start findByKey(optimizationV1-job)
19:12:04: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
19:12:04: d-akka.actor.default-dispatcher-4, start compute(100)
19:12:04: d-akka.actor.default-dispatcher-6, start compute(100)
19:12:04: d-akka.actor.default-dispatcher-11, ComputeResult(54510901)
19:12:04: d-akka.actor.default-dispatcher-11, start findByKey(optimizationV1-job)
19:12:04: d-akka.actor.default-dispatcher-5, ComputeResult(54440819)
19:12:04: d-akka.actor.default-dispatcher-5, start compute(100)
19:12:04: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
19:12:04: d-akka.actor.default-dispatcher-2, start compute(100)
19:12:04: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
...
19:12:04: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
19:12:04: d-akka.actor.default-dispatcher-10, start findByKey(optimizationV1-job)
19:12:04: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
...
19:12:05: d-akka.actor.default-dispatcher-8, NonBlockingJobReq(independent of any result)
19:12:05: d-akka.actor.default-dispatcher-7, start compute(100)
19:12:05: d-akka.actor.default-dispatcher-8, start compute(100)
19:12:09: d-akka.actor.default-dispatcher-6, start compute(100)
19:12:09: d-akka.actor.default-dispatcher-3, start compute(100)
19:12:09: d-akka.actor.default-dispatcher-5, ComputeResult(38017172)
19:12:09: d-akka.actor.default-dispatcher-5, ComputeResult(38675825)
19:12:09: d-akka.actor.default-dispatcher-5, start findByKey(optimizationV1-job)
19:12:09: d-akka.actor.default-dispatcher-4, start findByKey(optimizationV1-job)
19:12:09: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
...
19:12:10: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
19:12:10: d-akka.actor.default-dispatcher-7, start compute(100)
19:12:10: d-akka.actor.default-dispatcher-8, ComputeResult(38036533)
...
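[Figure: thread usage]
The logs and the thread usage show that, apart from d-scheduler-1 (Akka's internal scheduler thread used for sending messages),
all 10 threads of default-dispatcher are now working. Good: no thread is idle any more, and the run completes.
However, every thread keeps switching between running and waiting: one moment it is doing blocking IO, the next a CPU-intensive task, the next an in-memory query, so the threads are kept very busy.
Worse, the non-blocking in-memory queries, which do not depend on any other task, end up blocked.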
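The effect described above, quick work stuck behind blocking work on a shared pool, can be reproduced with the standard library alone. In this sketch, the pool size and sleep durations are arbitrary assumptions. Two `Thread.sleep` tasks occupy a two-thread pool, so a trivial task submitted afterwards can only start once one of those threads is freed.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Sketch: blocking work and quick work share one small pool, so the quick task waits.
object SharedPoolStarvation {
  /** Returns how long (ms) a trivial task waited before it started running. */
  def quickTaskDelayMillis(poolSize: Int, blockingTasks: Int, blockMillis: Long): Long = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      (1 to blockingTasks).foreach(_ => Future(Thread.sleep(blockMillis))) // occupy threads
      val submitted = System.nanoTime()
      val quick = Future((System.nanoTime() - submitted) / 1000000L)      // measures its own start delay
      Await.result(quick, 10.seconds)
    } finally pool.shutdown()
  }
}
```

`quickTaskDelayMillis(2, 2, 300)` reports roughly 300 ms of waiting for work that itself takes almost no time. Give the pool a spare thread, for example `quickTaskDelayMillis(4, 2, 300)`, and the delay drops to near zero.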
Besides Futures, another fix for the poor synchronous design is a Router, which simply increases the number of worker actors. We start with the RoundRobinPool strategy.
- Launcher.optimizationV2
val jobActor = system.actorOf(RoundRobinPool(10).props(Props(classOf[BlockingJobActor], cpuTaskCount, nonBlockingTaskCount)), "optimizationV2-actor")
Execution time: about 120 seconds
Logs:
20:01:24: d-akka.actor.default-dispatcher-4, start findByKey(optimizationV2-job)
20:01:34: d-akka.actor.default-dispatcher-5, start compute(100)
20:01:34: d-akka.actor.default-dispatcher-3, start compute(100)
...
20:01:39: d-akka.actor.default-dispatcher-8, start compute(100)
20:01:39: d-akka.actor.default-dispatcher-2, start compute(100)
20:01:44: d-akka.actor.default-dispatcher-5, start findByKey(optimizationV2-job)
20:01:44: d-akka.actor.default-dispatcher-11, start findByKey(optimizationV2-job)
20:01:44: d-akka.actor.default-dispatcher-6, start findByKey(optimizationV2-job)
20:01:44: d-akka.actor.default-dispatcher-10, start findByKey(optimizationV2-job)
20:01:44: d-akka.actor.default-dispatcher-9, start findByKey(optimizationV2-job)
...
20:01:44: d-akka.actor.default-dispatcher-7, start findByKey(optimizationV2-job)
20:01:44: d-akka.actor.default-dispatcher-2, start findByKey(optimizationV2-job)
20:01:54: d-akka.actor.default-dispatcher-11, start compute(100)
20:01:54: d-akka.actor.default-dispatcher-5, start compute(100)
20:01:54: d-akka.actor.default-dispatcher-6, start compute(100)
20:01:54: d-akka.actor.default-dispatcher-10, start compute(100)
...
20:01:59: d-akka.actor.default-dispatcher-5, start compute(100)
...
20:03:08: d-akka.actor.default-dispatcher-11, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
20:03:08: d-akka.actor.default-dispatcher-11, NonBlockingJobResp(DB RESULT IS
...
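[Figure: thread usage]
All 10 actors in the Router are working, all 10 threads of default-dispatcher are in use, and the run also completes. But severe thread waiting remains,
and again the non-blocking in-memory queries, which do not depend on any other task, end up blocked.
We can also increase the number of worker actors with the Router's BalancingPool strategy.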
- Launcher.optimizationV3
val jobActor = system.actorOf(BalancingPool(10).props(Props(classOf[BlockingJobActor], cpuTaskCount, nonBlockingTaskCount)), "optimizationV3-actor")
Execution time: about 105 seconds
Logs:
20:56:44: d-BalancingPool-/optimizationV3-actor-19, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-5, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-7, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-6, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-18, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-8, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-9, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-17, start findByKey(optimizationV3-job)
20:56:44: d-BalancingPool-/optimizationV3-actor-10, start findByKey(optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-16, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-15, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-12, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-20, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-22, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-akka.actor.default-dispatcher-21, NonBlockingJobReq(db result is optimizationV3-job)
20:56:54: d-BalancingPool-/optimizationV3-actor-17, start compute(100)
...
20:56:54: d-BalancingPool-/optimizationV3-actor-6, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-21, NonBlockingJobReq(21337335)
20:56:59: d-akka.actor.default-dispatcher-16, NonBlockingJobReq(21912069)
20:56:59: d-BalancingPool-/optimizationV3-actor-17, start compute(100)
20:56:59: d-BalancingPool-/optimizationV3-actor-18, start compute(100)
20:56:59: d-BalancingPool-/optimizationV3-actor-9, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-22, NonBlockingJobReq(21851866)
20:56:59: d-BalancingPool-/optimizationV3-actor-19, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-20, NonBlockingJobReq(21988980)
20:56:59: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(23503990)
20:56:59: d-BalancingPool-/optimizationV3-actor-7, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-12, NonBlockingJobReq(21511595)
20:56:59: d-BalancingPool-/optimizationV3-actor-10, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(21921178)
20:56:59: d-BalancingPool-/optimizationV3-actor-8, start compute(100)
20:56:59: d-BalancingPool-/optimizationV3-actor-5, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(22026859)
20:56:59: d-akka.actor.default-dispatcher-21, NonBlockingJobReq(21721455)
20:56:59: d-BalancingPool-/optimizationV3-actor-11, start compute(100)
20:56:59: d-BalancingPool-/optimizationV3-actor-6, start compute(100)
20:56:59: d-akka.actor.default-dispatcher-12, NonBlockingJobReq(21849218)
20:57:04: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(23146505)
20:57:04: d-akka.actor.default-dispatcher-12, NonBlockingJobReq(22942295)
20:57:04: d-akka.actor.default-dispatcher-21, NonBlockingJobReq(23012881)
20:57:04: d-BalancingPool-/optimizationV3-actor-17, start findByKey(optimizationV3-job)
20:57:04: d-BalancingPool-/optimizationV3-actor-7, start findByKey(optimizationV3-job)
20:57:04: d-BalancingPool-/optimizationV3-actor-18, start findByKey(optimizationV3-job)
20:57:04: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(23278757)
20:57:04: d-BalancingPool-/optimizationV3-actor-10, start findByKey(optimizationV3-job)
20:57:04: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(23351830)
...
20:57:04: d-BalancingPool-/optimizationV3-actor-5, start findByKey(optimizationV3-job)
20:57:04: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(independent of any result)
20:57:04: d-akka.actor.default-dispatcher-21, NonBlockingJobReq(independent of any result)
20:57:04: d-akka.actor.default-dispatcher-23, NonBlockingJobReq(23426269)
20:57:04: d-BalancingPool-/optimizationV3-actor-11, start findByKey(optimizationV3-job)
20:57:04: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(independent of any result)
20:57:04: d-akka.actor.default-dispatcher-24, NonBlockingJobReq(23207851)
20:57:04: d-BalancingPool-/optimizationV3-actor-6, start findByKey(optimizationV3-job)
20:57:04: d-akka.actor.default-dispatcher-2, NonBlockingJobReq(independent of any result)
...
20:57:04: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(independent of any result)
...
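[Figure: thread usage]
The logs show that because BalancingPool shares one mailbox, tasks are picked up more eagerly and every actor in the pool stays busy. The pool's routees also run on their own threads (named d-BalancingPool-/optimizationV3-actor-N), separate from default-dispatcher. But severe thread waiting remains,
and again the non-blocking in-memory queries, which do not depend on any other task, end up blocked.
None of the optimizations so far is satisfactory. Next we adopt resource isolation:
move the blocking IO tasks to a dedicated dispatcher, and move the compute-heavy, long-running tasks to another dedicated dispatcher.
See the conf directory and the optimizationV4 package in the code for details.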
- optimizationV4.conf
akka.actor {
  default-dispatcher {
    # Must be one of the following
    # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
    # MessageDispatcherConfigurator with a public constructor with
    # both com.typesafe.config.Config parameter and
    # akka.dispatch.DispatcherPrerequisites parameters.
    # PinnedDispatcher must be used together with executor=thread-pool-executor.
    type = "Dispatcher"
    executor = "fork-join-executor"
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 2
      # The parallelism factor is used to determine thread pool size using the
      # following formula: ceil(available processors * factor). Resulting size
      # is then bounded by the parallelism-min and parallelism-max values.
      parallelism-factor = 5.0
      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 10
    }
    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 20
  }
  blocking-io-dispatcher {
    # Must be one of the following
    # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
    # MessageDispatcherConfigurator with a public constructor with
    # both com.typesafe.config.Config parameter and
    # akka.dispatch.DispatcherPrerequisites parameters.
    # PinnedDispatcher must be used together with executor=thread-pool-executor.
    type = "Dispatcher"
    executor = "fork-join-executor"
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 4
      # The parallelism factor is used to determine thread pool size using the
      # following formula: ceil(available processors * factor). Resulting size
      # is then bounded by the parallelism-min and parallelism-max values.
      parallelism-factor = 10.0
      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 15
    }
    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 30
  }
  cpu-work-dispatcher {
    # Must be one of the following
    # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
    # MessageDispatcherConfigurator with a public constructor with
    # both com.typesafe.config.Config parameter and
    # akka.dispatch.DispatcherPrerequisites parameters.
    # PinnedDispatcher must be used together with executor=thread-pool-executor.
    type = "Dispatcher"
    executor = "fork-join-executor"
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 4
      # The parallelism factor is used to determine thread pool size using the
      # following formula: ceil(available processors * factor). Resulting size
      # is then bounded by the parallelism-min and parallelism-max values.
      parallelism-factor = 10.0
      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 15
    }
    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 30
  }
}
- OptimizationV4Actor
private val blockingExecutionContext = context.system.dispatchers.lookup("akka.actor.blocking-io-dispatcher")
private val cpuExecutionContext = context.system.dispatchers.lookup("akka.actor.cpu-work-dispatcher")

override def receive: Receive = {
  case NewJob(info) =>
    // some blocking IO operation, run on blocking-io-dispatcher
    Future(dao.findByKey(info))(blockingExecutionContext).onComplete {
      case Success(res) =>
        // some non-blocking IO operation depending on the blocking IO result
        nonBlockingActor ! NonBlockingJobReq(res)
      case Failure(e) =>
        e.printStackTrace()
        println(e.toString)
    }(blockingExecutionContext)
    // some high cpu work, run on cpu-work-dispatcher
    (0 until cpuTaskCount).foreach { _ =>
      Future(cpuWorker.compute(100))(cpuExecutionContext).onComplete {
        case Success(r) =>
          println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, ComputeResult($r)")
          // some non-blocking IO operation depending on the cpu work result
          nonBlockingActor ! NonBlockingJobReq(r.toString)
        case Failure(e) =>
          e.printStackTrace()
          println(e.toString)
      }(cpuExecutionContext)
    }
    // some non-blocking IO operation independent of the blocking IO result
    (0 until nonBlockingTaskCount).foreach { _ =>
      nonBlockingActor ! NonBlockingJobReq("independent of any result")
    }
  case NonBlockingJobResp(info) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, NonBlockingJobResp($info)")
    timerActor ! Finish
}
- Launcher.optimizationV4
val jobActor = system.actorOf(Props(classOf[OptimizationV4Actor], cpuTaskCount, nonBlockingTaskCount), "optimizationV4-actor")
Execution time: about 50 seconds
Logs:
00:02:27: d-akka.actor.blocking-io-dispatcher-20, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-17, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-16, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-28, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-23, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-25, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-10, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-29, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-9, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-21, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-26, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-11, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-19, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-27, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-14, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-5, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-12, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-24, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-13, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-7, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-31, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-30, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-15, start compute(100)
00:02:27: d-akka.actor.cpu-work-dispatcher-6, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-34, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-18, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.blocking-io-dispatcher-33, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.cpu-work-dispatcher-22, start compute(100)
00:02:27: d-akka.actor.blocking-io-dispatcher-8, start findByKey(optimizationV4-job)
00:02:27: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(independent of any result)
00:02:27: d-akka.actor.blocking-io-dispatcher-32, start findByKey(optimizationV4-job)
00:02:28: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(independent of any result)
...
00:02:33: d-akka.actor.default-dispatcher-2, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
00:02:33: d-akka.actor.default-dispatcher-35, NonBlockingJobReq(independent of any result)
00:02:33: d-akka.actor.cpu-work-dispatcher-17, start compute(100)
00:02:33: d-akka.actor.cpu-work-dispatcher-25, start compute(100)
00:02:33: d-akka.actor.cpu-work-dispatcher-16, start compute(100)
00:02:33: d-akka.actor.cpu-work-dispatcher-15, ComputeResult(12953293)
00:02:33: d-akka.actor.cpu-work-dispatcher-9, ComputeResult(12683075)
00:02:33: d-akka.actor.cpu-work-dispatcher-22, ComputeResult(12150066)
00:02:33: d-akka.actor.cpu-work-dispatcher-21, ComputeResult(12132914)
...
00:03:17: d-akka.actor.default-dispatcher-4, NonBlockingJobResp(12498333)
00:03:17: d-akka.actor.default-dispatcher-35, NonBlockingJobReq(db result is optimizationV4-job)
00:03:17: d-akka.actor.default-dispatcher-4, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV4-JOB)
00:03:17: d-akka.actor.default-dispatcher-35, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV4-JOB)
...
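[Figure: thread usage]
We bring Futures back and move the database queries to a dispatcher named blocking-io-dispatcher.
We move the algorithm runs to a dispatcher named cpu-work-dispatcher, and leave default-dispatcher for the non-blocking tasks.
Resource isolation reduces contention between different kinds of tasks. On one hand, the kinds of tasks no longer interfere with each other, which speeds things up. On the other hand, even when things go badly (for example, when the blocking pool is saturated), the application still has resources to run other tasks, so the rest of the application can keep responding quickly.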
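The isolation benefit can be demonstrated with the standard library alone. In this sketch, two fixed-size pools are hypothetical stand-ins for blocking-io-dispatcher and default-dispatcher, and the sizes and sleep durations are arbitrary. Saturating the blocking pool does not delay a quick task submitted to the other pool.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Sketch of resource isolation: blocking work gets its own pool, so quick work elsewhere is unaffected.
object IsolatedPools {
  /** Saturates the "blocking" pool, then returns how long (ms) a quick task on the other pool waited. */
  def quickTaskDelayMillis(blockingTasks: Int, blockMillis: Long): Long = {
    val blockingPool = Executors.newFixedThreadPool(2) // stands in for blocking-io-dispatcher
    val defaultPool = Executors.newFixedThreadPool(2)  // stands in for default-dispatcher
    val blockingEc = ExecutionContext.fromExecutor(blockingPool)
    val defaultEc = ExecutionContext.fromExecutor(defaultPool)
    try {
      (1 to blockingTasks).foreach(_ => Future(Thread.sleep(blockMillis))(blockingEc))
      val submitted = System.nanoTime()
      val quick = Future((System.nanoTime() - submitted) / 1000000L)(defaultEc)
      Await.result(quick, 10.seconds)
    } finally {
      blockingPool.shutdown()
      defaultPool.shutdown()
    }
  }
}
```

Even with four 300 ms sleeps queued on a two-thread blocking pool, the quick task starts almost immediately, because it never competes for those threads.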
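Next we keep the resource-isolation approach but fully adopt the actor model, where everything is an actor: the database query becomes DaoActor and the algorithm becomes CPUWorkerActor,
and the Future results are passed back with pipe.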
- OptimizationV5Actor
override def receive: Receive = {
  case NewJob(info) =>
    // some blocking IO operation
    daoActor ! FindByKey(info)
    // some non-blocking IO operation independent of blocking IO result
    (0 until nonBlockingTaskCount).foreach { _ =>
      nonBlockingActor ! NonBlockingJobReq("independent of any result")
    }
    // some high cpu work
    (0 until cpuTaskCount).foreach { _ =>
      cpuWorkActor ! Compute(100)
    }
  case FindByKeyResult(res) =>
    // some non-blocking IO operation depend on blocking IO result
    nonBlockingActor ! NonBlockingJobReq(res)
  case ComputeResult(res) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, ComputeResult($res)")
    // some non-blocking IO operation depend on cpu work result
    nonBlockingActor ! NonBlockingJobReq(res.toString)
  case NonBlockingJobResp(info) =>
    println(s"${DateTime.now().toString("HH:mm:ss")}: ${Thread.currentThread().getName}, NonBlockingJobResp($info)")
    timerActor ! Finish
}
- DaoActor
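import akka.pattern.pipe
import scala.concurrent.Future

// Futures created in this actor (and the pipe callback) run on blocking-io-dispatcher
private implicit val blockingExecutionContext = context.system.dispatchers.lookup("akka.actor.blocking-io-dispatcher")

override def receive: Receive = {
  case FindByKey(key) =>
    val future = Future(FindByKeyResult(findByKey(key)))
    // sender() is evaluated here, synchronously, before the Future completes
    pipe(future) to sender()
}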
- CPUWorkerActor
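import akka.pattern.pipe
import scala.concurrent.Future

// Futures created in this actor (and the pipe callback) run on cpu-work-dispatcher
private implicit val cpuExecutionContext = context.system.dispatchers.lookup("akka.actor.cpu-work-dispatcher")

override def receive: Receive = {
  case Compute(n) =>
    val future = Future(ComputeResult(compute(n)))
    // sender() is evaluated here, synchronously, before the Future completes
    pipe(future) to sender()
}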
- Launcher.optimizationV5
val jobActor = system.actorOf(Props(classOf[OptimizationV5Actor], cpuTaskCount, nonBlockingTaskCount), "optimizationV5-actor")
Execution time: about 50 seconds
Logs:
14:42:09: d-akka.actor.cpu-work-dispatcher-23, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-25, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-22, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-16, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-26, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-21, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-18, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-13, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-30, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-15, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-19, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-14, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-20, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-9, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-11, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-12, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-28, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-33, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-32, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-37, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-34, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-38, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-36, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-10, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-35, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.blocking-io-dispatcher-7, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-24, start compute(100)
14:42:09: d-akka.actor.blocking-io-dispatcher-31, start findByKey(optimizationV5-job)
14:42:09: d-akka.actor.cpu-work-dispatcher-27, start compute(100)
14:42:09: d-akka.actor.cpu-work-dispatcher-29, start compute(100)
14:42:09: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(independent of any result)
14:42:09: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(independent of any result)
14:42:09: d-akka.actor.default-dispatcher-8, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
...
14:43:00: d-akka.actor.default-dispatcher-17, NonBlockingJobResp(20660410)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(22093932)
14:43:00: d-akka.actor.default-dispatcher-2, NonBlockingJobResp(20728029)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV5-job)
14:43:00: d-akka.actor.default-dispatcher-17, NonBlockingJobResp(22093932)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV5-job)
14:43:00: d-akka.actor.default-dispatcher-2, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV5-JOB)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV5-job)
14:43:00: d-akka.actor.default-dispatcher-17, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV5-JOB)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV5-job)
14:43:00: d-akka.actor.default-dispatcher-17, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV5-JOB)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(db result is optimizationV5-job)
14:43:00: d-akka.actor.default-dispatcher-2, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV5-JOB)
14:43:00: d-akka.actor.default-dispatcher-3, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV5-JOB)
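Thread usage:
This approach performs on par with optimization 4 because it also relies on resource isolation plus Futures. However, it fits the actor way of thinking better, and it is the approach I recommend most.
Can we go faster still? Yes: on top of resource isolation and Futures, we can add a Router to improve performance further.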
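The resource isolation + Future idea behind optimizations 4 and 5 can be sketched without Akka, using plain scala.concurrent. This is a minimal sketch under stated assumptions. Two java.util.concurrent fixed thread pools stand in for the article's blocking-io and cpu-work dispatchers; in the real code those come from system.dispatchers.lookup. The pool sizes are illustrative, and findByKey/compute are hypothetical, scaled-down stand-ins for BlockingDao and BlockingCPUWorker. Each kind of work runs on its own pool and the caller only receives Futures, so slow database calls cannot occupy the threads that CPU work needs.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ResourceIsolationSketch {
  // Illustrative pool sizes (assumptions, not the values in conf/blocking.conf):
  // a larger pool for threads that mostly wait on IO, one thread per core for CPU work.
  val blockingIoPool: ExecutorService = Executors.newFixedThreadPool(16)
  val cpuPool: ExecutorService =
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())

  val blockingIoEc: ExecutionContext = ExecutionContext.fromExecutorService(blockingIoPool)
  val cpuEc: ExecutionContext = ExecutionContext.fromExecutorService(cpuPool)

  // Hypothetical, scaled-down stand-ins for BlockingDao.findByKey and BlockingCPUWorker.compute.
  def findByKey(key: String): String = { Thread.sleep(100); s"db result is $key" }
  def compute(n: Int): Int = (1 to n * 1000).foldLeft(0)(_ ^ _)

  // One job: the DB query runs on the blocking-IO pool, the computations on the CPU pool.
  // The calling thread is never blocked; it gets a Future back immediately.
  def runJob(key: String, cpuTasks: Int): Future[(String, Seq[Int])] = {
    val dbF = Future(findByKey(key))(blockingIoEc)
    val cpuF = {
      implicit val ec: ExecutionContext = cpuEc
      Future.sequence((0 until cpuTasks).map(_ => Future(compute(100))))
    }
    dbF.zip(cpuF)
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global // only used to combine the per-job Futures
    val all = Future.sequence((1 to 5).map(i => runJob(s"job-$i", cpuTasks = 2)))
    // Await only at the edge of the program, to keep this demo alive until the work is done.
    val results = Await.result(all, 10.seconds)
    results.foreach { case (db, cpu) => println(s"$db, ${cpu.size} computations") }
    blockingIoPool.shutdown()
    cpuPool.shutdown()
  }
}
```

Inside an actor, the last step would typically be pipe(future) to sender() rather than Await. Await appears here only in main, so the demo process stays alive until the work completes.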
- Launcher.optimizationV6
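Here it makes no difference whether jobActor is the actor from optimization 4 or from optimization 5. Both are essentially resource isolation + Futures.
In this test, BalancingPool and RoundRobinPool give the same result when used as the Router. In theory BalancingPool should do somewhat better, because its routees share one mailbox and whichever routee is idle picks up the next message. The outcome also depends on how the dispatchers are configured.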
val jobActor = system.actorOf(BalancingPool(10).props(Props(classOf[OptimizationV4Actor/*OptimizationV5Actor*/], cpuTaskCount, nonBlockingTaskCount)), "optimizationV6-actor")
//val jobActor = system.actorOf(RoundRobinPool(10).props(Props(classOf[OptimizationV4Actor/*OptimizationV5Actor*/], cpuTaskCount, nonBlockingTaskCount)), "optimizationV6-actor")
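A toy scheduling model can show why BalancingPool should do better in theory and why it made no difference here. This is a simplified sketch, not how Akka dispatches messages internally. It assumes all messages are already queued at time 0, ignores dispatch overhead, and uses made-up task durations. Round-robin fixes each message's routee in advance. A shared mailbox, which is the BalancingPool behaviour described earlier, hands the next message to whichever routee becomes free first.

```scala
object PoolSimulation {
  // Round-robin: message i always goes to routee i % workers, regardless of load.
  def roundRobinMakespan(durations: Seq[Int], workers: Int): Int =
    if (durations.isEmpty) 0
    else
      durations.zipWithIndex
        .groupBy { case (_, i) => i % workers }
        .values
        .map(_.map(_._1).sum)
        .max

  // Shared mailbox: the routee that becomes free earliest takes the next message.
  def sharedQueueMakespan(durations: Seq[Int], workers: Int): Int = {
    val freeAt = Array.fill(workers)(0) // time at which each routee finishes its current work
    durations.foreach { d =>
      val w = freeAt.indices.minBy(i => freeAt(i))
      freeAt(w) += d
    }
    freeAt.max
  }

  def main(args: Array[String]): Unit = {
    val uneven = Seq(10, 1, 1, 1, 10, 1)
    println(s"uneven: round-robin=${roundRobinMakespan(uneven, 2)}, shared=${sharedQueueMakespan(uneven, 2)}")
    val even = Seq.fill(8)(5)
    println(s"even: round-robin=${roundRobinMakespan(even, 2)}, shared=${sharedQueueMakespan(even, 2)}")
  }
}
```

With the uneven durations, one round-robin routee gets both long tasks, so the makespan is 21 versus 13 with a shared mailbox. With identical durations the two strategies tie at 20. That tie is consistent with this benchmark, in which all 50 jobs are the same.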
Execution time: about 40 seconds
Log:
11:27:15: d-akka.actor.blocking-io-dispatcher-26, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-25, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-24, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.cpu-work-dispatcher-34, start compute(100)
11:27:15: d-akka.actor.blocking-io-dispatcher-29, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-21, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.cpu-work-dispatcher-47, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-42, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-43, start compute(100)
11:27:15: d-akka.actor.blocking-io-dispatcher-28, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-50, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-27, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.cpu-work-dispatcher-46, start compute(100)
11:27:15: d-akka.actor.default-dispatcher-14, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.default-dispatcher-17, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.default-dispatcher-16, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.cpu-work-dispatcher-40, start compute(100)
11:27:15: d-akka.actor.blocking-io-dispatcher-23, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-31, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.default-dispatcher-15, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.cpu-work-dispatcher-37, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-48, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-45, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-38, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-41, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-36, start compute(100)
11:27:15: d-akka.actor.blocking-io-dispatcher-33, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.blocking-io-dispatcher-32, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.cpu-work-dispatcher-44, start compute(100)
11:27:15: d-akka.actor.cpu-work-dispatcher-39, start compute(100)
11:27:15: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.blocking-io-dispatcher-49, start findByKey(optimizationV6-job)
11:27:15: d-akka.actor.default-dispatcher-20, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.default-dispatcher-3, NonBlockingJobReq(independent of any result)
11:27:15: d-BalancingPool-/optimizationV6-actor-6, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
11:27:15: d-akka.actor.default-dispatcher-15, NonBlockingJobReq(independent of any result)
11:27:15: d-akka.actor.default-dispatcher-16, NonBlockingJobReq(independent of any result)
11:27:15: d-BalancingPool-/optimizationV6-actor-19, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
11:27:15: d-BalancingPool-/optimizationV6-actor-6, NonBlockingJobResp(INDEPENDENT OF ANY RESULT)
...
11:27:51: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(17608292)
11:27:51: d-BalancingPool-/optimizationV6-actor-12, NonBlockingJobResp(17608292)
11:27:55: d-akka.actor.default-dispatcher-4, NonBlockingJobReq(db result is optimizationV6-job)
11:27:55: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(db result is optimizationV6-job)
11:27:55: d-akka.actor.default-dispatcher-13, NonBlockingJobReq(db result is optimizationV6-job)
11:27:55: d-BalancingPool-/optimizationV6-actor-10, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV6-JOB)
11:27:55: d-BalancingPool-/optimizationV6-actor-12, NonBlockingJobResp(DB RESULT IS OPTIMIZATIONV6-JOB)
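Thread usage:
Adding a Router increased the number of actor instances that can work at the same time. Combined with Futures and resource isolation, this improved performance noticeably once more.
Looking back over the whole optimization process, we started from the original, poorly performing synchronous version.
Execution time went from "did not finish" to 136 seconds, then 120, 105 and 50 seconds, and finally 40 seconds.
Over the same steps, CPU utilization rose from under 5% to over 95%.
This was a move from synchronous to asynchronous code, a process of putting the CPU to progressively fuller use, and above all a shift in how we think about programming.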
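A fully serial run of the workload gives a baseline for the execution times reported above. The test setup has 50 flows, each with one 10-second findByKey call and two 5-second compute calls. The sketch below assumes those durations are exact and treats the 2000 in-memory lookups as negligible. Under those assumptions a purely serial execution would need about 1000 seconds, so 40 seconds is roughly a 25x speedup.

```scala
object SpeedupSketch {
  // Workload from the test description: 50 flows, each with 1 DB query (10 s) and 2 computations (5 s).
  val flows = 50
  val dbSeconds = 10
  val cpuSeconds = 5
  // Assumption: the 40 in-memory lookups per flow are negligible and left out.
  val serialSeconds: Int = flows * (1 * dbSeconds + 2 * cpuSeconds)

  // Execution times reported in the article, in seconds.
  val measured = Seq(136, 120, 105, 50, 40)

  def speedup(seconds: Int): Double = serialSeconds.toDouble / seconds

  def main(args: Array[String]): Unit = {
    println(s"serial estimate: $serialSeconds s")
    measured.foreach(t => println(f"$t%4d s -> ${speedup(t)}%.1fx faster than fully serial"))
    println(f"136 s vs 40 s: ${136.0 / 40}%.1fx")
  }
}
```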
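Finally, keep the following points in mind when using akka-actor. They will help you write more efficient programs:
- Avoid blocking IO technologies inside actors, such as blocking database drivers; prefer non-blocking database drivers where possible
- Avoid code that blocks threads, such as Await.result or Await.ready, unless your scenario genuinely requires it
- Prefer Future + resource isolation, combined with a Router, to build a non-blocking, asynchronous system
- Use performance testing to find the best configuration for each dispatcher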
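The last recommendation, finding each dispatcher's best configuration by measurement, comes down to timing a representative workload at several pool sizes. The harness below is a minimal sketch that assumes simulated blocking calls (Thread.sleep) as the workload. The task count, sleep time and candidate sizes are illustrative, not taken from conf/blocking.conf. It blocks on a CountDownLatch, which is acceptable in a standalone benchmark but not inside an actor.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PoolSizeBenchmark {
  // Runs `tasks` simulated blocking calls (each sleeping `sleepMs`) on a fixed pool of
  // `poolSize` threads and returns the elapsed wall-clock time in milliseconds.
  def timeRun(poolSize: Int, tasks: Int, sleepMs: Long): Long = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val done = new CountDownLatch(tasks)
    val start = System.nanoTime()
    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        def run(): Unit = {
          Thread.sleep(sleepMs) // stand-in for a blocking IO call
          done.countDown()
        }
      })
    }
    done.await() // fine in a standalone benchmark, not inside an actor
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    pool.shutdown()
    elapsedMs
  }

  def main(args: Array[String]): Unit = {
    // Illustrative candidate sizes; in practice, try values around your dispatcher settings.
    for (size <- Seq(1, 5, 10, 20)) {
      val ms = timeRun(size, tasks = 20, sleepMs = 50)
      println(s"pool size $size: $ms ms")
    }
  }
}
```

For sleep-based, blocking-IO-style tasks the elapsed time keeps dropping as the pool grows. For CPU-bound tasks the same measurement would flatten out around the number of cores, which is one reason the two kinds of work get separate dispatchers.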