diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java new file mode 100644 index 00000000000..7da4adfb31b --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -0,0 +1,376 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.persistence; + +import static akka.pattern.Patterns.ask; + +import akka.actor.*; +import akka.dispatch.Mapper; +import akka.event.EventStreamSpec; +import akka.japi.Function; +import akka.japi.Procedure; +import akka.japi.pf.ReceiveBuilder; +import akka.pattern.BackoffSupervisor; +import akka.persistence.*; +import akka.persistence.query.*; +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.Timeout; +import docs.persistence.query.MyEventsByTagPublisher; +import docs.persistence.query.PersistenceQueryDocSpec; +import org.reactivestreams.Subscriber; +import scala.collection.Seq; +import scala.collection.immutable.Vector; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.Boxed; +import scala.runtime.BoxedUnit; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class PersistenceQueryDocTest { + + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + //#my-read-journal + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; + + public MyReadJournal(ExtendedActorSystem system) { + this.system = system; + } + + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); + + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); + + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query " + q + " not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); + } + } + + private FiniteDuration refreshInterval(Hint[] hints) { + for (Hint hint : hints) + if (hint instanceof RefreshInterval) + return ((RefreshInterval) hint).interval(); + + return defaultRefreshInterval; + } + + private akka.japi.function.Function noMaterializedValue() { + return param -> (M) null; + } + } + //#my-read-journal + + void demonstrateBasicUsage() { + final ActorSystem system = ActorSystem.create(); + + //#basic-usage + // obtain read journal by plugin id + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + // issue query to journal + Source source = + readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); + + // materialize stream, consuming events + ActorMaterializer mat = ActorMaterializer.create(system); + source.runForeach(event -> System.out.println("Event: " + event), mat); + //#basic-usage + } + + void demonstrateAllPersistenceIdsLive() { + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds.getInstance()); + //#all-persistence-ids-live + } + + void demonstrateNoRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance()); + //#all-persistence-ids-snap + } + + void demonstrateRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-persistent-id-refresh + final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS); + readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh); + //#events-by-persistent-id-refresh + } + + void demonstrateEventsByTag() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + final Source blueThings = + readJournal.query(EventsByTag.create("blue")); + + // find top 10 blue things: + final Future> top10BlueThings = + (Future>) blueThings + .map(t -> t.event()) + .take(10) // cancels the query stream after pulling 10 elements + .>runFold(new ArrayList<>(10), (acc, e) -> { + acc.add(e); + return acc; + }, mat); + + // start another query, from the known offset + Source blue = readJournal.query(EventsByTag.create("blue", 10)); + //#events-by-tag + } + //#materialized-query-metadata-classes + // a plugin can provide: + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class QueryMetadata { + public final boolean deterministicOrder; + public final boolean infinite; + + public QueryMetadata(Boolean deterministicOrder, Boolean infinite) { + this.deterministicOrder = deterministicOrder; + this.infinite = infinite; + } + } + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class AllEvents implements Query { + private AllEvents() {} + private static AllEvents INSTANCE = new AllEvents(); + } + + //#materialized-query-metadata-classes + + void demonstrateMaterializedQueryValues() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#materialized-query-metadata + + final Source events = readJournal.query(AllEvents.INSTANCE); + + events.mapMaterializedValue(meta -> { + System.out.println("The query is: " + + "ordered deterministically: " + meta.deterministicOrder + " " + + "infinite: " + meta.infinite); + return meta; + }); + //#materialized-query-metadata + } + + class ReactiveStreamsCompatibleDBDriver { + Subscriber> batchWriter() { + return null; + } + } + + void demonstrateWritingIntoDifferentStore() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-rs + final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver(); + final Subscriber> dbBatchWriter = driver.batchWriter(); + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId.create("user-1337")) + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-simple-classes + class ExampleStore { + Future save(Object any) { + // ... + //#projection-into-different-store-simple-classes + return null; + //#projection-into-different-store-simple-classes + } + } + //#projection-into-different-store-simple-classes + + void demonstrateWritingIntoDifferentStoreWithMapAsync() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-simple + final ExampleStore store = new ExampleStore(); + + readJournal + .query(EventsByTag.create("bid")) + .mapAsync(1, store::save) + .runWith(Sink.ignore(), mat); + //#projection-into-different-store-simple + } + + //#projection-into-different-store + class MyResumableProjection { + private final String name; + + public MyResumableProjection(String name) { + this.name = name; + } + + public Future saveProgress(long offset) { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + public Future latestOffset() { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + } + //#projection-into-different-store + + + void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-actor-run + final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); + + final MyResumableProjection bidProjection = new MyResumableProjection("bid"); + + final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); + final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); + + long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration()); + + readJournal + .query(EventsByTag.create("bid", startFromOffset)) + .mapAsync(8, envelope -> { + final Future f = ask(writer, envelope.event(), timeout); + return f.map(new Mapper() { + @Override public Long apply(Object in) { + return envelope.offset(); + } + }, system.dispatcher()); + }) + .mapAsync(1, offset -> bidProjection.saveProgress(offset)) + .runWith(Sink.ignore(), mat); + } + + //#projection-into-different-store-actor-run + + class ComplexState { + + boolean readyToSave() { + return false; + } + } + + static class Record { + static Record of(Object any) { + return new Record(); + } + } + + //#projection-into-different-store-actor + final class TheOneWhoWritesToQueryJournal extends AbstractActor { + private final ExampleStore store; + + private ComplexState state = new ComplexState(); + + public TheOneWhoWritesToQueryJournal() { + store = new ExampleStore(); + + receive(ReceiveBuilder.matchAny(message -> { + state = updateState(state, message); + + // example saving logic that requires state to become ready: + if (state.readyToSave()) + store.save(Record.of(state)); + + }).build()); + } + + + ComplexState updateState(ComplexState state, Object msg) { + // some complicated aggregation logic here ... + return state; + } + } + //#projection-into-different-store-actor + +} diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java new file mode 100644 index 00000000000..0053246d0ff --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query; + +import akka.actor.Cancellable; +import akka.actor.Scheduler; +import akka.japi.Pair; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.PersistentRepr; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import akka.stream.actor.AbstractActorPublisher; +import scala.Int; + +import akka.actor.Props; +import akka.persistence.query.EventEnvelope; +import akka.stream.actor.ActorPublisherMessage.Cancel; + +import scala.concurrent.duration.FiniteDuration; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +//#events-by-tag-publisher +class MyEventsByTagJavaPublisher extends AbstractActorPublisher { + private final Serialization serialization = + SerializationExtension.get(context().system()); + + private final Connection connection; + + private final String tag; + + private final String CONTINUE = "CONTINUE"; + private final int LIMIT = 1000; + private long currentOffset; + private List buf = new LinkedList<>(); + + private Cancellable continueTask; + + public MyEventsByTagJavaPublisher(Connection connection, + String tag, + Long offset, + FiniteDuration refreshInterval) { + this.connection = connection; + this.tag = tag; + this.currentOffset = offset; + + final Scheduler scheduler = context().system().scheduler(); + this.continueTask = scheduler + .schedule(refreshInterval, refreshInterval, self(), CONTINUE, + context().dispatcher(), self()); + + receive(ReceiveBuilder + .matchEquals(CONTINUE, (in) -> { + query(); + deliverBuf(); + }) + .match(Cancel.class, (in) -> { + context().stop(self()); + }) + .build()); + } + + public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) { + return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); + } + + @Override + public void postStop() { + continueTask.cancel(); + } + + private void query() { + if (buf.isEmpty()) { + final String query = "SELECT id, persistent_repr " + + "FROM journal WHERE tag = ? AND id >= ? " + + "ORDER BY id LIMIT ?"; + + try (PreparedStatement s = connection.prepareStatement(query)) { + s.setString(1, tag); + s.setLong(2, currentOffset); + s.setLong(3, LIMIT); + try (ResultSet rs = s.executeQuery()) { + + final List> res = new ArrayList<>(LIMIT); + while (rs.next()) + res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); + + if (!res.isEmpty()) { + currentOffset = res.get(res.size() - 1).first(); + } + + buf = res.stream().map(in -> { + final Long id = in.first(); + final byte[] bytes = in.second(); + + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); + + return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + }).collect(toList()); + } + } catch(Exception e) { + onErrorThenStop(e); + } + } + } + + private void deliverBuf() { + while (totalDemand() > 0 && !buf.isEmpty()) + onNext(buf.remove(0)); + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 20b567295b5..173a0fd58e5 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -12,4 +12,5 @@ Actors routing fsm persistence + persistence-query testing diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index b0267dc7364..4be673daaef 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -61,7 +61,7 @@ Architecture * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst new file mode 100644 index 00000000000..b35f9343250 --- /dev/null +++ b/akka-docs/rst/java/persistence-query.rst @@ -0,0 +1,229 @@ +.. _persistence-query-java: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-java` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage + +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via +``getJournalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs + +.. _Reactive Streams: http://reactive-streams.org + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple-classes +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-java: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagJavaPublisher.java#events-by-tag-publisher + +Plugin TCK +---------- + +TODO, not available yet. + diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 8c84f71eb64..e2e7dfba936 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -65,7 +65,7 @@ Architecture * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. @@ -610,7 +610,7 @@ completely. Event Adapters help in situations where: -- **Version Migration** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, +- **Version Migrations** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios the ``toJournal`` function is usually an identity function, however the ``fromJournal`` is implemented as ``v1.Event=>v2.Event``, performing the neccessary mapping inside the fromJournal method. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala new file mode 100644 index 00000000000..69d7a700471 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor.Props +import akka.persistence.PersistentRepr +import akka.persistence.query.EventEnvelope +import akka.serialization.SerializationExtension +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } + +import scala.concurrent.duration.FiniteDuration + +object MyEventsByTagPublisher { + def props(tag: String, offset: Long, refreshInterval: FiniteDuration): Props = + Props(new MyEventsByTagPublisher(tag, offset, refreshInterval)) +} + +//#events-by-tag-publisher +class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) + extends ActorPublisher[EventEnvelope] { + + private case object Continue + + private val connection: java.sql.Connection = ??? + + private val Limit = 1000 + private var currentOffset = offset + var buf = Vector.empty[EventEnvelope] + + import context.dispatcher + val continueTask = context.system.scheduler.schedule( + refreshInterval, refreshInterval, self, Continue) + + override def postStop(): Unit = { + continueTask.cancel() + } + + def receive = { + case _: Request | Continue ⇒ + query() + deliverBuf() + + case Cancel ⇒ + context.stop(self) + } + + object Select { + private def statement() = connection.prepareStatement( + """ + SELECT id, persistent_repr FROM journal + WHERE tag = ? AND id >= ? + ORDER BY id LIMIT ? + """) + + def run(tag: String, from: Long, limit: Int): Vector[(Long, Array[Byte])] = { + val s = statement() + try { + s.setString(1, tag) + s.setLong(2, from) + s.setLong(3, limit) + val rs = s.executeQuery() + + val b = Vector.newBuilder[(Long, Array[Byte])] + while (rs.next()) + b += (rs.getLong(1) -> rs.getBytes(2)) + b.result() + } finally s.close() + } + } + + def query(): Unit = + if (buf.isEmpty) { + try { + val result = Select.run(tag, currentOffset, Limit) + currentOffset = if (result.nonEmpty) result.last._1 else currentOffset + val serialization = SerializationExtension(context.system) + + buf = result.map { + case (id, bytes) ⇒ + val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get + EventEnvelope(offset = id, p.persistenceId, p.sequenceNr, p.payload) + } + } catch { + case e: Exception ⇒ + onErrorThenStop(e) + } + } + + final def deliverBuf(): Unit = + if (totalDemand > 0 && buf.nonEmpty) { + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + buf foreach onNext + buf = Vector.empty + } + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala new file mode 100644 index 00000000000..4d3963a04f7 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -0,0 +1,259 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor._ +import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.{ Recovery, PersistentActor } +import akka.persistence.query._ +import akka.stream.{ FlowShape, ActorMaterializer } +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.testkit.AkkaSpec +import akka.util.Timeout +import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal } +import org.reactivestreams.Subscriber + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +object PersistenceQueryDocSpec { + + implicit val timeout = Timeout(3.seconds) + + //#my-read-journal + class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { + + private val defaulRefreshInterval = 3.seconds + + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + q match { + case EventsByTag(tag, offset) ⇒ + val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)) + Source.actorPublisher[EventEnvelope](props) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + + case unsupported ⇒ + Source.failed[T]( + new UnsupportedOperationException( + s"Query $unsupported not supported by ${getClass.getName}")) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + } + + private def refreshInterval(hints: Seq[Hint]): FiniteDuration = + hints.collectFirst { case RefreshInterval(interval) ⇒ interval } + .getOrElse(defaulRefreshInterval) + + private def noMaterializedValue[M]: M = + null.asInstanceOf[M] + } + + //#my-read-journal + case class ComplexState() { + def readyToSave = false + } + case class Record(any: Any) + class DummyStore { def save(record: Record) = Future.successful(42L) } + + class X { + val JournalId = "" + + def convertToReadSideTypes(in: Any): Any = ??? + + object ReactiveStreamsCompatibleDBDriver { + def batchWriter: Subscriber[immutable.Seq[Any]] = ??? + } + + //#projection-into-different-store-rs + implicit val system = ActorSystem() + implicit val mat = ActorMaterializer() + + val readJournal = PersistenceQuery(system).readJournalFor(JournalId) + val dbBatchWriter: Subscriber[immutable.Seq[Any]] = + ReactiveStreamsCompatibleDBDriver.batchWriter + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId("user-1337")) + .map(convertToReadSideTypes) // convert to datatype + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink(dbBatchWriter)) // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-actor + class TheOneWhoWritesToQueryJournal(id: String) extends Actor { + val store = new DummyStore() + + var state: ComplexState = ComplexState() + + def receive = { + case m => + state = updateState(state, m) + if (state.readyToSave) store.save(Record(state)) + } + + def updateState(state: ComplexState, msg: Any): ComplexState = { + // some complicated aggregation logic here ... + state + } + } + + //#projection-into-different-store-actor + +} + +class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { + + def this() { + this( + """ + akka.persistence.query.noop-read-journal { + class = "docs.persistence.query.NoopReadJournal" + } + """.stripMargin) + } + + //#basic-usage + // obtain read journal by plugin id + val readJournal = + PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal") + + // issue query to journal + val source: Source[Any, Unit] = + readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue)) + + // materialize stream, consuming events + implicit val mat = ActorMaterializer() + source.runForeach { event => println("Event: " + event) } + //#basic-usage + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds) + //#all-persistence-ids-live + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds, hints = NoRefresh) + //#all-persistence-ids-snap + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + + val blueThings: Source[EventEnvelope, Unit] = + readJournal.query(EventsByTag("blue")) + + // find top 10 blue things: + val top10BlueThings: Future[Vector[Any]] = + blueThings + .map(_.event) + .take(10) // cancels the query stream after pulling 10 elements + .runFold(Vector.empty[Any])(_ :+ _) + + // start another query, from the known offset + val furtherBlueThings = readJournal.query(EventsByTag("blue", offset = 10)) + //#events-by-tag + + //#events-by-persistent-id-refresh + readJournal.query(EventsByPersistenceId("user-us-1337"), hints = RefreshInterval(1.second)) + + //#events-by-persistent-id-refresh + + //#advanced-journal-query-definition + final case class RichEvent(tags: immutable.Set[String], payload: Any) + + case class QueryStats(totalEvents: Long) + + case class ByTagsWithStats(tags: immutable.Set[String]) + extends Query[RichEvent, QueryStats] + + //#advanced-journal-query-definition + + //#advanced-journal-query-hints + + import scala.concurrent.duration._ + + readJournal.query(EventsByTag("blue"), hints = RefreshInterval(1.second)) + //#advanced-journal-query-hints + + //#advanced-journal-query-usage + val query: Source[RichEvent, QueryStats] = + readJournal.query(ByTagsWithStats(Set("red", "blue"))) + + query + .mapMaterializedValue { stats => println(s"Stats: $stats") } + .map { event => println(s"Event payload: ${event.payload}") } + .runWith(Sink.ignore) + + //#advanced-journal-query-usage + + //#materialized-query-metadata + // a plugin can provide: + case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) + + case object AllEvents extends Query[Any, QueryMetadata] + + val events = readJournal.query(AllEvents) + events + .mapMaterializedValue { meta => + println(s"The query is: " + + s"ordered deterministically: ${meta.deterministicOrder}, " + + s"infinite: ${meta.infinite}") + } + + //#materialized-query-metadata + + //#projection-into-different-store + class MyResumableProjection(name: String) { + def saveProgress(offset: Long): Future[Long] = ??? + def latestOffset: Future[Long] = ??? + } + //#projection-into-different-store + + class RunWithActor { + //#projection-into-different-store-actor-run + import akka.pattern.ask + import system.dispatcher + implicit val timeout = Timeout(3.seconds) + + val bidProjection = new MyResumableProjection("bid") + + val writerProps = Props(classOf[TheOneWhoWritesToQueryJournal], "bid") + val writer = system.actorOf(writerProps, "bid-projection-writer") + + bidProjection.latestOffset.foreach { startFromOffset => + readJournal + .query(EventsByTag("bid", startFromOffset)) + .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) } + .mapAsync(1) { offset => bidProjection.saveProgress(offset) } + .runWith(Sink.ignore) + } + //#projection-into-different-store-actor-run + } + + class RunWithAsyncFunction { + //#projection-into-different-store-simple + trait ExampleStore { + def save(event: Any): Future[Unit] + } + //#projection-into-different-store-simple + + //#projection-into-different-store-simple + val store: ExampleStore = ??? + + readJournal + .query(EventsByTag("bid")) + .mapAsync(1) { e => store.save(e) } + .runWith(Sink.ignore) + //#projection-into-different-store-simple + } + +} + +class NoopReadJournal(sys: ExtendedActorSystem) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source.empty.mapMaterializedValue(_ => null.asInstanceOf[M]) +} diff --git a/akka-docs/rst/scala/index-actors.rst b/akka-docs/rst/scala/index-actors.rst index 0e77f0c2258..ca09504ab76 100644 --- a/akka-docs/rst/scala/index-actors.rst +++ b/akka-docs/rst/scala/index-actors.rst @@ -12,6 +12,7 @@ Actors routing fsm persistence + persistence-query testing actordsl typed-actors diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst new file mode 100644 index 00000000000..b5e9023f185 --- /dev/null +++ b/akka-docs/rst/scala/persistence-query.rst @@ -0,0 +1,228 @@ +.. _persistence-query-scala: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-scala` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage + +Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via +``journalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the :class:`ReadJournal` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs + +.. _Reactive Streams: http://reactive-streams.org + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-scala: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher + +Plugin TCK +---------- + +TODO, not available yet. + + diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 6d516ff6ecd..565a0f636b8 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -49,7 +49,7 @@ Architecture * *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. @@ -619,7 +619,6 @@ Event Adapters help in situations where: understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the json instead of serializing the object to its binary representation. - Implementing an EventAdapter is rather stright forward: .. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#identity-event-adapter diff --git a/akka-persistence-query/build.sbt b/akka-persistence-query/build.sbt new file mode 100644 index 00000000000..1b57eef61d2 --- /dev/null +++ b/akka-persistence-query/build.sbt @@ -0,0 +1,16 @@ +import akka.{ AkkaBuild, Dependencies, Formatting, ScaladocNoVerificationOfDiagrams, OSGi } +import com.typesafe.tools.mima.plugin.MimaKeys + +AkkaBuild.defaultSettings + +AkkaBuild.experimentalSettings + +Formatting.formatSettings + +OSGi.persistenceQuery + +Dependencies.persistenceQuery + +//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value + +enablePlugins(ScaladocNoVerificationOfDiagrams) diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java new file mode 100644 index 00000000000..6bf5be87f8d --- /dev/null +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl; + +import akka.persistence.query.Query; +import akka.persistence.query.Hint; +import akka.stream.javadsl.Source; +import scala.annotation.varargs; + +/** + * Java API + *

+ * API for reading persistent events and information derived + * from stored persistent events. + *

+ * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + *

+ * Usage: + *


+ * final ReadJournal journal =
+ *   PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
+ *
+ * final Source<EventEnvelope, ?> events =
+ *   journal.query(new EventsByTag("mytag", 0L));
+ * 
+ */ + +public interface ReadJournal { + + /** + * Java API + *

+ * A query that returns a `Source` with output type `T` and materialized value `M`. + *

+ * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + Source query(Query q, Hint... hints); + +} diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java new file mode 100644 index 00000000000..96866071fb6 --- /dev/null +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl; + +import akka.japi.Util; +import akka.persistence.query.Hint; +import akka.persistence.query.Query; +import akka.stream.javadsl.Source; + +/** + * INTERNAL API + * + * Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal} + * to JavaDSL {@link ReadJournal}. + */ +public final class ReadJournalAdapter implements ReadJournal { + + private final akka.persistence.query.scaladsl.ReadJournal backing; + + public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) { + this.backing = backing; + } + + @Override + public Source query(Query q, Hint... hints) { + return backing.query(q, Util.immutableSeq(hints)).asJava(); + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala new file mode 100644 index 00000000000..62a48a942dd --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.FiniteDuration + +/** + * A query hint that defines how to execute the query, + * typically specific to the journal implementation. + * + * A plugin may optionally support a [[Hint]]. + */ +trait Hint + +/** + * If the underlying datastore only supports queries that are completed when they reach the + * end of the "result set", the journal has to submit new queries after a while in order + * to support "infinite" event streams that include events stored after the initial query has completed. + * + * A plugin may optionally support this [[Hint]] for defining such a refresh interval. + */ +final case class RefreshInterval(interval: FiniteDuration) extends Hint +object RefreshInterval { + /** Java API */ + def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit)) + /** Java API */ + def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval) +} + +/** + * Indicates that the event stream is supposed to be completed immediately when it + * reaches the end of the "result set", as described in [[RefreshInterval]]. + * + */ +final case object NoRefresh extends NoRefresh { + /** Java API */ + def getInstance: NoRefresh = this +} +sealed class NoRefresh extends Hint + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala new file mode 100644 index 00000000000..9eb150e62c2 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor._ +import akka.event.Logging + +import scala.annotation.tailrec +import scala.util.Failure + +/** + * Persistence extension for queries. + */ +object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider { + /** + * Java API. + */ + override def get(system: ActorSystem): PersistenceQuery = super.get(system) + + def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system) + + def lookup() = PersistenceQuery + + /** INTERNAL API. */ + private[persistence] case class PluginHolder(plugin: scaladsl.ReadJournal) extends Extension + +} + +class PersistenceQuery(system: ExtendedActorSystem) extends Extension { + import PersistenceQuery._ + + private val log = Logging(system, getClass) + + /** Discovered query plugins. */ + private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + + /** + * Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given read journal configuration entry. + */ + @tailrec final def readJournalFor(readJournalPluginId: String): scaladsl.ReadJournal = { + val configPath = readJournalPluginId + val extensionIdMap = readJournalPluginExtensionIds.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).plugin + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = + PluginHolder(createPlugin(configPath)) + } + readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + readJournalFor(readJournalPluginId) // Recursive invocation. + } + } + + /** + * Java API + * + * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. + */ + final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = + new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId)) + + private def createPlugin(configPath: String): scaladsl.ReadJournal = { + require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), + s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") + val pluginConfig = system.settings.config.getConfig(configPath) + val pluginClassName = pluginConfig.getString("class") + log.debug(s"Create plugin: ${configPath} ${pluginClassName}") + val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + + // TODO remove duplication + val scalaPlugin = + if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil)) + .map(jj ⇒ new scaladsl.ReadJournalAdapter(jj)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend") + + scalaPlugin.get + } + + /** Check for default or missing identity. */ + private def isEmpty(text: String) = text == null || text.length == 0 +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala new file mode 100644 index 00000000000..f63b263e6ea --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +/** + * General interface for all queries. There are a few pre-defined queries, + * such as [[EventsByPersistenceId]], [[AllPersistenceIds]] and [[EventsByTag]] + * but implementation of these queries are optional. Query (journal) plugins + * may define their own specialized queries. + * + * If a query plugin does not support a query it will return a stream that + * will be completed with a failure of [[UnsupportedOperationException]]. + */ +trait Query[T, M] + +/** + * Query all `PersistentActor` identifiers, i.e. as defined by the + * `persistenceId` of the `PersistentActor`. + * + * A plugin may optionally support this [[Query]]. + */ +final case object AllPersistenceIds extends AllPersistenceIds { + /** Java API */ + final def getInstance: AllPersistenceIds = this +} +abstract class AllPersistenceIds extends Query[String, Unit] + +/** + * Query events for a specific `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. + * + * The returned event stream should be ordered by sequence number. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) + extends Query[Any, Unit] +object EventsByPersistenceId { + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr) + + /** Java API */ + def create(persistenceId: String): EventsByPersistenceId = + EventsByPersistenceId(persistenceId) +} +/** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] +object EventsByTag { + /** Java API */ + def create(tag: String): EventsByTag = EventsByTag(tag) + /** Java API */ + def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag) +} + +/** + * Event wrapper adding meta data for the events in the result stream of + * [[EventsByTag]] query, or similar queries. + */ +//#event-envelope +final case class EventEnvelope( + offset: Long, + persistenceId: String, + sequenceNr: Long, + event: Any) +//#event-envelope diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala new file mode 100644 index 00000000000..936aadb7d96 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.scaladsl + +import akka.persistence.query.{ Hint, Query } +import akka.stream.scaladsl.Source + +/** + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * Usage: + * {{{ + * val journal = PersistenceQuery(system).readJournalFor(queryPluginConfigPath) + * val events = journal.query(EventsByTag("mytag", 0L)) + * }}} + * + * For Java API see [[akka.persistence.query.javadsl.ReadJournal]]. + */ +abstract class ReadJournal { + + /** + * A query that returns a `Source` with output type `T` and materialized + * value `M`. + * + * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] + +} + +/** INTERNAL API */ +private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asScala +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java new file mode 100644 index 00000000000..1b0050cd814 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.util.Iterator; + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockJavaReadJournal implements ReadJournal { + public static final String Identifier = "akka.persistence.query.journal.mock-java"; + + public static final Config config = ConfigFactory.parseString( + Identifier + " { \n" + + " class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" + + " }\n\n"); + + @Override + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + return (Source) Source.fromIterator(() -> new Iterator() { + private int i = 0; + @Override public boolean hasNext() { return true; } + + @Override public String next() { + return "" + (i++); + } + }); + } +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java new file mode 100644 index 00000000000..2ab31f83010 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.actor.ActorSystem; +import akka.persistence.query.javadsl.ReadJournal; +import akka.testkit.AkkaJUnitActorSystemResource; +import org.junit.ClassRule; +import scala.runtime.BoxedUnit; + +public class PersistenceQueryTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource(PersistenceQueryTest.class.getName()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + private final Hint hint = NoRefresh.getInstance(); + + // compile-only test + @SuppressWarnings("unused") + public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception { + final ReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor("noop-journal"); + final akka.stream.javadsl.Source tag = readJournal.query(new EventsByTag("tag", 0L), hint, hint); // java varargs + } +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala new file mode 100644 index 00000000000..ea813a1cd53 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import akka.stream.scaladsl.Source +import com.typesafe.config.{ Config, ConfigFactory } + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockReadJournal extends scaladsl.ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source(() ⇒ Iterator.from(0)).map(_.toString).asInstanceOf[Source[T, M]] +} + +object MockReadJournal { + final val Identifier = "akka.persistence.query.journal.mock" + + final val config: Config = ConfigFactory.parseString( + s""" + |$Identifier { + | class = "${classOf[MockReadJournal].getCanonicalName}" + |} + """.stripMargin) +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala new file mode 100644 index 00000000000..a7778c00bcb --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem +import akka.persistence.journal.{ EventAdapter, EventSeq } +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll { + + val anything: Query[String, _] = null + + val eventAdaptersConfig = + s""" + |akka.persistence.query.journal.mock { + | event-adapters { + | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} + | } + |} + """.stripMargin + + "ReadJournal" must { + "be found by full config key" in { + withActorSystem() { system ⇒ + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier) + } + } + + "throw if unable to find query journal by config key" in { + withActorSystem() { system ⇒ + intercept[IllegalArgumentException] { + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier + "-unknown") + }.getMessage should include("missing persistence read journal") + } + } + + "expose scaladsl implemented journal as javadsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier) + } + } + "expose javadsl implemented journal as scaladsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier) + } + } + } + + private val systemCounter = new AtomicInteger() + private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { + val config = + MockReadJournal.config + .withFallback(MockJavaReadJournal.config) + .withFallback(ConfigFactory.parseString(conf)) + .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) + .withFallback(ConfigFactory.load()) + + val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config) + try block(sys) finally Await.ready(sys.terminate(), 10.seconds) + } +} + +object ExampleQueryModels { + case class OldModel(value: String) { def promote = NewModel(value) } + case class NewModel(value: String) +} + +class PrefixStringWithPAdapter extends EventAdapter { + override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event) + + override def manifest(event: Any) = "" + override def toJournal(event: Any) = throw new Exception("toJournal should not be called by query side") +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 2752e11a5ff..effd6fbd3bf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -213,7 +213,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def journalFor(journalPluginId: String): ActorRef = { + @tailrec private[akka] final def journalFor(journalPluginId: String): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId val extensionIdMap = journalPluginExtensionId.get extensionIdMap.get(configPath) match { @@ -227,12 +227,14 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } /** + * INTERNAL API + * * Returns a snapshot store plugin actor identified by `snapshotPluginId`. * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + @tailrec private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId val extensionIdMap = snapshotPluginExtensionId.get extensionIdMap.get(configPath) match { diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java index fef46cc3162..d660cc79ab0 100644 --- a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java @@ -42,6 +42,7 @@ public class AbstractPersistentFSMTest { private static Option none = Option.none(); + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config( diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 0eb78a9c46a..ff2224e858b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -204,7 +204,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("Failure: wrong-1") expectTerminated(persistentActor) } - "call onPersistFailure and stop if persistAsync fails xoxo" in { + "call onPersistFailure and stop if persistAsync fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! Cmd("a") diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala similarity index 100% rename from akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala diff --git a/lol b/lol new file mode 100644 index 00000000000..6c9058697db --- /dev/null +++ b/lol @@ -0,0 +1,172 @@ +adoc-api/akka/persistence/query/scaladsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/scaladsl/ReadJournal.java:31: warning: no @param for +[error] public abstract akka.stream.scaladsl.Source query (akka.persistence.query.Query q, scala.collection.Seq hints) ; +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/scaladsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournal.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for q +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @param for hints +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:43: warning: no @return +[error] Source query(Query q, Hint... hints); +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:23: error: end tag missing: +[error] *


