Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventSeq in Typed EventAdapter, #26909 #27130

Merged
merged 3 commits into from
Jul 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions akka-actor/src/main/scala/akka/util/OptionVal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ private[akka] final class OptionVal[+A](val x: A) extends AnyVal {
def getOrElse[B >: A](default: B): B =
if (x == null) default else x

/**
* Convert to `scala.Option`
*/
def toOption: Option[A] =
Option(x)

def contains[B >: A](it: B): Boolean =
x != null && x == it

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ By default, these remoting features are disabled when not using Akka Cluster:
When used with Cluster, all previous behavior is the same except a remote watch of an actor is no longer possible before a node joins a cluster, only after.

To optionally enable them without Cluster, if you understand
the @ref[consequences](../remoting-artery.md#quarantine), set
the @ref[consequences](../remoting-artery.md#quarantine), set
```
akka.remote.use-unsafe-remote-features-without-cluster = on`.
```

When used without Cluster

* An initial warning is logged on startup of `RemoteActorRefProvider`
* A warning will be logged on remote watch attempts, which you can suppress by setting
* A warning will be logged on remote watch attempts, which you can suppress by setting
```
akka.remote.warn-unsafe-watch-without-cluster = off
```
Expand Down Expand Up @@ -305,7 +305,7 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off

`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].

### Akka now uses Fork Join Pool from JDK

Expand Down Expand Up @@ -387,6 +387,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
* Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout]
* Changed method signature for `EventAdapter.fromJournal` and support for `manifest` in `EventAdapter`.
* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.
Expand Down
10 changes: 5 additions & 5 deletions akka-docs/src/main/paradox/typed/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ interpreted correctly on replay. Cluster Sharding ensures that there is only one

## Accessing the ActorContext

If the persistent behavior needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
If the `EventSourcedBehavior` needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
wrapping construction with `Behaviors.setup`:

Scala
Expand Down Expand Up @@ -357,20 +357,20 @@ to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:

Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #event-wrapper }

Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }

Then install it on a persistent behavior:
Then install it on a `EventSourcedBehavior`:

Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #install-event-adapter }

Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }

## Wrapping Persistent Behaviors
## Wrapping EventSourcedBehavior

When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,107 @@

package akka.persistence.typed

import scala.annotation.varargs
import scala.collection.immutable

import akka.annotation.InternalApi

/**
* Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
* Typical use cases include (but are not limited to):
* <ul>
* <li>extracting events from "envelopes"</li>
* <li>adapting events from a "domain model" to the "data model", e.g. converting to the Journals storage format,
* such as JSON, BSON or any specialised binary format</li>
* <li>adapting events from a "data model" to the "domain model"</li>
* <li>adding metadata that is understood by the journal</li>
* <li>migration by splitting up events into sequences of other events</li>
* <li>migration filtering out unused events, or replacing an event with another</li>
* </ul>
*/
abstract class EventAdapter[E, P] {

/**
* Type of the event to persist
* Convert domain event to journal event type.
*
* Some journal may require a specific type to be returned to them,
* for example if a primary key has to be associated with each event then a journal
* may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`.
*
* The `toJournal` adaptation must be an 1-to-1 transformation.
* It is not allowed to drop incoming events during the `toJournal` adaptation.
*
* @param e the application-side domain event to be adapted to the journal model
* @return the adapted event object, possibly the same object if no adaptation was performed
*/
type Per = P
def toJournal(e: E): P

/**
* Transform event on the way to the journal
* Return the manifest (type hint) that will be provided in the `fromJournal` method.
* Use `""` if manifest is not needed.
*/
def toJournal(e: E): Per
def manifest(event: E): String

/**
* Transform the event on recovery from the journal.
* Note that this is not called in any read side so will need to be applied
* manually when using Query.
*
* Convert a event from its journal model to the applications domain model.
*
* One event may be adapter into multiple (or none) events which should be delivered to the `EventSourcedBehavior`.
* Use the specialised [[EventSeq.single]] method to emit exactly one event,
* or [[EventSeq.empty]] in case the adapter is not handling this event.
*
* @param p event to be adapted before delivering to the `EventSourcedBehavior`
* @param manifest optionally provided manifest (type hint) in case the Adapter has stored one
* for this event, `""` if none
* @return sequence containing the adapted events (possibly zero) which will be delivered to
* the `EventSourcedBehavior`
*/
def fromJournal(p: Per): E
def fromJournal(p: P, manifest: String): EventSeq[E]
}

