diff --git a/src/Akka.sln.DotSettings b/src/Akka.sln.DotSettings
index 41a06c2ddfc..c5ab510a684 100644
--- a/src/Akka.sln.DotSettings
+++ b/src/Akka.sln.DotSettings
@@ -24,4 +24,5 @@
True
True
True
+
\ No newline at end of file
diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs
index c55d2b9ec09..ea6fd7ce3df 100644
--- a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs
+++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs
@@ -141,7 +141,7 @@ internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
{
- _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self);
+ _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, AllEventsPublisher.Continue.Instance, Self);
}
protected override long ToOffset => long.MaxValue;
diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs
index d4e36826f3b..74a2eba3c8f 100644
--- a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs
+++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs
@@ -5,64 +5,193 @@
//
//-----------------------------------------------------------------------
+using System;
using Akka.Actor;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;
namespace Akka.Persistence.Query.Sql
{
- internal sealed class AllPersistenceIdsPublisher : ActorPublisher
+ internal sealed class CurrentPersistenceIdsPublisher : ActorPublisher, IWithUnboundedStash
{
- public static Props Props(bool liveQuery, string writeJournalPluginId)
+ public static Props Props(string writeJournalPluginId)
{
- return Actor.Props.Create(() => new AllPersistenceIdsPublisher(liveQuery, writeJournalPluginId));
+ return Actor.Props.Create(() => new CurrentPersistenceIdsPublisher(writeJournalPluginId));
}
- private readonly bool _liveQuery;
private readonly IActorRef _journalRef;
private readonly DeliveryBuffer _buffer;
- public AllPersistenceIdsPublisher(bool liveQuery, string writeJournalPluginId)
+ public IStash Stash { get; set; }
+
+ public CurrentPersistenceIdsPublisher(string writeJournalPluginId)
{
- _liveQuery = liveQuery;
_buffer = new DeliveryBuffer(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}
- protected override bool Receive(object message) => message.Match()
- .With(_ =>
+ protected override bool Receive(object message)
+ {
+ switch (message)
{
- _journalRef.Tell(SubscribeAllPersistenceIds.Instance);
- Become(Active);
- })
- .With(_ => Context.Stop(Self))
- .WasHandled;
-
- private bool Active(object message) => message.Match()
- .With(current =>
+ case Request _:
+ _journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
+ Become(Initializing);
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private bool Initializing(object message)
+ {
+ switch (message)
{
- _buffer.AddRange(current.AllPersistenceIds);
- _buffer.DeliverBuffer(TotalDemand);
+ case CurrentPersistenceIds current:
+ _buffer.AddRange(current.AllPersistenceIds);
+ _buffer.DeliverBuffer(TotalDemand);
- if (!_liveQuery && _buffer.IsEmpty)
- OnCompleteThenStop();
- })
- .With(added =>
+ if (_buffer.IsEmpty)
+ {
+ OnCompleteThenStop();
+ return true;
+ }
+
+ Become(Active);
+ Stash.UnstashAll();
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ Stash.Stash();
+ return true;
+ }
+ }
+
+ private bool Active(object message)
+ {
+ switch (message)
{
- if (_liveQuery)
- {
- _buffer.Add(added.PersistenceId);
+ case Request _:
_buffer.DeliverBuffer(TotalDemand);
- }
- })
- .With(_ =>
+ if (_buffer.IsEmpty)
+ OnCompleteThenStop();
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ return false;
+ }
+ }
+ }
+
+ internal sealed class LivePersistenceIdsPublisher : ActorPublisher, IWithUnboundedStash
+ {
+ private class Continue
+ {
+ public static readonly Continue Instance = new Continue();
+
+ private Continue() { }
+ }
+
+ public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId)
+ {
+ return Actor.Props.Create(() => new LivePersistenceIdsPublisher(refreshInterval, writeJournalPluginId));
+ }
+
+ private long _lastOrderingOffset;
+ private readonly ICancelable _tickCancelable;
+ private readonly IActorRef _journalRef;
+ private readonly DeliveryBuffer _buffer;
+
+ public IStash Stash { get; set; }
+
+ public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId)
+ {
+ _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
+ refreshInterval,
+ refreshInterval,
+ Self,
+ Continue.Instance,
+ Self);
+ _buffer = new DeliveryBuffer(OnNext);
+ _journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
+ }
+
+ protected override void PostStop()
+ {
+ _tickCancelable.Cancel();
+ base.PostStop();
+ }
+
+ protected override bool Receive(object message)
+ {
+ switch (message)
{
- _buffer.DeliverBuffer(TotalDemand);
- if (!_liveQuery && _buffer.IsEmpty)
- OnCompleteThenStop();
- })
- .With(_ => Context.Stop(Self))
- .WasHandled;
+ case Request _:
+ _journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
+ Become(Initializing);
+ return true;
+ case Continue _:
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private bool Initializing(object message)
+ {
+ switch (message)
+ {
+ case CurrentPersistenceIds current:
+ _lastOrderingOffset = current.HighestOrderingNumber;
+ _buffer.AddRange(current.AllPersistenceIds);
+ _buffer.DeliverBuffer(TotalDemand);
+
+ Become(Active);
+ Stash.UnstashAll();
+ return true;
+ case Continue _:
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ Stash.Stash();
+ return true;
+ }
+ }
+
+ private bool Active(object message)
+ {
+ switch (message)
+ {
+ case CurrentPersistenceIds added:
+ _lastOrderingOffset = added.HighestOrderingNumber;
+ _buffer.AddRange(added.AllPersistenceIds);
+ _buffer.DeliverBuffer(TotalDemand);
+ return true;
+ case Request _:
+ _buffer.DeliverBuffer(TotalDemand);
+ return true;
+ case Continue _:
+ _journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
+ return true;
+ case Cancel _:
+ Context.Stop(Self);
+ return true;
+ default:
+ return false;
+ }
+ }
}
}
diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs
index b2fb8e19f75..4bf46e0b5b2 100644
--- a/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs
+++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs
@@ -6,17 +6,17 @@
//-----------------------------------------------------------------------
using System;
+using System.Threading;
using Reactive.Streams;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Streams.Dsl;
-using Akka.Util.Internal;
+using Akka.Streams;
namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal :
- IReadJournal,
IPersistenceIdsQuery,
ICurrentPersistenceIdsQuery,
IEventsByPersistenceIdQuery,
@@ -40,12 +40,20 @@ public static Config DefaultConfiguration()
private readonly TimeSpan _refreshInterval;
private readonly string _writeJournalPluginId;
private readonly int _maxBufferSize;
+ private readonly ExtendedActorSystem _system;
+
+ private readonly object _lock = new object();
+ private IPublisher _persistenceIdsPublisher;
public SqlReadJournal(ExtendedActorSystem system, Config config)
{
_refreshInterval = config.GetTimeSpan("refresh-interval", null);
_writeJournalPluginId = config.GetString("write-plugin", null);
_maxBufferSize = config.GetInt("max-buffer-size", 0);
+ _system = system;
+
+ _lock = new ReaderWriterLockSlim();
+ _persistenceIdsPublisher = null;
}
///
@@ -68,20 +76,45 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
/// backend journal.
///
///
- public Source PersistenceIds() =>
- Source.ActorPublisher(AllPersistenceIdsPublisher.Props(true, _writeJournalPluginId))
- .MapMaterializedValue(_ => NotUsed.Instance)
- .Named("AllPersistenceIds") as Source;
+ public Source PersistenceIds()
+ {
+ lock (_lock)
+ {
+ if (_persistenceIdsPublisher is null)
+ {
+ var graph =
+ Source.ActorPublisher(
+ LivePersistenceIdsPublisher.Props(
+ _refreshInterval,
+ _writeJournalPluginId))
+ .ToMaterialized(Sink.DistinctRetainingFanOutPublisher(PersistenceIdsShutdownCallback), Keep.Right);
+
+ _persistenceIdsPublisher = graph.Run(_system.Materializer());
+ }
+ return Source.FromPublisher(_persistenceIdsPublisher)
+ .MapMaterializedValue(_ => NotUsed.Instance)
+ .Named("AllPersistenceIds");
+ }
+
+ }
+
+ private void PersistenceIdsShutdownCallback()
+ {
+ lock (_lock)
+ {
+ _persistenceIdsPublisher = null;
+ }
+ }
///
/// Same type of query as but the stream
/// is completed immediately when it reaches the end of the "result set". Persistent
/// actors that are created after the query is completed are not included in the stream.
///
- public Source CurrentPersistenceIds() =>
- Source.ActorPublisher(AllPersistenceIdsPublisher.Props(false, _writeJournalPluginId))
- .MapMaterializedValue(_ => NotUsed.Instance)
- .Named("CurrentPersistenceIds") as Source;
+ public Source CurrentPersistenceIds()
+ => Source.ActorPublisher(CurrentPersistenceIdsPublisher.Props(_writeJournalPluginId))
+ .MapMaterializedValue(_ => NotUsed.Instance)
+ .Named("CurrentPersistenceIds");
///
/// is used for retrieving events for a specific
@@ -112,7 +145,7 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
public Source EventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) =>
Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, _refreshInterval, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
- .Named("EventsByPersistenceId-" + persistenceId) as Source;
+ .Named("EventsByPersistenceId-" + persistenceId);
///
/// Same type of query as but the event stream
@@ -122,7 +155,7 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
public Source CurrentEventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) =>
Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, null, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
- .Named("CurrentEventsByPersistenceId-" + persistenceId) as Source;
+ .Named("CurrentEventsByPersistenceId-" + persistenceId);
///
/// is used for retrieving events that were marked with
diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
index 92b2c4ba40e..eef79a89168 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
@@ -383,13 +383,6 @@ public BatchComplete(int chunkId, int operationCount, TimeSpan timeSpent, Except
}
}
- // this little guy will be called only once, only by the current journal
- private sealed class GetCurrentPersistenceIds
- {
- public static readonly GetCurrentPersistenceIds Instance = new GetCurrentPersistenceIds();
- private GetCurrentPersistenceIds() { }
- }
-
private struct RequestChunk
{
public readonly int ChunkId;
@@ -463,7 +456,7 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
protected virtual string InsertEventSql { get; }
///
- /// SQL query executed as result of request to journal.
+ /// SQL query executed as result of request to journal.
/// It's a part of persistence query protocol.
///
protected virtual string AllPersistenceIdsSql { get; }
@@ -518,12 +511,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
///
protected bool HasTagSubscribers => _tagSubscribers.Count != 0;
- ///
- /// Flag determining if current journal has any subscribers for and
- /// messages.
- ///
- protected bool HasAllIdsSubscribers => _allIdsSubscribers.Count != 0;
-
///
/// Flag determining if current journal has any subscribers for and
///
@@ -548,8 +535,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
private readonly Dictionary> _persistenceIdSubscribers;
private readonly Dictionary> _tagSubscribers;
- private readonly HashSet _allIdsSubscribers;
- private readonly HashSet _allPersistenceIds;
private readonly HashSet _newEventSubscriber;
private readonly Akka.Serialization.Serialization _serialization;
@@ -567,8 +552,6 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
_persistenceIdSubscribers = new Dictionary>();
_tagSubscribers = new Dictionary>();
- _allIdsSubscribers = new HashSet();
- _allPersistenceIds = new HashSet();
_newEventSubscriber = new HashSet();
_remainingOperations = Setup.MaxConcurrentOperations;
@@ -592,8 +575,15 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
e.{conventions.SerializerIdColumnName} as SerializerId";
AllPersistenceIdsSql = $@"
- SELECT DISTINCT e.{conventions.PersistenceIdColumnName} as PersistenceId
- FROM {conventions.FullJournalTableName} e;";
+ SELECT DISTINCT u.Id as PersistenceId
+ FROM (
+ SELECT DISTINCT e.{conventions.PersistenceIdColumnName} as Id
+ FROM {conventions.FullJournalTableName} e
+ WHERE e.{conventions.OrderingColumnName} > @Ordering
+ UNION
+ SELECT DISTINCT e.{conventions.PersistenceIdColumnName} as Id
+ FROM {conventions.FullMetaTableName} e
+ ) as u";
HighestSequenceNrSql = $@"
SELECT MAX(u.SeqNr) as SequenceNr
@@ -637,8 +627,7 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
HighestOrderingSql =
$@"
SELECT MAX(e.{conventions.OrderingColumnName}) as Ordering
- FROM {conventions.FullJournalTableName} e
- WHERE e.{conventions.OrderingColumnName} > @Ordering";
+ FROM {conventions.FullJournalTableName} e";
InsertEventSql = $@"
INSERT INTO {conventions.FullJournalTableName} (
@@ -710,15 +699,15 @@ protected sealed override bool Receive(object message)
case ReplayAllEvents msg:
BatchRequest(msg);
return true;
+ case SelectCurrentPersistenceIds msg:
+ BatchRequest(msg);
+ return true;
case BatchComplete msg:
CompleteBatch(msg);
return true;
case SubscribePersistenceId msg:
AddPersistenceIdSubscriber(msg);
return true;
- case SubscribeAllPersistenceIds msg:
- AddAllPersistenceIdsSubscriber(msg);
- return true;
case SubscribeTag msg:
AddTagSubscriber(msg);
return true;
@@ -728,12 +717,6 @@ protected sealed override bool Receive(object message)
case Terminated msg:
RemoveSubscriber(msg.ActorRef);
return true;
- case GetCurrentPersistenceIds _:
- InitializePersistenceIds();
- return true;
- case CurrentPersistenceIds msg:
- SendCurrentPersistenceIds(msg);
- return true;
case ChunkExecutionFailure msg:
FailChunkExecution(msg);
return true;
@@ -770,62 +753,9 @@ private void FailChunkExecution(ChunkExecutionFailure message)
}
}
- private void SendCurrentPersistenceIds(CurrentPersistenceIds message)
- {
- foreach (var persistenceId in message.AllPersistenceIds)
- {
- _allPersistenceIds.Add(persistenceId);
- }
-
- foreach (var subscriber in _allIdsSubscribers)
- {
- subscriber.Tell(message);
- }
- }
-
#region subscriptions
-
- private void InitializePersistenceIds()
- {
- var self = Self;
- GetAllPersistenceIdsAsync()
- .ContinueWith(task =>
- {
- if (task.IsCanceled || task.IsFaulted)
- {
- var cause = (Exception)task.Exception ?? new OperationCanceledException("Cancellation occurred while trying to retrieve current persistence ids");
- Log.Error(cause, "Couldn't retrieve current persistence ids");
- }
- else
- {
- self.Tell(new CurrentPersistenceIds(task.Result));
- }
- });
- }
-
- private async Task> GetAllPersistenceIdsAsync()
- {
- var result = new List(256);
- using (var connection = CreateConnection(Setup.ConnectionString))
- {
- await connection.OpenAsync();
- using (var command = connection.CreateCommand())
- {
- command.CommandText = AllPersistenceIdsSql;
-
- var reader = await command.ExecuteReaderAsync();
- while (await reader.ReadAsync())
- {
- result.Add(reader.GetString(0));
- }
- }
- }
- return result;
- }
-
private void RemoveSubscriber(IActorRef subscriberRef)
{
- _allIdsSubscribers.Remove(subscriberRef);
_persistenceIdSubscribers.RemoveItem(subscriberRef);
_tagSubscribers.RemoveItem(subscriberRef);
_newEventSubscriber.Remove(subscriberRef);
@@ -845,18 +775,6 @@ private void AddTagSubscriber(SubscribeTag message)
Context.Watch(subscriber);
}
- private void AddAllPersistenceIdsSubscriber(SubscribeAllPersistenceIds message)
- {
- if (!HasAllIdsSubscribers)
- {
- Self.Tell(GetCurrentPersistenceIds.Instance);
- }
-
- var subscriber = Sender;
- _allIdsSubscribers.Add(subscriber);
- Context.Watch(subscriber);
- }
-
private void AddPersistenceIdSubscriber(SubscribePersistenceId message)
{
var subscriber = Sender;
@@ -895,18 +813,6 @@ private void NotifyPersistenceIdChanged(string persistenceId)
}
}
- protected void NotifyNewPersistenceIdAdded(string persistenceId)
- {
- if (_allPersistenceIds.Add(persistenceId) && HasAllIdsSubscribers)
- {
- var added = new PersistenceIdAdded(persistenceId);
- foreach (var subscriber in _allIdsSubscribers)
- {
- subscriber.Tell(added, ActorRefs.NoSender);
- }
- }
- }
-
#endregion
///
@@ -1004,6 +910,9 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext
case ReplayAllEvents msg:
await HandleReplayAllMessages(msg, command);
break;
+ case SelectCurrentPersistenceIds msg:
+ await HandleSelectCurrentPersistenceIds(msg, command);
+ break;
default:
Unhandled(req);
break;
@@ -1040,8 +949,6 @@ protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TComma
var toSequenceNr = req.ToSequenceNr;
var persistenceId = req.PersistenceId;
- NotifyNewPersistenceIdAdded(persistenceId);
-
try
{
var highestSequenceNr = await ReadHighestSequenceNr(persistenceId, command);
@@ -1086,6 +993,34 @@ protected virtual async Task ReadHighestSequenceNr(string persistenceId, T
return highestSequenceNr;
}
+ protected virtual async Task ReadHighestSequenceNr(TCommand command)
+ {
+ command.CommandText = HighestOrderingSql;
+ command.Parameters.Clear();
+
+ var result = await command.ExecuteScalarAsync();
+ var highestSequenceNr = result is long ? Convert.ToInt64(result) : 0L;
+ return highestSequenceNr;
+ }
+
+ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message, TCommand command)
+ {
+ long highestOrderingNumber = await ReadHighestSequenceNr(command);
+
+ var result = new List(256);
+ command.CommandText = AllPersistenceIdsSql;
+ command.Parameters.Clear();
+ AddParameter(command, "@Ordering", DbType.Int64, message.Offset);
+
+ var reader = await command.ExecuteReaderAsync();
+ while (await reader.ReadAsync())
+ {
+ result.Add(reader.GetString(0));
+ }
+
+ message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber));
+ }
+
protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req, TCommand command)
{
var replyTo = req.ReplyTo;
@@ -1179,8 +1114,6 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c
: req.PersistentActor;
var persistenceId = req.PersistenceId;
- NotifyNewPersistenceIdAdded(persistenceId);
-
try
{
var highestSequenceNr = await ReadHighestSequenceNr(persistenceId, command);
@@ -1268,8 +1201,6 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
var response = (new WriteMessageSuccess(unadapted, actorInstanceId), unadapted.Sender);
responses.Add(response);
persistenceIds.Add(persistent.PersistenceId);
-
- NotifyNewPersistenceIdAdded(persistent.PersistenceId);
}
catch (DbException cause)
{
diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs
index 8ac1b79bdec..5e993190323 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs
@@ -64,20 +64,17 @@ public EventAppended(string persistenceId)
}
}
- ///
- /// Subscribe the `sender` to current and new persistenceIds.
- /// Used by query-side. The journal will send one to the
- /// subscriber followed by messages when new persistenceIds
- /// are created.
- ///
[Serializable]
- public sealed class SubscribeAllPersistenceIds : ISubscriptionCommand
+ public sealed class SelectCurrentPersistenceIds : IJournalRequest
{
- ///
- /// TBD
- ///
- public static readonly SubscribeAllPersistenceIds Instance = new SubscribeAllPersistenceIds();
- private SubscribeAllPersistenceIds() { }
+ public IActorRef ReplyTo { get; }
+ public long Offset { get; }
+
+ public SelectCurrentPersistenceIds(long offset, IActorRef replyTo)
+ {
+ Offset = offset;
+ ReplyTo = replyTo;
+ }
}
///
@@ -91,34 +88,17 @@ public sealed class CurrentPersistenceIds : IDeadLetterSuppression
///
public readonly IEnumerable AllPersistenceIds;
+ public readonly long HighestOrderingNumber;
+
///
/// TBD
///
/// TBD
- public CurrentPersistenceIds(IEnumerable allPersistenceIds)
+ /// TBD
+ public CurrentPersistenceIds(IEnumerable allPersistenceIds, long highestOrderingNumber)
{
AllPersistenceIds = allPersistenceIds.ToImmutableHashSet();
- }
- }
-
- ///
- /// TBD
- ///
- [Serializable]
- public sealed class PersistenceIdAdded : IDeadLetterSuppression
- {
- ///
- /// TBD
- ///
- public readonly string PersistenceId;
-
- ///
- /// TBD
- ///
- /// TBD
- public PersistenceIdAdded(string persistenceId)
- {
- PersistenceId = persistenceId;
+ HighestOrderingNumber = highestOrderingNumber;
}
}
diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs
index eb9ed40e93f..aa155b70483 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs
@@ -6,6 +6,7 @@
//-----------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using System.Collections.Immutable;
using System.Data;
using System.Data.Common;
@@ -34,8 +35,9 @@ public interface IJournalQueryExecutor
///
/// TBD
/// TBD
+ /// TBD
/// TBD
- Task> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken);
+ Task> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken, long offset);
///
/// Asynchronously replays a on all selected events for provided
@@ -69,9 +71,10 @@ public interface IJournalQueryExecutor
Task SelectByTagAsync(DbConnection connection, CancellationToken cancellationToken, string tag, long fromOffset, long toOffset, long max, Action callback);
Task SelectAllEventsAsync(
- DbConnection connection,
+ DbConnection connection,
CancellationToken cancellationToken,
long fromOffset,
+ long toOffset,
long max,
Action callback);
@@ -83,7 +86,9 @@ public interface IJournalQueryExecutor
/// TBD
/// TBD
Task SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId);
-
+
+ Task SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken);
+
///
/// Asynchronously inserts a collection of events and theirs tags into a journal table.
///
@@ -325,8 +330,15 @@ protected AbstractQueryExecutor(QueryConfiguration configuration, Akka.Serializa
e.{Configuration.SerializerIdColumnName} as SerializerId";
AllPersistenceIdsSql = $@"
- SELECT DISTINCT e.{Configuration.PersistenceIdColumnName} as PersistenceId
- FROM {Configuration.FullJournalTableName} e;";
+ SELECT DISTINCT u.Id as PersistenceId
+ FROM (
+ SELECT DISTINCT e.{Configuration.PersistenceIdColumnName} as Id
+ FROM {Configuration.FullJournalTableName} e
+ WHERE e.{Configuration.OrderingColumnName} > @Ordering
+ UNION
+ SELECT DISTINCT e.{Configuration.PersistenceIdColumnName} as Id
+ FROM {Configuration.FullMetaTableName} e
+ ) as u";
HighestSequenceNrSql = $@"
SELECT MAX(u.SeqNr) as SequenceNr
@@ -376,8 +388,7 @@ protected AbstractQueryExecutor(QueryConfiguration configuration, Akka.Serializa
HighestOrderingSql =
$@"
SELECT MAX(e.{Configuration.OrderingColumnName}) as Ordering
- FROM {Configuration.FullJournalTableName} e
- WHERE e.{Configuration.OrderingColumnName} > @Ordering";
+ FROM {Configuration.FullJournalTableName} e";
InsertEventSql = $@"
INSERT INTO {Configuration.FullJournalTableName} (
@@ -473,19 +484,23 @@ protected AbstractQueryExecutor(QueryConfiguration configuration, Akka.Serializa
///
/// TBD
/// TBD
+ /// TBD
/// TBD
- public virtual async Task> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken)
+ public virtual async Task> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken, long offset)
{
using (var command = GetCommand(connection, AllPersistenceIdsSql))
- using (var reader = await command.ExecuteReaderAsync(cancellationToken))
{
- var builder = ImmutableArray.CreateBuilder();
- while (await reader.ReadAsync(cancellationToken))
+ AddParameter(command, "@Ordering", DbType.Int64, offset);
+
+ using (var reader = await command.ExecuteReaderAsync(cancellationToken))
{
- builder.Add(reader.GetString(0));
+ var builder = ImmutableArray.CreateBuilder();
+ while (await reader.ReadAsync(cancellationToken))
+ {
+ builder.Add(reader.GetString(0));
+ }
+ return builder.ToImmutable();
}
-
- return builder.ToImmutable();
}
}
@@ -585,16 +600,25 @@ public virtual async Task> SelectAllPersistenceIdsAsync(D
}
public async Task SelectAllEventsAsync(
- DbConnection connection,
+ DbConnection connection,
CancellationToken cancellationToken,
long fromOffset,
+ long toOffset,
long max,
Action callback)
{
+ long maxOrdering;
+ using (var command = GetCommand(connection, HighestOrderingSql))
+ {
+ maxOrdering = (await command.ExecuteScalarAsync(cancellationToken)) as long? ?? 0L;
+ }
+
using (var command = GetCommand(connection, AllEventsSql))
{
+ var take = Math.Min(toOffset - fromOffset, max);
+
AddParameter(command, "@Ordering", DbType.Int64, fromOffset);
- AddParameter(command, "@Take", DbType.Int64, max);
+ AddParameter(command, "@Take", DbType.Int64, take);
var commandBehavior = Configuration.UseSequentialAccess ?
CommandBehavior.SequentialAccess :
@@ -611,12 +635,7 @@ public virtual async Task> SelectAllPersistenceIdsAsync(D
}
}
- using (var command = GetCommand(connection, HighestOrderingSql))
- {
- AddParameter(command, "@Ordering", DbType.Int64, fromOffset);
- var maxOrdering = (await command.ExecuteScalarAsync(cancellationToken)) as long? ?? 0L;
- return maxOrdering;
- }
+ return maxOrdering;
}
///
@@ -637,6 +656,15 @@ public virtual async Task SelectHighestSequenceNrAsync(DbConnection connec
}
}
+ public virtual async Task SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken)
+ {
+ using (var command = GetCommand(connection, HighestOrderingSql))
+ {
+ var result = await command.ExecuteScalarAsync(cancellationToken);
+ return result is long ? Convert.ToInt64(result) : 0L;
+ }
+ }
+
///
/// TBD
///
diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
index b19a589440d..72c5ae0dff9 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
@@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Data.Common;
using System.Linq;
+using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
@@ -27,9 +28,6 @@ public abstract class SqlJournal : AsyncWriteJournal, IWithUnboundedStash
private ImmutableDictionary> _persistenceIdSubscribers = ImmutableDictionary.Create>();
private ImmutableDictionary> _tagSubscribers = ImmutableDictionary.Create>();
private readonly HashSet _newEventsSubscriber = new HashSet();
- private readonly HashSet _allPersistenceIdSubscribers = new HashSet();
- private readonly ReaderWriterLockSlim _allPersistenceIdsLock = new ReaderWriterLockSlim();
- private HashSet _allPersistenceIds = new HashSet();
private IImmutableDictionary _tagSequenceNr = ImmutableDictionary.Empty;
private readonly CancellationTokenSource _pendingRequestsCancellation;
@@ -47,16 +45,8 @@ protected SqlJournal(Config journalConfig)
_pendingRequestsCancellation = new CancellationTokenSource();
}
- ///
- /// TBD
- ///
public IStash Stash { get; set; }
- ///
- /// TBD
- ///
- public IEnumerable AllPersistenceIds => _allPersistenceIds;
-
///
/// TBD
///
@@ -69,10 +59,6 @@ protected SqlJournal(Config journalConfig)
/// TBD
///
protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0;
- ///
- /// TBD
- ///
- protected bool HasAllPersistenceIdSubscribers => _allPersistenceIdSubscribers.Count != 0;
///
/// Returns a HOCON config path to associated journal.
@@ -118,9 +104,9 @@ protected override bool ReceivePluginInternal(object message)
AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId);
Context.Watch(Sender);
return true;
- case SubscribeAllPersistenceIds _:
- AddAllPersistenceIdSubscriber(Sender);
- Context.Watch(Sender);
+ case SelectCurrentPersistenceIds request:
+ SelectAllPersistenceIdsAsync(request.Offset)
+ .PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, request.Offset));
return true;
case SubscribeTag subscribe:
AddTagSubscriber(Sender, subscribe.Tag);
@@ -178,8 +164,6 @@ protected override async Task> WriteMessagesAsync(IEnu
if (IsTagId(p.PersistenceId))
throw new InvalidOperationException($"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
-
- NotifyNewPersistenceIdAdded(p.PersistenceId);
}
var batch = new WriteJournalBatch(eventToTags);
@@ -246,9 +230,11 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay)
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
- .SelectAllEventsAsync(connection,
+ .SelectAllEventsAsync(
+ connection,
cancellationToken.Token,
replay.FromOffset,
+ replay.ToOffset,
replay.Max,
replayedEvent => {
foreach (var adapted in AdaptFromJournal(replayedEvent.Persistent))
@@ -260,6 +246,20 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay)
}
}
+ protected virtual async Task<(IEnumerable Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
+ {
+ using (var connection = CreateDbConnection())
+ {
+ await connection.OpenAsync();
+ using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ {
+ var lastOrdering = await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
+ var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token, offset);
+ return (ids, lastOrdering);
+ }
+ }
+ }
+
///
/// TBD
///
@@ -273,7 +273,6 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay)
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max,
Action recoveryCallback)
{
- NotifyNewPersistenceIdAdded(persistenceId);
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
@@ -310,24 +309,27 @@ protected override void PostStop()
/// TBD
protected bool WaitingForInitialization(object message)
{
- return message.Match()
- .With(all =>
- {
- _allPersistenceIds = new HashSet(all.Ids);
+ switch (message)
+ {
+ case Status.Success _:
UnbecomeStacked();
Stash.UnstashAll();
- })
- .With(fail =>
- {
- Log.Error(fail.Exception, "Failure during {0} initialization.", Self);
+ return true;
+ case Status.Failure fail:
+ Log.Error(fail.Cause, "Failure during {0} initialization.", Self);
Context.Stop(Self);
- })
- .Default(_ => Stash.Stash())
- .WasHandled;
+ return true;
+ default:
+ Stash.Stash();
+ return true;
+ }
}
private async Task
public ManualProbe ExpectNextUnordered(T e1, T e2, params T[] elems)
+ {
+ return ExpectNextUnordered(null, e1, e2, elems);
+ }
+
+ public ManualProbe ExpectNextUnordered(TimeSpan? timeout, T e1, T e2, params T[] elems)
{
var len = elems.Length + 2;
- var e = ExpectNextN(len).ToArray();
+ var e = ExpectNextN(len, timeout).ToArray();
AssertEquals(e.Length, len, "expected to get {0} events, but got {1}", len, e.Length);
- var expectedSet = new HashSet(elems) {e1, e2};
+ var expectedSet = new HashSet(elems) { e1, e2 };
expectedSet.ExceptWith(e);
Assert(expectedSet.Count == 0, "unexpected elements [{0}] found in the result", string.Join(", ", expectedSet));
return this;
}
+ public ManualProbe ExpectNextWithinSet(List elems)
+ {
+ var next = _probe.ExpectMsg>();
+ if(!elems.Contains(next.Element))
+ Assert(false, "unexpected elements [{0}] found in the result", next.Element);
+ elems.Remove(next.Element);
+ _probe.Log.Info($"Received '{next.Element}' within OnNext().");
+ return this;
+ }
+
///
/// Expect and return the next stream elements.
///
- public IEnumerable ExpectNextN(long n)
+ public IEnumerable ExpectNextN(long n, TimeSpan? timeout = null)
{
var res = new List((int)n);
for (int i = 0; i < n; i++)
{
- var next = _probe.ExpectMsg>();
+ var next = _probe.ExpectMsg>(timeout);
res.Add(next.Element);
}
return res;
@@ -224,10 +242,10 @@ public IEnumerable ExpectNextN(long n)
///
/// Fluent DSL. Expect the given elements to be signalled in order.
///
- public ManualProbe ExpectNextN(IEnumerable all)
+ public ManualProbe ExpectNextN(IEnumerable all, TimeSpan? timeout = null)
{
foreach (var x in all)
- _probe.ExpectMsg>(y => AssertEquals(y.Element, x, "Expected one of ({0}), but got '{1}'", string.Join(", ", all), y.Element));
+ _probe.ExpectMsg>(y => AssertEquals(y.Element, x, "Expected one of ({0}), but got '{1}'", string.Join(", ", all), y.Element), timeout);
return this;
}
@@ -235,12 +253,12 @@ public ManualProbe ExpectNextN(IEnumerable all)
///
/// Fluent DSL. Expect the given elements to be signalled in any order.
///
- public ManualProbe ExpectNextUnorderedN(IEnumerable all)
+ public ManualProbe ExpectNextUnorderedN(IEnumerable all, TimeSpan? timeout = null)
{
var collection = new HashSet(all);
while (collection.Count > 0)
{
- var next = ExpectNext();
+ var next = timeout.HasValue ? ExpectNext(timeout.Value) : ExpectNext();
Assert(collection.Contains(next), $"expected one of (${string.Join(", ", collection)}), but received {next}");
collection.Remove(next);
}
@@ -480,6 +498,11 @@ private void Assert(bool predicate, string format, params object[] args)
if (!predicate) throw new Exception(string.Format(format, args));
}
+ private void Assert(Func predicate, string format, params object[] args)
+ {
+ if (!predicate()) throw new Exception(string.Format(format, args));
+ }
+
private void AssertEquals(T1 x, T2 y, string format, params object[] args)
{
if (!Equals(x, y)) throw new Exception(string.Format(format, args));
diff --git a/src/core/Akka.Streams.Tests/Implementation/DistinctRetainingMultiReaderBufferSpec.cs b/src/core/Akka.Streams.Tests/Implementation/DistinctRetainingMultiReaderBufferSpec.cs
new file mode 100644
index 00000000000..d56d8d42a82
--- /dev/null
+++ b/src/core/Akka.Streams.Tests/Implementation/DistinctRetainingMultiReaderBufferSpec.cs
@@ -0,0 +1,149 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Akka.Streams.Implementation;
+using FluentAssertions;
+using FluentAssertions.Execution;
+using Xunit;
+
+namespace Akka.Streams.Tests.Implementation
+{
+ public class DistinctRetainingMultiReaderBufferSpec
+ {
+ // The rest of the tests are covered by ResizableMultiReaderRingBufferSpec
+
+ [Fact]
+ public void A_DistinctRetainingMultiReaderBuffer_should_store_distinct_values_only()
+ {
+ var test = new Test(4, 4, 3);
+ test.Write(1).Should().BeTrue();
+ test.Write(2).Should().BeTrue();
+ test.Write(3).Should().BeTrue();
+ test.Write(2).Should().BeTrue();
+ test.Write(2).Should().BeTrue();
+ test.Write(1).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(0).Should().Be(1);
+ test.Read(0).Should().Be(2);
+ test.Read(1).Should().Be(1);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(0).Should().Be(3);
+ test.Read(0).Should().Be(null);
+ test.Read(1).Should().Be(2);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(2).Should().Be(1);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(1).Should().Be(3);
+ test.Read(1).Should().Be(null);
+ test.Read(2).Should().Be(2);
+ test.Read(2).Should().Be(3);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ }
+
+ private class TestBuffer : DistinctRetainingMultiReaderBuffer
+ {
+ public ICursors UnderlyingCursors { get; }
+
+ public TestBuffer(int initialSize, int maxSize, ICursors cursors) : base(initialSize, maxSize, cursors)
+ {
+ UnderlyingCursors = cursors;
+ }
+
+ public string Inspect()
+ {
+ return Buffer.Select(x => x ?? 0).Aggregate("", (s, i) => s + i + " ") +
+ ToString().SkipWhile(c => c != '(').Aggregate("", (s, c) => s + c);
+ }
+ }
+
+ private class Test : TestBuffer
+ {
+ public Test(int initialSize, int maxSize, int cursorCount) : base(initialSize, maxSize, new SimpleCursors(cursorCount))
+ {
+ }
+
+ public int? Read(int cursorIx)
+ {
+ try
+ {
+ return Read(Cursors.Cursors.ElementAt(cursorIx));
+ }
+ catch (NothingToReadException)
+ {
+ return null;
+ }
+ }
+ }
+
+ private class SimpleCursors : ICursors
+ {
+ public SimpleCursors(IEnumerable cursors)
+ {
+ Cursors = cursors;
+ }
+
+ public SimpleCursors(int cursorCount)
+ {
+ Cursors = Enumerable.Range(0, cursorCount).Select(_ => new SimpleCursor()).ToList();
+ }
+
+ public IEnumerable Cursors { get; }
+ }
+
+ private class SimpleCursor : ICursor
+ {
+ public long Cursor { get; set; }
+ }
+
+ private class StressTestCursor : ICursor
+ {
+ private readonly int _cursorNr;
+ private readonly int _run;
+ private readonly Action _log;
+ private readonly int _counterLimit;
+ private readonly StringBuilder _sb;
+ private int _counter = 1;
+
+ public StressTestCursor(int cursorNr, int run, Action log, int counterLimit, StringBuilder sb)
+ {
+ _cursorNr = cursorNr;
+ _run = run;
+ _log = log;
+ _counterLimit = counterLimit;
+ _sb = sb;
+ }
+
+ public bool TryReadAndReturnTrueIfDone(TestBuffer buf)
+ {
+ _log($" Try reading of {this}: ");
+ try
+ {
+ var x = buf.Read(this);
+ _log("OK\n");
+ if (x != _counter)
+ {
+ throw new AssertionFailedException(
+ $@"|Run {_run}, cursorNr {_cursorNr}, counter {_counter}: got unexpected {x}
+ | Buf: {buf.Inspect()}
+ | Cursors: {buf.UnderlyingCursors.Cursors.Aggregate(" ", (s, cursor) => s + cursor + "\n ")}
+ |Log: {_sb}
+ ");
+ }
+ _counter++;
+ return _counter == _counterLimit;
+ }
+ catch (NothingToReadException)
+ {
+ _log("FAILED\n");
+ return false; // ok, we currently can't read, try again later
+ }
+ }
+
+ public long Cursor { get; set; }
+
+ public override string ToString() => $"cursorNr {_cursorNr}, ix {Cursor}, counter {_counter}";
+ }
+ }
+}
diff --git a/src/core/Akka.Streams.Tests/Implementation/ResizableMultiReaderRingBufferSpec.cs b/src/core/Akka.Streams.Tests/Implementation/ResizableMultiReaderRingBufferSpec.cs
index bdaebbdf75b..f04ac789ace 100644
--- a/src/core/Akka.Streams.Tests/Implementation/ResizableMultiReaderRingBufferSpec.cs
+++ b/src/core/Akka.Streams.Tests/Implementation/ResizableMultiReaderRingBufferSpec.cs
@@ -244,7 +244,7 @@ public SimpleCursors(int cursorCount)
private class SimpleCursor : ICursor
{
- public int Cursor { get; set; }
+ public long Cursor { get; set; }
}
private class StressTestCursor : ICursor
@@ -291,7 +291,7 @@ public bool TryReadAndReturnTrueIfDone(TestBuffer buf)
}
}
- public int Cursor { get; set; }
+ public long Cursor { get; set; }
public override string ToString() => $"cursorNr {_cursorNr}, ix {Cursor}, counter {_counter}";
}
diff --git a/src/core/Akka.Streams.Tests/Implementation/RetainingMultiReaderBufferSpec.cs b/src/core/Akka.Streams.Tests/Implementation/RetainingMultiReaderBufferSpec.cs
new file mode 100644
index 00000000000..64a9e6b9761
--- /dev/null
+++ b/src/core/Akka.Streams.Tests/Implementation/RetainingMultiReaderBufferSpec.cs
@@ -0,0 +1,249 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Akka.Streams.Implementation;
+using FluentAssertions;
+using FluentAssertions.Execution;
+using Xunit;
+
+namespace Akka.Streams.Tests.Implementation
+{
+ public class RetainingMultiReaderBufferSpec
+ {
+ [Theory]
+ [InlineData(2, 4, 1, "0 0 (size=0, cursors=1)")]
+ [InlineData(4, 4, 3, "0 0 0 0 (size=0, cursors=3)")]
+ public void A_RetainingMultiReaderBufferSpec_should_initially_be_empty(int iSize, int mSize, int cursorCount, string expected)
+ {
+ var test = new Test(iSize, mSize, cursorCount);
+ test.Inspect().Should().Be(expected);
+ }
+
+ [Fact]
+ public void A_RetainingMultiReaderBufferSpec_should_fail_reads_if_nothing_can_be_read()
+ {
+ var test = new Test(4, 4, 3);
+ test.Write(1).Should().BeTrue();
+ test.Write(2).Should().BeTrue();
+ test.Write(3).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(0).Should().Be(1);
+ test.Read(0).Should().Be(2);
+ test.Read(1).Should().Be(1);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(0).Should().Be(3);
+ test.Read(0).Should().Be(null);
+ test.Read(1).Should().Be(2);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(2).Should().Be(1);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ test.Read(1).Should().Be(3);
+ test.Read(1).Should().Be(null);
+ test.Read(2).Should().Be(2);
+ test.Read(2).Should().Be(3);
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=3)");
+ }
+
+ [Fact]
+ public void A_RetainingMultiReaderBufferSpec_should_automatically_grow_if_possible()
+ {
+ var test = new Test(2, 8, 2);
+ test.Write(1).Should().BeTrue();
+ test.Inspect().Should().Be("1 0 (size=1, cursors=2)");
+ test.Write(2).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 (size=2, cursors=2)");
+ test.Write(3).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 0 (size=3, cursors=2)");
+ test.Write(4).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 4 (size=4, cursors=2)");
+ test.Read(0).Should().Be(1);
+ test.Read(0).Should().Be(2);
+ test.Read(0).Should().Be(3);
+ test.Read(1).Should().Be(1);
+ test.Read(1).Should().Be(2);
+ test.Write(5).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 4 5 0 0 0 (size=5, cursors=2)");
+ test.Write(6).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 4 5 6 0 0 (size=6, cursors=2)");
+ test.Write(7).Should().BeTrue();
+ test.Inspect().Should().Be("1 2 3 4 5 6 7 0 (size=7, cursors=2)");
+ test.Read(0).Should().Be(4);
+ test.Read(0).Should().Be(5);
+ test.Read(0).Should().Be(6);
+ test.Read(0).Should().Be(7);
+ test.Read(0).Should().Be(null);
+ test.Read(1).Should().Be(3);
+ test.Read(1).Should().Be(4);
+ test.Read(1).Should().Be(5);
+ test.Read(1).Should().Be(6);
+ test.Read(1).Should().Be(7);
+ test.Read(1).Should().Be(null);
+ test.Inspect().Should().Be("1 2 3 4 5 6 7 0 (size=7, cursors=2)");
+ }
+
+ [Fact]
+ public void A_RetainingMultiReaderBufferSpec_should_pass_the_stress_test()
+ {
+ // create 100 buffers with an initialSize of 1 and a maxSize of 1 to 64,
+ // for each one attach 1 to 8 cursors and randomly try reading and writing to the buffer;
+ // in total 200 elements need to be written to the buffer and read in the correct order by each cursor
+ var MAXSIZEBIT_LIMIT = 6; // 2 ^ (this number)
+ var COUNTER_LIMIT = 200;
+ var LOG = false;
+ var sb = new StringBuilder();
+ var log = new Action(s =>
+ {
+ if (LOG)
+ sb.Append(s);
+ });
+
+ var random = new Random();
+ for (var bit = 1; bit <= MAXSIZEBIT_LIMIT; bit++)
+ for (var n = 1; n <= 2; n++)
+ {
+ var counter = 1;
+ var activeCoursors =
+ Enumerable.Range(0, random.Next(8) + 1)
+ .Select(i => new StressTestCursor(i, 1 << bit, log, COUNTER_LIMIT, sb))
+ .ToList();
+ var stillWriting = 2;// give writing a slight bias, so as to somewhat "stretch" the buffer
+ var buf = new TestBuffer(1, 1 << bit, new SimpleCursors(activeCoursors));
+ sb.Clear();
+
+ while (activeCoursors.Count != 0)
+ {
+ log($"Buf: {buf.Inspect()}\n");
+ var activeCursorCount = activeCoursors.Count;
+ var index = random.Next(activeCursorCount + stillWriting);
+ if (index >= activeCursorCount)
+ {
+ log($" Writing {counter}: ");
+ if (buf.Write(counter))
+ {
+ log("OK\n");
+ counter++;
+ }
+ else
+ {
+ log("FAILED\n");
+ if (counter == COUNTER_LIMIT)
+ stillWriting = 0;
+ }
+ }
+ else
+ {
+ var cursor = activeCoursors[index];
+ if (cursor.TryReadAndReturnTrueIfDone(buf))
+ activeCoursors = activeCoursors.Where(c => c != cursor).ToList();
+ }
+ }
+ }
+ }
+
+ private class TestBuffer : RetainingMultiReaderBuffer
+ {
+ public ICursors UnderlyingCursors { get; }
+
+ public TestBuffer(int initialSize, int maxSize, ICursors cursors) : base(initialSize, maxSize, cursors)
+ {
+ UnderlyingCursors = cursors;
+ }
+
+ public string Inspect()
+ {
+ return Buffer.Select(x => x ?? 0).Aggregate("", (s, i) => s + i + " ") +
+ ToString().SkipWhile(c => c != '(').Aggregate("", (s, c) => s + c);
+ }
+ }
+
+ private class Test : TestBuffer
+ {
+ public Test(int initialSize, int maxSize, int cursorCount) : base(initialSize, maxSize, new SimpleCursors(cursorCount))
+ {
+ }
+
+ public int? Read(int cursorIx)
+ {
+ try
+ {
+ return Read(Cursors.Cursors.ElementAt(cursorIx));
+ }
+ catch (NothingToReadException)
+ {
+ return null;
+ }
+ }
+ }
+
+ private class SimpleCursors : ICursors
+ {
+ public SimpleCursors(IEnumerable cursors)
+ {
+ Cursors = cursors;
+ }
+
+ public SimpleCursors(int cursorCount)
+ {
+ Cursors = Enumerable.Range(0, cursorCount).Select(_ => new SimpleCursor()).ToList();
+ }
+
+ public IEnumerable Cursors { get; }
+ }
+
+ private class SimpleCursor : ICursor
+ {
+ public long Cursor { get; set; }
+ }
+
+ private class StressTestCursor : ICursor
+ {
+ private readonly int _cursorNr;
+ private readonly int _run;
+ private readonly Action _log;
+ private readonly int _counterLimit;
+ private readonly StringBuilder _sb;
+ private int _counter = 1;
+
+ public StressTestCursor(int cursorNr, int run, Action log, int counterLimit, StringBuilder sb)
+ {
+ _cursorNr = cursorNr;
+ _run = run;
+ _log = log;
+ _counterLimit = counterLimit;
+ _sb = sb;
+ }
+
+ public bool TryReadAndReturnTrueIfDone(TestBuffer buf)
+ {
+ _log($" Try reading of {this}: ");
+ try
+ {
+ var x = buf.Read(this);
+ _log("OK\n");
+ if (x != _counter)
+ {
+ throw new AssertionFailedException(
+ $@"|Run {_run}, cursorNr {_cursorNr}, counter {_counter}: got unexpected {x}
+ | Buf: {buf.Inspect()}
+ | Cursors: {buf.UnderlyingCursors.Cursors.Aggregate(" ", (s, cursor) => s + cursor + "\n ")}
+ |Log: {_sb}
+ ");
+ }
+ _counter++;
+ return _counter == _counterLimit;
+ }
+ catch (NothingToReadException)
+ {
+ _log("FAILED\n");
+ return false; // ok, we currently can't read, try again later
+ }
+ }
+
+ public long Cursor { get; set; }
+
+ public override string ToString() => $"cursorNr {_cursorNr}, ix {Cursor}, counter {_counter}";
+ }
+ }
+}
diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs
index a7b400a8530..54925ccf2b4 100644
--- a/src/core/Akka.Streams/Dsl/Sink.cs
+++ b/src/core/Akka.Streams/Dsl/Sink.cs
@@ -282,7 +282,10 @@ public static class Sink
/// TBD
/// TBD
public static Sink> FanoutPublisher()
- => new Sink>(new FanoutPublisherSink(DefaultAttributes.FanoutPublisherSink, Shape("FanoutPublisherSink")));
+ => new Sink>(new FanoutPublisherSink>(DefaultAttributes.FanoutPublisherSink, Shape("FanoutPublisherSink")));
+
+ internal static Sink> DistinctRetainingFanOutPublisher(Action onTerminated = null)
+ => new Sink>(new FanoutPublisherSink>(DefaultAttributes.FanoutPublisherSink, Shape("DistinctRetainingFanOutPublisherSink"), onTerminated));
///
/// A that will consume the stream and discard the elements.
@@ -592,7 +595,7 @@ public static class Sink
{
SinkModule> publisherSink;
if (fanout)
- publisherSink = new FanoutPublisherSink(DefaultAttributes.FanoutPublisherSink, Shape("FanoutPublisherSink"));
+ publisherSink = new FanoutPublisherSink>(DefaultAttributes.FanoutPublisherSink, Shape("FanoutPublisherSink"));
else
publisherSink = new PublisherSink(DefaultAttributes.PublisherSink, Shape("PublisherSink"));
diff --git a/src/core/Akka.Streams/Implementation/ActorPublisher.cs b/src/core/Akka.Streams/Implementation/ActorPublisher.cs
index f6b6e9dfbac..b8a273e64b1 100644
--- a/src/core/Akka.Streams/Implementation/ActorPublisher.cs
+++ b/src/core/Akka.Streams/Implementation/ActorPublisher.cs
@@ -390,7 +390,7 @@ public ActorSubscriptionWithCursor(IActorRef implementor, ISubscriber subsc
///
/// TBD
///
- public int Cursor { get; private set; }
+ public long Cursor { get; private set; }
long ISubscriptionWithCursor.TotalDemand
{
@@ -409,7 +409,7 @@ public ActorSubscriptionWithCursor(IActorRef implementor, ISubscriber subsc
/// TBD
public void Dispatch(TIn element) => ReactiveStreamsCompliance.TryOnNext(Subscriber, element);
- int ICursor.Cursor
+ long ICursor.Cursor
{
get { return Cursor; }
set { Cursor = value; }
diff --git a/src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs b/src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs
index 4f7312f341c..c7da56b2adc 100644
--- a/src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs
+++ b/src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs
@@ -17,7 +17,8 @@ namespace Akka.Streams.Implementation
/// TBD
///
/// TBD
- internal class FanoutOutputs : SubscriberManagement, IOutputs
+ /// TBD
+ internal class FanoutOutputs : SubscriberManagement, IOutputs where TStreamBuffer : IStreamBuffer
{
private long _downstreamBufferSpace;
private bool _downstreamCompleted;
@@ -87,8 +88,7 @@ public FanoutOutputs(int maxBufferSize, int initialBufferSize, IActorRef self, I
NeedsDemandOrCancel = DefaultOutputTransferStates.NeedsDemandOrCancel(this);
SubReceive = new SubReceive(message =>
{
- var publisher = message as ExposedPublisher;
- if (publisher == null)
+ if (!(message is ExposedPublisher publisher))
throw new IllegalStateException($"The first message must be ExposedPublisher but was {message}");
ExposedPublisher = publisher.Publisher;
@@ -112,24 +112,22 @@ protected override ISubscriptionWithCursor CreateSubscription(ISubscriber
/// TBD
protected bool DownstreamRunning(object message)
{
- if (message is SubscribePending)
- SubscribePending();
- else if (message is RequestMore)
+ switch (message)
{
- var requestMore = (RequestMore) message;
- MoreRequested((ActorSubscriptionWithCursor) requestMore.Subscription, requestMore.Demand);
- _pump.Pump();
+ case SubscribePending _:
+ SubscribePending();
+ return true;
+ case RequestMore requestMore:
+ MoreRequested((ActorSubscriptionWithCursor) requestMore.Subscription, requestMore.Demand);
+ _pump.Pump();
+ return true;
+ case Cancel cancel:
+ UnregisterSubscription((ActorSubscriptionWithCursor) cancel.Subscription);
+ _pump.Pump();
+ return true;
+ default:
+ return false;
}
- else if (message is Cancel)
- {
- var cancel = (Cancel) message;
- UnregisterSubscription((ActorSubscriptionWithCursor) cancel.Subscription);
- _pump.Pump();
- }
- else
- return false;
-
- return true;
}
///
@@ -217,15 +215,19 @@ public void Error(Exception e)
/// TBD
///
/// TBD
- internal sealed class FanoutProcessorImpl : ActorProcessorImpl
+ /// TBD
+ internal sealed class FanoutProcessorImpl : ActorProcessorImpl where TStreamBuffer : IStreamBuffer
{
+ private readonly Action _onTerminated;
+
///
/// TBD
///
/// TBD
+ /// TBD
/// TBD
- public static Props Props(ActorMaterializerSettings settings)
- => Actor.Props.Create(() => new FanoutProcessorImpl(settings)).WithDeploy(Deploy.Local);
+ public static Props Props(ActorMaterializerSettings settings, Action onTerminated = null)
+ => Actor.Props.Create(() => new FanoutProcessorImpl(settings, onTerminated)).WithDeploy(Deploy.Local);
///
/// TBD
@@ -236,11 +238,14 @@ public static Props Props(ActorMaterializerSettings settings)
/// TBD
///
/// TBD
- public FanoutProcessorImpl(ActorMaterializerSettings settings) : base(settings)
+ /// TBD
+ public FanoutProcessorImpl(ActorMaterializerSettings settings, Action onTerminated) : base(settings)
{
- PrimaryOutputs = new FanoutOutputs(settings.MaxInputBufferSize,
+ PrimaryOutputs = new FanoutOutputs(settings.MaxInputBufferSize,
settings.InitialInputBufferSize, Self, this, AfterFlush);
+ _onTerminated = onTerminated;
+
var running = new TransferPhase(PrimaryInputs.NeedsInput.And(PrimaryOutputs.NeedsDemand),
() => PrimaryOutputs.EnqueueOutputElement(PrimaryInputs.DequeueInputElement()));
InitialPhase(1, running);
@@ -269,6 +274,10 @@ public override void PumpFinished()
PrimaryOutputs.Complete();
}
- private void AfterFlush() => Context.Stop(Self);
+ private void AfterFlush()
+ {
+ _onTerminated?.Invoke();
+ Context.Stop(Self);
+ }
}
}
diff --git a/src/core/Akka.Streams/Implementation/ResizableMultiReaderRingBuffer.cs b/src/core/Akka.Streams/Implementation/ResizableMultiReaderRingBuffer.cs
index ccdf061e711..7e885623213 100644
--- a/src/core/Akka.Streams/Implementation/ResizableMultiReaderRingBuffer.cs
+++ b/src/core/Akka.Streams/Implementation/ResizableMultiReaderRingBuffer.cs
@@ -8,7 +8,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using System.Runtime.Serialization;
using Akka.Annotations;
using Akka.Streams.Util;
@@ -61,7 +60,149 @@ public interface ICursor
///
/// TBD
///
- int Cursor { get; set; }
+ long Cursor { get; set; }
+ }
+
+ internal interface IStreamBuffer
+ {
+ bool IsEmpty { get; }
+
+ long Length { get; }
+
+ long AvailableData { get; }
+
+ long CapacityLeft { get; }
+
+ long Count(ICursor cursor);
+
+ T Read(ICursor cursor);
+
+ bool Write(T value);
+
+ void InitCursor(ICursor cursor);
+
+ void OnCursorRemoved(ICursor cursor);
+ }
+
+ public class DistinctRetainingMultiReaderBuffer : RetainingMultiReaderBuffer
+ {
+ public DistinctRetainingMultiReaderBuffer(long initialSize, long maxSize, ICursors cursors) : base(initialSize, maxSize, cursors)
+ { }
+
+ public override bool Write(T value)
+ {
+ return Buffer.Contains(value) || base.Write(value);
+ }
+
+ ///
+ /// TBD
+ ///
+ /// TBD
+ public override string ToString() => $"DistinctRetainingMultiReaderBuffer(size={Length}, cursors={Cursors.Cursors.Count()})";
+ }
+
+ public class RetainingMultiReaderBuffer : IStreamBuffer
+ {
+ ///
+ /// TBD
+ ///
+ protected readonly ICursors Cursors;
+
+ protected T[] Buffer { get; private set; }
+
+ ///
+ /// The number of elements currently in the buffer.
+ ///
+ public long Length { get; private set; }
+
+ public bool IsEmpty => Buffer.LongLength == 0;
+
+ ///
+ /// The maximum number of elements the buffer can still take.
+ ///
+ public long CapacityLeft => long.MaxValue - Length;
+
+ // DO NOT REMOVE maxSize parameter, the parameters are fixed and passed through reflection
+ public RetainingMultiReaderBuffer(long initialSize, long maxSize, ICursors cursors)
+ {
+ Cursors = cursors;
+
+ if ((initialSize & (initialSize - 1)) != 0 || initialSize <= 0)
+ throw new ArgumentException("initialSize must be a power of 2 that is > 0");
+
+ // We don't care about the maximum size
+ Buffer = new T[initialSize];
+ }
+
+ ///
+ /// Returns the number of elements that the buffer currently contains for the given cursor.
+ ///
+ /// TBD
+ /// TBD
+ public long Count(ICursor cursor) => Length - cursor.Cursor;
+
+ public long AvailableData
+ {
+ get
+ {
+ var lowest = 0L;
+ foreach (var cursor in Cursors.Cursors)
+ lowest = Math.Max(cursor.Cursor, lowest);
+
+ return Length - lowest;
+ }
+ }
+
+ public T Read(ICursor cursor)
+ {
+ var c = cursor.Cursor;
+ if (c < Length)
+ {
+ cursor.Cursor++;
+ return Buffer[c];
+ }
+
+ throw NothingToReadException.Instance;
+ }
+
+ public virtual bool Write(T value)
+ {
+ if (Length < Buffer.Length)
+ {
+ // if we have space left we can simply write and be done
+ Buffer[Length] = value;
+ Length++;
+ return true;
+ }
+
+ if (Buffer.LongLength >= long.MaxValue) return false;
+
+ // if we are full but can grow we do so
+ // Array.Resize() does not work here, because it is limited to int.MaxValue
+ var newLength = unchecked(Buffer.LongLength << 1);
+ if (newLength < 0)
+ newLength = long.MaxValue;
+ var newArray = new T[newLength];
+
+ Array.Copy(Buffer, newArray, Buffer.LongLength);
+ Buffer = newArray;
+ Buffer[Length] = value;
+ Length++;
+ return true;
+ }
+
+ public void InitCursor(ICursor cursor) => cursor.Cursor = 0;
+
+ public void OnCursorRemoved(ICursor cursor)
+ {
+ // no op
+ }
+
+ ///
+ /// TBD
+ ///
+ /// TBD
+ public override string ToString() => $"RetainingMultiReaderBuffer(size={Length}, cursors={Cursors.Cursors.Count()})";
}
///
@@ -72,27 +213,27 @@ public interface ICursor
///
/// TBD
[InternalApi]
- public class ResizableMultiReaderRingBuffer
+ public class ResizableMultiReaderRingBuffer : IStreamBuffer
{
private readonly int _maxSizeBit;
- private object[] _array;
+ private T[] _array;
///
/// Two counters counting the number of elements ever written and read; wrap-around is
/// handled by always looking at differences or masked values
///
- private int _writeIndex;
+ private long _writeIndex;
+
+ private long _readIndex; // the "oldest" of all read cursor indices, i.e. the one that is most behind
- private int _readIndex; // the "oldest" of all read cursor indices, i.e. the one that is most behind
-
///
/// Current array.length log2, we don't keep it as an extra field because
/// is a JVM intrinsic compiling down to a `BSF` instruction on x86, which is very fast on modern CPUs
///
- private int LengthBit => _array.Length.NumberOfTrailingZeros();
+ private int LengthBit => BitOperations.TrailingZeroCount(_array.LongLength);
// bit mask for converting a cursor into an array index
- private int Mask => int.MaxValue >> (31 - LengthBit);
+ private long Mask => long.MaxValue >> (63 - LengthBit);
///
/// TBD
@@ -101,7 +242,7 @@ public class ResizableMultiReaderRingBuffer
/// TBD
/// TBD
/// TBD
- public ResizableMultiReaderRingBuffer(int initialSize, int maxSize, ICursors cursors)
+ public ResizableMultiReaderRingBuffer(long initialSize, long maxSize, ICursors cursors)
{
Cursors = cursors;
if ((initialSize & (initialSize - 1)) != 0 || initialSize <= 0 || initialSize > maxSize)
@@ -111,8 +252,8 @@ public ResizableMultiReaderRingBuffer(int initialSize, int maxSize, ICursors cur
if ((maxSize & (maxSize - 1)) != 0 || maxSize <= 0 || maxSize > int.MaxValue / 2)
throw new ArgumentException("maxSize must be a power of 2 that is > 0 and < Int.MaxValue/2");
- _array = new object[initialSize];
- _maxSizeBit = maxSize.NumberOfTrailingZeros();
+ _array = new T[initialSize];
+ _maxSizeBit = BitOperations.TrailingZeroCount(maxSize);
}
///
@@ -123,12 +264,14 @@ public ResizableMultiReaderRingBuffer(int initialSize, int maxSize, ICursors cur
///
/// TBD
///
- protected object[] UnderlyingArray => _array;
+ protected T[] UnderlyingArray => _array;
///
/// The number of elements currently in the buffer.
///
- public int Length => _writeIndex - _readIndex;
+ public long Length => _writeIndex - _readIndex;
+
+ public long AvailableData => Length;
///
/// TBD
@@ -143,19 +286,19 @@ public ResizableMultiReaderRingBuffer(int initialSize, int maxSize, ICursors cur
///
/// The number of elements the buffer can still take without having to be resized.
///
- public int ImmediatelyAvailable => _array.Length - Length;
+ public long ImmediatelyAvailable => _array.Length - Length;
///
/// The maximum number of elements the buffer can still take.
///
- public int CapacityLeft => (1 << _maxSizeBit) - Length;
+ public long CapacityLeft => (1 << _maxSizeBit) - Length;
///
/// Returns the number of elements that the buffer currently contains for the given cursor.
///
/// TBD
/// TBD
- public int Count(ICursor cursor) => _writeIndex - cursor.Cursor;
+ public long Count(ICursor cursor) => _writeIndex - cursor.Cursor;
///
/// Initializes the given Cursor to the oldest buffer entry that is still available.
@@ -184,7 +327,12 @@ public bool Write(T value)
// the growing logic is quite simple: we assemble all current buffer entries in the new array
// in their natural order (removing potential wrap around) and rebase all indices to zero
var r = _readIndex & Mask;
- var newArray = new object[_array.Length << 1];
+
+ var newLength = unchecked(_array.LongLength << 1);
+ if (newLength < 0)
+ newLength = long.MaxValue;
+ var newArray = new T[newLength];
+
Array.Copy(_array, r, newArray, 0, _array.Length - r);
Array.Copy(_array, 0, newArray, _array.Length - r, r);
RebaseCursors(Cursors.Cursors);
@@ -219,7 +367,7 @@ public T Read(ICursor cursor)
if (c - _writeIndex < 0)
{
cursor.Cursor += 1;
- var ret = (T)_array[c & Mask];
+ var ret = _array[c & Mask];
if(c == _readIndex)
UpdateReadIndex();
return ret;
@@ -243,12 +391,12 @@ private void UpdateReadIndex()
var newReadIx = _writeIndex + MinCursor(Cursors.Cursors, 0);
while (_readIndex != newReadIx)
{
- _array[_readIndex & Mask] = null;
+ _array[_readIndex & Mask] = default;
_readIndex++;
}
}
- private int MinCursor(IEnumerable remaining, int result)
+ private long MinCursor(IEnumerable remaining, long result)
{
foreach (var cursor in remaining)
result = Math.Min(cursor.Cursor - _writeIndex, result);
diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs
index 46e585325b5..e3ba7b8e030 100644
--- a/src/core/Akka.Streams/Implementation/Sinks.cs
+++ b/src/core/Akka.Streams/Implementation/Sinks.cs
@@ -205,16 +205,21 @@ public override object Create(MaterializationContext context, out IPublisher
/// TBD
- internal sealed class FanoutPublisherSink : SinkModule>
+ /// TBD
+ internal sealed class FanoutPublisherSink : SinkModule> where TStreamBuffer : IStreamBuffer
{
+ private readonly Action _onTerminated;
+
///
/// TBD
///
/// TBD
/// TBD
- public FanoutPublisherSink(Attributes attributes, SinkShape shape) : base(shape)
+ /// TBD
+ public FanoutPublisherSink(Attributes attributes, SinkShape shape, Action onTerminated = null) : base(shape)
{
Attributes = attributes;
+ _onTerminated = onTerminated;
}
///
@@ -228,7 +233,7 @@ public FanoutPublisherSink(Attributes attributes, SinkShape shape) : base(s
/// TBD
/// TBD
public override IModule WithAttributes(Attributes attributes)
- => new FanoutPublisherSink(attributes, AmendShape(attributes));
+ => new FanoutPublisherSink(attributes, AmendShape(attributes), _onTerminated);
///
/// TBD
@@ -236,7 +241,7 @@ public override IModule WithAttributes(Attributes attributes)
/// TBD
/// TBD
protected override SinkModule> NewInstance(SinkShape shape)
- => new FanoutPublisherSink(Attributes, shape);
+ => new FanoutPublisherSink(Attributes, shape, _onTerminated);
///
/// TBD
@@ -248,7 +253,7 @@ public override object Create(MaterializationContext context, out IPublisher.Props(settings));
+ var impl = actorMaterializer.ActorOf(context, FanoutProcessorImpl.Props(settings, _onTerminated));
var fanoutProcessor = new ActorProcessor(impl);
impl.Tell(new ExposedPublisher(fanoutProcessor));
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
diff --git a/src/core/Akka.Streams/Implementation/SubscriberManagement.cs b/src/core/Akka.Streams/Implementation/SubscriberManagement.cs
index 7bdeaca776f..1976c2ef71c 100644
--- a/src/core/Akka.Streams/Implementation/SubscriberManagement.cs
+++ b/src/core/Akka.Streams/Implementation/SubscriberManagement.cs
@@ -37,7 +37,7 @@ internal interface ISubscriptionWithCursor : ISubscription, ICursor
bool IsActive { get; set; }
///
- /// Do not increment directly, use instead (it provides overflow protection)!
+ /// Do not increment directly, use instead (it provides overflow protection)!
///
long TotalDemand { get; set; } // number of requested but not yet dispatched elements
}
@@ -143,9 +143,10 @@ public ErrorCompleted(Exception cause)
/// TBD
///
/// TBD
- internal abstract class SubscriberManagement : ICursors
+ /// TBD
+ internal abstract class SubscriberManagement : ICursors where TStreamBuffer : IStreamBuffer
{
- private readonly Lazy> _buffer;
+ private readonly Lazy> _buffer;
// optimize for small numbers of subscribers by keeping subscribers in a plain list
private ICollection> _subscriptions = new List>();
@@ -161,8 +162,8 @@ internal abstract class SubscriberManagement : ICursors
///
protected SubscriberManagement()
{
- _buffer = new Lazy>(() =>
- new ResizableMultiReaderRingBuffer(InitialBufferSize, MaxBufferSize, this));
+ _buffer = new Lazy>(()
+ => (IStreamBuffer) Activator.CreateInstance(typeof(TStreamBuffer), InitialBufferSize, MaxBufferSize, this));
}
///
@@ -213,40 +214,39 @@ protected SubscriberManagement()
/// TBD
protected void MoreRequested(ISubscriptionWithCursor subscription, long elements)
{
- if (subscription.IsActive)
+ if (!subscription.IsActive) return;
+
+ // check for illegal demand See 3.9
+ if (elements < 1)
{
- // check for illegal demand See 3.9
- if (elements < 1)
+ try
{
- try
- {
- ReactiveStreamsCompliance.TryOnError(subscription.Subscriber, ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException);
- }
- finally
+ ReactiveStreamsCompliance.TryOnError(subscription.Subscriber, ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException);
+ }
+ finally
+ {
+ UnregisterSubscriptionInternal(subscription);
+ }
+ }
+ else
+ {
+ if (_endOfStream is SubscriberManagement.NotReached || _endOfStream is SubscriberManagement.Completed)
+ {
+ var d = subscription.TotalDemand + elements;
+ // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
+ var demand = d < 1 ? long.MaxValue : d;
+ subscription.TotalDemand = demand;
+ // returns Long.MinValue if the subscription is to be terminated
+ var remainingRequested = DispatchFromBufferAndReturnRemainingRequested(demand, subscription, _endOfStream);
+ if (remainingRequested == long.MinValue)
{
+ _endOfStream.Apply(subscription.Subscriber);
UnregisterSubscriptionInternal(subscription);
}
- }
- else
- {
- if (_endOfStream is SubscriberManagement.NotReached || _endOfStream is SubscriberManagement.Completed)
+ else
{
- var d = subscription.TotalDemand + elements;
- // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
- var demand = d < 1 ? long.MaxValue : d;
- subscription.TotalDemand = demand;
- // returns Long.MinValue if the subscription is to be terminated
- var remainingRequested = DispatchFromBufferAndReturnRemainingRequested(demand, subscription, _endOfStream);
- if (remainingRequested == long.MinValue)
- {
- _endOfStream.Apply(subscription.Subscriber);
- UnregisterSubscriptionInternal(subscription);
- }
- else
- {
- subscription.TotalDemand = remainingRequested;
- RequestFromUpstreamIfRequired();
- }
+ subscription.TotalDemand = remainingRequested;
+ RequestFromUpstreamIfRequired();
}
}
}
@@ -315,7 +315,7 @@ protected void PushToDownstream(T value)
_pendingFromUpstream--;
if (!_buffer.Value.Write(value))
throw new IllegalStateException("Output buffer overflow");
- if (Dispatch(_subscriptions))
+ if (_buffer.Value.AvailableData > 0 && Dispatch(_subscriptions))
RequestFromUpstreamIfRequired();
}
else throw new IllegalStateException("PushToDownStream(...) after CompleteDownstream() or AbortDownstream(...)");
diff --git a/src/core/Akka.Streams/Properties/AssemblyInfo.cs b/src/core/Akka.Streams/Properties/AssemblyInfo.cs
index ff5be28a670..394606e42b5 100644
--- a/src/core/Akka.Streams/Properties/AssemblyInfo.cs
+++ b/src/core/Akka.Streams/Properties/AssemblyInfo.cs
@@ -12,6 +12,8 @@
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
+[assembly: InternalsVisibleTo("Akka.Persistence.Query.Sql")]
+[assembly: InternalsVisibleTo("Akka.Persistence.TCK")]
[assembly: InternalsVisibleTo("Akka.Streams.Tests")]
[assembly: InternalsVisibleTo("Akka.Streams.TestKit")]
[assembly: InternalsVisibleTo("Akka.Benchmarks")]
diff --git a/src/core/Akka.Streams/Util/BitOperations.cs b/src/core/Akka.Streams/Util/BitOperations.cs
new file mode 100644
index 00000000000..47cb3510d90
--- /dev/null
+++ b/src/core/Akka.Streams/Util/BitOperations.cs
@@ -0,0 +1,279 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// Some routines inspired by the Stanford Bit Twiddling Hacks by Sean Eron Anderson:
+// http://graphics.stanford.edu/~seander/bithacks.html
+namespace Akka.Streams.Util
+{
+ // TODO: replace this with the official System.Numerics.BitOperations when we move on to .NET Core 3.0
+ ///
+ /// Utility methods for intrinsic bit-twiddling operations.
+ ///
+ /// A copy of Microsoft .NET core 3.0 implementation, without the hardware optimization
+ ///
+ internal static class BitOperations
+ {
+ // C# no-alloc optimization that directly wraps the data section of the dll (similar to string constants)
+ // https://github.com/dotnet/roslyn/pull/24621
+
+ private static ReadOnlySpan TrailingZeroCountDeBruijn => new byte[32]
+ {
+ 00, 01, 28, 02, 29, 14, 24, 03,
+ 30, 22, 20, 15, 25, 17, 04, 08,
+ 31, 27, 13, 23, 21, 19, 16, 07,
+ 26, 12, 18, 06, 11, 05, 10, 09
+ };
+
+ private static ReadOnlySpan Log2DeBruijn => new byte[32]
+ {
+ 00, 09, 01, 10, 13, 21, 02, 29,
+ 11, 14, 16, 18, 22, 25, 03, 30,
+ 08, 12, 20, 28, 15, 17, 24, 07,
+ 19, 27, 23, 06, 26, 05, 04, 31
+ };
+
+ ///
+ /// Count the number of leading zero bits in a mask.
+ /// Similar in behavior to the x86 instruction LZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int LeadingZeroCount(uint value)
+ {
+ // Unguarded fallback contract is 0->31, BSR contract is 0->undefined
+ if (value == 0)
+ return 32;
+
+ return 31 ^ Log2SoftwareFallback(value);
+ }
+
+ ///
+ /// Count the number of leading zero bits in a mask.
+ /// Similar in behavior to the x86 instruction LZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int LeadingZeroCount(ulong value)
+ {
+ var hi = (uint)(value >> 32);
+
+ if (hi == 0)
+ return 32 + LeadingZeroCount((uint)value);
+
+ return LeadingZeroCount(hi);
+ }
+
+ ///
+ /// Returns the integer (floor) log of the specified value, base 2.
+ /// Note that by convention, input value 0 returns 0 since log(0) is undefined.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int Log2(uint value)
+ {
+ // The 0->0 contract is fulfilled by setting the LSB to 1.
+ // Log(1) is 0, and setting the LSB for values > 1 does not change the log2 result.
+ value |= 1;
+
+ // value lzcnt actual expected
+ // ..0001 31 31-31 0
+ // ..0010 30 31-30 1
+ // 0010.. 2 31-2 29
+ // 0100.. 1 31-1 30
+ // 1000.. 0 31-0 31
+
+ // Fallback contract is 0->0
+ return Log2SoftwareFallback(value);
+ }
+
+ ///
+ /// Returns the integer (floor) log of the specified value, base 2.
+ /// Note that by convention, input value 0 returns 0 since log(0) is undefined.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int Log2(ulong value)
+ {
+ value |= 1;
+
+ var hi = (uint)(value >> 32);
+
+ if (hi == 0)
+ return Log2((uint)value);
+
+ return 32 + Log2(hi);
+ }
+
+ ///
+ /// Returns the integer (floor) log of the specified value, base 2.
+ /// Note that by convention, input value 0 returns 0 since Log(0) is undefined.
+ /// Does not directly use any hardware intrinsics, nor does it incur branching.
+ ///
+ /// The value.
+ private static int Log2SoftwareFallback(uint value)
+ {
+ // No AggressiveInlining due to large method size
+ // Has conventional contract 0->0 (Log(0) is undefined)
+
+ // Fill trailing zeros with ones, eg 00010010 becomes 00011111
+ value |= value >> 01;
+ value |= value >> 02;
+ value |= value >> 04;
+ value |= value >> 08;
+ value |= value >> 16;
+
+ // uint.MaxValue >> 27 is always in range [0 - 31] so we use Unsafe.AddByteOffset to avoid bounds check
+ return Unsafe.AddByteOffset(
+ // Using deBruijn sequence, k=2, n=5 (2^5=32) : 0b_0000_0111_1100_0100_1010_1100_1101_1101u
+ ref MemoryMarshal.GetReference(Log2DeBruijn),
+ // uint|long -> IntPtr cast on 32-bit platforms does expensive overflow checks not needed here
+ (IntPtr)(int)((value * 0x07C4ACDDu) >> 27));
+ }
+
+ ///
+ /// Returns the population count (number of bits set) of a mask.
+ /// Similar in behavior to the x86 instruction POPCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int PopCount(uint value)
+ {
+ const uint c1 = 0x_55555555u;
+ const uint c2 = 0x_33333333u;
+ const uint c3 = 0x_0F0F0F0Fu;
+ const uint c4 = 0x_01010101u;
+
+ value -= (value >> 1) & c1;
+ value = (value & c2) + ((value >> 2) & c2);
+ value = (((value + (value >> 4)) & c3) * c4) >> 24;
+
+ return (int)value;
+ }
+
+ ///
+ /// Returns the population count (number of bits set) of a mask.
+ /// Similar in behavior to the x86 instruction POPCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int PopCount(ulong value)
+ {
+ const ulong c1 = 0x_55555555_55555555ul;
+ const ulong c2 = 0x_33333333_33333333ul;
+ const ulong c3 = 0x_0F0F0F0F_0F0F0F0Ful;
+ const ulong c4 = 0x_01010101_01010101ul;
+
+ value -= (value >> 1) & c1;
+ value = (value & c2) + ((value >> 2) & c2);
+ value = (((value + (value >> 4)) & c3) * c4) >> 56;
+
+ return (int)value;
+ }
+
+ ///
+ /// Count the number of trailing zero bits in an integer value.
+ /// Similar in behavior to the x86 instruction TZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int TrailingZeroCount(int value)
+ => TrailingZeroCount((uint)value);
+
+ ///
+ /// Count the number of trailing zero bits in an integer value.
+ /// Similar in behavior to the x86 instruction TZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int TrailingZeroCount(uint value)
+ {
+ // Unguarded fallback contract is 0->0, BSF contract is 0->undefined
+ if (value == 0)
+ return 32;
+
+ // uint.MaxValue >> 27 is always in range [0 - 31] so we use Unsafe.AddByteOffset to avoid bounds check
+ return Unsafe.AddByteOffset(
+ // Using deBruijn sequence, k=2, n=5 (2^5=32) : 0b_0000_0111_0111_1100_1011_0101_0011_0001u
+ ref MemoryMarshal.GetReference(TrailingZeroCountDeBruijn),
+ // uint|long -> IntPtr cast on 32-bit platforms does expensive overflow checks not needed here
+ (IntPtr)(int)(((value & (uint)-(int)value) * 0x077CB531u) >> 27)); // Multi-cast mitigates redundant conv.u8
+ }
+
+ ///
+ /// Count the number of trailing zero bits in a mask.
+ /// Similar in behavior to the x86 instruction TZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int TrailingZeroCount(long value)
+ => TrailingZeroCount((ulong)value);
+
+ ///
+ /// Count the number of trailing zero bits in a mask.
+ /// Similar in behavior to the x86 instruction TZCNT.
+ ///
+ /// The value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static int TrailingZeroCount(ulong value)
+ {
+ var lo = (uint)value;
+
+ if (lo == 0)
+ return 32 + TrailingZeroCount((uint)(value >> 32));
+
+ return TrailingZeroCount(lo);
+ }
+
+ ///
+ /// Rotates the specified value left by the specified number of bits.
+ /// Similar in behavior to the x86 instruction ROL.
+ ///
+ /// The value to rotate.
+ /// The number of bits to rotate by.
+ /// Any value outside the range [0..31] is treated as congruent mod 32.
+ /// The rotated value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static uint RotateLeft(uint value, int offset)
+ => (value << offset) | (value >> (32 - offset));
+
+ ///
+ /// Rotates the specified value left by the specified number of bits.
+ /// Similar in behavior to the x86 instruction ROL.
+ ///
+ /// The value to rotate.
+ /// The number of bits to rotate by.
+ /// Any value outside the range [0..63] is treated as congruent mod 64.
+ /// The rotated value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ulong RotateLeft(ulong value, int offset)
+ => (value << offset) | (value >> (64 - offset));
+
+ ///
+ /// Rotates the specified value right by the specified number of bits.
+ /// Similar in behavior to the x86 instruction ROR.
+ ///
+ /// The value to rotate.
+ /// The number of bits to rotate by.
+ /// Any value outside the range [0..31] is treated as congruent mod 32.
+ /// The rotated value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static uint RotateRight(uint value, int offset)
+ => (value >> offset) | (value << (32 - offset));
+
+ ///
+ /// Rotates the specified value right by the specified number of bits.
+ /// Similar in behavior to the x86 instruction ROR.
+ ///
+ /// The value to rotate.
+ /// The number of bits to rotate by.
+ /// Any value outside the range [0..63] is treated as congruent mod 64.
+ /// The rotated value.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ulong RotateRight(ulong value, int offset)
+ => (value >> offset) | (value << (64 - offset));
+ }
+}
diff --git a/src/xunit.runner.json b/src/xunit.runner.json
index cafdde412f1..4a73b1e56a4 100644
--- a/src/xunit.runner.json
+++ b/src/xunit.runner.json
@@ -1,4 +1,6 @@
{
- "$schema": "https://xunit.github.io/schema/current/xunit.runner.schema.json",
- "parallelizeTestCollections": false
+ "$schema": "https://xunit.github.io/schema/current/xunit.runner.schema.json",
+ "longRunningTestSeconds": 60,
+ "parallelizeAssembly": false,
+ "parallelizeTestCollections": false
}
\ No newline at end of file