Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
<PackageReference Include="EstateReporting.Database" Version="1.1.7" />
<PackageReference Include="MessagingService.Client" Version="1.1.4" />
<PackageReference Include="SecurityService.Client" Version="1.1.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="2022.12.0-beta3" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
<PackageReference Include="MediatR" Version="11.0.0" />
<PackageReference Include="System.IO.Abstractions" Version="17.2.3" />
<PackageReference Include="System.ServiceModel.Duplex" Version="4.10.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ClientProxyBase" Version="1.5.1" />
<PackageReference Include="ClientProxyBase" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ClientProxyBase" Version="1.5.1" />
<PackageReference Include="ClientProxyBase" Version="2022.12.0-beta3" />
<PackageReference Include="Ductus.FluentDocker" Version="2.10.57" />
<PackageReference Include="EstateManagement.Client" Version="1.1.5" />
<PackageReference Include="EstateReporting.Database" Version="1.1.7" />
Expand All @@ -16,7 +16,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="SecurityService.Client" Version="1.1.1" />
<PackageReference Include="Shared.IntegrationTesting" Version="1.5.1" />
<PackageReference Include="Shared.IntegrationTesting" Version="2022.12.0-beta3" />
<PackageReference Include="Shouldly" Version="4.1.0" />
<PackageReference Include="SpecFlow" Version="3.9.74" />
<PackageReference Include="SpecFlow.Tools.MsBuild.Generation" Version="3.9.74" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="7.0.0-alpha.1" />
<PackageReference Include="Shared" Version="1.5.1" />
<PackageReference Include="Shared" Version="2022.12.0-beta3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="7.0.0" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Shared.DomainDrivenDesign" Version="1.5.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="2022.12.0-beta3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.50.0" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Shared.DomainDrivenDesign" Version="1.5.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="2022.12.0-beta3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.50.0" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Shared.DomainDrivenDesign" Version="1.5.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="2022.12.0-beta3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.50.0" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Shared.DomainDrivenDesign" Version="1.5.1" />
<PackageReference Include="Shared.DomainDrivenDesign" Version="2022.12.0-beta3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.50.0" />
<PackageReference Include="Shared.EventStore" Version="1.5.1" />
<PackageReference Include="Shared.EventStore" Version="2022.12.0-beta3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ public DomainEventHandlerRegistry()
}

//this.AddSingleton(eventHandlersConfiguration);
this.Use(eventHandlersConfiguration).Named("Concurrent");
this.Use(eventHandlersConfiguration).Named("EventHandlerConfiguration");

section = Startup.Configuration.GetSection("AppSettings:EventHandlerConfigurationOrdered");

if (section != null)
{
Startup.Configuration.GetSection("AppSettings:EventHandlerConfigurationOrdered").Bind(eventHandlersConfigurationOrdered);
}
this.Use(eventHandlersConfigurationOrdered).Named("Ordered");

this.Use(eventHandlersConfigurationOrdered).Named("EventHandlerConfigurationOrdered");
}

this.AddSingleton<Func<Type, IDomainEventHandler>>(container => type =>
Expand All @@ -75,7 +75,7 @@ public DomainEventHandlerRegistry()
this.AddSingleton<IProjection<MerchantBalanceState>, MerchantBalanceProjection>();
this.AddSingleton<IStateDispatcher<MerchantBalanceState>, MerchantBalanceStateDispatcher>();

this.For<IDomainEventHandlerResolver>().Use<DomainEventHandlerResolver>().Named("Concurrent")
this.For<IDomainEventHandlerResolver>().Use<DomainEventHandlerResolver>().Named("Main")
.Ctor<Dictionary<String, String[]>>().Is(eventHandlersConfiguration).Singleton();
this.For<IDomainEventHandlerResolver>().Use<DomainEventHandlerResolver>().Named("Ordered")
.Ctor<Dictionary<String, String[]>>().Is(eventHandlersConfigurationOrdered).Singleton();
Expand Down
178 changes: 81 additions & 97 deletions TransactionProcessor/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace TransactionProcessor
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand Down Expand Up @@ -56,37 +57,30 @@ public static IServiceCollection AddInSecureEventStoreClient(this IServiceCollec
}
};

static Action<TraceEventType, String> concurrentLog = (tt, message) => Extensions.log(tt, "CONCURRENT", message);
static Action<TraceEventType, String> mainLog = (tt, message) => Extensions.log(tt, "MAIN", message);
static Action<TraceEventType, String> orderedLog = (tt, message) => Extensions.log(tt, "ORDERED", message);

