Skip to content

Commit

Permalink
EventSeq in Typed EventAdapter, #26909
Browse files Browse the repository at this point in the history
* Tests for usage of EventSeq
* Moved event adapter tests from EventSourcedBehaviorSpec to
  new EventSourcedEventAdapterSpec
* Also support for the event adapter manifest
* Note in migration guide
  • Loading branch information
patriknw committed Jul 3, 2019
1 parent 75e52ba commit 318bdd9
Show file tree
Hide file tree
Showing 14 changed files with 486 additions and 131 deletions.
Expand Up @@ -206,15 +206,15 @@ By default, these remoting features are disabled when not using Akka Cluster:
When used with Cluster, all previous behavior is the same except a remote watch of an actor is no longer possible before a node joins a cluster, only after.

To optionally enable them without Cluster, if you understand
the @ref[consequences](../remoting-artery.md#quarantine), set
the @ref[consequences](../remoting-artery.md#quarantine), set
```
akka.remote.use-unsafe-remote-features-without-cluster = on`.
```

When used without Cluster

* An initial warning is logged on startup of `RemoteActorRefProvider`
* A warning will be logged on remote watch attempts, which you can suppress by setting
* A warning will be logged on remote watch attempts, which you can suppress by setting
```
akka.remote.warn-unsafe-watch-without-cluster = off
```
Expand Down Expand Up @@ -305,7 +305,7 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off

`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].

### Akka now uses Fork Join Pool from JDK

Expand Down Expand Up @@ -387,6 +387,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
* Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout]
* Changed method signature for `EventAdapter.fromJournal` and support for `manifest` in `EventAdapter`.
* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.
Expand Down
10 changes: 5 additions & 5 deletions akka-docs/src/main/paradox/typed/persistence.md
Expand Up @@ -144,7 +144,7 @@ interpreted correctly on replay. Cluster Sharding ensures that there is only one

## Accessing the ActorContext

If the persistent behavior needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
If the `EventSourcedBehavior` needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
wrapping construction with `Behaviors.setup`:

Scala
Expand Down Expand Up @@ -357,20 +357,20 @@ to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:

Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #event-wrapper }

Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }

Then install it on a persistent behavior:
Then install it on a `EventSourcedBehavior`:

Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #install-event-adapter }

Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }

## Wrapping Persistent Behaviors
## Wrapping EventSourcedBehavior

When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
Expand Down
Expand Up @@ -4,26 +4,110 @@

package akka.persistence.typed

import scala.annotation.varargs
import scala.collection.immutable

import akka.annotation.InternalApi

/**
* Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
* Typical use cases include (but are not limited to):
* <ul>
* <li>extracting events from "envelopes"</li>
* <li>manually converting to the Journals storage format, such as JSON, BSON or any specialised binary format</li>
* <li>adapting incoming events from a "data model" to the "domain model"</li>
* <li>adding metadata that is understood by the journal</li>
* <li>migration by splitting up events into sequences of other events</li>
* <li>migration filtering out unused events, or replacing an event with another</li>
* </ul>
*/
abstract class EventAdapter[E, P] {

/**
* Type of the event to persist
* Convert domain event to journal event type.
*
* Some journal may require a specific type to be returned to them,
* for example if a primary key has to be associated with each event then a journal
* may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`.
*
* The `toJournal` adaptation must be an 1-to-1 transformation.
* It is not allowed to drop incoming events during the `toJournal` adaptation.
*
* @param e the application-side domain event to be adapted to the journal model
* @return the adapted event object, possibly the same object if no adaptation was performed
*/
type Per = P
def toJournal(e: E): P

/**
* Transform event on the way to the journal
* Return the manifest (type hint) that will be provided in the `fromJournal` method.
* Use `""` if manifest is not needed.
*/
def toJournal(e: E): Per
def manifest(event: E): String

/**
* Transform the event on recovery from the journal.
* Note that this is not called in any read side so will need to be applied
* manually when using Query.
*
* Convert a event from its journal model to the applications domain model.
*
* One event may be adapter into multiple (or none) events which should be delivered to the `EventSourcedBehavior`.
* Use the specialised [[EventSeq.single]] method to emit exactly one event,
* or [[EventSeq.empty]] in case the adapter is not handling this event.
*
* @param p event to be adapted before delivering to the `EventSourcedBehavior`
* @param manifest optionally provided manifest (type hint) in case the Adapter has stored one
* for this event, `""` if none
* @return sequence containing the adapted events (possibly zero) which will be delivered to
* the `EventSourcedBehavior`
*/
def fromJournal(p: Per): E
def fromJournal(p: P, manifest: String): EventSeq[E]
}

sealed trait EventSeq[+A] {
def events: immutable.Seq[A]
def isEmpty: Boolean = events.isEmpty
def nonEmpty: Boolean = events.nonEmpty
def size: Int
}
object EventSeq {

final def empty[A]: EventSeq[A] = EmptyEventSeq.asInstanceOf[EventSeq[A]]

final def single[A](event: A): EventSeq[A] = SingleEventSeq(event)

/** Java API */
@varargs final def create[A](events: A*): EventSeq[A] = EventsSeq(events.toList)

/** Java API */
final def create[A](events: java.util.List[A]): EventSeq[A] = {
import akka.util.ccompat.JavaConverters._
EventsSeq(events.asScala.toList)
}

/** Scala API */
final def apply[A](events: A*): EventSeq[A] = EventsSeq(events.toList)

/** Scala API */
final def apply[A](events: immutable.Seq[A]): EventSeq[A] = EventsSeq(events)

}

