Skip to content

Commit

Permalink
Merge pull request #32 from akka/wip-inhibit-read-highest
Browse files Browse the repository at this point in the history
inhibit readHighest while writes are ongoing #29
  • Loading branch information
rkuhn committed Feb 6, 2016
2 parents 9790022 + f5c04cf commit 5287549
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Try, Success, Failure }
import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import scala.concurrent.Promise

class DynamoDBJournalFailure(message: String) extends RuntimeException(message) with NoStackTrace
class DynamoDBJournalRejection(message: String, cause: Throwable = null) extends RuntimeException(message, cause) with NoStackTrace
Expand Down Expand Up @@ -52,6 +53,7 @@ case class ListAllResult(persistenceId: String, lowest: Set[Long], highest: Set[

/**
* Purge all information stored for the given `persistenceId` from the journal.
* Purging the information for a running actor results in undefined behavior.
*
* A confirmation of type [[Purged]] will be sent to the given `replyTo` reference.
*/
Expand Down Expand Up @@ -86,8 +88,28 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
}
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
logFailure("write")(Future.sequence(messages.map(writeMessages)))
private case class OpFinished(pid: String)
private val opQueue: JMap[String, Future[Done]] = new JHMap

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
val p = Promise[Done]
val pid = messages.head.persistenceId
log.debug("writeMesssages for {}", pid)
opQueue.put(pid, p.future)
val f = logFailure("write")(Future.sequence(messages.map(writeMessages)))
f.onComplete { _ =>
log.debug("writeMessages for {} finished", pid)
self ! OpFinished(pid)
p.success(Done)
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
f
}

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
opQueue.get(persistenceId) match {
case null => logFailure("read-highest")(readSequenceNr(persistenceId, highest = true))
case f => f.flatMap(_ => logFailure("read-highest")(readSequenceNr(persistenceId, highest = true)))
}

/**
* Delete messages up to a given sequence number. The range to which this applies
Expand All @@ -103,7 +125,7 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
* TODO in principle replays should be inhibited while the purge is ongoing
*/
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = logFailure("delete") {
log.debug("at=delete-messages-to persistenceId={} to={} perm={}", persistenceId, toSequenceNr)
log.debug("delete-messages-to persistenceId={} to={} perm={}", persistenceId, toSequenceNr)
val lowF = readSequenceNr(persistenceId, highest = false)
val highF = readSequenceNr(persistenceId, highest = true)
for {
Expand Down Expand Up @@ -133,6 +155,7 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
} yield Done

override def receivePluginInternal = {
case OpFinished(persistenceId) => opQueue.remove(persistenceId)
case ListAll(persistenceId, replyTo) => listAll(persistenceId) pipeTo replyTo
case Purge(persistenceId, replyTo) => purge(persistenceId).map(_ => Purged(persistenceId)) pipeTo replyTo
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
.takeWhile(_.nonEmpty)
.runFold(Vector.empty[Long])(_ ++ _)

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
logFailure("read-highest")(readSequenceNr(persistenceId, highest = true))

def readSequenceNr(persistenceId: String, highest: Boolean): Future[Long] = {
log.debug("readSequenceNr(highest={}) persistenceId={}", highest, persistenceId)
val keyGroups = readSequenceNrBatches(persistenceId, highest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ trait DynamoDBRequests {
// Dynamo can partially fail a number of write items in a batch write, usually because
// of throughput constraints. We need to continue to retry the unprocessed item
// until we exhaust our backoff
dynamo.batchWriteItem(write).flatMap(r => sendUnprocessedItems(r)).map {
_ =>
if (log.isDebugEnabled)
log.debug("at=batch-write-finish writes={}", write.getRequestItems.get(JournalTable).size())
()
}
dynamo.batchWriteItem(write).flatMap(r => sendUnprocessedItems(r))
}

// Squash all of the futures into a single result
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.dynamodb.journal

import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess

class RecoveryConsistencySpec extends TestKit(ActorSystem("FailureReportingSpec"))
with ImplicitSender
with WordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures
with ConversionCheckedTripleEquals
with DynamoDBUtils {

override def beforeAll(): Unit = ensureJournalTableExists()
override def afterAll(): Unit = system.terminate().futureValue

override val persistenceId = "RecoveryConsistencySpec"
val journal = Persistence(system).journalFor("")

"DynamoDB Journal (Recovery)" must {

val N = 50
val M = 20
val writes = (1 to M).map(i => AtomicWrite(persistentRepr(f"a-$i%04d")))
val probe = TestProbe()

for (i <- 1 to N) s"not return intermediate values for the highest sequence number ($i of $N)" in {
journal ! Purge(persistenceId, testActor)
expectMsg(Purged(persistenceId))
journal ! WriteMessages(writes, testActor, 1)
journal ! ReplayMessages(1, 0, Long.MaxValue, persistenceId, probe.ref)
expectMsg(WriteMessagesSuccessful)
(1 to M) foreach (_ => expectMsgType[WriteMessageSuccess])
probe.expectMsg(RecoverySuccess(M))
}

}
}

0 comments on commit 5287549

Please sign in to comment.