Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/examples/enterprise-messaging-workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Example source:
| Saga/process manager | `SagaExample.cs` | Typed message transitions over explicit saga state and completion rules. |
| Mailbox | `MailboxExample.cs` | Serialized async inbox processing with explicit lifecycle and error behavior. |
| Source-generated mailbox | `MailboxExample.cs` | Attribute-driven serialized inbox factories with bounded backpressure and error policy. |
| Idempotent receiver | `ReliabilityExample.cs` | Duplicate detection around at-least-once message delivery. |
| Inbox/outbox | `ReliabilityExample.cs` | Explicit handoff records for durable integration boundaries owned by the application. |
| Idempotent receiver | `ReliabilityExample.cs` | Duplicate detection around at-least-once message delivery with fluent and generated factories. |
| Inbox/outbox | `ReliabilityExample.cs` | Explicit handoff records for durable integration boundaries owned by the application, including a generated reliability pipeline. |
| Source-generated dispatcher | `DispatcherExample.cs` | Compile-time mediator commands, notifications, streams, and paging. |
| Source-generated content router | `ContentRouterGeneratorExample.cs` | Attribute-driven content routing without runtime scanning. |
| Source-generated recipient list | `RecipientListGeneratorExample.cs` | Attribute-driven fan-out without runtime scanning. |
Expand Down Expand Up @@ -81,7 +81,7 @@ The example tests use behavior-oriented assertions:
- Routing-slip tests assert step order and header progress.
- Saga tests assert transition behavior and completion state.
- Mailbox tests assert serialized processing and lifecycle semantics.
- Reliability tests assert duplicate suppression and outbox record creation.
- Reliability tests assert duplicate suppression, outbox record creation, generated pipeline parity, and DI importability.
- Generator tests assert that generated factories compile and behave like the equivalent runtime builders.
- Resilient checkout tests assert rollback, fallback route selection, manual review, and side-effect boundaries.
- Mailbox collaboration tests assert service handoff, compensation, correlation propagation, and final notification outcomes.
Expand Down
42 changes: 42 additions & 0 deletions docs/examples/generated-reliability-pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Generated Reliability Pipeline

The generated reliability pipeline example shows the fluent reliability primitives and the source-generated path side by side. Both paths process the same duplicate `AcceptOrder` command and dispatch exactly one `ReliabilityOrderAccepted` outbox message.

## Integration Shape

Register the example with the standard .NET container:

```csharp
using Microsoft.Extensions.DependencyInjection;
using PatternKit.Examples.DependencyInjection;

var services = new ServiceCollection()
.AddGeneratedReliabilityPipelineExample();

using var provider = services.BuildServiceProvider(validateScopes: true);
var example = provider.GetRequiredService<GeneratedReliabilityPipelineExample>();
var dispatched = await example.Runner.RunGeneratedAsync();
```

The registered `PatternKitExampleServiceDescriptor` advertises messaging, source generation, and dependency-injection support so host applications can audit what they import.

## Generated Contract

`GeneratedReliabilityOrderPipeline` uses `[GenerateReliabilityPipeline]` to emit:

- `CreateOrderReceiver(IIdempotencyStore)` for the idempotent receiver.
- `CreateInbox(IIdempotencyStore)` for the inbox boundary.
- `CreateOutbox()` for the outbox record store.

The generated receiver is configured with `DuplicatePolicy = "ReplayCompleted"`, so a duplicate message with the same idempotency key replays the completed result instead of invoking the handler again.

## Production Notes

`InMemoryIdempotencyStore` and `InMemoryOutbox<T>` are deterministic for tests, demos, and single-process tools. Production applications should implement `IIdempotencyStore` and outbox persistence over durable storage, usually in the same transaction as the business state change.

## Source

