From f849793f36015579518cbf3997c67a1998c6776c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall & Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH 1/4] +per #16541 initial version of the Persistence Query module --- akka-docs/rst/java/lambda-persistence.rst | 2 +- akka-docs/rst/java/persistence.rst | 2 +- .../query/MyEventsByTagPublisher.scala | 86 ++++++ .../query/PersistenceQueryDocSpec.scala | 260 ++++++++++++++++++ akka-docs/rst/scala/index-actors.rst | 1 + akka-docs/rst/scala/persistence-query.rst | 251 +++++++++++++++++ akka-docs/rst/scala/persistence.rst | 3 +- akka-persistence-query/build.sbt | 16 ++ .../scala/akka/persistence/query/Hint.scala | 35 +++ .../persistence/query/PersistenceQuery.scala | 89 ++++++ .../scala/akka/persistence/query/Query.scala | 70 +++++ .../query/javadsl/ReadJournal.scala | 46 ++++ .../query/scaladsl/ReadJournal.scala | 38 +++ .../query/PersistenceQueryTest.java | 29 ++ .../persistence/query/MockReadJournal.scala | 28 ++ .../query/PersistenceQuerySpec.scala | 70 +++++ .../scala/akka/persistence/Persistence.scala | 6 +- .../scala/akka/persistence/Persistent.scala | 10 +- .../fsm/AbstractPersistentFSMTest.java | 1 + .../PersistentActorFailureSpec.scala | 2 +- project/AkkaBuild.scala | 12 +- project/Dependencies.scala | 6 + project/OSGi.scala | 2 + 23 files changed, 1051 insertions(+), 14 deletions(-) create mode 100644 akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala create mode 100644 akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala create mode 100644 akka-docs/rst/scala/persistence-query.rst create mode 100644 akka-persistence-query/build.sbt create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala create mode 100644 akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala create mode 100644 akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java create mode 100644 akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala create mode 100644 akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index bdef9e23535..11058a876f3 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -79,7 +79,7 @@ Architecture * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 5a1832f4850..71a2f8b2ce5 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -83,7 +83,7 @@ Architecture * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala new file mode 100644 index 00000000000..ac8dab19258 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor.Props +import akka.persistence.PersistentRepr +import akka.persistence.query.EventEnvelope +import akka.serialization.SerializationExtension +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } + +import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration + +object MyEventsByTagPublisher { + def props(tag: String, offset: Long, refreshInterval: FiniteDuration): Props = + Props(new MyEventsByTagPublisher(tag, offset, refreshInterval)) +} + +//#events-by-tag-publisher +class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) + extends ActorPublisher[EventEnvelope] { + import MyEventsByTagPublisher._ + + private case object Continue + + private val limit = 1000 + + private var currentId = 0L + var buf = Vector.empty[EventEnvelope] + + import context.dispatcher + val continueTask = context.system.scheduler.schedule( + refreshInterval, refreshInterval, self, Continue) + + override def postStop(): Unit = { + continueTask.cancel() + } + + def receive = { + case _: Request | Continue ⇒ + query() + deliverBuf() + + case Cancel ⇒ + context.stop(self) + } + + def query(): Unit = + if (buf.isEmpty) { + try { + // Could be an SQL query, for example: + // "SELECT id, persistent_repr FROM journal WHERE tag = like ? and " + + // "id >= ? ORDER BY id limit ?" + val result: Vector[(Long, Array[Byte])] = ??? + currentId = if (result.nonEmpty) result.last._1 else currentId + val serialization = SerializationExtension(context.system) + + buf = result.map { + case (id, bytes) ⇒ + val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get + EventEnvelope(offset = id, p.persistenceId, p.sequenceNr, p.payload) + } + } catch { + case e: Exception ⇒ + onErrorThenStop(e) + } + } + + @tailrec final def deliverBuf(): Unit = + if (totalDemand > 0 && buf.nonEmpty) { + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + val (use, keep) = buf.splitAt(Int.MaxValue) + buf = keep + use foreach onNext + deliverBuf() + } + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala new file mode 100644 index 00000000000..0c4dc391b6b --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor._ +import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.{ Recovery, PersistentActor } +import akka.persistence.query._ +import akka.stream.{ FlowShape, ActorMaterializer } +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.testkit.AkkaSpec +import akka.util.Timeout +import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal } +import org.reactivestreams.Subscriber + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +object PersistenceQueryDocSpec { + + implicit val timeout = Timeout(3.seconds) + + //#my-read-journal + class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { + + // TODO from config + private val defaulRefreshInterval: FiniteDuration = 3.seconds + + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + q match { + case EventsByTag(tag, offset) ⇒ + val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)) + Source.actorPublisher[EventEnvelope](props) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + + case unsupported ⇒ + Source.failed[T]( + new UnsupportedOperationException( + s"Query $unsupported not supported by ${getClass.getName}")) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + } + + private def refreshInterval(hints: Seq[Hint]): FiniteDuration = + hints.collectFirst { case RefreshInterval(interval) ⇒ interval } + .getOrElse(defaulRefreshInterval) + + private def noMaterializedValue[M]: M = + null.asInstanceOf[M] + } + + //#my-read-journal + case class ComplexState() { + def readyToSave = false + } + case class Record(any: Any) + class DummyStore { def save(record: Record) = Future.successful(42L) } + + class X { + val JournalId = "" + + def convertToReadSideTypes(in: Any): Any = ??? + + object ReactiveStreamsCompatibleDBDriver { + def batchWriter: Subscriber[immutable.Seq[Any]] = ??? + } + + //#projection-into-different-store-rs + implicit val system = ActorSystem() + implicit val mat = ActorMaterializer() + + val readJournal = PersistenceQuery(system).readJournalFor(JournalId) + val dbBatchWriter: Subscriber[immutable.Seq[Any]] = + ReactiveStreamsCompatibleDBDriver.batchWriter + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId("user-1337")) + .map(convertToReadSideTypes) // convert to datatype + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink(dbBatchWriter)) // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-actor + class TheOneWhoWritesToQueryJournal(id: String) extends Actor { + val store = new DummyStore() + + var state: ComplexState = ComplexState() + + def receive = { + case m => + state = updateState(state, m) + if (state.readyToSave) store.save(Record(state)) + } + + def updateState(state: ComplexState, msg: Any): ComplexState = { + // some complicated aggregation logic here ... + state + } + } + + //#projection-into-different-store-actor + +} + +class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { + + def this() { + this( + """ + akka.persistence.query.noop-read-journal { + class = "docs.persistence.query.NoopReadJournal" + } + """.stripMargin) + } + + //#basic-usage + // obtain read journal by plugin id + val readJournal = + PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal") + + // issue query to journal + val source: Source[Any, Unit] = + readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue)) + + // materialize stream, consuming events + implicit val mat = ActorMaterializer() + source.runForeach { event => println("Event: " + event) } + //#basic-usage + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds) + //#all-persistence-ids-live + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds, hints = NoRefresh) + //#all-persistence-ids-snap + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + + val blueThings: Source[EventEnvelope, Unit] = + readJournal.query(EventsByTag("blue")) + + // find top 10 blue things: + val top10BlueThings: Future[Vector[Any]] = + blueThings + .map(_.event) + .take(10) // cancels the query stream after pulling 10 elements + .runFold(Vector.empty[Any])(_ :+ _) + + // start another query, from the known offset + val furtherBlueThings = readJournal.query(EventsByTag("blue", offset = 10)) + //#events-by-tag + + //#events-by-persistent-id-refresh + readJournal.query(EventsByPersistenceId("user-us-1337"), hints = RefreshInterval(1.second)) + + //#events-by-persistent-id-refresh + + //#advanced-journal-query-definition + final case class RichEvent(tags: immutable.Set[String], payload: Any) + + case class QueryStats(totalEvents: Long) + + case class ByTagsWithStats(tags: immutable.Set[String]) + extends Query[RichEvent, QueryStats] + + //#advanced-journal-query-definition + + //#advanced-journal-query-hints + + import scala.concurrent.duration._ + + readJournal.query(EventsByTag("blue"), hints = RefreshInterval(1.second)) + //#advanced-journal-query-hints + + //#advanced-journal-query-usage + val query: Source[RichEvent, QueryStats] = + readJournal.query(ByTagsWithStats(Set("red", "blue"))) + + query + .mapMaterializedValue { stats => println(s"Stats: $stats") } + .map { event => println(s"Event payload: ${event.payload}") } + .runWith(Sink.ignore) + + //#advanced-journal-query-usage + + //#materialized-query-metadata + // a plugin can provide: + case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) + + case object AllEvents extends Query[Any, QueryMetadata] + + val events = readJournal.query(AllEvents) + events + .mapMaterializedValue { meta => + println(s"The query is: " + + s"ordered deterministically: ${meta.deterministicOrder}, " + + s"infinite: ${meta.infinite}") + } + + //#materialized-query-metadata + + //#projection-into-different-store + class MyResumableProjection(name: String) { + def saveProgress(offset: Long): Future[Long] = ??? + def latestOffset: Future[Long] = ??? + } + //#projection-into-different-store + + class RunWithActor { + //#projection-into-different-store-actor-run + import akka.pattern.ask + import system.dispatcher + implicit val timeout = Timeout(3.seconds) + + val bidProjection = new MyResumableProjection("bid") + + val writerProps = Props(classOf[TheOneWhoWritesToQueryJournal], "bid") + val writer = system.actorOf(writerProps, "bid-projection-writer") + + bidProjection.latestOffset.foreach { startFromOffset => + readJournal + .query(EventsByTag("bid", startFromOffset)) + .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) } + .mapAsync(1) { offset => bidProjection.saveProgress(offset) } + .runWith(Sink.ignore) + } + //#projection-into-different-store-actor-run + } + + class RunWithAsyncFunction { + //#projection-into-different-store-simple + trait ExampleStore { + def save(event: Any): Future[Unit] + } + //#projection-into-different-store-simple + + //#projection-into-different-store-simple + val store: ExampleStore = ??? + + readJournal + .query(EventsByTag("bid")) + .mapAsync(1) { e => store.save(e) } + .runWith(Sink.ignore) + //#projection-into-different-store-simple + } + +} + +class NoopReadJournal(sys: ExtendedActorSystem) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source.empty.mapMaterializedValue(_ => null.asInstanceOf[M]) +} diff --git a/akka-docs/rst/scala/index-actors.rst b/akka-docs/rst/scala/index-actors.rst index 0e77f0c2258..ca09504ab76 100644 --- a/akka-docs/rst/scala/index-actors.rst +++ b/akka-docs/rst/scala/index-actors.rst @@ -12,6 +12,7 @@ Actors routing fsm persistence + persistence-query testing actordsl typed-actors diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst new file mode 100644 index 00000000000..15435ba1e46 --- /dev/null +++ b/akka-docs/rst/scala/persistence-query.rst @@ -0,0 +1,251 @@ +.. _persistence-query-scala: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-scala` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries +to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more +complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some +transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query +- pulling out of one Journal and storing into another one. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage + +Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +``journalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the ``ReadJournal`` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +Again, specific capabilities are specific to the journal you are using, so you have to + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-scala: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher + +More journal example implementations +------------------------------------ + +In order to help implementers get get started with implementing read journals a number of reference implementaions +have been prepared, each highlighting a specific style a journal might need to be implemented in: + +* TODO LINK HERE – when the backing data store is unable to push events, nor does it expose an reactive streams interface, + yet has rich query capabilities (like an SQL database for example) +* TODO LINK HERE – when a `Reactive Streams`_ adapter or driver is available for the datastore, yet it is not able to handle + polling by itself. For example when using `Slick 3`_ along side with a typical SQL database. +* TODO LINK HERE – when the backing datastore already has a fully "reactive push/pull" adapter implemented, for example + such exist for Kafka (see the `Reactive Kafka`_ project by Krzysztof Ciesielski for details). + +.. _Reactive Kafka: https://github.com/softwaremill/reactive-kafka +.. _Reactive Streams: http://reactive-streams.org +.. _Slick 3: http://slick.typesafe.com/ + + +Plugin TCK +---------- + +TODO, not available yet. + + diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 2c28f9092a7..0289298ec25 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -68,7 +68,7 @@ Architecture * *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. @@ -638,6 +638,7 @@ Event Adapters help in situations where: understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the json instead of serializing the object to its binary representation. +.. image:: ../images/persistence-eventadapter.png Implementing an EventAdapter is rather stright forward: diff --git a/akka-persistence-query/build.sbt b/akka-persistence-query/build.sbt new file mode 100644 index 00000000000..1b57eef61d2 --- /dev/null +++ b/akka-persistence-query/build.sbt @@ -0,0 +1,16 @@ +import akka.{ AkkaBuild, Dependencies, Formatting, ScaladocNoVerificationOfDiagrams, OSGi } +import com.typesafe.tools.mima.plugin.MimaKeys + +AkkaBuild.defaultSettings + +AkkaBuild.experimentalSettings + +Formatting.formatSettings + +OSGi.persistenceQuery + +Dependencies.persistenceQuery + +//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value + +enablePlugins(ScaladocNoVerificationOfDiagrams) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala new file mode 100644 index 00000000000..5b6e35e2634 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import scala.concurrent.duration.FiniteDuration + +/** + * A query hint that defines how to execute the query, + * typically specific to the journal implementation. + * + * A plugin may optionally support a [[Hint]]. + */ +trait Hint + +/** + * If the underlying datastore only supports queries that are completed when they reach the + * end of the "result set", the journal has to submit new queries after a while in order + * to support "infinite" event streams that include events stored after the initial query has completed. + * + * A plugin may optionally support this [[Hint]] for defining such a refresh interval. + */ +final case class RefreshInterval(interval: FiniteDuration) extends Hint + +/** + * Indicates that the event stream is supposed to be completed immediately when it + * reaches the end of the "result set", as described in [[RefreshInterval]]. + * + */ +final case object NoRefresh extends NoRefresh { + /** Java API */ + def getInstance: NoRefresh = this +} +sealed class NoRefresh extends Hint + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala new file mode 100644 index 00000000000..bb93297b339 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor._ +import akka.event.Logging + +import scala.annotation.tailrec +import scala.util.Failure + +/** + * Persistence extension for queries. + */ +object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider { + /** + * Java API. + */ + override def get(system: ActorSystem): PersistenceQuery = super.get(system) + + def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system) + + def lookup() = PersistenceQuery + + /** INTERNAL API. */ + private[persistence] case class PluginHolder(plugin: scaladsl.ReadJournal) extends Extension + +} + +class PersistenceQuery(system: ExtendedActorSystem) extends Extension { + import PersistenceQuery._ + + private val log = Logging(system, getClass) + + /** Discovered query plugins. */ + private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + + /** + * Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given read journal configuration entry. + */ + @tailrec final def readJournalFor(readJournalPluginId: String): scaladsl.ReadJournal = { + val configPath = readJournalPluginId + val extensionIdMap = readJournalPluginExtensionIds.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).plugin + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = + PluginHolder(createPlugin(configPath)) + } + readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + readJournalFor(readJournalPluginId) // Recursive invocation. + } + } + + /** + * Java API + * + * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. + */ + final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = + new javadsl.ReadJournal(readJournalFor(readJournalPluginId)) + + private def createPlugin(configPath: String): scaladsl.ReadJournal = { + require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), + s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") + val pluginActorName = configPath + val pluginConfig = system.settings.config.getConfig(configPath) + val pluginClassName = pluginConfig.getString("class") + log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") + val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + + val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + + // TODO possibly apply event adapters here + plugin.get + } + + /** Check for default or missing identity. */ + private def isEmpty(text: String) = text == null || text.length == 0 +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala new file mode 100644 index 00000000000..fdca0ec0c3a --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +/** + * General interface for all queries. There are a few pre-defined queries, + * such as [[EventsByPersistenceId]], [[AllPersistenceIds]] and [[EventsByTag]] + * but implementation of these queries are optional. Query (journal) plugins + * may define their own specialized queries. + * + * If a query plugin does not support a query it will return a stream that + * will be completed with a failure of [[UnsupportedOperationException]]. + */ +trait Query[T, M] + +/** + * Query all `PersistentActor` identifiers, i.e. as defined by the + * `persistenceId` of the `PersistentActor`. + * + * A plugin may optionally support this [[Query]]. + */ +final case object AllPersistenceIds extends Query[String, Unit] + +/** + * Query events for a specific `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. + * + * The returned event stream should be ordered by sequence number. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) + extends Query[Any, Unit] + +/** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] + +/** + * Event wrapper adding meta data for the events in the result stream of + * [[EventsByTag]] query, or similar queries. + */ +//#event-envelope +final case class EventEnvelope( + offset: Long, + persistenceId: String, + sequenceNr: Long, + event: Any) +//#event-envelope diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala new file mode 100644 index 00000000000..7caef0a19bf --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl + +import akka.persistence.query.{ Query, Hint } +import akka.stream.javadsl.Source + +import scala.annotation.varargs + +/** + * Java API + * + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * Usage: + * {{{ + * final ReadJournal journal = + * PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath); + * + * final Source<EventEnvelope, ?> events = + * journal.query(new EventsByTag("mytag", 0L)); + * }}} + */ + +final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { + + /** + * Java API + * + * A query that returns a `Source` with output type `T` and materialized value `M`. + * + * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asJava + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala new file mode 100644 index 00000000000..02824269d35 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.scaladsl + +import akka.persistence.query.{ Hint, Query } +import akka.stream.scaladsl.Source + +/** + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * Usage: + * {{{ + * val journal = PersistenceQuery(system).readJournalFor(queryPluginConfigPath) + * val events = journal.query(EventsByTag("mytag", 0L)) + * }}} + * + * For Java API see [[akka.persistence.query.javadsl.ReadJournal]]. + */ +abstract class ReadJournal { + + /** + * A query that returns a `Source` with output type `T` and materialized + * value `M`. + * + * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] + +} diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java new file mode 100644 index 00000000000..9714ac86321 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.actor.ActorSystem; +import akka.persistence.query.javadsl.ReadJournal; +import akka.testkit.AkkaJUnitActorSystemResource; +import org.junit.ClassRule; +import scala.runtime.BoxedUnit; + +public class PersistenceQueryTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource(PersistenceQueryTest.class.getName()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + private final Hint hint = NoRefresh.getInstance(); + + // compile-only test + @SuppressWarnings("unused") + public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception { + final ReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor("noop-journal"); + final akka.stream.javadsl.Source tag = readJournal.query(new EventsByTag("tag", 0L), hint, hint); // java varargs + } +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala new file mode 100644 index 00000000000..ea813a1cd53 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import akka.stream.scaladsl.Source +import com.typesafe.config.{ Config, ConfigFactory } + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockReadJournal extends scaladsl.ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source(() ⇒ Iterator.from(0)).map(_.toString).asInstanceOf[Source[T, M]] +} + +object MockReadJournal { + final val Identifier = "akka.persistence.query.journal.mock" + + final val config: Config = ConfigFactory.parseString( + s""" + |$Identifier { + | class = "${classOf[MockReadJournal].getCanonicalName}" + |} + """.stripMargin) +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala new file mode 100644 index 00000000000..44d442fe03c --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem +import akka.persistence.journal.{ EventAdapter, EventSeq } +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import org.scalautils.ConversionCheckedTripleEquals + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals { + + val anything: Query[String, _] = null + + val eventAdaptersConfig = + s""" + |akka.persistence.query.journal.mock { + | event-adapters { + | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} + | } + |} + """.stripMargin + + "ReadJournal" must { + "be found by full config key" in { + withActorSystem() { system ⇒ + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier) + } + } + + "throw if unable to find query journal by config key" in { + withActorSystem() { system ⇒ + intercept[IllegalArgumentException] { + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier + "-unknown") + }.getMessage should include("missing persistence read journal") + } + } + } + + private val systemCounter = new AtomicInteger() + private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { + val config = + MockReadJournal.config + .withFallback(ConfigFactory.parseString(conf)) + .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) + .withFallback(ConfigFactory.load()) + + val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config) + try block(sys) finally Await.ready(sys.terminate(), 10.seconds) + } +} + +object ExampleQueryModels { + case class OldModel(value: String) { def promote = NewModel(value) } + case class NewModel(value: String) +} + +class PrefixStringWithPAdapter extends EventAdapter { + override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event) + + override def manifest(event: Any) = "" + override def toJournal(event: Any) = throw new Exception("toJournal should not be called by query side") +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index c6cfd7f9f77..a2ab16dc8db 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -219,7 +219,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def journalFor(journalPluginId: String): ActorRef = { + @tailrec private[akka] final def journalFor(journalPluginId: String): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId val extensionIdMap = journalPluginExtensionId.get extensionIdMap.get(configPath) match { @@ -239,12 +239,14 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } /** + * INTERNAL API + * * Returns a snapshot store plugin actor identified by `snapshotPluginId`. * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + @tailrec private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId val extensionIdMap = snapshotPluginExtensionId.get extensionIdMap.get(configPath) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index d0906c395f2..6e8b03c4028 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -41,12 +41,12 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per // only check that all persistenceIds are equal when there's more than one in the Seq if (payload match { - case l: List[PersistentRepr] => l.tail.nonEmpty - case v: Vector[PersistentRepr] => v.size > 1 - case _ => true // some other collection type, let's just check + case l: List[PersistentRepr] ⇒ l.tail.nonEmpty + case v: Vector[PersistentRepr] ⇒ v.size > 1 + case _ ⇒ true // some other collection type, let's just check }) require(payload.forall(_.persistenceId == payload.head.persistenceId), - "AtomicWrite must contain messages for the same persistenceId, " + - s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") + "AtomicWrite must contain messages for the same persistenceId, " + + s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") def persistenceId = payload.head.persistenceId def lowestSequenceNr = payload.head.sequenceNr // this assumes they're gapless; they should be (it is only our code creating AWs) diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java index fef46cc3162..d660cc79ab0 100644 --- a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java @@ -42,6 +42,7 @@ public class AbstractPersistentFSMTest { private static Option none = Option.none(); + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config( diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 0eb78a9c46a..ff2224e858b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -204,7 +204,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("Failure: wrong-1") expectTerminated(persistentActor) } - "call onPersistFailure and stop if persistAsync fails xoxo" in { + "call onPersistFailure and stop if persistAsync fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! Cmd("a") diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e49ce06c35e..7e9e7831b31 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -54,7 +54,7 @@ object AkkaBuild extends Build { ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, agent, persistence, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) + slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) ) lazy val akkaScalaNightly = Project( @@ -64,7 +64,7 @@ object AkkaBuild extends Build { // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) + slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) ).disablePlugins(ValidatePullRequest) lazy val actor = Project( @@ -163,6 +163,12 @@ object AkkaBuild extends Build { dependencies = Seq(actor, remote % "test->test", testkit % "test->test") ) + lazy val persistenceQuery = Project( + id = "akka-persistence-query-experimental", + base = file("akka-persistence-query"), + dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test") + ) + lazy val persistenceTck = Project( id = "akka-persistence-experimental-tck", base = file("akka-persistence-tck"), @@ -192,7 +198,7 @@ object AkkaBuild extends Build { base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", remote % "compile;test->test", cluster, clusterMetrics, slf4j, agent, camel, osgi, - persistence % "compile;provided->provided;test->test", persistenceTck, + persistence % "compile;provided->provided;test->test", persistenceTck, persistenceQuery, typed % "compile;test->test", distributedData) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 78c5277801d..0135c8baf57 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,6 +19,10 @@ object Dependencies { object Compile { // Compile + + // Akka Streams // FIXME: change to project dependency once merged before 2.4.0 + val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" + val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2 // when updating config version, update links ActorSystem ScalaDoc to link to the updated version @@ -108,6 +112,8 @@ object Dependencies { val persistence = l ++= Seq(protobuf, Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml) + val persistenceQuery = l ++= Seq(akkaStream, Test.scalatest.value, Test.junit, Test.commonsIo) + val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile"))) val kernel = l ++= Seq(Test.scalatest.value, Test.junit) diff --git a/project/OSGi.scala b/project/OSGi.scala index 658c76673b8..ad3900f927e 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -47,6 +47,8 @@ object OSGi { val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport())) + val persistenceQuery = exports(Seq("akka.persistence.query.*")) + val testkit = exports(Seq("akka.testkit.*")) val osgiOptionalImports = Seq( From 893578a8af0afa6abf59754a4a55faaf2af1f822 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall & Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH 2/4] +per #16541 java docs for persistence query --- .../persistence/PersistenceQueryDocTest.java | 75 ++++++ .../query/MyEventsByTagJavaPublisher.java | 112 +++++++++ akka-docs/rst/java/index-actors.rst | 1 + akka-docs/rst/java/persistence-query.rst | 234 ++++++++++++++++++ akka-docs/rst/java/persistence.rst | 2 +- .../query/MyEventsByTagPublisher.scala | 37 ++- .../query/PersistenceQueryDocSpec.scala | 3 +- akka-docs/rst/scala/persistence-query.rst | 24 +- akka-docs/rst/scala/persistence.rst | 2 - .../persistence/query/PersistenceQuery.scala | 10 +- .../query/javadsl/ReadJournal.scala | 6 +- ...ctorSpec.scala => PersistentFSMSpec.scala} | 0 12 files changed, 468 insertions(+), 38 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java create mode 100644 akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java create mode 100644 akka-docs/rst/java/persistence-query.rst rename akka-persistence/src/test/scala/akka/persistence/fsm/{PersistentFSMActorSpec.scala => PersistentFSMSpec.scala} (100%) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java new file mode 100644 index 00000000000..b90031fa53f --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.persistence; + +import akka.actor.*; +import akka.event.EventStreamSpec; +import akka.japi.Function; +import akka.japi.Procedure; +import akka.pattern.BackoffSupervisor; +import akka.persistence.*; +import akka.persistence.query.*; +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; +import akka.util.Timeout; +import docs.persistence.query.MyEventsByTagPublisher; +import scala.collection.Seq; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class PersistenceQueryDocTest { + + final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS)); + + //#my-read-journal + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; + + public MyReadJournal(ExtendedActorSystem system) { + this.system = system; + } + + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); + + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); + + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query $unsupported not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); + } + } + + private FiniteDuration refreshInterval(Hint[] hints) { + FiniteDuration ret = defaultRefreshInterval; + for (Hint hint : hints) + if (hint instanceof RefreshInterval) + ret = ((RefreshInterval) hint).interval(); + return ret; + } + + private akka.japi.function.Function noMaterializedValue () { + return param -> (M) null; + } + + } + //#my-read-journal +} diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java new file mode 100644 index 00000000000..b4353623ee7 --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query; + +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.PersistentRepr; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import akka.stream.actor.AbstractActorPublisher; +import scala.Int; + +import akka.actor.Props; +import akka.persistence.query.EventEnvelope; +import akka.stream.actor.ActorPublisherMessage.Cancel; + +import scala.concurrent.duration.FiniteDuration; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +//#events-by-tag-publisher +class MyEventsByTagJavaPublisher extends AbstractActorPublisher { + private final Serialization serialization = + SerializationExtension.get(context().system()); + + private final Connection connection; + + private final String tag; + + private final String CONTINUE = "CONTINUE"; + private final int LIMIT = 1000; + private long currentOffset; + private List buf = new ArrayList<>(); + + private Cancellable continueTask; + + public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) { + this.connection = connection; + this.tag = tag; + this.currentOffset = offset; + + this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self()); + receive(ReceiveBuilder + .matchEquals(CONTINUE, (in) -> { + query(); + deliverBuf(); + }) + .match(Cancel.class, (in) -> { + context().stop(self()); + }) + .build()); + } + + public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) { + return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); + } + + @Override + public void postStop() { + continueTask.cancel(); + } + + private void query() { + if (buf.isEmpty()) { + try { + PreparedStatement s = connection.prepareStatement( + "SELECT id, persistent_repr " + + "FROM journal WHERE tag = ? AND id >= ? " + + "ORDER BY id LIMIT ?"); + + s.setString(1, tag); + s.setLong(2, currentOffset); + s.setLong(3, LIMIT); + final ResultSet rs = s.executeQuery(); + + final List> res = new ArrayList<>(LIMIT); + while (rs.next()) + res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); + + if (!res.isEmpty()) { + currentOffset = res.get(res.size() - 1).first(); + } + + buf = res.stream().map(in -> { + final Long id = in.first(); + final byte[] bytes = in.second(); + + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + + return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + }).collect(toList()); + } catch(Exception e) { + onErrorThenStop(e); + } + } + } + + private void deliverBuf() { + while (totalDemand() > 0 && !buf.isEmpty()) + onNext(buf.remove(0)); + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 20b567295b5..173a0fd58e5 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -12,4 +12,5 @@ Actors routing fsm persistence + persistence-query testing diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst new file mode 100644 index 00000000000..cb816a1f265 --- /dev/null +++ b/akka-docs/rst/java/persistence-query.rst @@ -0,0 +1,234 @@ +.. _persistence-query-java: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-java` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries +to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more +complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some +transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query +- pulling out of one Journal and storing into another one. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage + +Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +``getJournalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +Again, specific capabilities are specific to the journal you are using, so you have to + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs + +.. _Reactive Streams: http://reactive-streams.org + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-java: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher + +Plugin TCK +---------- + +TODO, not available yet. + diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 71a2f8b2ce5..8c7436d154e 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -628,7 +628,7 @@ completely. Event Adapters help in situations where: -- **Version Migration** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, +- **Version Migrations** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios the ``toJournal`` function is usually an identity function, however the ``fromJournal`` is implemented as ``v1.Event=>v2.Event``, performing the neccessary mapping inside the fromJournal method. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index ac8dab19258..2f36623436d 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -22,13 +22,13 @@ object MyEventsByTagPublisher { //#events-by-tag-publisher class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) extends ActorPublisher[EventEnvelope] { - import MyEventsByTagPublisher._ private case object Continue - private val limit = 1000 + private val connection: java.sql.Connection = ??? - private var currentId = 0L + private val Limit = 1000 + private var currentOffset = offset var buf = Vector.empty[EventEnvelope] import context.dispatcher @@ -48,14 +48,35 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD context.stop(self) } + object Select { + private def statement() = connection.prepareStatement( + """ + SELECT id, persistent_repr FROM journal + WHERE tag = ? AND id >= ? + ORDER BY id LIMIT ? + """) + + def run(tag: String, from: Long, limit: Int): Vector[(Long, Array[Byte])] = { + val s = statement() + try { + s.setString(1, tag) + s.setLong(2, from) + s.setLong(3, limit) + val rs = s.executeQuery() + + val b = Vector.newBuilder[(Long, Array[Byte])] + while (rs.next()) + b += (rs.getLong(1) -> rs.getBytes(2)) + b.result() + } finally s.close() + } + } + def query(): Unit = if (buf.isEmpty) { try { - // Could be an SQL query, for example: - // "SELECT id, persistent_repr FROM journal WHERE tag = like ? and " + - // "id >= ? ORDER BY id limit ?" - val result: Vector[(Long, Array[Byte])] = ??? - currentId = if (result.nonEmpty) result.last._1 else currentId + val result = Select.run(tag, currentOffset, Limit) + currentOffset = if (result.nonEmpty) result.last._1 else currentOffset val serialization = SerializationExtension(context.system) buf = result.map { diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index 0c4dc391b6b..4d3963a04f7 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -28,8 +28,7 @@ object PersistenceQueryDocSpec { //#my-read-journal class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { - // TODO from config - private val defaulRefreshInterval: FiniteDuration = 3.seconds + private val defaulRefreshInterval = 3.seconds override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = q match { diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 15435ba1e46..319e1ac56bd 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -72,7 +72,7 @@ a journal is not obliged to implement all of them - for example because in a giv significantly inefficient. .. note:: - Refer to the documentation of the ``ReadJournal`` plugin you are using for a specific list of supported query types. + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. For example, Journal plugins should document their stream completion strategies. The predefined queries are: @@ -110,7 +110,7 @@ depends on the used journal. on relational databases, yet may be hard to implement efficiently on plain key-value datastores. In the example below we query all events which have been tagged (we assume this was performed by the write-side using an -:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-tag @@ -168,6 +168,8 @@ is as simple as, using the read-journal and feeding it into the databases driver .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs +.. _Reactive Streams: http://reactive-streams.org + Materialize view using mapAsync ------------------------------- @@ -225,24 +227,6 @@ And the ``EventsByTag`` could be backed by such an Actor for example: .. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher -More journal example implementations ------------------------------------- - -In order to help implementers get get started with implementing read journals a number of reference implementaions -have been prepared, each highlighting a specific style a journal might need to be implemented in: - -* TODO LINK HERE – when the backing data store is unable to push events, nor does it expose an reactive streams interface, - yet has rich query capabilities (like an SQL database for example) -* TODO LINK HERE – when a `Reactive Streams`_ adapter or driver is available for the datastore, yet it is not able to handle - polling by itself. For example when using `Slick 3`_ along side with a typical SQL database. -* TODO LINK HERE – when the backing datastore already has a fully "reactive push/pull" adapter implemented, for example - such exist for Kafka (see the `Reactive Kafka`_ project by Krzysztof Ciesielski for details). - -.. _Reactive Kafka: https://github.com/softwaremill/reactive-kafka -.. _Reactive Streams: http://reactive-streams.org -.. _Slick 3: http://slick.typesafe.com/ - - Plugin TCK ---------- diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0289298ec25..753d10c7e02 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -638,8 +638,6 @@ Event Adapters help in situations where: understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the json instead of serializing the object to its binary representation. -.. image:: ../images/persistence-eventadapter.png - Implementing an EventAdapter is rather stright forward: .. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#identity-event-adapter diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index bb93297b339..0010e50d0d7 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -7,8 +7,9 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.event.Logging +import akka.stream.javadsl.Source -import scala.annotation.tailrec +import scala.annotation.{varargs, tailrec} import scala.util.Failure /** @@ -62,7 +63,12 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. */ final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = - new javadsl.ReadJournal(readJournalFor(readJournalPluginId)) + new javadsl.ReadJournal { + val backing = readJournalFor(readJournalPluginId) + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asJava + } + private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala index 7caef0a19bf..c0edc644cd1 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -29,7 +29,7 @@ import scala.annotation.varargs * }}} */ -final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { +trait ReadJournal { /** * Java API @@ -40,7 +40,7 @@ final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { * query, typically specific to the journal implementation. * */ - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - backing.query(q, hints: _*).asJava + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] + } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala similarity index 100% rename from akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala From 3b94108e0cb61e196a7485c09abc6539e44811b7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall & Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH 3/4] +per #16541 allow using javadsl implemented journals as-if scaladsl --- .../persistence/PersistenceQueryDocTest.java | 1 - .../query/javadsl/ReadJournal.java} | 29 +++++++------- .../query/javadsl/ReadJournalAdapter.java | 31 +++++++++++++++ .../persistence/query/PersistenceQuery.scala | 36 ++++++++++-------- .../query/scaladsl/ReadJournal.scala | 6 +++ .../query/MockJavaReadJournal.java | 38 +++++++++++++++++++ .../query/PersistenceQueryTest.java | 4 +- .../query/PersistenceQuerySpec.scala | 15 +++++++- 8 files changed, 124 insertions(+), 36 deletions(-) rename akka-persistence-query/src/main/{scala/akka/persistence/query/javadsl/ReadJournal.scala => java/akka/persistence/query/javadsl/ReadJournal.java} (72%) create mode 100644 akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java create mode 100644 akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index b90031fa53f..2af6572628b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -69,7 +69,6 @@ private FiniteDuration refreshInterval(Hint[] hints) { private akka.japi.function.Function noMaterializedValue () { return param -> (M) null; } - } //#my-read-journal } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java similarity index 72% rename from akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala rename to akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java index c0edc644cd1..c776b92c379 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java @@ -2,45 +2,44 @@ * Copyright (C) 2009-2015 Typesafe Inc. */ -package akka.persistence.query.javadsl +package akka.persistence.query.javadsl; -import akka.persistence.query.{ Query, Hint } -import akka.stream.javadsl.Source - -import scala.annotation.varargs +import akka.persistence.query.Query; +import akka.persistence.query.Hint; +import akka.stream.javadsl.Source; +import scala.annotation.varargs; /** * Java API - * + *

