+per #16541 add missing java samples for persistence query
ktoso committed Aug 12, 2015
1 parent 3b94108 commit 3314de4
Showing 10 changed files with 590 additions and 93 deletions.
370 changes: 336 additions & 34 deletions akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java

Large diffs are not rendered by default.

@@ -5,6 +5,7 @@
package docs.persistence.query;

import akka.actor.Cancellable;
import akka.actor.Scheduler;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.PersistentRepr;
@@ -23,6 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import static java.util.stream.Collectors.toList;
@@ -39,21 +41,28 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
private final String CONTINUE = "CONTINUE";
private final int LIMIT = 1000;
private long currentOffset;
private List<EventEnvelope> buf = new ArrayList<>();
private List<EventEnvelope> buf = new LinkedList<>();

private Cancellable continueTask;

public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) {
public MyEventsByTagJavaPublisher(Connection connection,
String tag,
Long offset,
FiniteDuration refreshInterval) {
this.connection = connection;
this.tag = tag;
this.currentOffset = offset;

this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self());
final Scheduler scheduler = context().system().scheduler();
this.continueTask = scheduler
.schedule(refreshInterval, refreshInterval, self(), CONTINUE,
context().dispatcher(), self());

receive(ReceiveBuilder
.matchEquals(CONTINUE, (in) -> {
query();
deliverBuf();
})
.matchEquals(CONTINUE, (in) -> {
query();
deliverBuf();
})
.match(Cancel.class, (in) -> {
context().stop(self());
})
@@ -71,33 +80,33 @@ public void postStop() {

private void query() {
if (buf.isEmpty()) {
try {
PreparedStatement s = connection.prepareStatement(
"SELECT id, persistent_repr " +
"FROM journal WHERE tag = ? AND id >= ? " +
"ORDER BY id LIMIT ?");
final String query = "SELECT id, persistent_repr " +
"FROM journal WHERE tag = ? AND id >= ? " +
"ORDER BY id LIMIT ?";

try (PreparedStatement s = connection.prepareStatement(query)) {
s.setString(1, tag);
s.setLong(2, currentOffset);
s.setLong(3, LIMIT);
final ResultSet rs = s.executeQuery();
try (ResultSet rs = s.executeQuery()) {

final List<Pair<Long, byte[]>> res = new ArrayList<>(LIMIT);
while (rs.next())
res.add(Pair.create(rs.getLong(1), rs.getBytes(2)));
final List<Pair<Long, byte[]>> res = new ArrayList<>(LIMIT);
while (rs.next())
res.add(Pair.create(rs.getLong(1), rs.getBytes(2)));

if (!res.isEmpty()) {
currentOffset = res.get(res.size() - 1).first();
}
if (!res.isEmpty()) {
currentOffset = res.get(res.size() - 1).first();
}

buf = res.stream().map(in -> {
final Long id = in.first();
final byte[] bytes = in.second();
buf = res.stream().map(in -> {
final Long id = in.first();
final byte[] bytes = in.second();

final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get();
final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get();

return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload());
}).collect(toList());
return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload());
}).collect(toList());
}
} catch(Exception e) {
onErrorThenStop(e);
}
19 changes: 7 additions & 12 deletions akka-docs/rst/java/persistence-query.rst
@@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side
simple scenarios Persistence Query may be powerful enough to fulfil the query needs of your app, however we highly
recommend (in the spirit of CQRS) splitting up the write/read sides into separate datastores as the need arises.

While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries
to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more
complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some
transformation or calculations along the way is the core use-case and recommended style of using Akka Persistence Query
- pulling out of one Journal and storing into another one.

.. warning::

This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
@@ -58,7 +52,7 @@ journal is as simple as:

.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#basic-usage

Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via
Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via
``getJournalFor(NoopJournal.identifier)``, however this is not enforced.

Read journal implementations are available as `Community plugins`_.
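
For illustration, a minimal Java sketch of the lookup described above (the plugin identifier string, the wrapping class and the ``javadsl`` package are assumptions; the ``query`` call mirrors the ``ReadJournal`` Javadoc shown further down in this commit)::

    import akka.actor.ActorSystem;
    import akka.persistence.query.EventEnvelope;
    import akka.persistence.query.EventsByTag;
    import akka.persistence.query.PersistenceQuery;
    import akka.persistence.query.javadsl.ReadJournal; // package assumed
    import akka.stream.javadsl.Source;

    public class BasicReadJournalUsage {
      public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("read-side");

        // The identifier string is hypothetical; plugins are encouraged to
        // expose it as a constant, e.g. NoopJournal.identifier.
        final ReadJournal journal = PersistenceQuery.get(system)
            .getJournalFor("akka.persistence.query.noop-read-journal");

        // One of the predefined queries, as in the ReadJournal Javadoc sample.
        final Source<EventEnvelope, ?> events =
            journal.query(EventsByTag.create("mytag", 0L));
      }
    }
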
@@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by using

``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor <event-sourcing>`,
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve
persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve
this, which can be configured using the ``RefreshInterval`` query hint:

.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#events-by-persistent-id-refresh
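
A hedged Java counterpart, reusing the ``journal`` from the earlier sketch (passing the hint as a trailing argument to ``query`` is an assumption; ``RefreshInterval.create`` is the Java API factory added by this commit)::

    import java.util.concurrent.TimeUnit;
    import akka.persistence.query.EventsByPersistenceId;
    import akka.persistence.query.RefreshInterval;
    import akka.stream.javadsl.Source;

    // Keep the stream alive, polling for newly persisted events every 3 seconds.
    final Source<Object, ?> events = journal.query(
        EventsByPersistenceId.create("user-1337"),
        RefreshInterval.create(3, TimeUnit.SECONDS));
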
@@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth
query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable streams.
For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore
that is able to order events by insertion time it could treat the Long as a timestamp and select only older events.
Again, specific capabilities are specific to the journal you are using, so you have to
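
A hedged Java sketch of such a resumable read, reusing the ``journal`` from the earlier sketch (the stored offset is a hypothetical value, e.g. reloaded from the read side on restart)::

    // Restart the tagged stream from the last offset the projection recorded.
    final long storedOffset = 12L; // hypothetical
    final Source<EventEnvelope, ?> resumed =
        journal.query(EventsByTag.create("mytag", storedOffset));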


Materialized values of queries
@@ -133,6 +126,7 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered
is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their
specialised query object, as demonstrated in the sample below:

.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes
.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata
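
The classes referenced by the first snippet live in the documentation test sources; purely as an illustration, a journal's materialized metadata could be as simple as the following sketch (all names hypothetical)::

    // Hypothetical metadata a journal could materialize alongside its stream.
    public class QueryMetadata {
      public final boolean deterministicOrder;
      public final boolean infinite;

      public QueryMetadata(boolean deterministicOrder, boolean infinite) {
        this.deterministicOrder = deterministicOrder;
        this.infinite = infinite;
      }
    }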

.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values
@@ -152,18 +146,18 @@ means that data stores which are able to scale to accommodate these requirements

On the other hand, the same application may have some complex statistics view or we may have analysts working with the data
to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like
for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be
for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be
projected into the other read-optimised datastore.

.. note::
When referring to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query".
In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format
In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format
it may be more efficient or interesting to query it (instead of the source events directly).

Materialize view to Reactive Streams compatible datastore
---------------------------------------------------------

If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection
If the read datastore exposes a `Reactive Streams`_ interface then implementing a simple projection
is as simple as using the read journal and feeding it into the database's driver interface, for example like so:

.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-rs
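
A hedged Java sketch of the idea, reusing the ``journal`` from the earlier sketches; the driver, its ``batchWriter()`` subscriber, the ``Sink.create`` wiring and the ``mat`` materializer are all assumptions::

    import org.reactivestreams.Subscriber;
    import akka.persistence.query.EventsByPersistenceId;
    import akka.stream.javadsl.Sink;

    // Hypothetical Reactive Streams driver for the target datastore.
    final Subscriber<Object> dbBatchWriter =
        ReactiveStreamsCompatibleDBDriver.batchWriter();

    // Hand the live event stream straight to the database's subscriber;
    // Sink.create(subscriber) is assumed to exist in this streams version.
    journal.query(EventsByPersistenceId.create("user-1337"))
        .runWith(Sink.create(dbBatchWriter), mat);
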
@@ -179,6 +173,7 @@ you may have to implement the write logic using plain functions or Actors instead
In case your write logic is stateless and you just need to convert the events from one data type to another
before writing into the alternative datastore, then the projection is as simple as:

.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple-classes
.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple
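
A hedged Java sketch of such a stateless projection, reusing the ``journal`` and ``mat`` from the earlier sketches (``ExampleStore`` and its asynchronous ``save`` are hypothetical, with ``save`` returning a ``Future`` as the 1.0-era Java DSL expects for ``mapAsync``)::

    import akka.persistence.query.EventsByTag;
    import akka.stream.javadsl.Sink;

    final ExampleStore store = new ExampleStore(); // hypothetical async store

    // Convert each envelope and write it out one element at a time,
    // preserving the order of the incoming events.
    journal.query(EventsByTag.create("bid"))
        .mapAsync(1, envelope ->
            store.save(envelope.persistenceId(), envelope.event()))
        .runWith(Sink.ignore(), mat);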

Resumable projections
Expand Up @@ -11,7 +11,6 @@ import akka.serialization.SerializationExtension
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

object MyEventsByTagPublisher {
@@ -90,17 +89,15 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration
}
}

@tailrec final def deliverBuf(): Unit =
final def deliverBuf(): Unit =
if (totalDemand > 0 && buf.nonEmpty) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
buf foreach onNext
buf = Vector.empty
}
}
}
17 changes: 5 additions & 12 deletions akka-docs/rst/scala/persistence-query.rst
@@ -14,12 +14,6 @@ side of an application, however it can help to migrate data from the write side
simple scenarios Persistence Query may be powerful enough to fulfil the query needs of your app, however we highly
recommend (in the spirit of CQRS) splitting up the write/read sides into separate datastores as the need arises.

While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries
to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more
complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some
transformation or calculations along the way is the core use-case and recommended style of using Akka Persistence Query
- pulling out of one Journal and storing into another one.

.. warning::

This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
@@ -58,7 +52,7 @@ journal is as simple as:

.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage

Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via
Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via
``journalFor(NoopJournal.identifier)``, however this is not enforced.

Read journal implementations are available as `Community plugins`_.
@@ -90,7 +84,7 @@ If your usage does not require a live stream, you can disable refreshing by using

``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor <event-sourcing>`,
however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the
persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve
persistent actor identified by the given ``persistenceId``. Most journals will have to revert to polling in order to achieve
this, which can be configured using the ``RefreshInterval`` query hint:

.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh
@@ -120,7 +114,6 @@ including for example taking the first 10 and cancelling the stream. It is worth
query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable streams.
For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore
that is able to order events by insertion time it could treat the Long as a timestamp and select only older events.
Again, specific capabilities are specific to the journal you are using, so you have to


Materialized values of queries
@@ -152,18 +145,18 @@ means that data stores which are able to scale to accommodate these requirements

On the other hand, the same application may have some complex statistics view or we may have analysts working with the data
to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like
for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be
for example SQL or writing Spark jobs to analyse the data. Therefore the data stored in the write-side needs to be
projected into the other read-optimised datastore.

.. note::
When referring to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query".
In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format
In other words, it means that the view is created once, in order to be afterwards queried multiple times, as in this format
it may be more efficient or interesting to query it (instead of the source events directly).

Materialize view to Reactive Streams compatible datastore
---------------------------------------------------------

If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection
If the read datastore exposes a `Reactive Streams`_ interface then implementing a simple projection
is as simple as using the read journal and feeding it into the database's driver interface, for example like so:

.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs
Expand Up @@ -26,7 +26,7 @@
*
* final Source&lt;EventEnvelope, ?&gt; events =
* journal.query(new EventsByTag("mytag", 0L));
* </pre></code>
* </code></pre>
*/

public interface ReadJournal {
@@ -3,6 +3,8 @@
*/
package akka.persistence.query

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration

/**
@@ -21,6 +23,12 @@ trait Hint
* A plugin may optionally support this [[Hint]] for defining such a refresh interval.
*/
final case class RefreshInterval(interval: FiniteDuration) extends Hint
object RefreshInterval {
/** Java API */
def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit))
/** Java API */
def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval)
}

/**
* Indicates that the event stream is supposed to be completed immediately when it
@@ -67,10 +67,9 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
private def createPlugin(configPath: String): scaladsl.ReadJournal = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'")
val pluginActorName = configPath
val pluginConfig = system.settings.config.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}")
log.debug(s"Create plugin: ${configPath} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get

// TODO remove duplication
@@ -20,7 +20,11 @@ trait Query[T, M]
*
* A plugin may optionally support this [[Query]].
*/
final case object AllPersistenceIds extends Query[String, Unit]
final case object AllPersistenceIds extends AllPersistenceIds {
/** Java API */
final def getInstance: AllPersistenceIds = this
}
abstract class AllPersistenceIds extends Query[String, Unit]

/**
* Query events for a specific `PersistentActor` identified by `persistenceId`.
@@ -34,7 +38,19 @@
*/
final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue)
extends Query[Any, Unit]
object EventsByPersistenceId {
/** Java API */
def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId =
EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)

/** Java API */
def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId =
EventsByPersistenceId(persistenceId, fromSequenceNr)

/** Java API */
def create(persistenceId: String): EventsByPersistenceId =
EventsByPersistenceId(persistenceId)
}
/**
* Query events that have a specific tag. A tag can for example correspond to an
* aggregate root type (in DDD terminology).
@@ -56,6 +72,12 @@
* A plugin may optionally support this [[Query]].
*/
final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit]
object EventsByTag {
/** Java API */
def create(tag: String): EventsByTag = EventsByTag(tag)
/** Java API */
def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag, offset)
}

/**
* Event wrapper adding meta data for the events in the result stream of
