Add live and current variant to AllEvents query and their documentation (#4523)

* Add live and current variant to AllEvents query

* Update documentation

* Move the batching tests to their appropriate namespace/folder

* Fix copy-paste error
Arkatufus committed Jul 28, 2020
1 parent 861d4f8 commit 8fd4af4
Showing 17 changed files with 606 additions and 181 deletions.
40 changes: 38 additions & 2 deletions docs/articles/persistence/persistence-query.md
@@ -48,9 +48,9 @@ Akka persistence query comes with a number of query interfaces built in and sugg
The predefined queries are:

**AllPersistenceIdsQuery and CurrentPersistenceIdsQuery**
**AllPersistenceIdsQuery (PersistenceIds) and CurrentPersistenceIdsQuery**

`AllPersistenceIds` is used for retrieving all persistenceIds of all persistent actors.
`AllPersistenceIds`, or `PersistenceIds` in `IPersistenceIdsQuery`, is used to retrieve all cached persistenceIds of all persistent actors inside the `ActorSystem` where the journal actor is instantiated. Note that since this is a cached value, the query will only report persistence ids that were passed to the journal since the journal's creation time (local cache).

```csharp
var queries = PersistenceQuery.Get(actorSystem)
@@ -157,6 +157,42 @@ As you can see, we can use all the usual stream combinators available from Akka

If your usage does not require a live stream, you can use the `CurrentEventsByTag` query.

**AllEvents and CurrentAllEvents**

`AllEvents` allows replaying and monitoring all events stored inside a journal, regardless of which `PersistenceId` they are associated with. Please refer to your read journal plugin's documentation to find out if and how it is supported.

The stream is not completed when it reaches the last recorded event; it continues to push new events as they are persisted. The corresponding query that completes when it reaches the last event persisted at the time the query was called is provided by `CurrentAllEvents`.

The write journal notifies the query side as soon as new events are created; there is no periodic polling or batching involved in this query.
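
In the examples below, `readJournal` and `mat` refer to a read journal instance and a stream materializer. A minimal sketch of how these can be obtained, assuming the SQL read journal plugin; the plugin identifier and materializer setup may differ in your application:

```csharp
// Obtain the SQL read journal and a materializer for running the queries below.
// Assumes the Akka.Persistence.Query.Sql plugin; use your plugin's identifier otherwise.
var readJournal = PersistenceQuery.Get(actorSystem)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var mat = ActorMaterializer.Create(actorSystem);
```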

> [!NOTE]
> A very important thing to keep in mind when using queries spanning multiple `PersistenceIds`, such as `AllEvents`, is that the order in which events appear in the stream is rarely guaranteed (or stable between materializations).
> Journals may choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering guarantee they provide - for example "ordered by timestamp ascending, independently of `PersistenceId`" is easy to achieve on relational databases, yet may be hard to implement efficiently on plain key-value datastores.

In the example below we query all events which have been stored inside the journal.

```csharp
// assuming the journal is able to work with numeric offsets we can:
Source<EventEnvelope, NotUsed> allEvents = readJournal.AllEvents(offset: Offset.Sequence(0L));

// replay the first 10 things stored:
Task<ImmutableHashSet<object>> first10Things = allEvents
    .Select(c => c.Event)
    .Take(10) // cancels the query stream after pulling 10 elements
    .RunAggregate(
        ImmutableHashSet<object>.Empty,
        (acc, c) => acc.Add(c),
        mat);

// start another query, from the known offset
var next10Things = readJournal.AllEvents(offset: Offset.Sequence(10L));
```

As you can see, we can use all the usual stream combinators available from Akka Streams on the resulting query stream, including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in `AllEvents` query has an optionally supported offset parameter (of type `long`) which the journals can use to implement resumable streams. For example, a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the `long` as a timestamp and select only older events.
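
As an illustration of the resumable-stream idea, the sketch below keeps track of the last processed offset so the query can be restarted where it left off. The `LoadOffsetAsync`, `SaveOffsetAsync` and `HandleEventAsync` helpers are hypothetical placeholders for your own storage and projection code, not part of Akka.NET:

```csharp
// Resume the query from a previously stored offset (hypothetical helpers).
long lastOffset = await LoadOffsetAsync();

await readJournal.AllEvents(Offset.Sequence(lastOffset))
    .SelectAsync(1, async envelope =>
    {
        await HandleEventAsync(envelope.Event);                    // your projection logic
        await SaveOffsetAsync(((Sequence)envelope.Offset).Value);  // remember progress
        return NotUsed.Instance;
    })
    .RunWith(Sink.Ignore<NotUsed>(), mat);
```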

If your usage does not require a live stream, you can use the `CurrentAllEvents` query.
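
A minimal sketch of the non-live variant, assuming the same `readJournal` and `mat` as above; the returned stream completes once every event stored at the time of the call has been delivered:

```csharp
// CurrentAllEvents completes when it reaches the last event that was already
// persisted when the query was issued - no live updates are pushed afterwards.
await readJournal.CurrentAllEvents(NoOffset.Instance)
    .RunForeach(envelope => Console.WriteLine(
        $"{envelope.PersistenceId} [{envelope.SequenceNr}]: {envelope.Event}"), mat);
```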

### Materialized values of queries
Journals are able to provide additional information related to a query by exposing materialized values, which are a feature of Akka Streams that allows exposing additional values at stream materialization time.

172 changes: 118 additions & 54 deletions src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs
@@ -8,7 +8,7 @@

namespace Akka.Persistence.Query.Sql
{
internal sealed class AllEventsPublisher : ActorPublisher<EventEnvelope>
internal static class AllEventsPublisher
{
[Serializable]
public sealed class Continue
@@ -18,52 +18,63 @@ public sealed class Continue
private Continue() { }
}

public static Props Props(long fromOffset, int maxBufferSize, string writeJournalPluginId)
=> Actor.Props.Create(() => new AllEventsPublisher(fromOffset, maxBufferSize, writeJournalPluginId));

private readonly ILoggingAdapter _log;
public static Props Props(long fromOffset, TimeSpan? refreshInterval, int maxBufferSize, string writeJournalPluginId)
{
return refreshInterval.HasValue ?
Actor.Props.Create(() => new LiveAllEventsPublisher(fromOffset, refreshInterval.Value, maxBufferSize, writeJournalPluginId)) :
Actor.Props.Create(() => new CurrentAllEventsPublisher(fromOffset, maxBufferSize, writeJournalPluginId));
}
}

private readonly DeliveryBuffer<EventEnvelope> _buffer;
private readonly IActorRef _journalRef;
private readonly int _maxBufferSize;
private bool _completed = false;
internal abstract class AbstractAllEventsPublisher : ActorPublisher<EventEnvelope>
{

private readonly long _fromOffset;
private long _currentOffset;
private ILoggingAdapter _log;
protected long CurrentOffset;

public AllEventsPublisher(long fromOffset, int maxBufferSize, string writeJournalPluginId)
protected AbstractAllEventsPublisher(long fromOffset, int maxBufferSize, string writeJournalPluginId)
{
_currentOffset = _fromOffset = fromOffset;
_maxBufferSize = maxBufferSize;
_buffer = new DeliveryBuffer<EventEnvelope>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);

_log = Context.GetLogger();
CurrentOffset = FromOffset = fromOffset;
MaxBufferSize = maxBufferSize;
Buffer = new DeliveryBuffer<EventEnvelope>(OnNext);
JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

private bool IsTimeForReplay => (_buffer.IsEmpty || _buffer.Length <= _maxBufferSize / 2) && !_completed;
protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
protected IActorRef JournalRef { get; }
protected DeliveryBuffer<EventEnvelope> Buffer { get; }
protected long FromOffset { get; }
protected abstract long ToOffset { get; }
protected int MaxBufferSize { get; }
protected bool IsTimeForReplay => (Buffer.IsEmpty || Buffer.Length <= MaxBufferSize / 2) && (CurrentOffset <= ToOffset);

protected abstract void ReceiveInitialRequest();
protected abstract void ReceiveIdleRequest();
protected abstract void ReceiveRecoverySuccess(long highestSequenceNr);

protected override bool Receive(object message)
{
switch (message)
{
case Request _:
Replay();
return true;
case Continue _:
ReceiveInitialRequest();
return true;
case Cancel _:
Context.Stop(Self);
return true;
case AllEventsPublisher.Continue _:
return true;
default:
return false;
}
}
private bool Idle(object message)

protected bool Idle(object message)
{
switch (message)
{
case Continue _:
case AllEventsPublisher.Continue _:
case NewEventAppended _:
if (IsTimeForReplay) Replay();
return true;
case Request _:
@@ -77,76 +88,129 @@ private bool Idle(object message)
}
}

private void Replay()
protected void Replay()
{
var limit = _maxBufferSize - _buffer.Length;
_log.Debug("replay all events request from [{0}], limit [{1}]", _currentOffset, limit);
_journalRef.Tell(new ReplayAllEvents(_currentOffset, limit, Self));
var limit = MaxBufferSize - Buffer.Length;
Log.Debug("replay all events request from [{0}] to [{1}], limit [{2}]", CurrentOffset, ToOffset, limit);
JournalRef.Tell(new ReplayAllEvents(CurrentOffset, ToOffset, limit, Self));
Context.Become(Replaying);
}

private bool Replaying( object message )
protected bool Replaying( object message )
{
switch (message)
{
case ReplayedEvent replayed:
_buffer.Add(new EventEnvelope(
Buffer.Add(new EventEnvelope(
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
@event: replayed.Persistent.Payload));

_currentOffset = replayed.Offset;
_buffer.DeliverBuffer(TotalDemand);
CurrentOffset = replayed.Offset;
Buffer.DeliverBuffer(TotalDemand);
return true;
case EventReplaySuccess success:
_log.Debug("event replay completed, currOffset [{0}]", _currentOffset);
Log.Debug("event replay completed, currOffset [{0}], highestSequenceNr [{1}]", CurrentOffset, success.HighestSequenceNr);
ReceiveRecoverySuccess(success.HighestSequenceNr);
return true;
case EventReplayFailure failure:
_log.Debug("event replay failed, due to [{0}]", failure.Cause.Message);
_buffer.DeliverBuffer(TotalDemand);
Log.Debug("event replay failed, due to [{0}]", failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
return true;
case ReplayedAllEvents _:
_completed = true;
if (_buffer.IsEmpty)
OnCompleteThenStop();

_buffer.DeliverBuffer(TotalDemand);
return true;
case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;
case Continue _:
Buffer.DeliverBuffer(TotalDemand);
return true;
case Cancel _:
Context.Stop(Self);
return true;
case AllEventsPublisher.Continue _:
case NewEventAppended _:
return true;
default:
return false;
}
}

private void ReceiveIdleRequest()
}

internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
{
private readonly ICancelable _tickCancelable;
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self);
}

protected override long ToOffset => long.MaxValue;

protected override void PostStop()
{
_tickCancelable.Cancel();
base.PostStop();
}

protected override void ReceiveInitialRequest()
{
_buffer.DeliverBuffer(TotalDemand);
if (_buffer.IsEmpty && _completed)
JournalRef.Tell(SubscribeNewEvents.Instance);
Replay();
}

protected override void ReceiveIdleRequest()
{
Buffer.DeliverBuffer(TotalDemand);
if (Buffer.IsEmpty && CurrentOffset > ToOffset)
OnCompleteThenStop();
}

protected override void ReceiveRecoverySuccess(long highestSequenceNr)
{
Buffer.DeliverBuffer(TotalDemand);
if (Buffer.IsEmpty && CurrentOffset > ToOffset)
OnCompleteThenStop();

Context.Become(Idle);
}
}

internal sealed class CurrentAllEventsPublisher : AbstractAllEventsPublisher
{
public CurrentAllEventsPublisher(long fromOffset, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
{ }

private long _toOffset = long.MaxValue;
protected override long ToOffset => _toOffset;

protected override void ReceiveInitialRequest()
{
Replay();
}

protected override void ReceiveIdleRequest()
{
Buffer.DeliverBuffer(TotalDemand);
if (Buffer.IsEmpty && CurrentOffset > ToOffset)
OnCompleteThenStop();
else
Self.Tell(Continue.Instance);
Self.Tell(AllEventsPublisher.Continue.Instance);
}

private void ReceiveRecoverySuccess(long highestSequenceNr)
protected override void ReceiveRecoverySuccess(long highestSequenceNr)
{
_buffer.DeliverBuffer(TotalDemand);
if (_buffer.IsEmpty && _completed)
Buffer.DeliverBuffer(TotalDemand);

if (highestSequenceNr < ToOffset)
_toOffset = highestSequenceNr;

if (Buffer.IsEmpty && CurrentOffset >= ToOffset)
OnCompleteThenStop();
else
Self.Tell(Continue.Instance);
Self.Tell(AllEventsPublisher.Continue.Instance);

Context.Become(Idle);
}
}

}
src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs
@@ -23,7 +23,8 @@ public class SqlReadJournal :
ICurrentEventsByPersistenceIdQuery,
IEventsByTagQuery,
ICurrentEventsByTagQuery,
IAllEventsQuery
IAllEventsQuery,
ICurrentAllEventsQuery
{
public static string Identifier = "akka.persistence.query.journal.sql";

@@ -215,9 +216,30 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
throw new ArgumentException($"SqlReadJournal does not support {offset.GetType().Name} offsets");
}

return Source.ActorPublisher<EventEnvelope>(AllEventsPublisher.Props(seq.Value, _maxBufferSize, _writeJournalPluginId))
return Source.ActorPublisher<EventEnvelope>(AllEventsPublisher.Props(seq.Value, _refreshInterval, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("AllEvents");
}

public Source<EventEnvelope, NotUsed> CurrentAllEvents(Offset offset)
{
Sequence seq;
switch (offset)
{
case null:
case NoOffset _:
seq = new Sequence(0L);
break;
case Sequence s:
seq = s;
break;
default:
throw new ArgumentException($"SqlReadJournal does not support {offset.GetType().Name} offsets");
}

return Source.ActorPublisher<EventEnvelope>(AllEventsPublisher.Props(seq.Value, null, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("CurrentAllEvents");
}
}
}
