Skip to content

Commit

Permalink
Fix stashing of commands when snapshot is in progress (#27389)
Browse files Browse the repository at this point in the history
* Fix stashing of commands when snapshot is in progress

Fixes #27381

* Naming
  • Loading branch information
chbatey authored and raboof committed Jul 25, 2019
1 parent af1c8cc commit 15ba6af
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 1 deletion.
Expand Up @@ -331,7 +331,7 @@ private[akka] object Running {
setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else {
stashUser(cmd)
stashInternal(cmd)
Behaviors.same
}
}
Expand Down
@@ -0,0 +1,104 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed

import java.util.concurrent.CyclicBarrier

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.persistence
import akka.persistence.SelectedSnapshot
import akka.persistence.snapshot.SnapshotStore
import com.typesafe.config.ConfigFactory
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.typed.StashingWhenSnapshottingSpec.ControllableSnapshotStoreExt
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import org.scalatest.WordSpecLike

import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Success

object StashingWhenSnapshottingSpec {
object ControllableSnapshotStoreExt extends ExtensionId[ControllableSnapshotStoreExt] {

override def createExtension(system: ActorSystem[_]): ControllableSnapshotStoreExt =
new ControllableSnapshotStoreExt()
}

class ControllableSnapshotStoreExt extends Extension {
val completeSnapshotWrite = Promise[Unit]()
val snapshotWriteStarted = new CyclicBarrier(2)
}

class ControllableSnapshotStore extends SnapshotStore {
override def loadAsync(
persistenceId: String,
criteria: persistence.SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = Future.successful(None)

override def saveAsync(metadata: persistence.SnapshotMetadata, snapshot: Any): Future[Unit] = {
ControllableSnapshotStoreExt(context.system.toTyped).snapshotWriteStarted.await()
ControllableSnapshotStoreExt(context.system.toTyped).completeSnapshotWrite.future
}
override def deleteAsync(metadata: persistence.SnapshotMetadata): Future[Unit] = Future.successful(())
override def deleteAsync(persistenceId: String, criteria: persistence.SnapshotSelectionCriteria): Future[Unit] =
Future.successful(())
}
val config = ConfigFactory.parseString(s"""
slow-snapshot {
class = "akka.persistence.typed.StashingWhenSnapshottingSpec$$ControllableSnapshotStore"
}
akka.actor.allow-java-serialization = on
akka {
loglevel = "INFO"

persistence {
journal {
plugin = "akka.persistence.journal.inmem"
auto-start-journals = []
}

snapshot-store {
plugin = "slow-snapshot"
auto-start-journals = []
}
}
}
""")

def persistentTestBehavior(pid: PersistenceId, eventProbe: TestProbe[String]) =
EventSourcedBehavior[String, String, List[String]](
pid,
Nil,
(_, command) => Effect.persist(command),
(state, event) => {
eventProbe.ref.tell(event)
event :: state
}).snapshotWhen((_, event, _) => event.contains("snap"))
}

class StashingWhenSnapshottingSpec
extends ScalaTestWithActorTestKit(StashingWhenSnapshottingSpec.config)
with WordSpecLike {
"A persistent actor" should {
"stash messages and automatically replay when snapshot is in progress" in {
val eventProbe = TestProbe[String]()
val persistentActor = spawn(StashingWhenSnapshottingSpec.persistentTestBehavior(PersistenceId("1"), eventProbe))
persistentActor ! "one"
eventProbe.expectMessage("one")
persistentActor ! "snap"
eventProbe.expectMessage("snap")
ControllableSnapshotStoreExt(system).snapshotWriteStarted.await()
persistentActor ! "two"
eventProbe.expectNoMessage() // snapshot in progress
ControllableSnapshotStoreExt(system).completeSnapshotWrite.complete(Success(()))
eventProbe.expectMessage("two")
}
}
}

0 comments on commit 15ba6af

Please sign in to comment.