diff --git a/Shared.EventStore/Subscriptions/PersistentSubscription.cs b/Shared.EventStore/Subscriptions/PersistentSubscription.cs index e3249ff0..b427b471 100644 --- a/Shared.EventStore/Subscriptions/PersistentSubscription.cs +++ b/Shared.EventStore/Subscriptions/PersistentSubscription.cs @@ -102,7 +102,7 @@ public async Task ConnectToSubscription() this.EventAppeared, this.SubscriptionDropped, this.UserCredentials, - 200, + this.PersistentSubscriptionDetails.InflightCount == 0 ? 200 : this.PersistentSubscriptionDetails.InflightCount, false); diff --git a/Shared.EventStore/Subscriptions/PersistentSubscriptionDetails.cs b/Shared.EventStore/Subscriptions/PersistentSubscriptionDetails.cs index eb8f1d66..ed2d7780 100644 --- a/Shared.EventStore/Subscriptions/PersistentSubscriptionDetails.cs +++ b/Shared.EventStore/Subscriptions/PersistentSubscriptionDetails.cs @@ -12,10 +12,12 @@ public record PersistentSubscriptionDetails /// Name of the stream. /// Name of the group. public PersistentSubscriptionDetails(String streamName, - String groupName) + String groupName, + Int32 inflightCount) { this.StreamName = streamName; this.GroupName = groupName; + this.InflightCount = inflightCount; } #endregion @@ -38,6 +40,8 @@ public PersistentSubscriptionDetails(String streamName, /// public String StreamName { get; init; } + public Int32 InflightCount { get; init; } + #endregion #region Methods diff --git a/Shared.EventStore/Subscriptions/SubscriptionWorker.cs b/Shared.EventStore/Subscriptions/SubscriptionWorker.cs index 45912132..97070bbb 100644 --- a/Shared.EventStore/Subscriptions/SubscriptionWorker.cs +++ b/Shared.EventStore/Subscriptions/SubscriptionWorker.cs @@ -98,6 +98,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Boolean useInternalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("UseInternalSubscriptionService")); + String inflightMessageCount = ConfigurationReader.GetValue("InflightMessages"); if (useInternalSubscriptionService == false) return; @@ -137,7 +138,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { this.LogInformation($"Creating subscription {subscriptionDto.EventStreamId}-{subscriptionDto.GroupName}"); - PersistentSubscriptionDetails persistentSubscriptionDetails = new(subscriptionDto.EventStreamId, subscriptionDto.GroupName); + PersistentSubscriptionDetails persistentSubscriptionDetails = new(subscriptionDto.EventStreamId, subscriptionDto.GroupName, + String.IsNullOrEmpty(inflightMessageCount) ? 0 : Int32.Parse(inflightMessageCount)); PersistentSubscription subscription = PersistentSubscription.Create(this.PersistentSubscriptionsClient, persistentSubscriptionDetails,