Skip to content

Add IMessageBatch / IMessageOutbox plumbing for projection-emitted me…#98

Merged
jeremydmiller merged 1 commit into
mainfrom
feature/projection-message-outbox-84
May 14, 2026
Merged

Add IMessageBatch / IMessageOutbox plumbing for projection-emitted me…#98
jeremydmiller merged 1 commit into
mainfrom
feature/projection-message-outbox-84

Conversation

@jeremydmiller
Copy link
Copy Markdown
Member

…ssages

Closes #84.

Marten projections can publish side-effect messages on commit (transactionally with the projection write) via Wolverine.Marten's MartenToWolverineMessageBatch. Polecat's projection daemon had no equivalent — the IProjectionBatch.PublishMessageAsync hook was a Task.CompletedTask stub.

This adds the Polecat-side plumbing only — a future Wolverine.Polecat will implement IMessageOutbox to bridge into Wolverine's outgoing- message machinery. The user explicitly scoped this PR Polecat-only.

Added

  • Polecat.Events.Aggregation.IMessageBatch — extends JasperFx.Events.IMessageSink with BeforeCommitAsync(token) and AfterCommitAsync(token) hooks. The batch buffers messages in PublishAsync; the implementer chooses which hook to flush in based on the desired delivery guarantee.

  • Polecat.Events.Aggregation.IMessageOutbox — factory that vends a fresh IMessageBatch per projection daemon batch. The signature takes the public IDocumentSession (rather than internal DocumentSessionBase) so downstream integrations don't need InternalsVisibleTo; IDocumentSession already exposes ITransactionParticipantRegistrar for enrollment.

  • Polecat.Events.Aggregation.NulloMessageOutbox — singleton no-op default (drops messages, fires no hooks). Apps that don't integrate a message bus pay zero overhead.

  • EventStoreOptions.MessageOutbox property defaulting to NulloMessageOutbox. This is the registration point for downstream integrations.

Wired through PolecatProjectionBatch

  • New _messageBatch field + _messageBatchGate semaphore for thread-safe lazy initialization. Stays null when no projection in this batch publishes.

  • PublishMessageAsync(object, string) — was a Task.CompletedTask stub; now lazily creates the batch via IMessageOutbox.CreateBatch and forwards the message via cached generic IMessageSink.PublishAsync<T> MethodInfo. The reflection lookup is done once at static init; per-publish cost is one MakeGenericMethod + one Invoke (off the per-event hot path — only fires when a projection explicitly emits a side-effect).

  • New PublishMessageAsync(object, MessageMetadata) overload mirroring the same pattern; satisfies the JFx.Events.IProjectionBatch contract added in the canonical interface (default impl forwards tenant-only; we wire the metadata-aware path so downstream integrations can stamp correlation / causation / headers).

  • ExecuteAsync passes the snapshot batch into the resilience lambda's state tuple. BeforeCommitAsync fires inside the SQL transaction (right before tx.CommitAsync) so an at-least-once outbox can persist its envelopes in the same transaction. AfterCommitAsync fires outside the resilience pipeline (after success) so it does not re-fire on a transient SQL retry.

Tests

src/Polecat.Tests/Daemon/projection_message_outbox_tests.cs — 4 cases:

  • No publish in the batch → IMessageOutbox.CreateBatch is never called and no hooks fire (the common no-op path)
  • Publish creates the batch exactly once and forwards every published message with the right type and tenant id
  • BeforeCommit + AfterCommit hooks both fire after ExecuteAsync completes, in the right order (publish → before → after)
  • The metadata overload correctly extracts tenant id (default IMessageSink impl) and forwards through the same path

All 4 pass against SQL Server 2025 docker on net9.0.

…ssages

Closes #84.

Marten projections can publish side-effect messages on commit
(transactionally with the projection write) via Wolverine.Marten's
MartenToWolverineMessageBatch. Polecat's projection daemon had no
equivalent — the IProjectionBatch.PublishMessageAsync hook was a
Task.CompletedTask stub.

This adds the Polecat-side plumbing only — a future Wolverine.Polecat
will implement IMessageOutbox to bridge into Wolverine's outgoing-
message machinery. The user explicitly scoped this PR Polecat-only.

## Added

- `Polecat.Events.Aggregation.IMessageBatch` — extends `JasperFx.Events.IMessageSink`
  with `BeforeCommitAsync(token)` and `AfterCommitAsync(token)` hooks. The batch
  buffers messages in PublishAsync; the implementer chooses which hook to flush
  in based on the desired delivery guarantee.

- `Polecat.Events.Aggregation.IMessageOutbox` — factory that vends a fresh
  `IMessageBatch` per projection daemon batch. The signature takes the public
  `IDocumentSession` (rather than internal `DocumentSessionBase`) so downstream
  integrations don't need InternalsVisibleTo; `IDocumentSession` already exposes
  `ITransactionParticipantRegistrar` for enrollment.

- `Polecat.Events.Aggregation.NulloMessageOutbox` — singleton no-op default
  (drops messages, fires no hooks). Apps that don't integrate a message bus
  pay zero overhead.

- `EventStoreOptions.MessageOutbox` property defaulting to NulloMessageOutbox.
  This is the registration point for downstream integrations.

## Wired through PolecatProjectionBatch

- New `_messageBatch` field + `_messageBatchGate` semaphore for thread-safe
  lazy initialization. Stays null when no projection in this batch publishes.

- `PublishMessageAsync(object, string)` — was a Task.CompletedTask stub; now
  lazily creates the batch via `IMessageOutbox.CreateBatch` and forwards the
  message via cached generic `IMessageSink.PublishAsync<T>` MethodInfo. The
  reflection lookup is done once at static init; per-publish cost is one
  MakeGenericMethod + one Invoke (off the per-event hot path — only fires
  when a projection explicitly emits a side-effect).

- New `PublishMessageAsync(object, MessageMetadata)` overload mirroring the
  same pattern; satisfies the JFx.Events.IProjectionBatch contract added in
  the canonical interface (default impl forwards tenant-only; we wire the
  metadata-aware path so downstream integrations can stamp correlation /
  causation / headers).

- ExecuteAsync passes the snapshot batch into the resilience lambda's state
  tuple. `BeforeCommitAsync` fires inside the SQL transaction (right before
  `tx.CommitAsync`) so an at-least-once outbox can persist its envelopes in
  the same transaction. `AfterCommitAsync` fires outside the resilience
  pipeline (after success) so it does not re-fire on a transient SQL retry.

## Tests

`src/Polecat.Tests/Daemon/projection_message_outbox_tests.cs` — 4 cases:

- No publish in the batch → `IMessageOutbox.CreateBatch` is never called and
  no hooks fire (the common no-op path)
- Publish creates the batch exactly once and forwards every published message
  with the right type and tenant id
- BeforeCommit + AfterCommit hooks both fire after `ExecuteAsync` completes,
  in the right order (publish → before → after)
- The metadata overload correctly extracts tenant id (default IMessageSink
  impl) and forwards through the same path

All 4 pass against SQL Server 2025 docker on net9.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jeremydmiller jeremydmiller merged commit 498ee7d into main May 14, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add IMessageBatch equivalent for projection-emitted side-effect messages (Wolverine outbox symmetry)

1 participant