Merge pull request #18051 from akka/wip-16541-persistence-query
+per #16541 initial version of the Persistence Query module
ktoso committed Aug 12, 2015
2 parents 5c5decf + 3314de4 commit 5a6ee8b
* Copyright (C) 2009-2015 Typesafe Inc. <>

package docs.persistence;

import static akka.pattern.Patterns.ask;

import akka.dispatch.Mapper;
import akka.event.EventStreamSpec;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.pattern.BackoffSupervisor;
import akka.persistence.*;
import akka.persistence.query.*;
import akka.persistence.query.javadsl.ReadJournal;
import akka.util.Timeout;
import docs.persistence.query.MyEventsByTagPublisher;
import docs.persistence.query.PersistenceQueryDocSpec;
import org.reactivestreams.Subscriber;
import scala.collection.Seq;
import scala.collection.immutable.Vector;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.Boxed;
import scala.runtime.BoxedUnit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PersistenceQueryDocTest {

final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

class MyReadJournal implements ReadJournal {
private final ExtendedActorSystem system;

public MyReadJournal(ExtendedActorSystem system) {
this.system = system;

final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS);

public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
if (q instanceof EventsByTag) {
final EventsByTag eventsByTag = (EventsByTag) q;
final String tag = eventsByTag.tag();
long offset = eventsByTag.offset();

final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints));

return (Source<T, M>) Source.<EventEnvelope>actorPublisher(props)
} else {
// unsuported
return Source.<T>failed(
new UnsupportedOperationException(
"Query " + q + " not supported by " + getClass().getName()))

private FiniteDuration refreshInterval(Hint[] hints) {
for (Hint hint : hints)
if (hint instanceof RefreshInterval)
return ((RefreshInterval) hint).interval();

return defaultRefreshInterval;

private <I, M> akka.japi.function.Function<I, M> noMaterializedValue() {
return param -> (M) null;

void demonstrateBasicUsage() {
final ActorSystem system = ActorSystem.create();

// obtain read journal by plugin id
final ReadJournal readJournal =

// issue query to journal
Source<Object, BoxedUnit> source =
readJournal.query(EventsByPersistenceId.create("user-1337", 0, Long.MAX_VALUE));

// materialize stream, consuming events
ActorMaterializer mat = ActorMaterializer.create(system);
source.runForeach(event -> System.out.println("Event: " + event), mat);

void demonstrateAllPersistenceIdsLive() {
final ReadJournal readJournal =


void demonstrateNoRefresh() {
final ActorSystem system = ActorSystem.create();

final ReadJournal readJournal =

readJournal.query(AllPersistenceIds.getInstance(), NoRefresh.getInstance());

void demonstrateRefresh() {
final ActorSystem system = ActorSystem.create();

final ReadJournal readJournal =

final RefreshInterval refresh = RefreshInterval.create(1, TimeUnit.SECONDS);
readJournal.query(EventsByPersistenceId.create("user-us-1337"), refresh);

void demonstrateEventsByTag() {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final ReadJournal readJournal =

// assuming journal is able to work with numeric offsets we can:
final Source<EventEnvelope, BoxedUnit> blueThings =

// find top 10 blue things:
final Future<List<Object>> top10BlueThings =
(Future<List<Object>>) blueThings
.map(t -> t.event())
.take(10) // cancels the query stream after pulling 10 elements
.<List<Object>>runFold(new ArrayList<>(10), (acc, e) -> {
return acc;
}, mat);

// start another query, from the known offset
Source<EventEnvelope, BoxedUnit> blue = readJournal.query(EventsByTag.create("blue", 10));
// a plugin can provide:


final class QueryMetadata {
public final boolean deterministicOrder;
public final boolean infinite;

public QueryMetadata(Boolean deterministicOrder, Boolean infinite) {
this.deterministicOrder = deterministicOrder;
this.infinite = infinite;


final class AllEvents implements Query<Object, QueryMetadata> {
private AllEvents() {}
private static AllEvents INSTANCE = new AllEvents();


void demonstrateMaterializedQueryValues() {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final ReadJournal readJournal =


final Source<Object, QueryMetadata> events = readJournal.query(AllEvents.INSTANCE);

events.mapMaterializedValue(meta -> {
System.out.println("The query is: " +
"ordered deterministically: " + meta.deterministicOrder + " " +
"infinite: " + meta.infinite);
return meta;

class ReactiveStreamsCompatibleDBDriver {
Subscriber<List<Object>> batchWriter() {
return null;

void demonstrateWritingIntoDifferentStore() {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final ReadJournal readJournal =

final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver();
final Subscriber<List<Object>> dbBatchWriter = driver.batchWriter();

// Using an example (Reactive Streams) Database driver
.grouped(20) // batch inserts into groups of 20
.runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database

class ExampleStore {
Future<Void> save(Object any) {
// ...
return null;

void demonstrateWritingIntoDifferentStoreWithMapAsync() {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final ReadJournal readJournal =

final ExampleStore store = new ExampleStore();

.mapAsync(1, store::save)
.runWith(Sink.ignore(), mat);

class MyResumableProjection {
private final String name;

public MyResumableProjection(String name) { = name;

public Future<Long> saveProgress(long offset) {
// ...
return null;
public Future<Long> latestOffset() {
// ...
return null;

void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final ReadJournal readJournal =

final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);

final MyResumableProjection bidProjection = new MyResumableProjection("bid");

final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid");
final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer");

long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration());

.query(EventsByTag.create("bid", startFromOffset))
.<Long>mapAsync(8, envelope -> {
final Future<Object> f = ask(writer, envelope.event(), timeout);
return f.<Long>map(new Mapper<Object, Long>() {
@Override public Long apply(Object in) {
return envelope.offset();
}, system.dispatcher());
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
.runWith(Sink.ignore(), mat);


class ComplexState {

boolean readyToSave() {
return false;

static class Record {
static Record of(Object any) {
return new Record();

final class TheOneWhoWritesToQueryJournal extends AbstractActor {
private final ExampleStore store;

private ComplexState state = new ComplexState();

public TheOneWhoWritesToQueryJournal() {
store = new ExampleStore();

receive(ReceiveBuilder.matchAny(message -> {
state = updateState(state, message);

// example saving logic that requires state to become ready:
if (state.readyToSave());


ComplexState updateState(ComplexState state, Object msg) {
// some complicated aggregation logic here ...
return state;


