/
TypedForkJoinActorBenchmark.scala
137 lines (113 loc) · 4.23 KB
/
TypedForkJoinActorBenchmark.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.BenchmarkMode
import org.openjdk.jmh.annotations.Fork
import org.openjdk.jmh.annotations.Level
import org.openjdk.jmh.annotations.Measurement
import org.openjdk.jmh.annotations.Mode
import org.openjdk.jmh.annotations.OperationsPerInvocation
import org.openjdk.jmh.annotations.Param
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.Setup
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.annotations.TearDown
import org.openjdk.jmh.annotations.Threads
import org.openjdk.jmh.annotations.Warmup
import akka.actor.typed.scaladsl.AskPattern._
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 15, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class TypedForkJoinActorBenchmark {
import TypedBenchmarkActors._
import TypedForkJoinActorBenchmark._
@Param(Array("50"))
var tpt = 0
@Param(Array(coresStr)) // coresStr, cores2xStr, cores4xStr
var threads = ""
@Param(
Array(
"akka.dispatch.UnboundedMailbox",
"akka.dispatch.SingleConsumerOnlyUnboundedMailbox",
"akka.actor.ManyToOneArrayMailbox",
"akka.actor.JCToolsMailbox"))
var mailbox = ""
implicit var system: ActorSystem[PingPongCommand] = _
@Setup(Level.Trial)
def setup(): Unit = {
akka.actor.BenchmarkActors.requireRightNumberOfCores(cores)
system = ActorSystem(
TypedBenchmarkActors.benchmarkPingPongSupervisor(),
"TypedForkJoinActorBenchmark",
ConfigFactory.parseString(s"""
akka.actor {
default-mailbox.mailbox-capacity = 512
fjp-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $threads
parallelism-factor = 1.0
parallelism-max = $threads
}
throughput = $tpt
mailbox-type = "$mailbox"
}
}
"""))
}
@Benchmark
@OperationsPerInvocation(totalMessagesLessThanCores)
def pingPongLessActorsThanCores(): Unit =
runPingPongBench(messages, lessThanCoresActors, "fjp-dispatcher", tpt)
@Benchmark
@OperationsPerInvocation(totalMessagesSameAsCores)
def pingPongSameNumberOfActorsAsCores(): Unit =
runPingPongBench(messages, sameAsCoresActors, "fjp-dispatcher", tpt)
@Benchmark
@OperationsPerInvocation(totalMessagesMoreThanCores)
def pingPongMoreActorsThanCores(): Unit =
runPingPongBench(messages, moreThanCoresActors, "fjp-dispatcher", tpt)
def runPingPongBench(numMessages: Int, numActors: Int, dispatcher: String, tpt: Int): Unit = {
val response: Future[PingPongStarted] =
system.ask[PingPongStarted](ref => StartPingPong(numMessages, numActors, dispatcher, tpt, timeout, ref))(
timeout,
system.scheduler)
val started = Await.result(response, timeout)
started.completedLatch.await(timeout.toSeconds, TimeUnit.SECONDS)
printProgress(started.totalNumMessages, numActors, started.startNanoTime)
system ! Stop
}
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
}
object TypedForkJoinActorBenchmark {
final val messages = 2000000 // messages per actor pair
val timeout = 30.seconds
// Constants because they are used in annotations
// update according to cpu
final val cores = 8
final val coresStr = "8"
final val cores2xStr = "16"
final val cores4xStr = "24"
final val twoActors = 2
final val moreThanCoresActors = cores * 2
final val lessThanCoresActors = cores / 2
final val sameAsCoresActors = cores
final val totalMessagesTwoActors = messages
final val totalMessagesMoreThanCores = (moreThanCoresActors * messages) / 2
final val totalMessagesLessThanCores = (lessThanCoresActors * messages) / 2
final val totalMessagesSameAsCores = (sameAsCoresActors * messages) / 2
}