Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inhibit readHighest while writes are ongoing #29 #32

Merged
merged 1 commit into from
Feb 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Try, Success, Failure }
import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import scala.concurrent.Promise

class DynamoDBJournalFailure(message: String) extends RuntimeException(message) with NoStackTrace
class DynamoDBJournalRejection(message: String, cause: Throwable = null) extends RuntimeException(message, cause) with NoStackTrace
Expand Down Expand Up @@ -52,6 +53,7 @@ case class ListAllResult(persistenceId: String, lowest: Set[Long], highest: Set[

/**
* Purge all information stored for the given `persistenceId` from the journal.
* Purging the information for a running actor results in undefined behavior.
*
* A confirmation of type [[Purged]] will be sent to the given `replyTo` reference.
*/
Expand Down Expand Up @@ -86,8 +88,28 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
}
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
logFailure("write")(Future.sequence(messages.map(writeMessages)))
private case class OpFinished(pid: String)
private val opQueue: JMap[String, Future[Done]] = new JHMap

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
val p = Promise[Done]
val pid = messages.head.persistenceId
log.debug("writeMesssages for {}", pid)
opQueue.put(pid, p.future)
val f = logFailure("write")(Future.sequence(messages.map(writeMessages)))
f.onComplete { _ =>
log.debug("writeMessages for {} finished", pid)
self ! OpFinished(pid)
p.success(Done)
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
f
}

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
opQueue.get(persistenceId) match {
case null => logFailure("read-highest")(readSequenceNr(persistenceId, highest = true))
case f => f.flatMap(_ => logFailure("read-highest")(readSequenceNr(persistenceId, highest = true)))
}

/**
* Delete messages up to a given sequence number. The range to which this applies
Expand All @@ -103,7 +125,7 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
* TODO in principle replays should be inhibited while the purge is ongoing
*/
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = logFailure("delete") {
log.debug("at=delete-messages-to persistenceId={} to={} perm={}", persistenceId, toSequenceNr)
log.debug("delete-messages-to persistenceId={} to={} perm={}", persistenceId, toSequenceNr)
val lowF = readSequenceNr(persistenceId, highest = false)
val highF = readSequenceNr(persistenceId, highest = true)
for {
Expand Down Expand Up @@ -133,6 +155,7 @@ class DynamoDBJournal(config: Config) extends AsyncWriteJournal with DynamoDBRec
} yield Done

override def receivePluginInternal = {
case OpFinished(persistenceId) => opQueue.remove(persistenceId)
case ListAll(persistenceId, replyTo) => listAll(persistenceId) pipeTo replyTo
case Purge(persistenceId, replyTo) => purge(persistenceId).map(_ => Purged(persistenceId)) pipeTo replyTo
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
.takeWhile(_.nonEmpty)
.runFold(Vector.empty[Long])(_ ++ _)

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
logFailure("read-highest")(readSequenceNr(persistenceId, highest = true))

def readSequenceNr(persistenceId: String, highest: Boolean): Future[Long] = {
log.debug("readSequenceNr(highest={}) persistenceId={}", highest, persistenceId)
val keyGroups = readSequenceNrBatches(persistenceId, highest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ trait DynamoDBRequests {
// Dynamo can partially fail a number of write items in a batch write, usually because
// of throughput constraints. We need to continue to retry the unprocessed item
// until we exhaust our backoff
dynamo.batchWriteItem(write).flatMap(r => sendUnprocessedItems(r)).map {
_ =>
if (log.isDebugEnabled)
log.debug("at=batch-write-finish writes={}", write.getRequestItems.get(JournalTable).size())
()
}
dynamo.batchWriteItem(write).flatMap(r => sendUnprocessedItems(r))
}

// Squash all of the futures into a single result
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.dynamodb.journal

import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess

class RecoveryConsistencySpec extends TestKit(ActorSystem("FailureReportingSpec"))
with ImplicitSender
with WordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures
with ConversionCheckedTripleEquals
with DynamoDBUtils {

override def beforeAll(): Unit = ensureJournalTableExists()
override def afterAll(): Unit = system.terminate().futureValue

override val persistenceId = "RecoveryConsistencySpec"
val journal = Persistence(system).journalFor("")

"DynamoDB Journal (Recovery)" must {

val N = 50
val M = 20
val writes = (1 to M).map(i => AtomicWrite(persistentRepr(f"a-$i%04d")))
val probe = TestProbe()

for (i <- 1 to N) s"not return intermediate values for the highest sequence number ($i of $N)" in {
journal ! Purge(persistenceId, testActor)
expectMsg(Purged(persistenceId))
journal ! WriteMessages(writes, testActor, 1)
journal ! ReplayMessages(1, 0, Long.MaxValue, persistenceId, probe.ref)
expectMsg(WriteMessagesSuccessful)
(1 to M) foreach (_ => expectMsgType[WriteMessageSuccess])
probe.expectMsg(RecoverySuccess(M))
}

}
}