public static void PreWarm(this IApplicationBuilder applicationBuilder) {
Startup.LoadTypes();

Boolean internalSubscriptionService = Boolean.Parse(ConfigurationReader.GetValue("InternalSubscriptionService"));

if (internalSubscriptionService) {
IConfigurationSection subscriptionConfigSection = Startup.Configuration.GetSection("AppSettings:SubscriptionConfig");
SubscriptionConfigRoot subscriptionConfigRoot = new SubscriptionConfigRoot();
subscriptionConfigSection.Bind(subscriptionConfigRoot);
IConfigurationSection subscriptionConfigSection = Startup.Configuration.GetSection("AppSettings:SubscriptionConfiguration");
SubscriptionWorkersRoot subscriptionWorkersRoot = new SubscriptionWorkersRoot();
subscriptionConfigSection.Bind(subscriptionWorkersRoot);

if (subscriptionWorkersRoot.InternalSubscriptionService) {

String eventStoreConnectionString = ConfigurationReader.GetValue("EventStoreSettings", "ConnectionString");
Int32 cacheDuration = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InternalSubscriptionServiceCacheDuration"));

ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration);

ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, subscriptionWorkersRoot.InternalSubscriptionServiceCacheDuration);
((SubscriptionRepository)subscriptionRepository).Trace += (sender,
s) => Extensions.log(TraceEventType.Information, "REPOSITORY", s);

// init our SubscriptionRepository
subscriptionRepository.PreWarm(CancellationToken.None).Wait();

if (subscriptionConfigRoot.Concurrent.IsEnabled) {
SubscriptionWorker concurrentSubscriptions = ConfigureConcurrentSubscriptions(subscriptionRepository, subscriptionConfigRoot.Concurrent);
concurrentSubscriptions.StartAsync(CancellationToken.None).Wait();
}

if (subscriptionConfigRoot.Ordered.IsEnabled) {
SubscriptionWorker orderedSubscriptions = ConfigureOrderedSubscriptions(subscriptionRepository, subscriptionConfigRoot.Ordered);
orderedSubscriptions.StartAsync(CancellationToken.None).Wait();

List<SubscriptionWorker> workers = ConfigureSubscriptions(subscriptionRepository, subscriptionWorkersRoot);
foreach (SubscriptionWorker subscriptionWorker in workers) {
subscriptionWorker.StartAsync(CancellationToken.None).Wait();
}
}

Expand All @@ -97,77 +91,59 @@ public static void PreWarm(this IApplicationBuilder applicationBuilder) {
}
}

private static SubscriptionWorker ConfigureConcurrentSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionConfig concurrent) {
IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance<IDomainEventHandlerResolver>("Concurrent");

Int32 inflightMessages = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "InflightMessages"));
Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds"));

SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateConcurrentSubscriptionWorker(Startup.EventStoreClientSettings, eventHandlerResolver, subscriptionRepository,
inflightMessages, persistentSubscriptionPollingInSeconds);

concurrentSubscriptions.Trace += (_, args) => Extensions.concurrentLog(TraceEventType.Information, args.Message);
concurrentSubscriptions.Warning += (_, args) => Extensions.concurrentLog(TraceEventType.Warning, args.Message);
concurrentSubscriptions.Error += (_, args) => Extensions.concurrentLog(TraceEventType.Error, args.Message);

if (!String.IsNullOrEmpty(concurrent.Ignore))
{
concurrentSubscriptions = concurrentSubscriptions.IgnoreSubscriptions(concurrent.Ignore);
}

if (!String.IsNullOrEmpty(concurrent.Filter))
{
//NOTE: Not overly happy with this design, but;
//the idea is if we supply a filter, this overrides ignore
concurrentSubscriptions = concurrentSubscriptions.FilterSubscriptions(concurrent.Filter);
//.IgnoreSubscriptions(null);

}

