From 318bdd9af88fa7d34f1976e3047557013b9c40f0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Jun 2019 08:41:38 +0200 Subject: [PATCH] EventSeq in Typed EventAdapter, #26909 * Tests for usage of EventSeq * Moved event adapter tests from EventSourcedBehaviorSpec to new EventSourcedEventAdapterSpec * Also support for the event adapter manifest * Note in migration guide --- .../project/migration-guide-2.5.x-2.6.x.md | 7 +- .../src/main/paradox/typed/persistence.md | 10 +- .../akka/persistence/typed/EventAdapter.scala | 97 ++++++- .../typed/internal/BehaviorSetup.scala | 2 +- .../typed/internal/ExternalInteractions.scala | 31 ++- .../typed/internal/ReplayingEvents.scala | 31 ++- .../persistence/typed/internal/Running.scala | 7 +- .../typed/javadsl/EventSourcedBehavior.scala | 4 + .../PersistentActorCompileOnlyTest.java | 21 +- .../javadsl/PersistentActorJavaDslTest.java | 44 +++- .../scaladsl/EventSourcedBehaviorSpec.scala | 73 ----- .../EventSourcedEventAdapterSpec.scala | 249 ++++++++++++++++++ .../BasicPersistentBehaviorCompileOnly.scala | 22 ++ .../scala/akka/persistence/Persistent.scala | 19 +- 14 files changed, 486 insertions(+), 131 deletions(-) create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 6c9db0478f3..8d7fa471f18 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -206,7 +206,7 @@ By default, these remoting features are disabled when not using Akka Cluster: When used with Cluster, all previous behavior is the same except a remote watch of an actor is no longer possible before a node joins a cluster, only after. To optionally enable them without Cluster, if you understand -the @ref[consequences](../remoting-artery.md#quarantine), set +the @ref[consequences](../remoting-artery.md#quarantine), set ``` akka.remote.use-unsafe-remote-features-without-cluster = on`. ``` @@ -214,7 +214,7 @@ akka.remote.use-unsafe-remote-features-without-cluster = on`. When used without Cluster * An initial warning is logged on startup of `RemoteActorRefProvider` -* A warning will be logged on remote watch attempts, which you can suppress by setting +* A warning will be logged on remote watch attempts, which you can suppress by setting ``` akka.remote.warn-unsafe-watch-without-cluster = off ``` @@ -305,7 +305,7 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off `StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure -the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. +the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. ### Akka now uses Fork Join Pool from JDK @@ -387,6 +387,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible * `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup` to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`. * Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout] +* Changed method signature for `EventAdapter.fromJournal` and support for `manifest` in `EventAdapter`. * `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter]. `interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter. diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index a78a1e598b6..b0a40f1908c 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -144,7 +144,7 @@ interpreted correctly on replay. Cluster Sharding ensures that there is only one ## Accessing the ActorContext -If the persistent behavior needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by +If the `EventSourcedBehavior` needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by wrapping construction with `Behaviors.setup`: Scala @@ -357,20 +357,20 @@ to another type that is then passed to the journal. Defining an event adapter is done by extending an EventAdapter: Scala -: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper } +: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #event-wrapper } Java : @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper } -Then install it on a persistent behavior: +Then install it on a `EventSourcedBehavior`: Scala -: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter } +: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #install-event-adapter } Java : @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter } -## Wrapping Persistent Behaviors +## Wrapping EventSourcedBehavior When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala index 4551d70ecaa..f683ed1d8b0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala @@ -4,26 +4,110 @@ package akka.persistence.typed +import scala.annotation.varargs +import scala.collection.immutable + import akka.annotation.InternalApi +/** + * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals. + * Typical use cases include (but are not limited to): + * + */ abstract class EventAdapter[E, P] { /** - * Type of the event to persist + * Convert domain event to journal event type. + * + * Some journal may require a specific type to be returned to them, + * for example if a primary key has to be associated with each event then a journal + * may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`. + * + * The `toJournal` adaptation must be an 1-to-1 transformation. + * It is not allowed to drop incoming events during the `toJournal` adaptation. + * + * @param e the application-side domain event to be adapted to the journal model + * @return the adapted event object, possibly the same object if no adaptation was performed */ - type Per = P + def toJournal(e: E): P /** - * Transform event on the way to the journal + * Return the manifest (type hint) that will be provided in the `fromJournal` method. + * Use `""` if manifest is not needed. */ - def toJournal(e: E): Per + def manifest(event: E): String /** * Transform the event on recovery from the journal. * Note that this is not called in any read side so will need to be applied * manually when using Query. + * + * Convert a event from its journal model to the applications domain model. + * + * One event may be adapter into multiple (or none) events which should be delivered to the `EventSourcedBehavior`. + * Use the specialised [[EventSeq.single]] method to emit exactly one event, + * or [[EventSeq.empty]] in case the adapter is not handling this event. + * + * @param p event to be adapted before delivering to the `EventSourcedBehavior` + * @param manifest optionally provided manifest (type hint) in case the Adapter has stored one + * for this event, `""` if none + * @return sequence containing the adapted events (possibly zero) which will be delivered to + * the `EventSourcedBehavior` */ - def fromJournal(p: Per): E + def fromJournal(p: P, manifest: String): EventSeq[E] +} + +sealed trait EventSeq[+A] { + def events: immutable.Seq[A] + def isEmpty: Boolean = events.isEmpty + def nonEmpty: Boolean = events.nonEmpty + def size: Int +} +object EventSeq { + + final def empty[A]: EventSeq[A] = EmptyEventSeq.asInstanceOf[EventSeq[A]] + + final def single[A](event: A): EventSeq[A] = SingleEventSeq(event) + + /** Java API */ + @varargs final def create[A](events: A*): EventSeq[A] = EventsSeq(events.toList) + + /** Java API */ + final def create[A](events: java.util.List[A]): EventSeq[A] = { + import akka.util.ccompat.JavaConverters._ + EventsSeq(events.asScala.toList) + } + + /** Scala API */ + final def apply[A](events: A*): EventSeq[A] = EventsSeq(events.toList) + + /** Scala API */ + final def apply[A](events: immutable.Seq[A]): EventSeq[A] = EventsSeq(events) + +} + +/** INTERNAL API */ +@InternalApi private[akka] final case class SingleEventSeq[A](event: A) extends EventSeq[A] { + override def events: immutable.Seq[A] = List(event) + override def size: Int = 1 +} + +/** INTERNAL API */ +@InternalApi private[akka] case object EmptyEventSeq extends EventSeq[Nothing] { + override def events: immutable.Seq[Nothing] = Nil + override def size: Int = 0 +} + +/** INTERNAL API */ +@InternalApi private[akka] final case class EventsSeq[A](override val events: immutable.Seq[A]) extends EventSeq[A] { + override def size: Int = events.size } /** @@ -39,5 +123,6 @@ abstract class EventAdapter[E, P] { */ @InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] { override def toJournal(e: E): Any = e - override def fromJournal(p: Any): E = p.asInstanceOf[E] + override def fromJournal(p: Any, manifest: String): EventSeq[E] = EventSeq.single(p.asInstanceOf[E]) + override def manifest(event: E): String = "" } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 2713e86da3d..2c2be2083ba 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -44,7 +44,7 @@ private[akka] final class BehaviorSetup[C, E, S]( val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity, private val signalHandler: PartialFunction[(S, Signal), Unit], val tagger: E => Set[String], - val eventAdapter: EventAdapter[E, _], + val eventAdapter: EventAdapter[E, Any], val snapshotWhen: (S, E, Long) => Boolean, val recovery: Recovery, val retention: RetentionCriteria, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 84edca5ecef..9b613d30848 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -26,17 +26,20 @@ private[akka] trait JournalInteractions[C, E, S] { type EventOrTagged = Any // `Any` since can be `E` or `Tagged` - protected def internalPersist(state: Running.RunningState[S], event: EventOrTagged): Running.RunningState[S] = { + protected def internalPersist( + state: Running.RunningState[S], + event: EventOrTagged, + eventAdapterManifest: String): Running.RunningState[S] = { val newState = state.nextSequenceNr() - val senderNotKnownBecauseAkkaTyped = null val repr = PersistentRepr( event, persistenceId = setup.persistenceId.id, sequenceNr = newState.seqNr, + manifest = eventAdapterManifest, writerUuid = setup.writerIdentity.writerUuid, - sender = senderNotKnownBecauseAkkaTyped) + sender = ActorRef.noSender) val write = AtomicWrite(repr) :: Nil setup.journal @@ -46,19 +49,21 @@ private[akka] trait JournalInteractions[C, E, S] { } protected def internalPersistAll( - events: immutable.Seq[EventOrTagged], - state: Running.RunningState[S]): Running.RunningState[S] = { + state: Running.RunningState[S], + events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = { if (events.nonEmpty) { var newState = state - val writes = events.map { event => - newState = newState.nextSequenceNr() - PersistentRepr( - event, - persistenceId = setup.persistenceId.id, - sequenceNr = newState.seqNr, - writerUuid = setup.writerIdentity.writerUuid, - sender = ActorRef.noSender) + val writes = events.map { + case (event, eventAdapterManifest) => + newState = newState.nextSequenceNr() + PersistentRepr( + event, + persistenceId = setup.persistenceId.id, + sequenceNr = newState.seqNr, + manifest = eventAdapterManifest, + writerUuid = setup.writerIdentity.writerUuid, + sender = ActorRef.noSender) } val write = AtomicWrite(writes) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index cc06f170e9b..880ad89d111 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -6,6 +6,7 @@ package akka.persistence.typed.internal import scala.util.control.NonFatal import scala.concurrent.duration._ + import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.internal.PoisonPill import akka.actor.typed.internal.UnstashException @@ -14,10 +15,14 @@ import akka.annotation.{ InternalApi, InternalStableApi } import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ +import akka.persistence.typed.EmptyEventSeq +import akka.persistence.typed.EventsSeq import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.SingleEventSeq import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible +import akka.util.OptionVal import akka.util.unused import akka.util.PrettyDuration._ @@ -103,19 +108,31 @@ private[akka] final class ReplayingEvents[C, E, S]( try { response match { case ReplayedMessage(repr) => - val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per]) - + var eventForErrorReporting: OptionVal[Any] = OptionVal.None try { - state = state.copy( - seqNr = repr.sequenceNr, - state = setup.eventHandler(state.state, event), - eventSeenInInterval = true) + val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest) + + def handleEvent(event: E): Unit = { + eventForErrorReporting = OptionVal.Some(event) + state = state.copy( + seqNr = repr.sequenceNr, + state = setup.eventHandler(state.state, event), + eventSeenInInterval = true) + } + + eventSeq match { + case SingleEventSeq(event) => handleEvent(event) + case EventsSeq(events) => events.foreach(handleEvent) + case EmptyEventSeq => // no events + } + this } catch { case NonFatal(ex) => state = state.copy(repr.sequenceNr) - onRecoveryFailure(ex, Some(event)) + onRecoveryFailure(ex, Option(eventForErrorReporting.getOrElse(null))) } + case RecoverySuccess(highestSeqNr) => setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr) onRecoveryCompleted(state) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 26b3cf0cc2e..3597ace0c25 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -148,8 +148,9 @@ private[akka] object Running { val newState = state.applyEvent(setup, event) val eventToPersist = adaptEvent(event) + val eventAdapterManifest = setup.eventAdapter.manifest(event) - val newState2 = internalPersist(newState, eventToPersist) + val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) @@ -169,9 +170,9 @@ private[akka] object Running { (currentState.applyEvent(setup, event), shouldSnapshot) } - val eventsToPersist = events.map(adaptEvent) + val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt))) - val newState2 = internalPersistAll(eventsToPersist, newState) + val newState2 = internalPersistAll(newState, eventsToPersist) persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 46e515159a8..5d699141363 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -149,6 +149,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( */ def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet() + /** + * Transform the event in another type before giving to the journal. Can be used to wrap events + * in types Journals understand but is of a different type than `Event`. + */ def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] /** diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index 668ce9aecc0..366e2e13922 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -10,6 +10,7 @@ import akka.actor.typed.Scheduler; import akka.actor.typed.javadsl.Behaviors; import akka.japi.function.Procedure; +import akka.persistence.typed.EventSeq; import akka.persistence.typed.SnapshotSelectionCriteria; import akka.persistence.typed.EventAdapter; import akka.actor.testkit.typed.javadsl.TestInbox; @@ -27,14 +28,14 @@ public abstract static class Simple { // #event-wrapper public static class Wrapper { - private final T t; + private final T event; - public Wrapper(T t) { - this.t = t; + public Wrapper(T event) { + this.event = event; } - public T getT() { - return t; + public T getEvent() { + return event; } } @@ -46,8 +47,14 @@ public Wrapper toJournal(SimpleEvent simpleEvent) { } @Override - public SimpleEvent fromJournal(Wrapper simpleEventWrapper) { - return simpleEventWrapper.getT(); + public String manifest(SimpleEvent event) { + return ""; + } + + @Override + public EventSeq fromJournal( + Wrapper simpleEventWrapper, String manifest) { + return EventSeq.single(simpleEventWrapper.getEvent()); } } // #event-wrapper diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 8dc745807fd..c0f7156aeda 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -17,6 +17,7 @@ import akka.persistence.query.Sequence; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.persistence.typed.*; +import akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; @@ -35,7 +36,6 @@ import java.util.*; import static akka.Done.done; -import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -43,7 +43,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public static final Config config = ConfigFactory.parseString("akka.loggers = [akka.testkit.TestEventListener]") - .withFallback(conf().withFallback(ConfigFactory.load())); + .withFallback(EventSourcedBehaviorSpec.conf().withFallback(ConfigFactory.load())); @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); @@ -583,15 +583,47 @@ public void transformEvent() throws Exception { } // event-wrapper - class WrapperEventAdapter extends EventAdapter { + public static class Wrapper implements Serializable { + private final T t; + + public Wrapper(T t) { + this.t = t; + } + + public T getT() { + return t; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Wrapper wrapper = (Wrapper) o; + + return t.equals(wrapper.t); + } + @Override - public Wrapper toJournal(Incremented incremented) { + public int hashCode() { + return t.hashCode(); + } + } + + class WrapperEventAdapter extends EventAdapter> { + @Override + public Wrapper toJournal(Incremented incremented) { return new Wrapper<>(incremented); } @Override - public Incremented fromJournal(Wrapper wrapper) { - return (Incremented) wrapper.t(); + public String manifest(Incremented event) { + return ""; + } + + @Override + public EventSeq fromJournal(Wrapper wrapper, String manifest) { + return EventSeq.single(wrapper.getT()); } } // event-wrapper diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 35c8c8af665..4abc10b28fa 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -32,7 +32,6 @@ import akka.persistence.query.PersistenceQuery import akka.persistence.query.Sequence import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.snapshot.SnapshotStore -import akka.persistence.typed.EventAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted @@ -51,14 +50,6 @@ import org.scalatest.WordSpecLike object EventSourcedBehaviorSpec { - //#event-wrapper - case class Wrapper[T](t: T) - class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] { - override def toJournal(e: T): Wrapper[T] = Wrapper(e) - override def fromJournal(p: Wrapper[T]): T = p.t - } - //#event-wrapper - class SlowInMemorySnapshotStore extends SnapshotStore { private var state = Map.empty[String, (Any, UntypedSnapshotMetadata)] @@ -484,70 +475,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1))) } - "adapt events" in { - val pid = nextPid - val c = spawn(Behaviors.setup[Command] { ctx => - val persistentBehavior = counter(ctx, pid) - - //#install-event-adapter - persistentBehavior.eventAdapter(new WrapperEventAdapter[Event]) - //#install-event-adapter - }) - val replyProbe = TestProbe[State]() - - c ! Increment - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - - val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) - - val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]))) - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - - } - - "adapter multiple events with persist all" in { - val pid = nextPid - val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]))) - val replyProbe = TestProbe[State]() - - c ! IncrementWithPersistAll(2) - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(2, Vector(0, 1))) - - val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue - events shouldEqual List( - EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))), - EventEnvelope(Sequence(2), pid.id, 2, Wrapper(Incremented(1)))) - - val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]))) - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(2, Vector(0, 1))) - } - - "adapt and tag events" in { - val pid = nextPid - val c = spawn(Behaviors.setup[Command](ctx => - counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new WrapperEventAdapter[Event]))) - val replyProbe = TestProbe[State]() - - c ! Increment - c ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - - val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) - - val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]))) - c2 ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(1, Vector(0))) - - val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue - taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) - } - "handle scheduled message arriving before recovery completed " in { val c = spawn(Behaviors.withTimers[Command] { timers => timers.startSingleTimer("tick", Increment, 1.millis) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala new file mode 100644 index 00000000000..1669d1957d1 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala @@ -0,0 +1,249 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.query.EventEnvelope +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.Sequence +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.typed.EventAdapter +import akka.persistence.typed.EventSeq +import akka.persistence.typed.PersistenceId +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedEventAdapterSpec { + + private val conf = ConfigFactory.parseString(s""" + akka.loggers = [akka.testkit.TestEventListener] + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + """) + + case class Wrapper(t: String) + class WrapperEventAdapter extends EventAdapter[String, Wrapper] { + override def toJournal(e: String): Wrapper = Wrapper("<" + e) + override def fromJournal(p: Wrapper, manifest: String): EventSeq[String] = EventSeq.single(p.t + ">") + override def manifest(event: String): String = "" + } + + class FilterEventAdapter extends EventAdapter[String, String] { + override def toJournal(e: String): String = e.toUpperCase() + + override def fromJournal(p: String, manifest: String): EventSeq[String] = { + if (p == "B") EventSeq.empty + else EventSeq.single(p) + } + + override def manifest(event: String): String = "" + } + + class SplitEventAdapter extends EventAdapter[String, String] { + override def toJournal(e: String): String = e.toUpperCase() + + override def fromJournal(p: String, manifest: String): EventSeq[String] = { + EventSeq(p.map("<" + _.toString + ">")) + } + + override def manifest(event: String): String = "" + } + + class EventAdapterWithManifest extends EventAdapter[String, String] { + override def toJournal(e: String): String = e.toUpperCase() + + override def fromJournal(p: String, manifest: String): EventSeq[String] = { + EventSeq.single(p + manifest) + } + + override def manifest(event: String): String = event.length.toString + } + + case class GenericWrapper[T](t: T) + class GenericWrapperEventAdapter[T] extends EventAdapter[T, GenericWrapper[T]] { + override def toJournal(e: T): GenericWrapper[T] = GenericWrapper(e) + override def fromJournal(p: GenericWrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.t) + override def manifest(event: T): String = "" + } + +} + +class EventSourcedEventAdapterSpec + extends ScalaTestWithActorTestKit(EventSourcedEventAdapterSpec.conf) + with WordSpecLike { + import EventSourcedEventAdapterSpec._ + import EventSourcedBehaviorSpec.{ + counter, + Command, + Event, + GetValue, + Increment, + IncrementWithPersistAll, + Incremented, + State + } + + import akka.actor.typed.scaladsl.adapter._ + system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + implicit val materializer = ActorMaterializer()(system.toUntyped) + val queries: LeveldbReadJournal = + PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) + + private def behavior(pid: PersistenceId, probe: ActorRef[String]): EventSourcedBehavior[String, String, String] = + EventSourcedBehavior(pid, "", commandHandler = { (_, command) => + Effect.persist(command).thenRun(newState => probe ! newState) + }, eventHandler = { (state, evt) => + state + evt + }) + + "Event adapter" must { + + "wrap single events" in { + val probe = TestProbe[String]() + val pid = nextPid() + val ref = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter)) + + ref ! "a" + ref ! "b" + probe.expectMessage("a") + probe.expectMessage("ab") + + // replay + val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter)) + ref2 ! "c" + probe.expectMessage("c") + } + + "filter unused events" in { + val probe = TestProbe[String]() + val pid = nextPid() + val ref = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter)) + + ref ! "a" + ref ! "b" + ref ! "c" + probe.expectMessage("a") + probe.expectMessage("ab") + probe.expectMessage("abc") + + // replay + val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter)) + ref2 ! "d" + probe.expectMessage("ACd") + } + + "split one event into several" in { + val probe = TestProbe[String]() + val pid = nextPid() + val ref = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter)) + + ref ! "a" + ref ! "bc" + probe.expectMessage("a") + probe.expectMessage("abc") + + // replay + val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter)) + ref2 ! "d" + probe.expectMessage("d") + } + + "support manifest" in { + val probe = TestProbe[String]() + val pid = nextPid() + val ref = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest)) + + ref ! "a" + ref ! "bcd" + probe.expectMessage("a") + probe.expectMessage("abcd") + + // replay + val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest)) + ref2 ! "e" + probe.expectMessage("A1BCD3e") + } + + "adapt events" in { + val pid = nextPid() + val c = spawn(Behaviors.setup[Command] { ctx => + val persistentBehavior = counter(ctx, pid) + + persistentBehavior.eventAdapter(new GenericWrapperEventAdapter[Event]) + }) + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + + val c2 = + spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + } + + "adapter multiple events with persist all" in { + val pid = nextPid() + val c = + spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) + val replyProbe = TestProbe[State]() + + c ! IncrementWithPersistAll(2) + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue + events shouldEqual List( + EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))), + EventEnvelope(Sequence(2), pid.id, 2, GenericWrapper(Incremented(1)))) + + val c2 = + spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + } + + "adapt and tag events" in { + val pid = nextPid() + val c = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new GenericWrapperEventAdapter[Event]))) + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + + val c2 = + spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue + taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + } + } +} diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 0493c9ff8ca..925ba8d93a5 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -5,11 +5,14 @@ package docs.akka.persistence.typed import scala.concurrent.duration._ + import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.DeleteEventsFailed import akka.persistence.typed.DeleteSnapshotsFailed +import akka.persistence.typed.EventAdapter +import akka.persistence.typed.EventSeq //#behavior import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.PersistenceId @@ -230,4 +233,23 @@ object BasicPersistentBehaviorCompileOnly { } //#retentionCriteriaWithSignals + //#event-wrapper + case class Wrapper[T](event: T) + class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] { + override def toJournal(e: T): Wrapper[T] = Wrapper(e) + override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event) + override def manifest(event: T): String = "" + } + //#event-wrapper + + //#install-event-adapter + val eventAdapterBehavior: Behavior[Command] = + EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId("abc"), + emptyState = State(), + commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), + eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) + .eventAdapter(new WrapperEventAdapter[Event]) + //#install-event-adapter + } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 079d37bf1b9..0c7507ca160 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -6,9 +6,10 @@ package akka.persistence import akka.actor.{ ActorRef, NoSerializationVerificationNeeded } import akka.persistence.serialization.Message - import scala.collection.immutable +import akka.annotation.DoNotInherit + /** * INTERNAL API * @@ -66,15 +67,17 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per * @see [[akka.persistence.journal.AsyncWriteJournal]] * @see [[akka.persistence.journal.AsyncRecovery]] */ -trait PersistentRepr extends Message { +@DoNotInherit trait PersistentRepr extends Message { /** - * This persistent message's payload. + * This persistent message's payload (the event). */ def payload: Any /** - * Returns the persistent payload's manifest if available + * Returns the event adapter manifest for the persistent payload (event) if available + * May be `""` if event adapter manifest is not used. + * Note that this is not the same as the manifest of the serialized representation of the `payload`. */ def manifest: String @@ -96,16 +99,18 @@ trait PersistentRepr extends Message { def writerUuid: String /** - * Creates a new persistent message with the specified `payload`. + * Creates a new persistent message with the specified `payload` (event). */ def withPayload(payload: Any): PersistentRepr /** - * Creates a new persistent message with the specified `manifest`. + * Creates a new persistent message with the specified event adapter `manifest`. */ def withManifest(manifest: String): PersistentRepr /** + * Not used, can always be `false`. + * * Not used in new records stored with Akka v2.4, but * old records from v2.3 may have this as `true` if * it was a non-permanent delete. @@ -113,7 +118,7 @@ trait PersistentRepr extends Message { def deleted: Boolean /** - * Sender of this message. + * Not used, can be `null` */ def sender: ActorRef