Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/examples/enterprise-messaging-workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Example source:
| Source-generated dispatcher | `DispatcherExample.cs` | Compile-time mediator commands, notifications, streams, and paging. |
| Source-generated content router | `ContentRouterGeneratorExample.cs` | Attribute-driven content routing without runtime scanning. |
| Source-generated recipient list | `RecipientListGeneratorExample.cs` | Attribute-driven fan-out without runtime scanning. |
| Source-generated splitter and aggregator | `MessageRoutingExample.cs` | Attribute-driven split projection, correlation, completion, and result projection without runtime scanning. |
| Resilient checkout orchestration | `ResilientCheckoutDemo.cs` | Route selection, routing-slip execution, command compensation, and fallback routes. |
| Collaborating service mailboxes | `ServiceCollaborationMailboxDemo.cs` | Inventory, payment, shipping, and notification mailboxes collaborating over correlated messages. |
| Backplane facade | `BackplaneFacadeDemo.cs` | MassTransit/MediatR-shaped host builder, typed client, request/reply, and pub/sub over an application-owned transport boundary. |
Expand Down Expand Up @@ -66,6 +67,8 @@ public static partial class FulfillmentSlip

The generated factories are AOT-friendly and do not scan assemblies. The runtime builders are better for user- or tenant-defined routing.

Generated splitter and aggregator contracts follow the same rule: use `[GenerateSplitter]` and `[SplitterProjection]` for a stable split projection, then `[GenerateAggregator]` with `[AggregatorCorrelation]`, `[AggregatorCompletion]`, and `[AggregatorProjection]` for the matching rejoin contract.

## Testing Guidance

The example tests use behavior-oriented assertions:
Expand All @@ -91,5 +94,6 @@ The example tests use behavior-oriented assertions:
- [Mailbox](../patterns/messaging/mailbox.md)
- [Idempotent Receiver, Inbox, and Outbox](../patterns/messaging/reliability.md)
- [Messaging Generators](../generators/messaging.md)
- [Generated Splitter And Aggregator](generated-splitter-aggregator.md)
- [Resilient Checkout and Collaborating Mailboxes](resilient-checkout-and-mailboxes.md)
- [Messaging Backplane Facade](messaging-backplane-facade.md)
79 changes: 79 additions & 0 deletions docs/examples/generated-splitter-aggregator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Generated Splitter And Aggregator

This example shows the fluent and source-generated paths for two Enterprise Integration Patterns that are often used together:

- Splitter turns one aggregate message into item-level messages while preserving correlation metadata.
- Aggregator collects related item messages and projects a result when the completion policy is satisfied.

The source-generated path is useful when the split projection, correlation key, completion rule, and projection are stable application contracts. It emits ordinary `Splitter<TPayload, TItem>` and `Aggregator<TKey, TItem, TResult>` factories, so the generated artifacts can be registered through `IServiceCollection` like any other PatternKit primitive.

## Source

- `src/PatternKit.Examples/Messaging/MessageRoutingExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/MessageRoutingExampleTests.cs`

## Fluent Path

```csharp
var splitter = Splitter<RoutedOrder, RoutedLine>.Create()
.Use((message, context) => message.Payload.Lines)
.Build();

var lineMessages = splitter.Split(orderMessage);

var aggregator = Aggregator<string, RoutedLine, decimal>.Create()
.KeyBy((message, context) => message.Headers.CorrelationId ?? message.Payload.Sku)
.CompleteWhen((key, messages, context) => messages.Count == lineMessages.Count)
.Project((key, messages, context) => messages.Sum(message => message.Payload.Amount))
.Build();
```

## Source-Generated Path

```csharp
[GenerateSplitter(typeof(RoutedOrder), typeof(RoutedLine), FactoryName = "CreateLineSplitter")]
public static partial class GeneratedOrderLineSplitter
{
[SplitterProjection]
private static IEnumerable<RoutedLine> ProjectLines(Message<RoutedOrder> message, MessageContext context)
=> message.Payload.Lines;
}

[GenerateAggregator(typeof(string), typeof(RoutedLine), typeof(decimal), FactoryName = "CreateLineTotalAggregator")]
public static partial class GeneratedOrderLineAggregator
{
[AggregatorCorrelation]
private static string Correlate(Message<RoutedLine> message, MessageContext context)
=> message.Headers.CorrelationId ?? message.Payload.Sku;

[AggregatorCompletion]
private static bool Complete(string key, IReadOnlyList<Message<RoutedLine>> messages, MessageContext context)
=> messages.Count == 2;

[AggregatorProjection]
private static decimal Project(string key, IReadOnlyList<Message<RoutedLine>> messages, MessageContext context)
=> messages.Sum(message => message.Payload.Amount);
}
```