- `src/PatternKit.Examples/Messaging/ReliabilityExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/ReliabilityExampleTests.cs`
- `src/PatternKit.Generators/Messaging/ReliabilityPipelineGenerator.cs`
- `test/PatternKit.Generators.Tests/ReliabilityPipelineGeneratorTests.cs`
3 changes: 3 additions & 0 deletions docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ Welcome! This section collects small, focused demos that show **how to compose b
* **Generated Mailbox**
Shows fluent and source-generated serialized inboxes side by side, with an importable `IServiceCollection` extension. See [Generated Mailbox](generated-mailbox.md).

* **Generated Reliability Pipeline**
Shows fluent and source-generated idempotent receiver, inbox, and outbox composition side by side, with an importable `IServiceCollection` extension. See [Generated Reliability Pipeline](generated-reliability-pipeline.md).

* **Resilient Checkout and Collaborating Mailboxes**
Application-shaped messaging demos: checkout route selection, routing-slip execution, command compensation, fallback routes, and service mailboxes collaborating over correlated messages. See [Resilient Checkout and Collaborating Mailboxes](resilient-checkout-and-mailboxes.md).

Expand Down
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
- name: Generated Mailbox
href: generated-mailbox.md

- name: Generated Reliability Pipeline
href: generated-reliability-pipeline.md

- name: Resilient Checkout and Collaborating Mailboxes
href: resilient-checkout-and-mailboxes.md

