Skip to content

Commit

Permalink
+per #16541 java docs for persistence query
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrik Nordwall & Konrad Malawski authored and ktoso committed Aug 12, 2015
1 parent f849793 commit c5d377f
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence;

import akka.actor.*;
import akka.event.EventStreamSpec;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.pattern.BackoffSupervisor;
import akka.persistence.*;
import akka.persistence.query.*;
import akka.persistence.query.javadsl.ReadJournal;
import akka.stream.javadsl.Source;
import akka.util.Timeout;
import docs.persistence.query.MyEventsByTagPublisher;
import scala.collection.Seq;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

import java.io.Serializable;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class PersistenceQueryDocTest {

final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.create(3, TimeUnit.SECONDS));

//#my-read-journal
class MyReadJournal implements ReadJournal {
private final ExtendedActorSystem system;

public MyReadJournal(ExtendedActorSystem system) {
this.system = system;
}

final FiniteDuration defaultRefreshInterval = FiniteDuration.create(3, TimeUnit.SECONDS);

@SuppressWarnings("unchecked")
public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
if (q instanceof EventsByTag) {
final EventsByTag eventsByTag = (EventsByTag) q;
final String tag = eventsByTag.tag();
long offset = eventsByTag.offset();

final Props props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints));

return (Source<T, M>) Source.<EventEnvelope>actorPublisher(props)
.mapMaterializedValue(noMaterializedValue());
} else {
// unsuported
return Source.<T>failed(
new UnsupportedOperationException(
"Query $unsupported not supported by " + getClass().getName()))
.mapMaterializedValue(noMaterializedValue());
}
}

private FiniteDuration refreshInterval(Hint[] hints) {
FiniteDuration ret = defaultRefreshInterval;
for (Hint hint : hints)
if (hint instanceof RefreshInterval)
ret = ((RefreshInterval) hint).interval();
return ret;
}

private <I, M> akka.japi.function.Function<I, M> noMaterializedValue () {
return param -> (M) null;
}

}
//#my-read-journal
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence.query;

import akka.actor.Cancellable;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.PersistentRepr;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.stream.actor.AbstractActorPublisher;
import scala.Int;

import akka.actor.Props;
import akka.persistence.query.EventEnvelope;
import akka.stream.actor.ActorPublisherMessage.Cancel;

import scala.concurrent.duration.FiniteDuration;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import static java.util.stream.Collectors.toList;

//#events-by-tag-publisher
class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
private final Serialization serialization =
SerializationExtension.get(context().system());

private final Connection connection;

private final String tag;

private final String CONTINUE = "CONTINUE";
private final int LIMIT = 1000;
private long currentOffset;
private List<EventEnvelope> buf = new ArrayList<>();

private Cancellable continueTask;

public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, FiniteDuration refreshInterval) {
this.connection = connection;
this.tag = tag;
this.currentOffset = offset;

this.continueTask = context().system().scheduler().schedule(refreshInterval, refreshInterval, self(), CONTINUE, context().dispatcher(), self());
receive(ReceiveBuilder
.matchEquals(CONTINUE, (in) -> {
query();
deliverBuf();
})
.match(Cancel.class, (in) -> {
context().stop(self());
})
.build());
}

public static Props props(Connection conn, String tag, Long offset, FiniteDuration refreshInterval) {
return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval));
}

@Override
public void postStop() {
continueTask.cancel();
}

private void query() {
if (buf.isEmpty()) {
try {
PreparedStatement s = connection.prepareStatement(
"SELECT id, persistent_repr " +
"FROM journal WHERE tag = ? AND id >= ? " +
"ORDER BY id LIMIT ?");

s.setString(1, tag);
s.setLong(2, currentOffset);
s.setLong(3, LIMIT);
final ResultSet rs = s.executeQuery();

final List<Pair<Long, byte[]>> res = new ArrayList<>(LIMIT);
while (rs.next())
res.add(Pair.create(rs.getLong(1), rs.getBytes(2)));

if (!res.isEmpty()) {
currentOffset = res.get(res.size() - 1).first();
}

buf = res.stream().map(in -> {
final Long id = in.first();
final byte[] bytes = in.second();

final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get();

return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload());
}).collect(toList());
} catch(Exception e) {
onErrorThenStop(e);
}
}
}

private void deliverBuf() {
while (totalDemand() > 0 && !buf.isEmpty())
onNext(buf.remove(0));
}
}
//#events-by-tag-publisher
1 change: 1 addition & 0 deletions akka-docs/rst/java/index-actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ Actors
routing
fsm
persistence
persistence-query
testing

0 comments on commit c5d377f

Please sign in to comment.