Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: improve EventStream doc #32348

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

package akka.actor.typed.eventstream;

// #imports
import akka.actor.Actor;
import akka.actor.AllDeadLetters;
import akka.actor.SuppressedDeadLetter;
import akka.actor.Terminated;
import akka.actor.testkit.typed.javadsl.ActorTestKit;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.Behavior;
Expand All @@ -17,12 +18,13 @@
import akka.actor.typed.eventstream.EventStream.Subscribe;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.testkit.javadsl.TestKit;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import org.junit.Assert;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
// #imports-deadletter
Expand All @@ -35,10 +37,15 @@ public class LoggingDocTest extends JUnitSuite {

@Test
public void subscribeToDeadLetters() {
// #deadletters
ActorSystem<DeadLetter> system = ActorSystem.create(Behaviors.empty(), "DeadLetters");
system.eventStream().tell(new Subscribe<>(DeadLetter.class, system));
// #deadletters
ActorSystem<SpawnProtocol.Command> system = ActorSystem.create(
Behaviors.setup(ctx -> {
Behavior<DeadLetter> deadLetterListener = Behaviors.empty();
// #subscribe-deadletter
ActorRef<DeadLetter> listener = ctx.spawn(deadLetterListener, "listener");
ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener));
// #subscribe-deadletter
return SpawnProtocol.create();
}), "DeadLettersSystem");
ActorTestKit.shutdown(system);
}

Expand All @@ -56,14 +63,15 @@ public DeadLetterActor(ActorContext<String> context) {
DeadLetter.class,
d -> d.message().toString()
);
// subscribe DeadLetter at startup.
context.getSystem().eventStream()
.tell(new Subscribe<>(DeadLetter.class, messageAdapter));
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessage(String.class, msg -> {
System.out.println(msg);
getContext().getLog().info("receive dead letter: {}", msg);
return Behaviors.same();
}).build();
}
Expand Down Expand Up @@ -108,15 +116,13 @@ public Listener(ActorContext<AllKindsOfMusic> context) {
public Receive<AllKindsOfMusic> createReceive() {
return newReceiveBuilder()
.onMessage(Jazz.class, msg -> {
System.out.printf("%s is listening to: %s%n",
getContext().getSelf().path().name(),
getContext().getLog().info("{} is listening to Jazz: {}", getContext().getSelf().path().name(),
msg);
return Behaviors.same();
})
.onMessage(Electronic.class, msg -> {
System.out.printf("%s is listening to: %s%n",
getContext().getSelf().path().name(),
msg);
getContext().getLog().info("{} is listening to Electronic: {}",
getContext().getSelf().path().name(), msg);
return Behaviors.same();
}).build();
}
Expand Down Expand Up @@ -164,10 +170,19 @@ public void subscribeBySubclassification() {
public void subscribeToSuppressedDeadLetters() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SuppressedDeadLetter");
TestProbe<SuppressedDeadLetter> probe = TestProbe.create(system);
ActorRef<SuppressedDeadLetter> actor = probe.ref();
ActorRef<SuppressedDeadLetter> listener = probe.ref();
akka.actor.ActorRef mockRef = Adapter.toClassic(listener);
// #suppressed-deadletters
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor));
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, listener));
// #suppressed-deadletters
Terminated suppression = Terminated.apply(mockRef, false, false);
SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, mockRef, mockRef);
system.eventStream().tell(new Publish<>(deadLetter));

SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass(
SuppressedDeadLetter.class);
Assert.assertNotNull(suppressedDeadLetter);
Assert.assertEquals(deadLetter, suppressedDeadLetter);

ActorTestKit.shutdown(system);
}
Expand All @@ -176,11 +191,29 @@ public void subscribeToSuppressedDeadLetters() {
public void subscribeToAllDeadLetters() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "AllDeadLetters");
TestProbe<AllDeadLetters> probe = TestProbe.create(system);
ActorRef<AllDeadLetters> actor = probe.ref();
ActorRef<AllDeadLetters> listener = probe.ref();
akka.actor.ActorRef mockRef = Adapter.toClassic(listener);
// #all-deadletters
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor));
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, listener));
// #all-deadletters

