Skip to content

Commit

Permalink
a few more persistence tests from PersistentActorSpec, #24687
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Mar 15, 2019
1 parent ef3a19b commit 0c65635
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 29 deletions.
Expand Up @@ -8,4 +8,10 @@ package akka.persistence.typed
* Unique identifier in the backend data store (journal and snapshot store) of the
* persistent actor.
*/
final case class PersistenceId(id: String)
final case class PersistenceId(id: String) {
if (id eq null)
throw new IllegalArgumentException("persistenceId must not be null")

if (id.trim.isEmpty)
throw new IllegalArgumentException("persistenceId must not be empty")
}
Expand Up @@ -7,11 +7,8 @@ package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.Done

import akka.actor.typed
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
Expand Down Expand Up @@ -68,6 +65,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](

import EventSourcedBehaviorImpl.WriterIdentity

if (persistenceId eq null)
throw new IllegalArgumentException("persistenceId must not be null")

override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
val ctx = context.asScala
ctx.setLoggerClass(loggerClass)
Expand Down
Expand Up @@ -10,11 +10,13 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.Done
import akka.testkit.EventFilter
import akka.actor.testkit.typed.{ TestException, TestKitSettings }
import akka.actor.ActorInitializationException
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -40,12 +42,11 @@ import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.EventFilter
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike

import scala.util.Failure

object EventSourcedBehaviorSpec {

//#event-wrapper
Expand Down Expand Up @@ -275,9 +276,6 @@ object EventSourcedBehaviorSpec {
class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike {

import EventSourcedBehaviorSpec._

private implicit val testSettings: TestKitSettings = TestKitSettings(system)

import akka.actor.typed.scaladsl.adapter._

implicit val materializer = ActorMaterializer()(system.toUntyped)
Expand Down Expand Up @@ -704,9 +702,35 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh

}

"fail fast if persistenceId is null" in {
intercept[IllegalArgumentException] {
PersistenceId(null)
}
val probe = TestProbe[AnyRef]
EventFilter[ActorInitializationException](start = "persistenceId must not be null", occurrences = 1).intercept {
val ref = spawn(Behaviors.setup[Command](counter(_, persistenceId = PersistenceId(null))))
probe.expectTerminated(ref)
}
EventFilter[ActorInitializationException](start = "persistenceId must not be null", occurrences = 1).intercept {
val ref = spawn(Behaviors.setup[Command](counter(_, persistenceId = null)))
probe.expectTerminated(ref)
}
}

"fail fast if persistenceId is empty" in {
intercept[IllegalArgumentException] {
PersistenceId("")
}
val probe = TestProbe[AnyRef]
EventFilter[ActorInitializationException](start = "persistenceId must not be empty", occurrences = 1).intercept {
val ref = spawn(Behaviors.setup[Command](counter(_, persistenceId = PersistenceId(""))))
probe.expectTerminated(ref)
}
}

def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { (ctx) =>
val w = Behaviors.setup[Any] { ctx =>
ctx.watch(toWatch)
Behaviors
.receive[Any] { (_, _) =>
Expand Down
Expand Up @@ -4,12 +4,13 @@

package akka.persistence.typed.scaladsl

import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike

Expand All @@ -22,7 +23,8 @@ object PrimitiveStateSpec {

class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.conf) with WordSpecLike {

implicit val testSettings = TestKitSettings(system)
import akka.actor.typed.scaladsl.adapter._
system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))

def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] =
EventSourcedBehavior[Int, Int, Int](
Expand All @@ -43,7 +45,7 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co
}

"A typed persistent actor with primitive state" must {
"persist events and update state" in {
"persist primitive events and update state" in {
val probe = TestProbe[String]()
val b = primitiveState(PersistenceId("a"), probe.ref)
val ref1 = spawn(b)
Expand All @@ -56,7 +58,7 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co
ref1 ! -1
probe.expectTerminated(ref1)

val ref2 = testKit.spawn(b)
val ref2 = spawn(b)
// eventHandler from replay
probe.expectMessage("eventHandler:0:1")
probe.expectMessage("eventHandler:1:2")
Expand Down
Expand Up @@ -518,16 +518,16 @@ object PersistentActorSpec {
extends AsyncPersistHandlerCorrelationCheck(name)
with InmemRuntimePluginConfig

class AnyValEventPersistentActor(name: String) extends ExamplePersistentActor(name) {
class PrimitiveEventPersistentActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd("a") => persist(5)(evt => sender() ! evt)
}
}
class AnyValEventPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config)
extends AnyValEventPersistentActor(name)
class PrimitiveEventPersistentActorWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config)
extends PrimitiveEventPersistentActor(name)
with LevelDbRuntimePluginConfig
class AnyValEventPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config)
extends AnyValEventPersistentActor(name)
class PrimitiveEventPersistentActorWithInmemRuntimePluginConfig(name: String, val providedConfig: Config)
extends PrimitiveEventPersistentActor(name)
with InmemRuntimePluginConfig

class HandleRecoveryFinishedEventPersistentActor(name: String, probe: ActorRef)
Expand Down Expand Up @@ -1173,7 +1173,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi

protected def replyInEventHandlerPersistentActor: ActorRef = namedPersistentActor[ReplyInEventHandlerPersistentActor]

protected def anyValEventPersistentActor: ActorRef = namedPersistentActor[AnyValEventPersistentActor]
protected def primitiveEventPersistentActor: ActorRef = namedPersistentActor[PrimitiveEventPersistentActor]

protected def asyncPersistPersistentActor: ActorRef = namedPersistentActor[AsyncPersistPersistentActor]

Expand Down Expand Up @@ -1394,8 +1394,8 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
persistentActor ! Cmd("a")
expectMsg("a")
}
"be able to persist events that extend AnyVal" in {
val persistentActor = anyValEventPersistentActor
"be able to persist primitive events" in {
val persistentActor = primitiveEventPersistentActor
persistentActor ! Cmd("a")
expectMsg(5)
}
Expand Down Expand Up @@ -1896,8 +1896,8 @@ class LeveldbPersistentActorWithRuntimePluginConfigSpec
namedPersistentActorWithProvidedConfig[ReplyInEventHandlerPersistentActorWithLevelDbRuntimePluginConfig](
providedActorConfig)

override protected def anyValEventPersistentActor: ActorRef =
namedPersistentActorWithProvidedConfig[AnyValEventPersistentActorWithLevelDbRuntimePluginConfig](
override protected def primitiveEventPersistentActor: ActorRef =
namedPersistentActorWithProvidedConfig[PrimitiveEventPersistentActorWithLevelDbRuntimePluginConfig](
providedActorConfig)

override protected def asyncPersistPersistentActor: ActorRef =
Expand Down Expand Up @@ -2096,8 +2096,9 @@ class InmemPersistentActorWithRuntimePluginConfigSpec
namedPersistentActorWithProvidedConfig[ReplyInEventHandlerPersistentActorWithInmemRuntimePluginConfig](
providedActorConfig)

override protected def anyValEventPersistentActor: ActorRef =
namedPersistentActorWithProvidedConfig[AnyValEventPersistentActorWithInmemRuntimePluginConfig](providedActorConfig)
override protected def primitiveEventPersistentActor: ActorRef =
namedPersistentActorWithProvidedConfig[PrimitiveEventPersistentActorWithInmemRuntimePluginConfig](
providedActorConfig)

override protected def asyncPersistPersistentActor: ActorRef =
namedPersistentActorWithProvidedConfig[AsyncPersistPersistentActorWithInmemRuntimePluginConfig](providedActorConfig)
Expand Down

0 comments on commit 0c65635

Please sign in to comment.