## Dependency Injection

```csharp
var services = new ServiceCollection();
services.AddGeneratedSplitterAggregatorExample();

using var provider = services.BuildServiceProvider(validateScopes: true);
var example = provider.GetRequiredService<GeneratedSplitterAggregatorExample>();
var summary = example.Runner.RunGenerated();
```

The extension registers a `MessageRoutingExampleRunner` with fluent and generated entry points. Applications can copy that shape directly: build the generated factories once at startup, register them as singletons, and inject the resulting splitter or aggregator into handlers.

## Validation

The TinyBDD tests assert that:

- fluent and generated paths route, split, and aggregate the same order message
- child messages preserve causation and correlation metadata
- the generated example is importable through `Microsoft.Extensions.DependencyInjection`
- the example catalog advertises messaging, source-generation, and DI integration surfaces
3 changes: 3 additions & 0 deletions docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ Welcome! This section collects small, focused demos that show **how to compose b
* **Generated Recipient List**
Shows fluent and source-generated recipient-list fan-out side by side, with an importable `IServiceCollection` extension. See [Generated Recipient List](generated-recipient-list.md).

* **Generated Splitter and Aggregator**
Shows fluent and source-generated split/rejoin message routing side by side, with an importable `IServiceCollection` extension. See [Generated Splitter And Aggregator](generated-splitter-aggregator.md).

* **Resilient Checkout and Collaborating Mailboxes**
Application-shaped messaging demos: checkout route selection, routing-slip execution, command compensation, fallback routes, and service mailboxes collaborating over correlated messages. See [Resilient Checkout and Collaborating Mailboxes](resilient-checkout-and-mailboxes.md).

Expand Down
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
- name: Generated Recipient List
href: generated-recipient-list.md

- name: Generated Splitter and Aggregator
href: generated-splitter-aggregator.md

- name: CQRS Dispatcher
href: cqrs-dispatcher.md

