Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove autoAck from Persistent Subscriptions #175

Merged
merged 1 commit into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions samples/persistent-subscriptions/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ class Program
static async Task ConnectToPersistentSubscriptionToStream(EventStorePersistentSubscriptionsClient client) {
#region subscribe-to-persistent-subscription-to-stream

var subscription = await client.SubscribeAsync(
var subscription = await client.SubscribeToStreamAsync(
"test-stream",
"subscription-group",
async (subscription, evnt, retryCount, cancellationToken) => {
await HandleEvent(evnt);
await subscription.Ack(evnt);
}, (subscription, dropReason, exception) => {
Console.WriteLine($"Subscription was dropped due to {dropReason}. {exception}");
});
Expand Down Expand Up @@ -82,7 +83,7 @@ class Program
static async Task ConnectToPersistentSubscriptionWithManualAcks(EventStorePersistentSubscriptionsClient client) {
#region subscribe-to-persistent-subscription-with-manual-acks

var subscription = await client.SubscribeAsync(
var subscription = await client.SubscribeToStreamAsync(
"test-stream",
"subscription-group",
async (subscription, evnt, retryCount, cancellationToken) => {
Expand All @@ -94,7 +95,7 @@ class Program
}
}, (subscription, dropReason, exception) => {
Console.WriteLine($"Subscription was dropped due to {dropReason}. {exception}");
}, autoAck: false);
});

#endregion subscribe-to-persistent-subscription-with-manual-acks
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,40 @@ partial class EventStorePersistentSubscriptionsClient {
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentOutOfRangeException"></exception>
[Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStreamAsync with manual acks instead.", true)]
public async Task<PersistentSubscription> SubscribeAsync(string streamName, string groupName,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = null,
UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
CancellationToken cancellationToken = default) {
if (autoAck) {
throw new InvalidOperationException(
$"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead.");
}

return await SubscribeToStreamAsync(streamName, groupName, eventAppeared, subscriptionDropped, userCredentials,
bufferSize, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Subscribes to a persistent subscription. Messages must be manually acknowledged
/// </summary>
/// <param name="streamName"></param>
/// <param name="groupName"></param>
/// <param name="eventAppeared"></param>
/// <param name="subscriptionDropped"></param>
/// <param name="userCredentials"></param>
/// <param name="bufferSize"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public async Task<PersistentSubscription> SubscribeToStreamAsync(string streamName, string groupName,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = null,
UserCredentials? userCredentials = null, int bufferSize = 10,
CancellationToken cancellationToken = default) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
}
Expand Down Expand Up @@ -69,34 +98,32 @@ partial class EventStorePersistentSubscriptionsClient {
readOptions.StreamIdentifier = streamName;
}

return await PersistentSubscription.Confirm(call, readOptions, autoAck, eventAppeared,
return await PersistentSubscription.Confirm(call, readOptions, eventAppeared,
subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Subscribes to a persistent subscription to $all.
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventAppeared"></param>
/// <param name="subscriptionDropped"></param>
/// <param name="userCredentials"></param>
/// <param name="bufferSize"></param>
/// <param name="autoAck"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<PersistentSubscription> SubscribeToAllAsync(string groupName,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = null,
UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
UserCredentials? userCredentials = null, int bufferSize = 10,
CancellationToken cancellationToken = default) =>
await SubscribeAsync(
await SubscribeToStreamAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventAppeared: eventAppeared,
subscriptionDropped: subscriptionDropped,
userCredentials: userCredentials,
bufferSize: bufferSize,
autoAck: autoAck,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace EventStore.Client {
/// Represents a persistent subscription connection.
/// </summary>
public class PersistentSubscription : IDisposable {
private readonly bool _autoAck;
private readonly Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> _eventAppeared;
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
private readonly CancellationTokenSource _disposed;
Expand All @@ -25,7 +24,7 @@ public class PersistentSubscription : IDisposable {
public string SubscriptionId { get; }

internal static async Task<PersistentSubscription> Confirm(AsyncDuplexStreamingCall<ReadReq, ReadResp> call,
ReadReq.Types.Options options, bool autoAck,
ReadReq.Types.Options options,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
CancellationToken cancellationToken = default) {
Expand All @@ -38,16 +37,14 @@ public class PersistentSubscription : IDisposable {
throw new InvalidOperationException();
}

return new PersistentSubscription(call, autoAck, eventAppeared, subscriptionDropped);
return new PersistentSubscription(call, eventAppeared, subscriptionDropped);
}

private PersistentSubscription(
AsyncDuplexStreamingCall<ReadReq, ReadResp> call,
bool autoAck,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped) {
_call = call;
_autoAck = autoAck;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_disposed = new CancellationTokenSource();
Expand Down Expand Up @@ -146,10 +143,6 @@ public class PersistentSubscription : IDisposable {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => current.Event.RetryCount,
_ => default
}, _disposed.Token).ConfigureAwait(false);
if (_autoAck) {
await AckInternal(Uuid.FromDto(current.Event.Link?.Id ?? current.Event.Event.Id))
.ConfigureAwait(false);
}
} catch (Exception ex) when (ex is ObjectDisposedException ||
ex is OperationCanceledException) {
SubscriptionDropped(SubscriptionDroppedReason.Disposed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static IEnumerable<object[]> TestCases() => Enumerable.Range(0, 50)
resolveLinkTos: true, startFrom: StreamPosition.Start, readBatchSize: 10, historyBufferSize: 20),
userCredentials: userCredentials);

using (await _fixture.Client.SubscribeAsync(streamName, subscriptionName,
using (await _fixture.Client.SubscribeToStreamAsync(streamName, subscriptionName,
async (subscription, @event, retryCount, arg4) => {
int result;
if (retryCount == 0 || retryCount is null) {
Expand All @@ -66,7 +66,7 @@ public static IEnumerable<object[]> TestCases() => Enumerable.Range(0, 50)
completed.TrySetException(e);
else
completed.TrySetException(new Exception($"{dr}"));
}, userCredentials, autoAck: false)) {
}, userCredentials)) {
for (var i = 0; i < eventCount; i++) {
await _fixture.StreamsClient.AppendToStreamAsync(streamName, StreamState.Any,
_fixture.CreateTestEvents());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public class Fixture : EventStoreClientFixture {

protected override async Task When() {
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
_firstEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ public class Fixture : EventStoreClientFixture {

protected override async Task When() {
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
if (SystemStreams.IsSystemStream(e.OriginalStreamId)) {
return Task.CompletedTask;
await subscription.Ack(e);
return;
}
_firstNonSystemEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstNonSystemEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public class Fixture : EventStoreClientFixture {

await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), TestCredentials.Root);
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
if (SystemStreams.IsSystemStream(e.OriginalStreamId)) {
return Task.CompletedTask;
await subscription.Ack(e);
return;
}
_firstNonSystemEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstNonSystemEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ public class Fixture : EventStoreClientFixture {

protected override async Task When() {
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
if (SystemStreams.IsSystemStream(e.OriginalStreamId)) {
return Task.CompletedTask;
await subscription.Ack(e);
return;
}
_firstNonSystemEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstNonSystemEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public class Fixture : EventStoreClientFixture {

await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(startFrom: Position.End), TestCredentials.Root);
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
if (SystemStreams.IsSystemStream(e.OriginalStreamId)) {
return Task.CompletedTask;
await subscription.Ack(e);
return;
}
_firstNonSystemEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstNonSystemEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Fixture : EventStoreClientFixture {

protected override async Task When() {
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => Task.CompletedTask,
async (subscription, e, r, ct) => await subscription.Ack(e),
(subscription, reason, ex) => {
_dropped.TrySetResult((reason, ex));
}, TestCredentials.Root);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public class Fixture : EventStoreClientFixture {

protected override async Task When() {
_subscription = await Client.SubscribeToAllAsync(Group,
(subscription, e, r, ct) => {
async (subscription, e, r, ct) => {
_firstEventSource.TrySetResult(e);
return Task.CompletedTask;
await subscription.Ack(e);
}, (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_firstEventSource.TrySetException(ex!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class Fixture : EventStoreClientFixture {
await subscription.Nack(PersistentSubscriptionNakEventAction.Retry,
"Not yet tried enough times", e);
}
}, autoAck: false, subscriptionDropped: (subscription, reason, ex) => {
}, subscriptionDropped: (subscription, reason, ex) => {
if (reason != SubscriptionDroppedReason.Disposed) {
_retryCountSource.TrySetException(ex!);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class Fixture : EventStoreClientFixture {
new PersistentSubscriptionSettings(),
TestCredentials.Root);
_subscription = await Client.SubscribeToAllAsync("groupname123",
(s, e, i, ct) => Task.CompletedTask,
async (s, e, i, ct) => await s.Ack(e),
(s, r, e) => _dropped.TrySetResult((r, e)), TestCredentials.Root);
// todo: investigate why this test is flaky without this delay
await Task.Delay(500);
Expand Down