Terminated suppression = Terminated.apply(Actor.noSender(), false, false);
SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression,
mockRef,
mockRef);
system.eventStream().tell(new Publish<>(suppressedDeadLetter));
DeadLetter deadLetter = DeadLetter.apply("deadLetter", mockRef, mockRef);
system.eventStream().tell(new Publish<>(deadLetter));

// both of the following messages will be received by the subscription actor
SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass(
SuppressedDeadLetter.class);
Assert.assertNotNull(receiveSuppressed);
Assert.assertEquals(suppressedDeadLetter, receiveSuppressed);
DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class);
Assert.assertNotNull(receiveDeadLetter);
Assert.assertEquals(deadLetter, receiveDeadLetter);

ActorTestKit.shutdown(system);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.actor.typed.eventstream

import akka.actor.DeadLetter
import akka.actor.Terminated
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
Expand All @@ -15,6 +16,7 @@ import akka.actor.typed.SpawnProtocol
import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.eventstream.EventStream.Publish
import akka.actor.typed.eventstream.EventStream.Subscribe
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike

Expand All @@ -24,6 +26,7 @@ import scala.concurrent.Future
object LoggingDocSpec {

//#deadletters
import akka.actor.DeadLetter
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream.Subscribe
import akka.actor.typed.scaladsl.Behaviors
Expand All @@ -37,7 +40,7 @@ object LoggingDocSpec {

Behaviors.receiveMessage {
case msg: String =>
println(msg)
context.log.info("receive dead letter: {}", msg)
Behaviors.same
}
}
Expand Down Expand Up @@ -80,6 +83,18 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
// #deadletters
}

"allow registration to dead letters" in {
ActorSystem(Behaviors.setup[Void] { context =>
val deadLetterListener = Behaviors.empty[DeadLetter]
// #subscribe-deadletter
val listenerRef: ActorRef[DeadLetter] = context.spawn(deadLetterListener, "DeadLetterListener")
context.system.eventStream ! Subscribe[DeadLetter](listenerRef)
// #subscribe-deadletter

Behaviors.empty
}, "System")
}

"demonstrate superclass subscriptions on typed eventStream" in {
import LoggingDocSpec.ListenerActor._
//#superclass-subscription-eventstream
Expand All @@ -106,17 +121,29 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
}

"allow registration to suppressed dead letters" in {
val listener: ActorRef[Any] = TestProbe().ref
val probe: TestProbe[Any] = TestProbe()
val listener: ActorRef[Any] = probe.ref
val mockRef = listener.toClassic

//#suppressed-deadletters
import akka.actor.SuppressedDeadLetter
system.eventStream ! Subscribe[SuppressedDeadLetter](listener)
//#suppressed-deadletters
val suppression = Terminated(mockRef)(existenceConfirmed = false, addressTerminated = false)
val suppressionDeadLetter = SuppressedDeadLetter(suppression, mockRef, mockRef)
system.eventStream ! Publish(suppressionDeadLetter)

val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter]
receivedSuppression shouldBe suppressionDeadLetter

//#all-deadletters
import akka.actor.AllDeadLetters
system.eventStream ! Subscribe[AllDeadLetters](listener)
//#all-deadletters
val deadLetter = DeadLetter("deadLetter", mockRef, mockRef)
system.eventStream ! Publish(deadLetter)
val receivedDeadLetter = probe.expectMessageType[DeadLetter]
receivedDeadLetter shouldBe deadLetter
}

}
15 changes: 12 additions & 3 deletions akka-docs/src/main/paradox/typed/event-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl

## How to use

The following example demonstrates how a subscription works. Given an actor:
The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from startup:

Scala
: @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters }
Expand All @@ -45,15 +45,24 @@ Java
: @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #imports-deadletter }


@@@ div { .group-scala }

Or you can also subscribe after Actor starts:

@@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #subscribe-deadletter }

@@@


@@@ div { .group-java }

the actor definition like this:

@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletter-actor }

it can be subscribed like this:
Or you can also subscribe after Actor starts:

@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletters }
@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #subscribe-deadletter }

@@@

Expand Down
Loading