diff --git a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj index 81cc78b3..26fccc3e 100644 --- a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj +++ b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj @@ -10,8 +10,8 @@ - - + + diff --git a/TransactionProcessor.Client/TransactionProcessor.Client.csproj b/TransactionProcessor.Client/TransactionProcessor.Client.csproj index 268e79eb..32ed2999 100644 --- a/TransactionProcessor.Client/TransactionProcessor.Client.csproj +++ b/TransactionProcessor.Client/TransactionProcessor.Client.csproj @@ -6,7 +6,7 @@ - + diff --git a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj index ed853c6e..5d3a697e 100644 --- a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj +++ b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj @@ -7,7 +7,7 @@ - + @@ -16,7 +16,7 @@ - + diff --git a/TransactionProcessor.ProjectionEngine/TransactionProcessor.ProjectionEngine.csproj b/TransactionProcessor.ProjectionEngine/TransactionProcessor.ProjectionEngine.csproj index 2104894b..2baf3e77 100644 --- a/TransactionProcessor.ProjectionEngine/TransactionProcessor.ProjectionEngine.csproj +++ b/TransactionProcessor.ProjectionEngine/TransactionProcessor.ProjectionEngine.csproj @@ -19,9 +19,9 @@ - + - + diff --git a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj index cfb67d0b..73da5ea7 100644 --- a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj +++ b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj @@ -5,6 +5,6 @@ - + diff --git a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj index c4b870d3..a426a6cb 100644 --- a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj +++ b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj @@ -6,7 +6,7 @@ - + diff --git a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj index a79517b1..8be74393 100644 --- a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj +++ b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj index 812a1786..2955834e 100644 --- a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj +++ b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj @@ -6,7 +6,7 @@ - + diff --git a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj index 81e51f6c..a4484565 100644 --- a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj +++ b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj index c0724ff1..744b1489 100644 --- a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj +++ b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj @@ -6,7 +6,7 @@ - + diff --git a/TransactionProcessor.Voucher.DomainEvents/TransactionProcessor.Voucher.DomainEvents.csproj b/TransactionProcessor.Voucher.DomainEvents/TransactionProcessor.Voucher.DomainEvents.csproj index 1fdebce9..a8b66816 100644 --- a/TransactionProcessor.Voucher.DomainEvents/TransactionProcessor.Voucher.DomainEvents.csproj +++ b/TransactionProcessor.Voucher.DomainEvents/TransactionProcessor.Voucher.DomainEvents.csproj @@ -7,6 +7,6 @@ - + diff --git a/TransactionProcessor.VoucherAggregate/TransactionProcessor.VoucherAggregate.csproj b/TransactionProcessor.VoucherAggregate/TransactionProcessor.VoucherAggregate.csproj index ac497f12..3dced288 100644 --- a/TransactionProcessor.VoucherAggregate/TransactionProcessor.VoucherAggregate.csproj +++ b/TransactionProcessor.VoucherAggregate/TransactionProcessor.VoucherAggregate.csproj @@ -8,7 +8,7 @@ - + diff --git a/TransactionProcessor/Bootstrapper/DomainEventHandlerRegistry.cs b/TransactionProcessor/Bootstrapper/DomainEventHandlerRegistry.cs index 79bfef1f..e5542097 100644 --- a/TransactionProcessor/Bootstrapper/DomainEventHandlerRegistry.cs +++ b/TransactionProcessor/Bootstrapper/DomainEventHandlerRegistry.cs @@ -44,7 +44,7 @@ public DomainEventHandlerRegistry() } //this.AddSingleton(eventHandlersConfiguration); - this.Use(eventHandlersConfiguration).Named("Concurrent"); + this.Use(eventHandlersConfiguration).Named("EventHandlerConfiguration"); section = Startup.Configuration.GetSection("AppSettings:EventHandlerConfigurationOrdered"); @@ -52,8 +52,8 @@ public DomainEventHandlerRegistry() { Startup.Configuration.GetSection("AppSettings:EventHandlerConfigurationOrdered").Bind(eventHandlersConfigurationOrdered); } - - this.Use(eventHandlersConfigurationOrdered).Named("Ordered"); + + this.Use(eventHandlersConfigurationOrdered).Named("EventHandlerConfigurationOrdered"); } this.AddSingleton>(container => type => @@ -75,7 +75,7 @@ public DomainEventHandlerRegistry() this.AddSingleton, MerchantBalanceProjection>(); this.AddSingleton, MerchantBalanceStateDispatcher>(); - this.For().Use().Named("Concurrent") + this.For().Use().Named("Main") .Ctor>().Is(eventHandlersConfiguration).Singleton(); this.For().Use().Named("Ordered") .Ctor>().Is(eventHandlersConfigurationOrdered).Singleton(); diff --git a/TransactionProcessor/Extensions.cs b/TransactionProcessor/Extensions.cs index 95d69f95..f870d897 100644 --- a/TransactionProcessor/Extensions.cs +++ b/TransactionProcessor/Extensions.cs @@ -1,6 +1,7 @@ namespace TransactionProcessor { using System; + using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; @@ -56,37 +57,30 @@ public static IServiceCollection AddInSecureEventStoreClient(this IServiceCollec } }; - static Action concurrentLog = (tt, message) => Extensions.log(tt, "CONCURRENT", message); + static Action mainLog = (tt, message) => Extensions.log(tt, "MAIN", message); static Action orderedLog = (tt, message) => Extensions.log(tt, "ORDERED", message); public static void PreWarm(this IApplicationBuilder applicationBuilder) { Startup.LoadTypes(); - Boolean internalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("InternalSubscriptionService")); - - if (internalSubscriptionService) { - IConfigurationSection subscriptionConfigSection = Startup.Configuration.GetSection("AppSettings:SubscriptionConfig"); - SubscriptionConfigRoot subscriptionConfigRoot = new SubscriptionConfigRoot(); - subscriptionConfigSection.Bind(subscriptionConfigRoot); + IConfigurationSection subscriptionConfigSection = Startup.Configuration.GetSection("AppSettings:SubscriptionConfiguration"); + SubscriptionWorkersRoot subscriptionWorkersRoot = new SubscriptionWorkersRoot(); + subscriptionConfigSection.Bind(subscriptionWorkersRoot); + if (subscriptionWorkersRoot.InternalSubscriptionService) { + String eventStoreConnectionString = ConfigurationReader.GetValue("EventStoreSettings", "ConnectionString"); - Int32 cacheDuration = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceCacheDuration")); - - ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration); + + ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, subscriptionWorkersRoot.InternalSubscriptionServiceCacheDuration); ((SubscriptionRepository)subscriptionRepository).Trace += (sender, s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s); // init our SubscriptionRepository subscriptionRepository.PreWarm(CancellationToken.None).Wait(); - - if (subscriptionConfigRoot.Concurrent.IsEnabled) { - SubscriptionWorker concurrentSubscriptions = ConfigureConcurrentSubscriptions(subscriptionRepository, subscriptionConfigRoot.Concurrent); - concurrentSubscriptions.StartAsync(CancellationToken.None).Wait(); - } - - if (subscriptionConfigRoot.Ordered.IsEnabled) { - SubscriptionWorker orderedSubscriptions = ConfigureOrderedSubscriptions(subscriptionRepository, subscriptionConfigRoot.Ordered); - orderedSubscriptions.StartAsync(CancellationToken.None).Wait(); + + List workers = ConfigureSubscriptions(subscriptionRepository, subscriptionWorkersRoot); + foreach (SubscriptionWorker subscriptionWorker in workers) { + subscriptionWorker.StartAsync(CancellationToken.None).Wait(); } } @@ -97,77 +91,59 @@ public static void PreWarm(this IApplicationBuilder applicationBuilder) { } } - private static SubscriptionWorker ConfigureConcurrentSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionConfig concurrent) { - IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Concurrent"); - - Int32 inflightMessages = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InflightMessages")); - Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds")); - - SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateConcurrentSubscriptionWorker(Startup.EventStoreClientSettings, eventHandlerResolver, subscriptionRepository, - inflightMessages, persistentSubscriptionPollingInSeconds); - - concurrentSubscriptions.Trace += (_, args) => Extensions.concurrentLog(TraceEventType.Information, args.Message); - concurrentSubscriptions.Warning += (_, args) => Extensions.concurrentLog(TraceEventType.Warning, args.Message); - concurrentSubscriptions.Error += (_, args) => Extensions.concurrentLog(TraceEventType.Error, args.Message); - - if (!String.IsNullOrEmpty(concurrent.Ignore)) - { - concurrentSubscriptions = concurrentSubscriptions.IgnoreSubscriptions(concurrent.Ignore); - } - - if (!String.IsNullOrEmpty(concurrent.Filter)) - { - //NOTE: Not overly happy with this design, but; - //the idea is if we supply a filter, this overrides ignore - concurrentSubscriptions = concurrentSubscriptions.FilterSubscriptions(concurrent.Filter); - //.IgnoreSubscriptions(null); - - } - - if (!String.IsNullOrEmpty(concurrent.StreamName)) - { - concurrentSubscriptions = concurrentSubscriptions.FilterByStreamName(concurrent.StreamName); + private static List ConfigureSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionWorkersRoot configuration) { + List workers = new List(); + + foreach (SubscriptionWorkerConfig configurationSubscriptionWorker in configuration.SubscriptionWorkers) { + if (configurationSubscriptionWorker.Enabled == false) + continue; + + if (configurationSubscriptionWorker.IsOrdered) { + IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Ordered"); + SubscriptionWorker worker = SubscriptionWorker.CreateOrderedSubscriptionWorker(Startup.EventStoreClientSettings, + eventHandlerResolver, + subscriptionRepository, + configuration.PersistentSubscriptionPollingInSeconds); + worker.Trace += (_, + args) => Extensions.orderedLog(TraceEventType.Information, args.Message); + worker.Warning += (_, + args) => Extensions.orderedLog(TraceEventType.Warning, args.Message); + worker.Error += (_, + args) => Extensions.orderedLog(TraceEventType.Error, args.Message); + worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups); + worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams); + worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups); + worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams); + workers.Add(worker); + + } + else { + for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++) { + IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Main"); + SubscriptionWorker worker = SubscriptionWorker.CreateSubscriptionWorker(Startup.EventStoreClientSettings, + eventHandlerResolver, + subscriptionRepository, + configurationSubscriptionWorker.InflightMessages, + configuration.PersistentSubscriptionPollingInSeconds); + + worker.Trace += (_, + args) => Extensions.mainLog(TraceEventType.Information, args.Message); + worker.Warning += (_, + args) => Extensions.mainLog(TraceEventType.Warning, args.Message); + worker.Error += (_, + args) => Extensions.mainLog(TraceEventType.Error, args.Message); + + worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups); + worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams); + worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups); + worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams); + + workers.Add(worker); + } + } } - - return concurrentSubscriptions; - } - - private static SubscriptionWorker ConfigureOrderedSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionConfig ordered) - { - IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance("Ordered"); - Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds")); - - SubscriptionWorker orderedSubscriptions = - SubscriptionWorker.CreateOrderedSubscriptionWorker(Startup.EventStoreClientSettings, - eventHandlerResolver, - subscriptionRepository, - persistentSubscriptionPollingInSeconds); - - orderedSubscriptions.Trace += (_, args) => Extensions.orderedLog(TraceEventType.Information, args.Message); - orderedSubscriptions.Warning += (_, args) => Extensions.orderedLog(TraceEventType.Warning, args.Message); - orderedSubscriptions.Error += (_, args) => Extensions.orderedLog(TraceEventType.Error, args.Message); - - if (!String.IsNullOrEmpty(ordered.Ignore)) - { - orderedSubscriptions = orderedSubscriptions.IgnoreSubscriptions(ordered.Ignore); - } - - if (!String.IsNullOrEmpty(ordered.Filter)) - { - //NOTE: Not overly happy with this design, but; - //the idea is if we supply a filter, this overrides ignore - orderedSubscriptions = orderedSubscriptions.FilterSubscriptions(ordered.Filter) - .IgnoreSubscriptions(null); - - } - - if (!String.IsNullOrEmpty(ordered.StreamName)) - { - orderedSubscriptions = orderedSubscriptions.FilterByStreamName(ordered.StreamName); - } - - return orderedSubscriptions; + return workers; } private static void OperatorLogon(String operatorId) @@ -185,17 +161,25 @@ private static void OperatorLogon(String operatorId) } } } - - public class SubscriptionConfigRoot + + public class SubscriptionWorkersRoot { - public SubscriptionConfig Ordered { get; set; } - public SubscriptionConfig Concurrent { get; set; } + public Boolean InternalSubscriptionService { get; set; } + public Int32 PersistentSubscriptionPollingInSeconds { get; set; } + public Int32 InternalSubscriptionServiceCacheDuration { get; set; } + public List SubscriptionWorkers { get; set; } } - public class SubscriptionConfig{ - public Boolean IsEnabled { get; set; } - public String Filter { get; set; } - public String Ignore { get; set; } - public String StreamName { get; set; } + public class SubscriptionWorkerConfig + { + public String WorkerName { get; set; } + public String IncludeGroups { get; set; } + public String IgnoreGroups { get; set; } + public String IncludeStreams { get; set; } + public String IgnoreStreams { get; set; } + public Boolean Enabled { get; set; } + public Int32 InflightMessages { get; set; } + public Int32 InstanceCount { get; set; } + public Boolean IsOrdered { get; set; } } } \ No newline at end of file diff --git a/TransactionProcessor/Program.cs b/TransactionProcessor/Program.cs index d7a50111..52a0ec7c 100644 --- a/TransactionProcessor/Program.cs +++ b/TransactionProcessor/Program.cs @@ -19,7 +19,6 @@ namespace TransactionProcessor using Settlement.DomainEvents; using Shared.EventStore.Aggregate; using Shared.EventStore.EventHandling; - using Shared.EventStore.Subscriptions; using Shared.Logger; using Transaction.DomainEvents; diff --git a/TransactionProcessor/Startup.cs b/TransactionProcessor/Startup.cs index c1e0b7ac..2f64b466 100644 --- a/TransactionProcessor/Startup.cs +++ b/TransactionProcessor/Startup.cs @@ -130,7 +130,7 @@ public void Configure(IApplicationBuilder app, Logger.Initialise(logger); - Action loggerAction = message => { Logger.LogInformation(message); }; + Action loggerAction = Logger.LogInformation; Startup.Configuration.LogConfiguration(loggerAction); foreach (KeyValuePair type in TypeMap.Map) { diff --git a/TransactionProcessor/TransactionProcessor.csproj b/TransactionProcessor/TransactionProcessor.csproj index 69252045..94594502 100644 --- a/TransactionProcessor/TransactionProcessor.csproj +++ b/TransactionProcessor/TransactionProcessor.csproj @@ -35,8 +35,8 @@ - - + + diff --git a/TransactionProcessor/appsettings.json b/TransactionProcessor/appsettings.json index 70613eb1..3a0699e9 100644 --- a/TransactionProcessor/appsettings.json +++ b/TransactionProcessor/appsettings.json @@ -1,22 +1,5 @@ { "AppSettings": { - "InternalSubscriptionServiceFilter": "Transaction Processor", - "SubscriptionConfig": { - "Ordered": { - "Filter": "Transaction Processor", - "Ignore": "", - "StreamName": "", - "IsEnabled": true - }, - "Concurrent": { - "Filter": "Transaction Processor", - "Ignore": "Ordered", - "StreamName": "", - //"InflightMessages": 1, - "IsEnabled": true - } - }, - "ClientId": "serviceClient", "ClientSecret": "d192cbc46d834d0da90e8a9d50ded543", //"SecurityService": "https://127.0.0.1:5001", @@ -60,7 +43,6 @@ "TransactionProcessor.ProjectionEngine.EventHandling.EventHandler,TransactionProcessor.ProjectionEngine" ] }, - "EventStateConfig": { "EstateCreatedEvent": "MerchantBalanceProjectionState", "MerchantCreatedEvent": "MerchantBalanceProjectionState", @@ -69,8 +51,32 @@ "TransactionHasStartedEvent": "MerchantBalanceProjectionState", "TransactionHasBeenCompletedEvent": "MerchantBalanceProjectionState", "MerchantFeeAddedToTransactionEvent": "MerchantBalanceProjectionState" + }, + "SubscriptionConfiguration": { + "InternalSubscriptionService": "true", + "PersistentSubscriptionPollingInSeconds": 480, + "InternalSubscriptionServiceCacheDuration": 120, + "SubscriptionWorkers": [ + { + "WorkerName": "Transaction Processor Main", + "IncludeGroups": "Transaction Processor", + "IgnoreGroups": "Ordered,local-", + "Enabled": true, + "InflightMessages": 100, + "IsOrdered": false, + "InstanceCount": 2 + }, + { + "WorkerName": "Transaction Processor Projections", + "IncludeGroups": "Transaction Processor - Ordered", + "IgnoreGroups": "local-", + "Enabled": true, + "InstanceCount": 1, + "IsOrdered": true + } + ] } - }, + }, "ConnectionStrings": { // SQL Server "TransactionProcessorReadModel": "server=192.168.1.133;user id=sa;password=Sc0tland;database=TransactionProcessorReadModel;Encrypt=false"