Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions Shared.EventStore.Tests/EventDispatchHelperTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Shared.EventStore.Tests;
using SimpleResults;

namespace Shared.EventStore.Tests;

using System;
using System.Collections.Generic;
Expand All @@ -18,9 +20,8 @@ public async Task EventDispatchHelper_DispatchToHandlers_AllSuccessful(){
AggregateNameSetEvent @event = new AggregateNameSetEvent(TestData.AggregateId, TestData.EventId, TestData.EstateName);
List<IDomainEventHandler> handlers = new List<IDomainEventHandler>();
handlers.Add(new TestDomainEventHandler());
Should.NotThrow(async () => {
await @event.DispatchToHandlers(handlers, CancellationToken.None);
});
Result result = await @event.DispatchToHandlers(handlers, CancellationToken.None);
result.IsSuccess.ShouldBeTrue();
}

[Fact]
Expand All @@ -29,10 +30,15 @@ public async Task EventDispatchHelper_DispatchToHandlers_HandlerThrowsException_
AggregateNameSetEvent @event = new AggregateNameSetEvent(TestData.AggregateId, TestData.EventId, TestData.EstateName);
List<IDomainEventHandler> handlers = new List<IDomainEventHandler>();
Mock<IDomainEventHandler> domainEventHandler = new Mock<IDomainEventHandler>();
domainEventHandler.Setup(s => s.Handle(It.IsAny<IDomainEvent>(), It.IsAny<CancellationToken>())).Throws<Exception>();
domainEventHandler.Setup(s => s.Handle(It.IsAny<IDomainEvent>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Result.Failure);
Mock<IDomainEventHandler> domainEventHandler2 = new Mock<IDomainEventHandler>();
domainEventHandler2.Setup(s => s.Handle(It.IsAny<IDomainEvent>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Result.Failure);
handlers.Add(domainEventHandler.Object);
Should.Throw<Exception>(async () => {
await @event.DispatchToHandlers(handlers, CancellationToken.None);
});
handlers.Add(domainEventHandler2.Object);
Result dispatchResult = await @event.DispatchToHandlers(handlers, CancellationToken.None);
dispatchResult.IsFailed.ShouldBeTrue();

}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using SimpleResults;

namespace Shared.EventStore.Tests.TestObjects;

using System.Collections.Generic;
Expand All @@ -10,8 +12,9 @@ public class TestDomainEventHandler : IDomainEventHandler
{
public List<IDomainEvent> DomainEvents = new();

public async Task Handle(IDomainEvent domainEvent, CancellationToken cancellationToken)
public async Task<Result> Handle(IDomainEvent domainEvent, CancellationToken cancellationToken)
{
DomainEvents.Add(domainEvent);
return Result.Success();
}
}
6 changes: 4 additions & 2 deletions Shared.EventStore/EventHandling/IDomainEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Shared.EventStore.EventHandling
using SimpleResults;

namespace Shared.EventStore.EventHandling
{
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -14,7 +16,7 @@ public interface IDomainEventHandler
/// <param name="domainEvent">The domain event.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
Task Handle(IDomainEvent domainEvent,
Task<Result> Handle(IDomainEvent domainEvent,
CancellationToken cancellationToken);

#endregion
Expand Down
8 changes: 5 additions & 3 deletions Shared.EventStore/ProjectionEngine/EventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Shared.EventStore.ProjectionEngine;
using SimpleResults;

namespace Shared.EventStore.ProjectionEngine;

using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -36,14 +38,14 @@ public EventHandler(Func<String, IDomainEventHandler> resolver){

#region Methods

public async Task Handle(IDomainEvent domainEvent,
public async Task<Result> Handle(IDomainEvent domainEvent,
CancellationToken cancellationToken){
// Lookup the event type in the config
String handlerType = ConfigurationReader.GetValue("AppSettings:EventStateConfig", domainEvent.GetType().Name);

IDomainEventHandler handler = this.Resolver(handlerType);

await handler.Handle(domainEvent, cancellationToken);
return await handler.Handle(domainEvent, cancellationToken);
}

#endregion
Expand Down
23 changes: 17 additions & 6 deletions Shared.EventStore/SubscriptionWorker/EventDispatchHelper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace Shared.EventStore.SubscriptionWorker
using Shared.Exceptions;
using SimpleResults;

namespace Shared.EventStore.SubscriptionWorker
{
using System;
using System.Collections.Generic;
Expand All @@ -18,25 +21,33 @@ public static class EventDispatchHelper
/// <param name="event">The event.</param>
/// <param name="eventHandlers">The event handlers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task DispatchToHandlers(this IDomainEvent @event,
public static async Task<Result> DispatchToHandlers(this IDomainEvent @event,
List<IDomainEventHandler> eventHandlers,
CancellationToken cancellationToken)
{
// Now execute all the tasks
Task all = Task.WhenAll(eventHandlers.Select(x => x.Handle(@event, cancellationToken)));
Task<Result[]> all = Task.WhenAll(eventHandlers.Select(x => x.Handle(@event, cancellationToken)));

try
{
await all;
Result[] results = await all;
if (results.Any(r => r.IsFailed)) {
IEnumerable<String> failedResults = results.Where(r => r.IsFailed).Select(r => r.Message);
String errors = String.Join(Environment.NewLine, failedResults);
// We have a failed result so need to do something with it
return Result.Failure($"One or more event handlers have failed. Error Messages [{errors}]");
}
}
catch (Exception)
catch (Exception ex)
{
if (all.Exception != null)
{
Logger.Logger.LogError(all.Exception);
throw all.Exception;
return Result.Failure(all.Exception.GetExceptionMessages());
}
return Result.Failure(ex.GetExceptionMessages());
}
return Result.Success();
}

#endregion
Expand Down
16 changes: 11 additions & 5 deletions Shared.EventStore/SubscriptionWorker/PersistentSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Shared.EventStore.SubscriptionWorker
using SimpleResults;

namespace Shared.EventStore.SubscriptionWorker
{
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -128,13 +130,17 @@ internal static async Task EventAppeared(global::EventStore.Client.PersistentSub
// Log a warning out
Logger.Logger.LogWarning(
$"No event handlers configured for Event Type [{domainEvent.GetType().Name}]");
await PersistentSubscriptionsHelper.AckEvent(persistentSubscription, resolvedEvent);
return;
}

await domainEvent.DispatchToHandlers(domainEventHandlers, cts.Token);

await PersistentSubscriptionsHelper.AckEvent(persistentSubscription, resolvedEvent);
Result result = await domainEvent.DispatchToHandlers(domainEventHandlers, cts.Token);
if (result.IsSuccess) {
await PersistentSubscriptionsHelper.AckEvent(persistentSubscription, resolvedEvent);
}
else {
Exception ex = new($"Failed to process the event type {resolvedEvent.Event.EventType} {resolvedEvent.GetResolvedEventDataAsString()} Result was {result.Message}");
Logger.Logger.LogError(ex);
}
}
}
catch (Exception e)
Expand Down