diff --git a/akka-actor/src/main/scala/akka/util/OptionVal.scala b/akka-actor/src/main/scala/akka/util/OptionVal.scala
index 0f29b9db9f1..273d9d4abeb 100644
--- a/akka-actor/src/main/scala/akka/util/OptionVal.scala
+++ b/akka-actor/src/main/scala/akka/util/OptionVal.scala
@@ -53,6 +53,12 @@ private[akka] final class OptionVal[+A](val x: A) extends AnyVal {
def getOrElse[B >: A](default: B): B =
if (x == null) default else x
+ /**
+ * Convert to `scala.Option`
+ */
+ def toOption: Option[A] =
+ Option(x)
+
def contains[B >: A](it: B): Boolean =
x != null && x == it
diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
index 6c9db0478f3..8d7fa471f18 100644
--- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
+++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
@@ -206,7 +206,7 @@ By default, these remoting features are disabled when not using Akka Cluster:
When used with Cluster, all previous behavior is the same except a remote watch of an actor is no longer possible before a node joins a cluster, only after.
To optionally enable them without Cluster, if you understand
-the @ref[consequences](../remoting-artery.md#quarantine), set
+the @ref[consequences](../remoting-artery.md#quarantine), set
```
akka.remote.use-unsafe-remote-features-without-cluster = on`.
```
@@ -214,7 +214,7 @@ akka.remote.use-unsafe-remote-features-without-cluster = on`.
When used without Cluster
* An initial warning is logged on startup of `RemoteActorRefProvider`
-* A warning will be logged on remote watch attempts, which you can suppress by setting
+* A warning will be logged on remote watch attempts, which you can suppress by setting
```
akka.remote.warn-unsafe-watch-without-cluster = off
```
@@ -305,7 +305,7 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off
`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
-the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
+the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
### Akka now uses Fork Join Pool from JDK
@@ -387,6 +387,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
* Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout]
+* Changed method signature for `EventAdapter.fromJournal` and support for `manifest` in `EventAdapter`.
* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.
diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md
index a78a1e598b6..b0a40f1908c 100644
--- a/akka-docs/src/main/paradox/typed/persistence.md
+++ b/akka-docs/src/main/paradox/typed/persistence.md
@@ -144,7 +144,7 @@ interpreted correctly on replay. Cluster Sharding ensures that there is only one
## Accessing the ActorContext
-If the persistent behavior needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
+If the `EventSourcedBehavior` needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
wrapping construction with `Behaviors.setup`:
Scala
@@ -357,20 +357,20 @@ to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:
Scala
-: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper }
+: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #event-wrapper }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }
-Then install it on a persistent behavior:
+Then install it on a `EventSourcedBehavior`:
Scala
-: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter }
+: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #install-event-adapter }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }
-## Wrapping Persistent Behaviors
+## Wrapping EventSourcedBehavior
When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala
index 4551d70ecaa..6e49a1250cb 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala
@@ -4,26 +4,107 @@
package akka.persistence.typed
+import scala.annotation.varargs
+import scala.collection.immutable
+
import akka.annotation.InternalApi
+/**
+ * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
+ * Typical use cases include (but are not limited to):
+ *
+ * - extracting events from "envelopes"
+ * - adapting events from a "domain model" to the "data model", e.g. converting to the Journals storage format,
+ * such as JSON, BSON or any specialised binary format
+ * - adapting events from a "data model" to the "domain model"
+ * - adding metadata that is understood by the journal
+ * - migration by splitting up events into sequences of other events
+ * - migration filtering out unused events, or replacing an event with another
+ *
+ */
abstract class EventAdapter[E, P] {
/**
- * Type of the event to persist
+ * Convert domain event to journal event type.
+ *
+ * Some journal may require a specific type to be returned to them,
+ * for example if a primary key has to be associated with each event then a journal
+ * may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`.
+ *
+ * The `toJournal` adaptation must be an 1-to-1 transformation.
+ * It is not allowed to drop incoming events during the `toJournal` adaptation.
+ *
+ * @param e the application-side domain event to be adapted to the journal model
+ * @return the adapted event object, possibly the same object if no adaptation was performed
*/
- type Per = P
+ def toJournal(e: E): P
/**
- * Transform event on the way to the journal
+ * Return the manifest (type hint) that will be provided in the `fromJournal` method.
+ * Use `""` if manifest is not needed.
*/
- def toJournal(e: E): Per
+ def manifest(event: E): String
/**
* Transform the event on recovery from the journal.
* Note that this is not called in any read side so will need to be applied
* manually when using Query.
+ *
+ * Convert a event from its journal model to the applications domain model.
+ *
+ * One event may be adapter into multiple (or none) events which should be delivered to the `EventSourcedBehavior`.
+ * Use the specialised [[EventSeq.single]] method to emit exactly one event,
+ * or [[EventSeq.empty]] in case the adapter is not handling this event.
+ *
+ * @param p event to be adapted before delivering to the `EventSourcedBehavior`
+ * @param manifest optionally provided manifest (type hint) in case the Adapter has stored one
+ * for this event, `""` if none
+ * @return sequence containing the adapted events (possibly zero) which will be delivered to
+ * the `EventSourcedBehavior`
*/
- def fromJournal(p: Per): E
+ def fromJournal(p: P, manifest: String): EventSeq[E]
+}
+
+sealed trait EventSeq[+A] {
+ def events: immutable.Seq[A]
+ def isEmpty: Boolean = events.isEmpty
+ def nonEmpty: Boolean = events.nonEmpty
+ def size: Int
+}
+object EventSeq {
+
+ final def empty[A]: EventSeq[A] = EmptyEventSeq.asInstanceOf[EventSeq[A]]
+
+ final def single[A](event: A): EventSeq[A] = SingleEventSeq(event)
+
+ @varargs final def many[A](events: A*): EventSeq[A] = EventsSeq(events.toList)
+
+ /** Java API */
+ final def create[A](events: java.util.List[A]): EventSeq[A] = {
+ import akka.util.ccompat.JavaConverters._
+ EventsSeq(events.asScala.toList)
+ }
+
+ /** Scala API */
+ final def apply[A](events: immutable.Seq[A]): EventSeq[A] = EventsSeq(events)
+
+}
+
+/** INTERNAL API */
+@InternalApi private[akka] final case class SingleEventSeq[A](event: A) extends EventSeq[A] {
+ override def events: immutable.Seq[A] = List(event)
+ override def size: Int = 1
+}
+
+/** INTERNAL API */
+@InternalApi private[akka] case object EmptyEventSeq extends EventSeq[Nothing] {
+ override def events: immutable.Seq[Nothing] = Nil
+ override def size: Int = 0
+}
+
+/** INTERNAL API */
+@InternalApi private[akka] final case class EventsSeq[A](override val events: immutable.Seq[A]) extends EventSeq[A] {
+ override def size: Int = events.size
}
/**
@@ -39,5 +120,6 @@ abstract class EventAdapter[E, P] {
*/
@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] {
override def toJournal(e: E): Any = e
- override def fromJournal(p: Any): E = p.asInstanceOf[E]
+ override def fromJournal(p: Any, manifest: String): EventSeq[E] = EventSeq.single(p.asInstanceOf[E])
+ override def manifest(event: E): String = ""
}
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala
index 2713e86da3d..2c2be2083ba 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala
@@ -44,7 +44,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
private val signalHandler: PartialFunction[(S, Signal), Unit],
val tagger: E => Set[String],
- val eventAdapter: EventAdapter[E, _],
+ val eventAdapter: EventAdapter[E, Any],
val snapshotWhen: (S, E, Long) => Boolean,
val recovery: Recovery,
val retention: RetentionCriteria,
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala
index 84edca5ecef..9b613d30848 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala
@@ -26,17 +26,20 @@ private[akka] trait JournalInteractions[C, E, S] {
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
- protected def internalPersist(state: Running.RunningState[S], event: EventOrTagged): Running.RunningState[S] = {
+ protected def internalPersist(
+ state: Running.RunningState[S],
+ event: EventOrTagged,
+ eventAdapterManifest: String): Running.RunningState[S] = {
val newState = state.nextSequenceNr()
- val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
+ manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
- sender = senderNotKnownBecauseAkkaTyped)
+ sender = ActorRef.noSender)
val write = AtomicWrite(repr) :: Nil
setup.journal
@@ -46,19 +49,21 @@ private[akka] trait JournalInteractions[C, E, S] {
}
protected def internalPersistAll(
- events: immutable.Seq[EventOrTagged],
- state: Running.RunningState[S]): Running.RunningState[S] = {
+ state: Running.RunningState[S],
+ events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
if (events.nonEmpty) {
var newState = state
- val writes = events.map { event =>
- newState = newState.nextSequenceNr()
- PersistentRepr(
- event,
- persistenceId = setup.persistenceId.id,
- sequenceNr = newState.seqNr,
- writerUuid = setup.writerIdentity.writerUuid,
- sender = ActorRef.noSender)
+ val writes = events.map {
+ case (event, eventAdapterManifest) =>
+ newState = newState.nextSequenceNr()
+ PersistentRepr(
+ event,
+ persistenceId = setup.persistenceId.id,
+ sequenceNr = newState.seqNr,
+ manifest = eventAdapterManifest,
+ writerUuid = setup.writerIdentity.writerUuid,
+ sender = ActorRef.noSender)
}
val write = AtomicWrite(writes)
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala
index cc06f170e9b..f2351bad115 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala
@@ -6,6 +6,7 @@ package akka.persistence.typed.internal
import scala.util.control.NonFatal
import scala.concurrent.duration._
+
import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException
@@ -14,10 +15,14 @@ import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
+import akka.persistence.typed.EmptyEventSeq
+import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.RecoveryCompleted
+import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
+import akka.util.OptionVal
import akka.util.unused
import akka.util.PrettyDuration._
@@ -103,19 +108,31 @@ private[akka] final class ReplayingEvents[C, E, S](
try {
response match {
case ReplayedMessage(repr) =>
- val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])
-
+ var eventForErrorReporting: OptionVal[Any] = OptionVal.None
try {
- state = state.copy(
- seqNr = repr.sequenceNr,
- state = setup.eventHandler(state.state, event),
- eventSeenInInterval = true)
+ val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest)
+
+ def handleEvent(event: E): Unit = {
+ eventForErrorReporting = OptionVal.Some(event)
+ state = state.copy(
+ seqNr = repr.sequenceNr,
+ state = setup.eventHandler(state.state, event),
+ eventSeenInInterval = true)
+ }
+
+ eventSeq match {
+ case SingleEventSeq(event) => handleEvent(event)
+ case EventsSeq(events) => events.foreach(handleEvent)
+ case EmptyEventSeq => // no events
+ }
+
this
} catch {
case NonFatal(ex) =>
state = state.copy(repr.sequenceNr)
- onRecoveryFailure(ex, Some(event))
+ onRecoveryFailure(ex, eventForErrorReporting.toOption)
}
+
case RecoverySuccess(highestSeqNr) =>
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
onRecoveryCompleted(state)
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
index 26b3cf0cc2e..3597ace0c25 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
@@ -148,8 +148,9 @@ private[akka] object Running {
val newState = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
+ val eventAdapterManifest = setup.eventAdapter.manifest(event)
- val newState2 = internalPersist(newState, eventToPersist)
+ val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest)
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
@@ -169,9 +170,9 @@ private[akka] object Running {
(currentState.applyEvent(setup, event), shouldSnapshot)
}
- val eventsToPersist = events.map(adaptEvent)
+ val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))
- val newState2 = internalPersistAll(eventsToPersist, newState)
+ val newState2 = internalPersistAll(newState, eventsToPersist)
persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala
index 46e515159a8..5d699141363 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala
@@ -149,6 +149,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/
def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet()
+ /**
+ * Transform the event in another type before giving to the journal. Can be used to wrap events
+ * in types Journals understand but is of a different type than `Event`.
+ */
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
/**
diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java
index 668ce9aecc0..c564f27810f 100644
--- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java
+++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java
@@ -10,6 +10,7 @@
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.function.Procedure;
+import akka.persistence.typed.EventSeq;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
@@ -27,14 +28,14 @@ public abstract static class Simple {
// #event-wrapper
public static class Wrapper {
- private final T t;
+ private final T event;
- public Wrapper(T t) {
- this.t = t;
+ public Wrapper(T event) {
+ this.event = event;
}
- public T getT() {
- return t;
+ public T getEvent() {
+ return event;
}
}
@@ -46,12 +47,21 @@ public Wrapper toJournal(SimpleEvent simpleEvent) {
}
@Override
- public SimpleEvent fromJournal(Wrapper simpleEventWrapper) {
- return simpleEventWrapper.getT();
+ public String manifest(SimpleEvent event) {
+ return "";
+ }
+
+ @Override
+ public EventSeq fromJournal(
+ Wrapper simpleEventWrapper, String manifest) {
+ return EventSeq.single(simpleEventWrapper.getEvent());
}
}
// #event-wrapper
+ // try varargs
+ private EventSeq many = EventSeq.many(new SimpleEvent("a"), new SimpleEvent("b"));
+
public static class SimpleCommand {
public final String data;
diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java
index 8dc745807fd..c0f7156aeda 100644
--- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java
+++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java
@@ -17,6 +17,7 @@
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.*;
+import akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
@@ -35,7 +36,6 @@
import java.util.*;
import static akka.Done.done;
-import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
@@ -43,7 +43,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public static final Config config =
ConfigFactory.parseString("akka.loggers = [akka.testkit.TestEventListener]")
- .withFallback(conf().withFallback(ConfigFactory.load()));
+ .withFallback(EventSourcedBehaviorSpec.conf().withFallback(ConfigFactory.load()));
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
@@ -583,15 +583,47 @@ public void transformEvent() throws Exception {
}
// event-wrapper
- class WrapperEventAdapter extends EventAdapter {
+ public static class Wrapper implements Serializable {
+ private final T t;
+
+ public Wrapper(T t) {
+ this.t = t;
+ }
+
+ public T getT() {
+ return t;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Wrapper> wrapper = (Wrapper>) o;
+
+ return t.equals(wrapper.t);
+ }
+
@Override
- public Wrapper toJournal(Incremented incremented) {
+ public int hashCode() {
+ return t.hashCode();
+ }
+ }
+
+ class WrapperEventAdapter extends EventAdapter> {
+ @Override
+ public Wrapper toJournal(Incremented incremented) {
return new Wrapper<>(incremented);
}
@Override
- public Incremented fromJournal(Wrapper wrapper) {
- return (Incremented) wrapper.t();
+ public String manifest(Incremented event) {
+ return "";
+ }
+
+ @Override
+ public EventSeq fromJournal(Wrapper wrapper, String manifest) {
+ return EventSeq.single(wrapper.getT());
}
}
// event-wrapper
diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
index 35c8c8af665..4abc10b28fa 100644
--- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
+++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
@@ -32,7 +32,6 @@ import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
-import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
@@ -51,14 +50,6 @@ import org.scalatest.WordSpecLike
object EventSourcedBehaviorSpec {
- //#event-wrapper
- case class Wrapper[T](t: T)
- class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
- override def toJournal(e: T): Wrapper[T] = Wrapper(e)
- override def fromJournal(p: Wrapper[T]): T = p.t
- }
- //#event-wrapper
-
class SlowInMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, UntypedSnapshotMetadata)]
@@ -484,70 +475,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1)))
}
- "adapt events" in {
- val pid = nextPid
- val c = spawn(Behaviors.setup[Command] { ctx =>
- val persistentBehavior = counter(ctx, pid)
-
- //#install-event-adapter
- persistentBehavior.eventAdapter(new WrapperEventAdapter[Event])
- //#install-event-adapter
- })
- val replyProbe = TestProbe[State]()
-
- c ! Increment
- c ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(1, Vector(0)))
-
- val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
- events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
-
- val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
- c2 ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(1, Vector(0)))
-
- }
-
- "adapter multiple events with persist all" in {
- val pid = nextPid
- val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
- val replyProbe = TestProbe[State]()
-
- c ! IncrementWithPersistAll(2)
- c ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(2, Vector(0, 1)))
-
- val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
- events shouldEqual List(
- EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))),
- EventEnvelope(Sequence(2), pid.id, 2, Wrapper(Incremented(1))))
-
- val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
- c2 ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(2, Vector(0, 1)))
- }
-
- "adapt and tag events" in {
- val pid = nextPid
- val c = spawn(Behaviors.setup[Command](ctx =>
- counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new WrapperEventAdapter[Event])))
- val replyProbe = TestProbe[State]()
-
- c ! Increment
- c ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(1, Vector(0)))
-
- val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
- events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
-
- val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
- c2 ! GetValue(replyProbe.ref)
- replyProbe.expectMessage(State(1, Vector(0)))
-
- val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
- taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
- }
-
"handle scheduled message arriving before recovery completed " in {
val c = spawn(Behaviors.withTimers[Command] { timers =>
timers.startSingleTimer("tick", Increment, 1.millis)
diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala
new file mode 100644
index 00000000000..1669d1957d1
--- /dev/null
+++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala
@@ -0,0 +1,249 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.persistence.typed.scaladsl
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.testkit.typed.scaladsl.TestProbe
+import akka.actor.typed.ActorRef
+import akka.actor.typed.scaladsl.Behaviors
+import akka.persistence.query.EventEnvelope
+import akka.persistence.query.PersistenceQuery
+import akka.persistence.query.Sequence
+import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
+import akka.persistence.typed.EventAdapter
+import akka.persistence.typed.EventSeq
+import akka.persistence.typed.PersistenceId
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Sink
+import akka.testkit.EventFilter
+import akka.testkit.TestEvent.Mute
+import com.typesafe.config.ConfigFactory
+import org.scalatest.WordSpecLike
+
+object EventSourcedEventAdapterSpec {
+
+ private val conf = ConfigFactory.parseString(s"""
+ akka.loggers = [akka.testkit.TestEventListener]
+ akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
+ akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
+ """)
+
+ case class Wrapper(t: String)
+ class WrapperEventAdapter extends EventAdapter[String, Wrapper] {
+ override def toJournal(e: String): Wrapper = Wrapper("<" + e)
+ override def fromJournal(p: Wrapper, manifest: String): EventSeq[String] = EventSeq.single(p.t + ">")
+ override def manifest(event: String): String = ""
+ }
+
+ class FilterEventAdapter extends EventAdapter[String, String] {
+ override def toJournal(e: String): String = e.toUpperCase()
+
+ override def fromJournal(p: String, manifest: String): EventSeq[String] = {
+ if (p == "B") EventSeq.empty
+ else EventSeq.single(p)
+ }
+
+ override def manifest(event: String): String = ""
+ }
+
+ class SplitEventAdapter extends EventAdapter[String, String] {
+ override def toJournal(e: String): String = e.toUpperCase()
+
+ override def fromJournal(p: String, manifest: String): EventSeq[String] = {
+ EventSeq(p.map("<" + _.toString + ">"))
+ }
+
+ override def manifest(event: String): String = ""
+ }
+
+ class EventAdapterWithManifest extends EventAdapter[String, String] {
+ override def toJournal(e: String): String = e.toUpperCase()
+
+ override def fromJournal(p: String, manifest: String): EventSeq[String] = {
+ EventSeq.single(p + manifest)
+ }
+
+ override def manifest(event: String): String = event.length.toString
+ }
+
+ case class GenericWrapper[T](t: T)
+ class GenericWrapperEventAdapter[T] extends EventAdapter[T, GenericWrapper[T]] {
+ override def toJournal(e: T): GenericWrapper[T] = GenericWrapper(e)
+ override def fromJournal(p: GenericWrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.t)
+ override def manifest(event: T): String = ""
+ }
+
+}
+
+class EventSourcedEventAdapterSpec
+ extends ScalaTestWithActorTestKit(EventSourcedEventAdapterSpec.conf)
+ with WordSpecLike {
+ import EventSourcedEventAdapterSpec._
+ import EventSourcedBehaviorSpec.{
+ counter,
+ Command,
+ Event,
+ GetValue,
+ Increment,
+ IncrementWithPersistAll,
+ Incremented,
+ State
+ }
+
+ import akka.actor.typed.scaladsl.adapter._
+ system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))
+
+ val pidCounter = new AtomicInteger(0)
+ private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
+
+ implicit val materializer = ActorMaterializer()(system.toUntyped)
+ val queries: LeveldbReadJournal =
+ PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
+
+ private def behavior(pid: PersistenceId, probe: ActorRef[String]): EventSourcedBehavior[String, String, String] =
+ EventSourcedBehavior(pid, "", commandHandler = { (_, command) =>
+ Effect.persist(command).thenRun(newState => probe ! newState)
+ }, eventHandler = { (state, evt) =>
+ state + evt
+ })
+
+ "Event adapter" must {
+
+ "wrap single events" in {
+ val probe = TestProbe[String]()
+ val pid = nextPid()
+ val ref = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter))
+
+ ref ! "a"
+ ref ! "b"
+ probe.expectMessage("a")
+ probe.expectMessage("ab")
+
+ // replay
+ val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter))
+ ref2 ! "c"
+ probe.expectMessage("c")
+ }
+
+ "filter unused events" in {
+ val probe = TestProbe[String]()
+ val pid = nextPid()
+ val ref = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter))
+
+ ref ! "a"
+ ref ! "b"
+ ref ! "c"
+ probe.expectMessage("a")
+ probe.expectMessage("ab")
+ probe.expectMessage("abc")
+
+ // replay
+ val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter))
+ ref2 ! "d"
+ probe.expectMessage("ACd")
+ }
+
+ "split one event into several" in {
+ val probe = TestProbe[String]()
+ val pid = nextPid()
+ val ref = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter))
+
+ ref ! "a"
+ ref ! "bc"
+ probe.expectMessage("a")
+ probe.expectMessage("abc")
+
+ // replay
+ val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter))
+ ref2 ! "d"
+ probe.expectMessage("d")
+ }
+
+ "support manifest" in {
+ val probe = TestProbe[String]()
+ val pid = nextPid()
+ val ref = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest))
+
+ ref ! "a"
+ ref ! "bcd"
+ probe.expectMessage("a")
+ probe.expectMessage("abcd")
+
+ // replay
+ val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest))
+ ref2 ! "e"
+ probe.expectMessage("A1BCD3e")
+ }
+
+ "adapt events" in {
+ val pid = nextPid()
+ val c = spawn(Behaviors.setup[Command] { ctx =>
+ val persistentBehavior = counter(ctx, pid)
+
+ persistentBehavior.eventAdapter(new GenericWrapperEventAdapter[Event])
+ })
+ val replyProbe = TestProbe[State]()
+
+ c ! Increment
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+
+ val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
+ events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
+
+ val c2 =
+ spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
+ c2 ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+
+ }
+
+ "adapter multiple events with persist all" in {
+ val pid = nextPid()
+ val c =
+ spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
+ val replyProbe = TestProbe[State]()
+
+ c ! IncrementWithPersistAll(2)
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(2, Vector(0, 1)))
+
+ val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
+ events shouldEqual List(
+ EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))),
+ EventEnvelope(Sequence(2), pid.id, 2, GenericWrapper(Incremented(1))))
+
+ val c2 =
+ spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
+ c2 ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(2, Vector(0, 1)))
+ }
+
+ "adapt and tag events" in {
+ val pid = nextPid()
+ val c = spawn(Behaviors.setup[Command](ctx =>
+ counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new GenericWrapperEventAdapter[Event])))
+ val replyProbe = TestProbe[State]()
+
+ c ! Increment
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+
+ val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
+ events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
+
+ val c2 =
+ spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
+ c2 ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+
+ val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
+ taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
+ }
+ }
+}
diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
index 0493c9ff8ca..925ba8d93a5 100644
--- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
+++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala
@@ -5,11 +5,14 @@
package docs.akka.persistence.typed
import scala.concurrent.duration._
+
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsFailed
+import akka.persistence.typed.EventAdapter
+import akka.persistence.typed.EventSeq
//#behavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
@@ -230,4 +233,23 @@ object BasicPersistentBehaviorCompileOnly {
}
//#retentionCriteriaWithSignals
+ //#event-wrapper
+ case class Wrapper[T](event: T)
+ class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
+ override def toJournal(e: T): Wrapper[T] = Wrapper(e)
+ override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event)
+ override def manifest(event: T): String = ""
+ }
+ //#event-wrapper
+
+ //#install-event-adapter
+ val eventAdapterBehavior: Behavior[Command] =
+ EventSourcedBehavior[Command, Event, State](
+ persistenceId = PersistenceId("abc"),
+ emptyState = State(),
+ commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
+ eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
+ .eventAdapter(new WrapperEventAdapter[Event])
+ //#install-event-adapter
+
}
diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
index 079d37bf1b9..420adc62245 100644
--- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
@@ -6,9 +6,10 @@ package akka.persistence
import akka.actor.{ ActorRef, NoSerializationVerificationNeeded }
import akka.persistence.serialization.Message
-
import scala.collection.immutable
+import akka.annotation.DoNotInherit
+
/**
* INTERNAL API
*
@@ -66,15 +67,17 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per
* @see [[akka.persistence.journal.AsyncWriteJournal]]
* @see [[akka.persistence.journal.AsyncRecovery]]
*/
-trait PersistentRepr extends Message {
+@DoNotInherit trait PersistentRepr extends Message {
/**
- * This persistent message's payload.
+ * This persistent message's payload (the event).
*/
def payload: Any
/**
- * Returns the persistent payload's manifest if available
+ * Returns the event adapter manifest for the persistent payload (event) if available
+ * May be `""` if event adapter manifest is not used.
+ * Note that this is not the same as the manifest of the serialized representation of the `payload`.
*/
def manifest: String
@@ -96,26 +99,28 @@ trait PersistentRepr extends Message {
def writerUuid: String
/**
- * Creates a new persistent message with the specified `payload`.
+ * Creates a new persistent message with the specified `payload` (event).
*/
def withPayload(payload: Any): PersistentRepr
/**
- * Creates a new persistent message with the specified `manifest`.
+ * Creates a new persistent message with the specified event adapter `manifest`.
*/
def withManifest(manifest: String): PersistentRepr
/**
+ * Not used, can always be `false`.
+ *
* Not used in new records stored with Akka v2.4, but
* old records from v2.3 may have this as `true` if
* it was a non-permanent delete.
*/
- def deleted: Boolean
+ def deleted: Boolean // FIXME deprecate, issue #27278
/**
- * Sender of this message.
+ * Not used, can be `null`
*/
- def sender: ActorRef
+ def sender: ActorRef // FIXME deprecate, issue #27278
/**
* Creates a new copy of this [[PersistentRepr]].