Merge pull request #18264 from akka/wip-18190-leveldb-EventsByPersistenceId-patriknw

+per #18190 leveldb impl of EventsByPersistenceId query
patriknw committed Aug 19, 2015
2 parents 7679b7e + 009d80d commit 87bc512
Showing 16 changed files with 484 additions and 19 deletions.
@@ -6,6 +6,8 @@

import static akka.pattern.Patterns.ask;

import com.typesafe.config.Config;

import akka.actor.*;
import akka.dispatch.Mapper;
import akka.event.EventStreamSpec;
@@ -20,6 +22,7 @@
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.Timeout;

import docs.persistence.query.MyEventsByTagPublisher;
import docs.persistence.query.PersistenceQueryDocSpec;
import org.reactivestreams.Subscriber;
@@ -31,7 +34,6 @@
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.Boxed;
import scala.runtime.BoxedUnit;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,7 +49,7 @@ public class PersistenceQueryDocTest {
class MyReadJournal implements ReadJournal {
private final ExtendedActorSystem system;

public MyReadJournal(ExtendedActorSystem system) {
public MyReadJournal(ExtendedActorSystem system, Config config) {
this.system = system;
}

@@ -97,7 +99,7 @@ void demonstrateBasicUsage() {
.getReadJournalFor("akka.persistence.query.noop-read-journal");

// issue query to journal
Source<Object, BoxedUnit> source =
Source<EventEnvelope, BoxedUnit> source =
readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE));

// materialize stream, consuming events
@@ -239,6 +241,7 @@ void demonstrateWritingIntoDifferentStore() {
// Using an example (Reactive Streams) Database driver
readJournal
.query(EventsByPersistenceId.create("user-1337"))
.map(envelope -> envelope.event())
.grouped(20) // batch inserts into groups of 20
.runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database
//#projection-into-different-store-rs
@@ -15,18 +15,18 @@ import akka.testkit.AkkaSpec
import akka.util.Timeout
import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal }
import org.reactivestreams.Subscriber

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import com.typesafe.config.Config

object PersistenceQueryDocSpec {

implicit val timeout = Timeout(3.seconds)

//#my-read-journal
class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal {
class MyReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal {

private val defaulRefreshInterval = 3.seconds

@@ -79,6 +79,7 @@ object PersistenceQueryDocSpec {
// Using an example (Reactive Streams) Database driver
readJournal
.query(EventsByPersistenceId("user-1337"))
.map(envelope => envelope.event)
.map(convertToReadSideTypes) // convert to datatype
.grouped(20) // batch inserts into groups of 20
.runWith(Sink(dbBatchWriter)) // write batches to read-side database
@@ -98,7 +99,7 @@ object PersistenceQueryDocSpec {
}

def updateState(state: ComplexState, msg: Any): ComplexState = {
// some complicated aggregation logic here ...
// some complicated aggregation logic here ...
state
}
}
@@ -124,7 +125,7 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal")

// issue query to journal
val source: Source[Any, Unit] =
val source: Source[EventEnvelope, Unit] =
readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue))

// materialize stream, consuming events
2 changes: 2 additions & 0 deletions akka-persistence-query/build.sbt
@@ -14,3 +14,5 @@ Dependencies.persistenceQuery
//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value

enablePlugins(ScaladocNoVerificationOfDiagrams)

fork in Test := true
30 changes: 30 additions & 0 deletions akka-persistence-query/src/main/resources/reference.conf
@@ -0,0 +1,30 @@
#######################################################
# Akka Persistence Query Reference Configuration File #
#######################################################

# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.

akka.persistence.query {
journal {
leveldb {
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournal"

# Absolute path to the write journal plugin configuration entry that this query journal
# will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# If undefined (or "") it will connect to the default journal as specified by the
# akka.persistence.journal.plugin property.
write-plugin = ""

# Look for more data with this interval. The query journal is also notified by
# the write journal when something is changed and thereby updated quickly, but
# when there are a lot of changes it falls back to periodic queries to avoid
# overloading the system with many small queries.
refresh-interval = 3s

# How many events to fetch in one query and keep buffered until they
# are delivered downstream.
max-buffer-size = 100
}
}
}
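The reference settings above are meant to be overridden from application.conf, as the header comment says. A minimal sketch of such an override, using only keys defined in the block above (the chosen values are illustrative, not recommendations):

# application.conf
akka.persistence.query.journal.leveldb {
  # poll the write journal less often than the 3s default
  refresh-interval = 10s
  # allow more events to be buffered before downstream demand arrives
  max-buffer-size = 500
}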
@@ -4,12 +4,11 @@
package akka.persistence.query

import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.event.Logging

import scala.annotation.tailrec
import scala.util.Failure
import com.typesafe.config.Config

/**
* Persistence extension for queries.
@@ -75,13 +74,15 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
// TODO remove duplication
val scalaPlugin =
if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil))
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil))
.recoverWith {
case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil))
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil))
.map(jj ⇒ new scaladsl.ReadJournalAdapter(jj))
.recoverWith {
@@ -37,7 +37,7 @@ abstract class AllPersistenceIds extends Query[String, Unit]
* A plugin may optionally support this [[Query]].
*/
final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue)
extends Query[Any, Unit]
extends Query[EventEnvelope, Unit]
object EventsByPersistenceId {
/** Java API */
def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId =
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb

import akka.stream.actor.ActorPublisher

/**
* INTERNAL API
*/
private[akka] trait DeliveryBuffer[T] { _: ActorPublisher[T] ⇒

var buf = Vector.empty[T]

def deliverBuf(): Unit =
if (buf.nonEmpty && totalDemand > 0) {
if (buf.size == 1) {
// optimize for this common case
onNext(buf.head)
buf = Vector.empty
} else if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
buf foreach onNext
buf = Vector.empty
}
}

}
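DeliveryBuffer factors out the bookkeeping between buffered elements and downstream demand: a publisher appends to buf and calls deliverBuf() whenever new data or new demand arrives. Since the trait is INTERNAL API (private[akka]), the following mix-in sketch is purely illustrative and uses a hypothetical StringsPublisher placed in the same package; the real user is EventsByPersistenceIdPublisher below.

package akka.persistence.query.journal.leveldb

import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }

// Hypothetical publisher that emits a fixed set of elements through DeliveryBuffer.
class StringsPublisher(elements: Vector[String])
  extends ActorPublisher[String] with DeliveryBuffer[String] {

  buf = elements // preload the buffer owned by DeliveryBuffer

  def receive = {
    case _: Request ⇒
      deliverBuf() // emit as many buffered elements as current demand allows
      if (buf.isEmpty) onCompleteThenStop()
    case Cancel ⇒ context.stop(self)
  }
}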
@@ -0,0 +1,132 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb

import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.persistence.query.EventEnvelope

/**
* INTERNAL API
*/
private[akka] object EventsByPersistenceIdPublisher {
def props(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String): Props =
Props(new EventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, refreshInterval,
maxBufSize, writeJournalPluginId))

private case object Continue
}

class EventsByPersistenceIdPublisher(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
refreshInterval: Option[FiniteDuration],
maxBufSize: Int, writeJournalPluginId: String)
extends ActorPublisher[EventEnvelope] with DeliveryBuffer[EventEnvelope] with ActorLogging {
import EventsByPersistenceIdPublisher._

val journal: ActorRef = Persistence(context.system).journalFor(writeJournalPluginId)

var currSeqNo = fromSequenceNr

val tickTask = refreshInterval.map { interval ⇒
import context.dispatcher
context.system.scheduler.schedule(interval, interval, self, Continue)
}

def nonLiveQuery: Boolean = refreshInterval.isEmpty

override def postStop(): Unit = {
tickTask.foreach(_.cancel())
}

def receive = init

def init: Receive = {
case _: Request ⇒
journal ! LeveldbJournal.SubscribePersistenceId(persistenceId)
replay()
case Continue ⇒ // skip, wait for first Request
case Cancel ⇒ context.stop(self)
}

def idle: Receive = {
case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒
if (timeForReplay)
replay()

case _: Request ⇒
deliverBuf()
if (nonLiveQuery) {
if (buf.isEmpty)
onCompleteThenStop()
else
self ! Continue
}

case Cancel ⇒
context.stop(self)

}

def timeForReplay: Boolean =
buf.isEmpty || buf.size <= maxBufSize / 2

def replay(): Unit = {
val limit = maxBufSize - buf.size
log.debug("request replay for persistenceId [{}] from [{}] to [{}] limit [{}]", persistenceId, currSeqNo, toSequenceNr, limit)
journal ! ReplayMessages(currSeqNo, toSequenceNr, limit, persistenceId, self)
context.become(replaying(limit))
}

def replaying(limit: Int): Receive = {
var replayCount = 0

{
case ReplayedMessage(p) ⇒
buf :+= EventEnvelope(
offset = p.sequenceNr,
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currSeqNo = p.sequenceNr + 1
replayCount += 1
deliverBuf()

case _: RecoverySuccess ⇒
log.debug("replay completed for persistenceId [{}], currSeqNo [{}], replayCount [{}]", persistenceId, currSeqNo, replayCount)
deliverBuf()
if (buf.isEmpty && currSeqNo > toSequenceNr)
onCompleteThenStop()
else if (nonLiveQuery) {
if (buf.isEmpty && replayCount < limit)
onCompleteThenStop()
else
self ! Continue // more to fetch
}
context.become(idle)

case ReplayMessagesFailure(cause) ⇒
log.debug("replay failed for persistenceId [{}], due to [{}]", persistenceId, cause.getMessage)
deliverBuf()
onErrorThenStop(cause)

case _: Request ⇒
deliverBuf()

case Continue | _: LeveldbJournal.ChangedPersistenceId ⇒ // skip during replay

case Cancel ⇒
context.stop(self)
}
}

}
@@ -0,0 +1,51 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.journal.leveldb

import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem
import akka.persistence.query.EventsByPersistenceId
import akka.persistence.query.Hint
import akka.persistence.query.Query
import akka.persistence.query.scaladsl
import akka.serialization.SerializationExtension
import akka.stream.scaladsl.Source
import scala.concurrent.duration.FiniteDuration
import akka.persistence.query.NoRefresh
import akka.persistence.query.RefreshInterval
import com.typesafe.config.Config
import akka.persistence.query.EventEnvelope

object LeveldbReadJournal {
final val Identifier = "akka.persistence.query.journal.leveldb"
}

class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends scaladsl.ReadJournal {

private val serialization = SerializationExtension(system)
private val defaulRefreshInterval: Option[FiniteDuration] =
Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size")

override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match {
case EventsByPersistenceId(pid, from, to) ⇒ eventsByPersistenceId(pid, from, to, hints)
case unknown ⇒ unsupportedQueryType(unknown)
}

def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long, hints: Seq[Hint]): Source[EventEnvelope, Unit] = {
Source.actorPublisher[EventEnvelope](EventsByPersistenceIdPublisher.props(persistenceId, fromSeqNr, toSeqNr,
refreshInterval(hints), maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ ())
}

private def refreshInterval(hints: Seq[Hint]): Option[FiniteDuration] =
if (hints.contains(NoRefresh))
None
else
hints.collectFirst { case RefreshInterval(interval) ⇒ interval }.orElse(defaulRefreshInterval)

private def unsupportedQueryType[M, T](unknown: Query[T, M]): Nothing =
throw new IllegalArgumentException(s"${getClass.getSimpleName} does not implement the ${unknown.getClass.getName} query type!")
}
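End to end, the new read journal is obtained through the PersistenceQuery extension by the Identifier defined above and queried as in the documentation samples earlier in this commit. A minimal sketch, assuming akka-stream 1.0's ActorMaterializer, a LevelDB write journal configured via akka.persistence.journal.plugin, and a persistent actor that has stored events under the example id "user-1337":

import akka.actor.ActorSystem
import akka.persistence.query.{ EventEnvelope, EventsByPersistenceId, PersistenceQuery }
import akka.persistence.query.journal.leveldb.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object LeveldbQueryExample extends App {
  implicit val system = ActorSystem("example")
  implicit val mat = ActorMaterializer()

  // look up the LevelDB read journal by its configuration identifier
  val readJournal =
    PersistenceQuery(system).readJournalFor(LeveldbReadJournal.Identifier)

  // live stream of events for one persistence id, wrapped in EventEnvelope
  val source: Source[EventEnvelope, Unit] =
    readJournal.query(EventsByPersistenceId("user-1337", 0L, Long.MaxValue))

  source.map(_.event).runWith(Sink.foreach(println))
}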
