From 46df43e491f56427d4396823c3eb0e9d48cf4a26 Mon Sep 17 00:00:00 2001 From: Stuart Ferguson Date: Fri, 21 Jan 2022 19:23:54 +0000 Subject: [PATCH] :| --- .../Common/DockerHelper.cs | 90 ++----------------- .../continuous/CallbackHandlerEnricher.js | 8 +- .../continuous/EstateAggregator.js | 9 +- ...tateManagementSubscriptionStreamBuilder.js | 63 +++++++++++++ .../FileProcessorSubscriptionStreamBuilder.js | 9 +- .../continuous/MerchantAggregator.js | 7 +- .../continuous/MerchantBalanceCalculator.js | 17 ++-- .../continuous/TransactionEnricher.js | 11 +-- ...ctionProcessorSubscriptionStreamBuilder.js | 10 +-- 9 files changed, 98 insertions(+), 126 deletions(-) create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/EstateManagementSubscriptionStreamBuilder.js diff --git a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs index 01df22fa..77fe32a9 100644 --- a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs +++ b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs @@ -102,23 +102,11 @@ public DockerHelper(NlogLogger logger, public async Task PopulateSubscriptionServiceConfiguration(String estateName) { - EventStorePersistentSubscriptionsClient client = - new EventStorePersistentSubscriptionsClient(DockerHelper.ConfigureEventStoreSettings(this.EventStoreHttpPort)); - - PersistentSubscriptionSettings settings = new PersistentSubscriptionSettings(resolveLinkTos:true, StreamPosition.Start); - await client.CreateAsync(estateName.Replace(" ", ""), "Reporting", settings); - await client.CreateAsync($"EstateManagementSubscriptionStream_{estateName.Replace(" ", "")}", "Estate Management", settings); - await client.CreateAsync($"TransactionProcessorSubscriptionStream_{ReplaceFirst(estateName," ", "")}", "Transaction Processor", settings); - } - - public string ReplaceFirst(string text, string search, string replace) - { - int pos = text.IndexOf(search); - if (pos < 0) - { - return text; - } - return text.Substring(0, pos) + replace + text.Substring(pos + search.Length); + List<(String streamName, String groupName)> subscriptions = new List<(String streamName, String groupName)>(); + subscriptions.Add((estateName.Replace(" ", ""), "Reporting")); + subscriptions.Add(($"EstateManagementSubscriptionStream_{estateName.Replace(" ", "")}", "Estate Management")); + subscriptions.Add(($"TransactionProcessorSubscriptionStream_{estateName.Replace(" ", "")}", "Transaction Processor")); + await this.PopulateSubscriptionServiceConfiguration(this.EventStoreHttpPort, subscriptions); } /// @@ -293,73 +281,7 @@ public override async Task StopContainersForScenarioRun() } } } - - private static EventStoreClientSettings ConfigureEventStoreSettings(Int32 eventStoreHttpPort) - { - String connectionString = $"http://127.0.0.1:{eventStoreHttpPort}"; - - EventStoreClientSettings settings = new EventStoreClientSettings(); - settings.CreateHttpMessageHandler = () => new SocketsHttpHandler - { - SslOptions = - { - RemoteCertificateValidationCallback = (sender, - certificate, - chain, - errors) => true, - } - }; - settings.ConnectionName = "Specflow"; - settings.ConnectivitySettings = new EventStoreClientConnectivitySettings - { - Insecure = true, - Address = new Uri(connectionString), - }; - - settings.DefaultCredentials = new UserCredentials("admin", "changeit"); - return settings; - } - - private async Task LoadEventStoreProjections() - { - //Start our Continous Projections - we might decide to do this at a different stage, but now lets try here - String projectionsFolder = "../../../projections/continuous"; - IPAddress[] ipAddresses = Dns.GetHostAddresses("127.0.0.1"); - - if (!string.IsNullOrWhiteSpace(projectionsFolder)) - { - DirectoryInfo di = new DirectoryInfo(projectionsFolder); - - if (di.Exists) - { - FileInfo[] files = di.GetFiles(); - - EventStoreProjectionManagementClient projectionClient = - new EventStoreProjectionManagementClient(DockerHelper.ConfigureEventStoreSettings(this.EventStoreHttpPort)); - - foreach (FileInfo file in files) - { - String projection = File.ReadAllText(file.FullName); - String projectionName = file.Name.Replace(".js", string.Empty); - - try - { - this.Logger.LogInformation($"Creating projection [{projectionName}]"); - await projectionClient.CreateContinuousAsync(projectionName, projection, trackEmittedStreams:true).ConfigureAwait(false); - var status = await projectionClient.GetStatusAsync(projectionName); - - } - catch(Exception e) - { - this.Logger.LogError(new Exception($"Projection [{projectionName}] error", e)); - } - } - } - } - - this.Logger.LogInformation("Loaded projections"); - } - + private async Task RemoveEstateReadModel() { List estateIdList = this.TestingContext.GetAllEstateIds(); diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js index 39e6807f..cb2b1faa 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js @@ -1,9 +1,7 @@ -//var fromStreams = fromStreams || require('../../node_modules/esprojection-testing-framework').scope.fromStreams; -//var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; - fromStreams("$ce-EstateAggregate", "$et-CallbackReceivedEvent") .when({ - $init: function (s, e) { + $init: function (s, e) + { return { estates: [], debug: [] @@ -79,4 +77,4 @@ function getStreamName(estate, e) { return streamName; -} \ No newline at end of file +} diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js index 773d7869..e70c9eb6 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js @@ -1,6 +1,3 @@ -//var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; -//var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; - isEstateEvent = (e) => { return (e.data && e.data.estateId); } isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; @@ -23,6 +20,8 @@ isTruncated = function (metadata) { return false; }; +getStringWithNoSpaces = function(inputString) { return inputString.replace(/-/gi, "").replace(/ /g, ""); } + fromAll() .when({ $init: function (s, e) { @@ -36,7 +35,7 @@ fromAll() if (isAnEstateCreatedEvent(e)) { s.estates[e.data.estateId] = { - name: e.data.estateName.replace(/-/gi, "").replace(/ /g, "") + name: getStringWithNoSpaces(e.data.estateName) }; } @@ -44,4 +43,4 @@ fromAll() } } } - ); \ No newline at end of file + ); diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/EstateManagementSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/EstateManagementSubscriptionStreamBuilder.js new file mode 100644 index 00000000..130bdd90 --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/EstateManagementSubscriptionStreamBuilder.js @@ -0,0 +1,63 @@ +isEstateEvent = (e) => { return (e.data && e.data.estateId); } +isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; +compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } +isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined); + +getSupportedEventTypes = function () { + var eventTypes = []; + + eventTypes.push('TransactionHasBeenCompletedEvent'); + eventTypes.push('MerchantFeeSettledEvent'); + eventTypes.push('StatementGeneratedEvent'); + + return eventTypes; +} + +isARequiredEvent = (e) => { + var supportedEvents = getSupportedEventTypes(); + + var index = supportedEvents.indexOf(e.eventType); + + return index !== -1; +}; + +isTruncated = function (metadata) { + if (metadata && metadata['$v']) { + var parts = metadata['$v'].split(":"); + var projectionEpoch = parts[1]; + + return (projectionEpoch < 0); + } + return false; +}; + +getStreamName = function (estateName) { + return 'EstateManagementSubscriptionStream_' + estateName; +} + +getStringWithNoSpaces = function (inputString) { return inputString.replace(/-/gi, "").replace(/ /g, ""); } + +fromAll() + .when({ + $init: function (s, e) { + return { estates: {} } + }, + $any: function (s, e) { + if (isTruncated(e)) return; + + if (isEstateEvent(e)) { + + if (isAnEstateCreatedEvent(e)) { + s.estates[e.data.estateId] = { + filteredName: e.data.estateName.replace(/-/gi, ""), + name: getStringWithNoSpaces(e.data.estateName) + }; + } + + if (isARequiredEvent(e) === false) return; + + linkTo(getStreamName(s.estates[e.data.estateId].name), e); + } + } + } + ); diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js index 94ab83d1..bf74789b 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js @@ -1,6 +1,3 @@ -//var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; -//var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; - isEstateEvent = (e) => { return (e.data && e.data.estateId); } isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } @@ -42,6 +39,8 @@ getStreamName = function (estateName) { return 'FileProcessorSubscriptionStream_' + estateName; } +getStringWithNoSpaces = function (inputString) { return inputString.replace(/-/gi, "").replace(/ /g, ""); } + fromAll() .when({ $init: function (s, e) { @@ -55,7 +54,7 @@ fromAll() if (isAnEstateCreatedEvent(e)) { s.estates[e.data.estateId] = { filteredName: e.data.estateName.replace(/-/gi, ""), - name: e.data.estateName.replace(/-/gi, "").replace(" ", "") + name: getStringWithNoSpaces(e.data.estateName) }; } @@ -65,4 +64,4 @@ fromAll() } } } - ); \ No newline at end of file +); diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js index f8d67be8..f379c84b 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js @@ -1,6 +1,3 @@ -//var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; -//var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; - isValidEvent = function (e) { if (e) { @@ -29,11 +26,9 @@ fromAll() if (isValidEvent(e)) { var merchantId = getMerchantId(e); if (merchantId !== null) { - s.merchantId = merchantId; var streamName = "MerchantArchive-" + merchantId.replace(/-/gi, ""); - s.streamName = streamName; linkTo(streamName, e); } } } - }); \ No newline at end of file + }); diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js index 6d1d31d8..092978c3 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js @@ -1,7 +1,3 @@ -//var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory; -//var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/esprojection-testing-framework').scope.partitionBy; -//var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; - fromCategory('MerchantArchive') .foreachStream() .when({ @@ -18,10 +14,11 @@ fromCategory('MerchantArchive') totalAuthorisedSales: 0, totalDeclinedSales: 0, totalFees: 0, - emittedEvents: 1 + emittedEvents:1 } }, - $any: function (s, e) { + $any: function (s, e) + { if (e === null || e.data === null || e.data.IsJson === false) return; @@ -42,6 +39,11 @@ var eventbus = { return; } + if (e.eventType === 'AutomaticDepositMadeEvent') { + depositMadeEventHandler(s, e); + return; + } + if (e.eventType === 'TransactionHasStartedEvent') { transactionHasStartedEventHandler(s, e); return; @@ -178,6 +180,7 @@ var depositMadeEventHandler = function (s, e) { incrementBalanceFromDeposit(s, e.data.amount, e.data.depositDateTime); // emit an balance changed event here + console.log(e); s = emitBalanceChangedEvent(e.data.merchantId, e.eventId, s, e.data.amount, e.data.depositDateTime, "Merchant Deposit"); }; @@ -244,4 +247,4 @@ var merchantFeeAddedToTransactionEventHandler = function (s, e) { // emit an balance changed event here s = emitBalanceChangedEvent(e.data.transactionId, e.eventId, s, e.data.calculatedValue, e.data.feeCalculatedDateTime, "Transaction Fee Processed"); -} \ No newline at end of file +} diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js index 91d6a52d..bfb97273 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js @@ -1,8 +1,3 @@ -//var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory; -//var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy; -//var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit; -//var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; - fromCategory('TransactionAggregate') .foreachStream() .when({ @@ -46,7 +41,7 @@ function merchantFeeAddedToTransactionEventHandler(s, e) { feeCalculationType: e.data.feeCalculationType, eventId: e.eventId } - emit(getStreamName(s), "MerchantFeeAddedToTransactionEnrichedEvent", newEvent, {}); + emit(getStreamName(s), "MerchantFeeAddedToTransactionEnrichedEvent", newEvent, null); } function serviceProviderFeeAddedToTransactionEventHandler(s, e) { @@ -61,9 +56,9 @@ function serviceProviderFeeAddedToTransactionEventHandler(s, e) { feeCalculationType: e.data.feeCalculationType, eventId: e.eventId } - emit(getStreamName(s), "ServiceProviderFeeAddedToTransactionEnrichedEvent", newEvent, {}); + emit(getStreamName(s), "ServiceProviderFeeAddedToTransactionEnrichedEvent", newEvent, null); } function getStreamName(s) { return "TransactionEnricherResult"; -} \ No newline at end of file +} diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js index 099a6c9c..ebdb00e0 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js @@ -1,6 +1,3 @@ -//var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll; -//var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo; - isEstateEvent = (e) => { return (e.data && e.data.estateId); } isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') }; compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); } @@ -11,7 +8,6 @@ getSupportedEventTypes = function () { eventTypes.push('CustomerEmailReceiptRequestedEvent'); eventTypes.push('TransactionHasBeenCompletedEvent'); - eventTypes.push('MerchantFeeAddedToTransactionEvent'); return eventTypes; } @@ -37,6 +33,8 @@ getStreamName = function (estateName) { return 'TransactionProcessorSubscriptionStream_' + estateName; } +getStringWithNoSpaces = function (inputString) { return inputString.replace(/-/gi, "").replace(/ /g, ""); } + fromAll() .when({ $init: function (s, e) { @@ -50,7 +48,7 @@ fromAll() if (isAnEstateCreatedEvent(e)) { s.estates[e.data.estateId] = { filteredName: e.data.estateName.replace(/-/gi, ""), - name: e.data.estateName.replace(/-/gi, "").replace(" ", "") + name: getStringWithNoSpaces(e.data.estateName) }; } @@ -60,4 +58,4 @@ fromAll() } } } - ); \ No newline at end of file +);