diff --git a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj
index ca8ccc0a..d47cb7a5 100644
--- a/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj
+++ b/TransactionProcessor.BusinessLogic/TransactionProcessor.BusinessLogic.csproj
@@ -8,8 +8,8 @@
-
-
+
+
diff --git a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs
index 12643ad7..8758443a 100644
--- a/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs
+++ b/TransactionProcessor.IntegrationTests/Common/DockerHelper.cs
@@ -161,6 +161,8 @@ private async Task LoadEventStoreProjections()
Logger.LogInformation("Loaded projections");
}
+ protected String EventStoreConnectionString;
+
#region Methods
///
@@ -188,9 +190,7 @@ public override async Task StartContainersForScenarioRun(String scenarioName)
this.TestHostContainerName = $"testhosts{testGuid:N}";
this.VoucherManagementContainerName = $"vouchermanagement{testGuid:N}";
this.MessagingServiceContainerName = $"messaging{testGuid:N}";
-
- String eventStoreAddress = $"http://{this.EventStoreContainerName}";
-
+
(String, String, String) dockerCredentials = ("https://www.docker.com", "stuartferguson", "Sc0tland");
INetworkService testNetwork = DockerHelper.SetupTestNetwork();
@@ -198,11 +198,13 @@ public override async Task StartContainersForScenarioRun(String scenarioName)
IContainerService eventStoreContainer = DockerHelper.SetupEventStoreContainer(this.EventStoreContainerName, this.Logger, "eventstore/eventstore:20.10.0-buster-slim", testNetwork, traceFolder);
this.EventStoreHttpPort = eventStoreContainer.ToHostExposedEndpoint($"{DockerHelper.EventStoreHttpDockerPort}/tcp").Port;
- await Retry.For(async () =>
- {
- await this.PopulateSubscriptionServiceConfiguration().ConfigureAwait(false);
- }, retryFor: TimeSpan.FromMinutes(2), retryInterval: TimeSpan.FromSeconds(30));
-
+ this.EventStoreConnectionString =
+ $"esdb://admin:changeit@{this.EventStoreContainerName}:{DockerHelper.EventStoreHttpDockerPort}?tls=false";
+
+ String insecureEventStoreEnvironmentVariable = "EventStoreSettings:Insecure=true";
+ String persistentSubscriptionPollingInSeconds = "AppSettings:PersistentSubscriptionPollingInSeconds=10";
+ String internalSubscriptionServiceCacheDuration = "AppSettings:InternalSubscriptionServiceCacheDuration=0";
+
IContainerService voucherManagementContainer = SetupVoucherManagementContainer(this.VoucherManagementContainerName,
this.Logger,
"stuartferguson/vouchermanagement",
@@ -214,12 +216,16 @@ await Retry.For(async () =>
dockerCredentials,
this.SecurityServiceContainerName,
this.EstateManagementContainerName,
- eventStoreAddress,
+ this.EventStoreConnectionString,
(Setup.SqlServerContainerName,
"sa",
"thisisalongpassword123!"),
("serviceClient", "Secret1"),
- true);
+ true,
+ additionalEnvironmentVariables: new List
+ {
+ insecureEventStoreEnvironmentVariable,
+ });
IContainerService estateManagementContainer = DockerHelper.SetupEstateManagementContainer(this.EstateManagementContainerName, this.Logger,
"stuartferguson/estatemanagement", new List
@@ -228,12 +234,18 @@ await Retry.For(async () =>
Setup.DatabaseServerNetwork
}, traceFolder, dockerCredentials,
this.SecurityServiceContainerName,
- eventStoreAddress,
+ this.EventStoreConnectionString,
(Setup.SqlServerContainerName,
"sa",
"thisisalongpassword123!"),
("serviceClient", "Secret1"),
- true);
+ true,
+ additionalEnvironmentVariables: new List
+ {
+ insecureEventStoreEnvironmentVariable,
+ persistentSubscriptionPollingInSeconds,
+ internalSubscriptionServiceCacheDuration
+ });
IContainerService messagingServiceContainer = DockerHelper.SetupMessagingServiceContainer(this.MessagingServiceContainerName, this.Logger,
"stuartferguson/messagingservice", new List
@@ -241,7 +253,7 @@ await Retry.For(async () =>
testNetwork
}, traceFolder, dockerCredentials,
this.SecurityServiceContainerName,
- eventStoreAddress,
+ this.EventStoreConnectionString,
("serviceClient", "Secret1"),
true);
@@ -269,10 +281,16 @@ await Retry.For(async () =>
dockerCredentials,
this.SecurityServiceContainerName,
this.EstateManagementContainerName,
- eventStoreAddress,
+ this.EventStoreConnectionString,
("serviceClient", "Secret1"),
this.TestHostContainerName,
- this.VoucherManagementContainerName);
+ this.VoucherManagementContainerName,
+ additionalEnvironmentVariables: new List
+ {
+ insecureEventStoreEnvironmentVariable,
+ persistentSubscriptionPollingInSeconds,
+ internalSubscriptionServiceCacheDuration
+ });
IContainerService estateReportingContainer = DockerHelper.SetupEstateReportingContainer(this.EstateReportingContainerName,
this.Logger,
@@ -285,12 +303,18 @@ await Retry.For(async () =>
traceFolder,
dockerCredentials,
this.SecurityServiceContainerName,
- eventStoreAddress,
+ this.EventStoreConnectionString,
(Setup.SqlServerContainerName,
"sa",
"thisisalongpassword123!"),
("serviceClient", "Secret1"),
- true);
+ true,
+ additionalEnvironmentVariables: new List
+ {
+ insecureEventStoreEnvironmentVariable,
+ persistentSubscriptionPollingInSeconds,
+ internalSubscriptionServiceCacheDuration
+ });
IContainerService testhostContainer = SetupTestHostContainer(this.TestHostContainerName,
this.Logger,
@@ -348,17 +372,13 @@ await Retry.For(async () =>
await this.LoadEventStoreProjections().ConfigureAwait(false);
}
- protected async Task PopulateSubscriptionServiceConfiguration()
+ public async Task PopulateSubscriptionServiceConfiguration(String estateName)
{
EventStorePersistentSubscriptionsClient client = new EventStorePersistentSubscriptionsClient(ConfigureEventStoreSettings(this.EventStoreHttpPort));
- PersistentSubscriptionSettings settings = new PersistentSubscriptionSettings(resolveLinkTos: true);
- await client.CreateAsync("$ce-EstateAggregate", "Reporting", settings);
- await client.CreateAsync("$ce-MerchantAggregate", "Reporting", settings);
- await client.CreateAsync("$ce-ContractAggregate", "Reporting", settings);
- await client.CreateAsync("$ce-TransactionAggregate", "Reporting", settings);
- await client.CreateAsync("$et-TransactionHasBeenCompletedEvent", "TransactionProcessor", settings);
- await client.CreateAsync("$et-MerchantFeeAddedToTransactionEvent", "TransactionProcessor", settings);
+ PersistentSubscriptionSettings settings = new PersistentSubscriptionSettings(resolveLinkTos: true, StreamPosition.Start);
+ await client.CreateAsync(estateName.Replace(" ", ""), "Reporting", settings);
+ await client.CreateAsync($"EstateManagementSubscriptionStream_{estateName.Replace(" ", "")}", "Estate Management", settings);
}
private static EventStoreClientSettings ConfigureEventStoreSettings(Int32 eventStoreHttpPort)
diff --git a/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs b/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs
index 0d0d7eb5..79f1762e 100644
--- a/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs
+++ b/TransactionProcessor.IntegrationTests/Shared/SharedSteps.cs
@@ -46,15 +46,24 @@ public SharedSteps(ScenarioContext scenarioContext,
[When(@"I create the following estates")]
public async Task WhenICreateTheFollowingEstates(Table table)
{
+ foreach (TableRow tableRow in table.Rows)
+ {
+ String estateName = SpecflowTableHelper.GetStringRowValue(tableRow, "EstateName");
+ // Setup the subscriptions for the estate
+ await Retry.For(async () => { await this.TestingContext.DockerHelper.PopulateSubscriptionServiceConfiguration(estateName).ConfigureAwait(false); },
+ retryFor: TimeSpan.FromMinutes(2),
+ retryInterval: TimeSpan.FromSeconds(30));
+ }
+
foreach (TableRow tableRow in table.Rows)
{
String estateName = SpecflowTableHelper.GetStringRowValue(tableRow, "EstateName");
CreateEstateRequest createEstateRequest = new CreateEstateRequest
- {
- EstateId = Guid.NewGuid(),
- EstateName = estateName
- };
+ {
+ EstateId = Guid.NewGuid(),
+ EstateName = estateName
+ };
CreateEstateResponse response = await this.TestingContext.DockerHelper.EstateClient.CreateEstate(this.TestingContext.AccessToken, createEstateRequest, CancellationToken.None).ConfigureAwait(false);
@@ -73,11 +82,11 @@ public async Task WhenICreateTheFollowingEstates(Table table)
EstateResponse estate = null;
await Retry.For(async () =>
- {
- estate = await this.TestingContext.DockerHelper.EstateClient
- .GetEstate(this.TestingContext.AccessToken, estateDetails.EstateId, CancellationToken.None).ConfigureAwait(false);
- estate.ShouldNotBeNull();
- }).ConfigureAwait(false);
+ {
+ estate = await this.TestingContext.DockerHelper.EstateClient
+ .GetEstate(this.TestingContext.AccessToken, estateDetails.EstateId, CancellationToken.None).ConfigureAwait(false);
+ estate.ShouldNotBeNull();
+ }, retryFor: TimeSpan.FromSeconds(90)).ConfigureAwait(false);
estate.EstateName.ShouldBe(estateDetails.EstateName);
}
diff --git a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj
index d0720753..c2d781ef 100644
--- a/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj
+++ b/TransactionProcessor.IntegrationTests/TransactionProcessor.IntegrationTests.csproj
@@ -11,10 +11,10 @@
-
-
-
-
+
+
+
+
@@ -57,6 +57,27 @@
Always
+
+ Always
+
+
+ Always
+
+
+ Always
+
+
+ Always
+
+
+ Always
+
+
+ Always
+
+
+ Always
+
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js
new file mode 100644
index 00000000..9294582e
--- /dev/null
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/CallbackHandlerEnricher.js
@@ -0,0 +1,82 @@
+var fromStreams = fromStreams || require('../../node_modules/esprojection-testing-framework').scope.fromStreams;
+var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit;
+
+fromStreams("$ce-EstateAggregate", "$et-CallbackReceivedEvent")
+ .when({
+ $init: function (s, e) {
+ return {
+ estates: [],
+ debug: []
+ }
+ },
+ "EstateCreatedEvent": function (s, e) {
+ s.estates.push({
+ estateId: e.data.estateId,
+ estateName: e.data.estateName
+ });
+ },
+ "EstateReferenceAllocatedEvent": function (s, e) {
+ var estateIndex = s.estates.findIndex(element => element.estateId === e.data.estateId);
+ s.estates[estateIndex].reference = e.data.estateReference;
+ },
+ "CallbackReceivedEvent": function (s, e) {
+ // find the estate from the reference
+ if (s.debug === undefined) {
+ s.debug = [];
+ }
+ var ref = e.data.reference.split("-"); // Element 0 is estate reference, Element 1 is merchant reference
+ var estate = s.estates.find(element => element.reference === ref[0]);
+ if (estate !== undefined && estate !== null) {
+ var enrichedEvent = createEnrichedEvent(e, estate);
+
+ // Emit the enriched event
+ emit(getStreamName(estate, e), "CallbackReceivedEnrichedEvent", enrichedEvent);
+ }
+ else {
+ var enrichedEvent = createEnrichedEvent(e);
+ // Emit the enriched event
+ emit(getStreamName(estate, e), "CallbackReceivedEnrichedWithNoEstateEvent", enrichedEvent);
+ }
+ }
+ });
+
+function createEnrichedEvent(originalEvent, estate) {
+ var enrichedEvent = {};
+ if (estate !== undefined && estate !== null) {
+ enrichedEvent = {
+ typeString: originalEvent.data.typeString,
+ messageFormat: originalEvent.data.messageFormat,
+ callbackMessage: originalEvent.data.callbackMessage,
+ estateId: estate.estateId,
+ reference: originalEvent.data.reference
+ };
+ }
+ else {
+ enrichedEvent = {
+ typeString: originalEvent.data.typeString,
+ messageFormat: originalEvent.data.messageFormat,
+ callbackMessage: originalEvent.data.callbackMessage,
+ reference: originalEvent.data.reference
+ };
+ }
+
+ return enrichedEvent;
+}
+
+function getStreamName(estate, e) {
+ var streamName = "";
+ if (e.data.destination === "EstateManagement") {
+ streamName += "EstateManagementSubscriptionStream_";
+ }
+
+ // Add the estate name
+ if (estate !== undefined && estate !== null) {
+ streamName += estate.estateName.replace(/ /g, "");
+ }
+ else {
+ streamName += "UnknownEstate";
+ }
+
+ return streamName;
+
+}
\ No newline at end of file
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js
new file mode 100644
index 00000000..ff4df1ea
--- /dev/null
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/EstateAggregator.js
@@ -0,0 +1,48 @@
+var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll;
+var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo;
+
+isEstateEvent = (e) => { return (e.data && e.data.estateId); }
+isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') };
+
+isAMerchantFeeAddedToTransactionEvent = (e) => { return compareEventTypeSafely(e.eventType, 'MerchantFeeAddedToTransactionEvent') };
+isAServiceProviderFeeAddedToTransactionEvent = (e) => { return compareEventTypeSafely(e.eventType, 'ServiceProviderFeeAddedToTransactionEvent') };
+
+compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); }
+
+ignoreEvent = (e) => isAServiceProviderFeeAddedToTransactionEvent(e) | isAMerchantFeeAddedToTransactionEvent(e);
+
+isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined);
+
+isTruncated = function (metadata) {
+ if (metadata && metadata['$v']) {
+ var parts = metadata['$v'].split(":");
+ var projectionEpoch = parts[1];
+
+ return (projectionEpoch < 0);
+ }
+ return false;
+};
+
+fromAll()
+ .when({
+ $init: function (s, e) {
+ return { estates: {} }
+ },
+ $any: function (s, e) {
+ if (isTruncated(e)) return;
+
+ if (isEstateEvent(e)) {
+ if (ignoreEvent(e)) return;
+
+ if (isAnEstateCreatedEvent(e)) {
+ s.estates[e.data.estateId] = {
+ filteredName: e.data.estateName.replace(/-/gi, ""),
+ name: e.data.estateName.replace(/-/gi, "").replace(/ /g, "")
+ };
+ }
+
+ linkTo(s.estates[e.data.estateId].name, e);
+ }
+ }
+ }
+ );
\ No newline at end of file
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js
new file mode 100644
index 00000000..9fe4d87c
--- /dev/null
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/FileProcessorSubscriptionStreamBuilder.js
@@ -0,0 +1,68 @@
+var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll;
+var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo;
+
+isEstateEvent = (e) => { return (e.data && e.data.estateId); }
+isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') };
+compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); }
+isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined);
+
+getSupportedEventTypes = function () {
+ var eventTypes = [];
+
+ eventTypes.push('ImportLogCreatedEvent');
+ eventTypes.push('FileAddedToImportLogEvent');
+ eventTypes.push('FileCreatedEvent');
+ eventTypes.push('FileLineAddedEvent');
+ eventTypes.push('FileLineProcessingSuccessfulEvent');
+ eventTypes.push('FileLineProcessingIgnoredEvent');
+ eventTypes.push('FileLineProcessingFailedEvent');
+ eventTypes.push('FileProcessingCompletedEvent');
+
+ return eventTypes;
+}
+
+isARequiredEvent = (e) => {
+ var supportedEvents = getSupportedEventTypes();
+
+ var index = supportedEvents.indexOf(e.eventType);
+
+ return index !== -1
+};
+
+isTruncated = function (metadata) {
+ if (metadata && metadata['$v']) {
+ var parts = metadata['$v'].split(":");
+ var projectionEpoch = parts[1];
+
+ return (projectionEpoch < 0);
+ }
+ return false;
+};
+getStreamName = function (estateName) {
+ return 'FileProcessorSubscriptionStream_' + estateName;
+}
+
+fromAll()
+ .when({
+ $init: function (s, e) {
+ return { estates: {} }
+ },
+ $any: function (s, e) {
+ if (isTruncated(e)) return;
+
+ if (isEstateEvent(e)) {
+
+ if (isAnEstateCreatedEvent(e)) {
+ s.estates[e.data.estateId] = {
+ filteredName: e.data.estateName.replace(/-/gi, ""),
+ name: e.data.estateName.replace(/-/gi, "").replace(" ", "")
+ };
+ }
+
+ if (isARequiredEvent(e) === false) return;
+
+ linkTo(getStreamName(s.estates[e.data.estateId].name), e);
+ }
+ }
+ }
+);
\ No newline at end of file
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js
index b91b12aa..76f420ec 100644
--- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantAggregator.js
@@ -1,5 +1,5 @@
-var fromAll = fromAll || require("../../node_modules/event-store-projection-testing").scope.fromAll;
-var linkTo = linkTo || require("../../node_modules/event-store-projection-testing").scope.linkTo;
+var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll;
+var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo;
isValidEvent = function (e) {
@@ -29,10 +29,8 @@ fromAll()
if (isValidEvent(e)) {
var merchantId = getMerchantId(e);
if (merchantId !== null) {
- s.merchantId = merchantId;
var streamName = "MerchantArchive-" + merchantId.replace(/-/gi, "");
- s.streamName = streamName;
- linkTo(streamName, e, e.metadata);
+ linkTo(streamName, e);
}
}
}
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js
index 77c377dd..fc41fb66 100644
--- a/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/MerchantBalanceCalculator.js
@@ -1,6 +1,6 @@
-var fromCategory = fromCategory || require('../../node_modules/event-store-projection-testing').scope.fromCategory;
-var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy;
-var emit = emit || require('../../node_modules/event-store-projection-testing').scope.emit;
+var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory;
+var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/esprojection-testing-framework').scope.partitionBy;
+var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit;
fromCategory('MerchantArchive')
.foreachStream()
@@ -17,11 +17,12 @@ fromCategory('MerchantArchive')
totalDeposits: 0,
totalAuthorisedSales: 0,
totalDeclinedSales: 0,
- totalFees: 0
+ totalFees: 0,
+ emittedEvents:1
}
},
- $any: function (s, e) {
-
+ $any: function (s, e)
+ {
if (e === null || e.data === null || e.data.IsJson === false)
return;
@@ -42,6 +43,11 @@ var eventbus = {
return;
}
+ if (e.eventType === 'AutomaticDepositMadeEvent') {
+ depositMadeEventHandler(s, e);
+ return;
+ }
+
if (e.eventType === 'TransactionHasStartedEvent') {
transactionHasStartedEventHandler(s, e);
return;
@@ -121,45 +127,47 @@ var incrementAvailableBalanceFromDeclinedTransaction = function (s, amount) {
var merchantCreatedEventHandler = function (s, e) {
// Setup the state here
- s.estateId = e.data.EstateId;
- s.merchantId = e.data.MerchantId;
- s.merchantName = e.data.MerchantName;
+ s.estateId = e.data.estateId;
+ s.merchantId = e.data.merchantId;
+ s.merchantName = e.data.merchantName;
};
var emitBalanceChangedEvent = function (aggregateId, eventId, s, changeAmount, dateTime, reference) {
if (s.initialised === true) {
+
// Emit an opening balance event
var openingBalanceEvent = {
$type: getEventTypeName(),
- "aggregateId": aggregateId,
"merchantId": s.merchantId,
"estateId": s.estateId,
"balance": 0,
"changeAmount": 0,
"eventId": s.merchantId,
"eventCreatedDateTime": dateTime,
- "reference": "Opening Balance"
+ "reference": "Opening Balance",
+ "aggregateId": s.merchantId
}
emit(getStreamName(s), getEventType(), openingBalanceEvent);
+ s.emittedEvents++;
s.initialised = false;
}
var balanceChangedEvent = {
$type: getEventTypeName(),
- "aggregateId": aggregateId,
"merchantId": s.merchantId,
"estateId": s.estateId,
"balance": s.balance,
"changeAmount": changeAmount,
"eventId": eventId,
"eventCreatedDateTime": dateTime,
- "reference": reference
+ "reference": reference,
+ "aggregateId": aggregateId
}
// emit an balance changed event here
emit(getStreamName(s), getEventType(), balanceChangedEvent);
-
+ s.emittedEvents++;
return s;
};
@@ -176,7 +184,8 @@ var depositMadeEventHandler = function (s, e) {
incrementBalanceFromDeposit(s, e.data.amount, e.data.depositDateTime);
// emit an balance changed event here
- s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, e.data.amount, e.data.depositDateTime, "Merchant Deposit");
+ console.log(e);
+ s = emitBalanceChangedEvent(e.data.merchantId, e.eventId, s, e.data.amount, e.data.depositDateTime, "Merchant Deposit");
};
var transactionHasStartedEventHandler = function (s, e) {
@@ -214,12 +223,12 @@ var transactionHasCompletedEventHandler = function (s, e) {
var transactionDateTime = new Date(Date.parse(e.data.completedDateTime));
var completedTime = new Date(transactionDateTime.getFullYear(), transactionDateTime.getMonth(), transactionDateTime.getDate(), transactionDateTime.getHours(), transactionDateTime.getMinutes(), transactionDateTime.getSeconds() + 2);
- if (e.data.IsAuthorised) {
+ if (e.data.isAuthorised) {
decrementBalanceFromAuthorisedTransaction(s, amount, completedTime);
// emit an balance changed event here
if (amount > 0) {
- s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, amount * -1, completedTime, "Transaction Completed");
+ s = emitBalanceChangedEvent(e.data.transactionId, e.eventId, s, amount * -1, completedTime, "Transaction Completed");
}
}
else {
@@ -238,8 +247,8 @@ var merchantFeeAddedToTransactionEventHandler = function (s, e) {
}
// increment the balance now
- incrementBalanceFromMerchantFee(s, e.data.calculatedValue, e.data.eventCreatedDateTime);
-
+ incrementBalanceFromMerchantFee(s, e.data.calculatedValue, e.data.feeCalculatedDateTime);
+
// emit an balance changed event here
- s = emitBalanceChangedEvent(e.data.aggregateId, e.data.eventId, s, e.data.calculatedValue, e.data.eventCreatedDateTime, "Transaction Fee Processed");
+ s = emitBalanceChangedEvent(e.data.transactionId, e.eventId, s, e.data.calculatedValue, e.data.feeCalculatedDateTime, "Transaction Fee Processed");
}
\ No newline at end of file
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js
new file mode 100644
index 00000000..9587ff63
--- /dev/null
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionEnricher.js
@@ -0,0 +1,69 @@
+var fromCategory = fromCategory || require('../../node_modules/esprojection-testing-framework').scope.fromCategory;
+//var partitionBy = partitionBy !== null ? partitionBy : require('../../node_modules/event-store-projection-testing').scope.partitionBy;
+var emit = emit || require('../../node_modules/esprojection-testing-framework').scope.emit;
+var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo;
+
+fromCategory('TransactionAggregate')
+ .foreachStream()
+ .when({
+ $any: function (s, e) {
+
+ if (e === null || e.data === null || e.data.IsJson === false)
+ return;
+
+ eventbus.dispatch(s, e);
+ }
+ });
+
+var eventbus = {
+ dispatch: function (s, e) {
+
+ if (e.eventType === 'MerchantFeeAddedToTransactionEvent') {
+ merchantFeeAddedToTransactionEventHandler(s, e);
+ return;
+ }
+ if (e.eventType === 'ServiceProviderFeeAddedToTransactionEvent') {
+ serviceProviderFeeAddedToTransactionEventHandler(s, e);
+ return;
+ }
+ else {
+ //Just add the existing event to to our stream
+ linkTo(getStreamName(s), e);
+ }
+
+ }
+}
+
+function merchantFeeAddedToTransactionEventHandler(s, e) {
+ var newEvent = {
+ calculatedValue: e.data.calculatedValue,
+ feeCalculatedDateTime: e.data.feeCalculatedDateTime,
+ estateId: e.data.estateId,
+ feeId: e.data.feeId,
+ feeValue: e.data.feeValue,
+ merchantId: e.data.merchantId,
+ transactionId: e.data.transactionId,
+ feeCalculationType: e.data.feeCalculationType,
+ eventId: e.eventId
+ }
+ emit(getStreamName(s), "MerchantFeeAddedToTransactionEnrichedEvent", newEvent, null);
+}
+
+function serviceProviderFeeAddedToTransactionEventHandler(s, e) {
+ var newEvent = {
+ calculatedValue: e.data.calculatedValue,
+ feeCalculatedDateTime: e.data.feeCalculatedDateTime,
+ estateId: e.data.estateId,
+ feeId: e.data.feeId,
+ feeValue: e.data.feeValue,
+ merchantId: e.data.merchantId,
+ transactionId: e.data.transactionId,
+ feeCalculationType: e.data.feeCalculationType,
+ eventId: e.eventId
+ }
+ emit(getStreamName(s), "ServiceProviderFeeAddedToTransactionEnrichedEvent", newEvent, null);
+}
+
+function getStreamName(s) {
+ return "TransactionEnricherResult";
+}
\ No newline at end of file
diff --git a/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js
new file mode 100644
index 00000000..9e0f9e9b
--- /dev/null
+++ b/TransactionProcessor.IntegrationTests/projections/continuous/TransactionProcessorSubscriptionStreamBuilder.js
@@ -0,0 +1,62 @@
+var fromAll = fromAll || require("../../node_modules/esprojection-testing-framework").scope.fromAll;
+var linkTo = linkTo || require("../../node_modules/esprojection-testing-framework").scope.linkTo;
+
+isEstateEvent = (e) => { return (e.data && e.data.estateId); }
+isAnEstateCreatedEvent = (e) => { return compareEventTypeSafely(e.eventType, 'EstateCreatedEvent') };
+compareEventTypeSafely = (sourceEventType, targetEventType) => { return (sourceEventType.toUpperCase() === targetEventType.toUpperCase()); }
+isInvalidEvent = (e) => (e === null || e === undefined || e.data === undefined);
+
+getSupportedEventTypes = function () {
+ var eventTypes = [];
+
+ eventTypes.push('CustomerEmailReceiptRequestedEvent');
+ eventTypes.push('TransactionHasBeenCompletedEvent');
+
+ return eventTypes;
+}
+
+isARequiredEvent = (e) => {
+ var supportedEvents = getSupportedEventTypes();
+
+ var index = supportedEvents.indexOf(e.eventType);
+
+ return index !== -1;
+};
+
+isTruncated = function (metadata) {
+ if (metadata && metadata['$v']) {
+ var parts = metadata['$v'].split(":");
+ var projectionEpoch = parts[1];
+
+ return (projectionEpoch < 0);
+ }
+ return false;
+};
+getStreamName = function (estateName) {
+ return 'TransactionProcessorSubscriptionStream_' + estateName;
+}
+
+fromAll()
+ .when({
+ $init: function (s, e) {
+ return { estates: {} }
+ },
+ $any: function (s, e) {
+ if (isTruncated(e)) return;
+
+ if (isEstateEvent(e)) {
+
+ if (isAnEstateCreatedEvent(e)) {
+ s.estates[e.data.estateId] = {
+ filteredName: e.data.estateName.replace(/-/gi, ""),
+ name: e.data.estateName.replace(/-/gi, "").replace(" ", "")
+ };
+ }
+
+ if (isARequiredEvent(e) === false) return;
+
+ linkTo(getStreamName(s.estates[e.data.estateId].name), e);
+ }
+ }
+ }
+);
\ No newline at end of file
diff --git a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj
index cda72881..3fb9b89b 100644
--- a/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj
+++ b/TransactionProcessor.Reconciliation.DomainEvents/TransactionProcessor.Reconciliation.DomainEvents.csproj
@@ -5,6 +5,6 @@
-
+
diff --git a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj
index ab8a81f6..5993e2cd 100644
--- a/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj
+++ b/TransactionProcessor.ReconciliationAggregate/TransactionProcessor.ReconciliationAggregate.csproj
@@ -5,7 +5,7 @@
-
+
diff --git a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj
index 6da5d437..9335a64a 100644
--- a/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj
+++ b/TransactionProcessor.Settlement.DomainEvents/TransactionProcessor.Settlement.DomainEvents.csproj
@@ -5,7 +5,7 @@
-
+
diff --git a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj
index c28eaf6d..61f5def5 100644
--- a/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj
+++ b/TransactionProcessor.SettlementAggregates/TransactionProcessor.SettlementAggregates.csproj
@@ -5,7 +5,7 @@
-
+
diff --git a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj
index caff460d..0be56590 100644
--- a/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj
+++ b/TransactionProcessor.Transaction.DomainEvents/TransactionProcessor.Transaction.DomainEvents.csproj
@@ -5,7 +5,7 @@
-
+
diff --git a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj
index b46c122a..07a7d7cb 100644
--- a/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj
+++ b/TransactionProcessor.TransactionAgrgegate/TransactionProcessor.TransactionAggregate.csproj
@@ -5,7 +5,7 @@
-
+
diff --git a/TransactionProcessor/Program.cs b/TransactionProcessor/Program.cs
index a507823c..da2c7bff 100644
--- a/TransactionProcessor/Program.cs
+++ b/TransactionProcessor/Program.cs
@@ -46,33 +46,33 @@ public static IHostBuilder CreateHostBuilder(string[] args)
webBuilder.UseStartup();
webBuilder.UseConfiguration(config);
webBuilder.UseKestrel();
- })
- .ConfigureServices(services =>
- {
- SettlementCreatedForDateEvent s =
- new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"),
- Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"),
- DateTime.Now);
+ });
+ //.ConfigureServices(services =>
+ // {
+ // SettlementCreatedForDateEvent s =
+ // new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"),
+ // Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"),
+ // DateTime.Now);
- TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(),Guid.NewGuid(),
- DateTime.Now, "","","","",null);
+ // TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(),Guid.NewGuid(),
+ // DateTime.Now, "","","","",null);
- ReconciliationHasStartedEvent r =
- new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now);
+ // ReconciliationHasStartedEvent r =
+ // new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now);
- TypeProvider.LoadDomainEventsTypeDynamically();
+ // TypeProvider.LoadDomainEventsTypeDynamically();
- services.AddHostedService(provider =>
- {
- IDomainEventHandlerResolver r =
- provider.GetRequiredService();
- EventStorePersistentSubscriptionsClient p = provider.GetRequiredService();
- HttpClient h = provider.GetRequiredService();
- SubscriptionWorker worker = new SubscriptionWorker(r, p, h);
- worker.TraceGenerated += Worker_TraceGenerated;
- return worker;
- });
- });
+ // services.AddHostedService(provider =>
+ // {
+ // IDomainEventHandlerResolver r =
+ // provider.GetRequiredService();
+ // EventStorePersistentSubscriptionsClient p = provider.GetRequiredService();
+ // HttpClient h = provider.GetRequiredService();
+ // SubscriptionWorker worker = new SubscriptionWorker(r, p, h);
+ // worker.TraceGenerated += Worker_TraceGenerated;
+ // return worker;
+ // });
+ // });
return hostBuilder;
}
diff --git a/TransactionProcessor/Startup.cs b/TransactionProcessor/Startup.cs
index 75856019..de3db4b7 100644
--- a/TransactionProcessor/Startup.cs
+++ b/TransactionProcessor/Startup.cs
@@ -12,11 +12,13 @@
namespace TransactionProcessor
{
+ using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Abstractions;
using System.Net.Http;
using System.Reflection;
+ using System.Threading;
using BusinessLogic.EventHandling;
using BusinessLogic.Manager;
using BusinessLogic.OperatorInterfaces;
@@ -41,7 +43,9 @@ namespace TransactionProcessor
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using NLog.Extensions.Logging;
+ using Reconciliation.DomainEvents;
using SecurityService.Client;
+ using Settlement.DomainEvents;
using SettlementAggregates;
using Shared.DomainDrivenDesign.CommandHandling;
using Shared.DomainDrivenDesign.EventSourcing;
@@ -50,12 +54,14 @@ namespace TransactionProcessor
using Shared.EventStore.EventHandling;
using Shared.EventStore.EventStore;
using Shared.EventStore.Extensions;
+ using Shared.EventStore.SubscriptionWorker;
using Shared.Extensions;
using Shared.General;
using Shared.Logger;
using Shared.Repositories;
using Swashbuckle.AspNetCore.Filters;
using Swashbuckle.AspNetCore.SwaggerGen;
+ using Transaction.DomainEvents;
using TransactionAggregate;
using VoucherManagement.Client;
using ILogger = Microsoft.Extensions.Logging.ILogger;
@@ -99,6 +105,8 @@ public Startup(IWebHostEnvironment webHostEnvironment)
///
public static IWebHostEnvironment WebHostEnvironment { get; set; }
+ public static IServiceProvider ServiceProvider { get; set; }
+
// This method gets called by the runtime. Use this method to add services to the container.
///
/// Configures the services.
@@ -238,6 +246,8 @@ public void ConfigureServices(IServiceCollection services)
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+
+ Startup.ServiceProvider = services.BuildServiceProvider();
}
///
@@ -269,6 +279,7 @@ private static void ConfigureEventStoreSettings(EventStoreClientSettings setting
settings.ConnectionName = Startup.Configuration.GetValue("EventStoreSettings:ConnectionName");
settings.ConnectivitySettings = new EventStoreClientConnectivitySettings
{
+ Insecure = Startup.Configuration.GetValue("EventStoreSettings:Insecure"),
Address = new Uri(Startup.Configuration.GetValue("EventStoreSettings:ConnectionString")),
};
@@ -426,6 +437,106 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerF
app.UseSwagger();
app.UseSwaggerUI();
+
+ app.PreWarm();
+ }
+
+ public static void LoadTypes()
+ {
+ SettlementCreatedForDateEvent s =
+ new SettlementCreatedForDateEvent(Guid.Parse("62CA5BF0-D138-4A19-9970-A4F7D52DE292"),
+ Guid.Parse("3E42516B-6C6F-4F86-BF08-3EF0ACDDDD55"),
+ DateTime.Now);
+
+ TransactionHasStartedEvent t = new TransactionHasStartedEvent(Guid.Parse("2AA2D43B-5E24-4327-8029-1135B20F35CE"), Guid.NewGuid(), Guid.NewGuid(),
+ DateTime.Now, "", "", "", "", null);
+
+ ReconciliationHasStartedEvent r =
+ new ReconciliationHasStartedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), DateTime.Now);
+
+ TypeProvider.LoadDomainEventsTypeDynamically();
+ }
+ }
+
+ public static class Extensions
+ {
+ static Action log = (tt, subType, message) => {
+ String logMessage = $"{subType} - {message}";
+ switch (tt)
+ {
+ case TraceEventType.Critical:
+ Logger.LogCritical(new Exception(logMessage));
+ break;
+ case TraceEventType.Error:
+ Logger.LogError(new Exception(logMessage));
+ break;
+ case TraceEventType.Warning:
+ Logger.LogWarning(logMessage);
+ break;
+ case TraceEventType.Information:
+ Logger.LogInformation(logMessage);
+ break;
+ case TraceEventType.Verbose:
+ Logger.LogDebug(logMessage);
+ break;
+ }
+ };
+
+ static Action concurrentLog = (tt, message) => log(tt, "CONCURRENT", message);
+
+ public static void PreWarm(this IApplicationBuilder applicationBuilder)
+ {
+ Startup.LoadTypes();
+
+ //SubscriptionWorker worker = new SubscriptionWorker()
+ var internalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("InternalSubscriptionService"));
+
+ if (internalSubscriptionService)
+ {
+ String eventStoreConnectionString = ConfigurationReader.GetValue("EventStoreSettings", "ConnectionString");
+ Int32 inflightMessages = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InflightMessages"));
+ Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds"));
+ String filter = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceFilter");
+ String ignore = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceIgnore");
+ String streamName = ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionFilterOnStreamName");
+ Int32 cacheDuration = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceCacheDuration"));
+
+ ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration);
+
+ ((SubscriptionRepository)subscriptionRepository).Trace += (sender, s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s);
+
+ // init our SubscriptionRepository
+ subscriptionRepository.PreWarm(CancellationToken.None).Wait();
+
+ var eventHandlerResolver = Startup.ServiceProvider.GetService();
+
+ SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateConcurrentSubscriptionWorker(eventStoreConnectionString, eventHandlerResolver, subscriptionRepository, inflightMessages, persistentSubscriptionPollingInSeconds);
+
+ concurrentSubscriptions.Trace += (_, args) => concurrentLog(TraceEventType.Information, args.Message);
+ concurrentSubscriptions.Warning += (_, args) => concurrentLog(TraceEventType.Warning, args.Message);
+ concurrentSubscriptions.Error += (_, args) => concurrentLog(TraceEventType.Error, args.Message);
+
+ if (!String.IsNullOrEmpty(ignore))
+ {
+ concurrentSubscriptions = concurrentSubscriptions.IgnoreSubscriptions(ignore);
+ }
+
+ if (!String.IsNullOrEmpty(filter))
+ {
+ //NOTE: Not overly happy with this design, but
+ //the idea is if we supply a filter, this overrides ignore
+ concurrentSubscriptions = concurrentSubscriptions.FilterSubscriptions(filter)
+ .IgnoreSubscriptions(null);
+
+ }
+
+ if (!String.IsNullOrEmpty(streamName))
+ {
+ concurrentSubscriptions = concurrentSubscriptions.FilterByStreamName(streamName);
+ }
+
+ concurrentSubscriptions.StartAsync(CancellationToken.None).Wait();
+ }
}
}
}
diff --git a/TransactionProcessor/TransactionProcessor.csproj b/TransactionProcessor/TransactionProcessor.csproj
index b55bce28..f7c83292 100644
--- a/TransactionProcessor/TransactionProcessor.csproj
+++ b/TransactionProcessor/TransactionProcessor.csproj
@@ -15,11 +15,11 @@
-
-
+
+
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
@@ -28,8 +28,8 @@
-
-
+
+
diff --git a/TransactionProcessor/appsettings.json b/TransactionProcessor/appsettings.json
index 083414b4..826158a6 100644
--- a/TransactionProcessor/appsettings.json
+++ b/TransactionProcessor/appsettings.json
@@ -1,7 +1,6 @@
{
"AppSettings": {
"SubscriptionFilter": "Transaction Processor",
- "UseInternalSubscriptionService": "true",
"ClientId": "serviceClient",
"ClientSecret": "d192cbc46d834d0da90e8a9d50ded543",
"EventHandlerConfiguration": {