From 2e2335726bf231d62a0f05c061812ddc63a8d5b6 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 8 Jun 2015 12:26:19 +0200 Subject: [PATCH] +per #16541 add missing java samples for persistence query --- .../persistence/PersistenceQueryDocTest.java | 370 ++++++++++++++++-- akka-docs/rst/java/persistence-query.rst | 2 + .../scala/akka/persistence/query/Hint.scala | 8 + .../scala/akka/persistence/query/Query.scala | 24 +- 4 files changed, 371 insertions(+), 33 deletions(-) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 2af6572628b5..e311a3cfb86e 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -4,24 +4,38 @@ package docs.persistence; +import static akka.pattern.Patterns.ask; + import akka.actor.*; +import akka.dispatch.Mapper; import akka.event.EventStreamSpec; import akka.japi.Function; import akka.japi.Procedure; +import akka.japi.pf.ReceiveBuilder; import akka.pattern.BackoffSupervisor; import akka.persistence.*; import akka.persistence.query.*; import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; +import docs.persistence.query.PersistenceQueryDocSpec; +import org.reactivestreams.Subscriber; import scala.collection.Seq; +import scala.collection.immutable.Vector; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.Boxed; import scala.runtime.BoxedUnit; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; public class PersistenceQueryDocTest { @@ -29,46 +43,338 @@ public class PersistenceQueryDocTest { final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS)); //#my-read-journal - class MyReadJournal implements ReadJournal { - private final ExtendedActorSystem system; + class MyReadJournal implements ReadJournal { + private final ExtendedActorSystem system; public MyReadJournal(ExtendedActorSystem system) { this.system = system; } - final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); - - @SuppressWarnings("unchecked") - public Source query(Query q, Hint... hints) { - if (q instanceof EventsByTag) { - final EventsByTag eventsByTag = (EventsByTag) q; - final String tag = eventsByTag.tag(); - long offset = eventsByTag.offset(); - - final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); - - return (Source) Source.actorPublisher(props) - .mapMaterializedValue(noMaterializedValue()); - } else { - // unsuported - return Source.failed( - new UnsupportedOperationException( - "Query $unsupported not supported by " + getClass().getName())) - .mapMaterializedValue(noMaterializedValue()); - } - } + final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + if (q instanceof EventsByTag) { + final EventsByTag eventsByTag = (EventsByTag) q; + final String tag = eventsByTag.tag(); + long offset = eventsByTag.offset(); - private FiniteDuration refreshInterval(Hint[] hints) { - FiniteDuration ret = defaultRefreshInterval; - for (Hint hint : hints) - if (hint instanceof RefreshInterval) - ret = ((RefreshInterval) hint).interval(); - return ret; + final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)); + + return (Source) Source.actorPublisher(props) + .mapMaterializedValue(noMaterializedValue()); + } else { + // unsuported + return Source.failed( + new UnsupportedOperationException( + "Query $unsupported not supported by " + getClass().getName())) + .mapMaterializedValue(noMaterializedValue()); } + } - private akka.japi.function.Function noMaterializedValue () { - return param -> (M) null; + private FiniteDuration refreshInterval(Hint[] hints) { + FiniteDuration ret = defaultRefreshInterval; + for (Hint hint : hints) { + if (hint instanceof RefreshInterval) { + ret = ((RefreshInterval) hint).interval(); } + } + return ret; + } + + private akka.japi.function.Function noMaterializedValue() { + return param -> (M) null; + } + } + //#my-read-journal + + void demonstrateBasicUsage() { + final ActorSystem system = ActorSystem.create(); + + //#basic-usage + // obtain read journal by plugin id + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + // issue query to journal + Source source = + readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE)); + + // materialize stream, consuming events + ActorMaterializer mat = ActorMaterializer.create(system); + source.runForeach(event -> System.out.println("Event: " + event), mat); + //#basic-usage + } + + void demonstrateAllPersistenceIdsLive() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds.getInstance()); + //#all-persistence-ids-live + } + + void demonstrateNoRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance()); + //#all-persistence-ids-snap + } + + void demonstrateRefresh() { + final ActorSystem system = ActorSystem.create(); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-persistent-id-refresh + final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS); + readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh); + //#events-by-persistent-id-refresh + } + + void demonstrateEventsByTag() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + final Source blueThings = + readJournal.query(EventsByTag.create("blue")); + + // find top 10 blue things: + final Future> top10BlueThings = + (Future>) blueThings + .map(t -> t.event()) + .take(10) // cancels the query stream after pulling 10 elements + .>runFold(new ArrayList<>(10), (acc, e) -> { + acc.add(e); + return acc; + }, mat); + + // start another query, from the known offset + Source blue = readJournal.query(EventsByTag.create("blue", 10)); + //#events-by-tag + } + //#materialized-query-metadata-classes + // a plugin can provide: + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class QueryMetadata { + public final boolean deterministicOrder; + public final boolean infinite; + + public QueryMetadata(Boolean deterministicOrder, Boolean infinite) { + this.deterministicOrder = deterministicOrder; + this.infinite = infinite; + } + } + + //#materialized-query-metadata-classes + + static + //#materialized-query-metadata-classes + final class AllEvents implements Query { + private AllEvents() {} + private static AllEvents INSTANCE = new AllEvents(); + } + + //#materialized-query-metadata-classes + + void demonstrateMaterializedQueryValues() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + //#materialized-query-metadata + + final Source events = readJournal.query(AllEvents.INSTANCE); + + events.mapMaterializedValue(meta -> { + System.out.println("The query is: " + + "ordered deterministically: " + meta.deterministicOrder + " " + + "infinite: " + meta.infinite); + return meta; + }); + //#materialized-query-metadata + } + + class ReactiveStreamsCompatibleDBDriver { + Subscriber> batchWriter() { + return null; + } } - //#my-read-journal + + void demonstrateWritingIntoDifferentStore() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-rs + final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver(); + final Subscriber> dbBatchWriter = driver.batchWriter(); + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId.create("user-1337")) + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-simple-classes + class ExampleStore { + Future save(Object any) { + // ... + //#projection-into-different-store-simple-classes + return null; + //#projection-into-different-store-simple-classes + } + } + //#projection-into-different-store-simple-classes + + void demonstrateWritingIntoDifferentStoreWithMapAsync() { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-simple + final ExampleStore store = new ExampleStore(); + + readJournal + .query(EventsByTag.create("bid")) + .mapAsync(1, store::save) + .runWith(Sink.ignore(), mat); + //#projection-into-different-store-simple + } + + //#projection-into-different-store + class MyResumableProjection { + private final String name; + + public MyResumableProjection(String name) { + this.name = name; + } + + public Future saveProgress(long offset) { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + public Future latestOffset() { + // ... + //#projection-into-different-store + return null; + //#projection-into-different-store + } + } + //#projection-into-different-store + + + void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { + final ActorSystem system = ActorSystem.create(); + final ActorMaterializer mat = ActorMaterializer.create(system); + + final ReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor("akka.persistence.query.noop-read-journal"); + + + //#projection-into-different-store-actor-run + final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); + + final MyResumableProjection bidProjection = new MyResumableProjection("bid"); + + final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); + final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); + + long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration()); + + readJournal + .query(EventsByTag.create("bid", startFromOffset)) + .mapAsync(8, envelope -> { + final Future f = ask(writer, envelope.event(), timeout); + return f.map(new Mapper() { + @Override public Long apply(Object in) { + return envelope.offset(); + } + }, system.dispatcher()); + }) + .mapAsync(1, offset -> bidProjection.saveProgress(offset)) + .runWith(Sink.ignore(), mat); + } + + //#projection-into-different-store-actor-run + + class ComplexState { + + boolean readyToSave() { + return false; + } + } + + static class Record { + static Record of(Object any) { + return new Record(); + } + } + + //#projection-into-different-store-actor + final class TheOneWhoWritesToQueryJournal extends AbstractActor { + private final ExampleStore store; + + private ComplexState state = new ComplexState(); + + public TheOneWhoWritesToQueryJournal() { + store = new ExampleStore(); + + receive(ReceiveBuilder.matchAny(message -> { + state = updateState(state, message); + + // example saving logic that requires state to become ready: + if (state.readyToSave()) + store.save(Record.of(state)); + + }).build()); + } + + + ComplexState updateState(ComplexState state, Object msg) { + // some complicated aggregation logic here ... + return state; + } + } + //#projection-into-different-store-actor + + } diff --git a/akka-docs/rst/java/persistence-query.rst b/akka-docs/rst/java/persistence-query.rst index cb816a1f2652..3edf29739e41 100644 --- a/akka-docs/rst/java/persistence-query.rst +++ b/akka-docs/rst/java/persistence-query.rst @@ -133,6 +133,7 @@ stream, for example if it's finite or infinite, strictly ordered or not ordered is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their specialised query object, as demonstrated in the sample below: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#materialized-query-metadata .. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-quickstart.html#Materialized_values @@ -179,6 +180,7 @@ you may have to implement the write logic using plain functions or Actors instea In case your write logic is state-less and you just need to convert the events from one data data type to another before writing into the alternative datastore, then the projection is as simple as: +.. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple-classes .. includecode:: code/docs/persistence/PersistenceQueryDocTest.java#projection-into-different-store-simple Resumable projections diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala index 5b6e35e26345..62a48a942ddf 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -3,6 +3,8 @@ */ package akka.persistence.query +import java.util.concurrent.TimeUnit + import scala.concurrent.duration.FiniteDuration /** @@ -21,6 +23,12 @@ trait Hint * A plugin may optionally support this [[Hint]] for defining such a refresh interval. */ final case class RefreshInterval(interval: FiniteDuration) extends Hint +object RefreshInterval { + /** Java API */ + def create(length: Long, unit: TimeUnit): RefreshInterval = new RefreshInterval(FiniteDuration(length, unit)) + /** Java API */ + def create(interval: FiniteDuration): RefreshInterval = new RefreshInterval(interval) +} /** * Indicates that the event stream is supposed to be completed immediately when it diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala index fdca0ec0c3a6..f63b263e6eaf 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -20,7 +20,11 @@ trait Query[T, M] * * A plugin may optionally support this [[Query]]. */ -final case object AllPersistenceIds extends Query[String, Unit] +final case object AllPersistenceIds extends AllPersistenceIds { + /** Java API */ + final def getInstance: AllPersistenceIds = this +} +abstract class AllPersistenceIds extends Query[String, Unit] /** * Query events for a specific `PersistentActor` identified by `persistenceId`. @@ -34,7 +38,19 @@ final case object AllPersistenceIds extends Query[String, Unit] */ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) extends Query[Any, Unit] +object EventsByPersistenceId { + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + /** Java API */ + def create(persistenceId: String, fromSequenceNr: Long): EventsByPersistenceId = + EventsByPersistenceId(persistenceId, fromSequenceNr) + + /** Java API */ + def create(persistenceId: String): EventsByPersistenceId = + EventsByPersistenceId(persistenceId) +} /** * Query events that have a specific tag. A tag can for example correspond to an * aggregate root type (in DDD terminology). @@ -56,6 +72,12 @@ final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Lo * A plugin may optionally support this [[Query]]. */ final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] +object EventsByTag { + /** Java API */ + def create(tag: String): EventsByTag = EventsByTag(tag) + /** Java API */ + def create(tag: String, offset: Long): EventsByTag = EventsByTag(tag) +} /** * Event wrapper adding meta data for the events in the result stream of