Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky EventSourcedBehaviorRetentionSpec, #31623 #31703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -116,7 +116,7 @@ class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
Future.fromTry(Try(storage.tryDelete(metadata)))

override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
Future.successful(Try(storage.tryDelete(persistenceId, criteria)))
Future.fromTry(Try(storage.tryDelete(persistenceId, criteria)))

}

Expand Down
Expand Up @@ -51,6 +51,7 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
persistenceId: PersistenceId,
probe: Option[ActorRef[(State, Event)]] = None,
snapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None,
deleteSnapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None,
eventSignalProbe: Option[ActorRef[Try[EventSourcedSignal]]] = None)
: EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
Expand Down Expand Up @@ -84,9 +85,9 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
case (_, sf: SnapshotFailed) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(sf))
case (_, dc: DeleteSnapshotsCompleted) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(dc))
deleteSnapshotSignalProbe.foreach(_ ! WrappedSignal(dc))
case (_, dsf: DeleteSnapshotsFailed) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(dsf))
deleteSnapshotSignalProbe.foreach(_ ! WrappedSignal(dsf))
case (_, e: EventSourcedSignal) =>
eventSignalProbe.foreach(_ ! Success(e))
}
Expand Down Expand Up @@ -258,43 +259,55 @@ class EventSourcedBehaviorRetentionSpec
"delete snapshots automatically, based on criteria" in {
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val replyProbe = TestProbe[State]()

val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))))
Behaviors.setup[Command](
ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))))

(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectSnapshotCompleted(9)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)

(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotSignalProbe.expectSnapshotCompleted(12)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
snapshotSignalProbe.expectSnapshotCompleted(18)
snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)

snapshotSignalProbe.expectNoMessage()
}

"optionally delete both old events and snapshots" in {
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()

val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.withRetention(
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref),
eventSignalProbe = Some(eventProbe.ref)).withRetention(
// tests the Java API as well
RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot)))

Expand All @@ -310,31 +323,31 @@ class EventSourcedBehaviorRetentionSpec
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)

// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)

persistentActor ! Increment // 13
persistentActor ! Increment // 14
persistentActor ! Increment // 11
persistentActor ! Increment // 15
snapshotSignalProbe.expectSnapshotCompleted(15)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)

persistentActor ! Increment // 16
persistentActor ! Increment // 17
persistentActor ! Increment // 18
snapshotSignalProbe.expectSnapshotCompleted(18)

eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)

eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
Expand All @@ -343,13 +356,20 @@ class EventSourcedBehaviorRetentionSpec
"be possible to combine snapshotWhen and retention criteria" in {
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()

val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref),
eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))

(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
Expand All @@ -360,7 +380,7 @@ class EventSourcedBehaviorRetentionSpec
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectSnapshotCompleted(10)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)

(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(13)
Expand All @@ -374,7 +394,7 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
Expand All @@ -384,14 +404,21 @@ class EventSourcedBehaviorRetentionSpec
"be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in {
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()

val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref),
eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))

(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
Expand All @@ -417,18 +444,18 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria
// triggers delete up to snapshot no 2
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot

persistentActor ! Increment // 9
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4

persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6

persistentActor ! Increment // 13
Expand All @@ -442,13 +469,13 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 14
snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8
snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)

persistentActor ! Increment // 15
persistentActor ! Increment // 16
snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)

eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
Expand All @@ -460,12 +487,18 @@ class EventSourcedBehaviorRetentionSpec
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val replyProbe = TestProbe[State]()

val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3))))
Behaviors.setup[Command](
ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3))))

(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
Expand All @@ -474,36 +507,42 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)

snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)

snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)

snapshotSignalProbe.expectSnapshotCompleted(7)
snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)

snapshotSignalProbe.expectSnapshotCompleted(8)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)

snapshotSignalProbe.expectSnapshotCompleted(9)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)

snapshotSignalProbe.expectSnapshotCompleted(10)
snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
}

"be possible to snapshot every event withDeleteEventsOnSnapshot" in {
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()

val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.withRetention(
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counter(
ctx,
pid,
snapshotSignalProbe = Some(snapshotSignalProbe.ref),
deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref),
eventSignalProbe = Some(eventProbe.ref)).withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))

// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
Expand All @@ -518,32 +557,32 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 5
snapshotSignalProbe.expectSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)

persistentActor ! Increment // 6
snapshotSignalProbe.expectSnapshotCompleted(6)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)

persistentActor ! Increment // 7
snapshotSignalProbe.expectSnapshotCompleted(7)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)

persistentActor ! Increment // 8
snapshotSignalProbe.expectSnapshotCompleted(8)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5
snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)

persistentActor ! Increment // 9
snapshotSignalProbe.expectSnapshotCompleted(9)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)

persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
}

"snapshot on recovery if expected snapshot is missing" in {
Expand Down