diff --git a/.github/workflows/preview.yml b/.github/workflows/preview.yml
index 2409393f4..6bf955305 100644
--- a/.github/workflows/preview.yml
+++ b/.github/workflows/preview.yml
@@ -43,7 +43,5 @@ jobs:
-
name: Create and push NuGet package
run: |
- dotnet pack -c Debug -o nuget
- dotnet nuget push nuget/**/*.nupkg --skip-duplicate --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v3/index.json
- env:
- NUGET_AUTH_TOKEN: ${{ github.token }}
+ dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg
+ dotnet nuget push nuget/**/*.nupkg --api-key "${{ secrets.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 9e7b8a9d1..99cbe7b6d 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,6 +11,9 @@ on:
jobs:
nuget:
runs-on: [ self-hosted, type-cpx52, setup-docker, volume-cache-50GB ]
+ permissions:
+ id-token: write
+ contents: read
env:
NUGET_PACKAGES: "/mnt/cache/.nuget/packages"
DOTNET_INSTALL_DIR: "/mnt/cache/.dotnet"
@@ -48,11 +51,14 @@ jobs:
files: |
test-results/**/*.xml
test-results/**/*.trx
+ -
+ name: NuGet trusted publishing login
+ id: nuget-login
+ uses: NuGet/login@v1
+ with:
+ user: ${{ vars.NUGET_USER || 'Eventuous' }}
-
name: Create and push NuGet package
run: |
dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg
- dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate
- dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v2/package --skip-duplicate
- env:
- NUGET_AUTH_TOKEN: ${{ github.token }}
+ dotnet nuget push nuget/**/*.nupkg --api-key "${{ steps.nuget-login.outputs.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 4b5fb3e1b..59f4734dd 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -41,7 +41,8 @@
-
+
+
@@ -105,12 +106,12 @@
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs b/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs
index f502f861c..64588a6de 100644
--- a/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs
+++ b/src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs
@@ -25,20 +25,34 @@ public record SpyglassLoadResult(object State, SpyglassEventInfo[] Events);
public record SpyglassEventInfo(string EventType, object? Payload);
public static class SpyglassRegistry {
- static readonly List Aggregates = [];
-
- public static void Register(SpyglassAggregateInfo info)
- => Aggregates.Add(info with { Id = Guid.NewGuid() });
+ // Module initializers from different assemblies can call Register concurrently when the assemblies
+ // are loaded by parallel test fixtures (or any parallel host startup), so the backing store has to
+ // be thread-safe. Reads also enumerate the snapshot, so we publish a fresh array on every write.
+ static SpyglassAggregateInfo[] _aggregates = [];
+ static readonly object _lock = new();
+
+ public static void Register(SpyglassAggregateInfo info) {
+ lock (_lock) {
+ var entry = info with { Id = Guid.NewGuid() };
+ var next = new SpyglassAggregateInfo[_aggregates.Length + 1];
+ Array.Copy(_aggregates, next, _aggregates.Length);
+ next[^1] = entry;
+ _aggregates = next;
+ }
+ }
public static SpyglassAggregateEntry[] GetAggregates()
- => Aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray();
+ => _aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray();
public static SpyglassAggregateInfo? FindById(Guid id)
- => Aggregates.FirstOrDefault(x => x.Id == id);
+ => Array.Find(_aggregates, x => x.Id == id);
+
+ public static SpyglassAggregateInfo? FindByTypeName(string typeName) {
+ var snapshot = _aggregates;
- public static SpyglassAggregateInfo? FindByTypeName(string typeName)
- => Aggregates.FirstOrDefault(x => x.AggregateType == typeName)
- ?? Aggregates.FirstOrDefault(x => StripStateSuffix(x.StateType) == typeName);
+ return Array.Find(snapshot, x => x.AggregateType == typeName)
+ ?? Array.Find(snapshot, x => StripStateSuffix(x.StateType) == typeName);
+ }
static string StripStateSuffix(string s)
=> s.EndsWith("State") && s.Length > 5 ? s[..^5] : s;
diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj b/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj
index 5e4a2379d..5fc6fa764 100644
--- a/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj
+++ b/src/KurrentDB/src/Eventuous.KurrentDB/Eventuous.KurrentDB.csproj
@@ -7,6 +7,9 @@
+
+
diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs
index 691080504..c8506018b 100644
--- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs
+++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs
@@ -185,9 +185,11 @@ async ValueTask Nack(MessageConsumeContext ctx, Exception exception) {
return;
}
- ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception);
-
- if (Options.ThrowOnError) throw exception;
+ // Handler-pipeline failures are already logged via context.Nack inside EventSubscription.Handler.
+ // Anything else reaching here (e.g. an Ack failure after the handler returned) needs its own log entry.
+ if (!ctx.HasFailed()) {
+ ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception);
+ }
var re = ctx.Items.GetItem(ResolvedEventKey);
var subscription = ctx.Items.GetItem(SubscriptionKey)!;
@@ -248,6 +250,12 @@ static Task DefaultEventProcessingFailureHandler(
PersistentSubscription subscription,
ResolvedEvent resolvedEvent,
Exception exception
- )
- => subscription.Nack(PersistentSubscriptionNakEventAction.Retry, exception.Message, resolvedEvent);
+ ) {
+ // When ThrowOnError is enabled, Handler wraps the original exception in SubscriptionException;
+ // unwrap it so the parked-message reason carries the actual handler error rather than a generic
+ // "Error processing event ..." string.
+ var cause = exception is SubscriptionException { InnerException: { } inner } ? inner : exception;
+
+ return subscription.Nack(PersistentSubscriptionNakEventAction.Retry, cause.Message, resolvedEvent);
+ }
}
diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs
index 594484dd7..6a3482452 100644
--- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs
+++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/Fixtures/PersistentSubscriptionFixture.cs
@@ -2,6 +2,7 @@
using Eventuous.KurrentDB.Producers;
using Eventuous.KurrentDB.Subscriptions;
using Eventuous.Tests.Subscriptions.Base;
+using KurrentDB.Client;
using LoggingExtensions = Eventuous.TestHelpers.TUnit.Logging.LoggingExtensions;
namespace Eventuous.Tests.KurrentDB.Subscriptions.Fixtures;
@@ -15,12 +16,14 @@ public class PersistentSubscriptionFixture(
where THandler : class, IEventHandler
where TSubscription : PersistentSubscriptionBase
where TOptions : PersistentSubscriptionOptions {
- public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}");
- public THandler Handler { get; } = handler;
- public KurrentDBProducer Producer { get; private set; } = null!;
- protected ILogger Log { get; set; } = null!;
- protected StoreFixture Fixture { get; } = new(logLevel);
- TSubscription Subscription { get; set; } = null!;
+ public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}");
+ public THandler Handler { get; } = handler;
+ public KurrentDBProducer Producer { get; private set; } = null!;
+ public string SubscriptionId { get; private set; } = null!;
+ public KurrentDBClient Client => Fixture.Client;
+ protected ILogger Log { get; set; } = null!;
+ protected StoreFixture Fixture { get; } = new(logLevel);
+ TSubscription Subscription { get; set; } = null!;
public ValueTask Start() => Subscription.SubscribeWithLog(Log);
@@ -32,13 +35,13 @@ public async ValueTask InitializeAsync() {
Fixture.TypeMapper.RegisterKnownEventTypes(typeof(TestEvent).Assembly);
await Fixture.InitializeAsync();
Producer = new(Fixture.Client);
- var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel);
- var subscriptionId = $"test-{Guid.NewGuid():N}";
- Log = loggerFactory.CreateLogger(GetType());
+ var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel);
+ SubscriptionId = $"test-{Guid.NewGuid():N}";
+ Log = loggerFactory.CreateLogger(GetType());
_listener = new(loggerFactory);
- Subscription = subscriptionFactory(subscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory);
+ Subscription = subscriptionFactory(SubscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory);
if (autoStart) await Start();
}
diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs
new file mode 100644
index 000000000..ba723f13c
--- /dev/null
+++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs
@@ -0,0 +1,111 @@
+using Eventuous.KurrentDB.Subscriptions;
+using Eventuous.Producers;
+using Eventuous.Subscriptions.Context;
+using Eventuous.Subscriptions.Filters;
+using Eventuous.Tests.KurrentDB.Subscriptions.Fixtures;
+using Eventuous.Tests.Subscriptions.Base;
+using KurrentDB.Client;
+
+namespace Eventuous.Tests.KurrentDB.Subscriptions;
+
+public class PersistentSubscriptionFailureTests {
+ [Test]
+ [Category("Persistent subscription")]
+ public async Task Esdb_ShouldParkMessageWhenHandlerFails(CancellationToken cancellationToken) {
+ var fixture = new PersistentSubscriptionFixture(
+ new(),
+ CreateSubscription,
+ autoStart: false
+ );
+
+ await fixture.InitializeAsync();
+ var started = false;
+
+ try {
+ await fixture.Start();
+ started = true;
+
+ var testEvent = TestEvent.Create();
+ await fixture.Producer.Produce(fixture.Stream, testEvent, new(), cancellationToken: cancellationToken);
+
+ var parkedStream = $"$persistentsubscription-{fixture.Stream}::{fixture.SubscriptionId}-parked";
+ var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken)
+ ?? throw new TimeoutException($"No event was parked on {parkedStream} within the timeout");
+
+ await Assert.That(parked.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString());
+ await Assert.That(parked.Event.EventType).IsEqualTo(TestEvent.TypeName);
+ await Assert.That(fixture.Handler.Failures).IsGreaterThan(0);
+ } finally {
+ // Fixture only auto-stops when autoStart is true, so stop explicitly to avoid leaking the subscription.
+ if (started) await fixture.Stop();
+ await fixture.DisposeAsync();
+ }
+ }
+
+ static StreamPersistentSubscription CreateSubscription(string id, string connectionString, StreamName stream, AlwaysFailingHandler handler, ILoggerFactory loggerFactory) {
+ var settings = KurrentDBClientSettings.Create(connectionString);
+
+ return new(
+ new KurrentDBClient(settings),
+ new() {
+ StreamName = stream,
+ SubscriptionId = id,
+ ThrowOnError = true,
+ SubscriptionSettings = new PersistentSubscriptionSettings(
+ resolveLinkTos: false,
+ messageTimeout: TimeSpan.FromSeconds(2),
+ maxRetryCount: 0
+ )
+ },
+ new ConsumePipe().AddDefaultConsumer(handler),
+ loggerFactory
+ );
+ }
+
+ static async Task ReadFirstParkedEvent(KurrentDBClient client, string parkedStream, TimeSpan timeout, CancellationToken cancellationToken) {
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(timeout);
+
+ while (!cts.Token.IsCancellationRequested) {
+ try {
+ var read = client.ReadStreamAsync(
+ Direction.Forwards,
+ parkedStream,
+ StreamPosition.Start,
+ maxCount: 1,
+ resolveLinkTos: true,
+ cancellationToken: cts.Token
+ );
+ var state = await read.ReadState;
+
+ if (state == ReadState.Ok) {
+ await foreach (var resolved in read) {
+ return resolved;
+ }
+ }
+ } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) {
+ return null;
+ }
+
+ try {
+ await Task.Delay(200, cts.Token);
+ } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) {
+ return null;
+ }
+ }
+
+ return null;
+ }
+}
+
+public class AlwaysFailingHandler : BaseEventHandler {
+ int _failures;
+
+ public int Failures => Volatile.Read(ref _failures);
+
+ public override ValueTask HandleEvent(IMessageConsumeContext context) {
+ Interlocked.Increment(ref _failures);
+
+ throw new InvalidOperationException("Simulated handler failure");
+ }
+}