/** INTERNAL API */
@InternalApi private[akka] final case class SingleEventSeq[A](event: A) extends EventSeq[A] {
override def events: immutable.Seq[A] = List(event)
override def size: Int = 1
}

/** INTERNAL API */
@InternalApi private[akka] case object EmptyEventSeq extends EventSeq[Nothing] {
override def events: immutable.Seq[Nothing] = Nil
override def size: Int = 0
}

/** INTERNAL API */
@InternalApi private[akka] final case class EventsSeq[A](override val events: immutable.Seq[A]) extends EventSeq[A] {
override def size: Int = events.size
}

/**
Expand All @@ -39,5 +123,6 @@ abstract class EventAdapter[E, P] {
*/
@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] {
override def toJournal(e: E): Any = e
override def fromJournal(p: Any): E = p.asInstanceOf[E]
override def fromJournal(p: Any, manifest: String): EventSeq[E] = EventSeq.single(p.asInstanceOf[E])
override def manifest(event: E): String = ""
}
Expand Up @@ -44,7 +44,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
private val signalHandler: PartialFunction[(S, Signal), Unit],
val tagger: E => Set[String],
val eventAdapter: EventAdapter[E, _],
val eventAdapter: EventAdapter[E, Any],
val snapshotWhen: (S, E, Long) => Boolean,
val recovery: Recovery,
val retention: RetentionCriteria,
Expand Down
Expand Up @@ -26,17 +26,20 @@ private[akka] trait JournalInteractions[C, E, S] {

type EventOrTagged = Any // `Any` since can be `E` or `Tagged`

protected def internalPersist(state: Running.RunningState[S], event: EventOrTagged): Running.RunningState[S] = {
protected def internalPersist(
state: Running.RunningState[S],
event: EventOrTagged,
eventAdapterManifest: String): Running.RunningState[S] = {

val newState = state.nextSequenceNr()

val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped)
sender = ActorRef.noSender)

val write = AtomicWrite(repr) :: Nil
setup.journal
Expand All @@ -46,19 +49,21 @@ private[akka] trait JournalInteractions[C, E, S] {
}

protected def internalPersistAll(
events: immutable.Seq[EventOrTagged],
state: Running.RunningState[S]): Running.RunningState[S] = {
state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
if (events.nonEmpty) {
var newState = state

val writes = events.map { event =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
val writes = events.map {
case (event, eventAdapterManifest) =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
}
val write = AtomicWrite(writes)

Expand Down
Expand Up @@ -6,6 +6,7 @@ package akka.persistence.typed.internal

import scala.util.control.NonFatal
import scala.concurrent.duration._

import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException
Expand All @@ -14,10 +15,14 @@ import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.EmptyEventSeq
import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.OptionVal
import akka.util.unused
import akka.util.PrettyDuration._

Expand Down Expand Up @@ -103,19 +108,31 @@ private[akka] final class ReplayingEvents[C, E, S](
try {
response match {
case ReplayedMessage(repr) =>
val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])

var eventForErrorReporting: OptionVal[Any] = OptionVal.None
try {
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest)

def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
}

eventSeq match {
case SingleEventSeq(event) => handleEvent(event)
case EventsSeq(events) => events.foreach(handleEvent)
case EmptyEventSeq => // no events
}

this
} catch {
case NonFatal(ex) =>
state = state.copy(repr.sequenceNr)
onRecoveryFailure(ex, Some(event))
onRecoveryFailure(ex, Option(eventForErrorReporting.getOrElse(null)))
}

case RecoverySuccess(highestSeqNr) =>
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
onRecoveryCompleted(state)
Expand Down
Expand Up @@ -148,8 +148,9 @@ private[akka] object Running {
val newState = state.applyEvent(setup, event)

val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)

val newState2 = internalPersist(newState, eventToPersist)
val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest)

val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)

Expand All @@ -169,9 +170,9 @@ private[akka] object Running {
(currentState.applyEvent(setup, event), shouldSnapshot)
}

val eventsToPersist = events.map(adaptEvent)
val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))

val newState2 = internalPersistAll(eventsToPersist, newState)
val newState2 = internalPersistAll(newState, eventsToPersist)

persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)

Expand Down
Expand Up @@ -149,6 +149,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/
def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet()

/**
* Transform the event in another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`.
*/
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]

/**
Expand Down
Expand Up @@ -10,6 +10,7 @@
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.function.Procedure;
import akka.persistence.typed.EventSeq;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
Expand All @@ -27,14 +28,14 @@ public abstract static class Simple {

// #event-wrapper
public static class Wrapper<T> {
private final T t;
private final T event;

public Wrapper(T t) {
this.t = t;
public Wrapper(T event) {
this.event = event;
}

public T getT() {
return t;
public T getEvent() {
return event;
}
}

Expand All @@ -46,8 +47,14 @@ public Wrapper<SimpleEvent> toJournal(SimpleEvent simpleEvent) {
}

@Override
public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) {
return simpleEventWrapper.getT();
public String manifest(SimpleEvent event) {
return "";
}

@Override
public EventSeq<SimpleEvent> fromJournal(
Wrapper<SimpleEvent> simpleEventWrapper, String manifest) {
return EventSeq.single(simpleEventWrapper.getEvent());
}
}
// #event-wrapper
Expand Down

0 comments on commit 318bdd9

Please sign in to comment.