diff --git a/docs/examples/enterprise-messaging-workflows.md b/docs/examples/enterprise-messaging-workflows.md index 55d14a2..c9dea26 100644 --- a/docs/examples/enterprise-messaging-workflows.md +++ b/docs/examples/enterprise-messaging-workflows.md @@ -25,7 +25,7 @@ Example source: | Source-generated content router | `ContentRouterGeneratorExample.cs` | Attribute-driven content routing without runtime scanning. | | Resilient checkout orchestration | `ResilientCheckoutDemo.cs` | Route selection, routing-slip execution, command compensation, and fallback routes. | | Collaborating service mailboxes | `ServiceCollaborationMailboxDemo.cs` | Inventory, payment, shipping, and notification mailboxes collaborating over correlated messages. | -| Backplane facade | `BackplaneFacadeDemo.cs` | MassTransit/MediatR-shaped request/reply and pub/sub over an application-owned transport boundary. | +| Backplane facade | `BackplaneFacadeDemo.cs` | MassTransit/MediatR-shaped host builder, typed client, request/reply, and pub/sub over an application-owned transport boundary. | ## Workflow Shape @@ -77,7 +77,7 @@ The example tests use behavior-oriented assertions: - Generator tests assert that generated factories compile and behave like the equivalent runtime builders. - Resilient checkout tests assert rollback, fallback route selection, manual review, and side-effect boundaries. - Mailbox collaboration tests assert service handoff, compensation, correlation propagation, and final notification outcomes. -- Backplane facade tests assert routed request/reply, publish/subscribe fanout, outbox dispatch, idempotent replay, and correlation propagation. +- Backplane facade tests assert startup-style host configuration, routed request/reply, publish/subscribe fanout, outbox dispatch, idempotent replay, and correlation propagation. ## Related Documentation diff --git a/docs/examples/index.md b/docs/examples/index.md index f7f315a..1dbf69a 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -15,7 +15,7 @@ Welcome! This section collects small, focused demos that show **how to compose b * **Snapshot history & undo/redo (Memento)** for document/editor style workflows. * **Source-generated application wiring** for builders, factories, facades, proxies, observers, visitors, state machines, strategies, mementos, template methods, and messaging factories. * **Enterprise messaging workflows** for envelopes, routers, recipient lists, splitters, aggregators, routing slips, sagas, mailboxes, idempotent receivers, inboxes, and outboxes. -* **Messaging backplane facade** for request/reply and publish/subscribe over an application-owned transport boundary. +* **Messaging backplane facade** for host-style setup, typed request/reply, and publish/subscribe over an application-owned transport boundary. ## Demos in this section @@ -65,7 +65,7 @@ Welcome! This section collects small, focused demos that show **how to compose b Application-shaped messaging demos: checkout route selection, routing-slip execution, command compensation, fallback routes, and service mailboxes collaborating over correlated messages. See [Resilient Checkout and Collaborating Mailboxes](resilient-checkout-and-mailboxes.md). * **Messaging Backplane Facade** - Demonstrates how PatternKit can sit behind a MassTransit- or MediatR-style application API while RabbitMQ, Azure Service Bus, Postgres, MQTT, or another adapter remains infrastructure-owned. See [Messaging Backplane Facade](messaging-backplane-facade.md). + Demonstrates how PatternKit can sit behind a MassTransit- or MediatR-style host builder and typed client while RabbitMQ, Azure Service Bus, Postgres, MQTT, or another adapter remains infrastructure-owned. See [Messaging Backplane Facade](messaging-backplane-facade.md). ## How to run @@ -111,7 +111,7 @@ dotnet test PatternKit.slnx -c Release * **Enterprise Messaging:** `src/PatternKit.Examples/Messaging` (+ `PatternKit.Examples.Tests/Messaging`) — runtime and generated messaging workflows. * **Resilient Checkout:** `ResilientCheckoutDemo` (+ `ResilientCheckoutDemoTests`) — fallback checkout routes with compensation. * **Collaborating Mailboxes:** `ServiceCollaborationMailboxDemo` (+ `ServiceCollaborationMailboxDemoTests`) — inventory, payment, shipping, and notification services collaborating through serialized mailboxes. -* **Messaging Backplane Facade:** `BackplaneFacadeDemo` (+ `BackplaneFacadeDemoTests`) — request/reply, pub/sub, outbox, idempotency, and mailbox-backed transport subscribers. +* **Messaging Backplane Facade:** `BackplaneFacadeDemo` (+ `BackplaneFacadeDemoTests`) — host setup, request/reply, pub/sub, outbox, idempotency, and mailbox-backed transport subscribers. * **Tests:** `PatternKit.Examples.Tests/*` use TinyBDD scenarios that read like specs. ## Why these demos exist diff --git a/docs/examples/messaging-backplane-facade.md b/docs/examples/messaging-backplane-facade.md index a3128b8..22d1b0e 100644 --- a/docs/examples/messaging-backplane-facade.md +++ b/docs/examples/messaging-backplane-facade.md @@ -38,30 +38,54 @@ public interface IBackplaneTransport : IAsyncDisposable The example uses `InMemoryBackplaneTransport` for deterministic tests. A production adapter could implement the same boundary over RabbitMQ exchanges, Azure Service Bus topics/queues, Postgres-backed tables and notifications, MQTT topics, or another transport. +## Application Startup + +The demo now starts from a host builder, which is the shape a production application would usually expose from its composition root: + +```csharp +await using var host = await BackplaneHost.Create() + .UseTransport(() => transport) + .UseOutbox(outbox) + .UseIdempotencyStore(idempotency) + .MapCommand( + static (message, _) => message.Payload.CustomerTier == CustomerTier.Vip, + "orders.priority") + .MapDefaultCommand("orders.standard") + .ReceiveEndpoint("orders.standard", endpoint => + endpoint.HandleCommand(services.AcceptStandardOrderAsync)) + .ReceiveEndpoint("billing-service", endpoint => + endpoint.Subscribe("orders.submitted", services.CapturePaymentAsync)) + .ReceiveEndpoint("notification-service", endpoint => + { + endpoint.Subscribe("payments.declined", services.NotifyPaymentDeclinedAsync); + endpoint.Subscribe("shipments.scheduled", services.NotifyShipmentScheduledAsync); + }) + .BuildAsync(cancellationToken); +``` + +`BackplaneHost` owns the bus, typed client, transport, endpoint subscriptions, outbox, idempotency store, and topology metadata. Application code uses `host.Client`, while advanced integrations can still reach the lower-level `host.Bus`. + ## Request/Reply -The bus facade exposes a typed request/reply API: +The client exposes a typed request/reply API: ```csharp -bus.Route( - static (message, _) => message.Payload.CustomerTier == CustomerTier.Vip, - "orders.priority"); -bus.RouteDefault("orders.standard"); - -await bus.HandleAsync( - "orders.standard", - AcceptOrderAsync, - idempotencyStore); +var accepted = await host.Client.RequestAsync( + Message + .Create(new SubmitOrder("order-42", 90m, CustomerTier.Standard)) + .WithCorrelationId("corr-order-42") + .WithIdempotencyKey("idem-order-42"), + cancellationToken); ``` -`BackplaneBus.RequestAsync` creates a temporary reply address, enriches the message with a reply header, routes the command with the content router, sends it through the transport, and waits for the typed response. Duplicate requests with the same idempotency key replay the stored `BackplaneOrderAccepted` response without republishing `BackplaneOrderSubmitted`. +`BackplaneClient.RequestAsync` creates a temporary reply address, enriches the message with a reply header, routes the command with the content router, sends it through the transport, and waits for the typed response. Duplicate requests with the same idempotency key replay the stored `BackplaneOrderAccepted` response without republishing `BackplaneOrderSubmitted`. ## Publish/Subscribe The order service publishes an event through the outbox: ```csharp -await bus.PublishAsync( +await client.PublishAsync( "orders.submitted", new BackplaneOrderSubmitted(message.Payload.OrderId, message.Payload.Total, message.Payload.CustomerTier), context.Headers, @@ -77,11 +101,23 @@ The transport uses a recipient list so every matching subscriber receives the en Each subscriber runs behind a bounded mailbox, so stateful handlers process one message at a time with explicit backpressure. +## Transport Adapter Boundary + +The application chooses the transport at startup: + +```csharp +BackplaneHost.Create() + .UseTransport(() => new RabbitMqBackplaneTransport(/* connection settings */)); +``` + +`RabbitMqBackplaneTransport` is not part of PatternKit; it would be application infrastructure that implements `IBackplaneTransport`. The same host, routes, endpoints, command handlers, and event subscribers can run over any adapter that honors the transport contract. + ## Tested Behavior The tests assert that: - Standard orders route to `orders.standard` and VIP orders route to `orders.priority`. +- The host builder configures transport, outbox, idempotency, endpoint topology, and the typed client surface. - Duplicate commands replay the original response and do not duplicate outbox side effects. - Published events fan out to independent services. - Every event is recorded in the outbox before transport dispatch. diff --git a/src/PatternKit.Examples/Messaging/BackplaneFacadeDemo.cs b/src/PatternKit.Examples/Messaging/BackplaneFacadeDemo.cs index 883fbc1..dc2b5e5 100644 --- a/src/PatternKit.Examples/Messaging/BackplaneFacadeDemo.cs +++ b/src/PatternKit.Examples/Messaging/BackplaneFacadeDemo.cs @@ -16,107 +16,41 @@ public static class BackplaneFacadeDemo /// public static async ValueTask RunAsync(CancellationToken cancellationToken = default) { - await using var transport = new InMemoryBackplaneTransport(); + var transport = new InMemoryBackplaneTransport(); var outbox = new BackplaneOutbox(); - var bus = new BackplaneBus(transport, outbox); var idempotency = new InMemoryIdempotencyStore(); var audit = new ConcurrentQueue(); var endpoints = new ConcurrentDictionary(); var notifications = new ConcurrentQueue(); var completed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - bus.Route( - static (message, _) => message.Payload.CustomerTier == CustomerTier.Vip, - "orders.priority"); - bus.RouteDefault("orders.standard"); - - await bus.HandleAsync( - "orders.standard", - async (message, context, token) => - await AcceptOrderAsync("orders.standard", message, context, token).ConfigureAwait(false), - idempotency, - cancellationToken); - - await bus.HandleAsync( - "orders.priority", - async (message, context, token) => - await AcceptOrderAsync("orders.priority", message, context, token).ConfigureAwait(false), - idempotency, - cancellationToken); - - await bus.SubscribeAsync( - "orders.submitted", - "billing-service", - async (message, context, token) => + var services = new BackplaneDemoServices(audit, endpoints, notifications, completed); + + await using var host = await BackplaneHost.Create() + .UseTransport(() => transport) + .UseOutbox(outbox) + .UseIdempotencyStore(idempotency) + .MapCommand( + static (message, _) => message.Payload.CustomerTier == CustomerTier.Vip, + "orders.priority") + .MapDefaultCommand("orders.standard") + .ReceiveEndpoint("orders.standard", endpoint => + endpoint.HandleCommand(services.AcceptStandardOrderAsync)) + .ReceiveEndpoint("orders.priority", endpoint => + endpoint.HandleCommand(services.AcceptPriorityOrderAsync)) + .ReceiveEndpoint("billing-service", endpoint => + endpoint.Subscribe("orders.submitted", services.CapturePaymentAsync)) + .ReceiveEndpoint("audit-service", endpoint => + endpoint.Subscribe("orders.submitted", services.AuditSubmittedOrderAsync)) + .ReceiveEndpoint("fulfillment-service", endpoint => + endpoint.Subscribe("payments.captured", services.ScheduleShipmentAsync)) + .ReceiveEndpoint("notification-service", endpoint => { - audit.Enqueue($"billing:received:{message.Payload.OrderId}"); - if (message.Payload.Total > 300m) - { - await bus.PublishAsync( - "payments.declined", - new PaymentDeclined(message.Payload.OrderId, "authorization-declined"), - context.Headers, - token).ConfigureAwait(false); - return; - } - - await bus.PublishAsync( - "payments.captured", - new PaymentCaptured(message.Payload.OrderId, message.Payload.Total), - context.Headers, - token).ConfigureAwait(false); - }, - cancellationToken); + endpoint.Subscribe("payments.declined", services.NotifyPaymentDeclinedAsync); + endpoint.Subscribe("shipments.scheduled", services.NotifyShipmentScheduledAsync); + }) + .BuildAsync(cancellationToken).ConfigureAwait(false); - await bus.SubscribeAsync( - "orders.submitted", - "audit-service", - (message, context, _) => - { - audit.Enqueue($"audit:order-submitted:{message.Payload.OrderId}:{context.Headers.CorrelationId}"); - return default; - }, - cancellationToken); - - await bus.SubscribeAsync( - "payments.captured", - "fulfillment-service", - async (message, context, token) => - { - audit.Enqueue($"fulfillment:scheduled:{message.Payload.OrderId}"); - await bus.PublishAsync( - "shipments.scheduled", - new ShipmentScheduled(message.Payload.OrderId, $"trk-{message.Payload.OrderId}"), - context.Headers, - token).ConfigureAwait(false); - }, - cancellationToken); - - await bus.SubscribeAsync( - "payments.declined", - "notification-service", - (message, context, _) => - { - RecordNotification(new CustomerNotification( - message.Payload.OrderId, - "payment-declined", - context.Headers.CorrelationId ?? string.Empty)); - return default; - }, - cancellationToken); - - await bus.SubscribeAsync( - "shipments.scheduled", - "notification-service", - (message, context, _) => - { - RecordNotification(new CustomerNotification( - message.Payload.OrderId, - "shipment-scheduled", - context.Headers.CorrelationId ?? string.Empty)); - return default; - }, - cancellationToken); + services.AttachClient(host.Client); var accepted = new List { @@ -151,35 +85,339 @@ async ValueTask SubmitAsync( .WithCorrelationId($"corr-{orderId}") .WithIdempotencyKey(idempotencyKey); - return await bus.RequestAsync(command, cancellationToken).ConfigureAwait(false); + return await host.Client.RequestAsync(command, cancellationToken).ConfigureAwait(false); } + } +} - async ValueTask AcceptOrderAsync( - string endpoint, - Message message, - MessageContext context, - CancellationToken token) - { - endpoints[message.Payload.OrderId] = endpoint; - audit.Enqueue($"orders:accepted:{message.Payload.OrderId}:{endpoint}"); +/// Application host that owns the bus facade, typed client, transport, and active subscriptions. +public sealed class BackplaneHost : IAsyncDisposable +{ + private readonly IBackplaneTransport _transport; + private readonly IReadOnlyList _subscriptions; + private bool _disposed; - await bus.PublishAsync( - "orders.submitted", - new BackplaneOrderSubmitted(message.Payload.OrderId, message.Payload.Total, message.Payload.CustomerTier), - context.Headers, - token).ConfigureAwait(false); + private BackplaneHost( + BackplaneBus bus, + BackplaneClient client, + IBackplaneTransport transport, + IReadOnlyList subscriptions, + BackplaneOutbox outbox, + InMemoryIdempotencyStore? idempotencyStore, + IReadOnlyList endpoints) + { + Bus = bus; + Client = client; + _transport = transport; + _subscriptions = subscriptions; + Outbox = outbox; + IdempotencyStore = idempotencyStore; + Endpoints = endpoints; + } - return new BackplaneOrderAccepted(message.Payload.OrderId, endpoint, context.Headers.CorrelationId ?? string.Empty); - } + /// The lower-level facade for advanced application-owned integration points. + public BackplaneBus Bus { get; } + + /// Typed application client used by request handlers, controllers, background services, and tests. + public BackplaneClient Client { get; } + + /// The outbox configured for this host. + public BackplaneOutbox Outbox { get; } + + /// The optional idempotency store shared by command endpoints. + public InMemoryIdempotencyStore? IdempotencyStore { get; } - void RecordNotification(CustomerNotification notification) + /// The configured endpoint topology. + public IReadOnlyList Endpoints { get; } + + /// Creates a fluent host builder. + public static BackplaneHostBuilder Create() => new(); + + internal static async ValueTask StartAsync( + IBackplaneTransport transport, + BackplaneOutbox outbox, + InMemoryIdempotencyStore? idempotencyStore, + IReadOnlyList endpoints, + IReadOnlyList routes, + CancellationToken cancellationToken) + { + var bus = new BackplaneBus(transport, outbox); + foreach (var route in routes) + route.Configure(bus); + + var subscriptions = new List(endpoints.Sum(static endpoint => endpoint.Handlers.Count)); + try { - notifications.Enqueue(notification); - audit.Enqueue($"notification:{notification.OrderId}:{notification.Kind}"); + foreach (var endpoint in endpoints) + { + foreach (var handler in endpoint.Handlers) + { + subscriptions.Add(await handler.StartAsync(bus, endpoint.Name, idempotencyStore, cancellationToken) + .ConfigureAwait(false)); + } + } + } + catch + { + foreach (var subscription in subscriptions.Reverse()) + await subscription.DisposeAsync().ConfigureAwait(false); - if (notifications.Count == 3) - completed.TrySetResult(); + await transport.DisposeAsync().ConfigureAwait(false); + throw; } + + return new BackplaneHost( + bus, + new BackplaneClient(bus), + transport, + subscriptions, + outbox, + idempotencyStore, + endpoints); + } + + /// + public async ValueTask DisposeAsync() + { + if (_disposed) + return; + + _disposed = true; + foreach (var subscription in _subscriptions.Reverse()) + await subscription.DisposeAsync().ConfigureAwait(false); + + await _transport.DisposeAsync().ConfigureAwait(false); + } +} + +/// Fluent application startup builder for the demo backplane host. +public sealed class BackplaneHostBuilder +{ + private readonly List _endpoints = new(); + private readonly List _routes = new(); + private Func _transportFactory = static () => new InMemoryBackplaneTransport(); + private BackplaneOutbox? _outbox; + private InMemoryIdempotencyStore? _idempotencyStore; + + /// Uses the deterministic in-memory transport included with the demo. + public BackplaneHostBuilder UseInMemoryTransport() + { + _transportFactory = static () => new InMemoryBackplaneTransport(); + return this; + } + + /// Uses an application-owned transport adapter such as RabbitMQ, Azure Service Bus, Postgres, or MQTT. + public BackplaneHostBuilder UseTransport(Func transportFactory) + { + _transportFactory = transportFactory ?? throw new ArgumentNullException(nameof(transportFactory)); + return this; + } + + /// Uses an application-owned outbox for publish-side dispatch records. + public BackplaneHostBuilder UseOutbox(BackplaneOutbox outbox) + { + _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox)); + return this; + } + + /// Uses a shared idempotency store for command endpoints. + public BackplaneHostBuilder UseIdempotencyStore(InMemoryIdempotencyStore idempotencyStore) + { + _idempotencyStore = idempotencyStore ?? throw new ArgumentNullException(nameof(idempotencyStore)); + return this; + } + + /// Maps a command type to an endpoint using a content-based route. + public BackplaneHostBuilder MapCommand( + ContentRouter.RoutePredicate predicate, + string endpointName) + { + _routes.Add(BackplaneRouteRegistration.Route(predicate, endpointName)); + return this; + } + + /// Maps a command type to a default endpoint. + public BackplaneHostBuilder MapDefaultCommand(string endpointName) + { + _routes.Add(BackplaneRouteRegistration.Default(endpointName)); + return this; + } + + /// Registers a receive endpoint and its command/event consumers. + public BackplaneHostBuilder ReceiveEndpoint(string endpointName, Action configure) + { + ValidateName(endpointName, nameof(endpointName)); + ArgumentNullException.ThrowIfNull(configure); + + var builder = new BackplaneEndpointBuilder(endpointName); + configure(builder); + _endpoints.Add(builder.Build()); + return this; + } + + /// Builds and starts the host. + public ValueTask BuildAsync(CancellationToken cancellationToken = default) + => BackplaneHost.StartAsync( + _transportFactory() ?? throw new InvalidOperationException("The configured transport factory returned null."), + _outbox ?? new BackplaneOutbox(), + _idempotencyStore, + _endpoints.ToArray(), + _routes.ToArray(), + cancellationToken); + + private static void ValidateName(string value, string paramName) + { + if (string.IsNullOrWhiteSpace(value)) + throw new ArgumentException("Endpoint names cannot be null, empty, or whitespace.", paramName); + } +} + +/// Endpoint-level consumer registration builder. +public sealed class BackplaneEndpointBuilder +{ + private readonly string _endpointName; + private readonly List _handlers = new(); + + internal BackplaneEndpointBuilder(string endpointName) + { + _endpointName = endpointName; + } + + /// Registers a request/reply command consumer on this endpoint. + public BackplaneEndpointBuilder HandleCommand( + BackplaneRequestHandler handler) + { + _handlers.Add(new BackplaneCommandRegistration(_endpointName, handler)); + return this; + } + + /// Registers an event consumer on this endpoint. + public BackplaneEndpointBuilder Subscribe( + string topic, + BackplaneEventHandler handler) + { + _handlers.Add(new BackplaneSubscriptionRegistration(topic, handler)); + return this; + } + + internal BackplaneEndpointRegistration Build() => new(_endpointName, _handlers.ToArray()); +} + +/// Typed client exposed to application code by the demo host. +public sealed class BackplaneClient +{ + private readonly BackplaneBus _bus; + + internal BackplaneClient(BackplaneBus bus) + { + _bus = bus; + } + + /// Sends a typed request and waits for the typed reply. + public ValueTask RequestAsync( + Message message, + CancellationToken cancellationToken = default) + => _bus.RequestAsync(message, cancellationToken); + + /// Publishes a typed event to a topic. + public ValueTask PublishAsync( + string topic, + TEvent payload, + MessageHeaders headers, + CancellationToken cancellationToken = default) + => _bus.PublishAsync(topic, payload, headers, cancellationToken); +} + +/// Endpoint topology entry produced by the host builder. +public sealed record BackplaneEndpointRegistration( + string Name, + IReadOnlyList Handlers); + +/// Handler registration that can attach itself to a started bus. +public interface IBackplaneHandlerRegistration +{ + /// Starts the handler subscription. + ValueTask StartAsync( + BackplaneBus bus, + string endpointName, + InMemoryIdempotencyStore? idempotencyStore, + CancellationToken cancellationToken); +} + +internal sealed class BackplaneCommandRegistration : IBackplaneHandlerRegistration +{ + private readonly string _address; + private readonly BackplaneRequestHandler _handler; + + internal BackplaneCommandRegistration( + string address, + BackplaneRequestHandler handler) + { + _address = address; + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public ValueTask StartAsync( + BackplaneBus bus, + string endpointName, + InMemoryIdempotencyStore? idempotencyStore, + CancellationToken cancellationToken) + { + _ = endpointName; + return bus.HandleAsync(_address, _handler, idempotencyStore, cancellationToken); + } +} + +internal sealed class BackplaneSubscriptionRegistration : IBackplaneHandlerRegistration +{ + private readonly string _topic; + private readonly BackplaneEventHandler _handler; + + internal BackplaneSubscriptionRegistration( + string topic, + BackplaneEventHandler handler) + { + if (string.IsNullOrWhiteSpace(topic)) + throw new ArgumentException("Topics cannot be null, empty, or whitespace.", nameof(topic)); + + _topic = topic; + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public ValueTask StartAsync( + BackplaneBus bus, + string endpointName, + InMemoryIdempotencyStore? idempotencyStore, + CancellationToken cancellationToken) + { + _ = idempotencyStore; + return bus.SubscribeAsync(_topic, endpointName, _handler, cancellationToken); + } +} + +internal sealed class BackplaneRouteRegistration +{ + private readonly Action _configure; + + private BackplaneRouteRegistration(Action configure) + { + _configure = configure; + } + + internal void Configure(BackplaneBus bus) => _configure(bus); + + internal static BackplaneRouteRegistration Route( + ContentRouter.RoutePredicate predicate, + string endpointName) + { + _ = typeof(TResponse); + return new BackplaneRouteRegistration(bus => bus.Route(predicate, endpointName)); + } + + internal static BackplaneRouteRegistration Default(string endpointName) + { + _ = typeof(TResponse); + return new BackplaneRouteRegistration(bus => bus.RouteDefault(endpointName)); } } @@ -653,6 +891,145 @@ public void MarkDispatched(string id, int delivered) } } +internal sealed class BackplaneDemoServices +{ + private readonly ConcurrentQueue _audit; + private readonly ConcurrentDictionary _endpoints; + private readonly ConcurrentQueue _notifications; + private readonly TaskCompletionSource _completed; + private BackplaneClient? _client; + + internal BackplaneDemoServices( + ConcurrentQueue audit, + ConcurrentDictionary endpoints, + ConcurrentQueue notifications, + TaskCompletionSource completed) + { + _audit = audit; + _endpoints = endpoints; + _notifications = notifications; + _completed = completed; + } + + internal void AttachClient(BackplaneClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + internal ValueTask AcceptStandardOrderAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + => AcceptOrderAsync("orders.standard", message, context, cancellationToken); + + internal ValueTask AcceptPriorityOrderAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + => AcceptOrderAsync("orders.priority", message, context, cancellationToken); + + internal async ValueTask CapturePaymentAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _audit.Enqueue($"billing:received:{message.Payload.OrderId}"); + if (message.Payload.Total > 300m) + { + await Client.PublishAsync( + "payments.declined", + new PaymentDeclined(message.Payload.OrderId, "authorization-declined"), + context.Headers, + cancellationToken).ConfigureAwait(false); + return; + } + + await Client.PublishAsync( + "payments.captured", + new PaymentCaptured(message.Payload.OrderId, message.Payload.Total), + context.Headers, + cancellationToken).ConfigureAwait(false); + } + + internal ValueTask AuditSubmittedOrderAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _ = cancellationToken; + _audit.Enqueue($"audit:order-submitted:{message.Payload.OrderId}:{context.Headers.CorrelationId}"); + return default; + } + + internal async ValueTask ScheduleShipmentAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _audit.Enqueue($"fulfillment:scheduled:{message.Payload.OrderId}"); + await Client.PublishAsync( + "shipments.scheduled", + new ShipmentScheduled(message.Payload.OrderId, $"trk-{message.Payload.OrderId}"), + context.Headers, + cancellationToken).ConfigureAwait(false); + } + + internal ValueTask NotifyPaymentDeclinedAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _ = cancellationToken; + RecordNotification(new CustomerNotification( + message.Payload.OrderId, + "payment-declined", + context.Headers.CorrelationId ?? string.Empty)); + return default; + } + + internal ValueTask NotifyShipmentScheduledAsync( + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _ = cancellationToken; + RecordNotification(new CustomerNotification( + message.Payload.OrderId, + "shipment-scheduled", + context.Headers.CorrelationId ?? string.Empty)); + return default; + } + + private BackplaneClient Client => _client ?? throw new InvalidOperationException("Backplane client has not been attached."); + + private async ValueTask AcceptOrderAsync( + string endpoint, + Message message, + MessageContext context, + CancellationToken cancellationToken) + { + _endpoints[message.Payload.OrderId] = endpoint; + _audit.Enqueue($"orders:accepted:{message.Payload.OrderId}:{endpoint}"); + + await Client.PublishAsync( + "orders.submitted", + new BackplaneOrderSubmitted(message.Payload.OrderId, message.Payload.Total, message.Payload.CustomerTier), + context.Headers, + cancellationToken).ConfigureAwait(false); + + return new BackplaneOrderAccepted(message.Payload.OrderId, endpoint, context.Headers.CorrelationId ?? string.Empty); + } + + private void RecordNotification(CustomerNotification notification) + { + _notifications.Enqueue(notification); + _audit.Enqueue($"notification:{notification.OrderId}:{notification.Kind}"); + + if (_notifications.Count == 3) + _completed.TrySetResult(); + } +} + internal sealed record BackplaneRoute( Type PayloadType, Func Predicate, diff --git a/test/PatternKit.Examples.Tests/Messaging/BackplaneFacadeDemoTests.cs b/test/PatternKit.Examples.Tests/Messaging/BackplaneFacadeDemoTests.cs index e5b55df..654743c 100644 --- a/test/PatternKit.Examples.Tests/Messaging/BackplaneFacadeDemoTests.cs +++ b/test/PatternKit.Examples.Tests/Messaging/BackplaneFacadeDemoTests.cs @@ -1,9 +1,46 @@ using PatternKit.Examples.Messaging; +using PatternKit.Messaging; +using PatternKit.Messaging.Reliability; namespace PatternKit.Examples.Tests.Messaging; public sealed class BackplaneFacadeDemoTests { + [Fact] + public async Task BackplaneHostBuilder_ConfiguresNativeMessagingPlatformSurface() + { + var transport = new InMemoryBackplaneTransport(); + var outbox = new BackplaneOutbox(); + var idempotency = new InMemoryIdempotencyStore(); + + await using var host = await BackplaneHost.Create() + .UseTransport(() => transport) + .UseOutbox(outbox) + .UseIdempotencyStore(idempotency) + .MapDefaultCommand("orders.standard") + .ReceiveEndpoint("orders.standard", endpoint => + endpoint.HandleCommand((message, context, _) => + new ValueTask(new BackplaneOrderAccepted( + message.Payload.OrderId, + "orders.standard", + context.Headers.CorrelationId ?? string.Empty)))) + .BuildAsync(); + + var response = await host.Client.RequestAsync( + Message + .Create(new SubmitOrder("order-builder", 10m, CustomerTier.Standard)) + .WithCorrelationId("corr-builder") + .WithIdempotencyKey("idem-builder")); + + Assert.Same(outbox, host.Outbox); + Assert.Same(idempotency, host.IdempotencyStore); + Assert.Single(host.Endpoints); + Assert.Equal("orders.standard", host.Endpoints[0].Name); + Assert.Single(host.Endpoints[0].Handlers); + Assert.Equal(new BackplaneOrderAccepted("order-builder", "orders.standard", "corr-builder"), response); + Assert.Contains(transport.DeliveryLog, static entry => entry.Contains("orders.standard->orders.standard", StringComparison.Ordinal)); + } + [Fact] public async Task RunAsync_RoutesRequestsAndPublishesEventsThroughBackplane() {