From 53b6cfd4e3fc2bfe18736244cf8cc31715c7d445 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 9 Dec 2019 10:57:22 +0100 Subject: [PATCH 1/2] Publish operations from InmemJournal * small feature that is useful for verifying that expected events were persisted --- .../journal/inmem/InmemJournal.scala | 26 +++++++- .../journal/inmem/InmemJournalSpec.scala | 65 +++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index ec13207415f..476a4faf6a4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -9,6 +9,8 @@ import scala.concurrent.Future import scala.util.Try import scala.util.control.NonFatal +import akka.annotation.ApiMayChange +import akka.annotation.InternalApi import akka.persistence.journal.AsyncWriteJournal import akka.persistence.PersistentRepr import akka.persistence.AtomicWrite @@ -17,12 +19,28 @@ import akka.serialization.Serializers import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +/** + * The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to + * verify that expected events have been persisted or deleted. + * + * InmemJournal is only intended to be used for tests and therefore binary backwards compatibility + * of the published messages are not guaranteed. + */ +@ApiMayChange +object InmemJournal { + sealed trait Operation + + final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation + + final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation +} + /** * INTERNAL API. * * In-memory journal for testing purposes only. */ -private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages { +@InternalApi private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages { def this() = this(ConfigFactory.empty()) @@ -34,11 +52,14 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w private val serialization = SerializationExtension(context.system) + private val eventStream = context.system.eventStream + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { try { for (w <- messages; p <- w.payload) { verifySerialization(p.payload) add(p) + eventStream.publish(InmemJournal.Write(p.payload, p.persistenceId, p.sequenceNr)) } Future.successful(Nil) // all good } catch { @@ -67,6 +88,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w delete(persistenceId, snr) snr += 1 } + eventStream.publish(InmemJournal.Delete(persistenceId, toSeqNr)) Future.successful(()) } @@ -84,7 +106,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w /** * INTERNAL API. */ -private[persistence] trait InmemMessages { +@InternalApi private[persistence] trait InmemMessages { // persistenceId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] // persistenceId -> highest used sequence number diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala new file mode 100644 index 00000000000..37982c28c2c --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2017-2019 Lightbend Inc. + */ + +package akka.persistence.journal.inmem + +import akka.actor.Props +import akka.persistence.PersistenceSpec +import akka.persistence.PersistentActor +import akka.testkit._ + +object InmemJournalSpec { + + def testProps(name: String): Props = + Props(new TestPersistentActor(name)) + + final case class Cmd(s: String) + final case class Delete(toSeqNr: Long) + final case class Evt(s: String) + + class TestPersistentActor(name: String) extends PersistentActor { + + override def persistenceId: String = name + + override def receiveRecover: Receive = { + case Evt(_) => + } + override def receiveCommand: Receive = { + case Cmd(s) => persist(Evt(s))(_ => ()) + case Delete(toSeqNr) => deleteMessages(toSeqNr) + } + } + +} + +class InmemJournalSpec + extends PersistenceSpec(PersistenceSpec.config("inmem", "InmemJournalSpec")) + with ImplicitSender { + import InmemJournalSpec._ + + system.eventStream.subscribe(testActor, classOf[InmemJournal.Operation]) + + "InmemJournal" must { + "publish writes" in { + val p1 = system.actorOf(testProps("p1")) + p1 ! Cmd("A") + p1 ! Cmd("B") + expectMsg(InmemJournal.Write(Evt("A"), "p1", 1L)) + expectMsg(InmemJournal.Write(Evt("B"), "p1", 2L)) + } + + "publish deletes" in { + val p1 = system.actorOf(testProps("p2")) + p1 ! Cmd("A") + p1 ! Cmd("B") + p1 ! Cmd("C") + p1 ! Delete(2) + expectMsg(InmemJournal.Write(Evt("A"), "p2", 1L)) + expectMsg(InmemJournal.Write(Evt("B"), "p2", 2L)) + expectMsg(InmemJournal.Write(Evt("C"), "p2", 3L)) + expectMsg(InmemJournal.Delete("p2", 2L)) + } + } + +} From 3f4ef936eca045892112715d86978e5ac833ea23 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 16 Dec 2019 09:05:48 +0100 Subject: [PATCH 2/2] doc example * also enable serialization test config --- .../sharding/typed/AccountExampleDocTest.java | 39 ++++++++++++++++++- .../sharding/typed/AccountExampleTest.java | 2 +- ...ccountExampleWithEventHandlersInState.java | 6 ++- .../typed/AccountExampleWithMutableState.java | 6 ++- .../typed/AccountExampleWithNullState.java | 6 ++- .../typed/AccountExampleDocSpec.scala | 25 ++++++++++++ .../main/paradox/typed/persistence-testing.md | 12 ++++++ 7 files changed, 88 insertions(+), 8 deletions(-) diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java index 564a020c8db..4bbc25944eb 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java @@ -7,6 +7,7 @@ // #test import java.math.BigDecimal; import java.util.UUID; + import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -20,6 +21,12 @@ // #test +// #test-events +import akka.actor.typed.eventstream.EventStream; +import akka.persistence.journal.inmem.InmemJournal; + +// #test-events + import org.scalatest.junit.JUnitSuite; import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; @@ -33,7 +40,9 @@ public class AccountExampleDocTest // #inmem-config private static final String inmemConfig = - "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"; + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" + + "akka.persistence.journal.inmem.test-serialization = on \n"; + // #inmem-config // #snapshot-store-config @@ -96,5 +105,33 @@ public void handleGetBalance() { BigDecimal.valueOf(100), getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance); } + + // #test + // #test-events + @Test + public void storeEvents() { + TestProbe eventProbe = testKit.createTestProbe(); + testKit + .system() + .eventStream() + .tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef())); + + ActorRef ref = + testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4"))); + TestProbe probe = + testKit.createTestProbe(AccountEntity.OperationResult.class); + ref.tell(new AccountEntity.CreateAccount(probe.getRef())); + assertEquals( + AccountEntity.AccountCreated.INSTANCE, + eventProbe.expectMessageClass(InmemJournal.Write.class).event()); + + ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef())); + assertEquals( + BigDecimal.valueOf(100), + ((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event()) + .amount); + } + // #test + // #test-events } // #test diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java index b1877778554..55f70a995d1 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java @@ -179,7 +179,7 @@ public void verifySerialization() { .serializationTestKit() .verifySerialization(new CurrentBalance(BigDecimal.valueOf(100)), false); - testKit.serializationTestKit().verifySerialization(new AccountCreated(), false); + testKit.serializationTestKit().verifySerialization(AccountCreated.INSTANCE, false); testKit .serializationTestKit() .verifySerialization(new Deposited(BigDecimal.valueOf(100)), false); diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java index 77ef382bcb1..edd1faa4465 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java @@ -120,7 +120,9 @@ public CurrentBalance(BigDecimal balance) { // Event interface Event extends CborSerializable {} - public static class AccountCreated implements Event {} + public enum AccountCreated implements Event { + INSTANCE + } public static class Deposited implements Event { public final BigDecimal amount; @@ -219,7 +221,7 @@ public CommandHandlerWithReply commandHandler() { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() - .persist(new AccountCreated()) + .persist(AccountCreated.INSTANCE) .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java index 067897bee12..8f16d0b58a7 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java @@ -114,7 +114,9 @@ public CurrentBalance(BigDecimal balance) { // Event interface Event extends CborSerializable {} - public static class AccountCreated implements Event {} + public enum AccountCreated implements Event { + INSTANCE + } public static class Deposited implements Event { public final BigDecimal amount; @@ -212,7 +214,7 @@ public CommandHandlerWithReply commandHandler() { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() - .persist(new AccountCreated()) + .persist(AccountCreated.INSTANCE) .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java index 39ade682ec9..6daa48e12af 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java @@ -114,7 +114,9 @@ public CurrentBalance(BigDecimal balance) { // Event interface Event extends CborSerializable {} - public static class AccountCreated implements Event {} + public enum AccountCreated implements Event { + INSTANCE + } public static class Deposited implements Event { public final BigDecimal amount; @@ -211,7 +213,7 @@ public CommandHandlerWithReply commandHandler() { private ReplyEffect createAccount(CreateAccount command) { return Effect() - .persist(new AccountCreated()) + .persist(AccountCreated.INSTANCE) .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala index bd2f49102b0..bd3ba1b7fda 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala @@ -14,6 +14,12 @@ import org.scalatest.WordSpecLike //#test +//#test-events +import akka.persistence.journal.inmem.InmemJournal +import akka.actor.typed.eventstream.EventStream + +//#test-events + import docs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity object AccountExampleDocSpec { @@ -21,6 +27,7 @@ object AccountExampleDocSpec { //#inmem-config """ akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.journal.inmem.test-serialization = on """ //#inmem-config @@ -76,6 +83,24 @@ class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s""" ref ! AccountEntity.GetBalance(getProbe.ref) getProbe.expectMessage(AccountEntity.CurrentBalance(100)) } + + //#test + //#test-events + "store events" in { + val eventProbe = createTestProbe[InmemJournal.Operation]() + system.eventStream ! EventStream.Subscribe(eventProbe.ref) + + val probe = createTestProbe[AccountEntity.OperationResult]() + val ref = spawn(AccountEntity("4", PersistenceId("Account", "4"))) + ref ! AccountEntity.CreateAccount(probe.ref) + eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated) + + ref ! AccountEntity.Deposit(100, probe.ref) + probe.expectMessage(AccountEntity.Confirmed) + eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100)) + } + //#test-events + //#test } } //#test diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 286b392b704..14292efddc8 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -31,6 +31,8 @@ Scala Java : @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #inmem-config } +The `test-serialization = on` configuration of the `InmemJournal` will verify that persisted events can be serialized and deserialized. + Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`]. @@ -53,6 +55,16 @@ Java Note that each test case is using a different `PersistenceId` to not interfere with each other. +The @apidoc[akka.persistence.journal.inmem.InmemJournal$] publishes `Write` and `Delete` operations to the +`eventStream`, which makes it possible to verify that the expected events have been emitted and stored by the +`EventSourcedBehavior`. You can subscribe to to the `eventStream` with a `TestProbe` like this: + +Scala +: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test-events } + +Java +: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test-events } + ## Integration testing The in-memory journal and file based snapshot store can be used also for integration style testing of a single