From 0a7e4ff26ea43055351a9917bc045bfc75e9186f Mon Sep 17 00:00:00 2001 From: Stuart Ferguson Date: Mon, 16 Nov 2020 17:57:22 +0000 Subject: [PATCH] Updated projections --- .../Common/DockerHelper.cs | 2 +- .../continuous/MerchantAggregator.js | 39 +++ .../continuous/MerchantBalanceCalculator.js | 242 +++++++++++------- 3 files changed, 183 insertions(+), 100 deletions(-) create mode 100644 TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js diff --git a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs index 6f250593..212ca09a 100644 --- a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs +++ b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs @@ -190,7 +190,7 @@ private async Task LoadEventStoreProjections() try { Logger.LogInformation($"Creating projection [{projectionName}]"); - await projectionClient.CreateContinuousAsync(projectionName, projection).ConfigureAwait(false); + await projectionClient.CreateContinuousAsync(projectionName, projection, trackEmittedStreams:true).ConfigureAwait(false); } catch (Exception e) { diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js new file mode 100644 index 00000000..acdad8c2 --- /dev/null +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js @@ -0,0 +1,39 @@ +var fromAll = fromAll || require("../../node_modules/event-store-projection-testing").scope.fromAll; +var linkTo = linkTo || require("../../node_modules/event-store-projection-testing").scope.linkTo; + +isValidEvent = function (e) { + + if (e) { + if (e.data) { + if (e.isJson) { + if (e.eventType !== "$metadata") { + return true; + } + } + } + } + + return false; +}; + +getMerchantId = function (e) { + if (e.data.MerchantId === undefined) { + return null; + } + return e.data.MerchantId; +}; + +fromAll() + .when({ + $any: function (s, e) { + if (isValidEvent(e)) { + var merchantId = getMerchantId(e); + if (merchantId !== null) { + s.merchantId = merchantId; + var streamName = "MerchantArchive-" + merchantId.replace(/-/gi, ""); + s.streamName = streamName; + linkTo(streamName, e, e.metadata); + } + } + } + }); \ No newline at end of file diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js index 138998a4..de2f5951 100644 --- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js +++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js @@ -1,53 +1,18 @@ -var fromStreams = fromStreams || require('../../node_modules/event-store-projection-testing').scope.fromStreams; +var fromCategory = fromCategory || require('../../node_modules/event-store-projection-testing').scope.fromCategory; var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy; var emit = emit || require('../../node_modules/event-store-projection-testing').scope.emit; -var incrementBalanceFromDeposit = function(s, merchantId, amount, dateTime) { - var merchant = s.merchants[merchantId]; - merchant.Balance += amount; - merchant.AvailableBalance += amount; - // protect against events coming in out of order - if (merchant.LastDepositDateTime === null || dateTime > merchant.LastDepositDateTime) { - merchant.LastDepositDateTime = dateTime; - } - } - -var addPendingBalanceUpdate = function(s, merchantId, amount, transactionId, dateTime) -{ - var merchant = s.merchants[merchantId]; - merchant.AvailableBalance -= amount; - merchant.PendingBalanceUpdates[transactionId] = { - Amount: amount, - TransactionId: transactionId - }; - // protect against events coming in out of order - if (merchant.LastSaleDateTime === null || dateTime > merchant.LastSaleDateTime) - { - merchant.LastSaleDateTime = dateTime; - } -} +fromCategory('MerchantArchive') + .foreachStream() + .when({ + $any: function (s, e) { -var decrementBalanceForSale = function(s, merchantId, transactionId, isAuthorised) -{ - var merchant = s.merchants[merchantId]; - // lookup the balance update - var balanceUpdate = merchant.PendingBalanceUpdates[transactionId]; + if (e === null || e.data === null || e.data.IsJson === false) + return; - if (balanceUpdate !== undefined) - { - if (isAuthorised) - { - merchant.Balance -= balanceUpdate.Amount; - } - else - { - merchant.AvailableBalance += balanceUpdate.Amount; + eventbus.dispatch(s, e); } - - delete merchant.PendingBalanceUpdates[transactionId]; - } -} - + }); var eventbus = { dispatch: function (s, e) { @@ -71,78 +36,157 @@ var eventbus = { transactionHasCompletedEventHandler(s, e); return; } + + if (e.eventType === 'TransactionProcessor.Transaction.DomainEvents.MerchantFeeAddedToTransactionEvent') { + merchantFeeAddedToTransactionEventHandler(s, e); + return; + } } } -var merchantCreatedEventHandler = function (s, e) -{ - var merchantId = e.data.MerchantId; - - if (s.merchants[merchantId] === undefined) { - s.merchants[merchantId] = { - MerchantId: e.data.MerchantId, - MerchantName: e.data.MerchantName, - AvailableBalance: 0, - Balance: 0, - LastDepositDateTime: null, - LastSaleDateTime: null, - PendingBalanceUpdates: [] - }; +function getStreamName(s) { + return "MerchantBalanceHistory-" + s.merchantId.replace(/-/gi, ""); +} + +function getEventTypeName() { + return 'EstateReporting.BusinessLogic.Events.' + getEventType() + ', EstateReporting.BusinessLogic'; +} + +function getEventType() { return "MerchantBalanceChangedEvent"; } + +function generateEventId() { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, + function (c) { + var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); + }); +} + +var incrementBalanceFromDeposit = function (s, amount, dateTime) { + s.balance += amount; + s.availableBalance += amount; + s.totalDeposits += amount; + // protect against events coming in out of order + if (s.lastDepositDateTime === null || dateTime > s.lastDepositDateTime) { + s.lastDepositDateTime = dateTime; + } +}; + +var addPendingBalanceUpdate = function (s, amount, transactionId, dateTime) { + s.availableBalance -= amount; + s.pendingBalanceUpdates.push({ + amount: amount, + transactionId: transactionId + }); + // protect against events coming in out of order + if (s.lastSaleDateTime === null || dateTime > s.lastSaleDateTime) { + s.lastSaleDateTime = dateTime; + } +}; + +var incrementBalanceFromMerchantFee = function (s, amount, dateTime) { + s.balance += amount; + s.availableBalance += amount; + s.totalFees += amount; + + // protect against events coming in out of order + if (s.lastFeeProcessedDateTime === null || dateTime > s.lastFeeProcessedDateTime) { + s.lastFeeProcessedDateTime = dateTime; } } +var handlePendingBalanceUpdate = function (s, transactionId, isAuthorised) { + // lookup the balance update + //var balanceUpdate = s.pendingBalanceUpdates[transactionId]; + var balanceUpdateIndex = s.pendingBalanceUpdates.findIndex(element => element.transactionId === transactionId); + var balanceUpdate = s.pendingBalanceUpdates[balanceUpdateIndex]; + if (balanceUpdate !== undefined) { + if (isAuthorised) { + s.balance -= balanceUpdate.amount; + s.totalSales += balanceUpdate.amount; + } + else { + s.availableBalance += balanceUpdate.amount; + s.totalIncompleteSales += balanceUpdate.amount; + } + + s.pendingBalanceUpdates.splice(balanceUpdateIndex); + return balanceUpdate.amount; + } +}; + +var merchantCreatedEventHandler = function (s, e) { + + // Setup the state here + s.estateId = e.data.EstateId; + s.merchantId = e.data.MerchantId; + s.merchantName = e.data.MerchantName; + s.availableBalance = 0; + s.balance = 0; + s.lastDepositDateTime = null; + s.lastSaleDateTime = null; + s.lastFeeProcessedDateTime = null; + s.pendingBalanceUpdates = []; + s.totalDeposits = 0; + s.totalSales = 0; + s.totalIncompleteSales = 0; + s.totalFees = 0; + emitBalanceChangedEvent(s, 0, e.data.EventCreatedDateTime, "Merchant Created"); +}; + +var emitBalanceChangedEvent = function (s, changeAmount, dateTime, reference) { + var balanceChangedEvent = { + $type: getEventTypeName(), + "merchantId": s.merchantId, + "estateId": s.estateId, + "availableBalance": s.availableBalance, + "balance": s.balance, + "changeAmount": changeAmount, + "eventId": generateEventId(), + "eventTimestamp": dateTime, + "reference": reference + } + + // emit an balance changed event here + emit(getStreamName(s), getEventType(), balanceChangedEvent); +}; + var depositMadeEventHandler = function (s, e) { - var merchantId = e.data.MerchantId; - var merchant = s.merchants[merchantId]; + incrementBalanceFromDeposit(s, e.data.Amount, e.data.DepositDateTime); - incrementBalanceFromDeposit(s, merchantId,e.data.Amount, e.data.DepositDateTime); -} + // emit an balance changed event here + emitBalanceChangedEvent(s, e.data.Amount, e.data.DepositDateTime, "Merchant Deposit"); +}; -var transactionHasStartedEventHandler = function(s, e) -{ - // Add this to a pending balance update list - var merchantId = e.data.MerchantId; - var merchant = s.merchants[merchantId]; +var transactionHasStartedEventHandler = function (s, e) { var amount = e.data.TransactionAmount; - if (amount === undefined) - { + if (amount === undefined) { amount = 0; } - addPendingBalanceUpdate(s, merchantId,amount, e.data.TransactionId, e.data.TransactionDateTime); -} - -var transactionHasCompletedEventHandler = function(s, e) -{ // Add this to a pending balance update list - var merchantId = e.data.MerchantId; - var merchant = s.merchants[merchantId]; + addPendingBalanceUpdate(s, amount, e.data.TransactionId, e.data.TransactionDateTime); - decrementBalanceForSale(s, merchantId, e.data.TransactionId, e.data.IsAuthorised); -} + // emit an balance changed event here + emitBalanceChangedEvent(s, 0, e.data.TransactionDateTime, "Transaction Started"); +}; -fromStreams('$et-EstateManagement.Merchant.DomainEvents.MerchantCreatedEvent', - '$et-EstateManagement.Merchant.DomainEvents.ManualDepositMadeEvent', - '$et-TransactionProcessor.Transaction.DomainEvents.TransactionHasStartedEvent', - '$et-TransactionProcessor.Transaction.DomainEvents.TransactionHasBeenCompletedEvent') - .partitionBy(function(e) - { - return "MerchantBalanceHistory-" + e.data.MerchantId.replace(/-/gi, ""); - }) - .when({ - $init: function (s, e) { - return { - merchants: {}, - debug: [] - }; - }, +var transactionHasCompletedEventHandler = function (s, e) { + // Handle the pending balance recorda + var balanceUpdateValue = handlePendingBalanceUpdate(s, e.data.TransactionId, e.data.IsAuthorised); + + // emit an balance changed event here + emitBalanceChangedEvent(s, balanceUpdateValue, e.data.EventCreatedDateTime, "Transaction Completed"); +}; + +var merchantFeeAddedToTransactionEventHandler = function (s, e) { + // increment the balance now + incrementBalanceFromMerchantFee(s, e.data.CalculatedValue, e.data.EventCreatedDateTime); + + // emit an balance changed event here + emitBalanceChangedEvent(s, e.data.CalculatedValue, e.data.EventCreatedDateTime, "Transaction Fee Processed"); +} - $any: function (s, e) { - if (e === null || e.data === null || e.data.IsJson === false) - return; - eventbus.dispatch(s, e); - } - }); \ No newline at end of file