diff --git a/FileProcessor.BusinessLogic/FileProcessor.BusinessLogic.csproj b/FileProcessor.BusinessLogic/FileProcessor.BusinessLogic.csproj index 7ce188c..a0befa6 100644 --- a/FileProcessor.BusinessLogic/FileProcessor.BusinessLogic.csproj +++ b/FileProcessor.BusinessLogic/FileProcessor.BusinessLogic.csproj @@ -9,10 +9,10 @@ - - + + - + diff --git a/FileProcessor.Client/FileProcessor.Client.csproj b/FileProcessor.Client/FileProcessor.Client.csproj index 0f942c7..801caf0 100644 --- a/FileProcessor.Client/FileProcessor.Client.csproj +++ b/FileProcessor.Client/FileProcessor.Client.csproj @@ -6,7 +6,7 @@ - + diff --git a/FileProcessor.File.DomainEvents/FileProcessor.File.DomainEvents.csproj b/FileProcessor.File.DomainEvents/FileProcessor.File.DomainEvents.csproj index d3a7382..23585a5 100644 --- a/FileProcessor.File.DomainEvents/FileProcessor.File.DomainEvents.csproj +++ b/FileProcessor.File.DomainEvents/FileProcessor.File.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/FileProcessor.FileAggregate/FileProcessor - Backup.FileAggregate.csproj b/FileProcessor.FileAggregate/FileProcessor - Backup.FileAggregate.csproj new file mode 100644 index 0000000..87838a5 --- /dev/null +++ b/FileProcessor.FileAggregate/FileProcessor - Backup.FileAggregate.csproj @@ -0,0 +1,18 @@ + + + + net7.0 + + + + + + + + + + + + + + diff --git a/FileProcessor.FileAggregate/FileProcessor.FileAggregate.csproj b/FileProcessor.FileAggregate/FileProcessor.FileAggregate.csproj index 73285a9..87838a5 100644 --- a/FileProcessor.FileAggregate/FileProcessor.FileAggregate.csproj +++ b/FileProcessor.FileAggregate/FileProcessor.FileAggregate.csproj @@ -6,8 +6,8 @@ - - + + diff --git a/FileProcessor.FileImportLog.DomainEvents/FileProcessor.FileImportLog.DomainEvents.csproj b/FileProcessor.FileImportLog.DomainEvents/FileProcessor.FileImportLog.DomainEvents.csproj index a79517b..aa323e7 100644 --- a/FileProcessor.FileImportLog.DomainEvents/FileProcessor.FileImportLog.DomainEvents.csproj +++ b/FileProcessor.FileImportLog.DomainEvents/FileProcessor.FileImportLog.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/FileProcessor.FileImportLogAggregate/FileProcessor.FileImportLogAggregate.csproj b/FileProcessor.FileImportLogAggregate/FileProcessor.FileImportLogAggregate.csproj index 04ba9e0..d509dee 100644 --- a/FileProcessor.FileImportLogAggregate/FileProcessor.FileImportLogAggregate.csproj +++ b/FileProcessor.FileImportLogAggregate/FileProcessor.FileImportLogAggregate.csproj @@ -6,7 +6,7 @@ - + diff --git a/FileProcessor.IntegrationTests/FileProcessor.IntegrationTests.csproj b/FileProcessor.IntegrationTests/FileProcessor.IntegrationTests.csproj index aef66f7..9969d2c 100644 --- a/FileProcessor.IntegrationTests/FileProcessor.IntegrationTests.csproj +++ b/FileProcessor.IntegrationTests/FileProcessor.IntegrationTests.csproj @@ -21,8 +21,8 @@ - - + + diff --git a/FileProcessor/Bootstrapper/DomainEventHandlerRegistry.cs b/FileProcessor/Bootstrapper/DomainEventHandlerRegistry.cs index 10df3c2..3210f10 100644 --- a/FileProcessor/Bootstrapper/DomainEventHandlerRegistry.cs +++ b/FileProcessor/Bootstrapper/DomainEventHandlerRegistry.cs @@ -42,7 +42,8 @@ public DomainEventHandlerRegistry() }); this.AddSingleton(); - this.AddSingleton(); + this.For().Use().Named("Main") + .Ctor>().Is(eventHandlersConfiguration).Singleton(); } #endregion diff --git a/FileProcessor/FileProcessingWorker.cs b/FileProcessor/FileProcessingWorker.cs index d62b032..7d1989b 100644 --- a/FileProcessor/FileProcessingWorker.cs +++ b/FileProcessor/FileProcessingWorker.cs @@ -16,7 +16,6 @@ using Microsoft.Extensions.Logging; using Shared.EventStore.Aggregate; using Shared.EventStore.EventStore; - using Shared.EventStore.Subscriptions; using Shared.General; /// diff --git a/FileProcessor/FileProcessor.csproj b/FileProcessor/FileProcessor.csproj index 179cac0..256fbcc 100644 --- a/FileProcessor/FileProcessor.csproj +++ b/FileProcessor/FileProcessor.csproj @@ -18,7 +18,7 @@ - + diff --git a/FileProcessor/Program.cs b/FileProcessor/Program.cs index 655e810..21779d2 100644 --- a/FileProcessor/Program.cs +++ b/FileProcessor/Program.cs @@ -21,7 +21,6 @@ namespace FileProcessor using MediatR; using Microsoft.Extensions.DependencyInjection; using Shared.EventStore.EventHandling; - using Shared.EventStore.Subscriptions; using Shared.Logger; [ExcludeFromCodeCoverage] diff --git a/FileProcessor/Startup.cs b/FileProcessor/Startup.cs index e14d14a..eef3cc0 100644 --- a/FileProcessor/Startup.cs +++ b/FileProcessor/Startup.cs @@ -244,61 +244,114 @@ public static IServiceCollection AddInSecureEventStoreClient( } }; - static Action concurrentLog = (tt, message) => log(tt, "CONCURRENT", message); + static Action mainLog = (tt, message) => Extensions.log(tt, "MAIN", message); + static Action orderedLog = (tt, message) => Extensions.log(tt, "ORDERED", message); public static void PreWarm(this IApplicationBuilder applicationBuilder) { Startup.LoadTypes(); - //SubscriptionWorker worker = new SubscriptionWorker() - var internalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("InternalSubscriptionService")); + IConfigurationSection subscriptionConfigSection = Startup.Configuration.GetSection("AppSettings:SubscriptionConfiguration"); + SubscriptionWorkersRoot subscriptionWorkersRoot = new SubscriptionWorkersRoot(); + subscriptionConfigSection.Bind(subscriptionWorkersRoot); - if (internalSubscriptionService) + if (subscriptionWorkersRoot.InternalSubscriptionService) { String eventStoreConnectionString = ConfigurationReader.GetValue("EventStoreSettings", "ConnectionString"); - Int32 inflightMessages = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InflightMessages")); - Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds")); - String filter = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceFilter"); - String ignore = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceIgnore"); - String streamName = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionFilterOnStreamName"); - Int32 cacheDuration = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceCacheDuration")); - ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration); - - ((SubscriptionRepository)subscriptionRepository).Trace += (sender, s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s); + ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, subscriptionWorkersRoot.InternalSubscriptionServiceCacheDuration); + ((SubscriptionRepository)subscriptionRepository).Trace += (sender, + s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s); // init our SubscriptionRepository subscriptionRepository.PreWarm(CancellationToken.None).Wait(); - var eventHandlerResolver = Startup.ServiceProvider.GetService(); - - SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateConcurrentSubscriptionWorker(eventStoreConnectionString, eventHandlerResolver, subscriptionRepository, inflightMessages, persistentSubscriptionPollingInSeconds); - - concurrentSubscriptions.Trace += (_, args) => concurrentLog(TraceEventType.Information, args.Message); - concurrentSubscriptions.Warning += (_, args) => concurrentLog(TraceEventType.Warning, args.Message); - concurrentSubscriptions.Error += (_, args) => concurrentLog(TraceEventType.Error, args.Message); - - if (!String.IsNullOrEmpty(ignore)) + List workers = ConfigureSubscriptions(subscriptionRepository, subscriptionWorkersRoot); + foreach (SubscriptionWorker subscriptionWorker in workers) { - concurrentSubscriptions = concurrentSubscriptions.IgnoreSubscriptions(ignore); + subscriptionWorker.StartAsync(CancellationToken.None).Wait(); } + } + } - if (!String.IsNullOrEmpty(filter)) - { - //NOTE: Not overly happy with this design, but - //the idea is if we supply a filter, this overrides ignore - concurrentSubscriptions = concurrentSubscriptions.FilterSubscriptions(filter) - .IgnoreSubscriptions(null); + private static List ConfigureSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionWorkersRoot configuration) + { + List workers = new List(); - } + foreach (SubscriptionWorkerConfig configurationSubscriptionWorker in configuration.SubscriptionWorkers) + { + if (configurationSubscriptionWorker.Enabled == false) + continue; - if (!String.IsNullOrEmpty(streamName)) + if (configurationSubscriptionWorker.IsOrdered) { - concurrentSubscriptions = concurrentSubscriptions.FilterByStreamName(streamName); + IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Ordered"); + SubscriptionWorker worker = SubscriptionWorker.CreateOrderedSubscriptionWorker(Startup.EventStoreClientSettings, + eventHandlerResolver, + subscriptionRepository, + configuration.PersistentSubscriptionPollingInSeconds); + worker.Trace += (_, + args) => Extensions.orderedLog(TraceEventType.Information, args.Message); + worker.Warning += (_, + args) => Extensions.orderedLog(TraceEventType.Warning, args.Message); + worker.Error += (_, + args) => Extensions.orderedLog(TraceEventType.Error, args.Message); + worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups); + worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams); + worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups); + worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams); + workers.Add(worker); + } + else + { + for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++) + { + IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Main"); + SubscriptionWorker worker = SubscriptionWorker.CreateSubscriptionWorker(Startup.EventStoreClientSettings, + eventHandlerResolver, + subscriptionRepository, + configurationSubscriptionWorker.InflightMessages, + configuration.PersistentSubscriptionPollingInSeconds); + + worker.Trace += (_, + args) => Extensions.mainLog(TraceEventType.Information, args.Message); + worker.Warning += (_, + args) => Extensions.mainLog(TraceEventType.Warning, args.Message); + worker.Error += (_, + args) => Extensions.mainLog(TraceEventType.Error, args.Message); + + worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups); + worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams); + worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups); + worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams); + + workers.Add(worker); + } } - - concurrentSubscriptions.StartAsync(CancellationToken.None).Wait(); } + + return workers; } } + + public class SubscriptionWorkersRoot + { + public Boolean InternalSubscriptionService { get; set; } + public Int32 PersistentSubscriptionPollingInSeconds { get; set; } + public Int32 InternalSubscriptionServiceCacheDuration { get; set; } + public List SubscriptionWorkers { get; set; } + } + + public class SubscriptionWorkerConfig + { + public String WorkerName { get; set; } + public String IncludeGroups { get; set; } + public String IgnoreGroups { get; set; } + public String IncludeStreams { get; set; } + public String IgnoreStreams { get; set; } + public Boolean Enabled { get; set; } + public Int32 InflightMessages { get; set; } + public Int32 InstanceCount { get; set; } + public Boolean IsOrdered { get; set; } + } } diff --git a/FileProcessor/appsettings.json b/FileProcessor/appsettings.json index 449ac71..39d4b0f 100644 --- a/FileProcessor/appsettings.json +++ b/FileProcessor/appsettings.json @@ -4,7 +4,22 @@ }, "AppSettings": { "FileProfilePollingWindowInSeconds": 20, - "InternalSubscriptionServiceFilter": "File Processor", + "SubscriptionConfiguration": { + "InternalSubscriptionService": "true", + "PersistentSubscriptionPollingInSeconds": 480, + "InternalSubscriptionServiceCacheDuration": 120, + "SubscriptionWorkers": [ + { + "WorkerName": "File Processor Main", + "IncludeGroups": "File Processor", + "IgnoreGroups": "Ordered,local-", + "Enabled": true, + "InflightMessages": 100, + "IsOrdered": false, + "InstanceCount": 1 + } + ] + }, "ClientId": "serviceClient", "ClientSecret": "d192cbc46d834d0da90e8a9d50ded543", "EventHandlerConfiguration": {