sealed trait EventSeq[+A] {
def events: immutable.Seq[A]
def isEmpty: Boolean = events.isEmpty
def nonEmpty: Boolean = events.nonEmpty
def size: Int
}
object EventSeq {

final def empty[A]: EventSeq[A] = EmptyEventSeq.asInstanceOf[EventSeq[A]]

final def single[A](event: A): EventSeq[A] = SingleEventSeq(event)

@varargs final def many[A](events: A*): EventSeq[A] = EventsSeq(events.toList)

/** Java API */
final def create[A](events: java.util.List[A]): EventSeq[A] = {
import akka.util.ccompat.JavaConverters._
EventsSeq(events.asScala.toList)
}

/** Scala API */
final def apply[A](events: immutable.Seq[A]): EventSeq[A] = EventsSeq(events)

}

/** INTERNAL API */
@InternalApi private[akka] final case class SingleEventSeq[A](event: A) extends EventSeq[A] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried to make this AnyVal but not possible since extends EventSeq

override def events: immutable.Seq[A] = List(event)
override def size: Int = 1
}

/** INTERNAL API */
@InternalApi private[akka] case object EmptyEventSeq extends EventSeq[Nothing] {
override def events: immutable.Seq[Nothing] = Nil
override def size: Int = 0
}

/** INTERNAL API */
@InternalApi private[akka] final case class EventsSeq[A](override val events: immutable.Seq[A]) extends EventSeq[A] {
override def size: Int = events.size
}

/**
Expand All @@ -39,5 +120,6 @@ abstract class EventAdapter[E, P] {
*/
@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] {
override def toJournal(e: E): Any = e
override def fromJournal(p: Any): E = p.asInstanceOf[E]
override def fromJournal(p: Any, manifest: String): EventSeq[E] = EventSeq.single(p.asInstanceOf[E])
override def manifest(event: E): String = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
private val signalHandler: PartialFunction[(S, Signal), Unit],
val tagger: E => Set[String],
val eventAdapter: EventAdapter[E, _],
val eventAdapter: EventAdapter[E, Any],
val snapshotWhen: (S, E, Long) => Boolean,
val recovery: Recovery,
val retention: RetentionCriteria,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ private[akka] trait JournalInteractions[C, E, S] {

type EventOrTagged = Any // `Any` since can be `E` or `Tagged`

protected def internalPersist(state: Running.RunningState[S], event: EventOrTagged): Running.RunningState[S] = {
protected def internalPersist(
state: Running.RunningState[S],
event: EventOrTagged,
eventAdapterManifest: String): Running.RunningState[S] = {

val newState = state.nextSequenceNr()

val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped)
sender = ActorRef.noSender)

val write = AtomicWrite(repr) :: Nil
setup.journal
Expand All @@ -46,19 +49,21 @@ private[akka] trait JournalInteractions[C, E, S] {
}

protected def internalPersistAll(
events: immutable.Seq[EventOrTagged],
state: Running.RunningState[S]): Running.RunningState[S] = {
state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
if (events.nonEmpty) {
var newState = state

val writes = events.map { event =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
val writes = events.map {
case (event, eventAdapterManifest) =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
}
val write = AtomicWrite(writes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.persistence.typed.internal

import scala.util.control.NonFatal
import scala.concurrent.duration._

import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException
Expand All @@ -14,10 +15,14 @@ import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.EmptyEventSeq
import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.OptionVal
import akka.util.unused
import akka.util.PrettyDuration._

Expand Down Expand Up @@ -103,19 +108,31 @@ private[akka] final class ReplayingEvents[C, E, S](
try {
response match {
case ReplayedMessage(repr) =>
val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])

var eventForErrorReporting: OptionVal[Any] = OptionVal.None
try {
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest)

def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
}

eventSeq match {
case SingleEventSeq(event) => handleEvent(event)
case EventsSeq(events) => events.foreach(handleEvent)
case EmptyEventSeq => // no events
}

this
} catch {
case NonFatal(ex) =>
state = state.copy(repr.sequenceNr)
onRecoveryFailure(ex, Some(event))
onRecoveryFailure(ex, eventForErrorReporting.toOption)
}

case RecoverySuccess(highestSeqNr) =>
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
onRecoveryCompleted(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ private[akka] object Running {
val newState = state.applyEvent(setup, event)

val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)

val newState2 = internalPersist(newState, eventToPersist)
val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest)

val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)

Expand All @@ -169,9 +170,9 @@ private[akka] object Running {
(currentState.applyEvent(setup, event), shouldSnapshot)
}

val eventsToPersist = events.map(adaptEvent)
val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))

val newState2 = internalPersistAll(eventsToPersist, newState)
val newState2 = internalPersistAll(newState, eventsToPersist)

persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/
def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet()

/**
* Transform the event in another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`.
*/
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]

/**
Expand Down