diff --git a/samples/persistent-subscriptions/Program.cs b/samples/persistent-subscriptions/Program.cs index ac503f766..5390d7f23 100644 --- a/samples/persistent-subscriptions/Program.cs +++ b/samples/persistent-subscriptions/Program.cs @@ -37,11 +37,12 @@ class Program static async Task ConnectToPersistentSubscriptionToStream(EventStorePersistentSubscriptionsClient client) { #region subscribe-to-persistent-subscription-to-stream - var subscription = await client.SubscribeAsync( + var subscription = await client.SubscribeToStreamAsync( "test-stream", "subscription-group", async (subscription, evnt, retryCount, cancellationToken) => { await HandleEvent(evnt); + await subscription.Ack(evnt); }, (subscription, dropReason, exception) => { Console.WriteLine($"Subscription was dropped due to {dropReason}. {exception}"); }); @@ -82,7 +83,7 @@ class Program static async Task ConnectToPersistentSubscriptionWithManualAcks(EventStorePersistentSubscriptionsClient client) { #region subscribe-to-persistent-subscription-with-manual-acks - var subscription = await client.SubscribeAsync( + var subscription = await client.SubscribeToStreamAsync( "test-stream", "subscription-group", async (subscription, evnt, retryCount, cancellationToken) => { @@ -94,7 +95,7 @@ class Program } }, (subscription, dropReason, exception) => { Console.WriteLine($"Subscription was dropped due to {dropReason}. {exception}"); - }, autoAck: false); + }); #endregion subscribe-to-persistent-subscription-with-manual-acks } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs index 76afaeef0..8fdb02bd8 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs @@ -21,11 +21,40 @@ partial class EventStorePersistentSubscriptionsClient { /// /// /// + [Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStreamAsync with manual acks instead.", true)] public async Task SubscribeAsync(string streamName, string groupName, Func eventAppeared, Action? subscriptionDropped = null, UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true, CancellationToken cancellationToken = default) { + if (autoAck) { + throw new InvalidOperationException( + $"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead."); + } + + return await SubscribeToStreamAsync(streamName, groupName, eventAppeared, subscriptionDropped, userCredentials, + bufferSize, cancellationToken).ConfigureAwait(false); + } + + /// + /// Subscribes to a persistent subscription. Messages must be manually acknowledged + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public async Task SubscribeToStreamAsync(string streamName, string groupName, + Func eventAppeared, + Action? subscriptionDropped = null, + UserCredentials? userCredentials = null, int bufferSize = 10, + CancellationToken cancellationToken = default) { if (streamName == null) { throw new ArgumentNullException(nameof(streamName)); } @@ -69,34 +98,32 @@ partial class EventStorePersistentSubscriptionsClient { readOptions.StreamIdentifier = streamName; } - return await PersistentSubscription.Confirm(call, readOptions, autoAck, eventAppeared, + return await PersistentSubscription.Confirm(call, readOptions, eventAppeared, subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false); } /// - /// Subscribes to a persistent subscription to $all. + /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged /// /// /// /// /// /// - /// /// /// public async Task SubscribeToAllAsync(string groupName, Func eventAppeared, Action? subscriptionDropped = null, - UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true, + UserCredentials? userCredentials = null, int bufferSize = 10, CancellationToken cancellationToken = default) => - await SubscribeAsync( + await SubscribeToStreamAsync( streamName: SystemStreams.AllStream, groupName: groupName, eventAppeared: eventAppeared, subscriptionDropped: subscriptionDropped, userCredentials: userCredentials, bufferSize: bufferSize, - autoAck: autoAck, cancellationToken: cancellationToken) .ConfigureAwait(false); } diff --git a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs index d5f910e73..00b875696 100644 --- a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs +++ b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs @@ -12,7 +12,6 @@ namespace EventStore.Client { /// Represents a persistent subscription connection. /// public class PersistentSubscription : IDisposable { - private readonly bool _autoAck; private readonly Func _eventAppeared; private readonly Action _subscriptionDropped; private readonly CancellationTokenSource _disposed; @@ -25,7 +24,7 @@ public class PersistentSubscription : IDisposable { public string SubscriptionId { get; } internal static async Task Confirm(AsyncDuplexStreamingCall call, - ReadReq.Types.Options options, bool autoAck, + ReadReq.Types.Options options, Func eventAppeared, Action subscriptionDropped, CancellationToken cancellationToken = default) { @@ -38,16 +37,14 @@ public class PersistentSubscription : IDisposable { throw new InvalidOperationException(); } - return new PersistentSubscription(call, autoAck, eventAppeared, subscriptionDropped); + return new PersistentSubscription(call, eventAppeared, subscriptionDropped); } private PersistentSubscription( AsyncDuplexStreamingCall call, - bool autoAck, Func eventAppeared, Action subscriptionDropped) { _call = call; - _autoAck = autoAck; _eventAppeared = eventAppeared; _subscriptionDropped = subscriptionDropped; _disposed = new CancellationTokenSource(); @@ -146,10 +143,6 @@ public class PersistentSubscription : IDisposable { ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => current.Event.RetryCount, _ => default }, _disposed.Token).ConfigureAwait(false); - if (_autoAck) { - await AckInternal(Uuid.FromDto(current.Event.Link?.Id ?? current.Event.Event.Id)) - .ConfigureAwait(false); - } } catch (Exception ex) when (ex is ObjectDisposedException || ex is OperationCanceledException) { SubscriptionDropped(SubscriptionDroppedReason.Disposed); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Bugs/Issue_1125.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Bugs/Issue_1125.cs index 7c14344fa..34a13144f 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/Bugs/Issue_1125.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Bugs/Issue_1125.cs @@ -46,7 +46,7 @@ public static IEnumerable TestCases() => Enumerable.Range(0, 50) resolveLinkTos: true, startFrom: StreamPosition.Start, readBatchSize: 10, historyBufferSize: 20), userCredentials: userCredentials); - using (await _fixture.Client.SubscribeAsync(streamName, subscriptionName, + using (await _fixture.Client.SubscribeToStreamAsync(streamName, subscriptionName, async (subscription, @event, retryCount, arg4) => { int result; if (retryCount == 0 || retryCount is null) { @@ -66,7 +66,7 @@ public static IEnumerable TestCases() => Enumerable.Range(0, 50) completed.TrySetException(e); else completed.TrySetException(new Exception($"{dr}")); - }, userCredentials, autoAck: false)) { + }, userCredentials)) { for (var i = 0; i < eventCount; i++) { await _fixture.StreamsClient.AppendToStreamAsync(streamName, StreamState.Any, _fixture.CreateTestEvents()); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs deleted file mode 100644 index 3c9d695f4..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.Linq; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToAll { - public class a_nak_in_autoack_mode_drops_the_subscription - : IClassFixture { - private readonly Fixture _fixture; - private const string Group = "naktest"; - - - public a_nak_in_autoack_mode_drops_the_subscription(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task the_subscription_gets_dropped() { - var (reason, exception) = await _fixture.SubscriptionDropped.WithTimeout(TimeSpan.FromSeconds(10)); - Assert.Equal(SubscriptionDroppedReason.SubscriberError, reason); - var ex = Assert.IsType(exception); - Assert.Equal("test", ex.Message); - } - - public class Fixture : EventStoreClientFixture { - private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _subscriptionDroppedSource; - - public Task<(SubscriptionDroppedReason reason, Exception exception)> SubscriptionDropped => - _subscriptionDroppedSource.Task; - - public readonly EventData[] Events; - private PersistentSubscription _subscription; - - public Fixture() { - _subscriptionDroppedSource = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); - Events = CreateTestEvents().ToArray(); - } - - protected override async Task Given() { - await Client.CreateToAllAsync(Group, - new PersistentSubscriptionSettings(startFrom: Position.Start), TestCredentials.Root); - _subscription = await Client.SubscribeToAllAsync(Group, - delegate { - throw new Exception("test"); - }, (subscription, reason, ex) => _subscriptionDroppedSource.SetResult((reason, ex)), TestCredentials.Root); - } - - protected override Task When() => Task.CompletedTask; - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs index 799b3c432..9ee28fc59 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs @@ -46,9 +46,9 @@ public class Fixture : EventStoreClientFixture { protected override async Task When() { _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs index bba316a72..57d20b01a 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs @@ -44,12 +44,13 @@ public class Fixture : EventStoreClientFixture { protected override async Task When() { _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { - return Task.CompletedTask; + await subscription.Ack(e); + return; } _firstNonSystemEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstNonSystemEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs index 9582c92b2..67a86111e 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs @@ -48,12 +48,13 @@ public class Fixture : EventStoreClientFixture { await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), TestCredentials.Root); _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { - return Task.CompletedTask; + await subscription.Ack(e); + return; } _firstNonSystemEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstNonSystemEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs index 620720405..85fb47ea2 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs @@ -44,12 +44,13 @@ public class Fixture : EventStoreClientFixture { protected override async Task When() { _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { - return Task.CompletedTask; + await subscription.Ack(e); + return; } _firstNonSystemEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstNonSystemEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs index db67ccb94..93caafc51 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs @@ -48,12 +48,13 @@ public class Fixture : EventStoreClientFixture { await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(startFrom: Position.End), TestCredentials.Root); _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { - return Task.CompletedTask; + await subscription.Ack(e); + return; } _firstNonSystemEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstNonSystemEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs index 2203646fa..d6d8cad0c 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs @@ -42,7 +42,7 @@ public class Fixture : EventStoreClientFixture { protected override async Task When() { _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => Task.CompletedTask, + async (subscription, e, r, ct) => await subscription.Ack(e), (subscription, reason, ex) => { _dropped.TrySetResult((reason, ex)); }, TestCredentials.Root); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs index 2ae49d7a0..6c48f2c67 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs @@ -46,9 +46,9 @@ public class Fixture : EventStoreClientFixture { protected override async Task When() { _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, r, ct) => { + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs index 0c6fa949f..da949cd91 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs @@ -39,7 +39,7 @@ public class Fixture : EventStoreClientFixture { await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, "Not yet tried enough times", e); } - }, autoAck: false, subscriptionDropped: (subscription, reason, ex) => { + }, subscriptionDropped: (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _retryCountSource.TrySetException(ex!); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs index fa9061214..f2de069c1 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs @@ -26,7 +26,7 @@ public class Fixture : EventStoreClientFixture { new PersistentSubscriptionSettings(), TestCredentials.Root); _subscription = await Client.SubscribeToAllAsync("groupname123", - (s, e, i, ct) => Task.CompletedTask, + async (s, e, i, ct) => await s.Ack(e), (s, r, e) => _dropped.TrySetResult((r, e)), TestCredentials.Root); // todo: investigate why this test is flaky without this delay await Task.Delay(500); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs deleted file mode 100644 index b76f870f2..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToAll { - public class happy_case_catching_up_to_link_to_events_auto_ack - : IClassFixture { - - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_catching_up_to_link_to_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount) - .Select((e, i) => new EventData(e.EventId, SystemEventTypes.LinkTo, - Encoding.UTF8.GetBytes($"{i}@test"), - contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream)) - .ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); - } - - await Client.CreateToAllAsync(Group, - new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, retryCount, ct) => { - if (e.OriginalStreamId.StartsWith("test-") - && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override Task When() => Task.CompletedTask; - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs index 9323f1b79..f8207740f 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs @@ -62,7 +62,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs deleted file mode 100644 index 0c67e6ade..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToAll { - public class happy_case_catching_up_to_normal_events_auto_ack - : IClassFixture { - - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_catching_up_to_normal_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount).ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync("test-"+Guid.NewGuid(), StreamState.Any, new[] {e}); - } - - await Client.CreateToAllAsync(Group, - new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, retryCount, ct) => { - if (e.OriginalStreamId.StartsWith("test-") - && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override Task When() => Task.CompletedTask; - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs index 8e861dbec..9f916a9b8 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs @@ -57,7 +57,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered.cs index 4fb1e71e8..0c72a3402 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered.cs @@ -33,12 +33,12 @@ public class happy_case_filtered : IClassFixture { new PersistentSubscriptionSettings(startFrom: Position.Start), TestCredentials.Root); using var subscription = await _fixture.Client.SubscribeToAllAsync(filterName, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { appearedEvents.Add(e.Event); if (appearedEvents.Count >= events.Length) { appeared.TrySetResult(true); } - return Task.CompletedTask; + await s.Ack(e); }, userCredentials: TestCredentials.Root) .WithTimeout(); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered_with_start_from_set.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered_with_start_from_set.cs index 9e9f65603..a426a5d94 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered_with_start_from_set.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_filtered_with_start_from_set.cs @@ -42,12 +42,12 @@ public class happy_case_filtered_with_start_from_set : IClassFixture { + eventAppeared: async (s, e, r, ct) => { appearedEvents.Add(e.Event); if (appearedEvents.Count >= eventsToCapture.Length) { appeared.TrySetResult(true); } - return Task.CompletedTask; + await s.Ack(e); }, userCredentials: TestCredentials.Root) .WithTimeout(); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs deleted file mode 100644 index b17c386a4..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToAll { - public class happy_case_writing_and_subscribing_to_normal_events_auto_ack - : IClassFixture { - - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_writing_and_subscribing_to_normal_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount).ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - await Client.CreateToAllAsync(Group, - new PersistentSubscriptionSettings(startFrom: Position.End, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeToAllAsync(Group, - (subscription, e, retryCount, ct) => { - if (e.OriginalStreamId.StartsWith("test-") - && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, autoAck: true, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override async Task When() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); - } - } - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs index ed25ab4bc..41037b1e0 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs @@ -51,7 +51,7 @@ public class Fixture : EventStoreClientFixture { if (e != null) { _eventsReceived.TrySetException(e); } - }, autoAck: false, + }, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs index 4a392dd05..6ec3ab267 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs @@ -65,12 +65,12 @@ public class Fixture : EventStoreClientFixture { userCredentials: TestCredentials.Root); _firstSubscription = await Client.SubscribeToAllAsync(Group, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); if (_appearedEvents.Count == _events.Length) _appeared.TrySetResult(true); - return Task.CompletedTask; + await s.Ack(e); }, (subscription, reason, ex) => _droppedSource.TrySetResult((reason, ex)), TestCredentials.Root); @@ -87,9 +87,9 @@ public class Fixture : EventStoreClientFixture { await _droppedSource.Task.WithTimeout(); _secondSubscription = await Client.SubscribeToAllAsync(Group, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { _resumedSource.TrySetResult(e); - return Task.CompletedTask; + await s.Ack(e); }, userCredentials: TestCredentials.Root); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs index b08aee590..08ea25fc0 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs @@ -60,18 +60,18 @@ public class Fixture : EventStoreClientFixture { var checkPointStream = $"$persistentsubscription-$all::{Group}-checkpoint"; _checkPointSubscription = await StreamsClient.SubscribeToStreamAsync(checkPointStream, (s, e, ct) => { - _checkPointSource.TrySetResult(e); + _checkPointSource.TrySetResult(e); return Task.CompletedTask; }, userCredentials: TestCredentials.Root); _firstSubscription = await Client.SubscribeToAllAsync(Group, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); if (_appearedEvents.Count == _events.Length) _appeared.TrySetResult(true); - return Task.CompletedTask; + await s.Ack(e); }, subscriptionDropped: (subscription, reason, ex) => _droppedSource.TrySetResult((reason, ex)), userCredentials: TestCredentials.Root); @@ -88,10 +88,10 @@ public class Fixture : EventStoreClientFixture { await _droppedSource.Task.WithTimeout(); _secondSubscription = await Client.SubscribeToAllAsync(Group, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { _resumedSource.TrySetResult(e); + await s.Ack(e); s.Dispose(); - return Task.CompletedTask; }, userCredentials: TestCredentials.Root); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs index 3641afa0c..29fe9dafa 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs @@ -73,12 +73,12 @@ public class Fixture : EventStoreClientFixture { userCredentials: TestCredentials.Root); _subscription = await Client.SubscribeToAllAsync(Group, - eventAppeared: (s, e, r, ct) => { + eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); if (_appearedEvents.Count == _events.Length) _appeared.TrySetResult(true); - return Task.CompletedTask; + await s.Ack(e); }, userCredentials: TestCredentials.Root); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs index e240910e7..3260e67b9 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs @@ -54,7 +54,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs deleted file mode 100644 index 2c5f96197..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Linq; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToStream { - public class a_nak_in_autoack_mode_drops_the_subscription - : IClassFixture { - private readonly Fixture _fixture; - private const string Group = "naktest"; - private const string Stream = nameof(a_nak_in_autoack_mode_drops_the_subscription); - - public a_nak_in_autoack_mode_drops_the_subscription(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task the_subscription_gets_dropped() { - var (reason, exception) = await _fixture.SubscriptionDropped.WithTimeout(TimeSpan.FromSeconds(10)); - Assert.Equal(SubscriptionDroppedReason.SubscriberError, reason); - var ex = Assert.IsType(exception); - Assert.Equal("test", ex.Message); - } - - public class Fixture : EventStoreClientFixture { - private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _subscriptionDroppedSource; - - public Task<(SubscriptionDroppedReason reason, Exception exception)> SubscriptionDropped => - _subscriptionDroppedSource.Task; - - public readonly EventData[] Events; - private PersistentSubscription _subscription; - - public Fixture() { - _subscriptionDroppedSource = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); - Events = CreateTestEvents().ToArray(); - } - - protected override async Task Given() { - await Client.CreateAsync(Stream, Group, - new PersistentSubscriptionSettings(startFrom: StreamPosition.Start), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - delegate { - throw new Exception("test"); - }, (subscription, reason, ex) => _subscriptionDroppedSource.SetResult((reason, ex)), TestCredentials.Root); - } - - protected override Task When() => - StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs index e4cc756f2..8d981307d 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs @@ -14,12 +14,12 @@ public class connect_to_existing_with_max_one_client [Fact] public async Task the_second_subscription_fails_to_connect() { - using var first = await _fixture.Client.SubscribeAsync(Stream, Group, + using var first = await _fixture.Client.SubscribeToStreamAsync(Stream, Group, delegate { return Task.CompletedTask; }, userCredentials: TestCredentials.Root).WithTimeout(); var ex = await Assert.ThrowsAsync(async () => { - using var _ = await _fixture.Client.SubscribeAsync(Stream, Group, + using var _ = await _fixture.Client.SubscribeToStreamAsync(Stream, Group, delegate { return Task.CompletedTask; }, userCredentials: TestCredentials.Root); }).WithTimeout(); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs index 865240edf..8841b6c80 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs @@ -16,7 +16,7 @@ public class connect_to_existing_with_permissions [Fact] public async Task the_subscription_succeeds() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); - using var subscription = await _fixture.Client.SubscribeAsync(Stream, "agroupname17", + using var subscription = await _fixture.Client.SubscribeToStreamAsync(Stream, "agroupname17", delegate { return Task.CompletedTask; }, (s, reason, ex) => dropped.TrySetResult((reason, ex)), TestCredentials.Root).WithTimeout(); Assert.NotNull(subscription); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs index 0f219b2db..60c6038d1 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs @@ -43,10 +43,10 @@ public class Fixture : EventStoreClientFixture { } protected override async Task When() { - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs index 2821b3b33..fcc5fcbd9 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs @@ -39,10 +39,10 @@ public class Fixture : EventStoreClientFixture { protected override async Task Given() { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs index b9dbacf3c..8a04c3431 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs @@ -41,10 +41,10 @@ public class Fixture : EventStoreClientFixture { } protected override async Task When() { - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs index 188b97f8f..68813f626 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs @@ -44,10 +44,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(10)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs index cdd0bc6cc..874393a29 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs @@ -41,10 +41,10 @@ public class Fixture : EventStoreClientFixture { } protected override async Task When() { - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs index ea26e24f1..238290da4 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs @@ -44,10 +44,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(10)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.End), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs index 91bc7f1f4..22e6ea085 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs @@ -38,10 +38,10 @@ public class Fixture : EventStoreClientFixture { protected override async Task Given() { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: new StreamPosition(2)), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs index 47fe75805..240a43db7 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs @@ -37,10 +37,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(10)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: new StreamPosition(4)), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs index 5796c8ad5..5ad8ecf53 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs @@ -42,10 +42,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(10)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: new StreamPosition(10)), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs index 7a2b03af9..d21f7fd33 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs @@ -45,10 +45,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(11)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: new StreamPosition(11)), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs index a606dfa95..8fbfb9d22 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs @@ -11,7 +11,7 @@ public class connect_to_existing_without_permissions [Fact] public Task throws_access_denied() => Assert.ThrowsAsync(async () => { - using var _ = await _fixture.Client.SubscribeAsync(Stream, "agroupname55", + using var _ = await _fixture.Client.SubscribeToStreamAsync(Stream, "agroupname55", delegate { return Task.CompletedTask; }); }).WithTimeout(); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs index 0e4995ae7..8a298b740 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs @@ -16,7 +16,7 @@ public class connect_to_non_existing_with_permissions [Fact] public async Task throws_persistent_subscription_not_found() { var ex = await Assert.ThrowsAsync(async () => { - using var _ = await _fixture.Client.SubscribeAsync( + using var _ = await _fixture.Client.SubscribeToStreamAsync( Stream, Group, delegate { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs index 236bc80c2..e7b8ba4d2 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs @@ -34,7 +34,7 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.Start), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, async (subscription, e, r, ct) => { if (r > 4) { _retryCountSource.TrySetResult(r.Value); @@ -43,7 +43,7 @@ public class Fixture : EventStoreClientFixture { await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, "Not yet tried enough times", e); } - }, autoAck: false, subscriptionDropped: (subscription, reason, ex) => { + }, subscriptionDropped: (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _retryCountSource.TrySetException(ex!); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs index 4b91b39a1..1933cb093 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs @@ -45,10 +45,10 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(11)); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: new StreamPosition(11)), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, r, ct) => { + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, + async (subscription, e, r, ct) => { _firstEventSource.TrySetResult(e); - return Task.CompletedTask; + await subscription.Ack(e); }, (subscription, reason, ex) => { if (reason != SubscriptionDroppedReason.Disposed) { _firstEventSource.TrySetException(ex!); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs index 1be98bd1b..0b359c4a0 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs @@ -25,7 +25,7 @@ public class Fixture : EventStoreClientFixture { await Client.CreateAsync(Stream, "groupname123", new PersistentSubscriptionSettings(), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, "groupname123", + _subscription = await Client.SubscribeToStreamAsync(Stream, "groupname123", (s, e, i, ct) => Task.CompletedTask, (s, r, e) => _dropped.TrySetResult((r, e)), TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs deleted file mode 100644 index eb4c93638..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs +++ /dev/null @@ -1,75 +0,0 @@ -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToStream { - public class happy_case_catching_up_to_link_to_events_auto_ack - : IClassFixture { - private const string Stream = nameof(happy_case_catching_up_to_link_to_events_auto_ack); - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_catching_up_to_link_to_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount) - .Select((e, i) => new EventData(e.EventId, SystemEventTypes.LinkTo, - Encoding.UTF8.GetBytes($"{i}@{Stream}"), - contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream)) - .ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync(Stream, StreamState.Any, new[] {e}); - } - - await Client.CreateAsync(Stream, Group, - new PersistentSubscriptionSettings(startFrom: StreamPosition.Start, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, retryCount, ct) => { - if (Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override Task When() => Task.CompletedTask; - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs index 0dfee2c0c..6856cad50 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs @@ -48,7 +48,7 @@ public class Fixture : EventStoreClientFixture { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.Start, resolveLinkTos: true), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, async (subscription, e, retryCount, ct) => { await subscription.Ack(e); @@ -60,7 +60,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs deleted file mode 100644 index 8c6cbfb0f..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs +++ /dev/null @@ -1,70 +0,0 @@ -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToStream { - public class happy_case_catching_up_to_normal_events_auto_ack - : IClassFixture { - private const string Stream = nameof(happy_case_catching_up_to_normal_events_auto_ack); - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_catching_up_to_normal_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount).ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync(Stream, StreamState.Any, new[] {e}); - } - - await Client.CreateAsync(Stream, Group, - new PersistentSubscriptionSettings(startFrom: StreamPosition.Start, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, retryCount, ct) => { - if (Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override Task When() => Task.CompletedTask; - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs index f8af434c6..3af3c8050 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs @@ -43,7 +43,7 @@ public class Fixture : EventStoreClientFixture { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.Start, resolveLinkTos: true), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, async(subscription, e, retryCount, ct) => { await subscription.Ack(e); @@ -55,7 +55,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs deleted file mode 100644 index 591ce24c8..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs +++ /dev/null @@ -1,70 +0,0 @@ -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client.SubscriptionToStream { - public class happy_case_writing_and_subscribing_to_normal_events_auto_ack - : IClassFixture { - private const string Stream = nameof(happy_case_writing_and_subscribing_to_normal_events_auto_ack); - private const string Group = nameof(Group); - private const int BufferCount = 10; - private const int EventWriteCount = BufferCount * 2; - - private readonly Fixture _fixture; - - public happy_case_writing_and_subscribing_to_normal_events_auto_ack(Fixture fixture) { - _fixture = fixture; - } - - [Fact] - public async Task Test() { - await _fixture.EventsReceived.WithTimeout(); - } - - public class Fixture : EventStoreClientFixture { - private readonly EventData[] _events; - private readonly TaskCompletionSource _eventsReceived; - public Task EventsReceived => _eventsReceived.Task; - - private PersistentSubscription _subscription; - private int _eventReceivedCount; - - public Fixture() { - _events = CreateTestEvents(EventWriteCount).ToArray(); - _eventsReceived = new TaskCompletionSource(); - } - - protected override async Task Given() { - await Client.CreateAsync(Stream, Group, - new PersistentSubscriptionSettings(startFrom: StreamPosition.End, resolveLinkTos: true), - TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, - (subscription, e, retryCount, ct) => { - if (Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { - _eventsReceived.TrySetResult(true); - } - - return Task.CompletedTask; - }, (s, r, e) => { - if (e != null) { - _eventsReceived.TrySetException(e); - } - }, autoAck: true, - bufferSize: BufferCount, - userCredentials: TestCredentials.Root); - } - - protected override async Task When() { - foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync(Stream, StreamState.Any, new[] {e}); - } - } - - public override Task DisposeAsync() { - _subscription?.Dispose(); - return base.DisposeAsync(); - } - } - } -} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs index 56359c9e3..63b57bc88 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs @@ -39,7 +39,7 @@ public class Fixture : EventStoreClientFixture { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.End, resolveLinkTos: true), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, async (subscription, e, retryCount, ct) => { await subscription.Ack(e); if (Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { @@ -49,7 +49,7 @@ public class Fixture : EventStoreClientFixture { if (e != null) { _eventsReceived.TrySetException(e); } - }, autoAck: false, + }, bufferSize: BufferCount, userCredentials: TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs index 3ebe4f09d..f747c36d8 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs @@ -56,18 +56,18 @@ public class Fixture : EventStoreClientFixture { var checkPointStream = $"$persistentsubscription-{Stream}::{Group}-checkpoint"; await StreamsClient.SubscribeToStreamAsync(checkPointStream, (s, e, ct) => { - _checkPointSource.TrySetResult(e); + _checkPointSource.TrySetResult(e); return Task.CompletedTask; }, userCredentials: TestCredentials.Root); - _firstSubscription = await Client.SubscribeAsync(Stream, Group, - eventAppeared: (s, e, r, ct) => { + _firstSubscription = await Client.SubscribeToStreamAsync(Stream, Group, + eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); + await s.Ack(e); if (_appearedEvents.Count == _events.Length) _appeared.TrySetResult(true); - return Task.CompletedTask; }, (subscription, reason, ex) => _droppedSource.TrySetResult((reason, ex)), TestCredentials.Root); @@ -83,10 +83,10 @@ public class Fixture : EventStoreClientFixture { await _droppedSource.Task.WithTimeout(); - _secondSubscription = await Client.SubscribeAsync(Stream, Group, - eventAppeared: (s, e, r, ct) => { + _secondSubscription = await Client.SubscribeToStreamAsync(Stream, Group, + eventAppeared: async (s, e, r, ct) => { _resumedSource.TrySetResult(e); - return Task.CompletedTask; + await s.Ack(e); }, userCredentials: TestCredentials.Root); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs index b2d53079e..727b816a9 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs @@ -35,7 +35,7 @@ public class Fixture : EventStoreClientFixture { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, CreateTestEvents()); await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, delegate { return Task.CompletedTask; }, (subscription, reason, ex) => _droppedSource.TrySetResult((reason, ex)), TestCredentials.Root); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs index a06d8e613..2a0a56372 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs @@ -40,7 +40,7 @@ public class Fixture : EventStoreClientFixture { await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings(startFrom: StreamPosition.Start, resolveLinkTos: true), TestCredentials.Root); - _subscription = await Client.SubscribeAsync(Stream, Group, + _subscription = await Client.SubscribeToStreamAsync(Stream, Group, async (subscription, e, retryCount, ct) => { await subscription.Nack(PersistentSubscriptionNakEventAction.Park, "fail", e); @@ -52,7 +52,6 @@ public class Fixture : EventStoreClientFixture { _eventsReceived.TrySetException(e); } }, - autoAck: false, bufferSize: BufferCount, userCredentials: TestCredentials.Root); }