Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._

import org.apache.pekko
import pekko.actor.ActorRef
import pekko.actor.ActorSystem
import pekko.actor.NoSerializationVerificationNeeded
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.stream.stage.InHandler

object StageActorRefBenchmark {
final val OperationsPerInvocation = 10000
private case object CountDown extends NoSerializationVerificationNeeded

private final class Control {
private val ready = new CountDownLatch(1)
@volatile private var ref: ActorRef = _
@volatile private var latch: CountDownLatch = _

def init(ref: ActorRef): Unit = {
this.ref = ref
ready.countDown()
}

def stageActorRef: ActorRef = {
if (!ready.await(10, TimeUnit.SECONDS))
throw new RuntimeException("Stage actor ref was not initialized")
ref
}

def reset(expectedMessages: Int): Unit =
latch = new CountDownLatch(expectedMessages)

def countDown(): Unit =
latch.countDown()

def awaitDone(): Unit =
if (!latch.await(10, TimeUnit.SECONDS))
throw new RuntimeException("Stage actor ref benchmark messages timed out")
}

private final class StageActorSink extends GraphStageWithMaterializedValue[SinkShape[Any], Control] {
val in: Inlet[Any] = Inlet("StageActorSink.in")
override val shape: SinkShape[Any] = SinkShape(in)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
val control = new Control

val logic = new GraphStageLogic(shape) {
override def preStart(): Unit = {
control.init(getStageActor {
case (_, CountDown) => control.countDown()
}.ref)
pull(in)
}

setHandler(
in,
new InHandler {
override def onPush(): Unit = pull(in)
})
}

logic -> control
}
}
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class StageActorRefBenchmark {
import StageActorRefBenchmark._

implicit val system: ActorSystem = ActorSystem("StageActorRefBenchmark")

private var completion: Promise[Option[Any]] = _
private var control: Control = _
private var stageActorRef: ActorRef = _

@Setup
def setup(): Unit = {
SystemMaterializer(system).materializer
val materialized = Source.maybe[Any].toMat(Sink.fromGraph(new StageActorSink))(Keep.both).run()
completion = materialized._1
control = materialized._2
stageActorRef = control.stageActorRef
}

@TearDown
def shutdown(): Unit = {
completion.trySuccess(None)
Await.result(system.terminate(), 5.seconds)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def lazy_stage_actor_ref_tell_10k(): Unit = {
control.reset(OperationsPerInvocation)
var remaining = OperationsPerInvocation
while (remaining > 0) {
stageActorRef ! CountDown
remaining -= 1
}
control.awaitDone()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ import scala.concurrent.Promise
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorPath
import pekko.actor.ActorRef
import pekko.actor.Kill
import pekko.actor.NoSerializationVerificationNeeded
import pekko.actor.PoisonPill
import pekko.event.Logging
import pekko.stream._
import pekko.stream.impl.fusing.GraphInterpreter
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.stream.stage.InHandler
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.EventFilter
import pekko.testkit.ImplicitSender
import pekko.testkit.TestEvent
Expand Down Expand Up @@ -181,6 +184,51 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender {
res.futureValue should ===(42)
}

"run non-eager stage actor messages in the graph interpreter" in {
val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run()

val stageRef = expectMsgType[ActorRef]
stageRef ! AddAndTell(1)
expectMsg(1)

stageRef ! ReportStageActorInterpreter
val location = expectMsgType[StageActorLocation]

location.stageActorParent should ===(location.supervisor)
location.interpreter should !==(location.supervisor)

stageRef ! StopNow
res.futureValue should ===(1)
}

"keep eagerly materialized stage actors usable before stream demand" in {
val (ref, probe) = Source
.actorRef[Int]({
case CompleteNow => CompletionStrategy.Immediately
}, PartialFunction.empty, bufferSize = 8, OverflowStrategy.fail)
.toMat(TestSink[Int]())(Keep.both)
.run()

ref ! 1
probe.request(1).expectNext(1)
ref ! CompleteNow
probe.expectComplete()
}

"keep eagerly materialized stage actors attached to the stream supervisor" in {
val (source, res) = Source.maybe[Int].toMat(eagerLocationStage(testActor))(Keep.both).run()

val stageRef = expectMsgType[ActorRef]
stageRef ! ReportEagerStageActorInterpreter
val location = expectMsgType[EagerStageActorLocation]

location.stageActorParent should ===(location.supervisor)
location.stageActorParent should !==(location.interpreter)

source.success(None)
res.futureValue should ===(0)
}

}

}
Expand All @@ -194,10 +242,19 @@ object StageActorRefSpec {
case object BecomeStringEcho extends NoSerializationVerificationNeeded
case object PullNow extends NoSerializationVerificationNeeded
case object StopNow extends NoSerializationVerificationNeeded
case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded
case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded
case object CompleteNow extends NoSerializationVerificationNeeded
final case class StageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath)
extends NoSerializationVerificationNeeded
final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath)
extends NoSerializationVerificationNeeded
}