Expand Down
5 changes: 5 additions & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Routing Slip**](messaging.md#generated-routing-slip) | Ordered message itinerary factories | `[GenerateRoutingSlip]` |
| [**Saga**](messaging.md#generated-saga) | Typed process-manager transition factories | `[GenerateSaga]` |
| [**Mailbox**](messaging.md#generated-mailbox) | Serialized in-process inbox factories | `[GenerateMailbox]` |
| [**Reliability Pipeline**](messaging.md#generated-reliability-pipeline) | Idempotent receiver, inbox, and outbox factories | `[GenerateReliabilityPipeline]` |

## Quick Reference

Expand Down Expand Up @@ -160,6 +161,10 @@ public static partial class OrderLineAggregator { }
[GenerateMailbox(typeof(OrderWork), Capacity = 32, BackpressurePolicy = "Wait")]
public static partial class OrderMailbox { }

// Reliability pipeline - generated idempotent receiver, inbox, and outbox factories
[GenerateReliabilityPipeline(typeof(AcceptOrder), typeof(string), typeof(OrderAccepted))]
public static partial class OrderReliability { }

// Routing slip - generated ordered itinerary factory
[GenerateRoutingSlip(typeof(Order))]
public static partial class OrderSlip { }
Expand Down
35 changes: 34 additions & 1 deletion docs/generators/messaging.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Messaging Generators

PatternKit includes eight messaging-oriented source generators:
PatternKit includes nine messaging-oriented source generators:

- <xref:PatternKit.Generators.Messaging.GenerateDispatcherAttribute> for source-generated mediator dispatchers.
- <xref:PatternKit.Generators.Messaging.GenerateMessageEnvelopeAttribute> for required message-envelope contracts.
Expand All @@ -10,6 +10,7 @@ PatternKit includes eight messaging-oriented source generators:
- <xref:PatternKit.Generators.Messaging.GenerateRoutingSlipAttribute> for ordered routing-slip factories.
- <xref:PatternKit.Generators.Messaging.GenerateSagaAttribute> for typed saga/process-manager factories.
- <xref:PatternKit.Generators.Messaging.GenerateMailboxAttribute> for serialized in-process inbox factories.
- <xref:PatternKit.Generators.Messaging.GenerateReliabilityPipelineAttribute> for idempotent receiver, inbox, and outbox factories.

Use these generators when the message topology is known at compile time and should remain explicit, AOT-friendly, and validated by the compiler. They generate factories and fluent builders; they do not discover handlers from assemblies at runtime and they do not replace brokers, durable queues, or workflow engines.

Expand Down Expand Up @@ -203,6 +204,37 @@ Example files:
- `src/PatternKit.Examples/Messaging/MailboxExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/MailboxExampleTests.cs`

## Generated Reliability Pipeline

`[GenerateReliabilityPipeline]` creates factories for the reliability boundary around a static handler:

```csharp
using PatternKit.Generators.Messaging;
using PatternKit.Messaging;

[GenerateReliabilityPipeline(
typeof(AcceptOrder),
typeof(string),
typeof(OrderAccepted),
DuplicatePolicy = "ReplayCompleted")]
public static partial class OrderReliability
{
[ReliabilityHandler]
private static ValueTask<string> Handle(
Message<AcceptOrder> message,
MessageContext context,
CancellationToken cancellationToken)
=> new(message.Payload.OrderId);
}
```

The generated type emits idempotent receiver, inbox processor, and in-memory outbox factories. Use this path when the reliability boundary is static and should be reviewed in code. Keep using fluent builders when policies, key selectors, or storage wiring are tenant-defined at runtime.

Example files:

- `src/PatternKit.Examples/Messaging/ReliabilityExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/ReliabilityExampleTests.cs`

## Generated Saga

`[GenerateSaga]` emits a process-manager factory from typed transition methods:
Expand Down Expand Up @@ -243,6 +275,7 @@ Example source:
| `PKRS001`-`PKRS003` | Routing Slip | Non-partial host, missing steps, or invalid step signatures. |
| `PKSG001`-`PKSG004` | Saga | Non-partial host, missing transitions, invalid transition signatures, or invalid completion checks. |
| `PKMB001`-`PKMB005` | Mailbox | Non-partial host, missing handler, invalid handler signatures, or invalid configuration. |
| `PKRP001`-`PKRP005` | Reliability Pipeline | Non-partial host, missing handler, invalid handler/key selector signatures, or invalid configuration. |

## Related Runtime Patterns

Expand Down
6 changes: 3 additions & 3 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Enterprise Integration | Routing Slip | `RoutingSlip<TPayload>` | Messaging generator |
| Enterprise Integration | Saga / Process Manager | `Saga<TState>` | Messaging generator |
| Enterprise Integration | Mailbox | `Mailbox<TPayload>` | Messaging generator |
| Messaging Reliability | Idempotent Receiver | `IdempotentReceiver<TPayload, TResult>` | Tracked in [#213](https://github.com/JerrettDavis/PatternKit/issues/213) |
| Messaging Reliability | Inbox | `InboxProcessor<TPayload, TResult>` | Tracked in [#213](https://github.com/JerrettDavis/PatternKit/issues/213) |
| Messaging Reliability | Outbox | `InMemoryOutbox<TPayload>` and dispatcher contracts | Tracked in [#213](https://github.com/JerrettDavis/PatternKit/issues/213) |
| Messaging Reliability | Idempotent Receiver | `IdempotentReceiver<TPayload, TResult>` | Reliability pipeline generator |
| Messaging Reliability | Inbox | `InboxProcessor<TPayload, TResult>` | Reliability pipeline generator |
| Messaging Reliability | Outbox | `InMemoryOutbox<TPayload>` and dispatcher contracts | Reliability pipeline generator |
| Enterprise Integration | Request-Reply | Messaging backplane facade example | Tracked in [#214](https://github.com/JerrettDavis/PatternKit/issues/214) |
| Enterprise Integration | Publish-Subscribe | Messaging backplane facade example | Tracked in [#214](https://github.com/JerrettDavis/PatternKit/issues/214) |
| Application Architecture | CQRS | Mediator/dispatcher command-query split | Dispatcher generator |
Expand Down
2 changes: 1 addition & 1 deletion docs/patterns/messaging/enterprise-generators.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Saga/process-manager generation is documented in [Saga / Process Manager](saga.m

Mailbox generation is documented in [Mailbox](mailbox.md). It discovers one `[MailboxHandler]` method plus optional error and event hooks, then emits a configured serialized inbox factory.

Reliability helpers stay runtime-only for now. Their registration is still lifecycle-sensitive and is tracked separately.
Reliability helpers also have a generated path through `[GenerateReliabilityPipeline]`, which emits idempotent receiver, inbox, and outbox factories while keeping durable storage implementation owned by the application.

## Diagnostics

Expand Down
27 changes: 27 additions & 0 deletions docs/patterns/messaging/reliability.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ await outbox.DispatchPendingAsync(dispatcher, cancellationToken);

The in-memory outbox records attempts and dispatch timestamps, but it is not durable. A production outbox should persist `OutboxMessage<TPayload>` or an equivalent schema in the same transaction as the business state change, then dispatch records after commit.

## Source-Generated Reliability Pipeline

`[GenerateReliabilityPipeline]` generates the static factories for a stable idempotent receiver, inbox, and outbox contract:

```csharp
using PatternKit.Generators.Messaging;
using PatternKit.Messaging;

[GenerateReliabilityPipeline(
typeof(AcceptOrder),
typeof(string),
typeof(OrderAccepted),
DuplicatePolicy = "ReplayCompleted")]
public static partial class OrderReliability
{
[ReliabilityHandler]
private static ValueTask<string> Handle(
Message<AcceptOrder> message,
MessageContext context,
CancellationToken cancellationToken)
=> new(message.Payload.OrderId);
}
```

The generated host exposes receiver, inbox, and outbox factory methods while keeping the handler and optional key selector in source. This makes reliability topology visible during code review and importable through normal `IServiceCollection` registration.

## Boundaries

- These APIs help with at-least-once processing; they do not provide exactly-once delivery.
Expand All @@ -106,6 +132,7 @@ The in-memory outbox records attempts and dispatch timestamps, but it is not dur
- <xref:PatternKit.Messaging.Reliability.OutboxMessage`1>
- <xref:PatternKit.Messaging.Reliability.IOutboxDispatcher`1>
- <xref:PatternKit.Messaging.Reliability.InMemoryOutbox`1>
- <xref:PatternKit.Generators.Messaging.GenerateReliabilityPipelineAttribute>

## Example Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public sealed record SourceGeneratorApplicationSuiteExample(Func<ValueTask<Corpo
public sealed record EnterpriseMessagingWorkflowSuiteExample(Func<Summary> Run);
public sealed record CqrsDispatcherExample(Func<CancellationToken, ValueTask<CqrsSummary>> RunFluentAsync, Func<IServiceProvider, CancellationToken, ValueTask<CqrsSummary>> RunSourceGeneratedAsync);
public sealed record GeneratedMailboxExample(MailboxExampleRunner Runner);
public sealed record GeneratedReliabilityPipelineExample(ReliabilityExampleRunner Runner);
public sealed record ResilientCheckoutMailboxesExample(Func<CheckoutRequest, CheckoutServices, CheckoutResult> Run);
public sealed record MessagingBackplaneFacadeExample(Func<CancellationToken, ValueTask<BackplaneDemoSummary>> RunAsync);
public sealed record PrototypeGameCharacterFactoryExample(Prototype<string, PrototypeDemo.PrototypeDemo.GameCharacter> Factory);
Expand Down Expand Up @@ -135,6 +136,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddEnterpriseMessagingWorkflowSuiteExample()
.AddCqrsDispatcherExample()
.AddGeneratedMailboxExample()
.AddGeneratedReliabilityPipelineExample()
.AddResilientCheckoutMailboxesExample()
.AddMessagingBackplaneFacadeExample()
.AddPrototypeGameCharacterFactoryExample()
Expand Down Expand Up @@ -390,6 +392,13 @@ public static IServiceCollection AddGeneratedMailboxExample(this IServiceCollect
return services.RegisterExample<GeneratedMailboxExample>("Generated Mailbox", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddGeneratedReliabilityPipelineExample(this IServiceCollection services)
{
services.AddSingleton(new ReliabilityExampleRunner(ReliabilityExample.RunFluentAsync, ReliabilityExample.RunGeneratedAsync));
services.AddSingleton<GeneratedReliabilityPipelineExample>(sp => new(sp.GetRequiredService<ReliabilityExampleRunner>()));
return services.RegisterExample<GeneratedReliabilityPipelineExample>("Generated Reliability Pipeline", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddResilientCheckoutMailboxesExample(this IServiceCollection services)
{
services.AddSingleton<CheckoutServices>();
Expand Down
62 changes: 61 additions & 1 deletion src/PatternKit.Examples/Messaging/ReliabilityExample.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using PatternKit.Messaging;
using PatternKit.Messaging.Mailboxes;
using PatternKit.Messaging.Reliability;
using PatternKit.Generators.Messaging;

namespace PatternKit.Examples.Messaging;

Expand All @@ -10,7 +11,10 @@ namespace PatternKit.Examples.Messaging;
public static class ReliabilityExample
{
/// <summary>Runs an idempotent order inbox and returns dispatched outbox payloads.</summary>
public static async ValueTask<IReadOnlyList<string>> RunAsync()
public static ValueTask<IReadOnlyList<string>> RunAsync() => RunFluentAsync();

/// <summary>Runs the fluent idempotent receiver and outbox path.</summary>
public static async ValueTask<IReadOnlyList<string>> RunFluentAsync()
{
var store = new InMemoryIdempotencyStore();
var outbox = new InMemoryOutbox<ReliabilityOrderAccepted>();
Expand Down Expand Up @@ -54,6 +58,38 @@ await outbox.DispatchPendingAsync(new DelegateOutboxDispatcher<ReliabilityOrderA

return dispatched;
}

/// <summary>Runs the generated idempotent receiver, inbox processor, and outbox path.</summary>
public static async ValueTask<IReadOnlyList<string>> RunGeneratedAsync()
{
var store = new InMemoryIdempotencyStore();
var inbox = GeneratedReliabilityOrderPipeline.CreateInbox(store);
var outbox = GeneratedReliabilityOrderPipeline.CreateOutbox();
var dispatched = new List<string>();

var command = Message<AcceptOrder>
.Create(new AcceptOrder("order-42"))
.WithIdempotencyKey("accept-order-42");

var first = await inbox.ProcessAsync(command);
_ = await inbox.ProcessAsync(command);

if (first.Processed)
{
await outbox.EnqueueAsync(
Message<ReliabilityOrderAccepted>.Create(new ReliabilityOrderAccepted(first.Result!)),
id: $"accepted-{first.Result}");
}

await outbox.DispatchPendingAsync(new DelegateOutboxDispatcher<ReliabilityOrderAccepted>(
(record, _) =>
{
dispatched.Add(record.Message.Payload.OrderId);
return default;
}));

return dispatched;
}
}

/// <summary>Reliability example command payload.</summary>
Expand All @@ -62,6 +98,30 @@ public sealed record AcceptOrder(string OrderId);
/// <summary>Reliability example event payload.</summary>
public sealed record ReliabilityOrderAccepted(string OrderId);

/// <summary>DI-friendly runner exposing fluent and generated reliability paths.</summary>
public sealed record ReliabilityExampleRunner(
Func<ValueTask<IReadOnlyList<string>>> RunFluentAsync,
Func<ValueTask<IReadOnlyList<string>>> RunGeneratedAsync);

/// <summary>Source-generated reliability pipeline used by the production-shaped example.</summary>
[GenerateReliabilityPipeline(
typeof(AcceptOrder),
typeof(string),
typeof(ReliabilityOrderAccepted),
DuplicatePolicy = "ReplayCompleted",
ReceiverFactoryName = "CreateOrderReceiver",
InboxFactoryName = "CreateInbox",
OutboxFactoryName = "CreateOutbox")]
public static partial class GeneratedReliabilityOrderPipeline
{
[ReliabilityHandler]
private static ValueTask<string> Handle(
Message<AcceptOrder> message,
MessageContext context,
CancellationToken cancellationToken)
=> new(message.Payload.OrderId);
}

internal sealed class DelegateOutboxDispatcher<TPayload> : IOutboxDispatcher<TPayload>
{
private readonly Func<OutboxMessage<TPayload>, CancellationToken, ValueTask> _dispatch;
Expand Down
Loading
Loading