diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/Akka.Persistence.Query.Sql.csproj b/src/contrib/persistence/Akka.Persistence.Query.Sql/Akka.Persistence.Query.Sql.csproj index 24a4b36a877..33d71063c0b 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/Akka.Persistence.Query.Sql.csproj +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/Akka.Persistence.Query.Sql.csproj @@ -7,6 +7,7 @@ $(NetStandardLibVersion);$(NetLibVersion) $(AkkaPackageTags);persistence;eventsource;sql;reactive;streams true + 8.0 diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs index dd54bab9222..42cd7977cbd 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs @@ -47,7 +47,7 @@ protected AbstractAllEventsPublisher(long fromOffset, int maxBufferSize, string JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); } - protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected ILoggingAdapter Log => _log ??= Context.GetLogger(); protected IActorRef JournalRef { get; } protected DeliveryBuffer Buffer { get; } protected long FromOffset { get; } @@ -81,7 +81,6 @@ protected bool Idle(object message) switch (message) { case AllEventsPublisher.Continue _: - case NewEventAppended _: if (IsTimeForReplay) Replay(); return true; case Request _: @@ -138,7 +137,6 @@ protected bool Replaying( object message ) Context.Stop(Self); return true; case AllEventsPublisher.Continue _: - case NewEventAppended _: return true; default: return false; @@ -166,7 +164,6 @@ protected override void PostStop() protected override void ReceiveInitialRequest() { - JournalRef.Tell(SubscribeNewEvents.Instance); Replay(); } diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs index af777590f3c..09b9a6ebbf3 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs @@ -8,7 +8,6 @@ using System; using Akka.Actor; using Akka.Event; -using Akka.Persistence.Sql.Common.Journal; using Akka.Streams.Actors; namespace Akka.Persistence.Query.Sql @@ -53,7 +52,7 @@ protected AbstractEventsByPersistenceIdPublisher(string persistenceId, long from JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); } - protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected ILoggingAdapter Log => _log ??= Context.GetLogger(); protected string PersistenceId { get; } protected long FromSequenceNr { get; } protected long ToSequenceNr { get; set; } @@ -96,9 +95,6 @@ protected bool Idle(object message) case EventsByPersistenceIdPublisher.Continue _: if (IsTimeForReplay) Replay(); return true; - case EventAppended _: - if (IsTimeForReplay) Replay(); - return true; case Request _: ReceiveIdleRequest(); return true; @@ -157,10 +153,6 @@ protected Receive Replaying(int limit) // skip during replay return true; - case EventAppended _: - // skip during replay - return true; - case Cancel _: Context.Stop(Self); return true; @@ -190,7 +182,6 @@ protected override void PostStop() protected override void ReceiveInitialRequest() { - JournalRef.Tell(new SubscribePersistenceId(PersistenceId)); Replay(); } diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs index 13eda725a5d..8dfb7d705fa 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs @@ -50,7 +50,7 @@ protected AbstractEventsByTagPublisher(string tag, long fromOffset, int maxBuffe JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); } - protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected ILoggingAdapter Log => _log ??= Context.GetLogger(); protected string Tag { get; } protected long FromOffset { get; } protected abstract long ToOffset { get; } @@ -88,9 +88,6 @@ protected bool Idle(object message) case EventsByTagPublisher.Continue _: if (IsTimeForReplay) Replay(); return true; - case TaggedEventAppended _: - if (IsTimeForReplay) Replay(); - return true; case Request _: ReceiveIdleRequest(); return true; @@ -146,11 +143,6 @@ protected Receive Replaying(int limit) case EventsByTagPublisher.Continue _: // no-op return true; - - case TaggedEventAppended _: - // no-op - return true; - case Cancel _: Context.Stop(Self); return true; @@ -182,7 +174,6 @@ protected override void PostStop() protected override void ReceiveInitialRequest() { - JournalRef.Tell(new SubscribeTag(Tag)); Replay(); } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj index 44838405ee5..63f2361a3cc 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj @@ -7,6 +7,7 @@ $(NetStandardLibVersion);$(NetLibVersion) $(AkkaPackageTags);persistence;eventsource;sql true + 8.0 diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj.DotSettings b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj.DotSettings new file mode 100644 index 00000000000..b9fd6ee4f5a --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj.DotSettings @@ -0,0 +1,2 @@ + + CSharp80 \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs index bafb025185c..ac6f1296450 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs @@ -514,21 +514,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests) /// protected BatchingSqlJournalSetup Setup { get; } - /// - /// Flag determining if current journal has any subscribers for events. - /// - protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0; - - /// - /// Flag determining if current journal has any subscribers for events. - /// - protected bool HasTagSubscribers => _tagSubscribers.Count != 0; - - /// - /// Flag determining if current journal has any subscribers for and - /// - protected bool HasNewEventsSubscribers => _newEventSubscriber.Count != 0; - /// /// Flag determining if incoming journal requests should be published in current actor system event stream. /// Useful mostly for tests. @@ -559,10 +544,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests) private readonly AtomicCounterLong _bufferIdCounter; - private readonly Dictionary> _persistenceIdSubscribers; - private readonly Dictionary> _tagSubscribers; - private readonly HashSet _newEventSubscriber; - private readonly Akka.Serialization.Serialization _serialization; private readonly CircuitBreaker _circuitBreaker; private int _remainingOperations; @@ -575,11 +556,7 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup) { Setup = setup; CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands; - TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context); - - _persistenceIdSubscribers = new Dictionary>(); - _tagSubscribers = new Dictionary>(); - _newEventSubscriber = new HashSet(); + TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context); _remainingOperations = Setup.MaxConcurrentOperations; _buffers = new[] @@ -739,18 +716,6 @@ protected sealed override bool Receive(object message) case BatchComplete msg: CompleteBatch(msg); return true; - case SubscribePersistenceId msg: - AddPersistenceIdSubscriber(msg); - return true; - case SubscribeTag msg: - AddTagSubscriber(msg); - return true; - case SubscribeNewEvents msg: - AddNewEventsSubscriber(msg); - return true; - case Terminated msg: - RemoveSubscriber(msg.ActorRef); - return true; case ChunkExecutionFailure msg: FailChunkExecution(msg); return true; @@ -834,68 +799,6 @@ private void FailChunkExecution(ChunkExecutionFailure message) TryProcess(); } - #region subscriptions - private void RemoveSubscriber(IActorRef subscriberRef) - { - _persistenceIdSubscribers.RemoveItem(subscriberRef); - _tagSubscribers.RemoveItem(subscriberRef); - _newEventSubscriber.Remove(subscriberRef); - } - - private void AddNewEventsSubscriber(SubscribeNewEvents message) - { - var subscriber = Sender; - _newEventSubscriber.Add(subscriber); - Context.Watch(subscriber); - } - - private void AddTagSubscriber(SubscribeTag message) - { - var subscriber = Sender; - _tagSubscribers.AddItem(message.Tag, subscriber); - Context.Watch(subscriber); - } - - private void AddPersistenceIdSubscriber(SubscribePersistenceId message) - { - var subscriber = Sender; - _persistenceIdSubscribers.AddItem(message.PersistenceId, subscriber); - Context.Watch(subscriber); - } - - private void NotifyNewEventAppended() - { - if (HasNewEventsSubscribers) - { - foreach (var subscriber in _newEventSubscriber) - { - subscriber.Tell(NewEventAppended.Instance); - } - } - } - - private void NotifyTagChanged(string tag) - { - if (_tagSubscribers.TryGetValue(tag, out var bucket)) - { - var changed = new TaggedEventAppended(tag); - foreach (var subscriber in bucket) - subscriber.Tell(changed); - } - } - - private void NotifyPersistenceIdChanged(string persistenceId) - { - if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var bucket)) - { - var changed = new EventAppended(persistenceId); - foreach (var subscriber in bucket) - subscriber.Tell(changed); - } - } - - #endregion - /// /// Tries to add incoming to . /// Also checks if any DB connection has been released and next batch can be processed. @@ -1505,28 +1408,6 @@ public void FinalizeSuccess(BatchingSqlJournal journal) aRef.Tell(new WriteMessageSuccess(unadapted, actorInstanceId), unadapted.Sender); } } - - if (journal.HasTagSubscribers && _tags.Count != 0) - { - foreach (var tag in _tags) - { - journal.NotifyTagChanged(tag); - } - } - - if (journal.HasPersistenceIdSubscribers) - { - foreach (var persistenceId in _persistenceIds) - { - journal.NotifyPersistenceIdChanged(persistenceId); - } - } - - if (journal.HasNewEventsSubscribers) - { - journal.NotifyNewEventAppended(); - } - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs index bf21860fe7a..bd09640593d 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryApi.cs @@ -15,9 +15,7 @@ namespace Akka.Persistence.Sql.Common.Journal { - /// - /// TBD - /// + [Obsolete("Query is not implemented.")] public interface ISubscriptionCommand { } /// @@ -26,6 +24,7 @@ public interface ISubscriptionCommand { } /// the subscriber when has been called. /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribePersistenceId : ISubscriptionCommand { /// @@ -47,6 +46,7 @@ public SubscribePersistenceId(string persistenceId) /// TBD /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class EventAppended : IDeadLetterSuppression { /// @@ -108,6 +108,7 @@ public CurrentPersistenceIds(IEnumerable allPersistenceIds, long highest /// the subscriber when `asyncWriteMessages` has been called. /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribeNewEvents : ISubscriptionCommand { public static SubscribeNewEvents Instance = new SubscribeNewEvents(); @@ -116,6 +117,7 @@ public sealed class SubscribeNewEvents : ISubscriptionCommand } [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class NewEventAppended : IDeadLetterSuppression { public static NewEventAppended Instance = new NewEventAppended(); @@ -131,6 +133,7 @@ public sealed class NewEventAppended : IDeadLetterSuppression /// via an . /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribeTag : ISubscriptionCommand { /// @@ -152,6 +155,7 @@ public SubscribeTag(string tag) /// TBD /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class TaggedEventAppended : IDeadLetterSuppression { /// diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs index 57ccee64094..42ddbe54ec9 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs @@ -10,7 +10,6 @@ using System.Collections.Immutable; using System.Data.Common; using System.Linq; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -25,9 +24,6 @@ namespace Akka.Persistence.Sql.Common.Journal /// public abstract class SqlJournal : AsyncWriteJournal, IWithUnboundedStash { - private ImmutableDictionary> _persistenceIdSubscribers = ImmutableDictionary.Create>(); - private ImmutableDictionary> _tagSubscribers = ImmutableDictionary.Create>(); - private readonly HashSet _newEventsSubscriber = new HashSet(); private IImmutableDictionary _tagSequenceNr = ImmutableDictionary.Empty; private readonly CancellationTokenSource _pendingRequestsCancellation; @@ -47,19 +43,6 @@ protected SqlJournal(Config journalConfig) public IStash Stash { get; set; } - /// - /// TBD - /// - protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0; - /// - /// TBD - /// - protected bool HasTagSubscribers => _tagSubscribers.Count != 0; - /// - /// TBD - /// - protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0; - /// /// Returns a HOCON config path to associated journal. /// @@ -68,7 +51,7 @@ protected SqlJournal(Config journalConfig) /// /// System logger. /// - protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); + protected ILoggingAdapter Log => _log ??= Context.GetLogger(); /// /// Initializes a database connection. @@ -91,6 +74,10 @@ protected override bool ReceivePluginInternal(object message) { switch (message) { + case SelectCurrentPersistenceIds msg: + SelectAllPersistenceIdsAsync(msg.Offset) + .PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering), failure: e => new Status.Failure(e)); + return true; case ReplayTaggedMessages replay: ReplayTaggedMessagesAsync(replay) .PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e)); @@ -100,25 +87,6 @@ protected override bool ReceivePluginInternal(object message) .PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h), failure: e => new EventReplayFailure(e)); return true; - case SubscribePersistenceId subscribe: - AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId); - Context.Watch(Sender); - return true; - case SelectCurrentPersistenceIds request: - SelectAllPersistenceIdsAsync(request.Offset) - .PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, result.LastOrdering)); - return true; - case SubscribeTag subscribe: - AddTagSubscriber(Sender, subscribe.Tag); - Context.Watch(Sender); - return true; - case SubscribeNewEvents _: - AddNewEventsSubscriber(Sender); - Context.Watch(Sender); - return true; - case Terminated terminated: - RemoveSubscriber(terminated.ActorRef); - return true; default: return false; } @@ -135,9 +103,6 @@ protected override bool ReceivePluginInternal(object message) /// TBD protected override async Task> WriteMessagesAsync(IEnumerable messages) { - var persistenceIds = new HashSet(); - var allTags = new HashSet(); - var writeTasks = messages.Select(async message => { using (var connection = CreateDbConnection()) @@ -146,18 +111,13 @@ protected override async Task> WriteMessagesAsync(IEnu var eventToTags = new Dictionary>(); var persistentMessages = ((IImmutableList)message.Payload).ToArray(); - for (int i = 0; i < persistentMessages.Length; i++) + for (var i = 0; i < persistentMessages.Length; i++) { var p = persistentMessages[i]; if (p.Payload is Tagged tagged) { persistentMessages[i] = p = p.WithPayload(tagged.Payload); - if (tagged.Tags.Count != 0) - { - allTags.UnionWith(tagged.Tags); - eventToTags.Add(p, tagged.Tags); - } - else eventToTags.Add(p, ImmutableHashSet.Empty); + eventToTags.Add(p, tagged.Tags.Count != 0 ? tagged.Tags : ImmutableHashSet.Empty); } else eventToTags.Add(p, ImmutableHashSet.Empty); @@ -176,25 +136,6 @@ protected override async Task> WriteMessagesAsync(IEnu .ContinueWhenAll(writeTasks, tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList()); - if (HasPersistenceIdSubscribers) - { - foreach (var persistenceId in persistenceIds) - { - NotifyPersistenceIdChange(persistenceId); - } - } - - if (HasTagSubscribers && allTags.Count != 0) - { - foreach (var tag in allTags) - { - NotifyTagChange(tag); - } - } - - if (HasNewEventSubscribers) - NotifyNewEventAppended(); - return result; } @@ -357,65 +298,9 @@ public DbConnection CreateDbConnection() return CreateDbConnection(connectionString); } - /// - /// TBD - /// - /// TBD - public void RemoveSubscriber(IActorRef subscriber) - { - _persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers - .Where(kv => kv.Value.Contains(subscriber)) - .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); - - _tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers - .Where(kv => kv.Value.Contains(subscriber)) - .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); - - _newEventsSubscriber.Remove(subscriber); - } - - public void AddNewEventsSubscriber(IActorRef subscriber) - { - _newEventsSubscriber.Add(subscriber); - } - - /// - /// TBD - /// - /// TBD - /// TBD - public void AddTagSubscriber(IActorRef subscriber, string tag) - { - if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) - { - _tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber)); - } - else - { - _tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber)); - } - } - - /// - /// TBD - /// - /// TBD - /// TBD - public void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId) - { - if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) - { - _persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber)); - } - else - { - _persistenceIdSubscribers = _persistenceIdSubscribers.SetItem(persistenceId, subscriptions.Add(subscriber)); - } - } - private async Task NextTagSequenceNr(string tag) { - if (!_tagSequenceNr.TryGetValue(tag, out long value)) + if (!_tagSequenceNr.TryGetValue(tag, out var value)) value = await ReadHighestSequenceNrAsync(TagId(tag), 0L); value++; @@ -430,37 +315,6 @@ private bool IsTagId(string persistenceId) return persistenceId.StartsWith(QueryExecutor.Configuration.TagsColumnName); } - private void NotifyPersistenceIdChange(string persistenceId) - { - if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers)) - { - var changed = new EventAppended(persistenceId); - foreach (var subscriber in subscribers) - subscriber.Tell(changed); - } - } - - private void NotifyTagChange(string tag) - { - if (_tagSubscribers.TryGetValue(tag, out var subscribers)) - { - var changed = new TaggedEventAppended(tag); - foreach (var subscriber in subscribers) - subscriber.Tell(changed); - } - } - - private void NotifyNewEventAppended() - { - if (HasNewEventSubscribers) - { - foreach (var subscriber in _newEventsSubscriber) - { - subscriber.Tell(NewEventAppended.Instance); - } - } - } - /// /// Asynchronously deletes all persisted messages identified by provided /// up to provided message sequence number (inclusive). diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt index 9cb9bae2b7f..85f87ef193b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt @@ -91,9 +91,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected virtual string ByPersistenceIdSql { get; } protected virtual string ByTagSql { get; } protected virtual string DeleteBatchSql { get; } - protected bool HasNewEventsSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected virtual string HighestOrderingSql { get; } protected virtual string HighestSequenceNrSql { get; } protected abstract System.Collections.Immutable.ImmutableDictionary Initializers { get; } @@ -137,6 +134,7 @@ namespace Akka.Persistence.Sql.Common.Journal public DefaultTimestampProvider() { } public long GenerateTimestamp(Akka.Persistence.IPersistentRepresentation message) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class EventAppended : Akka.Event.IDeadLetterSuppression { public readonly string PersistenceId; @@ -180,6 +178,7 @@ namespace Akka.Persistence.Sql.Common.Journal System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken, string persistenceId); System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken); } + [System.ObsoleteAttribute("Query is not implemented.")] public interface ISubscriptionCommand { } public interface ITimestampProvider { @@ -201,6 +200,7 @@ namespace Akka.Persistence.Sql.Common.Journal public readonly System.DateTime Timestamp; public JournalEntry(string persistenceId, long sequenceNr, bool isDeleted, string manifest, System.DateTime timestamp, object payload) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class NewEventAppended : Akka.Event.IDeadLetterSuppression { public static Akka.Persistence.Sql.Common.Journal.NewEventAppended Instance; @@ -275,16 +275,10 @@ namespace Akka.Persistence.Sql.Common.Journal public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } - protected bool HasNewEventSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected abstract string JournalConfigPath { get; } protected Akka.Event.ILoggingAdapter Log { get; } public abstract Akka.Persistence.Sql.Common.Journal.IJournalQueryExecutor QueryExecutor { get; } public Akka.Actor.IStash Stash { get; set; } - public void AddNewEventsSubscriber(Akka.Actor.IActorRef subscriber) { } - public void AddPersistenceIdSubscriber(Akka.Actor.IActorRef subscriber, string persistenceId) { } - public void AddTagSubscriber(Akka.Actor.IActorRef subscriber, string tag) { } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } @@ -294,7 +288,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected override void PreStart() { } public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } protected override bool ReceivePluginInternal(object message) { } - public void RemoveSubscriber(Akka.Actor.IActorRef subscriber) { } protected virtual System.Threading.Tasks.Task ReplayAllEventsAsync(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents replay) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } protected virtual System.Threading.Tasks.Task ReplayTaggedMessagesAsync(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages replay) { } @@ -305,20 +298,24 @@ namespace Akka.Persistence.Sql.Common.Journal protected bool WaitingForInitialization(object message) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeNewEvents : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public static Akka.Persistence.Sql.Common.Journal.SubscribeNewEvents Instance; } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribePersistenceId : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string PersistenceId; public SubscribePersistenceId(string persistenceId) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeTag : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string Tag; public SubscribeTag(string tag) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class TaggedEventAppended : Akka.Event.IDeadLetterSuppression { public readonly string Tag; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt index 920493760d0..3571138284a 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt @@ -91,9 +91,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected virtual string ByPersistenceIdSql { get; } protected virtual string ByTagSql { get; } protected virtual string DeleteBatchSql { get; } - protected bool HasNewEventsSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected virtual string HighestOrderingSql { get; } protected virtual string HighestSequenceNrSql { get; } protected abstract System.Collections.Immutable.ImmutableDictionary Initializers { get; } @@ -137,6 +134,7 @@ namespace Akka.Persistence.Sql.Common.Journal public DefaultTimestampProvider() { } public long GenerateTimestamp(Akka.Persistence.IPersistentRepresentation message) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class EventAppended : Akka.Event.IDeadLetterSuppression { public readonly string PersistenceId; @@ -180,6 +178,7 @@ namespace Akka.Persistence.Sql.Common.Journal System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken, string persistenceId); System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken); } + [System.ObsoleteAttribute("Query is not implemented.")] public interface ISubscriptionCommand { } public interface ITimestampProvider { @@ -201,6 +200,7 @@ namespace Akka.Persistence.Sql.Common.Journal public readonly System.DateTime Timestamp; public JournalEntry(string persistenceId, long sequenceNr, bool isDeleted, string manifest, System.DateTime timestamp, object payload) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class NewEventAppended : Akka.Event.IDeadLetterSuppression { public static Akka.Persistence.Sql.Common.Journal.NewEventAppended Instance; @@ -275,16 +275,10 @@ namespace Akka.Persistence.Sql.Common.Journal public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } - protected bool HasNewEventSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected abstract string JournalConfigPath { get; } protected Akka.Event.ILoggingAdapter Log { get; } public abstract Akka.Persistence.Sql.Common.Journal.IJournalQueryExecutor QueryExecutor { get; } public Akka.Actor.IStash Stash { get; set; } - public void AddNewEventsSubscriber(Akka.Actor.IActorRef subscriber) { } - public void AddPersistenceIdSubscriber(Akka.Actor.IActorRef subscriber, string persistenceId) { } - public void AddTagSubscriber(Akka.Actor.IActorRef subscriber, string tag) { } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } @@ -294,7 +288,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected override void PreStart() { } public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } protected override bool ReceivePluginInternal(object message) { } - public void RemoveSubscriber(Akka.Actor.IActorRef subscriber) { } protected virtual System.Threading.Tasks.Task ReplayAllEventsAsync(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents replay) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } protected virtual System.Threading.Tasks.Task ReplayTaggedMessagesAsync(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages replay) { } @@ -305,20 +298,24 @@ namespace Akka.Persistence.Sql.Common.Journal protected bool WaitingForInitialization(object message) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeNewEvents : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public static Akka.Persistence.Sql.Common.Journal.SubscribeNewEvents Instance; } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribePersistenceId : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string PersistenceId; public SubscribePersistenceId(string persistenceId) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeTag : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string Tag; public SubscribeTag(string tag) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class TaggedEventAppended : Akka.Event.IDeadLetterSuppression { public readonly string Tag; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt index 9cb9bae2b7f..85f87ef193b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt @@ -91,9 +91,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected virtual string ByPersistenceIdSql { get; } protected virtual string ByTagSql { get; } protected virtual string DeleteBatchSql { get; } - protected bool HasNewEventsSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected virtual string HighestOrderingSql { get; } protected virtual string HighestSequenceNrSql { get; } protected abstract System.Collections.Immutable.ImmutableDictionary Initializers { get; } @@ -137,6 +134,7 @@ namespace Akka.Persistence.Sql.Common.Journal public DefaultTimestampProvider() { } public long GenerateTimestamp(Akka.Persistence.IPersistentRepresentation message) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class EventAppended : Akka.Event.IDeadLetterSuppression { public readonly string PersistenceId; @@ -180,6 +178,7 @@ namespace Akka.Persistence.Sql.Common.Journal System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken, string persistenceId); System.Threading.Tasks.Task SelectHighestSequenceNrAsync(System.Data.Common.DbConnection connection, System.Threading.CancellationToken cancellationToken); } + [System.ObsoleteAttribute("Query is not implemented.")] public interface ISubscriptionCommand { } public interface ITimestampProvider { @@ -201,6 +200,7 @@ namespace Akka.Persistence.Sql.Common.Journal public readonly System.DateTime Timestamp; public JournalEntry(string persistenceId, long sequenceNr, bool isDeleted, string manifest, System.DateTime timestamp, object payload) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class NewEventAppended : Akka.Event.IDeadLetterSuppression { public static Akka.Persistence.Sql.Common.Journal.NewEventAppended Instance; @@ -275,16 +275,10 @@ namespace Akka.Persistence.Sql.Common.Journal public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } - protected bool HasNewEventSubscribers { get; } - protected bool HasPersistenceIdSubscribers { get; } - protected bool HasTagSubscribers { get; } protected abstract string JournalConfigPath { get; } protected Akka.Event.ILoggingAdapter Log { get; } public abstract Akka.Persistence.Sql.Common.Journal.IJournalQueryExecutor QueryExecutor { get; } public Akka.Actor.IStash Stash { get; set; } - public void AddNewEventsSubscriber(Akka.Actor.IActorRef subscriber) { } - public void AddPersistenceIdSubscriber(Akka.Actor.IActorRef subscriber, string persistenceId) { } - public void AddTagSubscriber(Akka.Actor.IActorRef subscriber, string tag) { } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } @@ -294,7 +288,6 @@ namespace Akka.Persistence.Sql.Common.Journal protected override void PreStart() { } public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } protected override bool ReceivePluginInternal(object message) { } - public void RemoveSubscriber(Akka.Actor.IActorRef subscriber) { } protected virtual System.Threading.Tasks.Task ReplayAllEventsAsync(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents replay) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } protected virtual System.Threading.Tasks.Task ReplayTaggedMessagesAsync(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages replay) { } @@ -305,20 +298,24 @@ namespace Akka.Persistence.Sql.Common.Journal protected bool WaitingForInitialization(object message) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeNewEvents : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public static Akka.Persistence.Sql.Common.Journal.SubscribeNewEvents Instance; } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribePersistenceId : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string PersistenceId; public SubscribePersistenceId(string persistenceId) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeTag : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand { public readonly string Tag; public SubscribeTag(string tag) { } } + [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class TaggedEventAppended : Akka.Event.IDeadLetterSuppression { public readonly string Tag;