if (!String.IsNullOrEmpty(concurrent.StreamName))
{
concurrentSubscriptions = concurrentSubscriptions.FilterByStreamName(concurrent.StreamName);
private static List<SubscriptionWorker> ConfigureSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionWorkersRoot configuration) {
List<SubscriptionWorker> workers = new List<SubscriptionWorker>();

foreach (SubscriptionWorkerConfig configurationSubscriptionWorker in configuration.SubscriptionWorkers) {
if (configurationSubscriptionWorker.Enabled == false)
continue;

if (configurationSubscriptionWorker.IsOrdered) {
IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance<IDomainEventHandlerResolver>("Ordered");
SubscriptionWorker worker = SubscriptionWorker.CreateOrderedSubscriptionWorker(Startup.EventStoreClientSettings,
eventHandlerResolver,
subscriptionRepository,
configuration.PersistentSubscriptionPollingInSeconds);
worker.Trace += (_,
args) => Extensions.orderedLog(TraceEventType.Information, args.Message);
worker.Warning += (_,
args) => Extensions.orderedLog(TraceEventType.Warning, args.Message);
worker.Error += (_,
args) => Extensions.orderedLog(TraceEventType.Error, args.Message);
worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups);
worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams);
worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups);
worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams);
workers.Add(worker);

}
else {
for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++) {
IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance<IDomainEventHandlerResolver>("Main");
SubscriptionWorker worker = SubscriptionWorker.CreateSubscriptionWorker(Startup.EventStoreClientSettings,
eventHandlerResolver,
subscriptionRepository,
configurationSubscriptionWorker.InflightMessages,
configuration.PersistentSubscriptionPollingInSeconds);

worker.Trace += (_,
args) => Extensions.mainLog(TraceEventType.Information, args.Message);
worker.Warning += (_,
args) => Extensions.mainLog(TraceEventType.Warning, args.Message);
worker.Error += (_,
args) => Extensions.mainLog(TraceEventType.Error, args.Message);

worker.SetIgnoreGroups(configurationSubscriptionWorker.IgnoreGroups);
worker.SetIgnoreStreams(configurationSubscriptionWorker.IgnoreStreams);
worker.SetIncludeGroups(configurationSubscriptionWorker.IncludeGroups);
worker.SetIncludeStreams(configurationSubscriptionWorker.IncludeStreams);

workers.Add(worker);
}
}
}

return concurrentSubscriptions;
}

private static SubscriptionWorker ConfigureOrderedSubscriptions(ISubscriptionRepository subscriptionRepository, SubscriptionConfig ordered)
{
IDomainEventHandlerResolver eventHandlerResolver = Startup.Container.GetInstance<IDomainEventHandlerResolver>("Ordered");

Int32 persistentSubscriptionPollingInSeconds = Int32.Parse(ConfigurationReader.GetValue("AppSettings", "PersistentSubscriptionPollingInSeconds"));

SubscriptionWorker orderedSubscriptions =
SubscriptionWorker.CreateOrderedSubscriptionWorker(Startup.EventStoreClientSettings,
eventHandlerResolver,
subscriptionRepository,
persistentSubscriptionPollingInSeconds);

orderedSubscriptions.Trace += (_, args) => Extensions.orderedLog(TraceEventType.Information, args.Message);
orderedSubscriptions.Warning += (_, args) => Extensions.orderedLog(TraceEventType.Warning, args.Message);
orderedSubscriptions.Error += (_, args) => Extensions.orderedLog(TraceEventType.Error, args.Message);

if (!String.IsNullOrEmpty(ordered.Ignore))
{
orderedSubscriptions = orderedSubscriptions.IgnoreSubscriptions(ordered.Ignore);
}

if (!String.IsNullOrEmpty(ordered.Filter))
{
//NOTE: Not overly happy with this design, but;
//the idea is if we supply a filter, this overrides ignore
orderedSubscriptions = orderedSubscriptions.FilterSubscriptions(ordered.Filter)
.IgnoreSubscriptions(null);

}

if (!String.IsNullOrEmpty(ordered.StreamName))
{
orderedSubscriptions = orderedSubscriptions.FilterByStreamName(ordered.StreamName);
}

return orderedSubscriptions;
return workers;
}

private static void OperatorLogon(String operatorId)
Expand All @@ -185,17 +161,25 @@ private static void OperatorLogon(String operatorId)
}
}
}

public class SubscriptionConfigRoot
public class SubscriptionWorkersRoot
{
public SubscriptionConfig Ordered { get; set; }
public SubscriptionConfig Concurrent { get; set; }
public Boolean InternalSubscriptionService { get; set; }
public Int32 PersistentSubscriptionPollingInSeconds { get; set; }
public Int32 InternalSubscriptionServiceCacheDuration { get; set; }
public List<SubscriptionWorkerConfig> SubscriptionWorkers { get; set; }
}

public class SubscriptionConfig{
public Boolean IsEnabled { get; set; }
public String Filter { get; set; }
public String Ignore { get; set; }
public String StreamName { get; set; }
public class SubscriptionWorkerConfig
{
public String WorkerName { get; set; }
public String IncludeGroups { get; set; }
public String IgnoreGroups { get; set; }
public String IncludeStreams { get; set; }
public String IgnoreStreams { get; set; }
public Boolean Enabled { get; set; }
public Int32 InflightMessages { get; set; }
public Int32 InstanceCount { get; set; }
public Boolean IsOrdered { get; set; }
}
}
Loading