+[error]         ^
+[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java:29: error: unexpected end tag: 
+[error]  * 
+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/javadsl/ReadJournalAdapter.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/AllPersistenceIds.java:4: warning: no @return +[error] static public final akka.persistence.query.AllPersistenceIds getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/AllPersistenceIds$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventEnvelope$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @param for toSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:14: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @param for fromSequenceNr +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:16: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @param for persistenceId +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId.java:18: warning: no @return +[error] static public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByPersistenceId$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @param for toSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr, long toSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @param for fromSequenceNr +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId, long fromSequenceNr) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @param for persistenceId +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByPersistenceId$.java:13: warning: no @return +[error] public akka.persistence.query.EventsByPersistenceId create (java.lang.String persistenceId) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:24: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for tag +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @param for offset +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag.java:26: warning: no @return +[error] static public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/EventsByTag$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:9: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for tag +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @param for offset +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/EventsByTag$.java:11: warning: no @return +[error] public akka.persistence.query.EventsByTag create (java.lang.String tag, long offset) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Hint.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh.java:4: warning: no @return +[error] static public akka.persistence.query.NoRefresh getInstance () { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/NoRefresh$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/NoRefresh$.java:5: warning: empty

tag +[error] *

+[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery.PluginHolder$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/PersistenceQuery$.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/Query.html... +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for length +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @param for unit +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:11: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @param for interval +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval.java:13: warning: no @return +[error] static public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[info] Generating /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/genjavadoc-api/akka/persistence/query/RefreshInterval$.html... +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for length +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @param for unit +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:9: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (long length, java.util.concurrent.TimeUnit unit) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @param for interval +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ +[error] /mnt/akka/jenkinsakka/localhome-a0/workspace/pr-validator-per-commit-jenkins/akka-persistence-query/target/java/akka/persistence/query/RefreshInterval$.java:11: warning: no @return +[error] public akka.persistence.query.RefreshInterval create (scala.concurrent.duration.FiniteDuration interval) { throw new RuntimeException(); } +[error] ^ \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 81a989d93a1..0933ea821c5 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -54,7 +54,7 @@ object AkkaBuild extends Build { ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, agent, persistence, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) + slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) ) lazy val akkaScalaNightly = Project( @@ -64,7 +64,7 @@ object AkkaBuild extends Build { // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) + slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) ).disablePlugins(ValidatePullRequest) lazy val actor = Project( @@ -163,6 +163,12 @@ object AkkaBuild extends Build { dependencies = Seq(actor, remote % "test->test", testkit % "test->test") ) + lazy val persistenceQuery = Project( + id = "akka-persistence-query-experimental", + base = file("akka-persistence-query"), + dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test") + ) + lazy val persistenceTck = Project( id = "akka-persistence-tck", base = file("akka-persistence-tck"), @@ -192,7 +198,7 @@ object AkkaBuild extends Build { base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", remote % "compile;test->test", cluster, clusterMetrics, slf4j, agent, camel, osgi, - persistence % "compile;provided->provided;test->test", persistenceTck, + persistence % "compile;provided->provided;test->test", persistenceTck, persistenceQuery, typed % "compile;test->test", distributedData) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 02b8b26d317..23b0af2a1f9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,6 +19,10 @@ object Dependencies { object Compile { // Compile + + // Akka Streams // FIXME: change to project dependency once merged before 2.4.0 + val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" + val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2 // when updating config version, update links ActorSystem ScalaDoc to link to the updated version @@ -108,6 +112,8 @@ object Dependencies { val persistence = l ++= Seq(protobuf, Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml) + val persistenceQuery = l ++= Seq(akkaStream, Test.scalatest.value, Test.junit, Test.commonsIo) + val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile"))) val kernel = l ++= Seq(Test.scalatest.value, Test.junit) diff --git a/project/OSGi.scala b/project/OSGi.scala index 658c76673b8..ad3900f927e 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -47,6 +47,8 @@ object OSGi { val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport())) + val persistenceQuery = exports(Seq("akka.persistence.query.*")) + val testkit = exports(Seq("akka.testkit.*")) val osgiOptionalImports = Seq(