Skip to content

Commit

Permalink
Delete effect in durable state, #30446 (#31529)
Browse files Browse the repository at this point in the history
* Add reset effect to the state dsl.
* The effect calls deleteObject in the durable state store.
* The effect updates state to the empty state.
* Implement DeletedDurableState for persistence query.
* Update PersistenceTestKitDurableStateStore so that deleteObject sets the Record payload to None, ie delete the payload.
* update documentation for delete effect
* increment the revision by one when deleting state
* Overload deleteObject with revision and deprecate deleteObject.
* add bin-comp exclude

(cherry picked from commit 34a621a)
  • Loading branch information
jausmann authored and patriknw committed Sep 2, 2022
1 parent 1414566 commit 66afe3f
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 44 deletions.
Expand Up @@ -5,10 +5,9 @@
package docs.akka.cluster.sharding.typed

import scala.annotation.nowarn

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.{ DeletedDurableState, Offset }
import akka.stream.scaladsl.Source

@nowarn
Expand All @@ -24,7 +23,8 @@ object DurableStateStoreQueryUsageCompileOnlySpec {
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
case _: DeletedDurableState[_] => None
}
//#get-durable-state-store-query-example
}
Expand Down
Expand Up @@ -46,4 +46,3 @@ Java
: @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example }

The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`.
`DeletedDurableState` is not implemented yet.
Expand Up @@ -152,6 +152,7 @@ and can be one of:
* `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing
persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already
existing record. Otherwise `persist` will fail.
* `delete` will delete the state by setting it to the empty state and the revision number will be incremented by 1.
* `none` no state to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
Expand Down
Expand Up @@ -9,8 +9,7 @@ import akka.annotation.DoNotInherit
/**
* The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`.
*
* The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446
* The implementation can be a [[UpdatedDurableState]] or a [[DeletedDurableState]].
*
* Not for user extension
*
Expand Down Expand Up @@ -53,3 +52,25 @@ final class UpdatedDurableState[A](
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]

object DeletedDurableState {

def unapply[A](arg: DeletedDurableState[A]): Option[(String, Long, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.offset, arg.timestamp))
}

/**
*
* @param persistenceId The persistence id of the origin entity.
* @param revision The revision number from the origin entity.
* @param offset The offset that can be used in next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
* @tparam A the type of the value
*/
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]
Expand Up @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand All @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand Down
Expand Up @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand All @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand Down
Expand Up @@ -34,8 +34,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
Expand All @@ -56,8 +56,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
Expand Down
Expand Up @@ -35,8 +35,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
Expand All @@ -57,8 +57,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
Expand Down
Expand Up @@ -5,11 +5,9 @@
package akka.persistence.testkit.state.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import java.util.concurrent.{ CompletableFuture, CompletionStage }
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

import akka.japi.Pair
import akka.{ Done, NotUsed }
import akka.persistence.query.DurableStateChange
Expand Down Expand Up @@ -37,8 +35,10 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A])
def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] =
stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava

def deleteObject(persistenceId: String): CompletionStage[Done] =
stateStore.deleteObject(persistenceId).toJava
def deleteObject(persistenceId: String): CompletionStage[Done] = CompletableFuture.completedFuture(Done)

def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
stateStore.deleteObject(persistenceId, revision).toJava

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = {
stateStore.changes(tag, offset).asJava
Expand Down
Expand Up @@ -11,12 +11,15 @@ import scala.concurrent.Future
import akka.{ Done, NotUsed }
import akka.actor.ExtendedActorSystem
import akka.persistence.Persistence
import akka.persistence.query.DurableStateChange
import akka.persistence.query.{
DeletedDurableState,
DurableStateChange,
NoOffset,
Offset,
Sequence,
UpdatedDurableState
}
import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery }
import akka.persistence.query.UpdatedDurableState
import akka.persistence.query.Offset
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult }
import akka.persistence.typed.PersistenceId
Expand All @@ -27,8 +30,6 @@ import akka.stream.typed.scaladsl.ActorSource
import akka.stream.OverflowStrategy
import scala.collection.immutable

import akka.persistence.testkit.internal.CurrentTime

object PersistenceTestKitDurableStateStore {
val Identifier = "akka.persistence.testkit.state"
}
Expand All @@ -54,22 +55,29 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)

override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized {
Future.successful(store.get(persistenceId) match {
case Some(record) => GetObjectResult(Some(record.value), record.revision)
case None => GetObjectResult(None, 0)
case Some(Record(_, _, revision, Some(value), _, _)) => GetObjectResult(Some(value), revision)
case Some(Record(_, _, revision, None, _, _)) => GetObjectResult(None, revision)
case None => GetObjectResult(None, 0)
})
}

override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
this.synchronized {
val globalOffset = lastGlobalOffset.incrementAndGet()
val record = Record(globalOffset, persistenceId, revision, value, tag)
val record = Record(globalOffset, persistenceId, revision, Some(value), tag)
store = store + (persistenceId -> record)
publisher ! record
Future.successful(Done)
}

