From cf41327164f110f69aa11d7d979b34ac6e6348c8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Nov 2019 14:45:17 +0100 Subject: [PATCH] Reliable delivery in Typed, #20984 --- .../src/test/resources/logback-test.xml | 2 + .../delivery/ReliableDeliverySpec.scala | 823 ++++++++++++++++++ .../delivery/ConsumerController.scala | 308 +++++++ .../delivery/ProducerController.scala | 333 +++++++ .../delivery/ShardingProducerController.scala | 128 +++ .../internal/delivery/SimulatedSharding.scala | 45 + .../WorkPullingProducerController.scala | 120 +++ 7 files changed, 1759 insertions(+) create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/delivery/ReliableDeliverySpec.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ConsumerController.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ProducerController.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ShardingProducerController.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/SimulatedSharding.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/WorkPullingProducerController.scala diff --git a/akka-actor-typed-tests/src/test/resources/logback-test.xml b/akka-actor-typed-tests/src/test/resources/logback-test.xml index 6f342784375..1293aeed3be 100644 --- a/akka-actor-typed-tests/src/test/resources/logback-test.xml +++ b/akka-actor-typed-tests/src/test/resources/logback-test.xml @@ -27,5 +27,7 @@ + + diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/delivery/ReliableDeliverySpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/delivery/ReliableDeliverySpec.scala new file mode 100644 index 00000000000..47965dc1a69 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/delivery/ReliableDeliverySpec.scala @@ -0,0 +1,823 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import java.util.concurrent.ThreadLocalRandom + +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.Terminated +import akka.actor.typed.TypedActorContext +import akka.actor.typed.internal.delivery.ConsumerController.SequencedMessage +import akka.actor.typed.internal.delivery.SimuatedSharding.ShardingEnvelope +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.LoggerOps +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object ReliableDeliverySpec { + private val config = ConfigFactory.parseString(""" + """) + + object TestProducer { + + trait Command + final case class RequestNext(sendTo: ActorRef[TestConsumer.Job]) extends Command + private final case object Tick extends Command + + def apply(delay: FiniteDuration): Behavior[Command] = { + if (delay == Duration.Zero) + activeNoDelay(1) // simulate fast producer + else { + Behaviors.withTimers { timers ⇒ + timers.startTimerWithFixedDelay(Tick, Tick, delay) + idle(0) + } + } + } + + private def idle(n: Int): Behavior[Command] = { + Behaviors.receiveMessage { + case Tick ⇒ Behaviors.same + case RequestNext(sendTo) ⇒ active(n + 1, sendTo) + } + } + + private def active(n: Int, sendTo: ActorRef[TestConsumer.Job]): Behavior[Command] = { + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case Tick ⇒ + sendMessage(n, sendTo, ctx) + idle(n) + + case RequestNext(_) ⇒ + throw new IllegalStateException("Unexpected RequestNext, already got one.") + } + } + } + + private def activeNoDelay(n: Int): Behavior[Command] = { + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case RequestNext(sendTo) ⇒ + sendMessage(n, sendTo, ctx) + activeNoDelay(n + 1) + } + } + } + + private def sendMessage(n: Int, sendTo: ActorRef[TestConsumer.Job], ctx: ActorContext[Command]): Unit = { + val msg = s"msg-$n" + ctx.log.info("sent {}", msg) + sendTo ! TestConsumer.Job(msg) + } + } + + object TestConsumer { + + final case class Job(payload: String) + trait Command + private final case class JobDelivery( + producerId: String, + seqNr: Long, + msg: Job, + confirmTo: ActorRef[ConsumerController.Confirmed]) + extends Command + final case class SomeAsyncJob( + producerId: String, + seqNr: Long, + msg: Job, + confirmTo: ActorRef[ConsumerController.Confirmed]) + extends Command + + final case class AddConsumerController(controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]) + extends Command + + def apply( + delay: FiniteDuration, + endCondition: SomeAsyncJob => Boolean, + endReplyTo: ActorRef[SomeAsyncJob], + controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]): Behavior[Command] = + Behaviors.setup { ctx ⇒ + val deliverTo: ActorRef[ConsumerController.Delivery[Job]] = + ctx.messageAdapter(d => JobDelivery(d.producerId, d.seqNr, d.msg, d.confirmTo)) + ctx.self ! AddConsumerController(controller) + (new TestConsumer(delay, endCondition, endReplyTo, deliverTo)).active(Set.empty) + } + + // dynamically adding ConsumerController via message AddConsumerController + def apply( + delay: FiniteDuration, + endCondition: SomeAsyncJob => Boolean, + endReplyTo: ActorRef[SomeAsyncJob]): Behavior[Command] = + Behaviors.setup { ctx ⇒ + val deliverTo: ActorRef[ConsumerController.Delivery[Job]] = + ctx.messageAdapter(d => JobDelivery(d.producerId, d.seqNr, d.msg, d.confirmTo)) + (new TestConsumer(delay, endCondition, endReplyTo, deliverTo)).active(Set.empty) + } + } + + class TestConsumer( + delay: FiniteDuration, + endCondition: TestConsumer.SomeAsyncJob => Boolean, + endReplyTo: ActorRef[TestConsumer.SomeAsyncJob], + deliverTo: ActorRef[ConsumerController.Delivery[TestConsumer.Job]]) { + import TestConsumer._ + + private def active(processed: Set[(String, Long)]): Behavior[Command] = { + Behaviors.receive { (ctx, m) => + m match { + case JobDelivery(producerId, seqNr, msg, confirmTo) ⇒ + // confirmation can be later, asynchronously + if (delay == Duration.Zero) + ctx.self ! SomeAsyncJob(producerId, seqNr, msg, confirmTo) + else + // schedule to simulate slow consumer + ctx.scheduleOnce(10.millis, ctx.self, SomeAsyncJob(producerId, seqNr, msg, confirmTo)) + Behaviors.same + + case job @ SomeAsyncJob(producerId, seqNr, _, confirmTo) ⇒ + // when replacing producer the seqNr may start from 1 again + val cleanProcessed = + if (seqNr == 1L) processed.filterNot { case (pid, _) => pid == producerId } else processed + + if (cleanProcessed((producerId, seqNr))) + throw new RuntimeException(s"Received duplicate [($producerId,$seqNr)]") + ctx.log.info("processed [{}] from [{}]", seqNr, producerId) + confirmTo ! ConsumerController.Confirmed(seqNr) + + if (endCondition(job)) { + endReplyTo ! job + Behaviors.stopped + } else + active(cleanProcessed + (producerId -> seqNr)) + + case AddConsumerController(controller) => + controller ! ConsumerController.Start(deliverTo) + Behaviors.same + } + } + } + + } + + object TestProducerWithConfirmation { + + trait Command + final case class RequestNext(sendTo: ActorRef[ProducerController.MessageWithConfirmation[TestConsumer.Job]]) + extends Command + private case object Tick extends Command + private final case class Confirmed(seqNr: Long) extends Command + private case object AskTimeout extends Command + + private implicit val askTimeout: Timeout = 10.seconds + + def apply(delay: FiniteDuration, replyProbe: ActorRef[Long]): Behavior[Command] = { + Behaviors.withTimers { timers ⇒ + timers.startTimerWithFixedDelay(Tick, Tick, delay) + idle(0, replyProbe) + } + } + + private def idle(n: Int, replyProbe: ActorRef[Long]): Behavior[Command] = { + Behaviors.receiveMessage { + case Tick ⇒ Behaviors.same + case RequestNext(sendTo) ⇒ active(n + 1, replyProbe, sendTo) + case Confirmed(seqNr) => + replyProbe ! seqNr + Behaviors.same + } + } + + private def active( + n: Int, + replyProbe: ActorRef[Long], + sendTo: ActorRef[ProducerController.MessageWithConfirmation[TestConsumer.Job]]): Behavior[Command] = { + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case Tick ⇒ + val msg = s"msg-$n" + ctx.log.info("sent {}", msg) + ctx.ask( + sendTo, + (askReplyTo: ActorRef[Long]) => + ProducerController.MessageWithConfirmation(TestConsumer.Job(msg), askReplyTo)) { + case Success(seqNr) => Confirmed(seqNr) + case Failure(_) => AskTimeout + } + idle(n, replyProbe) + + case RequestNext(_) ⇒ + throw new IllegalStateException("Unexpected RequestNext, already got one.") + + case Confirmed(seqNr) => + ctx.log.info("Reply Confirmed [{}]", seqNr) + replyProbe ! seqNr + Behaviors.same + + case AskTimeout => + ctx.log.warn("Timeout") + Behaviors.same + } + } + } + + } + + object TestShardingProducer { + + trait Command + final case class RequestNext(sendToRef: ActorRef[ShardingEnvelope[TestConsumer.Job]]) extends Command + + private final case object Tick extends Command + + def apply(): Behavior[Command] = { + // simulate fast producer + Behaviors.withTimers { timers ⇒ + timers.startTimerWithFixedDelay(Tick, Tick, 20.millis) + idle(0) + } + } + + private def idle(n: Int): Behavior[Command] = { + Behaviors.receiveMessage { + case Tick ⇒ Behaviors.same + case RequestNext(sendTo) ⇒ active(n + 1, sendTo) + } + } + + private def active(n: Int, sendTo: ActorRef[ShardingEnvelope[TestConsumer.Job]]): Behavior[Command] = { + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case Tick ⇒ + val msg = s"msg-$n" + val entityId = s"entity-${n % 3}" + ctx.log.info2("sent {} to {}", msg, entityId) + sendTo ! ShardingEnvelope(entityId, TestConsumer.Job(msg)) + idle(n) + + case RequestNext(_) ⇒ + throw new IllegalStateException("Unexpected RequestNext, already got one.") + } + } + } + + } + + // FIXME this should also become part of Akka + object TestShardingConsumer { + def apply( + delay: FiniteDuration, + endCondition: TestConsumer.SomeAsyncJob => Boolean, + endReplyTo: ActorRef[TestConsumer.SomeAsyncJob]) + : Behavior[ConsumerController.SequencedMessage[TestConsumer.Job]] = { + Behaviors.setup { context => + val consumer = context.spawn(TestConsumer(delay, endCondition, endReplyTo), name = "consumer") + // if consumer terminates this actor will also terminate + context.watch(consumer) + active(consumer, Map.empty) + } + } + + private def active( + consumer: ActorRef[TestConsumer.Command], + controllers: Map[String, ActorRef[ConsumerController.Command[TestConsumer.Job]]]) + : Behavior[ConsumerController.SequencedMessage[TestConsumer.Job]] = { + Behaviors + .receive[ConsumerController.SequencedMessage[TestConsumer.Job]] { (ctx, msg) => + controllers.get(msg.producerId) match { + case Some(c) => + c ! msg + Behaviors.same + case None => + val c = ctx.spawn( + ConsumerController[TestConsumer.Job](resendLost = true), + s"consumerController-${msg.producerId}") + // FIXME watch msg.producerController to cleanup terminated producers + consumer ! TestConsumer.AddConsumerController(c) + c ! msg + active(consumer, controllers.updated(msg.producerId, c)) + } + } + .receiveSignal { + case (_, Terminated(_)) => + Behaviors.stopped + } + } + } + + object RandomFlakyNetwork { + def apply[T](dropProbability: Any => Double): BehaviorInterceptor[T, T] = + new RandomFlakyNetwork(dropProbability).asInstanceOf[BehaviorInterceptor[T, T]] + } + + class RandomFlakyNetwork(dropProbability: Any => Double) extends BehaviorInterceptor[Any, Any] { + override def aroundReceive( + ctx: TypedActorContext[Any], + msg: Any, + target: BehaviorInterceptor.ReceiveTarget[Any]): Behavior[Any] = { + if (ThreadLocalRandom.current().nextDouble() < dropProbability(msg)) { + ctx.asScala.log.info("dropped {}", msg) + Behaviors.same + } else { + target(ctx, msg) + } + } + + } + +} + +class ReliableDeliverySpec + extends ScalaTestWithActorTestKit(ReliableDeliverySpec.config) + with WordSpecLike + with LogCapturing { + import ReliableDeliverySpec._ + + private var idCount = 0 + private def nextId(): Int = { + idCount += 1 + idCount + } + + private val defaultProducerDelay = 20.millis + private val defaultConsumerDelay = 10.millis + + private def consumerEndCondition(seqNr: Long): TestConsumer.SomeAsyncJob => Boolean = { + case TestConsumer.SomeAsyncJob(_, nr, _, _) => nr == seqNr + } + + "ReliableDelivery" must { + + "illustrate point-to-point usage" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + name = s"destination-${idCount}") + + val producer = spawn(TestProducer(defaultProducerDelay), name = s"producer-${idCount}") + val producerController = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + // FIXME too bad that the type of the nextParam can't be infered + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(5.seconds) + + testKit.stop(producer) + testKit.stop(producerController) + testKit.stop(consumerController) + } + + "illustrate point-to-point usage with confirmations" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + name = s"destination-${idCount}") + + val replyProbe = createTestProbe[Long]() + + val producer = + spawn(TestProducerWithConfirmation(defaultProducerDelay, replyProbe.ref), name = s"producer-${idCount}") + val producerController = + spawn( + ProducerController.withConfirmation[TestConsumer.Job, TestProducerWithConfirmation.RequestNext]( + s"p-${idCount}", + // FIXME too bad that the type of the nextParam can't be infered + (nextParam: ProducerController.RequestNextParam[ + ProducerController.MessageWithConfirmation[TestConsumer.Job]]) => + TestProducerWithConfirmation.RequestNext(nextParam.sendNextTo), + producer), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(5.seconds) + + replyProbe.receiveMessages(42, 5.seconds).toSet should ===((1L to 42L).toSet) + + testKit.stop(producer) + testKit.stop(producerController) + testKit.stop(consumerController) + } + + "illustrate work-pulling usage" in { + nextId() + val jobProducer = spawn(TestProducer(defaultProducerDelay), name = s"jobProducer-${idCount}") + // FIXME unfortunate that the types of the WorkPullingController can't be inferred + val workPullingController = + spawn( + WorkPullingProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + TestProducer.RequestNext(_), + jobProducer), + s"workPullingController-${idCount}") + + val consumerEndProbe1 = createTestProbe[TestConsumer.SomeAsyncJob]() + val workerController1 = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"workerController1-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe1.ref, workerController1), + name = s"worker1-${idCount}") + + val consumerEndProbe2 = createTestProbe[TestConsumer.SomeAsyncJob]() + val workerController2 = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"workerController2-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe2.ref, workerController2), + name = s"worker2-${idCount}") + + val registrationReplyProbe = createTestProbe[Done]() + workPullingController ! WorkPullingProducerController.RegisterWorker( + workerController1, + registrationReplyProbe.ref) + workPullingController ! WorkPullingProducerController.RegisterWorker( + workerController2, + registrationReplyProbe.ref) + registrationReplyProbe.expectMessage(Done) + registrationReplyProbe.expectMessage(Done) + + consumerEndProbe1.receiveMessage(10.seconds) + consumerEndProbe2.receiveMessage() + + testKit.stop(jobProducer) + testKit.stop(workPullingController) + testKit.stop(workerController1) + testKit.stop(workerController2) + } + + "illustrate sharding usage" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] = + spawn( + SimuatedSharding( + _ => TestShardingConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref)), + s"sharding-${idCount}") + + val producer = spawn(TestShardingProducer(), name = s"shardingProducer-${idCount}") + val shardingController = + spawn( + ShardingProducerController[TestConsumer.Job, TestShardingProducer.RequestNext]( + s"p-${idCount}", + TestShardingProducer.RequestNext(_), + producer, + sharding), + s"shardingController-${idCount}") + + // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" + consumerEndProbe.receiveMessages(3, 5.seconds).map(_.producerId) + + testKit.stop(producer) + testKit.stop(shardingController) + testKit.stop(sharding) + } + + "illustrate sharding usage with several producers" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] = + spawn( + SimuatedSharding( + _ => TestShardingConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref)), + s"sharding-${idCount}") + + val producer1 = spawn(TestShardingProducer(), name = s"shardingProducer1-${idCount}") + val shardingController1 = + spawn( + ShardingProducerController[TestConsumer.Job, TestShardingProducer.RequestNext]( + s"p1-${idCount}", // note different producerId + TestShardingProducer.RequestNext(_), + producer1, + sharding), + s"shardingController1-${idCount}") + + val producer2 = spawn(TestShardingProducer(), name = s"shardingProducer2-${idCount}") + val shardingController2 = + spawn( + ShardingProducerController[TestConsumer.Job, TestShardingProducer.RequestNext]( + s"p2-${idCount}", // note different producerId + TestShardingProducer.RequestNext(_), + producer2, + sharding), + s"shardingController2-${idCount}") + + // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" + val endMessages = consumerEndProbe.receiveMessages(3, 5.seconds) + // verify that they received messages from both producers + endMessages.map(_.producerId).toSet should ===(Set(s"p1-${idCount}", s"p2-${idCount}")) + + testKit.stop(producer1) + testKit.stop(producer2) + testKit.stop(shardingController1) + testKit.stop(shardingController2) + testKit.stop(sharding) + } + + def testWithDelays(producerDelay: FiniteDuration, consumerDelay: FiniteDuration): Unit = { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController-${idCount}") + spawn( + TestConsumer(consumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + name = s"destination-${idCount}") + + val producer = spawn(TestProducer(producerDelay), name = s"producer-${idCount}") + val producerController = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(5.seconds) + + testKit.stop(producer) + testKit.stop(producerController) + testKit.stop(consumerController) + } + + "work with slow producer and fast consumer" in { + testWithDelays(producerDelay = 30.millis, consumerDelay = Duration.Zero) + } + + "work with fast producer and slow consumer" in { + testWithDelays(producerDelay = Duration.Zero, consumerDelay = 30.millis) + } + + "work with fast producer and fast consumer" in { + testWithDelays(producerDelay = Duration.Zero, consumerDelay = Duration.Zero) + } + + "allow replacement of destination" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController1-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + s"consumer1-${idCount}") + + val producer = spawn(TestProducer(defaultProducerDelay), name = s"producer-${idCount}") + val producerController = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(5.seconds) + + val consumerEndProbe2 = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController2 = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController2-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe2.ref, consumerController2), + s"consumer2-${idCount}") + producerController ! ProducerController.RegisterConsumer(consumerController2, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe2.receiveMessage(5.seconds) + + testKit.stop(producer) + testKit.stop(producerController) + testKit.stop(consumerController) + } + + "allow replacement of producer" in { + nextId() + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + name = s"destination-${idCount}") + + val producer1 = spawn(TestProducer(defaultProducerDelay), name = s"producer1-${idCount}") + val producerController1 = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer1), + s"producerController1-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController1 ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + // FIXME better way of testing this + Thread.sleep(300) + testKit.stop(producer1) + testKit.stop(producerController1) + + val producer2 = spawn(TestProducer(defaultProducerDelay), name = s"producer2-${idCount}") + val producerController2 = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", // must keep the same producerId + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer2), + s"producerController2-${idCount}") + + producerController2 ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(5.seconds) + + testKit.stop(producer2) + testKit.stop(producerController2) + testKit.stop(consumerController) + } + + "work with flaky network" in { + // FIXME there is bug: if the first Request message from ConsumerController is dropped (some missing resending) + + nextId() + // RandomFlakyNetwork to simulate lost messages from producerController to consumerController + val consumerDrop: Any => Double = { + case _: ConsumerController.SequencedMessage[_] => 0.1 + case _ => 0.0 + } + + val consumerEndProbe = createTestProbe[TestConsumer.SomeAsyncJob]() + val consumerController = + spawn( + Behaviors.intercept(() => RandomFlakyNetwork[ConsumerController.Command[TestConsumer.Job]](consumerDrop))( + ConsumerController[TestConsumer.Job](resendLost = true)), + s"consumerController-${idCount}") + spawn( + TestConsumer(defaultConsumerDelay, consumerEndCondition(42), consumerEndProbe.ref, consumerController), + name = s"destination-${idCount}") + + val producer = spawn(TestProducer(defaultProducerDelay), name = s"producer-${idCount}") + // RandomFlakyNetwork to simulate lost messages from consumerController to producerController + val producerDrop: Any => Double = { + case _: ProducerController.Internal.Request => 0.3 + case _: ProducerController.Internal.Resend => 0.3 + case _ => 0.0 + } + + val producerController = spawn( + Behaviors.intercept(() => RandomFlakyNetwork[ProducerController.Command[TestConsumer.Job]](producerDrop))( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producer)), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + consumerEndProbe.receiveMessage(30.seconds) + + testKit.stop(producer) + testKit.stop(producerController) + testKit.stop(consumerController) + + } + + "deliver for normal scenario" in { + nextId() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](resendLost = true), s"consumerController-${idCount}") + val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]() + + val producerProbe = createTestProbe[TestProducer.RequestNext]() + val producerController = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producerProbe.ref), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerController, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + // initial RequestNext + val sendTo = producerProbe.receiveMessage().sendTo + + consumerController ! ConsumerController.Start(consumerProbe.ref) + sendTo ! TestConsumer.Job("msg-1") + + val delivery1 = consumerProbe.receiveMessage() + delivery1.seqNr should ===(1) + delivery1.msg should ===(TestConsumer.Job("msg-1")) + delivery1.confirmTo ! ConsumerController.Confirmed(delivery1.seqNr) + + sendTo ! TestConsumer.Job("msg-2") + producerProbe.receiveMessage() // RequestNext immediately + sendTo ! TestConsumer.Job("msg-3") + producerProbe.receiveMessage() + + val delivery2 = consumerProbe.receiveMessage() + delivery2.seqNr should ===(2) + delivery2.msg should ===(TestConsumer.Job("msg-2")) + // msg-3 isn't delivered before msg-2 has been confirmed (but we could allow more in flight) + consumerProbe.expectNoMessage() + delivery2.confirmTo ! ConsumerController.Confirmed(delivery2.seqNr) + + val delivery3 = consumerProbe.receiveMessage() + delivery3.seqNr should ===(3) + delivery3.msg should ===(TestConsumer.Job("msg-3")) + delivery3.confirmTo ! ConsumerController.Confirmed(delivery3.seqNr) + + testKit.stop(producerController) + testKit.stop(consumerController) + } + + "resend initial SequencedMessage if lost" in { + nextId() + val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]() + + val producerProbe = createTestProbe[TestProducer.RequestNext]() + val producerController = + spawn( + ProducerController[TestConsumer.Job, TestProducer.RequestNext]( + s"p-${idCount}", + (nextParam: ProducerController.RequestNextParam[TestConsumer.Job]) => + TestProducer.RequestNext(nextParam.sendNextTo), + producerProbe.ref), + s"producerController-${idCount}") + + val registerDoneProbe = createTestProbe[Done]() + producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref, registerDoneProbe.ref) + registerDoneProbe.expectMessage(Done) + + val producer = producerProbe.receiveMessage().sendTo + producer ! TestConsumer.Job("msg-1") + + val internalProducerController = producerController.unsafeUpcast[ProducerController.InternalCommand] + + consumerControllerProbe.expectMessage( + ConsumerController.SequencedMessage(s"p-${idCount}", 1L, TestConsumer.Job("msg-1"), true)( + internalProducerController)) + + // the ConsumerController will send initial `Request` back, but if that is lost or if the first + // `SequencedMessage` is lost the ProducerController will resend the SequencedMessage + consumerControllerProbe.expectMessage( + ConsumerController.SequencedMessage(s"p-${idCount}", 1L, TestConsumer.Job("msg-1"), true)( + internalProducerController)) + + internalProducerController ! ProducerController.Internal.Request(1L, 10L, true, false) + consumerControllerProbe.expectNoMessage(1100.millis) + + testKit.stop(producerController) + + // FIXME more tests of resend first, + // * for supportResend=false + // * for registration of new ConsumerController + // * when replacing producer + } + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ConsumerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ConsumerController.scala new file mode 100644 index 00000000000..7bb7d92688e --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ConsumerController.scala @@ -0,0 +1,308 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import scala.concurrent.duration._ + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.StashBuffer +import akka.annotation.InternalApi + +// FIXME Scaladoc describes how it works, internally. Rewrite for end user and keep internals as impl notes. + +/** + * The destination consumer will start the flow by sending an initial `Start` message + * to the `ConsumerController`. + * + * The `ProducerController` sends the first message to the `ConsumerController` without waiting for + * a `Request` from the `ConsumerController`. The main reason for this is that when used with + * Cluster Sharding the first message will typically create the `ConsumerController`. It's + * also a way to connect the ProducerController and ConsumerController in a dynamic way, for + * example when the ProducerController is replaced. + * + * The `ConsumerController` sends [[ProducerController.Internal.Request]] to the `ProducerController` + * to specify it's ready to receive up to the requested sequence number. + * + * The `ConsumerController` sends the first `Request` when it receives the first `SequencedMessage` + * and has received the `Start` message from the consumer. + * + * It sends new `Request` when half of the requested window is remaining, but it also retries + * the `Request` if no messages are received because that could be caused by lost messages. + * + * Apart from the first message the producer will not send more messages than requested. + * + * Received messages are wrapped in [[ConsumerController.Delivery]] when sent to the consumer, + * which is supposed to reply with [[ConsumerController.Confirmed]] when it has processed the message. + * Next message is not delivered until the previous is confirmed. + * More messages from the producer that arrive while waiting for the confirmation are stashed by + * the `ConsumerController` and delivered when previous message was confirmed. + * + * The consumer and the `ConsumerController` are supposed to be local so that these messages are fast and not lost. + * + * If the `ConsumerController` receives a message with unexpected sequence number (not previous + 1) + * it sends [[ProducerController.Internal.Resend]] to the `ProducerController` and will ignore all messages until + * the expected sequence number arrives. + */ +object ConsumerController { + + sealed trait InternalCommand + sealed trait Command[+A] extends InternalCommand + final case class SequencedMessage[A](producerId: String, seqNr: Long, msg: A, first: Boolean)( + /** INTERNAL API */ + @InternalApi private[akka] val producer: ActorRef[ProducerController.InternalCommand]) + extends Command[A] + private final case object RetryRequest extends InternalCommand + + final case class Delivery[A](producerId: String, seqNr: Long, msg: A, confirmTo: ActorRef[Confirmed]) + final case class Start[A](deliverTo: ActorRef[Delivery[A]]) extends Command[A] + final case class Confirmed(seqNr: Long) extends InternalCommand + + private final case class State( + producer: ActorRef[ProducerController.InternalCommand], + receivedSeqNr: Long, + requestedSeqNr: Long) + + private val RequestWindow = 20 // FIXME should be a param, ofc + + def apply[A](resendLost: Boolean): Behavior[Command[A]] = { + Behaviors + .setup[InternalCommand] { ctx ⇒ + Behaviors.withStash(100) { stashBuffer => + def becomeActive( + producerId: String, + producer: ActorRef[ProducerController.InternalCommand], + start: Start[A], + firstSeqNr: Long): Behavior[InternalCommand] = { + val requestedSeqNr = firstSeqNr - 1 + RequestWindow + + // FIXME adjust all logging, most should probably be debug + ctx.log.infoN( + "Become active for producer [{}] / [{}], requestedSeqNr [{}]", + producerId, + producer, + requestedSeqNr) + + producer ! ProducerController.Internal.Request( + confirmedSeqNr = 0, + requestedSeqNr, + resendLost, + viaReceiveTimeout = false) + + ctx.setReceiveTimeout(1.second, RetryRequest) + val next = new ConsumerController[A](ctx, producerId, start.deliverTo, stashBuffer, resendLost) + .active(State(producer, receivedSeqNr = 0, requestedSeqNr)) + stashBuffer.unstashAll(next) + } + + // wait for both the `Start` message from the consumer and the first `SequencedMessage` from the producer + def idle( + producer: Option[(String, ActorRef[ProducerController.InternalCommand])], + start: Option[Start[A]], + firstSeqNr: Long): Behavior[InternalCommand] = { + Behaviors.receiveMessagePartial { + case s: Start[A] @unchecked => + producer match { + case None => idle(None, Some(s), firstSeqNr) + case Some((pid, p)) => becomeActive(pid, p, s, firstSeqNr) + + } + + case seqMsg: SequencedMessage[A] @unchecked => + if (seqMsg.first) { + ctx.log.info("Received first SequencedMessage [{}]", seqMsg.seqNr) + stashBuffer.stash(seqMsg) + start match { + case None => idle(Some((seqMsg.producerId, seqMsg.producer)), start, seqMsg.seqNr) + case Some(s) => becomeActive(seqMsg.producerId, seqMsg.producer, s, seqMsg.seqNr) + } + } else if (seqMsg.seqNr > firstSeqNr) { + ctx.log.info("Stashing non-first SequencedMessage [{}]", seqMsg.seqNr) + stashBuffer.stash(seqMsg) + Behaviors.same + } else { + ctx.log.info("Dropping non-first SequencedMessage [{}]", seqMsg.seqNr) + Behaviors.same + } + + } + + } + + idle(None, None, 1L) + } + } + .narrow // expose Command, but not InternalCommand + } + +} + +private class ConsumerController[A]( + context: ActorContext[ConsumerController.InternalCommand], + producerId: String, + deliverTo: ActorRef[ConsumerController.Delivery[A]], + stashBuffer: StashBuffer[ConsumerController.InternalCommand], + resendLost: Boolean) { + + import ConsumerController._ + import ProducerController.Internal.Request + import ProducerController.Internal.Resend + + // Expecting a SequencedMessage from ProducerController, that will be delivered to the consumer if + // the seqNr is right. + private def active(s: State): Behavior[InternalCommand] = { + Behaviors.receiveMessage { + case seqMsg @ SequencedMessage(pid, seqNr, msg: A @unchecked, first) ⇒ + checkProducerId(producerId, pid, seqNr) + val expectedSeqNr = s.receivedSeqNr + 1 + if (seqNr == expectedSeqNr || (first && seqNr >= expectedSeqNr) || (first && seqMsg.producer != s.producer)) { + logIfChangingProducer(s.producer, seqMsg, pid, seqNr) + deliverTo ! Delivery(pid, seqNr, msg, context.self) + waitingForConfirmation( + s.copy(producer = seqMsg.producer, receivedSeqNr = seqNr, requestedSeqNr = s.requestedSeqNr), + first) + } else if (seqNr > expectedSeqNr) { + logIfChangingProducer(s.producer, seqMsg, pid, seqNr) + context.log.infoN("from producer [{}], missing [{}], received [{}]", pid, expectedSeqNr, seqNr) + if (resendLost) { + seqMsg.producer ! Resend(fromSeqNr = expectedSeqNr) + resending(s.copy(producer = seqMsg.producer)) + } else { + deliverTo ! Delivery(pid, seqNr, msg, context.self) + waitingForConfirmation(s.copy(producer = seqMsg.producer, receivedSeqNr = seqNr), first) + } + } else { // seqNr < expectedSeqNr + context.log.infoN("from producer [{}], deduplicate [{}], expected [{}]", pid, seqNr, expectedSeqNr) + Behaviors.same + } + + case RetryRequest ⇒ + retryRequest(s) + + case Confirmed(seqNr) ⇒ + context.log.warn("Unexpected confirmed [{}]", seqNr) + Behaviors.unhandled + + case Start(_) => + Behaviors.unhandled + } + } + + // It has detected a missing seqNr and requested a Resend. Expecting a SequencedMessage from the + // ProducerController with the missing seqNr. Other SequencedMessage with different seqNr will be + // discarded since they were in flight before the Resend request and will anyway be sent again. + private def resending(s: State): Behavior[InternalCommand] = { + Behaviors.receiveMessage { + case seqMsg @ SequencedMessage(pid, seqNr, msg: A @unchecked, first) ⇒ + checkProducerId(producerId, pid, seqNr) + + // FIXME is SequencedMessage.first possible here? + if (seqNr == s.receivedSeqNr + 1) { + logIfChangingProducer(s.producer, seqMsg, pid, seqNr) + context.log.info("from producer [{}], received missing [{}]", pid, seqNr) + deliverTo ! Delivery(pid, seqNr, msg, context.self) + waitingForConfirmation(s.copy(producer = seqMsg.producer, receivedSeqNr = seqNr), first) + } else { + context.log.infoN("from producer [{}], ignoring [{}], waiting for [{}]", pid, seqNr, s.receivedSeqNr + 1) + Behaviors.same // ignore until we receive the expected + } + + case RetryRequest ⇒ + // in case the Resend message was lost + context.log.info("retry Resend [{}]", s.receivedSeqNr + 1) + s.producer ! Resend(fromSeqNr = s.receivedSeqNr + 1) + Behaviors.same + + case Confirmed(seqNr) ⇒ + context.log.warn("Unexpected confirmed [{}]", seqNr) + Behaviors.unhandled + + case Start(_) => + Behaviors.unhandled + } + } + + // The message has been delivered to the consumer and it is now waiting for Confirmed from + // the consumer. New SequencedMessage from the ProducerController will be stashed. + private def waitingForConfirmation(s: State, first: Boolean): Behavior[InternalCommand] = { + Behaviors.receiveMessage { + case Confirmed(seqNr) ⇒ + val expectedSeqNr = s.receivedSeqNr + if (seqNr > expectedSeqNr) { + throw new IllegalStateException( + s"Expected confirmation of seqNr [$expectedSeqNr], but received higher [$seqNr]") + } else if (seqNr != expectedSeqNr) { + // FIXME restart of consumer is not fully thought about yet + context.log.info( + "Expected confirmation of seqNr [{}] but received [{}]. Perhaps the consumer was restarted.", + expectedSeqNr, + seqNr) + } + context.log.info("Confirmed [{}], stashed size [{}]", seqNr, stashBuffer.size) + val newRequestedSeqNr = + if (first) { + // confirm the first message immediately to cancel resending of first + val newRequestedSeqNr = seqNr - 1 + RequestWindow + context.log.info("Request after first [{}]", newRequestedSeqNr) + s.producer ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaReceiveTimeout = false) + newRequestedSeqNr + } else if ((s.requestedSeqNr - seqNr) == RequestWindow / 2) { + val newRequestedSeqNr = s.requestedSeqNr + RequestWindow / 2 + context.log.info("Request [{}]", newRequestedSeqNr) + s.producer ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaReceiveTimeout = false) + newRequestedSeqNr + } else { + s.requestedSeqNr + } + // FIXME can we use unstashOne instead of all? + stashBuffer.unstashAll(active(s.copy(receivedSeqNr = seqNr, requestedSeqNr = newRequestedSeqNr))) + + case RetryRequest ⇒ + retryRequest(s) + + case msg ⇒ + context.log.info("Stash [{}]", msg) + stashBuffer.stash(msg) + Behaviors.same + } + } + + // in case the Request or the SequencedMessage triggering the Request is lost + private def retryRequest(s: State) = { + val newRequestedSeqNr = s.receivedSeqNr + RequestWindow + context.log.info("retry Request [{}]", newRequestedSeqNr) + // FIXME may watch the producer to avoid sending retry Request to dead producer + s.producer ! Request(s.receivedSeqNr, newRequestedSeqNr, resendLost, viaReceiveTimeout = true) + active(s.copy(requestedSeqNr = newRequestedSeqNr)) + } + + private def checkProducerId(producerId: String, incomingProducerId: String, seqNr: Long): Unit = { + if (incomingProducerId != producerId) + throw new IllegalArgumentException( + s"Unexpected producerId, expected [$producerId], received [$incomingProducerId], " + + s"seqNr [$seqNr].") + } + + private def logIfChangingProducer( + producer: ActorRef[ProducerController.InternalCommand], + seqMsg: SequencedMessage[Any], + producerId: String, + seqNr: Long): Unit = { + if (seqMsg.producer != producer) + context.log.infoN( + "changing producer [{}] from [{}] to [{}], seqNr [{}]", + producerId, + producer, + seqMsg.producer, + seqNr) + } + +} + +// FIXME it must be possible to restart the consumer, then it might send a non-matching Confirmed(seqNr) +// FIXME could use watch to detect when producer or consumer are terminated diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ProducerController.scala new file mode 100644 index 00000000000..fd46bb94c34 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ProducerController.scala @@ -0,0 +1,333 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.TimerScheduler +import akka.actor.typed.scaladsl.LoggerOps + +// FIXME Scaladoc describes how it works, internally. Rewrite for end user and keep internals as impl notes. + +/** + * The `ProducerController` sends `RequestNext` to the actual producer, which is then allowed to send + * one message to the `ProducerController`. The `RequestMessage` message is defined via a factory + * function so that the producer can decide what type to use. The producer and `ProducerController` + * are supposed to be local so that these messages are fast and not lost. + * + * The `ProducerController` sends the first message to the `ConsumerController` without waiting for + * a `Request` from the `ConsumerController`. The main reason for this is that when used with + * Cluster Sharding the first message will typically create the `ConsumerController`. It's + * also a way to connect the ProducerController and ConsumerController in a dynamic way, for + * example when the ProducerController is replaced. + * + * When the first message is received by the `ConsumerController` it sends back the initial `Request`. + * + * Apart from the first message the `ProducerController` will not send more messages than requested + * by the `ConsumerController`. + * + * When there is demand from the consumer side the `ProducerController` sends `RequestNext` to the + * actual producer, which is then allowed to send one more message. + * + * Each message is wrapped by the `ProducerController` in [[ConsumerController.SequencedMessage]] with + * a monotonically increasing sequence number without gaps, starting at 1. + * + * The `Request` message also contains a `confirmedSeqNr` that is the acknowledgement + * from the consumer that it has received and processed all messages up to that sequence number. + * + * The `ConsumerController` will send [[ProducerController.Internal.Resend]] if a lost message is detected + * and then the `ProducerController` will resend all messages from that sequence number. The producer keeps + * unconfirmed messages in a buffer to be able to resend them. The buffer size is limited + * by the request window size. + * + * The resending is optional, and the `ConsumerController` can be started with `resendLost=false` + * to ignore lost messages, and then the `ProducerController` will not buffer unconfirmed messages. + * In that mode it provides only flow control but no reliable delivery. + */ +object ProducerController { + + sealed trait InternalCommand + + sealed trait Command[A] extends InternalCommand + + final case class RegisterConsumer[A]( + consumerController: ActorRef[ConsumerController.Command[A]], + replyTo: ActorRef[Done]) + extends Command[A] + + final case class RequestNextParam[A](currentSeqNr: Long, confirmedSeqNr: Long, sendNextTo: ActorRef[A]) + + final case class MessageWithConfirmation[A](message: A, replyTo: ActorRef[Long]) + + object Internal { + final case class Request(confirmedSeqNr: Long, upToSeqNr: Long, supportResend: Boolean, viaReceiveTimeout: Boolean) + extends InternalCommand { + require(confirmedSeqNr < upToSeqNr) + } + final case class Resend(fromSeqNr: Long) extends InternalCommand + } + + private case class Msg[A](msg: A) extends InternalCommand + private case object ResendFirst extends InternalCommand + + private final case class State[A]( + requested: Boolean, + currentSeqNr: Long, + confirmedSeqNr: Long, + requestedSeqNr: Long, + pendingReplies: Map[Long, ActorRef[Long]], + unconfirmed: Option[Vector[ConsumerController.SequencedMessage[A]]], // FIXME use OptionVal + firstSeqNr: Long, + send: ConsumerController.SequencedMessage[A] => Unit) + + def apply[A: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: RequestNextParam[A] ⇒ RequestNext, + producer: ActorRef[RequestNext]): Behavior[Command[A]] = { + + Behaviors + .receiveMessagePartial[InternalCommand] { + case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked, replyTo) => + replyTo ! Done + becomeActive(producerId, requestNextFactory, producer, consumerController) + } + .narrow + } + + /** + * For custom `send` function. For example used with Sharding where the message must be wrapped in + * `ShardingEnvelope(SequencedMessage(msg))`. + */ + def apply[A: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: RequestNextParam[A] ⇒ RequestNext, + producer: ActorRef[RequestNext], + send: ConsumerController.SequencedMessage[A] => Unit): Behavior[Command[A]] = { + becomeActive(producerId, requestNextFactory, producer, send).narrow + } + + /** + * For confirmation message back to the producer when the message has been fully delivered, processed, + * and confirmed by the consumer. Typically used with `ask` from the producer. + */ + def withConfirmation[A: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: RequestNextParam[MessageWithConfirmation[A]] ⇒ RequestNext, + producer: ActorRef[RequestNext]): Behavior[Command[A]] = { + Behaviors + .receiveMessagePartial[InternalCommand] { + case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked, replyTo) => + replyTo ! Done + becomeActive[A, MessageWithConfirmation[A], RequestNext]( + producerId, + requestNextFactory, + producer, + consumerController) + } + .narrow + } + + private def becomeActive[A: ClassTag, B: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: RequestNextParam[B] ⇒ RequestNext, + producer: ActorRef[RequestNext], + consumerController: ActorRef[ConsumerController.Command[A]]): Behavior[InternalCommand] = { + val send: ConsumerController.SequencedMessage[A] => Unit = consumerController ! _ + becomeActive[A, B, RequestNext](producerId, requestNextFactory, producer, send) + } + + private def becomeActive[A: ClassTag, B: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: RequestNextParam[B] ⇒ RequestNext, + producer: ActorRef[RequestNext], + send: ConsumerController.SequencedMessage[A] => Unit): Behavior[InternalCommand] = { + + Behaviors.setup { ctx ⇒ + Behaviors.withTimers { timers => + val msgAdapter: ActorRef[B] = ctx.messageAdapter(msg ⇒ Msg(msg)) + producer ! requestNextFactory(RequestNextParam(1L, 0L, msgAdapter)) + new ProducerController[A, B, RequestNext](ctx, producerId, producer, requestNextFactory, msgAdapter, timers) + .active( + State( + requested = true, + currentSeqNr = 1L, + confirmedSeqNr = 0L, + requestedSeqNr = 1L, + pendingReplies = Map.empty, + unconfirmed = Some(Vector.empty), + firstSeqNr = 1L, + send)) + } + } + } + +} + +private class ProducerController[A: ClassTag, B: ClassTag, RequestNext]( + ctx: ActorContext[ProducerController.InternalCommand], + producerId: String, + producer: ActorRef[RequestNext], + requestNextFactory: ProducerController.RequestNextParam[B] ⇒ RequestNext, + msgAdapter: ActorRef[B], + timers: TimerScheduler[ProducerController.InternalCommand]) { + import ProducerController._ + import ProducerController.Internal._ + import ConsumerController.SequencedMessage + + private def active(s: State[A]): Behavior[InternalCommand] = { + + def onMsg(m: A, newPendingReplies: Map[Long, ActorRef[Long]]): Behavior[InternalCommand] = { + if (s.requested && s.currentSeqNr <= s.requestedSeqNr) { + // FIXME adjust all logging, most should probably be debug + ctx.log.info("sent [{}]", s.currentSeqNr) + val seqMsg = SequencedMessage(producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr)(ctx.self) + val newUnconfirmed = s.unconfirmed match { + case Some(u) ⇒ Some(u :+ seqMsg) + case None ⇒ None // no resending, no need to keep unconfirmed + } + if (s.currentSeqNr == s.firstSeqNr) + timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) + + s.send(seqMsg) + val newRequested = + if (s.currentSeqNr == s.requestedSeqNr) + false + else { + producer ! requestNextFactory(RequestNextParam(s.currentSeqNr, s.confirmedSeqNr, msgAdapter)) + true + } + active( + s.copy( + requested = newRequested, + currentSeqNr = s.currentSeqNr + 1, + pendingReplies = newPendingReplies, + unconfirmed = newUnconfirmed)) + } else { + throw new IllegalStateException( + s"Unexpected Msg when no demand, requested ${s.requested}, " + + s"requestedSeqNr ${s.requestedSeqNr}, currentSeqNr ${s.currentSeqNr}") + } + } + + Behaviors.receiveMessage { + case Msg(MessageWithConfirmation(m: A, replyTo)) => + onMsg(m, s.pendingReplies.updated(s.currentSeqNr, replyTo)) + case Msg(m: A) ⇒ + onMsg(m, s.pendingReplies) + + case Request(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaReceiveTimeout) ⇒ + ctx.log.infoN( + "request confirmed [{}], requested [{}], current [{}]", + newConfirmedSeqNr, + newRequestedSeqNr, + s.currentSeqNr) + + // FIXME Better to have a separate Ack message for confirmedSeqNr to be able to have + // more frequent Ack than Request (incr window) + + // FIXME use more efficient Map for pendingReplies, sorted, maybe just `Vector[(Long, ActorRef)]` + val newPendingReplies = + if (s.pendingReplies.isEmpty) + s.pendingReplies + else { + val replies = s.pendingReplies.keys.filter(_ <= newConfirmedSeqNr).toVector.sorted + if (replies.nonEmpty) + ctx.log.info("Confirmation replies from [{}] to [{}]", replies.min, replies.max) + replies.foreach { seqNr => + s.pendingReplies(seqNr) ! seqNr + } + s.pendingReplies -- replies + } + + val newUnconfirmed = + if (supportResend) s.unconfirmed match { + case Some(u) ⇒ Some(u.dropWhile(_.seqNr <= newConfirmedSeqNr)) + case None ⇒ Some(Vector.empty) + } else None + + if (newConfirmedSeqNr == s.firstSeqNr) + timers.cancel(ResendFirst) + + if (viaReceiveTimeout && newUnconfirmed.nonEmpty) { + // the last message was lost and no more message was sent that would trigger Resend + newUnconfirmed.foreach { u ⇒ + ctx.log.info("resending after ReceiveTimeout [{}]", u.map(_.seqNr).mkString(", ")) + u.foreach(s.send) + } + } + + if (newRequestedSeqNr > s.requestedSeqNr) { + if (!s.requested && (newRequestedSeqNr - s.currentSeqNr) > 0) + producer ! requestNextFactory(RequestNextParam(s.currentSeqNr, newConfirmedSeqNr, msgAdapter)) + active( + s.copy( + requested = true, + confirmedSeqNr = math.max(s.confirmedSeqNr, newConfirmedSeqNr), + requestedSeqNr = newRequestedSeqNr, + pendingReplies = newPendingReplies, + unconfirmed = newUnconfirmed)) + } else { + active( + s.copy( + confirmedSeqNr = math.max(s.currentSeqNr, newConfirmedSeqNr), + pendingReplies = newPendingReplies, + unconfirmed = newUnconfirmed)) + } + + case Resend(fromSeqNr) ⇒ + s.unconfirmed match { + case Some(u) ⇒ + val newUnconfirmed = u.dropWhile(_.seqNr < fromSeqNr) + ctx.log.info("resending [{}]", newUnconfirmed.map(_.seqNr).mkString(", ")) + newUnconfirmed.foreach(s.send) + active(s.copy(unconfirmed = Some(newUnconfirmed))) + case None ⇒ + throw new IllegalStateException("Resend not supported, run the ConsumerController with resendLost = true") + } + + case ResendFirst => + s.unconfirmed match { + case Some(u) if u.nonEmpty && u.head.seqNr == s.firstSeqNr ⇒ + ctx.log.info("resending first, [{}]", s.firstSeqNr) + s.send(u.head.copy(first = true)(ctx.self)) + case _ => + if (s.currentSeqNr > s.firstSeqNr) + timers.cancel(ResendFirst) + } + Behaviors.same + + case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked, replyTo) => + val newFirstSeqNr = + if (s.unconfirmed.isEmpty) s.currentSeqNr else s.unconfirmed.map(_.head.seqNr).getOrElse(s.currentSeqNr) + ctx.log.info( + "Register new ConsumerController [{}], starting with seqNr [{}].", + consumerController, + newFirstSeqNr) + if (s.unconfirmed.nonEmpty) { + timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) + ctx.self ! ResendFirst + } + replyTo ! Done + // update the send function + val newSend = consumerController ! _ + active(s.copy(firstSeqNr = newFirstSeqNr, send = newSend)) + } + } +} + +// FIXME it must be possible to restart the producer, and then it needs to retrieve the request state, +// which would complicate the protocol. A more pragmatic, and easier to use, approach might be to +// allow for buffering of messages from the producer in the ProducerController if it it has no demand. +// Then the restarted producer can assume that it can send next message. As a recommendation it +// should not abuse this capability. + +// FIXME there should also be a durable version of this (using EventSouredBehavior) that stores the +// unconfirmed messages before sending and stores ack event when confirmed. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ShardingProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ShardingProducerController.scala new file mode 100644 index 00000000000..76042aa60b2 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/ShardingProducerController.scala @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import scala.reflect.ClassTag + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.internal.delivery.SimuatedSharding.ShardingEnvelope +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors + +// FIXME this will be moved to akka-cluster-sharding-typed + +// FIXME there should also be a ShardingConsumerController, see TestShardingConsumer in ReliableDeliverySpec + +object ShardingProducerController { + + sealed trait InternalCommand + + sealed trait Command[A] extends InternalCommand + + private final case class Msg[A](msg: ShardingEnvelope[A]) extends InternalCommand + + private final case class Next[A](sendNextTo: ActorRef[A], entityId: String) extends InternalCommand + + private final case class OutState[A]( + producerController: ActorRef[ProducerController.Command[A]], + sendNextTo: Option[ActorRef[A]], + // FIXME use better Queue than Vector for this + pending: Vector[A]) { + if (sendNextTo.nonEmpty && pending.nonEmpty) + throw new IllegalStateException("sendNextTo and pending shouldn't both be nonEmpty.") + } + + private final case class State[A](producerControllersByEntityId: Map[String, OutState[A]], hasRequested: Boolean) + + def apply[A: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: ActorRef[ShardingEnvelope[A]] ⇒ RequestNext, + producer: ActorRef[RequestNext], + region: ActorRef[ShardingEnvelope[ConsumerController.SequencedMessage[A]]]): Behavior[Command[A]] = { + Behaviors + .setup[InternalCommand] { context => + val msgAdapter: ActorRef[ShardingEnvelope[A]] = context.messageAdapter(envelope ⇒ Msg(envelope)) + val requestNext = requestNextFactory(msgAdapter) + producer ! requestNext + new ShardingProducerController(context, producerId, requestNext, producer, region).active( + State(Map.empty, hasRequested = false)) + } + .narrow + } + + // FIXME withConfirmation not implemented yet, see ProducerController.withConfirmation +} + +class ShardingProducerController[A: ClassTag, RequestNext]( + context: ActorContext[ShardingProducerController.InternalCommand], + producerId: String, + requestNext: RequestNext, + producer: ActorRef[RequestNext], + region: ActorRef[ShardingEnvelope[ConsumerController.SequencedMessage[A]]]) { + import ShardingProducerController._ + + private def active(s: State[A]): Behavior[InternalCommand] = { + + Behaviors.receiveMessage { + + case Next(sendNextTo: ActorRef[A] @unchecked, entityId) => + s.producerControllersByEntityId.get(entityId) match { + case Some(out) => + if (out.sendNextTo.nonEmpty) + throw new IllegalStateException(s"Received Next but already has demand for entityId [$entityId]") + + if (out.pending.nonEmpty) { + sendNextTo ! out.pending.head + val newProducers = s.producerControllersByEntityId.updated(entityId, out.copy(pending = out.pending.tail)) + active(s.copy(newProducers)) + } else { + val newProducers = + s.producerControllersByEntityId.updated(entityId, out.copy(sendNextTo = Some(sendNextTo))) + if (!s.hasRequested) + producer ! requestNext // FIXME way to include entityId in requestNext message? + active(s.copy(newProducers, hasRequested = true)) + } + + case None => + // FIXME support termination and removal of ProducerController + throw new IllegalStateException(s"Unexpected Next for unknown entityId [$entityId]") + } + + case Msg(ShardingEnvelope(entityId, msg: A)) => + val newProducers = + s.producerControllersByEntityId.get(entityId) match { + case Some(out @ OutState(_, Some(sendTo), _)) => + sendTo ! msg + s.producerControllersByEntityId.updated(entityId, out.copy(sendNextTo = None)) + case Some(out @ OutState(_, None, pending)) => + context.log.info("Buffering message to entityId [{}], buffer size [{}]", entityId, pending.size + 1) + s.producerControllersByEntityId.updated(entityId, out.copy(pending = pending :+ msg)) + case None => + context.log.info("Creating ProducerController for entity [{}]", entityId) + val send: ConsumerController.SequencedMessage[A] => Unit = { seqMsg => + region ! ShardingEnvelope(entityId, seqMsg) + } + // FIXME confirmedSeqNr in RequestNextParam not handled yet + val p = context.spawn( + ProducerController[A, Next[A]]( + producerId, + (nextParam: ProducerController.RequestNextParam[A]) => Next(nextParam.sendNextTo, entityId), + context.self.narrow, + send), + entityId) + s.producerControllersByEntityId.updated(entityId, OutState(p, None, Vector(msg))) + } + + // FIXME some way to limit the pending buffers + val hasMoreDemand = newProducers.valuesIterator.exists(_.sendNextTo.nonEmpty) + if (hasMoreDemand) + producer ! requestNext + active(s.copy(newProducers, hasMoreDemand)) + + } + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/SimulatedSharding.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/SimulatedSharding.scala new file mode 100644 index 00000000000..cf117bd7994 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/SimulatedSharding.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.Terminated +import akka.actor.typed.scaladsl.Behaviors + +// FIXME temporary instead of real sharding + +object SimuatedSharding { + final case class ShardingEnvelope[M](entityId: String, message: M) + + def apply[M](entityFactory: String => Behavior[M]): Behavior[ShardingEnvelope[M]] = { + def next(entities: Map[String, ActorRef[M]]): Behavior[ShardingEnvelope[M]] = { + Behaviors + .receive[ShardingEnvelope[M]] { (context, command) => + command match { + case ShardingEnvelope(entityId, message) => + entities.get(entityId) match { + case Some(ref) => + ref ! message + next(entities) + case None => + val ref = context.spawn(entityFactory(entityId), entityId) + context.watch(ref) + ref ! message + next(entities.updated(entityId, ref)) + } + } + } + .receiveSignal { + case (_, Terminated(ref)) => + next(entities - ref.path.name) + } + } + + next(Map.empty) + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/WorkPullingProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/WorkPullingProducerController.scala new file mode 100644 index 00000000000..80d8a63b13f --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/delivery/WorkPullingProducerController.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.delivery + +import java.util.concurrent.ThreadLocalRandom + +import scala.reflect.ClassTag + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors + +object WorkPullingProducerController { + + sealed trait InternalCommand + + sealed trait Command[A] extends InternalCommand + + private final case class Next[A](sendNextTo: ActorRef[A], belongsTo: ActorRef[ConsumerController.Command[A]]) + extends InternalCommand + + private final case class OutState[A]( + producerController: ActorRef[ProducerController.Command[A]], + sendNextTo: Option[ActorRef[A]]) + + private final case class State[A]( + consumers: Map[ActorRef[ConsumerController.Command[A]], OutState[A]], + hasRequested: Boolean) + + // TODO Now the workers have to be registered explicitly/manually. + // We could support automatic registration via Receptionist, similar to how routers work. + + final case class RegisterWorker[A]( + consumerController: ActorRef[ConsumerController.Command[A]], + replyTo: ActorRef[Done]) + extends Command[A] + + private final case class Msg[A](msg: A) extends InternalCommand + + def apply[A: ClassTag, RequestNext]( + producerId: String, + requestNextFactory: ActorRef[A] ⇒ RequestNext, + producer: ActorRef[RequestNext]): Behavior[Command[A]] = { + Behaviors + .setup[InternalCommand] { context => + val msgAdapter: ActorRef[A] = context.messageAdapter(msg ⇒ Msg(msg)) + val requestNext = requestNextFactory(msgAdapter) + new WorkPullingProducerController(context, producerId, requestNext, producer).active( + State(Map.empty, hasRequested = false)) + } + .narrow + } + + // FIXME withConfirmation not implemented yet, see ProducerController.withConfirmation + +} + +class WorkPullingProducerController[A: ClassTag, RequestNext]( + context: ActorContext[WorkPullingProducerController.InternalCommand], + producerId: String, + requestNext: RequestNext, + producer: ActorRef[RequestNext]) { + import WorkPullingProducerController._ + + private def active(s: State[A]): Behavior[InternalCommand] = { + Behaviors.receiveMessage { + case RegisterWorker(c: ActorRef[ConsumerController.Command[A]] @unchecked, replyTo) => + // FIXME adjust all logging, most should probably be debug + context.log.info("Registered worker {}", c) + val p = + context.spawnAnonymous( + ProducerController[A, Next[A]]( + producerId, + // FIXME confirmedSeqNr in RequestNextParam not handled yet + (nextParam: ProducerController.RequestNextParam[A]) => Next(nextParam.sendNextTo, c), + context.self.narrow, + seqMsg => c ! seqMsg)) + replyTo ! Done + // FIXME watch and deregistration not implemented yet + active(s.copy(consumers = s.consumers.updated(c, OutState(p, None)))) + + case next: Next[A] @unchecked => + s.consumers.get(next.belongsTo) match { + case Some(p) => + val newConsumers = s.consumers.updated(next.belongsTo, p.copy(sendNextTo = Some(next.sendNextTo))) + if (s.hasRequested) + active(s.copy(newConsumers)) + else { + producer ! requestNext + active(s.copy(newConsumers, hasRequested = true)) + } + + case None => + // obsolete Next, ConsumerController already deregistered + Behaviors.unhandled + } + + case Msg(msg: A) => + val consumersWithDemand = s.consumers.iterator.filter { case (_, out) => out.sendNextTo.isDefined }.toVector + if (consumersWithDemand.isEmpty) { + // FIXME all consumers have deregistered, need buffering or something + throw new IllegalStateException(s"Deregister of consumers not supported yet. Message not handled: $msg") + } else { + val i = ThreadLocalRandom.current().nextInt(consumersWithDemand.size) + val (c, out) = consumersWithDemand(i) + val newConsumers = s.consumers.updated(c, out.copy(sendNextTo = None)) + out.sendNextTo.get ! msg + val hasMoreDemand = consumersWithDemand.size > 1 + if (hasMoreDemand) + producer ! requestNext + active(s.copy(newConsumers, hasRequested = hasMoreDemand)) + } + + } + } +}