Expand Down
8 changes: 8 additions & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Message Envelope**](messaging.md#generated-message-envelope) | Required message metadata contracts | `[GenerateMessageEnvelope]` |
| [**Content Router**](messaging.md#generated-content-router) | Content-based message routing factories | `[GenerateContentRouter]` |
| [**Recipient List**](messaging.md#generated-recipient-list) | Recipient fan-out factories | `[GenerateRecipientList]` |
| [**Splitter / Aggregator**](messaging.md#generated-splitter-and-aggregator) | Split/rejoin message routing factories | `[GenerateSplitter]` / `[GenerateAggregator]` |
| [**Routing Slip**](messaging.md#generated-routing-slip) | Ordered message itinerary factories | `[GenerateRoutingSlip]` |
| [**Saga**](messaging.md#generated-saga) | Typed process-manager transition factories | `[GenerateSaga]` |

Expand Down Expand Up @@ -147,6 +148,13 @@ public static partial class OrderAcceptedEnvelope { }
[GenerateContentRouter(typeof(Order), typeof(string))]
public static partial class OrderRouter { }

// Splitter and aggregator - generated split/rejoin factories
[GenerateSplitter(typeof(Order), typeof(OrderLine))]
public static partial class OrderSplitter { }

[GenerateAggregator(typeof(string), typeof(OrderLine), typeof(decimal))]
public static partial class OrderLineAggregator { }

// Routing slip - generated ordered itinerary factory
[GenerateRoutingSlip(typeof(Order))]
public static partial class OrderSlip { }
Expand Down
44 changes: 43 additions & 1 deletion docs/generators/messaging.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Messaging Generators

PatternKit includes six messaging-oriented source generators:
PatternKit includes seven messaging-oriented source generators:

- <xref:PatternKit.Generators.Messaging.GenerateDispatcherAttribute> for source-generated mediator dispatchers.
- <xref:PatternKit.Generators.Messaging.GenerateMessageEnvelopeAttribute> for required message-envelope contracts.
- <xref:PatternKit.Generators.Messaging.GenerateContentRouterAttribute> for content-based message routers.
- <xref:PatternKit.Generators.Messaging.GenerateRecipientListAttribute> for recipient-list fan-out.
- <xref:PatternKit.Generators.Messaging.GenerateSplitterAttribute> and <xref:PatternKit.Generators.Messaging.GenerateAggregatorAttribute> for split/rejoin routing.
- <xref:PatternKit.Generators.Messaging.GenerateRoutingSlipAttribute> for ordered routing-slip factories.
- <xref:PatternKit.Generators.Messaging.GenerateSagaAttribute> for typed saga/process-manager factories.

Expand Down Expand Up @@ -137,6 +138,46 @@ Example files:
- `src/PatternKit.Examples/Messaging/RecipientListGeneratorExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/RecipientListGeneratorExampleTests.cs`

## Generated Splitter And Aggregator

`[GenerateSplitter]` creates a `Splitter<TPayload, TItem>` factory from one static projection method. `[GenerateAggregator]` creates an `Aggregator<TKey, TItem, TResult>` factory from static correlation, completion, and projection methods:

```csharp
using PatternKit.Generators.Messaging;
using PatternKit.Messaging;

[GenerateSplitter(typeof(Order), typeof(OrderLine), FactoryName = "CreateLineSplitter")]
public static partial class OrderSplitter
{
[SplitterProjection]
private static IEnumerable<OrderLine> ProjectLines(Message<Order> message, MessageContext context)
=> message.Payload.Lines;
}

[GenerateAggregator(typeof(string), typeof(OrderLine), typeof(decimal), FactoryName = "CreateLineTotal")]
public static partial class OrderLineAggregator
{
[AggregatorCorrelation]
private static string Correlate(Message<OrderLine> message, MessageContext context)
=> message.Headers.CorrelationId ?? message.Payload.OrderId;

[AggregatorCompletion]
private static bool Complete(string key, IReadOnlyList<Message<OrderLine>> messages, MessageContext context)
=> messages.Count == 2;

[AggregatorProjection]
private static decimal Project(string key, IReadOnlyList<Message<OrderLine>> messages, MessageContext context)
=> messages.Sum(message => message.Payload.Amount);
}
```

Use generated splitter/aggregator factories when the split projection and rejoin contract are stable. Use runtime builders when completion rules depend on tenant configuration or runtime-discovered topology.

Example files:

- `src/PatternKit.Examples/Messaging/MessageRoutingExample.cs`
- `test/PatternKit.Examples.Tests/Messaging/MessageRoutingExampleTests.cs`

## Generated Saga

`[GenerateSaga]` emits a process-manager factory from typed transition methods:
Expand Down Expand Up @@ -173,6 +214,7 @@ Example source:
| `PKME001`-`PKME004` | Message Envelope | Non-partial host, missing headers, invalid header configuration, or duplicate names. |
| `PKCR001`-`PKCR005` | Content Router | Non-partial host, missing routes, invalid signatures, duplicate defaults, or duplicate route identity. |
| `PKRL001`-`PKRL004` | Recipient List | Non-partial host, missing recipients, invalid signatures, or duplicate recipient identity. |
| `PKSA001`-`PKSA006` | Splitter / Aggregator | Non-partial host, missing contract methods, invalid signatures, or invalid duplicate policy. |
| `PKRS001`-`PKRS003` | Routing Slip | Non-partial host, missing steps, or invalid step signatures. |
| `PKSG001`-`PKSG004` | Saga | Non-partial host, missing transitions, invalid transition signatures, or invalid completion checks. |

Expand Down
4 changes: 2 additions & 2 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Enterprise Integration | Message Envelope | `Message<TPayload>`, headers, context | Messaging generator |
| Enterprise Integration | Content-Based Router | `ContentRouter<TPayload, TResult>` | Messaging generator |
| Enterprise Integration | Recipient List | `RecipientList<TPayload>` | Messaging generator |
| Enterprise Integration | Splitter | `Splitter<TIn, TOut>` | Tracked in [#211](https://github.com/JerrettDavis/PatternKit/issues/211) |
| Enterprise Integration | Aggregator | `Aggregator<TKey, TIn, TOut>` | Tracked in [#211](https://github.com/JerrettDavis/PatternKit/issues/211) |
| Enterprise Integration | Splitter | `Splitter<TIn, TOut>` | Messaging generator |
| Enterprise Integration | Aggregator | `Aggregator<TKey, TIn, TOut>` | Messaging generator |
| Enterprise Integration | Routing Slip | `RoutingSlip<TPayload>` | Messaging generator |
| Enterprise Integration | Saga / Process Manager | `Saga<TState>` | Messaging generator |
| Enterprise Integration | Mailbox | `Mailbox<TPayload>` | Tracked in [#209](https://github.com/JerrettDavis/PatternKit/issues/209) |
Expand Down
38 changes: 38 additions & 0 deletions docs/patterns/messaging/message-routing.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ var splitter = Splitter<Order, LineItem>.Create()
var lineMessages = splitter.Split(orderMessage);
```

Use `[GenerateSplitter]` when the split projection is stable and should be compiled into a named factory:

```csharp
[GenerateSplitter(typeof(Order), typeof(LineItem))]
public static partial class OrderLineSplitter
{
[SplitterProjection]
private static IEnumerable<LineItem> ProjectLines(Message<Order> message, MessageContext context)
=> message.Payload.Lines;
}
```

## Aggregator

`Aggregator<TKey, TItem, TResult>` groups messages in memory until a completion policy is satisfied, then projects the completed group into a result and removes it from the open groups.
Expand All @@ -91,6 +103,26 @@ if (result.Completed)

Duplicate message ids are ignored by default. Use `DuplicateMessagePolicy.Replace` or `DuplicateMessagePolicy.Include` when a workflow needs different behavior.

Use `[GenerateAggregator]` when the correlation, completion, and projection contract is part of the application topology:

```csharp
[GenerateAggregator(typeof(string), typeof(LineItem), typeof(decimal))]
public static partial class OrderLineAggregator
{
[AggregatorCorrelation]
private static string Correlate(Message<LineItem> message, MessageContext context)
=> message.Headers.CorrelationId ?? message.Payload.Sku;

[AggregatorCompletion]
private static bool Complete(string key, IReadOnlyList<Message<LineItem>> messages, MessageContext context)
=> messages.Count == 2;

[AggregatorProjection]
private static decimal Project(string key, IReadOnlyList<Message<LineItem>> messages, MessageContext context)
=> messages.Sum(message => message.Payload.Amount);
}
```

## Choosing Boundaries

Use these primitives for:
Expand All @@ -117,9 +149,15 @@ Use external infrastructure for:
- <xref:PatternKit.Generators.Messaging.GenerateRecipientListAttribute>
- <xref:PatternKit.Generators.Messaging.RecipientListRecipientAttribute>
- <xref:PatternKit.Messaging.Routing.Splitter`2>
- <xref:PatternKit.Generators.Messaging.GenerateSplitterAttribute>
- <xref:PatternKit.Generators.Messaging.SplitterProjectionAttribute>
- <xref:PatternKit.Messaging.Routing.Aggregator`3>
- <xref:PatternKit.Messaging.Routing.AggregationResult`2>
- <xref:PatternKit.Messaging.Routing.DuplicateMessagePolicy>
- <xref:PatternKit.Generators.Messaging.GenerateAggregatorAttribute>
- <xref:PatternKit.Generators.Messaging.AggregatorCorrelationAttribute>
- <xref:PatternKit.Generators.Messaging.AggregatorCompletionAttribute>
- <xref:PatternKit.Generators.Messaging.AggregatorProjectionAttribute>

## Example Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public sealed record EventProcessingVisitorExample(Func<Task> RunAsync);
public sealed record MessageRouterVisitorExample(Func<RoutingSummary> Run);
public sealed record GeneratedMessageEnvelopeExample(MessageEnvelopeExampleRunner Runner);
public sealed record GeneratedRecipientListExample(RecipientListGeneratorExampleRunner Runner);
public sealed record GeneratedSplitterAggregatorExample(MessageRoutingExampleRunner Runner);
public sealed record PatternsShowcaseExample(ShowcaseFacade Facade);
public sealed record SourceGeneratorApplicationSuiteExample(Func<ValueTask<CorporateApp>> BuildProductionAsync);
public sealed record EnterpriseMessagingWorkflowSuiteExample(Func<Summary> Run);
Expand Down Expand Up @@ -127,6 +128,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddMessageRouterVisitorExample()
.AddGeneratedMessageEnvelopeExample()
.AddGeneratedRecipientListExample()
.AddGeneratedSplitterAggregatorExample()
.AddPatternsShowcaseExample()
.AddSourceGeneratorApplicationSuiteExample()
.AddEnterpriseMessagingWorkflowSuiteExample()
Expand Down Expand Up @@ -326,6 +328,12 @@ public static IServiceCollection AddMessageRouterVisitorExample(this IServiceCol
return services.RegisterExample<MessageRouterVisitorExample>("Message Router Visitor", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddMessageRoutingExample(this IServiceCollection services)
{
services.AddSingleton(new MessageRoutingExampleRunner(MessageRoutingExample.RunFluent, MessageRoutingExample.RunGenerated));
return services;
}

public static IServiceCollection AddGeneratedMessageEnvelopeExample(this IServiceCollection services)
{
services.AddMessageEnvelopeExample();
Expand All @@ -340,6 +348,13 @@ public static IServiceCollection AddGeneratedRecipientListExample(this IServiceC
return services.RegisterExample<GeneratedRecipientListExample>("Generated Recipient List", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddGeneratedSplitterAggregatorExample(this IServiceCollection services)
{
services.AddMessageRoutingExample();
services.AddSingleton<GeneratedSplitterAggregatorExample>(sp => new(sp.GetRequiredService<MessageRoutingExampleRunner>()));
return services.RegisterExample<GeneratedSplitterAggregatorExample>("Generated Splitter and Aggregator", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddPatternsShowcaseExample(this IServiceCollection services)
{
services.AddSingleton(_ => PatternShowcase.PatternShowcase.Build());
Expand Down
Loading
Loading