override def deleteObject(persistenceId: String): Future[Done] = this.synchronized {
store = store - persistenceId
override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done)

override def deleteObject(persistenceId: String, revision: Long): Future[Done] = this.synchronized {
store = store.get(persistenceId) match {
case Some(record) => store + (persistenceId -> record.copy(value = None, revision = revision))
case None => store
}

Future.successful(Done)
}

Expand Down Expand Up @@ -191,9 +199,15 @@ private final case class Record[A](
globalOffset: Long,
persistenceId: String,
revision: Long,
value: A,
value: Option[A],
tag: String,
timestamp: Long = CurrentTime.now()) {
def toDurableStateChange: DurableStateChange[A] =
new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp)
timestamp: Long = System.currentTimeMillis) {
def toDurableStateChange: DurableStateChange[A] = {
value match {
case Some(v) =>
new UpdatedDurableState(persistenceId, revision, v, Sequence(globalOffset), timestamp)
case None =>
new DeletedDurableState(persistenceId, revision, Sequence(globalOffset), timestamp)
}
}
}
Expand Up @@ -31,6 +31,7 @@ object DurableStateBehaviorReplySpec {
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
case object Increment extends Command[Nothing]
case class IncrementBy(by: Int) extends Command[Nothing]

Expand Down Expand Up @@ -61,6 +62,9 @@ object DurableStateBehaviorReplySpec {
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)

case DeleteWithConfirmation(replyTo) =>
Effect.delete[State]().thenReply(replyTo)(_ => Done)

case _ => ???

})
Expand Down Expand Up @@ -108,5 +112,20 @@ class DurableStateBehaviorReplySpec
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1))
}

"delete state thenReply" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]()
c ! IncrementWithConfirmation(updateProbe.ref)
updateProbe.expectMessage(Done)

val deleteProbe = TestProbe[Done]()
c ! DeleteWithConfirmation(deleteProbe.ref)
deleteProbe.expectMessage(Done)

val queryProbe = TestProbe[State]()
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(0))
}
}
}
Expand Up @@ -176,6 +176,8 @@ private[akka] final case class DurableStateBehaviorImpl[Command, State](
final case class GetFailure(cause: Throwable) extends InternalProtocol
case object UpsertSuccess extends InternalProtocol
final case class UpsertFailure(cause: Throwable) extends InternalProtocol
case object DeleteSuccess extends InternalProtocol
final case class DeleteFailure(cause: Throwable) extends InternalProtocol
case object RecoveryTimeout extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol

Expand Down
Expand Up @@ -56,10 +56,30 @@ private[akka] trait DurableStateStoreInteractions[C, S] {
newRunningState
}

protected def internalDelete(
ctx: ActorContext[InternalProtocol],
cmd: Any,
state: Running.RunningState[S]): Running.RunningState[S] = {

val newRunningState = state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id

onDeleteInitiated(ctx, cmd)

ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId, newRunningState.revision)) {
case Success(_) => InternalProtocol.DeleteSuccess
case Failure(cause) => InternalProtocol.DeleteFailure(cause)
}

newRunningState
}

// FIXME These hook methods are for Telemetry. What more parameters are needed? persistenceId?
@InternalStableApi
private[akka] def onWriteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()

private[akka] def onDeleteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()

protected def requestRecoveryPermit(): Unit = {
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfClassic)
}
Expand Down
Expand Up @@ -71,6 +71,10 @@ private[akka] final case class Persist[State](newState: State) extends EffectImp
override def toString: String = s"Persist(${newState.getClass.getName})"
}

/** INTERNAL API */
@InternalApi
private[akka] case class Delete[State]() extends EffectImpl[State]

/** INTERNAL API */
@InternalApi
private[akka] case object Unhandled extends EffectImpl[Nothing]
Expand Down
Expand Up @@ -88,6 +88,8 @@ private[akka] class Recovering[C, S](
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case UpsertSuccess => Behaviors.unhandled
case _: UpsertFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case _: DeleteFailure => Behaviors.unhandled
}
}

Expand Down
Expand Up @@ -141,6 +141,10 @@ private[akka] object Running {
case _: PersistNothing.type =>
(applySideEffects(sideEffects, state), true)

case _: Delete[_] =>
val nextState = internalDelete(setup.context, msg, state)
(applySideEffects(sideEffects, nextState), true)

case _: Unhandled.type =>
import akka.actor.typed.scaladsl.adapter._
setup.context.system.toClassic.eventStream
Expand Down Expand Up @@ -194,6 +198,8 @@ private[akka] object Running {
case RecoveryPermitGranted => Behaviors.unhandled
case _: GetSuccess[_] => Behaviors.unhandled
case _: GetFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case DeleteFailure(_) => Behaviors.unhandled
}
}

Expand Down

0 comments on commit 66afe3f

Please sign in to comment.