Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish operations from InmemJournal #28332

Merged
merged 2 commits into from Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -7,6 +7,7 @@
// #test
import java.math.BigDecimal;
import java.util.UUID;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -20,6 +21,12 @@

// #test

// #test-events
import akka.actor.typed.eventstream.EventStream;
import akka.persistence.journal.inmem.InmemJournal;

// #test-events

import org.scalatest.junit.JUnitSuite;

import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
Expand All @@ -33,7 +40,9 @@ public class AccountExampleDocTest

// #inmem-config
private static final String inmemConfig =
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n";
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
+ "akka.persistence.journal.inmem.test-serialization = on \n";

// #inmem-config

// #snapshot-store-config
Expand Down Expand Up @@ -96,5 +105,33 @@ public void handleGetBalance() {
BigDecimal.valueOf(100),
getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance);
}

// #test
// #test-events
@Test
public void storeEvents() {
TestProbe<InmemJournal.Operation> eventProbe = testKit.createTestProbe();
testKit
.system()
.eventStream()
.tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef()));

ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4")));
TestProbe<AccountEntity.OperationResult> probe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
assertEquals(
AccountEntity.AccountCreated.INSTANCE,
eventProbe.expectMessageClass(InmemJournal.Write.class).event());

ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
assertEquals(
BigDecimal.valueOf(100),
((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event())
.amount);
}
// #test
// #test-events
}
// #test
Expand Up @@ -179,7 +179,7 @@ public void verifySerialization() {
.serializationTestKit()
.verifySerialization(new CurrentBalance(BigDecimal.valueOf(100)), false);

testKit.serializationTestKit().verifySerialization(new AccountCreated(), false);
testKit.serializationTestKit().verifySerialization(AccountCreated.INSTANCE, false);
testKit
.serializationTestKit()
.verifySerialization(new Deposited(BigDecimal.valueOf(100)), false);
Expand Down
Expand Up @@ -120,7 +120,9 @@ public CurrentBalance(BigDecimal balance) {
// Event
interface Event extends CborSerializable {}

public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}

public static class Deposited implements Event {
public final BigDecimal amount;
Expand Down Expand Up @@ -219,7 +221,7 @@ public CommandHandlerWithReply<Command, Event, Account> commandHandler() {

private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

Expand Down
Expand Up @@ -114,7 +114,9 @@ public CurrentBalance(BigDecimal balance) {
// Event
interface Event extends CborSerializable {}

public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}

public static class Deposited implements Event {
public final BigDecimal amount;
Expand Down Expand Up @@ -212,7 +214,7 @@ public CommandHandlerWithReply<Command, Event, Account> commandHandler() {

private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

Expand Down
Expand Up @@ -114,7 +114,9 @@ public CurrentBalance(BigDecimal balance) {
// Event
interface Event extends CborSerializable {}

public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}

public static class Deposited implements Event {
public final BigDecimal amount;
Expand Down Expand Up @@ -211,7 +213,7 @@ public CommandHandlerWithReply<Command, Event, Account> commandHandler() {

private ReplyEffect<Event, Account> createAccount(CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

Expand Down
Expand Up @@ -14,13 +14,20 @@ import org.scalatest.WordSpecLike

//#test

//#test-events
import akka.persistence.journal.inmem.InmemJournal
import akka.actor.typed.eventstream.EventStream

//#test-events

import docs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity

object AccountExampleDocSpec {
val inmemConfig =
//#inmem-config
"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
"""
//#inmem-config

Expand Down Expand Up @@ -76,6 +83,24 @@ class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s"""
ref ! AccountEntity.GetBalance(getProbe.ref)
getProbe.expectMessage(AccountEntity.CurrentBalance(100))
}

//#test
//#test-events
"store events" in {
val eventProbe = createTestProbe[InmemJournal.Operation]()
system.eventStream ! EventStream.Subscribe(eventProbe.ref)

val probe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("4", PersistenceId("Account", "4")))
ref ! AccountEntity.CreateAccount(probe.ref)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated)

ref ! AccountEntity.Deposit(100, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100))
}
//#test-events
//#test
}
}
//#test
12 changes: 12 additions & 0 deletions akka-docs/src/main/paradox/typed/persistence-testing.md
Expand Up @@ -31,6 +31,8 @@ Scala
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #inmem-config }

The `test-serialization = on` configuration of the `InmemJournal` will verify that persisted events can be serialized and deserialized.

Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the
following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`].

Expand All @@ -53,6 +55,16 @@ Java

Note that each test case is using a different `PersistenceId` to not interfere with each other.

The @apidoc[akka.persistence.journal.inmem.InmemJournal$] publishes `Write` and `Delete` operations to the
`eventStream`, which makes it possible to verify that the expected events have been emitted and stored by the
`EventSourcedBehavior`. You can subscribe to to the `eventStream` with a `TestProbe` like this:

Scala
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test-events }

Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test-events }

## Integration testing

The in-memory journal and file based snapshot store can be used also for integration style testing of a single
Expand Down
Expand Up @@ -9,6 +9,8 @@ import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.PersistentRepr
import akka.persistence.AtomicWrite
Expand All @@ -17,12 +19,28 @@ import akka.serialization.Serializers
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

/**
* The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to
* verify that expected events have been persisted or deleted.
*
* InmemJournal is only intended to be used for tests and therefore binary backwards compatibility
* of the published messages are not guaranteed.
*/
@ApiMayChange
object InmemJournal {
sealed trait Operation

final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation

final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation
}

/**
* INTERNAL API.
*
* In-memory journal for testing purposes only.
*/
private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages {
@InternalApi private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages {

def this() = this(ConfigFactory.empty())

Expand All @@ -34,11 +52,14 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w

private val serialization = SerializationExtension(context.system)

private val eventStream = context.system.eventStream

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
try {
for (w <- messages; p <- w.payload) {
verifySerialization(p.payload)
add(p)
eventStream.publish(InmemJournal.Write(p.payload, p.persistenceId, p.sequenceNr))
}
Future.successful(Nil) // all good
} catch {
Expand Down Expand Up @@ -67,6 +88,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
delete(persistenceId, snr)
snr += 1
}
eventStream.publish(InmemJournal.Delete(persistenceId, toSeqNr))
Future.successful(())
}

Expand All @@ -84,7 +106,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
/**
* INTERNAL API.
*/
private[persistence] trait InmemMessages {
@InternalApi private[persistence] trait InmemMessages {
// persistenceId -> persistent message
var messages = Map.empty[String, Vector[PersistentRepr]]
// persistenceId -> highest used sequence number
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.journal.inmem

import akka.actor.Props
import akka.persistence.PersistenceSpec
import akka.persistence.PersistentActor
import akka.testkit._

object InmemJournalSpec {

def testProps(name: String): Props =
Props(new TestPersistentActor(name))

final case class Cmd(s: String)
final case class Delete(toSeqNr: Long)
final case class Evt(s: String)

class TestPersistentActor(name: String) extends PersistentActor {

override def persistenceId: String = name

override def receiveRecover: Receive = {
case Evt(_) =>
}
override def receiveCommand: Receive = {
case Cmd(s) => persist(Evt(s))(_ => ())
case Delete(toSeqNr) => deleteMessages(toSeqNr)
}
}

}

class InmemJournalSpec
extends PersistenceSpec(PersistenceSpec.config("inmem", "InmemJournalSpec"))
with ImplicitSender {
import InmemJournalSpec._

system.eventStream.subscribe(testActor, classOf[InmemJournal.Operation])

"InmemJournal" must {
"publish writes" in {
val p1 = system.actorOf(testProps("p1"))
p1 ! Cmd("A")
p1 ! Cmd("B")
expectMsg(InmemJournal.Write(Evt("A"), "p1", 1L))
expectMsg(InmemJournal.Write(Evt("B"), "p1", 2L))
}

"publish deletes" in {
val p1 = system.actorOf(testProps("p2"))
p1 ! Cmd("A")
p1 ! Cmd("B")
p1 ! Cmd("C")
p1 ! Delete(2)
expectMsg(InmemJournal.Write(Evt("A"), "p2", 1L))
expectMsg(InmemJournal.Write(Evt("B"), "p2", 2L))
expectMsg(InmemJournal.Write(Evt("C"), "p2", 3L))
expectMsg(InmemJournal.Delete("p2", 2L))
}
}

}