From 33560109d37caa4ed096a675724c0926251b4372 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 17:45:45 +0200 Subject: [PATCH 1/6] Fix persistent subscription failure handler never being invoked When the handler throws and `ThrowOnError = true`, `PersistentSubscriptionBase.Nack` re-threw the exception before reaching `_handleEventProcessingFailure`. Because `Nack` is only ever entered through that re-throw path, the failure handler was unreachable: the subscription dropped on every failure instead of issuing a server-side Nack and letting KurrentDB retry/park the message. Removing the early throw lets the default failure handler send `Nack(Retry, ...)` so the server retries up to `MaxRetryCount` and parks afterwards, while the subscription stays connected. Adds an integration test that produces an event, fails the handler with `maxRetryCount: 0`, and asserts the message lands on `$persistentsubscription-{stream}::{group}-parked` (read with `resolveLinkTos: true`). Closes #544 --- .../PersistentSubscriptionBase.cs | 4 - .../Fixtures/PersistentSubscriptionFixture.cs | 23 ++-- .../PersistentSubscriptionFailureTests.cs | 107 ++++++++++++++++++ 3 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs index 691080504..bac8c0e76 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs @@ -185,10 +185,6 @@ async ValueTask Nack(MessageConsumeContext ctx, Exception exception) { return; } - ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception); - - if (Options.ThrowOnError) throw exception; - var re = ctx.Items.GetItem(ResolvedEventKey); var subscription = ctx.Items.GetItem(SubscriptionKey)!; await _handleEventProcessingFailure(Client, subscription, re, exception).NoContext(); diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs index 594484dd7..6a3482452 100644 --- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs @@ -2,6 +2,7 @@ using Eventuous.KurrentDB.Producers; using Eventuous.KurrentDB.Subscriptions; using Eventuous.Tests.Subscriptions.Base; +using KurrentDB.Client; using LoggingExtensions = Eventuous.TestHelpers.TUnit.Logging.LoggingExtensions; namespace Eventuous.Tests.KurrentDB.Subscriptions.Fixtures; @@ -15,12 +16,14 @@ public class PersistentSubscriptionFixture( where THandler : class, IEventHandler where TSubscription : PersistentSubscriptionBase where TOptions : PersistentSubscriptionOptions { - public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}"); - public THandler Handler { get; } = handler; - public KurrentDBProducer Producer { get; private set; } = null!; - protected ILogger Log { get; set; } = null!; - protected StoreFixture Fixture { get; } = new(logLevel); - TSubscription Subscription { get; set; } = null!; + public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}"); + public THandler Handler { get; } = handler; + public KurrentDBProducer Producer { get; private set; } = null!; + public string SubscriptionId { get; private set; } = null!; + public KurrentDBClient Client => Fixture.Client; + protected ILogger Log { get; set; } = null!; + protected StoreFixture Fixture { get; } = new(logLevel); + TSubscription Subscription { get; set; } = null!; public ValueTask Start() => Subscription.SubscribeWithLog(Log); @@ -32,13 +35,13 @@ public async ValueTask InitializeAsync() { Fixture.TypeMapper.RegisterKnownEventTypes(typeof(TestEvent).Assembly); await Fixture.InitializeAsync(); Producer = new(Fixture.Client); - var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel); - var subscriptionId = $"test-{Guid.NewGuid():N}"; - Log = loggerFactory.CreateLogger(GetType()); + var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel); + SubscriptionId = $"test-{Guid.NewGuid():N}"; + Log = loggerFactory.CreateLogger(GetType()); _listener = new(loggerFactory); - Subscription = subscriptionFactory(subscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory); + Subscription = subscriptionFactory(SubscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory); if (autoStart) await Start(); } diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs new file mode 100644 index 000000000..ca1418800 --- /dev/null +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs @@ -0,0 +1,107 @@ +using Eventuous.KurrentDB.Subscriptions; +using Eventuous.Producers; +using Eventuous.Subscriptions.Context; +using Eventuous.Subscriptions.Filters; +using Eventuous.Tests.KurrentDB.Subscriptions.Fixtures; +using Eventuous.Tests.Subscriptions.Base; +using KurrentDB.Client; + +namespace Eventuous.Tests.KurrentDB.Subscriptions; + +public class PersistentSubscriptionFailureTests { + [Test] + [Category("Persistent subscription")] + public async Task Esdb_ShouldParkMessageWhenHandlerFails(CancellationToken cancellationToken) { + var fixture = new PersistentSubscriptionFixture( + new(), + CreateSubscription, + autoStart: false + ); + + await fixture.InitializeAsync(); + + try { + await fixture.Start(); + + var testEvent = TestEvent.Create(); + await fixture.Producer.Produce(fixture.Stream, testEvent, new(), cancellationToken: cancellationToken); + + var parkedStream = $"$persistentsubscription-{fixture.Stream}::{fixture.SubscriptionId}-parked"; + var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken); + + await Assert.That(parked).IsNotNull(); + await Assert.That(parked!.Value.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString()); + await Assert.That(parked.Value.Event.EventType).IsEqualTo(TestEvent.TypeName); + await Assert.That(fixture.Handler.Failures).IsGreaterThan(0); + } finally { + await fixture.DisposeAsync(); + } + } + + static StreamPersistentSubscription CreateSubscription(string id, string connectionString, StreamName stream, AlwaysFailingHandler handler, ILoggerFactory loggerFactory) { + var settings = KurrentDBClientSettings.Create(connectionString); + + return new( + new KurrentDBClient(settings), + new() { + StreamName = stream, + SubscriptionId = id, + ThrowOnError = true, + SubscriptionSettings = new PersistentSubscriptionSettings( + resolveLinkTos: false, + messageTimeout: TimeSpan.FromSeconds(2), + maxRetryCount: 0 + ) + }, + new ConsumePipe().AddDefaultConsumer(handler), + loggerFactory + ); + } + + static async Task ReadFirstParkedEvent(KurrentDBClient client, string parkedStream, TimeSpan timeout, CancellationToken cancellationToken) { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + while (!cts.Token.IsCancellationRequested) { + try { + var read = client.ReadStreamAsync( + Direction.Forwards, + parkedStream, + StreamPosition.Start, + maxCount: 1, + resolveLinkTos: true, + cancellationToken: cts.Token + ); + var state = await read.ReadState; + + if (state == ReadState.Ok) { + await foreach (var resolved in read) { + return resolved; + } + } + } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { + return null; + } + + try { + await Task.Delay(200, cts.Token); + } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { + return null; + } + } + + return null; + } +} + +public class AlwaysFailingHandler : BaseEventHandler { + int _failures; + + public int Failures => Volatile.Read(ref _failures); + + public override ValueTask HandleEvent(IMessageConsumeContext context) { + Interlocked.Increment(ref _failures); + + throw new InvalidOperationException("Simulated handler failure"); + } +} From 40756d6489e95f3c3442b01652d0e97762af1685 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 17:51:38 +0200 Subject: [PATCH 2/6] Bump OpenTelemetry to 1.15.x and switch preview pushes to nuget.org - Updates OpenTelemetry packages from 1.13.x to 1.15.x to clear GHSA-g94r-2vxg-569j (excessive memory allocation in baggage/B3/Jaeger propagation header parsing). 1.15.3 is the first patched version per the advisory. - Bumps OpenTelemetry.Instrumentation.AspNetCore to 1.15.2 (latest stable) and OpenTelemetry.Instrumentation.GrpcNetClient to 1.15.1-beta.1 (latest beta). - Switches the preview workflow to publish to nuget.org with NUGET_API_KEY, mirroring the release pipeline. MyGet is currently unreachable, and dev-branch builds are MinVer-tagged as previews so they're safe to push to nuget.org. --- .github/workflows/preview.yml | 4 ++-- Directory.Packages.props | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/preview.yml b/.github/workflows/preview.yml index 2409393f4..e1d283172 100644 --- a/.github/workflows/preview.yml +++ b/.github/workflows/preview.yml @@ -43,7 +43,7 @@ jobs: - name: Create and push NuGet package run: | - dotnet pack -c Debug -o nuget - dotnet nuget push nuget/**/*.nupkg --skip-duplicate --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v3/index.json + dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg + dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate env: NUGET_AUTH_TOKEN: ${{ github.token }} diff --git a/Directory.Packages.props b/Directory.Packages.props index 4b5fb3e1b..852bb28a0 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -41,7 +41,7 @@ - + @@ -105,12 +105,12 @@ - - - - - - + + + + + + From 0c81f95172681407a9b930931f3c9a6e29fb1be0 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 17:54:36 +0200 Subject: [PATCH 3/6] Switch release pipeline to NuGet trusted publishing and drop MyGet - Drops the MyGet push from publish.yml; MyGet is unreachable and the release pipeline now publishes only to nuget.org. - Replaces the long-lived NUGET_API_KEY secret with NuGet OIDC trusted publishing via the NuGet/login@v1 action. Adds id-token: write at the job level so the runner can mint the OIDC token for the trust profile configured on nuget.org. - NuGet user defaults to 'Eventuous' but is overridable via the NUGET_USER repository variable. Once this is green the NUGET_API_KEY and MYGET_* secrets/variables can be revoked from the repository settings. --- .github/workflows/publish.yml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 9e7b8a9d1..a3cad1292 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -11,6 +11,9 @@ on: jobs: nuget: runs-on: [ self-hosted, type-cpx52, setup-docker, volume-cache-50GB ] + permissions: + id-token: write + contents: read env: NUGET_PACKAGES: "/mnt/cache/.nuget/packages" DOTNET_INSTALL_DIR: "/mnt/cache/.dotnet" @@ -48,11 +51,16 @@ jobs: files: | test-results/**/*.xml test-results/**/*.trx + - + name: NuGet trusted publishing login + id: nuget-login + uses: NuGet/login@v1 + with: + user: ${{ vars.NUGET_USER || 'Eventuous' }} - name: Create and push NuGet package run: | dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg - dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate - dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v2/package --skip-duplicate + dotnet nuget push nuget/**/*.nupkg --api-key ${{ steps.nuget-login.outputs.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate env: NUGET_AUTH_TOKEN: ${{ github.token }} From ff6444836fed893e087e48292c5335787433b5e2 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 17:57:39 +0200 Subject: [PATCH 4/6] Tidy NuGet push steps in preview/publish workflows - Quote the api-key substitution to be defensive against future key formats. - Drop the leftover NUGET_AUTH_TOKEN env. It's a NuGet config-provider variable, not consumed by `dotnet nuget push` against nuget.org, and next to a trusted-publishing step it's actively misleading. --- .github/workflows/preview.yml | 4 +--- .github/workflows/publish.yml | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/.github/workflows/preview.yml b/.github/workflows/preview.yml index e1d283172..6bf955305 100644 --- a/.github/workflows/preview.yml +++ b/.github/workflows/preview.yml @@ -44,6 +44,4 @@ jobs: name: Create and push NuGet package run: | dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg - dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate - env: - NUGET_AUTH_TOKEN: ${{ github.token }} + dotnet nuget push nuget/**/*.nupkg --api-key "${{ secrets.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index a3cad1292..99cbe7b6d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -61,6 +61,4 @@ jobs: name: Create and push NuGet package run: | dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg - dotnet nuget push nuget/**/*.nupkg --api-key ${{ steps.nuget-login.outputs.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate - env: - NUGET_AUTH_TOKEN: ${{ github.token }} + dotnet nuget push nuget/**/*.nupkg --api-key "${{ steps.nuget-login.outputs.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate From 7b8e35bc85bc424e5d6bcffd4576659ed8407fab Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 18:06:06 +0200 Subject: [PATCH 5/6] Address review feedback on PR #546 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three real issues from reviewers, plus one transitive-vuln cleanup. Skipped: the suggestion to add `.NoContext()` everywhere in the new test — that convention is for library async paths, not test code. * Log Ack-stage failures in PersistentSubscriptionBase.Nack Handler-pipeline failures are still logged exactly once (via `context.Nack` inside `EventSubscription.Handler`); anything that reaches `Nack` without `HasFailed()` (e.g. an `Ack` throwing after the handler returned) now gets its own `MessageHandlingFailed` entry, so it no longer fails silently before being routed to the failure handler. * Unwrap SubscriptionException in DefaultEventProcessingFailureHandler When `ThrowOnError = true`, `Handler` wraps the original exception in `SubscriptionException`. The default failure handler used `exception.Message` for the Nack reason, which made parked messages carry the generic "Error processing event ..." string. Now it prefers `InnerException.Message` when available so the parked reason carries the actual handler error. * Stop the persistent subscription before disposing in the new test The fixture only auto-stops when `autoStart: true`, so the test was leaking a running subscription. Track whether `Start()` succeeded and call `Stop()` in the `finally` before `DisposeAsync()`. Also tighten the parked-event helper to throw `TimeoutException` on timeout instead of returning null, removing the awkward `parked!.Value` chain. * Pin OpenTelemetry.Api in Eventuous.KurrentDB KurrentDB.Client transitively pulls OpenTelemetry.Api 1.12.0, which is also in scope for GHSA-g94r-2vxg-569j. Adding a direct PackageReference forces the central pinned 1.15.3 across Eventuous.KurrentDB and every project that consumes it, clearing the remaining NU1902 warnings without enabling repo-wide transitive pinning (which cascades into unrelated framework-version downgrades). --- Directory.Packages.props | 1 + .../Eventuous.KurrentDB.csproj | 3 +++ .../Subscriptions/PersistentSubscriptionBase.cs | 16 ++++++++++++++-- .../PersistentSubscriptionFailureTests.cs | 12 ++++++++---- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 852bb28a0..59f4734dd 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -41,6 +41,7 @@ + diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj b/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj index 5e4a2379d..5fc6fa764 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj @@ -7,6 +7,9 @@ + + diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs index bac8c0e76..c8506018b 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs @@ -185,6 +185,12 @@ async ValueTask Nack(MessageConsumeContext ctx, Exception exception) { return; } + // Handler-pipeline failures are already logged via context.Nack inside EventSubscription.Handler. + // Anything else reaching here (e.g. an Ack failure after the handler returned) needs its own log entry. + if (!ctx.HasFailed()) { + ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception); + } + var re = ctx.Items.GetItem(ResolvedEventKey); var subscription = ctx.Items.GetItem(SubscriptionKey)!; await _handleEventProcessingFailure(Client, subscription, re, exception).NoContext(); @@ -244,6 +250,12 @@ static Task DefaultEventProcessingFailureHandler( PersistentSubscription subscription, ResolvedEvent resolvedEvent, Exception exception - ) - => subscription.Nack(PersistentSubscriptionNakEventAction.Retry, exception.Message, resolvedEvent); + ) { + // When ThrowOnError is enabled, Handler wraps the original exception in SubscriptionException; + // unwrap it so the parked-message reason carries the actual handler error rather than a generic + // "Error processing event ..." string. + var cause = exception is SubscriptionException { InnerException: { } inner } ? inner : exception; + + return subscription.Nack(PersistentSubscriptionNakEventAction.Retry, cause.Message, resolvedEvent); + } } diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs index ca1418800..ba723f13c 100644 --- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs @@ -19,21 +19,25 @@ public async Task Esdb_ShouldParkMessageWhenHandlerFails(CancellationToken cance ); await fixture.InitializeAsync(); + var started = false; try { await fixture.Start(); + started = true; var testEvent = TestEvent.Create(); await fixture.Producer.Produce(fixture.Stream, testEvent, new(), cancellationToken: cancellationToken); var parkedStream = $"$persistentsubscription-{fixture.Stream}::{fixture.SubscriptionId}-parked"; - var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken); + var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken) + ?? throw new TimeoutException($"No event was parked on {parkedStream} within the timeout"); - await Assert.That(parked).IsNotNull(); - await Assert.That(parked!.Value.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString()); - await Assert.That(parked.Value.Event.EventType).IsEqualTo(TestEvent.TypeName); + await Assert.That(parked.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString()); + await Assert.That(parked.Event.EventType).IsEqualTo(TestEvent.TypeName); await Assert.That(fixture.Handler.Failures).IsGreaterThan(0); } finally { + // Fixture only auto-stops when autoStart is true, so stop explicitly to avoid leaking the subscription. + if (started) await fixture.Stop(); await fixture.DisposeAsync(); } } From 8f209897e78c7a75858509369c07a76b8ab1b11f Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 29 Apr 2026 18:14:42 +0200 Subject: [PATCH 6/6] Make SpyglassRegistry thread-safe `SpyglassRegistry.Aggregates` was a plain `List` mutated from `[ModuleInitializer]` methods. When multiple sample assemblies are referenced from the same test process and TUnit loads/initialises them in parallel (which happens whenever more than one test forces an assembly via `RuntimeHelpers.RunModuleConstructor`), the concurrent `List.Add` calls race and entries are lost. That's the root cause of the flaky `Aggregates_contains_payment_state_as_standalone` test seen on the .NET 8 leg of CI: the `PaymentState` registration sometimes never made it into the list. Switch the backing store to a copy-on-write `SpyglassAggregateInfo[]` under a lock for writes, and read from the snapshot on every query. Lookups remain allocation-free for the hit path, registrations are serialised, and readers always see a consistent snapshot. --- .../Eventuous.Spyglass/SpyglassRegistry.cs | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs b/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs index f502f861c..64588a6de 100644 --- a/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs +++ b/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs @@ -25,20 +25,34 @@ public record SpyglassLoadResult(object State, SpyglassEventInfo[] Events); public record SpyglassEventInfo(string EventType, object? Payload); public static class SpyglassRegistry { - static readonly List Aggregates = []; - - public static void Register(SpyglassAggregateInfo info) - => Aggregates.Add(info with { Id = Guid.NewGuid() }); + // Module initializers from different assemblies can call Register concurrently when the assemblies + // are loaded by parallel test fixtures (or any parallel host startup), so the backing store has to + // be thread-safe. Reads also enumerate the snapshot, so we publish a fresh array on every write. + static SpyglassAggregateInfo[] _aggregates = []; + static readonly object _lock = new(); + + public static void Register(SpyglassAggregateInfo info) { + lock (_lock) { + var entry = info with { Id = Guid.NewGuid() }; + var next = new SpyglassAggregateInfo[_aggregates.Length + 1]; + Array.Copy(_aggregates, next, _aggregates.Length); + next[^1] = entry; + _aggregates = next; + } + } public static SpyglassAggregateEntry[] GetAggregates() - => Aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray(); + => _aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray(); public static SpyglassAggregateInfo? FindById(Guid id) - => Aggregates.FirstOrDefault(x => x.Id == id); + => Array.Find(_aggregates, x => x.Id == id); + + public static SpyglassAggregateInfo? FindByTypeName(string typeName) { + var snapshot = _aggregates; - public static SpyglassAggregateInfo? FindByTypeName(string typeName) - => Aggregates.FirstOrDefault(x => x.AggregateType == typeName) - ?? Aggregates.FirstOrDefault(x => StripStateSuffix(x.StateType) == typeName); + return Array.Find(snapshot, x => x.AggregateType == typeName) + ?? Array.Find(snapshot, x => StripStateSuffix(x.StateType) == typeName); + } static string StripStateSuffix(string s) => s.EndsWith("State") && s.Length > 5 ? s[..^5] : s;