From 46ce3dc0ccb765cc9f919aefa0293a571e11f674 Mon Sep 17 00:00:00 2001 From: Stuart Ferguson Date: Sat, 4 Dec 2021 06:55:08 +0000 Subject: [PATCH 1/2] Upgrade to latest subscription worker --- .../TransactionProcessor.BusinessLogic.csproj | 4 +- ...ansactionProcessor.IntegrationTests.csproj | 6 +- ...ocessor.Reconciliation.DomainEvents.csproj | 2 +- ...onProcessor.ReconciliationAggregate.csproj | 2 +- ...onProcessor.Settlement.DomainEvents.csproj | 2 +- ...ctionProcessor.SettlementAggregates.csproj | 2 +- ...nProcessor.Transaction.DomainEvents.csproj | 2 +- ...ctionProcessor.TransactionAggregate.csproj | 2 +- TransactionProcessor/Program.cs | 46 ++++---- TransactionProcessor/Startup.cs | 109 ++++++++++++++++++ .../TransactionProcessor.csproj | 8 +- TransactionProcessor/appsettings.json | 1 - 12 files changed, 147 insertions(+), 39 deletions(-) diff --git a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj index ca8ccc0a..a161a0d6 100644 --- a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj +++ b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj index d0720753..bca10764 100644 --- a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj +++ b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj @@ -11,9 +11,9 @@ - - - + + + diff --git a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj index cda72881..8ef26d10 100644 --- a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj +++ b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj @@ -5,6 +5,6 @@ - + diff --git a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj index ab8a81f6..c33f8242 100644 --- a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj +++ b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj index 6da5d437..2240eb2f 100644 --- a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj +++ b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj index c28eaf6d..f9503c07 100644 --- a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj +++ b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj index caff460d..e4f63fc7 100644 --- a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj +++ b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj index b46c122a..21f69f7f 100644 --- a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj +++ b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor/Program.cs b/TransactionProcessor/Program.cs index a507823c..da2c7bff 100644 --- a/TransactionProcessor/Program.cs +++ b/TransactionProcessor/Program.cs @@ -46,33 +46,33 @@ public static IHostBuilder CreateHostBuilder(string[] args) webBuilder.UseStartup(); webBuilder.UseConfiguration(config); webBuilder.UseKestrel(); - }) - .ConfigureServices(services => - { - SettlementCreatedForDateEvent s = - new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"), - Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"), - DateTime.Now); + }); + //.ConfigureServices(services => + // { + // SettlementCreatedForDateEvent s = + // new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"), + // Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"), + // DateTime.Now); - TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(),Guid.NewGuid(), - DateTime.Now, "","","","",null); + // TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(),Guid.NewGuid(), + // DateTime.Now, "","","","",null); - ReconciliationHasStartedEvent r = - new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now); + // ReconciliationHasStartedEvent r = + // new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now); - TypeProvider.LoadDomainEventsTypeDynamically(); + // TypeProvider.LoadDomainEventsTypeDynamically(); - services.AddHostedService(provider => - { - IDomainEventHandlerResolver r = - provider.GetRequiredService(); - EventStorePersistentSubscriptionsClient p = provider.GetRequiredService(); - HttpClient h = provider.GetRequiredService(); - SubscriptionWorker worker = new SubscriptionWorker(r, p, h); - worker.TraceGenerated += Worker_TraceGenerated; - return worker; - }); - }); + // services.AddHostedService(provider => + // { + // IDomainEventHandlerResolver r = + // provider.GetRequiredService(); + // EventStorePersistentSubscriptionsClient p = provider.GetRequiredService(); + // HttpClient h = provider.GetRequiredService(); + // SubscriptionWorker worker = new SubscriptionWorker(r, p, h); + // worker.TraceGenerated += Worker_TraceGenerated; + // return worker; + // }); + // }); return hostBuilder; } diff --git a/TransactionProcessor/Startup.cs b/TransactionProcessor/Startup.cs index 75856019..75b91c0c 100644 --- a/TransactionProcessor/Startup.cs +++ b/TransactionProcessor/Startup.cs @@ -12,11 +12,13 @@ namespace TransactionProcessor { + using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.IO.Abstractions; using System.Net.Http; using System.Reflection; + using System.Threading; using BusinessLogic.EventHandling; using BusinessLogic.Manager; using BusinessLogic.OperatorInterfaces; @@ -41,7 +43,9 @@ namespace TransactionProcessor using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using NLog.Extensions.Logging; + using Reconciliation.DomainEvents; using SecurityService.Client; + using Settlement.DomainEvents; using SettlementAggregates; using Shared.DomainDrivenDesign.CommandHandling; using Shared.DomainDrivenDesign.EventSourcing; @@ -50,12 +54,14 @@ namespace TransactionProcessor using Shared.EventStore.EventHandling; using Shared.EventStore.EventStore; using Shared.EventStore.Extensions; + using Shared.EventStore.SubscriptionWorker; using Shared.Extensions; using Shared.General; using Shared.Logger; using Shared.Repositories; using Swashbuckle.AspNetCore.Filters; using Swashbuckle.AspNetCore.SwaggerGen; + using Transaction.DomainEvents; using TransactionAggregate; using VoucherManagement.Client; using ILogger = Microsoft.Extensions.Logging.ILogger; @@ -99,6 +105,8 @@ public Startup(IWebHostEnvironment webHostEnvironment) /// public static IWebHostEnvironment WebHostEnvironment { get; set; } + public static IServiceProvider ServiceProvider { get; set; } + // This method gets called by the runtime. Use this method to add services to the container. /// /// Configures the services. @@ -238,6 +246,8 @@ public void ConfigureServices(IServiceCollection services) services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + + Startup.ServiceProvider = services.BuildServiceProvider(); } /// @@ -426,6 +436,105 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerF app.UseSwagger(); app.UseSwaggerUI(); + + app.PreWarm(); + } + + public static void LoadTypes() + { + SettlementCreatedForDateEvent s = + new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"), + Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"), + DateTime.Now); + + TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(), Guid.NewGuid(), + DateTime.Now, "", "", "", "", null); + + ReconciliationHasStartedEvent r = + new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now); + + TypeProvider.LoadDomainEventsTypeDynamically(); + } + } + + public static class Extensions + { + static Action log = (tt, subType, message) => { + String logMessage = $"{subType} - {message}"; + switch (tt) + { + case TraceEventType.Critical: + Logger.LogCritical(new Exception(logMessage)); + break; + case TraceEventType.Error: + Logger.LogError(new Exception(logMessage)); + break; + case TraceEventType.Warning: + Logger.LogWarning(logMessage); + break; + case TraceEventType.Information: + Logger.LogInformation(logMessage); + break; + case TraceEventType.Verbose: + Logger.LogDebug(logMessage); + break; + } + }; + + static Action concurrentLog = (tt, message) => log(tt, "CONCURRENT", message); + + public static void PreWarm(this IApplicationBuilder applicationBuilder) + { + Startup.LoadTypes(); + + //SubscriptionWorker worker = new SubscriptionWorker() + var internalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("InternalSubscriptionService")); + + if (internalSubscriptionService) + { + String eventStoreConnectionString = ConfigurationReader.GetValue("EventStoreSettings", "ConnectionString"); + Int32 inflightMessages = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InflightMessages")); + Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds")); + String filter = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceFilter"); + String ignore = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceIgnore"); + String streamName = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionFilterOnStreamName"); + + ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString); + + ((SubscriptionRepository)subscriptionRepository).Trace += (sender, s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s); + + // init our SubscriptionRepository + subscriptionRepository.PreWarm(CancellationToken.None).Wait(); + + var eventHandlerResolver = Startup.ServiceProvider.GetService(); + + SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateConcurrentSubscriptionWorker(eventStoreConnectionString, eventHandlerResolver, subscriptionRepository, inflightMessages, persistentSubscriptionPollingInSeconds); + + concurrentSubscriptions.Trace += (_, args) => concurrentLog(TraceEventType.Information, args.Message); + concurrentSubscriptions.Warning += (_, args) => concurrentLog(TraceEventType.Warning, args.Message); + concurrentSubscriptions.Error += (_, args) => concurrentLog(TraceEventType.Error, args.Message); + + if (!String.IsNullOrEmpty(ignore)) + { + concurrentSubscriptions = concurrentSubscriptions.IgnoreSubscriptions(ignore); + } + + if (!String.IsNullOrEmpty(filter)) + { + //NOTE: Not overly happy with this design, but + //the idea is if we supply a filter, this overrides ignore + concurrentSubscriptions = concurrentSubscriptions.FilterSubscriptions(filter) + .IgnoreSubscriptions(null); + + } + + if (!String.IsNullOrEmpty(streamName)) + { + concurrentSubscriptions = concurrentSubscriptions.FilterByStreamName(streamName); + } + + concurrentSubscriptions.StartAsync(CancellationToken.None).Wait(); + } } } } diff --git a/TransactionProcessor/TransactionProcessor.csproj b/TransactionProcessor/TransactionProcessor.csproj index b55bce28..9016fd71 100644 --- a/TransactionProcessor/TransactionProcessor.csproj +++ b/TransactionProcessor/TransactionProcessor.csproj @@ -15,8 +15,8 @@ - - + + @@ -28,8 +28,8 @@ - - + + diff --git a/TransactionProcessor/appsettings.json b/TransactionProcessor/appsettings.json index 083414b4..826158a6 100644 --- a/TransactionProcessor/appsettings.json +++ b/TransactionProcessor/appsettings.json @@ -1,7 +1,6 @@ { "AppSettings": { "SubscriptionFilter": "Transaction Processor", - "UseInternalSubscriptionService": "true", "ClientId": "serviceClient", "ClientSecret": "d192cbc46d834d0da90e8a9d50ded543", "EventHandlerConfiguration": { From 0d047589a89a6f02c4bf3741ceff5c552aade4f6 Mon Sep 17 00:00:00 2001 From: Stuart Ferguson Date: Mon, 6 Dec 2021 10:12:33 +0000 Subject: [PATCH 2/2] Use latest shared nugets --- .../TransactionProcessor.BusinessLogic.csproj | 4 +- .../Common/DockerHelper.cs | 70 ++++++++++------ .../Shared/SharedSteps.cs | 27 ++++-- ...ansactionProcessor.IntegrationTests.csproj | 23 +++++- .../continuous/CallbackHandlerEnricher.js | 82 +++++++++++++++++++ .../continuous/EstateAggregator.js | 48 +++++++++++ .../FileProcessorSubscriptionStreamBuilder.js | 68 +++++++++++++++ .../continuous/MerchantAggregator.js | 8 +- .../continuous/MerchantBalanceCalculator.js | 49 ++++++----- .../continuous/TransactionEnricher.js | 69 ++++++++++++++++ ...ctionProcessorSubscriptionStreamBuilder.js | 62 ++++++++++++++ ...ocessor.Reconciliation.DomainEvents.csproj | 2 +- ...onProcessor.ReconciliationAggregate.csproj | 2 +- ...onProcessor.Settlement.DomainEvents.csproj | 2 +- ...ctionProcessor.SettlementAggregates.csproj | 2 +- ...nProcessor.Transaction.DomainEvents.csproj | 2 +- ...ctionProcessor.TransactionAggregate.csproj | 2 +- TransactionProcessor/Startup.cs | 4 +- .../TransactionProcessor.csproj | 4 +- 19 files changed, 459 insertions(+), 71 deletions(-) create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js diff --git a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj index a161a0d6..d47cb7a5 100644 --- a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj +++ b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs index 12643ad7..8758443a 100644 --- a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs +++ b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs @@ -161,6 +161,8 @@ private async Task LoadEventStoreProjections() Logger.LogInformation("Loaded projections"); } + protected String EventStoreConnectionString; + #region Methods /// @@ -188,9 +190,7 @@ public override async Task StartContainersForScenarioRun(String scenarioName) this.TestHostContainerName = $"testhosts{testGuid:N}"; this.VoucherManagementContainerName = $"vouchermanagement{testGuid:N}"; this.MessagingServiceContainerName = $"messaging{testGuid:N}"; - - String eventStoreAddress = $"http://{this.EventStoreContainerName}"; - + (String, String, String) dockerCredentials = ("https://www.docker.com", "stuartferguson", "Sc0tland"); INetworkService testNetwork = DockerHelper.SetupTestNetwork(); @@ -198,11 +198,13 @@ public override async Task StartContainersForScenarioRun(String scenarioName) IContainerService eventStoreContainer = DockerHelper.SetupEventStoreContainer(this.EventStoreContainerName, this.Logger, "eventstore/eventstore:20.10.0-buster-slim", testNetwork, traceFolder); this.EventStoreHttpPort = eventStoreContainer.ToHostExposedEndpoint($"{DockerHelper.EventStoreHttpDockerPort}/tcp").Port; - await Retry.For(async () => - { - await this.PopulateSubscriptionServiceConfiguration().ConfigureAwait(false); - }, retryFor: TimeSpan.FromMinutes(2), retryInterval: TimeSpan.FromSeconds(30)); - + this.EventStoreConnectionString = + $"esdb://admin:changeit@{this.EventStoreContainerName}:{DockerHelper.EventStoreHttpDockerPort}?tls=false"; + + String insecureEventStoreEnvironmentVariable = "EventStoreSettings:Insecure=true"; + String persistentSubscriptionPollingInSeconds = "AppSettings:PersistentSubscriptionPollingInSeconds=10"; + String internalSubscriptionServiceCacheDuration = "AppSettings:InternalSubscriptionServiceCacheDuration=0"; + IContainerService voucherManagementContainer = SetupVoucherManagementContainer(this.VoucherManagementContainerName, this.Logger, "stuartferguson/vouchermanagement", @@ -214,12 +216,16 @@ await Retry.For(async () => dockerCredentials, this.SecurityServiceContainerName, this.EstateManagementContainerName, - eventStoreAddress, + this.EventStoreConnectionString, (Setup.SqlServerContainerName, "sa", "thisisalongpassword123!"), ("serviceClient", "Secret1"), - true); + true, + additionalEnvironmentVariables: new List + { + insecureEventStoreEnvironmentVariable, + }); IContainerService estateManagementContainer = DockerHelper.SetupEstateManagementContainer(this.EstateManagementContainerName, this.Logger, "stuartferguson/estatemanagement", new List @@ -228,12 +234,18 @@ await Retry.For(async () => Setup.DatabaseServerNetwork }, traceFolder, dockerCredentials, this.SecurityServiceContainerName, - eventStoreAddress, + this.EventStoreConnectionString, (Setup.SqlServerContainerName, "sa", "thisisalongpassword123!"), ("serviceClient", "Secret1"), - true); + true, + additionalEnvironmentVariables: new List + { + insecureEventStoreEnvironmentVariable, + persistentSubscriptionPollingInSeconds, + internalSubscriptionServiceCacheDuration + }); IContainerService messagingServiceContainer = DockerHelper.SetupMessagingServiceContainer(this.MessagingServiceContainerName, this.Logger, "stuartferguson/messagingservice", new List @@ -241,7 +253,7 @@ await Retry.For(async () => testNetwork }, traceFolder, dockerCredentials, this.SecurityServiceContainerName, - eventStoreAddress, + this.EventStoreConnectionString, ("serviceClient", "Secret1"), true); @@ -269,10 +281,16 @@ await Retry.For(async () => dockerCredentials, this.SecurityServiceContainerName, this.EstateManagementContainerName, - eventStoreAddress, + this.EventStoreConnectionString, ("serviceClient", "Secret1"), this.TestHostContainerName, - this.VoucherManagementContainerName); + this.VoucherManagementContainerName, + additionalEnvironmentVariables: new List + { + insecureEventStoreEnvironmentVariable, + persistentSubscriptionPollingInSeconds, + internalSubscriptionServiceCacheDuration + }); IContainerService estateReportingContainer = DockerHelper.SetupEstateReportingContainer(this.EstateReportingContainerName, this.Logger, @@ -285,12 +303,18 @@ await Retry.For(async () => traceFolder, dockerCredentials, this.SecurityServiceContainerName, - eventStoreAddress, + this.EventStoreConnectionString, (Setup.SqlServerContainerName, "sa", "thisisalongpassword123!"), ("serviceClient", "Secret1"), - true); + true, + additionalEnvironmentVariables: new List + { + insecureEventStoreEnvironmentVariable, + persistentSubscriptionPollingInSeconds, + internalSubscriptionServiceCacheDuration + }); IContainerService testhostContainer = SetupTestHostContainer(this.TestHostContainerName, this.Logger, @@ -348,17 +372,13 @@ await Retry.For(async () => await this.LoadEventStoreProjections().ConfigureAwait(false); } - protected async Task PopulateSubscriptionServiceConfiguration() + public async Task PopulateSubscriptionServiceConfiguration(String estateName) { EventStorePersistentSubscriptionsClient client = new EventStorePersistentSubscriptionsClient(ConfigureEventStoreSettings(this.EventStoreHttpPort)); - PersistentSubscriptionSettings settings = new PersistentSubscriptionSettings(resolveLinkTos: true); - await client.CreateAsync("$ce-EstateAggregate", "Reporting", settings); - await client.CreateAsync("$ce-MerchantAggregate", "Reporting", settings); - await client.CreateAsync("$ce-ContractAggregate", "Reporting", settings); - await client.CreateAsync("$ce-TransactionAggregate", "Reporting", settings); - await client.CreateAsync("$et-TransactionHasBeenCompletedEvent", "TransactionProcessor", settings); - await client.CreateAsync("$et-MerchantFeeAddedToTransactionEvent", "TransactionProcessor", settings); + PersistentSubscriptionSettings settings = new PersistentSubscriptionSettings(resolveLinkTos: true, StreamPosition.Start); + await client.CreateAsync(estateName.Replace(" ", ""), "Reporting", settings); + await client.CreateAsync($"EstateManagementSubscriptionStream_{estateName.Replace(" ", "")}", "Estate Management", settings); } private static EventStoreClientSettings ConfigureEventStoreSettings(Int32 eventStoreHttpPort) diff --git a/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs b/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs index 0d0d7eb5..79f1762e 100644 --- a/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs +++ b/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs @@ -46,15 +46,24 @@ public SharedSteps(ScenarioContext scenarioContext, [When(@"I create the following estates")] public async Task WhenICreateTheFollowingEstates(Table table) { + foreach (TableRow tableRow in table.Rows) + { + String estateName = SpecflowTableHelper.GetStringRowValue(tableRow, "EstateName"); + // Setup the subscriptions for the estate + await Retry.For(async () => { await this.TestingContext.DockerHelper.PopulateSubscriptionServiceConfiguration(estateName).ConfigureAwait(false); }, + retryFor: TimeSpan.FromMinutes(2), + retryInterval: TimeSpan.FromSeconds(30)); + } + foreach (TableRow tableRow in table.Rows) { String estateName = SpecflowTableHelper.GetStringRowValue(tableRow, "EstateName"); CreateEstateRequest createEstateRequest = new CreateEstateRequest - { - EstateId = Guid.NewGuid(), - EstateName = estateName - }; + { + EstateId = Guid.NewGuid(), + EstateName = estateName + }; CreateEstateResponse response = await this.TestingContext.DockerHelper.EstateClient.CreateEstate(this.TestingContext.AccessToken, createEstateRequest, CancellationToken.None).ConfigureAwait(false); @@ -73,11 +82,11 @@ public async Task WhenICreateTheFollowingEstates(Table table) EstateResponse estate = null; await Retry.For(async () => - { - estate = await this.TestingContext.DockerHelper.EstateClient - .GetEstate(this.TestingContext.AccessToken, estateDetails.EstateId, CancellationToken.None).ConfigureAwait(false); - estate.ShouldNotBeNull(); - }).ConfigureAwait(false); + { + estate = await this.TestingContext.DockerHelper.EstateClient + .GetEstate(this.TestingContext.AccessToken, estateDetails.EstateId, CancellationToken.None).ConfigureAwait(false); + estate.ShouldNotBeNull(); + }, retryFor: TimeSpan.FromSeconds(90)).ConfigureAwait(false); estate.EstateName.ShouldBe(estateDetails.EstateName); } diff --git a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj index bca10764..c2d781ef 100644 --- a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj +++ b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj @@ -14,7 +14,7 @@ - + @@ -57,6 +57,27 @@ Always + + Always + + + Always + + + Always + + + Always + + + Always + + + Always + + + Always + diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js new file mode 100644 index 00000000..9294582e --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js @@ -0,0 +1,82 @@ +var fromStreams = fromStreams || require('../../node_modules/esprojection-testing-framework').scope.fromStreams; +var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; + +fromStreams("$ce-EstateAggregate", "$et-CallbackReceivedEvent") + .when({ + $init: function (s, e) { + return { + estates: [], + debug: [] + } + }, + "EstateCreatedEvent": function (s, e) { + s.estates.push({ + estateId: e.data.estateId, + estateName: e.data.estateName + }); + }, + "EstateReferenceAllocatedEvent": function (s, e) { + var estateIndex = s.estates.findIndex(element => element.estateId === e.data.estateId); + s.estates[estateIndex].reference = e.data.estateReference; + }, + "CallbackReceivedEvent": function (s, e) { + // find the estate from the reference + if (s.debug === undefined) { + s.debug = []; + } + var ref = e.data.reference.split("-"); // Element 0 is estate reference, Element 1 is merchant reference + var estate = s.estates.find(element => element.reference === ref[0]); + if (estate !== undefined && estate !== null) { + var enrichedEvent = createEnrichedEvent(e, estate); + + // Emit the enriched event + emit(getStreamName(estate, e), "CallbackReceivedEnrichedEvent", enrichedEvent); + } + else { + var enrichedEvent = createEnrichedEvent(e); + // Emit the enriched event + emit(getStreamName(estate, e), "CallbackReceivedEnrichedWithNoEstateEvent", enrichedEvent); + } + } + }); + +function createEnrichedEvent(originalEvent, estate) { + var enrichedEvent = {}; + if (estate !== undefined && estate !== null) { + enrichedEvent = { + typeString: originalEvent.data.typeString, + messageFormat: originalEvent.data.messageFormat, + callbackMessage: originalEvent.data.callbackMessage, + estateId: estate.estateId, + reference: originalEvent.data.reference + }; + } + else { + enrichedEvent = { + typeString: originalEvent.data.typeString, + messageFormat: originalEvent.data.messageFormat, + callbackMessage: originalEvent.data.callbackMessage, + reference: originalEvent.data.reference + }; + } + + return enrichedEvent; +} + +function getStreamName(estate, e) { + var streamName = ""; + if (e.data.destination === "EstateManagement") { + streamName += "EstateManagementSubscriptionStream_"; + } + + // Add the estate name + if (estate !== undefined && estate !== null) { + streamName += estate.estateName.replace(/ /g, ""); + } + else { + streamName += "UnknownEstate"; + } + + return streamName; + +} \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js new file mode 100644 index 00000000..ff4df1ea --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js @@ -0,0 +1,48 @@ +var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; +var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; + +isEstateEvent = (e) => { return (e.data && e.data.estateId); } +isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; + +isAMerchantFeeAddedToTransactionEvent = (e) => { return compareEventTypeSafely(e.eventType, 'MerchantFeeAddedToTransactionEvent') }; +isAServiceProviderFeeAddedToTransactionEvent = (e) => { return compareEventTypeSafely(e.eventType, 'ServiceProviderFeeAddedToTransactionEvent') }; + +compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } + +ignoreEvent = (e) => isAServiceProviderFeeAddedToTransactionEvent(e) | isAMerchantFeeAddedToTransactionEvent(e); + +isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined); + +isTruncated = function (metadata) { + if (metadata && metadata['$v']) { + var parts = metadata['$v'].split(":"); + var projectionEpoch = parts[1]; + + return (projectionEpoch < 0); + } + return false; +}; + +fromAll() + .when({ + $init: function (s, e) { + return { estates: {} } + }, + $any: function (s, e) { + if (isTruncated(e)) return; + + if (isEstateEvent(e)) { + if (ignoreEvent(e)) return; + + if (isAnEstateCreatedEvent(e)) { + s.estates[e.data.estateId] = { + filteredName: e.data.estateName.replace(/-/gi, ""), + name: e.data.estateName.replace(/-/gi, "").replace(/ /g, "") + }; + } + + linkTo(s.estates[e.data.estateId].name, e); + } + } + } + ); \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js new file mode 100644 index 00000000..9fe4d87c --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js @@ -0,0 +1,68 @@ +var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; +var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; + +isEstateEvent = (e) => { return (e.data && e.data.estateId); } +isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; +compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } +isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined); + +getSupportedEventTypes = function () { + var eventTypes = []; + + eventTypes.push('ImportLogCreatedEvent'); + eventTypes.push('FileAddedToImportLogEvent'); + eventTypes.push('FileCreatedEvent'); + eventTypes.push('FileLineAddedEvent'); + eventTypes.push('FileLineProcessingSuccessfulEvent'); + eventTypes.push('FileLineProcessingIgnoredEvent'); + eventTypes.push('FileLineProcessingFailedEvent'); + eventTypes.push('FileProcessingCompletedEvent'); + + return eventTypes; +} + +isARequiredEvent = (e) => { + var supportedEvents = getSupportedEventTypes(); + + var index = supportedEvents.indexOf(e.eventType); + + return index !== -1 +}; + +isTruncated = function (metadata) { + if (metadata && metadata['$v']) { + var parts = metadata['$v'].split(":"); + var projectionEpoch = parts[1]; + + return (projectionEpoch < 0); + } + return false; +}; +getStreamName = function (estateName) { + return 'FileProcessorSubscriptionStream_' + estateName; +} + +fromAll() + .when({ + $init: function (s, e) { + return { estates: {} } + }, + $any: function (s, e) { + if (isTruncated(e)) return; + + if (isEstateEvent(e)) { + + if (isAnEstateCreatedEvent(e)) { + s.estates[e.data.estateId] = { + filteredName: e.data.estateName.replace(/-/gi, ""), + name: e.data.estateName.replace(/-/gi, "").replace(" ", "") + }; + } + + if (isARequiredEvent(e) === false) return; + + linkTo(getStreamName(s.estates[e.data.estateId].name), e); + } + } + } +); \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js index b91b12aa..76f420ec 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js @@ -1,5 +1,5 @@ -var fromAll = fromAll || require("../../node_modules/event-store-projection-testing").scope.fromAll; -var linkTo = linkTo || require("../../node_modules/event-store-projection-testing").scope.linkTo; +var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; +var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; isValidEvent = function (e) { @@ -29,10 +29,8 @@ fromAll() if (isValidEvent(e)) { var merchantId = getMerchantId(e); if (merchantId !== null) { - s.merchantId = merchantId; var streamName = "MerchantArchive-" + merchantId.replace(/-/gi, ""); - s.streamName = streamName; - linkTo(streamName, e, e.metadata); + linkTo(streamName, e); } } } diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js index 77c377dd..fc41fb66 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js @@ -1,6 +1,6 @@ -var fromCategory = fromCategory || require('../../node_modules/event-store-projection-testing').scope.fromCategory; -var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy; -var emit = emit || require('../../node_modules/event-store-projection-testing').scope.emit; +var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory; +var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/esprojection-testing-framework').scope.partitionBy; +var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; fromCategory('MerchantArchive') .foreachStream() @@ -17,11 +17,12 @@ fromCategory('MerchantArchive') totalDeposits: 0, totalAuthorisedSales: 0, totalDeclinedSales: 0, - totalFees: 0 + totalFees: 0, + emittedEvents:1 } }, - $any: function (s, e) { - + $any: function (s, e) + { if (e === null || e.data === null || e.data.IsJson === false) return; @@ -42,6 +43,11 @@ var eventbus = { return; } + if (e.eventType === 'AutomaticDepositMadeEvent') { + depositMadeEventHandler(s, e); + return; + } + if (e.eventType === 'TransactionHasStartedEvent') { transactionHasStartedEventHandler(s, e); return; @@ -121,45 +127,47 @@ var incrementAvailableBalanceFromDeclinedTransaction = function (s, amount) { var merchantCreatedEventHandler = function (s, e) { // Setup the state here - s.estateId = e.data.EstateId; - s.merchantId = e.data.MerchantId; - s.merchantName = e.data.MerchantName; + s.estateId = e.data.estateId; + s.merchantId = e.data.merchantId; + s.merchantName = e.data.merchantName; }; var emitBalanceChangedEvent = function (aggregateId, eventId, s, changeAmount, dateTime, reference) { if (s.initialised === true) { + // Emit an opening balance event var openingBalanceEvent = { $type: getEventTypeName(), - "aggregateId": aggregateId, "merchantId": s.merchantId, "estateId": s.estateId, "balance": 0, "changeAmount": 0, "eventId": s.merchantId, "eventCreatedDateTime": dateTime, - "reference": "Opening Balance" + "reference": "Opening Balance", + "aggregateId": s.merchantId } emit(getStreamName(s), getEventType(), openingBalanceEvent); + s.emittedEvents++; s.initialised = false; } var balanceChangedEvent = { $type: getEventTypeName(), - "aggregateId": aggregateId, "merchantId": s.merchantId, "estateId": s.estateId, "balance": s.balance, "changeAmount": changeAmount, "eventId": eventId, "eventCreatedDateTime": dateTime, - "reference": reference + "reference": reference, + "aggregateId": aggregateId } // emit an balance changed event here emit(getStreamName(s), getEventType(), balanceChangedEvent); - + s.emittedEvents++; return s; }; @@ -176,7 +184,8 @@ var depositMadeEventHandler = function (s, e) { incrementBalanceFromDeposit(s, e.data.amount, e.data.depositDateTime); // emit an balance changed event here - s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, e.data.amount, e.data.depositDateTime, "Merchant Deposit"); + console.log(e); + s = emitBalanceChangedEvent(e.data.merchantId, e.eventId, s, e.data.amount, e.data.depositDateTime, "Merchant Deposit"); }; var transactionHasStartedEventHandler = function (s, e) { @@ -214,12 +223,12 @@ var transactionHasCompletedEventHandler = function (s, e) { var transactionDateTime = new Date(Date.parse(e.data.completedDateTime)); var completedTime = new Date(transactionDateTime.getFullYear(), transactionDateTime.getMonth(), transactionDateTime.getDate(), transactionDateTime.getHours(), transactionDateTime.getMinutes(), transactionDateTime.getSeconds() + 2); - if (e.data.IsAuthorised) { + if (e.data.isAuthorised) { decrementBalanceFromAuthorisedTransaction(s, amount, completedTime); // emit an balance changed event here if (amount > 0) { - s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, amount * -1, completedTime, "Transaction Completed"); + s = emitBalanceChangedEvent(e.data.transactionId, e.eventId, s, amount * -1, completedTime, "Transaction Completed"); } } else { @@ -238,8 +247,8 @@ var merchantFeeAddedToTransactionEventHandler = function (s, e) { } // increment the balance now - incrementBalanceFromMerchantFee(s, e.data.calculatedValue, e.data.eventCreatedDateTime); - + incrementBalanceFromMerchantFee(s, e.data.calculatedValue, e.data.feeCalculatedDateTime); + // emit an balance changed event here - s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, e.data.calculatedValue, e.data.eventCreatedDateTime, "Transaction Fee Processed"); + s = emitBalanceChangedEvent(e.data.transactionId, e.eventId, s, e.data.calculatedValue, e.data.feeCalculatedDateTime, "Transaction Fee Processed"); } \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js new file mode 100644 index 00000000..9587ff63 --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js @@ -0,0 +1,69 @@ +var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory; +//var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy; +var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; +var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; + +fromCategory('TransactionAggregate') + .foreachStream() + .when({ + $any: function (s, e) { + + if (e === null || e.data === null || e.data.IsJson === false) + return; + + eventbus.dispatch(s, e); + } + }); + +var eventbus = { + dispatch: function (s, e) { + + if (e.eventType === 'MerchantFeeAddedToTransactionEvent') { + merchantFeeAddedToTransactionEventHandler(s, e); + return; + } + if (e.eventType === 'ServiceProviderFeeAddedToTransactionEvent') { + serviceProviderFeeAddedToTransactionEventHandler(s, e); + return; + } + else { + //Just add the existing event to to our stream + linkTo(getStreamName(s), e); + } + + } +} + +function merchantFeeAddedToTransactionEventHandler(s, e) { + var newEvent = { + calculatedValue: e.data.calculatedValue, + feeCalculatedDateTime: e.data.feeCalculatedDateTime, + estateId: e.data.estateId, + feeId: e.data.feeId, + feeValue: e.data.feeValue, + merchantId: e.data.merchantId, + transactionId: e.data.transactionId, + feeCalculationType: e.data.feeCalculationType, + eventId: e.eventId + } + emit(getStreamName(s), "MerchantFeeAddedToTransactionEnrichedEvent", newEvent, null); +} + +function serviceProviderFeeAddedToTransactionEventHandler(s, e) { + var newEvent = { + calculatedValue: e.data.calculatedValue, + feeCalculatedDateTime: e.data.feeCalculatedDateTime, + estateId: e.data.estateId, + feeId: e.data.feeId, + feeValue: e.data.feeValue, + merchantId: e.data.merchantId, + transactionId: e.data.transactionId, + feeCalculationType: e.data.feeCalculationType, + eventId: e.eventId + } + emit(getStreamName(s), "ServiceProviderFeeAddedToTransactionEnrichedEvent", newEvent, null); +} + +function getStreamName(s) { + return "TransactionEnricherResult"; +} \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js new file mode 100644 index 00000000..9e0f9e9b --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js @@ -0,0 +1,62 @@ +var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; +var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; + +isEstateEvent = (e) => { return (e.data && e.data.estateId); } +isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; +compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } +isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined); + +getSupportedEventTypes = function () { + var eventTypes = []; + + eventTypes.push('CustomerEmailReceiptRequestedEvent'); + eventTypes.push('TransactionHasBeenCompletedEvent'); + + return eventTypes; +} + +isARequiredEvent = (e) => { + var supportedEvents = getSupportedEventTypes(); + + var index = supportedEvents.indexOf(e.eventType); + + return index !== -1; +}; + +isTruncated = function (metadata) { + if (metadata && metadata['$v']) { + var parts = metadata['$v'].split(":"); + var projectionEpoch = parts[1]; + + return (projectionEpoch < 0); + } + return false; +}; +getStreamName = function (estateName) { + return 'TransactionProcessorSubscriptionStream_' + estateName; +} + +fromAll() + .when({ + $init: function (s, e) { + return { estates: {} } + }, + $any: function (s, e) { + if (isTruncated(e)) return; + + if (isEstateEvent(e)) { + + if (isAnEstateCreatedEvent(e)) { + s.estates[e.data.estateId] = { + filteredName: e.data.estateName.replace(/-/gi, ""), + name: e.data.estateName.replace(/-/gi, "").replace(" ", "") + }; + } + + if (isARequiredEvent(e) === false) return; + + linkTo(getStreamName(s.estates[e.data.estateId].name), e); + } + } + } +); \ No newline at end of file diff --git a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj index 8ef26d10..3fb9b89b 100644 --- a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj +++ b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj @@ -5,6 +5,6 @@ - + diff --git a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj index c33f8242..5993e2cd 100644 --- a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj +++ b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj index 2240eb2f..9335a64a 100644 --- a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj +++ b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj index f9503c07..61f5def5 100644 --- a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj +++ b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj index e4f63fc7..0be56590 100644 --- a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj +++ b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj index 21f69f7f..07a7d7cb 100644 --- a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj +++ b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj @@ -5,7 +5,7 @@ - + diff --git a/TransactionProcessor/Startup.cs b/TransactionProcessor/Startup.cs index 75b91c0c..de3db4b7 100644 --- a/TransactionProcessor/Startup.cs +++ b/TransactionProcessor/Startup.cs @@ -279,6 +279,7 @@ private static void ConfigureEventStoreSettings(EventStoreClientSettings setting settings.ConnectionName = Startup.Configuration.GetValue("EventStoreSettings:ConnectionName"); settings.ConnectivitySettings = new EventStoreClientConnectivitySettings { + Insecure = Startup.Configuration.GetValue("EventStoreSettings:Insecure"), Address = new Uri(Startup.Configuration.GetValue("EventStoreSettings:ConnectionString")), }; @@ -498,8 +499,9 @@ public static void PreWarm(this IApplicationBuilder applicationBuilder) String filter = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceFilter"); String ignore = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceIgnore"); String streamName = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionFilterOnStreamName"); + Int32 cacheDuration = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceCacheDuration")); - ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString); + ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration); ((SubscriptionRepository)subscriptionRepository).Trace += (sender, s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s); diff --git a/TransactionProcessor/TransactionProcessor.csproj b/TransactionProcessor/TransactionProcessor.csproj index 9016fd71..f7c83292 100644 --- a/TransactionProcessor/TransactionProcessor.csproj +++ b/TransactionProcessor/TransactionProcessor.csproj @@ -19,7 +19,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -29,7 +29,7 @@ - +