Skip to content

Commit

Permalink
Refactored processor test suites
Browse files Browse the repository at this point in the history
Refactored how processors recover from the journal and snapshots.

Now all tests are executed under Travis CI.
  • Loading branch information
nloyola committed Apr 18, 2018
1 parent 0dc5ff3 commit 8cdd454
Show file tree
Hide file tree
Showing 21 changed files with 421 additions and 377 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ matrix:
script:
- cp conf/logback-test.xml.template conf/logback-test.xml
- sbt ++$TRAVIS_SCALA_VERSION test:compile
- sbt ++$TRAVIS_SCALA_VERSION clean coverage "testOnly -- -l PersistenceTest"
- sbt ++$TRAVIS_SCALA_VERSION clean coverage "test"
after_success:
- sbt ++$TRAVIS_SCALA_VERSION coverageReport coveralls

Expand Down
20 changes: 12 additions & 8 deletions app/org/biobank/services/access/AccessProcessor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.biobank.services.access

import akka.actor._
import akka.event.{Logging, LoggingAdapter}
import akka.persistence.{RecoveryCompleted, SnapshotOffer, SaveSnapshotSuccess, SaveSnapshotFailure}
//import com.github.ghik.silencer.silent
import java.time.OffsetDateTime
Expand Down Expand Up @@ -39,10 +38,11 @@ class AccessProcessor @Inject() (val accessItemRepository: AccessItemRepository,

type ApplyRoleEvent = (Role, OffsetDateTime) => ServiceValidation[Boolean]

override val log: LoggingAdapter = Logging(context.system, this)

override def persistenceId: String = "access-processor-id"

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.Throw"))
val receiveRecover: Receive = {

Expand Down Expand Up @@ -108,13 +108,17 @@ class AccessProcessor @Inject() (val accessItemRepository: AccessItemRepository,

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.info(s"SaveSnapshotSuccess: $metadata")
log.debug(s"SaveSnapshotSuccess: $metadata")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.info(s"SaveSnapshotFailure: $metadata, reason: $reason")
reason.printStackTrace
log.debug(s"SaveSnapshotFailure: $metadata, reason: $reason")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case cmd => log.error(s"accessProcessor: message not handled: $cmd")
}
Expand All @@ -126,12 +130,12 @@ class AccessProcessor @Inject() (val accessItemRepository: AccessItemRepository,
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
snapshot => {
log.info(s"snapshot contains ${snapshot.accessItems.size} accessItems")
log.debug(s"snapshot contains ${snapshot.accessItems.size} accessItems")
snapshot.accessItems.foreach(accessItemRepository.put)
}
)
Expand Down
17 changes: 12 additions & 5 deletions app/org/biobank/services/access/MembershipProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class MembershipProcessor @Inject() (val membershipRepository: MembershipReposit

override def persistenceId: String = "membership-processor-id"

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.Throw"))
val receiveRecover: Receive = {
case event: MembershipEvent =>
Expand Down Expand Up @@ -107,13 +110,17 @@ class MembershipProcessor @Inject() (val membershipRepository: MembershipReposit

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.info(s"SaveSnapshotSuccess: $metadata")
log.debug(s"SaveSnapshotSuccess: $metadata")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.info(s"SaveSnapshotFailure: $metadata, reason: $reason")
reason.printStackTrace
log.debug(s"SaveSnapshotFailure: $metadata, reason: $reason")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None
}

private def mySaveSnapshot(): Unit = {
Expand All @@ -123,12 +130,12 @@ class MembershipProcessor @Inject() (val membershipRepository: MembershipReposit
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
snapshot => {
log.info(s"snapshot contains ${snapshot.memberships.size} memberships")
log.debug(s"snapshot contains ${snapshot.memberships.size} memberships")
snapshot.memberships.foreach(membershipRepository.put)
}
)
Expand Down
17 changes: 11 additions & 6 deletions app/org/biobank/services/centres/CentresProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.persistence.{RecoveryCompleted, SaveSnapshotSuccess, SaveSnapshotFai
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import javax.inject.{Inject}
import org.biobank.TestData
import org.biobank.domain.{LocationId, Slug}
import org.biobank.domain.centres._
import org.biobank.domain.studies.{StudyId, StudyRepository}
Expand All @@ -29,15 +28,17 @@ object CentresProcessor {

class CentresProcessor @Inject() (val centreRepository: CentreRepository,
val studyRepository: StudyRepository,
val snapshotWriter: SnapshotWriter,
val testData: TestData)
val snapshotWriter: SnapshotWriter)
extends Processor {
import CentresProcessor._
import org.biobank.CommonValidations._
import CentreEvent.EventType

override def persistenceId: String = "centre-processor-id"

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

val ErrMsgNameExists: String = "centre with name already exists"

@SuppressWarnings(Array("org.wartremover.warts.Any"))
Expand Down Expand Up @@ -100,13 +101,17 @@ class CentresProcessor @Inject() (val centreRepository: CentreRepository,

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.debug(s"snapshot saved successfully: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.error(s"snapshot save error: ${metadata}")
reason.printStackTrace
log.debug(s"snapshot save error: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case "persistence_restart" =>
throw new Exception("Intentionally throwing exception to test persistence by restarting the actor")
Expand All @@ -122,7 +127,7 @@ class CentresProcessor @Inject() (val centreRepository: CentreRepository,
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
Expand Down
25 changes: 14 additions & 11 deletions app/org/biobank/services/centres/ShipmentsProcessor.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package org.biobank.services.centres

import akka.actor._
import akka.event.{Logging, LoggingAdapter}
import akka.persistence.{RecoveryCompleted, SnapshotOffer, SaveSnapshotSuccess, SaveSnapshotFailure}
import com.github.ghik.silencer.silent
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import javax.inject.Inject
import org.biobank.TestData
import org.biobank.domain.LocationId
import org.biobank.domain.centres._
import org.biobank.domain.participants.{SpecimenId, SpecimenRepository}
Expand Down Expand Up @@ -40,18 +38,19 @@ class ShipmentsProcessor @Inject() (val shipmentRepository: ShipmentRepo
val shipmentSpecimenRepository: ShipmentSpecimenRepository,
val centreRepository: CentreRepository,
val specimenRepository: SpecimenRepository,
val testData: TestData,
val snapshotWriter: SnapshotWriter)
extends Processor
with ShipmentValidations
with ShipmentConstraints {
import ShipmentsProcessor._
import org.biobank.CommonValidations._

override val log: LoggingAdapter = Logging(context.system, this)

override def persistenceId: String = "shipments-processor-id"


@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

@SuppressWarnings(Array("org.wartremover.warts.Any"))
val receiveRecover: Receive = {
case event: ShipmentEvent =>
Expand Down Expand Up @@ -170,13 +169,17 @@ class ShipmentsProcessor @Inject() (val shipmentRepository: ShipmentRepo

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.info(s"SaveSnapshotSuccess: $metadata")
log.debug(s"SaveSnapshotSuccess: $metadata")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.info(s"SaveSnapshotFailure: $metadata, reason: $reason")
reason.printStackTrace
log.debug(s"SaveSnapshotFailure: $metadata, reason: $reason")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case cmd => log.error(s"shipmentsProcessor: message not handled: $cmd")
}
Expand All @@ -189,13 +192,13 @@ class ShipmentsProcessor @Inject() (val shipmentRepository: ShipmentRepo
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
snapshot => {
log.info(s"snapshot contains ${snapshot.shipments.size} shipments")
log.info(s"snapshot contains ${snapshot.shipmentSpecimens.size} shipment specimens")
log.debug(s"snapshot contains ${snapshot.shipments.size} shipments")
log.debug(s"snapshot contains ${snapshot.shipmentSpecimens.size} shipment specimens")
snapshot.shipments.foreach(shipmentRepository.put)
snapshot.shipmentSpecimens.foreach(shipmentSpecimenRepository.put)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class CollectionEventsProcessor @Inject() (
import CollectionEventEvent.EventType
import org.biobank.infrastructure.events.EventUtils._

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

override def persistenceId: String = "collection-events-processor-id"

/**
Expand Down Expand Up @@ -99,13 +102,17 @@ class CollectionEventsProcessor @Inject() (

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.debug(s"snapshot saved successfully: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.error(s"snapshot save error: ${metadata}")
reason.printStackTrace
log.debug(s"snapshot save error: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case "persistence_restart" =>
throw new Exception("Intentionally throwing exception to test persistence by restarting the actor")
Expand All @@ -123,7 +130,7 @@ class CollectionEventsProcessor @Inject() (
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
Expand Down
14 changes: 11 additions & 3 deletions app/org/biobank/services/participants/ParticipantsProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class ParticipantsProcessor @Inject() (val participantRepository: ParticipantRep

override def persistenceId: String = "participant-processor-id"

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None


/**
* These are the events that are recovered during journal recovery. They cannot fail and must be
* processed to recreate the current state of the aggregate.
Expand Down Expand Up @@ -88,13 +92,17 @@ class ParticipantsProcessor @Inject() (val participantRepository: ParticipantRep

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.debug(s"snapshot saved successfully: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.error(s"snapshot save error: ${metadata}")
reason.printStackTrace
log.debug(s"snapshot save error: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case "persistence_restart" =>
throw new Exception("Intentionally throwing exception to test persistence by restarting the actor")
Expand All @@ -110,7 +118,7 @@ class ParticipantsProcessor @Inject() (val participantRepository: ParticipantRep
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
Expand Down
15 changes: 11 additions & 4 deletions app/org/biobank/services/participants/SpecimensProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class SpecimensProcessor @Inject() (

override def persistenceId: String = "specimens-processor-id"

@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var replyTo: Option[ActorRef] = None

/**
* These are the events that are recovered during journal recovery. They cannot fail and must be
* processed to recreate the current state of the aggregate.
Expand Down Expand Up @@ -112,13 +115,17 @@ class SpecimensProcessor @Inject() (

case "snap" =>
mySaveSnapshot
replyTo = Some(sender())

case SaveSnapshotSuccess(metadata) =>
log.debug(s"snapshot saved successfully: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Success(s"snapshot saved: $metadata"))
replyTo = None

case SaveSnapshotFailure(metadata, reason) =>
log.error(s"snapshot save error: ${metadata}")
reason.printStackTrace
log.debug(s"snapshot save error: ${metadata}")
replyTo.foreach(_ ! akka.actor.Status.Failure(reason))
replyTo = None

case "persistence_restart" =>
throw new Exception(
Expand All @@ -131,12 +138,12 @@ class SpecimensProcessor @Inject() (
private def mySaveSnapshot(): Unit = {
val snapshotState = SnapshotState(specimenRepository.getValues.toSet)
val filename = snapshotWriter.save(persistenceId, Json.toJson(snapshotState).toString)
log.info(s"saved snapshot to: $filename")
log.debug(s"saved snapshot to: $filename")
saveSnapshot(filename)
}

private def applySnapshot(filename: String): Unit = {
log.info(s"snapshot recovery file: $filename")
log.debug(s"snapshot recovery file: $filename")
val fileContents = snapshotWriter.load(filename);
Json.parse(fileContents).validate[SnapshotState].fold(
errors => log.error(s"could not apply snapshot: $filename: $errors"),
Expand Down
Loading

0 comments on commit 8cdd454

Please sign in to comment.