Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] attempt to use external submission for rescheduling of actor mailboxes #31156

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ object ActorSystemSpec {
override protected[akka] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
hasSystemMessageHint: Boolean,
reschedule: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint, reschedule)
doneIt.switchOn {
TestKit.awaitCond(mbox.actor.actor != null, 1.second)
mbox.actor.actor match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object ForkJoinPoolStarvationSpec {
|actorhang {
| task-dispatcher {
| mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 5
| throughput = 1000
| fork-join-executor {
| parallelism-factor = 2
| parallelism-max = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
*/
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
registerForExecution(actor.mailbox, false, true, false)
}

/**
Expand Down Expand Up @@ -277,7 +277,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[akka] def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
registerForExecution(mbox, false, false)
registerForExecution(mbox, false, false, false)
}

/**
Expand All @@ -302,7 +302,8 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[akka] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean
hasSystemMessageHint: Boolean,
rescheduled: Boolean): Boolean

// TODO check whether this should not actually be a property of the mailbox
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private[akka] class BalancingDispatcher(

override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
if (!registerForExecution(receiver.mailbox, false, false)) teamWork()
if (!registerForExecution(receiver.mailbox, false, false, false)) teamWork()
}

protected def teamWork(): Unit =
Expand All @@ -108,7 +108,7 @@ private[akka] class BalancingDispatcher(
case lm: LoadMetrics => !lm.atFullThrottle()
case _ => true
})
&& !registerForExecution(i.next.mailbox, false, false))
&& !registerForExecution(i.next.mailbox, false, false, false))
scheduleOne(i)

scheduleOne()
Expand Down
12 changes: 8 additions & 4 deletions akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Dispatcher(
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
registerForExecution(mbox, true, false, false)
}

/**
Expand All @@ -70,7 +70,7 @@ class Dispatcher(
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
val mbox = receiver.mailbox
mbox.systemEnqueue(receiver.self, invocation)
registerForExecution(mbox, false, true)
registerForExecution(mbox, false, true, false)
}

/**
Expand Down Expand Up @@ -120,11 +120,15 @@ class Dispatcher(
protected[akka] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
hasSystemMessageHint: Boolean,
rescheduled: Boolean): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

registerForExecution is called from 3 places. Do we need the extra parameter or can we handle all of them as executeExternal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's one of the remaining questions. We would have to benchmark to be sure. The idea of this approach is that we only use external submission if an actor exhausted its throughput batch. This way all other submissions (e.g. sending a message to another actor) could still be scheduled on the local queue. Also, all the other calls would use a normal method invocation for scheduling (instead of a reflective method handle invocation) which might have a performance benefit.

Whether the solution in this PR is good enough is another question, because it would mean that a busy loop involving two or more actors could still starve a pool thread. But that's the whole issue here: we would have to define a sensible fairness metric and experiment if optimizing for that fairness metric would make a difference in certain scenarios. After all, right now fairness is based on "number of message processed" (by setting throughput) which may or may not lead to a sensible behavior (and which is easy to enough to "exploit" accidentally by blocking or long running CPU-intensive tasks).

if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.execute(mbox)
if (rescheduled)
executorService.executeExternal(mbox)
else
executorService.execute(mbox)
true
} catch {
case _: RejectedExecutionException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
package akka.dispatch

import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }

import com.typesafe.config.Config

import java.lang.invoke.MethodHandles

object ForkJoinExecutorConfigurator {

/**
Expand All @@ -28,11 +29,22 @@ object ForkJoinExecutorConfigurator {

override def execute(r: Runnable): Unit =
if (r ne null)
super.execute(
(if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
super.execute(createTask(r))
else
throw new NullPointerException("Runnable was null")

def executeExternal(r: Runnable): Unit =
handle.invokeWithArguments(createTask(r))

private val handle = {
val m = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hope that this method is available on all supported JDKs...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m.setAccessible(true)
MethodHandles.lookup().unreflect(m).bindTo(this)
}

private def createTask(r: Runnable): ForkJoinTask[Any] =
(if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}

Expand Down
2 changes: 1 addition & 1 deletion akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
dispatcher.registerForExecution(this, false, false, true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the magic improvement where we say that we want to "reschedule" the mailbox, i.e. we yield because this actor has reached its throughput.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.dispatch

import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool

import java.util.Collection
import java.util.concurrent.{
ArrayBlockingQueue,
Expand All @@ -21,7 +23,6 @@ import java.util.concurrent.{
TimeUnit
}
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import scala.concurrent.{ BlockContext, CanAwait }
import scala.concurrent.duration.Duration

Expand Down Expand Up @@ -217,6 +218,10 @@ trait ExecutorServiceDelegate extends ExecutorService {
def executor: ExecutorService

def execute(command: Runnable) = executor.execute(command)
def executeExternal(command: Runnable) =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public API, so not sure if would want to add that method here publicly?

if (executor.isInstanceOf[AkkaForkJoinPool])
executor.asInstanceOf[AkkaForkJoinPool].executeExternal(command)
else executor.execute(command)

def shutdown(): Unit = { executor.shutdown() }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) exte
protected[akka] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = false
hasSystemMessageHint: Boolean,
reschedule: Boolean): Boolean = false

protected[akka] override def shutdownTimeout = 1 second

Expand Down
2 changes: 2 additions & 0 deletions project/JdkOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object JdkOptions extends AutoPlugin {
if (isJdk17orHigher) {
// for aeron
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
// for reflective access to ForkJoinPool
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ::
// for LevelDB
"--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil
} else Nil
Expand Down