diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 7da4adfb31b..4f7955e2725 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -6,6 +6,8 @@ import static akka.pattern.Patterns.ask; +import com.typesafe.config.Config; + import akka.actor.*; import akka.dispatch.Mapper; import akka.event.EventStreamSpec; @@ -20,6 +22,7 @@ import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; + import docs.persistence.query.MyEventsByTagPublisher; import docs.persistence.query.PersistenceQueryDocSpec; import org.reactivestreams.Subscriber; @@ -31,7 +34,6 @@ import scala.concurrent.duration.FiniteDuration; import scala.runtime.Boxed; import scala.runtime.BoxedUnit; - import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -47,7 +49,7 @@ public class PersistenceQueryDocTest { class MyReadJournal implements ReadJournal { private final ExtendedActorSystem system; - public MyReadJournal(ExtendedActorSystem system) { + public MyReadJournal(ExtendedActorSystem system, Config config) { this.system = system; } @@ -97,7 +99,7 @@ void demonstrateBasicUsage() { .getReadJournalFor("akka.persistence.query.noop-read-journal"); // issue query to journal - Source source = + Source source = readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); // materialize stream, consuming events @@ -239,6 +241,7 @@ void demonstrateWritingIntoDifferentStore() { // Using an example (Reactive Streams) Database driver readJournal .query(EventsByPersistenceId.create("user-1337")) + .map(envelope -> envelope.event()) .grouped(20) // batch inserts into groups of 20 .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database //#projection-into-different-store-rs diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index 4d3963a04f7..cf41f39279d 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -15,18 +15,18 @@ import akka.testkit.AkkaSpec import akka.util.Timeout import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal } import org.reactivestreams.Subscriber - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ +import com.typesafe.config.Config object PersistenceQueryDocSpec { implicit val timeout = Timeout(3.seconds) //#my-read-journal - class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { + class MyReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal { private val defaulRefreshInterval = 3.seconds @@ -79,6 +79,7 @@ object PersistenceQueryDocSpec { // Using an example (Reactive Streams) Database driver readJournal .query(EventsByPersistenceId("user-1337")) + .map(envelope => envelope.event) .map(convertToReadSideTypes) // convert to datatype .grouped(20) // batch inserts into groups of 20 .runWith(Sink(dbBatchWriter)) // write batches to read-side database @@ -98,7 +99,7 @@ object PersistenceQueryDocSpec { } def updateState(state: ComplexState, msg: Any): ComplexState = { - // some complicated aggregation logic here ... + // some complicated aggregation logic here ... state } } @@ -124,7 +125,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal") // issue query to journal - val source: Source[Any, Unit] = + val source: Source[EventEnvelope, Unit] = readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue)) // materialize stream, consuming events diff --git a/akka-persistence-query/build.sbt b/akka-persistence-query/build.sbt index 1b57eef61d2..b961d4be5f6 100644 --- a/akka-persistence-query/build.sbt +++ b/akka-persistence-query/build.sbt @@ -14,3 +14,5 @@ Dependencies.persistenceQuery //MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value enablePlugins(ScaladocNoVerificationOfDiagrams) + +fork in Test := true diff --git a/akka-persistence-query/src/main/resources/reference.conf b/akka-persistence-query/src/main/resources/reference.conf new file mode 100644 index 00000000000..5a1d70a6790 --- /dev/null +++ b/akka-persistence-query/src/main/resources/reference.conf @@ -0,0 +1,30 @@ +####################################################### +# Akka Persistence Query Reference Configuration File # +####################################################### + +# This is the reference config file that contains all the default settings. +# Make your edits in your application.conf in order to override these settings. + +akka.persistence.query { + journal { + leveldb { + class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal" + + # Absolute path to the write journal plugin configuration entry that this query journal + # will connect to. That must be a LeveldbJournal or SharedLeveldbJournal. + # If undefined (or "") it will connect to the default journal as specified by the + # akka.persistence.journal.plugin property. + write-plugin = "" + + # Look for more data with this interval. The query journal is also notified by + # the write journal when something is changed and thereby updated quickly, but + # when there are a lot of changes it falls back to periodic queries to avoid + # overloading the system with many small queries. + refresh-interval = 3s + + # How many events to fetch in one query and keep buffered until they + # are delivered downstreams. + max-buffer-size = 100 + } + } +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 9eb150e62c2..7fafe87fd55 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -4,12 +4,11 @@ package akka.persistence.query import java.util.concurrent.atomic.AtomicReference - import akka.actor._ import akka.event.Logging - import scala.annotation.tailrec import scala.util.Failure +import com.typesafe.config.Config /** * Persistence extension for queries. @@ -75,13 +74,15 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { // TODO remove duplication val scalaPlugin = if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass)) - system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)) .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) .recoverWith { case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) } else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass)) - system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)) .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil)) .map(jj ⇒ new scaladsl.ReadJournalAdapter(jj)) .recoverWith { diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala index f63b263e6ea..30206e673db 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -37,7 +37,7 @@ abstract class AllPersistenceIds extends Query[String, Unit] * A plugin may optionally support this [[Query]]. */ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) - extends Query[Any, Unit] + extends Query[EventEnvelope, Unit] object EventsByPersistenceId { /** Java API */ def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/DeliveryBuffer.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/DeliveryBuffer.scala new file mode 100644 index 00000000000..47f0c798976 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/DeliveryBuffer.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import akka.stream.actor.ActorPublisher + +/** + * INTERNAL API + */ +private[akka] trait DeliveryBuffer[T] { _: ActorPublisher[T] ⇒ + + var buf = Vector.empty[T] + + def deliverBuf(): Unit = + if (buf.nonEmpty && totalDemand > 0) { + if (buf.size == 1) { + // optimize for this common case + onNext(buf.head) + buf = Vector.empty + } else if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + buf foreach onNext + buf = Vector.empty + } + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala new file mode 100644 index 00000000000..94a186c10b6 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.persistence.JournalProtocol._ +import akka.persistence.Persistence +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.Cancel +import akka.stream.actor.ActorPublisherMessage.Request +import akka.persistence.journal.leveldb.LeveldbJournal +import akka.persistence.query.EventEnvelope + +/** + * INTERNAL API + */ +private[akka] object EventsByPersistenceIdPublisher { + def props(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, refreshInterval: Option[FiniteDuration], + maxBufSize: Int, writeJournalPluginId: String): Props = + Props(new EventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, refreshInterval, + maxBufSize, writeJournalPluginId)) + + private case object Continue +} + +class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, + refreshInterval: Option[FiniteDuration], + maxBufSize: Int, writeJournalPluginId: String) + extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging { + import EventsByPersistenceIdPublisher._ + + val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId) + + var currSeqNo = fromSequenceNr + + val tickTask = refreshInterval.map { interval ⇒ + import context.dispatcher + context.system.scheduler.schedule(interval, interval, self, Continue) + } + + def nonLiveQuery: Boolean = refreshInterval.isEmpty + + override def postStop(): Unit = { + tickTask.foreach(_.cancel()) + } + + def receive = init + + def init: Receive = { + case _: Request ⇒ + journal ! LeveldbJournal.SubscribePersistenceId(persistenceId) + replay() + case Continue ⇒ // skip, wait for first Request + case Cancel ⇒ context.stop(self) + } + + def idle: Receive = { + case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒ + if (timeForReplay) + replay() + + case _: Request ⇒ + deliverBuf() + if (nonLiveQuery) { + if (buf.isEmpty) + onCompleteThenStop() + else + self ! Continue + } + + case Cancel ⇒ + context.stop(self) + + } + + def timeForReplay: Boolean = + buf.isEmpty || buf.size <= maxBufSize / 2 + + def replay(): Unit = { + val limit = maxBufSize - buf.size + log.debug("request replay for persistenceId [{}] from [{}] to [{}] limit [{}]", persistenceId, currSeqNo, toSequenceNr, limit) + journal ! ReplayMessages(currSeqNo, toSequenceNr, limit, persistenceId, self) + context.become(replaying(limit)) + } + + def replaying(limit: Int): Receive = { + var replayCount = 0 + + { + case ReplayedMessage(p) ⇒ + buf :+= EventEnvelope( + offset = p.sequenceNr, + persistenceId = persistenceId, + sequenceNr = p.sequenceNr, + event = p.payload) + currSeqNo = p.sequenceNr + 1 + replayCount += 1 + deliverBuf() + + case _: RecoverySuccess ⇒ + log.debug("replay completed for persistenceId [{}], currSeqNo [{}], replayCount [{}]", persistenceId, currSeqNo, replayCount) + deliverBuf() + if (buf.isEmpty && currSeqNo > toSequenceNr) + onCompleteThenStop() + else if (nonLiveQuery) { + if (buf.isEmpty && replayCount < limit) + onCompleteThenStop() + else + self ! Continue // more to fetch + } + context.become(idle) + + case ReplayMessagesFailure(cause) ⇒ + log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage) + deliverBuf() + onErrorThenStop(cause) + + case _: Request ⇒ + deliverBuf() + + case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒ // skip during replay + + case Cancel ⇒ + context.stop(self) + } + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala new file mode 100644 index 00000000000..aee9535418f --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournal.scala @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ExtendedActorSystem +import akka.persistence.query.EventsByPersistenceId +import akka.persistence.query.Hint +import akka.persistence.query.Query +import akka.persistence.query.scaladsl +import akka.serialization.SerializationExtension +import akka.stream.scaladsl.Source +import scala.concurrent.duration.FiniteDuration +import akka.persistence.query.NoRefresh +import akka.persistence.query.RefreshInterval +import com.typesafe.config.Config +import akka.persistence.query.EventEnvelope + +object LeveldbReadJournal { + final val Identifier = "akka.persistence.query.journal.leveldb" +} + +class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal { + + private val serialization = SerializationExtension(system) + private val defaulRefreshInterval: Option[FiniteDuration] = + Some(config.getDuration("refresh-interval", MILLISECONDS).millis) + private val writeJournalPluginId: String = config.getString("write-plugin") + private val maxBufSize: Int = config.getInt("max-buffer-size") + + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { + case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints) + case unknown ⇒ unsupportedQueryType(unknown) + } + + def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = { + Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr, + refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ()) + } + + private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] = + if (hints.contains(NoRefresh)) + None + else + hints.collectFirst { case RefreshInterval(interval) ⇒ interval }.orElse(defaulRefreshInterval) + + private def unsupportedQueryType[M, T](unknown: Query[T, M]): Nothing = + throw new IllegalArgumentException(s"${getClass.getSimpleName} does not implement the ${unknown.getClass.getName} query type!") +} + diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/Cleanup.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/Cleanup.scala new file mode 100644 index 00000000000..c19ec4702ae --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/Cleanup.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import akka.testkit.AkkaSpec +import java.io.File +import org.apache.commons.io.FileUtils + +trait Cleanup { this: AkkaSpec ⇒ + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + storageLocations.foreach(FileUtils.deleteDirectory) + } + + override protected def afterTermination() { + storageLocations.foreach(FileUtils.deleteDirectory) + } +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala new file mode 100644 index 00000000000..5a57844b60e --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import scala.concurrent.duration._ +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.persistence.query.EventsByPersistenceId +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.RefreshInterval +import akka.stream.ActorMaterializer +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.persistence.query.NoRefresh +import akka.testkit.AkkaSpec + +object EventsByPersistenceIdSpec { + val config = """ + akka.loglevel = INFO + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec" + """ +} + +class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.config) + with Cleanup with ImplicitSender { + import EventsByPersistenceIdSpec._ + + implicit val mat = ActorMaterializer()(system) + + val refreshInterval = RefreshInterval(1.second) + + val queries = PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier) + + def setup(persistenceId: String): ActorRef = { + val ref = system.actorOf(TestActor.props(persistenceId)) + ref ! s"$persistenceId-1" + ref ! s"$persistenceId-2" + ref ! s"$persistenceId-3" + expectMsg(s"$persistenceId-1-done") + expectMsg(s"$persistenceId-2-done") + expectMsg(s"$persistenceId-3-done") + ref + } + + "Leveldb query EventsByPersistenceId" must { + "find existing events" in { + val ref = setup("a") + + val src = queries.query(EventsByPersistenceId("a", 0L, Long.MaxValue), NoRefresh) + src.map(_.event).runWith(TestSink.probe[Any]) + .request(2) + .expectNext("a-1", "a-2") + .expectNoMsg(500.millis) + .request(2) + .expectNext("a-3") + .expectComplete() + } + + "find existing events up to a sequence number" in { + val ref = setup("b") + val src = queries.query(EventsByPersistenceId("b", 0L, 2L), NoRefresh) + src.map(_.event).runWith(TestSink.probe[Any]) + .request(5) + .expectNext("b-1", "b-2") + .expectComplete() + } + } + + "Leveldb live query EventsByPersistenceId" must { + "find new events" in { + val ref = setup("c") + val src = queries.query(EventsByPersistenceId("c", 0L, Long.MaxValue), refreshInterval) + val probe = src.map(_.event).runWith(TestSink.probe[Any]) + .request(5) + .expectNext("c-1", "c-2", "c-3") + + ref ! "c-4" + expectMsg("c-4-done") + + probe.expectNext("c-4") + } + + "find new events up to a sequence number" in { + val ref = setup("d") + val src = queries.query(EventsByPersistenceId("d", 0L, 4L), refreshInterval) + val probe = src.map(_.event).runWith(TestSink.probe[Any]) + .request(5) + .expectNext("d-1", "d-2", "d-3") + + ref ! "d-4" + expectMsg("d-4-done") + + probe.expectNext("d-4").expectComplete() + } + } + +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala new file mode 100644 index 00000000000..f27e2f553cb --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.query.journal.leveldb + +import akka.persistence.PersistentActor +import akka.actor.Props + +object TestActor { + def props(persistenceId: String): Props = + Props(new TestActor(persistenceId)) +} + +class TestActor(override val persistenceId: String) extends PersistentActor { + + val receiveRecover: Receive = { + case evt: String ⇒ + } + + val receiveCommand: Receive = { + case cmd: String ⇒ + persist(cmd) { evt ⇒ + sender() ! evt + "-done" + } + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 9134c33210b..c0e82322789 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -28,7 +28,7 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash private var isInitialized = false private var isInitTimedOut = false - private var store: Option[ActorRef] = None + protected var store: Option[ActorRef] = None private val storeNotInitialized = Future.failed(new TimeoutException("Store not initialized. " + "Use `SharedLeveldbJournal.setStore(sharedStore, system)`")) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 95b98fb008f..ff509bc364e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -17,7 +17,30 @@ import akka.util.Helpers.ConfigOps * * Journal backed by a local LevelDB store. For production use. */ -private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore +private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore { + import LeveldbJournal._ + + override def receivePluginInternal: Receive = { + case SubscribePersistenceId(persistenceId: String) ⇒ + addPersistenceIdSubscriber(sender(), persistenceId) + context.watch(sender()) + case Terminated(ref) ⇒ + removeSubscriber(ref) + } +} + +/** + * INTERNAL API. + */ +private[persistence] object LeveldbJournal { + /** + * Subscribe the `sender` to changes (append events) for a specific `persistenceId`. + * Used by query-side. The journal will send [[ChangedPersistenceId]] messages to + * the subscriber when `asyncWriteMessages` has been called. + */ + case class SubscribePersistenceId(persistenceId: String) + case class ChangedPersistenceId(persistenceId: String) extends DeadLetterSuppression +} /** * INTERNAL API. @@ -27,6 +50,18 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy { val timeout: Timeout = context.system.settings.config.getMillisDuration( "akka.persistence.journal.leveldb-shared.timeout") + + override def receivePluginInternal: Receive = { + case m: LeveldbJournal.SubscribePersistenceId ⇒ + // forward subscriptions, they are used by query-side + store match { + case Some(s) ⇒ s.forward(m) + case None ⇒ + log.error("Failed SubscribePersistenceId({}) request. " + + "Store not initialized. Use `SharedLeveldbJournal.setStore(sharedStore, system)`", m.persistenceId) + } + + } } object SharedLeveldbJournal { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index c582ef64676..c68f46bb605 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -6,6 +6,7 @@ package akka.persistence.journal.leveldb import java.io.File +import scala.collection.mutable import akka.actor._ import akka.persistence._ import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget } @@ -32,6 +33,8 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with val leveldbDir = new File(config.getString("dir")) var leveldb: DB = _ + private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] + def leveldbFactory = if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory else org.iq80.leveldb.impl.Iq80DBFactory.factory @@ -40,12 +43,19 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with import Key._ - def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = - Future.fromTry(Try { + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { + var persistenceIds = Set.empty[String] + val result = Future.fromTry(Try { withBatch(batch ⇒ messages.map { a ⇒ - Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch))) + Try { + a.payload.foreach(message ⇒ addToMessageBatch(message, batch)) + persistenceIds += a.persistenceId + } }) }) + persistenceIds.foreach(notifyPersistenceIdChange) + result + } def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = try Future.successful { @@ -109,5 +119,22 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with leveldb.close() super.postStop() } + + protected def hasPersistenceIdSubscribers: Boolean = persistenceIdSubscribers.nonEmpty + + protected def addPersistenceIdSubscriber(subscriber: ActorRef, persistenceId: String): Unit = + persistenceIdSubscribers.addBinding(persistenceId, subscriber) + + protected def removeSubscriber(subscriber: ActorRef): Unit = { + val keys = persistenceIdSubscribers.collect { case (k, s) if s.contains(subscriber) ⇒ k } + keys.foreach { key ⇒ persistenceIdSubscribers.removeBinding(key, subscriber) } + } + + private def notifyPersistenceIdChange(persistenceId: String): Unit = + if (persistenceIdSubscribers.contains(persistenceId)) { + val changed = LeveldbJournal.ChangedPersistenceId(persistenceId) + persistenceIdSubscribers(persistenceId).foreach(_ ! changed) + } + } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d41b5a4e3f2..f63069aee5e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -20,7 +20,7 @@ object Dependencies { object Compile { // Compile - // Akka Streams // FIXME: change to project dependency once merged before 2.4.0 + // FIXME: change to project dependency once akka-stream merged to master val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2 @@ -60,6 +60,8 @@ object Dependencies { val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 val junitIntf = "com.novocode" % "junit-interface" % "0.11" % "test" // MIT val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.4" % "test" + // FIXME: change to project dependency once akka-stream merged to master + val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % "1.0" % "test" // metrics, measurements, perf testing val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2 @@ -112,7 +114,7 @@ object Dependencies { val persistence = l ++= Seq(protobuf, Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml) - val persistenceQuery = l ++= Seq(akkaStream, Test.scalatest.value, Test.junit, Test.commonsIo) + val persistenceQuery = l ++= Seq(akkaStream, Test.scalatest.value, Test.junit, Test.commonsIo, Test.akkaStreamTestkit) val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile")))