* API for reading persistent events and information derived * from stored persistent events. - * + *

* The purpose of the API is not to enforce compatibility between different * journal implementations, because the technical capabilities may be very different. * The interface is very open so that different journals may implement specific queries. - * + *

* Usage: - * {{{ + *


  * final ReadJournal journal =
  *   PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
  *
  * final Source<EventEnvelope, ?> events =
  *   journal.query(new EventsByTag("mytag", 0L));
- * }}}
+ * 
*/ -trait ReadJournal { +public interface ReadJournal { /** * Java API - * + *

* A query that returns a `Source` with output type `T` and materialized value `M`. - * + *

* The `hints` are optional parameters that defines how to execute the * query, typically specific to the journal implementation. * */ - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] - + Source query(Query q, Hint... hints); } diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java new file mode 100644 index 00000000000..96866071fb6 --- /dev/null +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl; + +import akka.japi.Util; +import akka.persistence.query.Hint; +import akka.persistence.query.Query; +import akka.stream.javadsl.Source; + +/** + * INTERNAL API + * + * Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal} + * to JavaDSL {@link ReadJournal}. + */ +public final class ReadJournalAdapter implements ReadJournal { + + private final akka.persistence.query.scaladsl.ReadJournal backing; + + public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) { + this.backing = backing; + } + + @Override + public Source query(Query q, Hint... hints) { + return backing.query(q, Util.immutableSeq(hints)).asJava(); + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 0010e50d0d7..57ec6fc9d89 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -7,9 +7,8 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.event.Logging -import akka.stream.javadsl.Source -import scala.annotation.{varargs, tailrec} +import scala.annotation.tailrec import scala.util.Failure /** @@ -63,12 +62,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. */ final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = - new javadsl.ReadJournal { - val backing = readJournalFor(readJournalPluginId) - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - backing.query(q, hints: _*).asJava - } - + new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId)) private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), @@ -79,14 +73,24 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get - val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) - .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) - .recoverWith { - case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) - } - - // TODO possibly apply event adapters here - plugin.get + // TODO remove duplication + val scalaPlugin = + if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil)) + .map(jj ⇒ new scaladsl.ReadJournalAdapter(jj)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend") + + scalaPlugin.get } /** Check for default or missing identity. */ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala index 02824269d35..936aadb7d96 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -36,3 +36,9 @@ abstract class ReadJournal { def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] } + +/** INTERNAL API */ +private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asScala +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java new file mode 100644 index 00000000000..1b0050cd814 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.util.Iterator; + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockJavaReadJournal implements ReadJournal { + public static final String Identifier = "akka.persistence.query.journal.mock-java"; + + public static final Config config = ConfigFactory.parseString( + Identifier + " { \n" + + " class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" + + " }\n\n"); + + @Override + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + return (Source) Source.fromIterator(() -> new Iterator() { + private int i = 0; + @Override public boolean hasNext() { return true; } + + @Override public String next() { + return "" + (i++); + } + }); + } +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java index 9714ac86321..2ab31f83010 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -1,6 +1,6 @@ /* - * Copyright (C) 2009-2015 Typesafe Inc. - */ + * Copyright (C) 2009-2015 Typesafe Inc. + */ package akka.persistence.query; diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index 44d442fe03c..a7778c00bcb 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -10,12 +10,11 @@ import akka.actor.ActorSystem import akka.persistence.journal.{ EventAdapter, EventSeq } import com.typesafe.config.ConfigFactory import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } -import org.scalautils.ConversionCheckedTripleEquals import scala.concurrent.Await import scala.concurrent.duration._ -class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals { +class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll { val anything: Query[String, _] = null @@ -42,12 +41,24 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte }.getMessage should include("missing persistence read journal") } } + + "expose scaladsl implemented journal as javadsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier) + } + } + "expose javadsl implemented journal as scaladsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier) + } + } } private val systemCounter = new AtomicInteger() private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { val config = MockReadJournal.config + .withFallback(MockJavaReadJournal.config) .withFallback(ConfigFactory.parseString(conf)) .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) .withFallback(ConfigFactory.load()) From 3314de4cb92fe61559937a7c07e853755d9fa561 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH 4/4] +per #16541 add missing java samples for persistence query --- .../persistence/PersistenceQueryDocTest.java | 370 ++++++++++++++++-- .../query/MyEventsByTagJavaPublisher.java | 59 +-- akka-docs/rst/java/persistence-query.rst | 19 +- .../query/MyEventsByTagPublisher.scala | 9 +- akka-docs/rst/scala/persistence-query.rst | 17 +- .../query/javadsl/ReadJournal.java | 2 +- .../scala/akka/persistence/query/Hint.scala | 8 + .../persistence/query/PersistenceQuery.scala | 3 +- .../scala/akka/persistence/query/Query.scala | 24 +- lol | 172 ++++++++ 10 files changed, 590 insertions(+), 93 deletions(-) create mode 100644 lol diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 2af6572628b..7da4adfb31b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -4,71 +4,373 @@ package docs.persistence; +import static akka.pattern.Patterns.ask; + import akka.actor.*; +import akka.dispatch.Mapper; import akka.event.EventStreamSpec; import akka.japi.Function; import akka.japi.Procedure; +import akka.japi.pf.ReceiveBuilder; import akka.pattern.BackoffSupervisor; import akka.persistence.*; import akka.persistence.query.*; import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; +import docs.persistence.query.PersistenceQueryDocSpec; +import org.reactivestreams.Subscriber; import scala.collection.Seq; +import scala.collection.immutable.Vector; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.Boxed; import scala.runtime.BoxedUnit; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; public class PersistenceQueryDocTest { - final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS)); + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); //#my-read-journal - class MyReadJournal implements ReadJournal { - private final ExtendedActorSystem system; + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; public MyReadJournal(ExtendedActorSystem system) { this.system = system; } - final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); - - @SuppressWarnings("unchecked") - public Source query(Query q, Hint... hints) { - if (q instanceof EventsByTag) { - final EventsByTag eventsByTag = (EventsByTag) q; - final String tag = eventsByTag.tag(); - long offset = eventsByTag.offset(); - - final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); - - return (Source) Source.actorPublisher(props) - .mapMaterializedValue(noMaterializedValue()); - } else { - // unsuported - return Source.failed( - new UnsupportedOperationException( - "Query $unsupported not supported by " + getClass().getName())) - .mapMaterializedValue(noMaterializedValue()); - } - } + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); + + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); - private FiniteDuration refreshInterval(Hint[] hints) { - FiniteDuration ret = defaultRefreshInterval; - for (Hint hint : hints) - if (hint instanceof RefreshInterval) - ret = ((RefreshInterval) hint).interval(); - return ret; + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query " + q + " not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); } + } + + private FiniteDuration refreshInterval(Hint[] hints) { + for (Hint hint : hints) + if (hint instanceof RefreshInterval) + return ((RefreshInterval) hint).interval(); + + return defaultRefreshInterval; + } + + private akka.japi.function.Function noMaterializedValue() { + return param -> (M) null; + } + } + //#my-read-journal + + void demonstrateBasicUsage() { + final ActorSystem system = ActorSystem.create(); + + //#basic-usage + // obtain read journal by plugin id + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + // issue query to journal + Source source = + readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); + + // materialize stream, consuming events + ActorMaterializer mat = ActorMaterializer.create(system); + source.runForeach(event -> System.out.println("Event: " + event), mat); + //#basic-usage + } + + void demonstrateAllPersistenceIdsLive() { + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds.getInstance()); + //#all-persistence-ids-live + } + + void demonstrateNoRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance()); + //#all-persistence-ids-snap + } + + void demonstrateRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); - private akka.japi.function.Function noMaterializedValue () { - return param -> (M) null; - } + //#events-by-persistent-id-refresh + final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS); + readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh); + //#events-by-persistent-id-refresh } - //#my-read-journal + + void demonstrateEventsByTag() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + final Source blueThings = + readJournal.query(EventsByTag.create("blue")); + + // find top 10 blue things: + final Future> top10BlueThings = + (Future>) blueThings + .map(t -> t.event()) + .take(10) // cancels the query stream after pulling 10 elements + .>runFold(new ArrayList<>(10), (acc, e) -> { + acc.add(e); + return acc; + }, mat); + + // start another query, from the known offset + Source blue = readJournal.query(EventsByTag.create("blue", 10)); + //#events-by-tag + } + //#materialized-query-metadata-classes + // a plugin can provide: + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class QueryMetadata { + public final boolean deterministicOrder; + public final boolean infinite; + + public QueryMetadata(Boolean deterministicOrder, Boolean infinite) { + this.deterministicOrder = deterministicOrder; + this.infinite = infinite; + } + } + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class AllEvents implements Query { + private AllEvents() {} + private static AllEvents INSTANCE = new AllEvents(); + } + + //#materialized-query-metadata-classes + + void demonstrateMaterializedQueryValues() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#materialized-query-metadata + + final Source events = readJournal.query(AllEvents.INSTANCE); + + events.mapMaterializedValue(meta -> { + System.out.println("The query is: " + + "ordered deterministically: " + meta.deterministicOrder + " " + + "infinite: " + meta.infinite); + return meta; + }); + //#materialized-query-metadata + } + + class ReactiveStreamsCompatibleDBDriver { + Subscriber> batchWriter() { + return null; + } + } + + void demonstrateWritingIntoDifferentStore() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-rs + final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver(); + final Subscriber> dbBatchWriter = driver.batchWriter(); + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId.create("user-1337")) + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-simple-classes + class ExampleStore { + Future save(Object any) { + // ... + //#projection-into-different-store-simple-classes + return null; + //#projection-into-different-store-simple-classes + } + } + //#projection-into-different-store-simple-classes + + void demonstrateWritingIntoDifferentStoreWithMapAsync() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-simple + final ExampleStore store = new ExampleStore(); + + readJournal + .query(EventsByTag.create("bid")) + .mapAsync(1, store::save) + .runWith(Sink.ignore(), mat); + //#projection-into-different-store-simple + } + + //#projection-into-different-store + class MyResumableProjection { + private final String name; + + public MyResumableProjection(String name) { + this.name = name; + } + + public Future saveProgress(long offset) { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + public Future latestOffset() { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + } + //#projection-into-different-store + + + void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-actor-run + final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); + + final MyResumableProjection bidProjection = new MyResumableProjection("bid"); + + final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); + final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); + + long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration()); + + readJournal + .query(EventsByTag.create("bid", startFromOffset)) + .mapAsync(8, envelope -> { + final Future f = ask(writer, envelope.event(), timeout); + return f.map(new Mapper() { + @Override public Long apply(Object in) { + return envelope.offset(); + } + }, system.dispatcher()); + }) + .mapAsync(1, offset -> bidProjection.saveProgress(offset)) + .runWith(Sink.ignore(), mat); + } + + //#projection-into-different-store-actor-run + + class ComplexState { + + boolean readyToSave() { + return false; + } + } + + static class Record { + static Record of(Object any) { + return new Record(); + } + } + + //#projection-into-different-store-actor + final class TheOneWhoWritesToQueryJournal extends AbstractActor { + private final ExampleStore store; + + private ComplexState state = new ComplexState(); + + public TheOneWhoWritesToQueryJournal() { + store = new ExampleStore(); + + receive(ReceiveBuilder.matchAny(message -> { + state = updateState(state, message); + + // example saving logic that requires state to become ready: + if (state.readyToSave()) + store.save(Record.of(state)); + + }).build()); + } + + + ComplexState updateState(ComplexState state, Object msg) { + // some complicated aggregation logic here ... + return state; + } + } + //#projection-into-different-store-actor + } diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java index b4353623ee7..0053246d0ff 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -5,6 +5,7 @@ package docs.persistence.query; import akka.actor.Cancellable; +import akka.actor.Scheduler; import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; import akka.persistence.PersistentRepr; @@ -23,6 +24,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import static java.util.stream.Collectors.toList; @@ -39,21 +41,28 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { private final String CONTINUE = "CONTINUE"; private final int LIMIT = 1000; private long currentOffset; - private List buf = new ArrayList<>(); + private List buf = new LinkedList<>(); private Cancellable continueTask; - public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) { + public MyEventsByTagJavaPublisher(Connection connection, + String tag, + Long offset, + FiniteDuration refreshInterval) { this.connection = connection; this.tag = tag; this.currentOffset = offset; - this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self()); + final Scheduler scheduler = context().system().scheduler(); + this.continueTask = scheduler + .schedule(refreshInterval, refreshInterval, self(), CONTINUE, + context().dispatcher(), self()); + receive(ReceiveBuilder - .matchEquals(CONTINUE, (in) -> { - query(); - deliverBuf(); - }) + .matchEquals(CONTINUE, (in) -> { + query(); + deliverBuf(); + }) .match(Cancel.class, (in) -> { context().stop(self()); }) @@ -71,33 +80,33 @@ public void postStop() { private void query() { if (buf.isEmpty()) { - try { - PreparedStatement s = connection.prepareStatement( - "SELECT id, persistent_repr " + - "FROM journal WHERE tag = ? AND id >= ? " + - "ORDER BY id LIMIT ?"); + final String query = "SELECT id, persistent_repr " + + "FROM journal WHERE tag = ? AND id >= ? " + + "ORDER BY id LIMIT ?"; + try (PreparedStatement s = connection.prepareStatement(query)) { s.setString(1, tag); s.setLong(2, currentOffset); s.setLong(3, LIMIT); - final ResultSet rs = s.executeQuery(); + try (ResultSet rs = s.executeQuery()) { - final List> res = new ArrayList<>(LIMIT); - while (rs.next()) - res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); + final List> res = new ArrayList<>(LIMIT); + while (rs.next()) + res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); - if (!res.isEmpty()) { - currentOffset = res.get(res.size() - 1).first(); - } + if (!res.isEmpty()) { + currentOffset = res.get(res.size() - 1).first(); + } - buf = res.stream().map(in -> { - final Long id = in.first(); - final byte[] bytes = in.second(); + buf = res.stream().map(in -> { + final Long id = in.first(); + final byte[] bytes = in.second(); - final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); - return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); - }).collect(toList()); + return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + }).collect(toList()); + } } catch(Exception e) { onErrorThenStop(e); } diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index cb816a1f265..b35f9343250 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. -While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries -to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more -complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some -transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query -- pulling out of one Journal and storing into another one. - .. warning:: This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to @@ -58,7 +52,7 @@ journal is as simple as: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage -Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via ``getJournalFor(NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. @@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by usin ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh @@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. -Again, specific capabilities are specific to the journal you are using, so you have to Materialized values of queries @@ -133,6 +126,7 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their specialised query object, as demonstrated in the sample below: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata .. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values @@ -152,18 +146,18 @@ means that data stores which are able to scale to accomodate these requirements On the other hand the same application may have some complex statistics view or we may have analists working with the data to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like -for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be projected into the other read-optimised datastore. .. note:: When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". - In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format it may be more efficient or interesting to query it (instead of the source events directly). Materialize view to Reactive Streams compatible datastore --------------------------------------------------------- -If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs @@ -179,6 +173,7 @@ you may have to implement the write logic using plain functions or Actors instea In case your write logic is state-less and you just need to convert the events from one data data type to another before writing into the alternative datastore, then the projection is as simple as: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple Resumable projections diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index 2f36623436d..69d7a700471 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -11,7 +11,6 @@ import akka.serialization.SerializationExtension import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } -import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration object MyEventsByTagPublisher { @@ -90,17 +89,15 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD } } - @tailrec final def deliverBuf(): Unit = + final def deliverBuf(): Unit = if (totalDemand > 0 && buf.nonEmpty) { if (totalDemand <= Int.MaxValue) { val (use, keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { - val (use, keep) = buf.splitAt(Int.MaxValue) - buf = keep - use foreach onNext - deliverBuf() + buf foreach onNext + buf = Vector.empty } } } diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst index 319e1ac56bd..b5e9023f185 100644 --- a/akka-docs/rst/scala/persistence-query.rst +++ b/akka-docs/rst/scala/persistence-query.rst @@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. -While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries -to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more -complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some -transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query -- pulling out of one Journal and storing into another one. - .. warning:: This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to @@ -58,7 +52,7 @@ journal is as simple as: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage -Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via ``journalFor(NoopJournal.identifier)``, however this is not enforced. Read journal implementations are available as `Community plugins`_. @@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by usin ``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the -persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve this, which can be configured using the ``RefreshInterval`` query hint: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh @@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. -Again, specific capabilities are specific to the journal you are using, so you have to Materialized values of queries @@ -152,18 +145,18 @@ means that data stores which are able to scale to accomodate these requirements On the other hand the same application may have some complex statistics view or we may have analists working with the data to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like -for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be projected into the other read-optimised datastore. .. note:: When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". - In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format it may be more efficient or interesting to query it (instead of the source events directly). Materialize view to Reactive Streams compatible datastore --------------------------------------------------------- -If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: .. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java index c776b92c379..6bf5be87f8d 100644 --- a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java @@ -26,7 +26,7 @@ * * final Source<EventEnvelope, ?> events = * journal.query(new EventsByTag("mytag", 0L)); - * + * */ public interface ReadJournal { diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala index 5b6e35e2634..62a48a942dd 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -3,6 +3,8 @@ */ package akka.persistence.query +import java.util.concurrent.TimeUnit + import scala.concurrent.duration.FiniteDuration /** @@ -21,6 +23,12 @@ trait Hint * A plugin may optionally support this [[Hint]] for defining such a refresh interval. */ final case class RefreshInterval(interval: FiniteDuration) extends Hint +object RefreshInterval { + /** Java API */ + def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit)) + /** Java API */ + def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval) +} /** * Indicates that the event stream is supposed to be completed immediately when it diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 57ec6fc9d89..9eb150e62c2 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -67,10 +67,9 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") - val pluginActorName = configPath val pluginConfig = system.settings.config.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") - log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") + log.debug(s"Create plugin: ${configPath} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get // TODO remove duplication diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala index fdca0ec0c3a..f63b263e6ea 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -20,7 +20,11 @@ trait Query[T, M] * * A plugin may optionally support this [[Query]]. */ -final case object AllPersistenceIds extends Query[String, Unit] +final case object AllPersistenceIds extends AllPersistenceIds { + /** Java API */ + final def getInstance: AllPersistenceIds = this +} +abstract class AllPersistenceIds extends Query[String, Unit] /** * Query events for a specific `PersistentActor` identified by `persistenceId`. @@ -34,7 +38,19 @@ final case object AllPersistenceIds extends Query[String, Unit] */ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) extends Query[Any, Unit] +object EventsByPersistenceId { + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr) + + /** Java API */ + def create(persistenceId: String): EventsByPersistenceId = + EventsByPersistenceId(persistenceId) +} /** * Query events that have a specific tag. A tag can for example correspond to an * aggregate root type (in DDD terminology). @@ -56,6 +72,12 @@ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Lo * A plugin may optionally support this [[Query]]. */ final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] +object EventsByTag { + /** Java API */ + def create(tag: String): EventsByTag = EventsByTag(tag) + /** Java API */ + def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag) +} /** * Event wrapper adding meta data for the events in the result stream of diff --git a/lol b/lol new file mode 100644 index 00000000000..6c9058697db --- /dev/null +++ b/lol @@ -0,0 +1,172 @@ +adoc-api/akka/persistence/query/scaladsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/scaladsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for q +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for hints +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @return +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:23: error: end tag missing: +[error] *

+[error]         ^
+[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:29: error: unexpected end tag: 
+[error]  * 
+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/AllPersistenceIds.java:4: warning: no @return +[error] static public final akka.persistence.query.AllPersistenceIds getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for toSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for toSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for offset +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for offset +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Hint.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh.java:4: warning: no @return +[error] static public akka.persistence.query.NoRefresh getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh$.java:5: warning: empty

tag +[error] *

+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Query.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for length +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for unit +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @param for interval +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for length +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for unit +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @param for interval +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ \ No newline at end of file