import ControlProtocol._

def eagerLocationStage(probe: ActorRef) = EagerLocationStage(probe)

case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
val in = Inlet[Int]("IntSum.in")
override val shape: SinkShape[Int] = SinkShape.of(in)
Expand All @@ -216,10 +273,15 @@ object StageActorRefSpec {

def behavior(m: (ActorRef, Any)): Unit = {
m match {
case (_, Add(n)) => sum += n
case (_, PullNow) => pull(in)
case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref
case (_, BecomeStringEcho) =>
case (_, Add(n)) => sum += n
case (_, PullNow) => pull(in)
case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref
case (sender, ReportStageActorInterpreter) =>
sender ! StageActorLocation(
stageActor.ref.path.parent,
interpreter.materializer.supervisor.path,
GraphInterpreter.currentInterpreter.context.path)
case (_, BecomeStringEcho) =>
getStageActor {
case (theSender, msg) => theSender ! msg.toString
}
Expand Down Expand Up @@ -258,4 +320,52 @@ object StageActorRefSpec {
}
}

case class EagerLocationStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
val in = Inlet[Int]("EagerLocation.in")
override val shape: SinkShape[Int] = SinkShape.of(in)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val p: Promise[Int] = Promise()

val logic = new GraphStageLogic(shape) {
var stageRef: ActorRef = _
var interpreterPath: ActorPath = _
var supervisorPath: ActorPath = _

override def preStart(): Unit = {
interpreterPath = interpreter.context.path
supervisorPath = interpreter.materializer.supervisor.path
stageRef = getEagerStageActor(interpreter.materializer) {
case (sender, ReportEagerStageActorInterpreter) =>
sender ! EagerStageActorLocation(stageRef.path.parent, supervisorPath, interpreterPath)
case _ => throw new RuntimeException("unexpected message")
}.ref
pull(in)
probe ! stageRef
}

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
p.trySuccess(grab(in))
completeStage()
}

override def onUpstreamFinish(): Unit = {
p.trySuccess(0)
completeStage()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
p.tryFailure(ex)
failStage(ex)
}
})
}

logic -> p.future
}
}

}
7 changes: 7 additions & 0 deletions stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ pekko {
# Allows to accelerate message processing that happening within same actor but keep system responsive.
sync-processing-limit = 1000

# Upper bound on stage-actor messages drained per envelope for non-eager `getStageActor` refs. Lazy
# stage actors batch external `tell` deliveries into a MPSC queue and elect a single drain envelope;
# this cap bounds the burst so that other BoundaryEvents (pull/push/complete) can still interleave
# naturally via the actor mailbox. Smaller = better fairness for upstream/downstream events;
# larger = better tell throughput. Must be >= 1.
stage-actor-drain-batch = 16

debug {
# Enables the fuzzing mode which increases the chance of race conditions
# by aggressively reordering events and making certain operations more
Expand Down
Loading
Loading