Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 73 additions & 42 deletions Shared.EventStore/Extensions/IApplicationBuilderExtenstions.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
namespace Shared.EventStore.Extensions;

using EventHandling;
using global::EventStore.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Shared.EventStore.EventStore;
using Shared.TraceHandler;
using SubscriptionWorker;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventHandling;
using global::EventStore.Client;
using Microsoft.AspNetCore.Builder;
using SubscriptionWorker;

public static class IApplicationBuilderExtensions
{
Expand Down Expand Up @@ -51,52 +54,80 @@ public static async Task ConfigureSubscriptionService(this IApplicationBuilder a
}
}

internal static List<SubscriptionWorker> GetDomainWorker(Dictionary<String, IDomainEventHandlerResolver> eventHandlerResolvers,
ISubscriptionRepository subscriptionRepository,
SubscriptionWorkersRoot configuration,
String eventStoreConnectionString,
SubscriptionWorkerConfig configurationSubscriptionWorker,
Action<TraceEventType, String, String> traceHandler) {
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Domain");
List<SubscriptionWorker> workers = new();
if (ehr.Value == null)
return workers;
for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++) {
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration, eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "DOMAIN");

workers.Add(worker);
}

return workers;
}

internal static List<SubscriptionWorker> GetOrderedWorkers(Dictionary<String, IDomainEventHandlerResolver> eventHandlerResolvers,
ISubscriptionRepository subscriptionRepository,
SubscriptionWorkersRoot configuration,
String eventStoreConnectionString,
SubscriptionWorkerConfig configurationSubscriptionWorker,
Action<TraceEventType, String, String> traceHandler) {
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Ordered");
List<SubscriptionWorker> workers = new();
if (ehr.Value == null)
return workers;
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration,
eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "ORDERED");
workers.Add(worker);
return workers;
}

internal static List<SubscriptionWorker> GetMainWorkers(Dictionary<String, IDomainEventHandlerResolver> eventHandlerResolvers,
ISubscriptionRepository subscriptionRepository,
SubscriptionWorkersRoot configuration,
String eventStoreConnectionString,
SubscriptionWorkerConfig configurationSubscriptionWorker,
Action<TraceEventType, String, String> traceHandler) {
List<SubscriptionWorker> workers = new();
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Main");
if (ehr.Value == null)
return workers;
for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++)
{
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration,
eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "MAIN");

workers.Add(worker);
}

return workers;
}

internal static List<SubscriptionWorker> ConfigureSubscriptions(ISubscriptionRepository subscriptionRepository,
SubscriptionWorkersRoot configuration,
String eventStoreConnectionString,
Dictionary<String, IDomainEventHandlerResolver> eventHandlerResolvers,
Action<TraceEventType, String, String> traceHandler) {
SubscriptionWorkersRoot configuration,
String eventStoreConnectionString,
Dictionary<String, IDomainEventHandlerResolver> eventHandlerResolvers,
Action<TraceEventType, String, String> traceHandler) {
List<SubscriptionWorker> workers = new();

foreach (SubscriptionWorkerConfig configurationSubscriptionWorker in configuration.SubscriptionWorkers) {
if (!configurationSubscriptionWorker.Enabled)
continue;

if (configurationSubscriptionWorker.IsOrdered) {
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Ordered");
List<SubscriptionWorker> workersList = configurationSubscriptionWorker switch {
_ when configurationSubscriptionWorker.IsDomainOnly => GetDomainWorker(eventHandlerResolvers, subscriptionRepository, configuration, eventStoreConnectionString, configurationSubscriptionWorker, traceHandler),
_ when configurationSubscriptionWorker.IsOrdered => GetOrderedWorkers(eventHandlerResolvers, subscriptionRepository, configuration, eventStoreConnectionString, configurationSubscriptionWorker, traceHandler),
_ => GetMainWorkers(eventHandlerResolvers, subscriptionRepository, configuration, eventStoreConnectionString, configurationSubscriptionWorker, traceHandler),
};

if (ehr.Value != null) {
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration,
eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "ORDERED");
workers.Add(worker);
}
}
else if (configurationSubscriptionWorker.IsDomainOnly) {
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Domain");

if (ehr.Value != null)
{
for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++) {
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration,
eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "DOMAIN");

workers.Add(worker);
}
}
}
else {
KeyValuePair<String, IDomainEventHandlerResolver> ehr = eventHandlerResolvers.SingleOrDefault(e => e.Key == "Main");
if (ehr.Value != null)
{
for (Int32 i = 0; i < configurationSubscriptionWorker.InstanceCount; i++)
{
SubscriptionWorker worker = ConfigureSubscriptionWorker(subscriptionRepository, configuration,
eventStoreConnectionString, traceHandler, ehr, configurationSubscriptionWorker, "MAIN");

workers.Add(worker);
}
}
}
workers.AddRange(workersList);
}

return workers;
Expand Down
Loading