Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private async Task LoadEventStoreProjections()
try
{
Logger.LogInformation($"Creating projection [{projectionName}]");
await projectionClient.CreateContinuousAsync(projectionName, projection).ConfigureAwait(false);
await projectionClient.CreateContinuousAsync(projectionName, projection, trackEmittedStreams:true).ConfigureAwait(false);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
var fromAll = fromAll || require("../../node_modules/event-store-projection-testing").scope.fromAll;
var linkTo = linkTo || require("../../node_modules/event-store-projection-testing").scope.linkTo;

isValidEvent = function (e) {

if (e) {
if (e.data) {
if (e.isJson) {
if (e.eventType !== "$metadata") {
return true;
}
}
}
}

return false;
};

getMerchantId = function (e) {
if (e.data.MerchantId === undefined) {
return null;
}
return e.data.MerchantId;
};

fromAll()
.when({
$any: function (s, e) {
if (isValidEvent(e)) {
var merchantId = getMerchantId(e);
if (merchantId !== null) {
s.merchantId = merchantId;
var streamName = "MerchantArchive-" + merchantId.replace(/-/gi, "");
s.streamName = streamName;
linkTo(streamName, e, e.metadata);
}
}
}
});
Original file line number Diff line number Diff line change
@@ -1,53 +1,18 @@
var fromStreams = fromStreams || require('../../node_modules/event-store-projection-testing').scope.fromStreams;
var fromCategory = fromCategory || require('../../node_modules/event-store-projection-testing').scope.fromCategory;
var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy;
var emit = emit || require('../../node_modules/event-store-projection-testing').scope.emit;

var incrementBalanceFromDeposit = function(s, merchantId, amount, dateTime) {
var merchant = s.merchants[merchantId];
merchant.Balance += amount;
merchant.AvailableBalance += amount;
// protect against events coming in out of order
if (merchant.LastDepositDateTime === null || dateTime > merchant.LastDepositDateTime) {
merchant.LastDepositDateTime = dateTime;
}
}

var addPendingBalanceUpdate = function(s, merchantId, amount, transactionId, dateTime)
{
var merchant = s.merchants[merchantId];
merchant.AvailableBalance -= amount;
merchant.PendingBalanceUpdates[transactionId] = {
Amount: amount,
TransactionId: transactionId
};
// protect against events coming in out of order
if (merchant.LastSaleDateTime === null || dateTime > merchant.LastSaleDateTime)
{
merchant.LastSaleDateTime = dateTime;
}
}
fromCategory('MerchantArchive')
.foreachStream()
.when({
$any: function (s, e) {

var decrementBalanceForSale = function(s, merchantId, transactionId, isAuthorised)
{
var merchant = s.merchants[merchantId];
// lookup the balance update
var balanceUpdate = merchant.PendingBalanceUpdates[transactionId];
if (e === null || e.data === null || e.data.IsJson === false)
return;

if (balanceUpdate !== undefined)
{
if (isAuthorised)
{
merchant.Balance -= balanceUpdate.Amount;
}
else
{
merchant.AvailableBalance += balanceUpdate.Amount;
eventbus.dispatch(s, e);
}

delete merchant.PendingBalanceUpdates[transactionId];
}
}

});

var eventbus = {
dispatch: function (s, e) {
Expand All @@ -71,78 +36,157 @@ var eventbus = {
transactionHasCompletedEventHandler(s, e);
return;
}

if (e.eventType === 'TransactionProcessor.Transaction.DomainEvents.MerchantFeeAddedToTransactionEvent') {
merchantFeeAddedToTransactionEventHandler(s, e);
return;
}
}
}

var merchantCreatedEventHandler = function (s, e)
{
var merchantId = e.data.MerchantId;

if (s.merchants[merchantId] === undefined) {
s.merchants[merchantId] = {
MerchantId: e.data.MerchantId,
MerchantName: e.data.MerchantName,
AvailableBalance: 0,
Balance: 0,
LastDepositDateTime: null,
LastSaleDateTime: null,
PendingBalanceUpdates: []
};
function getStreamName(s) {
return "MerchantBalanceHistory-" + s.merchantId.replace(/-/gi, "");
}

function getEventTypeName() {
return 'EstateReporting.BusinessLogic.Events.' + getEventType() + ', EstateReporting.BusinessLogic';
}

function getEventType() { return "MerchantBalanceChangedEvent"; }

function generateEventId() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g,
function (c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}

var incrementBalanceFromDeposit = function (s, amount, dateTime) {
s.balance += amount;
s.availableBalance += amount;
s.totalDeposits += amount;
// protect against events coming in out of order
if (s.lastDepositDateTime === null || dateTime > s.lastDepositDateTime) {
s.lastDepositDateTime = dateTime;
}
};

var addPendingBalanceUpdate = function (s, amount, transactionId, dateTime) {
s.availableBalance -= amount;
s.pendingBalanceUpdates.push({
amount: amount,
transactionId: transactionId
});
// protect against events coming in out of order
if (s.lastSaleDateTime === null || dateTime > s.lastSaleDateTime) {
s.lastSaleDateTime = dateTime;
}
};

var incrementBalanceFromMerchantFee = function (s, amount, dateTime) {
s.balance += amount;
s.availableBalance += amount;
s.totalFees += amount;

// protect against events coming in out of order
if (s.lastFeeProcessedDateTime === null || dateTime > s.lastFeeProcessedDateTime) {
s.lastFeeProcessedDateTime = dateTime;
}
}

var handlePendingBalanceUpdate = function (s, transactionId, isAuthorised) {
// lookup the balance update
//var balanceUpdate = s.pendingBalanceUpdates[transactionId];
var balanceUpdateIndex = s.pendingBalanceUpdates.findIndex(element => element.transactionId === transactionId);
var balanceUpdate = s.pendingBalanceUpdates[balanceUpdateIndex];
if (balanceUpdate !== undefined) {
if (isAuthorised) {
s.balance -= balanceUpdate.amount;
s.totalSales += balanceUpdate.amount;
}
else {
s.availableBalance += balanceUpdate.amount;
s.totalIncompleteSales += balanceUpdate.amount;
}

s.pendingBalanceUpdates.splice(balanceUpdateIndex);
return balanceUpdate.amount;
}
};

var merchantCreatedEventHandler = function (s, e) {

// Setup the state here
s.estateId = e.data.EstateId;
s.merchantId = e.data.MerchantId;
s.merchantName = e.data.MerchantName;
s.availableBalance = 0;
s.balance = 0;
s.lastDepositDateTime = null;
s.lastSaleDateTime = null;
s.lastFeeProcessedDateTime = null;
s.pendingBalanceUpdates = [];
s.totalDeposits = 0;
s.totalSales = 0;
s.totalIncompleteSales = 0;
s.totalFees = 0;
emitBalanceChangedEvent(s, 0, e.data.EventCreatedDateTime, "Merchant Created");
};

var emitBalanceChangedEvent = function (s, changeAmount, dateTime, reference) {
var balanceChangedEvent = {
$type: getEventTypeName(),
"merchantId": s.merchantId,
"estateId": s.estateId,
"availableBalance": s.availableBalance,
"balance": s.balance,
"changeAmount": changeAmount,
"eventId": generateEventId(),
"eventTimestamp": dateTime,
"reference": reference
}

// emit an balance changed event here
emit(getStreamName(s), getEventType(), balanceChangedEvent);
};

var depositMadeEventHandler = function (s, e) {
var merchantId = e.data.MerchantId;
var merchant = s.merchants[merchantId];
incrementBalanceFromDeposit(s, e.data.Amount, e.data.DepositDateTime);

incrementBalanceFromDeposit(s, merchantId,e.data.Amount, e.data.DepositDateTime);
}
// emit an balance changed event here
emitBalanceChangedEvent(s, e.data.Amount, e.data.DepositDateTime, "Merchant Deposit");
};

var transactionHasStartedEventHandler = function(s, e)
{
// Add this to a pending balance update list
var merchantId = e.data.MerchantId;
var merchant = s.merchants[merchantId];
var transactionHasStartedEventHandler = function (s, e) {

var amount = e.data.TransactionAmount;
if (amount === undefined)
{
if (amount === undefined) {
amount = 0;
}

addPendingBalanceUpdate(s, merchantId,amount, e.data.TransactionId, e.data.TransactionDateTime);
}

var transactionHasCompletedEventHandler = function(s, e)
{
// Add this to a pending balance update list
var merchantId = e.data.MerchantId;
var merchant = s.merchants[merchantId];
addPendingBalanceUpdate(s, amount, e.data.TransactionId, e.data.TransactionDateTime);

decrementBalanceForSale(s, merchantId, e.data.TransactionId, e.data.IsAuthorised);
}
// emit an balance changed event here
emitBalanceChangedEvent(s, 0, e.data.TransactionDateTime, "Transaction Started");
};

fromStreams('$et-EstateManagement.Merchant.DomainEvents.MerchantCreatedEvent',
'$et-EstateManagement.Merchant.DomainEvents.ManualDepositMadeEvent',
'$et-TransactionProcessor.Transaction.DomainEvents.TransactionHasStartedEvent',
'$et-TransactionProcessor.Transaction.DomainEvents.TransactionHasBeenCompletedEvent')
.partitionBy(function(e)
{
return "MerchantBalanceHistory-" + e.data.MerchantId.replace(/-/gi, "");
})
.when({
$init: function (s, e) {
return {
merchants: {},
debug: []
};
},
var transactionHasCompletedEventHandler = function (s, e) {
// Handle the pending balance recorda
var balanceUpdateValue = handlePendingBalanceUpdate(s, e.data.TransactionId, e.data.IsAuthorised);

// emit an balance changed event here
emitBalanceChangedEvent(s, balanceUpdateValue, e.data.EventCreatedDateTime, "Transaction Completed");
};

var merchantFeeAddedToTransactionEventHandler = function (s, e) {
// increment the balance now
incrementBalanceFromMerchantFee(s, e.data.CalculatedValue, e.data.EventCreatedDateTime);

// emit an balance changed event here
emitBalanceChangedEvent(s, e.data.CalculatedValue, e.data.EventCreatedDateTime, "Transaction Fee Processed");
}

$any: function (s, e) {

if (e === null || e.data === null || e.data.IsJson === false)
return;

eventbus.dispatch(s, e);
}
});