diff --git a/docs/examples/order-message-store.md b/docs/examples/order-message-store.md new file mode 100644 index 0000000..aa7dd20 --- /dev/null +++ b/docs/examples/order-message-store.md @@ -0,0 +1,20 @@ +# Order Message Store + +The order message-store example shows an importable audit/replay store for order events. + +```csharp +services.AddOrderMessageStoreDemo(); + +var service = provider.GetRequiredService(); +var summary = service.Record( + new OrderMessageStoreEvent("ORDER-100", "Submitted", 125m, containsSensitiveData: false), + "MSG-100", + "CHECKOUT-100"); +``` + +The example includes: + +- a fluent `OrderMessageStores.CreateAuditStore()` path, +- a generated `GeneratedOrderMessageStore.Create()` path, +- a retention policy that refuses sensitive payloads, +- `IServiceCollection` registration for standard .NET hosts. diff --git a/docs/examples/toc.yml b/docs/examples/toc.yml index 083e859..e2aaa9c 100644 --- a/docs/examples/toc.yml +++ b/docs/examples/toc.yml @@ -76,6 +76,9 @@ - name: Order Message Filter href: order-message-filter.md +- name: Order Message Store + href: order-message-store.md + - name: Order Wire Tap href: order-wire-tap.md diff --git a/docs/generators/index.md b/docs/generators/index.md index 6aeefa3..ffc81f6 100644 --- a/docs/generators/index.md +++ b/docs/generators/index.md @@ -86,6 +86,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato | [**Dead Letter Channel**](dead-letter-channel.md) | Failed-message capture and replay handoff | `[GenerateDeadLetterChannel]` | | [**Content Router**](messaging.md#generated-content-router) | Content-based message routing factories | `[GenerateContentRouter]` | | [**Message Filter**](message-filter.md) | Named allow-rule filters for message consumers | `[GenerateMessageFilter]` | +| [**Message Store**](message-store.md) | Message audit, lookup, and replay store factories | `[GenerateMessageStore]` | | [**Wire Tap**](wire-tap.md) | Side-channel message observability factories | `[GenerateWireTap]` | | [**Recipient List**](messaging.md#generated-recipient-list) | Recipient fan-out factories | `[GenerateRecipientList]` | | [**Splitter / Aggregator**](messaging.md#generated-splitter-and-aggregator) | Split/rejoin message routing factories | `[GenerateSplitter]` / `[GenerateAggregator]` | diff --git a/docs/generators/message-store.md b/docs/generators/message-store.md new file mode 100644 index 0000000..f259e46 --- /dev/null +++ b/docs/generators/message-store.md @@ -0,0 +1,31 @@ +# Message Store Generator + +`[GenerateMessageStore]` creates a typed `MessageStore` factory for a partial class or struct. + +```csharp +[GenerateMessageStore(typeof(OrderSubmitted), FactoryName = "Create", StoreName = "order-audit")] +public static partial class OrderAuditStore +{ + [MessageStoreIdentity] + private static string Identity(Message message, MessageContext context) + => message.Headers.MessageId!; + + [MessageStoreRetention] + private static bool Retain(StoredMessage stored) + => !stored.Message.Payload.ContainsSensitiveData; +} +``` + +The generated factory composes the fluent runtime API: + +- `MessageStore.Create(StoreName)` +- optional `.IdentifyBy(...)` +- optional `.RetainWhen(...)` +- `.Build()` + +Diagnostics: + +- `PKMS001`: host type must be partial. +- `PKMS002`: identity method must be `static string Method(Message, MessageContext)`. +- `PKMS003`: retention method must be `static bool Method(StoredMessage)`. +- `PKMS004`: only one identity hook and one retention hook are allowed. diff --git a/docs/generators/toc.yml b/docs/generators/toc.yml index c0577e0..71be64e 100644 --- a/docs/generators/toc.yml +++ b/docs/generators/toc.yml @@ -94,6 +94,9 @@ - name: Message Filter href: message-filter.md +- name: Message Store + href: message-store.md + - name: Wire Tap href: wire-tap.md diff --git a/docs/guides/pattern-coverage.md b/docs/guides/pattern-coverage.md index 8fb770b..d5d8a6c 100644 --- a/docs/guides/pattern-coverage.md +++ b/docs/guides/pattern-coverage.md @@ -50,6 +50,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr | Enterprise Integration | Dead Letter Channel | `DeadLetterChannel` | Dead Letter Channel generator | | Enterprise Integration | Content-Based Router | `ContentRouter` | Messaging generator | | Enterprise Integration | Message Filter | `MessageFilter` | Message Filter generator | +| Enterprise Integration | Message Store | `MessageStore` | Message Store generator | | Enterprise Integration | Wire Tap | `WireTap` | Wire Tap generator | | Enterprise Integration | Recipient List | `RecipientList` | Messaging generator | | Enterprise Integration | Competing Consumers | `CompetingConsumerGroup` | Competing Consumers generator | diff --git a/docs/patterns/messaging/message-store.md b/docs/patterns/messaging/message-store.md new file mode 100644 index 0000000..882e832 --- /dev/null +++ b/docs/patterns/messaging/message-store.md @@ -0,0 +1,19 @@ +# Message Store + +Message Store persists message envelopes so support, replay, and operational audit workflows can inspect what moved through a pipeline. + +`MessageStore` provides a fluent runtime path: + +```csharp +var store = MessageStore.Create("order-audit") + .IdentifyBy((message, context) => message.Headers.MessageId!) + .RetainWhen(stored => !stored.Message.Payload.ContainsSensitiveData) + .Build(); + +var result = store.Append(message.WithMessageId("msg-100").WithCorrelationId("checkout-100")); +var replay = store.Replay(MessageStoreQuery.ForCorrelation("checkout-100")); +``` + +Use it when an app needs durable lookup semantics around messages without mixing audit/replay concerns into message handlers. In production, the same shape can sit behind a hosted service, ASP.NET Core endpoint, queue worker, or support tool through `IServiceCollection`. + +The source-generated path uses `[GenerateMessageStore]`, `[MessageStoreIdentity]`, and `[MessageStoreRetention]` to create the same fluent store factory from annotated static hooks. diff --git a/docs/patterns/toc.yml b/docs/patterns/toc.yml index 66ffb89..d72ae33 100644 --- a/docs/patterns/toc.yml +++ b/docs/patterns/toc.yml @@ -305,6 +305,8 @@ href: messaging/dead-letter-channel.md - name: Message Filter href: messaging/message-filter.md + - name: Message Store + href: messaging/message-store.md - name: Wire Tap href: messaging/wire-tap.md - name: Enterprise Message Routing diff --git a/src/PatternKit.Core/Messaging/Storage/MessageStore.cs b/src/PatternKit.Core/Messaging/Storage/MessageStore.cs new file mode 100644 index 0000000..6e56dbb --- /dev/null +++ b/src/PatternKit.Core/Messaging/Storage/MessageStore.cs @@ -0,0 +1,256 @@ +namespace PatternKit.Messaging.Storage; + +/// +/// In-memory message store for audit, replay, and operational lookup workflows. +/// +public sealed class MessageStore +{ + /// Selects the persisted message identifier. + public delegate string MessageIdentitySelector(Message message, MessageContext context); + + /// Determines whether a stored message should be retained. + public delegate bool RetentionPredicate(StoredMessage stored); + + private readonly object _gate = new(); + private readonly string _name; + private readonly MessageIdentitySelector _identitySelector; + private readonly RetentionPredicate _retentionPredicate; + private readonly Func _clock; + private readonly Dictionary> _byId = new(StringComparer.Ordinal); + private readonly List> _ordered = new(); + private long _sequence; + + private MessageStore( + string name, + MessageIdentitySelector identitySelector, + RetentionPredicate retentionPredicate, + Func clock) + => (_name, _identitySelector, _retentionPredicate, _clock) = (name, identitySelector, retentionPredicate, clock); + + /// Appends a message when it is not already present. + public MessageStoreAppendResult Append(Message message, MessageContext? context = null) + { + if (message is null) + throw new ArgumentNullException(nameof(message)); + + var effectiveContext = context ?? MessageContext.From(message); + var messageId = _identitySelector(message, effectiveContext); + if (string.IsNullOrWhiteSpace(messageId)) + throw new InvalidOperationException("Message store identity selector returned a blank identifier."); + + lock (_gate) + { + if (_byId.TryGetValue(messageId, out var existing)) + return MessageStoreAppendResult.ForDuplicate(_name, existing); + + var stored = new StoredMessage(_name, messageId, ++_sequence, _clock(), message, effectiveContext.Headers); + if (!_retentionPredicate(stored)) + return MessageStoreAppendResult.ForRejected(_name, stored, "Message did not satisfy retention policy."); + + _byId.Add(messageId, stored); + _ordered.Add(stored); + return MessageStoreAppendResult.ForStored(_name, stored); + } + } + + /// Returns a stored message by identifier when present. + public StoredMessage? Get(string messageId) + { + if (string.IsNullOrWhiteSpace(messageId)) + throw new ArgumentException("Message identifier cannot be null, empty, or whitespace.", nameof(messageId)); + + lock (_gate) + return _byId.TryGetValue(messageId, out var stored) ? stored : null; + } + + /// Queries stored messages in append order. + public IReadOnlyList> Query(MessageStoreQuery? query = null) + { + var effectiveQuery = query ?? MessageStoreQuery.All; + lock (_gate) + { + IEnumerable> matches = _ordered; + if (!string.IsNullOrWhiteSpace(effectiveQuery.CorrelationId)) + matches = matches.Where(stored => stored.Headers.CorrelationId == effectiveQuery.CorrelationId); + if (effectiveQuery.FromUtc is not null) + matches = matches.Where(stored => stored.StoredAtUtc >= effectiveQuery.FromUtc.Value); + if (effectiveQuery.ToUtc is not null) + matches = matches.Where(stored => stored.StoredAtUtc <= effectiveQuery.ToUtc.Value); + if (effectiveQuery.MaxCount is not null) + matches = matches.Take(effectiveQuery.MaxCount.Value); + + return matches.ToArray(); + } + } + + /// Returns message envelopes that match a query in replay order. + public IReadOnlyList> Replay(MessageStoreQuery? query = null) + => Query(query).Select(static stored => stored.Message).ToArray(); + + /// Creates a message-store builder. + public static Builder Create(string name = "message-store") => new(name); + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private MessageIdentitySelector _identitySelector = static (message, _) => + message.Headers.MessageId ?? Guid.NewGuid().ToString("N"); + private RetentionPredicate _retentionPredicate = static _ => true; + private Func _clock = static () => DateTimeOffset.UtcNow; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Message store name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Configures the identity selector used for storage and lookup. + public Builder IdentifyBy(MessageIdentitySelector selector) + { + _identitySelector = selector ?? throw new ArgumentNullException(nameof(selector)); + return this; + } + + /// Configures a retention predicate. Messages returning false are not persisted. + public Builder RetainWhen(RetentionPredicate predicate) + { + _retentionPredicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + return this; + } + + /// Configures the clock used when storing messages. + public Builder UseClock(Func clock) + { + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + return this; + } + + /// Builds the message store. + public MessageStore Build() => new(_name, _identitySelector, _retentionPredicate, _clock); + } +} + +/// Stored message metadata and envelope. +public sealed class StoredMessage +{ + /// Creates stored message metadata. + public StoredMessage( + string storeName, + string messageId, + long sequence, + DateTimeOffset storedAtUtc, + Message message, + MessageHeaders headers) + { + StoreName = storeName; + MessageId = messageId; + Sequence = sequence; + StoredAtUtc = storedAtUtc; + Message = message; + Headers = headers; + } + + /// Name of the message store. + public string StoreName { get; } + + /// Stable message identifier. + public string MessageId { get; } + + /// Append sequence assigned by the store. + public long Sequence { get; } + + /// UTC time the message was stored. + public DateTimeOffset StoredAtUtc { get; } + + /// Stored message envelope. + public Message Message { get; } + + /// Headers captured for query and replay metadata. + public MessageHeaders Headers { get; } +} + +/// Query options for store lookup and replay. +public sealed class MessageStoreQuery +{ + /// Query that returns every stored message. + public static MessageStoreQuery All { get; } = new(); + + /// Creates a message-store query. + public MessageStoreQuery(string? correlationId = null, DateTimeOffset? fromUtc = null, DateTimeOffset? toUtc = null, int? maxCount = null) + { + if (maxCount is not null && maxCount <= 0) + throw new ArgumentOutOfRangeException(nameof(maxCount), "Maximum count must be greater than zero."); + + CorrelationId = correlationId; + FromUtc = fromUtc; + ToUtc = toUtc; + MaxCount = maxCount; + } + + /// Correlation identifier filter. + public string? CorrelationId { get; } + + /// Inclusive lower stored-at UTC filter. + public DateTimeOffset? FromUtc { get; } + + /// Inclusive upper stored-at UTC filter. + public DateTimeOffset? ToUtc { get; } + + /// Maximum number of messages to return. + public int? MaxCount { get; } + + /// Creates a query scoped to a correlation identifier. + public static MessageStoreQuery ForCorrelation(string correlationId) + { + if (string.IsNullOrWhiteSpace(correlationId)) + throw new ArgumentException("Correlation identifier cannot be null, empty, or whitespace.", nameof(correlationId)); + + return new MessageStoreQuery(correlationId); + } + + /// Creates a copy with a maximum result count. + public MessageStoreQuery Take(int maxCount) + { + if (maxCount <= 0) + throw new ArgumentOutOfRangeException(nameof(maxCount), "Maximum count must be greater than zero."); + + return new MessageStoreQuery(CorrelationId, FromUtc, ToUtc, maxCount); + } +} + +/// Result returned when appending to a message store. +public sealed class MessageStoreAppendResult +{ + private MessageStoreAppendResult(string storeName, StoredMessage storedMessage, bool stored, bool duplicate, string? rejectionReason) + => (StoreName, StoredMessage, Stored, Duplicate, RejectionReason) = (storeName, storedMessage, stored, duplicate, rejectionReason); + + /// Name of the message store. + public string StoreName { get; } + + /// Message metadata involved in the append attempt. + public StoredMessage StoredMessage { get; } + + /// True when the message was persisted by this call. + public bool Stored { get; } + + /// True when the message already existed. + public bool Duplicate { get; } + + /// Reason the message was rejected, when applicable. + public string? RejectionReason { get; } + + /// Creates a successful stored result. + public static MessageStoreAppendResult ForStored(string storeName, StoredMessage storedMessage) + => new(storeName, storedMessage, true, false, null); + + /// Creates a duplicate result. + public static MessageStoreAppendResult ForDuplicate(string storeName, StoredMessage storedMessage) + => new(storeName, storedMessage, false, true, null); + + /// Creates a retention rejection result. + public static MessageStoreAppendResult ForRejected(string storeName, StoredMessage storedMessage, string rejectionReason) + => new(storeName, storedMessage, false, false, rejectionReason); +} diff --git a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs index 54108a6..b226976 100644 --- a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs +++ b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs @@ -60,6 +60,7 @@ using PatternKit.Examples.UnitOfWorkDemo; using PatternKit.Examples.VisitorDemo; using PatternKit.Messaging.Routing; +using PatternKit.Messaging.Storage; using PatternKit.Messaging.CompetingConsumers; using PatternKit.Messaging.PipesAndFilters; using PatternKit.Messaging.Transformation; @@ -128,6 +129,7 @@ public sealed record GeneratedDeadLetterChannelExample(FulfillmentDeadLetterChan public sealed record GeneratedRecipientListExample(RecipientListGeneratorExampleRunner Runner); public sealed record GeneratedSplitterAggregatorExample(MessageRoutingExampleRunner Runner); public sealed record OrderMessageFilterExampleService(MessageFilter Filter, OrderMessageFilterService Service); +public sealed record OrderMessageStoreExampleService(MessageStore Store, OrderMessageStoreService Service); public sealed record OrderWireTapExampleService(WireTap Tap, OrderWireTapService Service); public sealed record FulfillmentCompetingConsumersExampleService(CompetingConsumerGroup Group, FulfillmentCompetingConsumerService Service); public sealed record FulfillmentPipesAndFiltersExampleService(PipesAndFiltersPipeline Pipeline, FulfillmentPipesAndFiltersService Service); @@ -202,6 +204,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s .AddGeneratedRecipientListExample() .AddGeneratedSplitterAggregatorExample() .AddOrderMessageFilterExample() + .AddOrderMessageStoreExample() .AddOrderWireTapExample() .AddFulfillmentCompetingConsumersExample() .AddFulfillmentPipesAndFiltersExample() @@ -498,6 +501,15 @@ public static IServiceCollection AddOrderMessageFilterExample(this IServiceColle return services.RegisterExample("Order Message Filter", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection); } + public static IServiceCollection AddOrderMessageStoreExample(this IServiceCollection services) + { + services.AddOrderMessageStoreDemo(); + services.AddSingleton(sp => new( + sp.GetRequiredService>(), + sp.GetRequiredService())); + return services.RegisterExample("Order Message Store", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection); + } + public static IServiceCollection AddOrderWireTapExample(this IServiceCollection services) { services.AddOrderWireTapDemo(); diff --git a/src/PatternKit.Examples/Messaging/OrderMessageStoreExample.cs b/src/PatternKit.Examples/Messaging/OrderMessageStoreExample.cs new file mode 100644 index 0000000..6096232 --- /dev/null +++ b/src/PatternKit.Examples/Messaging/OrderMessageStoreExample.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Generators.Messaging; +using PatternKit.Messaging; +using PatternKit.Messaging.Storage; + +namespace PatternKit.Examples.Messaging; + +/// Order audit event stored for replay and support lookup. +public sealed record OrderMessageStoreEvent(string OrderId, string EventName, decimal Total, bool ContainsSensitiveData); + +/// Summary returned by the message-store example. +public sealed record OrderMessageStoreSummary(bool Stored, bool Duplicate, int ReplayCount, string? RejectionReason); + +/// Service that records order events in a DI-importable message store. +public sealed class OrderMessageStoreService(MessageStore store) +{ + public OrderMessageStoreSummary Record(OrderMessageStoreEvent orderEvent, string messageId, string correlationId) + { + var message = Message.Create(orderEvent) + .WithMessageId(messageId) + .WithCorrelationId(correlationId); + var result = store.Append(message); + var replay = store.Replay(MessageStoreQuery.ForCorrelation(correlationId)); + return new OrderMessageStoreSummary(result.Stored, result.Duplicate, replay.Count, result.RejectionReason); + } +} + +/// Fluent message-store builder used by applications that do not enable generators. +public static class OrderMessageStores +{ + public static MessageStore CreateAuditStore() + => MessageStore.Create("order-message-store") + .IdentifyBy(static (message, _) => message.Headers.MessageId!) + .RetainWhen(static stored => !stored.Message.Payload.ContainsSensitiveData) + .Build(); +} + +/// Source-generated message store for order audit and replay events. +[GenerateMessageStore(typeof(OrderMessageStoreEvent), FactoryName = "Create", StoreName = "order-message-store")] +public static partial class GeneratedOrderMessageStore +{ + [MessageStoreIdentity] + private static string SelectIdentity(Message message, MessageContext context) + => message.Headers.MessageId!; + + [MessageStoreRetention] + private static bool RetainReplaySafeEvents(StoredMessage stored) + => !stored.Message.Payload.ContainsSensitiveData; +} + +/// Runner that demonstrates both fluent and generated message-store paths. +public sealed class OrderMessageStoreExampleRunner(OrderMessageStoreService service) +{ + public OrderMessageStoreSummary RunGenerated(OrderMessageStoreEvent orderEvent, string messageId, string correlationId) + => service.Record(orderEvent, messageId, correlationId); + + public static OrderMessageStoreSummary RunFluent(OrderMessageStoreEvent orderEvent, string messageId, string correlationId) + { + var store = OrderMessageStores.CreateAuditStore(); + var message = Message.Create(orderEvent) + .WithMessageId(messageId) + .WithCorrelationId(correlationId); + var result = store.Append(message); + var replay = store.Replay(MessageStoreQuery.ForCorrelation(correlationId)); + return new OrderMessageStoreSummary(result.Stored, result.Duplicate, replay.Count, result.RejectionReason); + } +} + +/// DI helpers for importing the order message-store example into standard .NET containers. +public static class OrderMessageStoreExampleServiceCollectionExtensions +{ + public static IServiceCollection AddOrderMessageStoreDemo(this IServiceCollection services) + { + services.AddSingleton(_ => GeneratedOrderMessageStore.Create()); + services.AddSingleton(); + services.AddSingleton(); + return services; + } +} diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs index 32094f6..77e8d49 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs @@ -288,6 +288,14 @@ public sealed class PatternKitExampleCatalog : IPatternKitExampleCatalog ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection, ["MessageFilter"], ["fraud-screening allow rules", "source-generated filter", "DI composition"]), + Descriptor( + "Order Message Store", + "src/PatternKit.Examples/Messaging/OrderMessageStoreExample.cs", + "test/PatternKit.Examples.Tests/Messaging/OrderMessageStoreExampleTests.cs", + "docs/examples/order-message-store.md", + ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection, + ["MessageStore"], + ["audit persistence", "source-generated identity and retention", "DI composition"]), Descriptor( "Order Wire Tap", "src/PatternKit.Examples/Messaging/OrderWireTapExample.cs", diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs index 55d0f2c..0ccda09 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs @@ -441,6 +441,19 @@ public sealed class PatternKitPatternCatalog : IPatternKitPatternCatalog "test/PatternKit.Examples.Tests/Messaging/OrderMessageFilterExampleTests.cs", ["fluent message filter", "generated allow-rule filter", "DI-importable order fraud-screening example"]), + Pattern("Message Store", PatternFamily.EnterpriseIntegration, + "docs/patterns/messaging/message-store.md", + "src/PatternKit.Core/Messaging/Storage/MessageStore.cs", + "test/PatternKit.Tests/Messaging/Storage/MessageStoreTests.cs", + "docs/generators/message-store.md", + "src/PatternKit.Generators/Messaging/MessageStoreGenerator.cs", + "test/PatternKit.Generators.Tests/MessageStoreGeneratorTests.cs", + null, + "docs/examples/order-message-store.md", + "src/PatternKit.Examples/Messaging/OrderMessageStoreExample.cs", + "test/PatternKit.Examples.Tests/Messaging/OrderMessageStoreExampleTests.cs", + ["fluent message store", "generated identity and retention hooks", "DI-importable order audit/replay example"]), + Pattern("Wire Tap", PatternFamily.EnterpriseIntegration, "docs/patterns/messaging/wire-tap.md", "src/PatternKit.Core/Messaging/Routing/WireTap.cs", diff --git a/src/PatternKit.Generators.Abstractions/Messaging/MessageStoreAttributes.cs b/src/PatternKit.Generators.Abstractions/Messaging/MessageStoreAttributes.cs new file mode 100644 index 0000000..4f1a07a --- /dev/null +++ b/src/PatternKit.Generators.Abstractions/Messaging/MessageStoreAttributes.cs @@ -0,0 +1,35 @@ +using System; + +namespace PatternKit.Generators.Messaging; + +/// +/// Generates a typed message-store factory for a partial class or struct. +/// +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)] +public sealed class GenerateMessageStoreAttribute : Attribute +{ + /// Creates a message-store generator attribute. + public GenerateMessageStoreAttribute(Type payloadType) + => PayloadType = payloadType ?? throw new ArgumentNullException(nameof(payloadType)); + + /// Payload type persisted by the generated message store. + public Type PayloadType { get; } + + /// Name of the generated factory method. + public string FactoryName { get; set; } = "Create"; + + /// Name assigned to the generated message store. + public string StoreName { get; set; } = "message-store"; +} + +/// +/// Marks a static method as the generated message identity selector. +/// +[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)] +public sealed class MessageStoreIdentityAttribute : Attribute; + +/// +/// Marks a static predicate as the generated message-store retention policy. +/// +[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)] +public sealed class MessageStoreRetentionAttribute : Attribute; diff --git a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md index 16a4ec5..182f635 100644 --- a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md +++ b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md @@ -296,3 +296,7 @@ PKECS001 | PatternKit.Generators.Cloud | Error | External Configuration Store ho PKECS002 | PatternKit.Generators.Cloud | Error | External Configuration Store loader is invalid. PKECS003 | PatternKit.Generators.Cloud | Error | External Configuration Store validator signature is invalid. PKECS004 | PatternKit.Generators.Cloud | Error | External Configuration Store validator order is duplicated. +PKMS001 | PatternKit.Generators.Messaging | Error | Message store type must be partial. +PKMS002 | PatternKit.Generators.Messaging | Error | Message store identity signature is invalid. +PKMS003 | PatternKit.Generators.Messaging | Error | Message store retention signature is invalid. +PKMS004 | PatternKit.Generators.Messaging | Error | Message store hook is duplicated. diff --git a/src/PatternKit.Generators/Messaging/MessageStoreGenerator.cs b/src/PatternKit.Generators/Messaging/MessageStoreGenerator.cs new file mode 100644 index 0000000..a0c2ab4 --- /dev/null +++ b/src/PatternKit.Generators/Messaging/MessageStoreGenerator.cs @@ -0,0 +1,190 @@ +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp.Syntax; +using Microsoft.CodeAnalysis.Text; +using System.Linq; +using System.Text; + +namespace PatternKit.Generators.Messaging; + +[Generator] +public sealed class MessageStoreGenerator : IIncrementalGenerator +{ + private static readonly DiagnosticDescriptor MustBePartial = new( + "PKMS001", + "Message store type must be partial", + "Type '{0}' is marked with [GenerateMessageStore] but is not declared as partial", + "PatternKit.Generators.Messaging", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidIdentity = new( + "PKMS002", + "Message store identity signature is invalid", + "Message store identity method '{0}' must be static and return string with Message and MessageContext parameters", + "PatternKit.Generators.Messaging", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidRetention = new( + "PKMS003", + "Message store retention signature is invalid", + "Message store retention method '{0}' must be static and return bool with StoredMessage parameter", + "PatternKit.Generators.Messaging", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor DuplicateHook = new( + "PKMS004", + "Message store hook is duplicated", + "Type '{0}' declares more than one [{1}] method", + "PatternKit.Generators.Messaging", + DiagnosticSeverity.Error, + true); + + public void Initialize(IncrementalGeneratorInitializationContext context) + { + var candidates = context.SyntaxProvider.ForAttributeWithMetadataName( + "PatternKit.Generators.Messaging.GenerateMessageStoreAttribute", + static (node, _) => node is TypeDeclarationSyntax, + static (ctx, _) => (Type: (INamedTypeSymbol)ctx.TargetSymbol, Node: (TypeDeclarationSyntax)ctx.TargetNode, Attributes: ctx.Attributes)); + + context.RegisterSourceOutput(candidates, static (spc, candidate) => + { + var attr = candidate.Attributes.FirstOrDefault(a => + a.AttributeClass?.ToDisplayString() == "PatternKit.Generators.Messaging.GenerateMessageStoreAttribute"); + if (attr is null) + return; + + Generate(spc, candidate.Type, candidate.Node, attr); + }); + } + + private static void Generate(SourceProductionContext context, INamedTypeSymbol type, TypeDeclarationSyntax node, AttributeData attribute) + { + if (!node.Modifiers.Any(static modifier => modifier.Text == "partial")) + { + context.ReportDiagnostic(Diagnostic.Create(MustBePartial, node.Identifier.GetLocation(), type.Name)); + return; + } + + var payloadType = attribute.ConstructorArguments.Length >= 1 + ? attribute.ConstructorArguments[0].Value as INamedTypeSymbol + : null; + if (payloadType is null) + return; + + var identities = type.GetMembers().OfType().Where(static method => + method.GetAttributes().Any(static attr => + attr.AttributeClass?.ToDisplayString() == "PatternKit.Generators.Messaging.MessageStoreIdentityAttribute")).ToArray(); + var retentions = type.GetMembers().OfType().Where(static method => + method.GetAttributes().Any(static attr => + attr.AttributeClass?.ToDisplayString() == "PatternKit.Generators.Messaging.MessageStoreRetentionAttribute")).ToArray(); + + if (identities.Length > 1) + { + context.ReportDiagnostic(Diagnostic.Create(DuplicateHook, identities[1].Locations.FirstOrDefault(), type.Name, "MessageStoreIdentity")); + return; + } + + if (retentions.Length > 1) + { + context.ReportDiagnostic(Diagnostic.Create(DuplicateHook, retentions[1].Locations.FirstOrDefault(), type.Name, "MessageStoreRetention")); + return; + } + + var identity = identities.FirstOrDefault(); + if (identity is not null && !IsIdentity(identity, payloadType)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidIdentity, identity.Locations.FirstOrDefault(), identity.Name)); + return; + } + + var retention = retentions.FirstOrDefault(); + if (retention is not null && !IsRetention(retention, payloadType)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidRetention, retention.Locations.FirstOrDefault(), retention.Name)); + return; + } + + var factoryName = GetNamedString(attribute, "FactoryName") ?? "Create"; + var storeName = GetNamedString(attribute, "StoreName") ?? "message-store"; + context.AddSource($"{type.Name}.MessageStore.g.cs", SourceText.From( + GenerateSource(type, payloadType, factoryName, storeName, identity?.Name, retention?.Name), + Encoding.UTF8)); + } + + private static bool IsIdentity(IMethodSymbol method, INamedTypeSymbol payloadType) + => method.IsStatic && + method.ReturnType.SpecialType == SpecialType.System_String && + method.Parameters.Length == 2 && + IsMessageOfPayload(method.Parameters[0].Type, payloadType) && + method.Parameters[1].Type.ToDisplayString() == "PatternKit.Messaging.MessageContext"; + + private static bool IsRetention(IMethodSymbol method, INamedTypeSymbol payloadType) + => method.IsStatic && + method.ReturnType.SpecialType == SpecialType.System_Boolean && + method.Parameters.Length == 1 && + IsStoredMessageOfPayload(method.Parameters[0].Type, payloadType); + + private static bool IsMessageOfPayload(ITypeSymbol type, INamedTypeSymbol payloadType) + => type is INamedTypeSymbol named && + named.ConstructedFrom.ToDisplayString() == "PatternKit.Messaging.Message" && + SymbolEqualityComparer.Default.Equals(named.TypeArguments[0], payloadType); + + private static bool IsStoredMessageOfPayload(ITypeSymbol type, INamedTypeSymbol payloadType) + => type is INamedTypeSymbol named && + named.ConstructedFrom.ToDisplayString() == "PatternKit.Messaging.Storage.StoredMessage" && + SymbolEqualityComparer.Default.Equals(named.TypeArguments[0], payloadType); + + private static string GenerateSource( + INamedTypeSymbol type, + INamedTypeSymbol payloadType, + string factoryName, + string storeName, + string? identityMethod, + string? retentionMethod) + { + var sb = new StringBuilder(); + sb.AppendLine("// "); + sb.AppendLine("#nullable enable"); + sb.AppendLine(); + + var ns = type.ContainingNamespace.IsGlobalNamespace ? null : type.ContainingNamespace.ToDisplayString(); + if (ns is not null) + { + sb.Append("namespace ").Append(ns).AppendLine(";"); + sb.AppendLine(); + } + + sb.Append("partial ").Append(GetKind(type)).Append(' ').Append(type.Name).AppendLine(); + sb.AppendLine("{"); + sb.Append(" public static global::PatternKit.Messaging.Storage.MessageStore<") + .Append(payloadType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)) + .Append("> ") + .Append(factoryName) + .AppendLine("()"); + sb.Append(" => global::PatternKit.Messaging.Storage.MessageStore<") + .Append(payloadType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)) + .Append(">.Create(") + .Append(ToLiteral(storeName)) + .AppendLine(")"); + + if (identityMethod is not null) + sb.Append(" .IdentifyBy(").Append(identityMethod).AppendLine(")"); + if (retentionMethod is not null) + sb.Append(" .RetainWhen(").Append(retentionMethod).AppendLine(")"); + + sb.AppendLine(" .Build();"); + sb.AppendLine("}"); + return sb.ToString(); + } + + private static string GetKind(INamedTypeSymbol type) + => type.TypeKind == TypeKind.Struct ? "struct" : "class"; + + private static string? GetNamedString(AttributeData attribute, string name) + => attribute.NamedArguments.FirstOrDefault(kv => kv.Key == name).Value.Value as string; + + private static string ToLiteral(string value) + => "@\"" + value.Replace("\"", "\"\"") + "\""; +} diff --git a/test/PatternKit.Examples.Tests/DependencyInjection/PatternKitExampleDependencyInjectionTests.cs b/test/PatternKit.Examples.Tests/DependencyInjection/PatternKitExampleDependencyInjectionTests.cs index 920e52b..cc16b5f 100644 --- a/test/PatternKit.Examples.Tests/DependencyInjection/PatternKitExampleDependencyInjectionTests.cs +++ b/test/PatternKit.Examples.Tests/DependencyInjection/PatternKitExampleDependencyInjectionTests.cs @@ -104,6 +104,7 @@ public Task IoC_Registered_Examples_Can_Be_Used_By_Importing_Applications() var competingConsumers = provider.GetRequiredService(); var pipesAndFilters = provider.GetRequiredService(); var messageFilter = provider.GetRequiredService(); + var messageStore = provider.GetRequiredService(); var wireTap = provider.GetRequiredService(); var generatedTranslator = provider.GetRequiredService(); var generatedClaimCheck = provider.GetRequiredService(); @@ -193,6 +194,7 @@ public Task IoC_Registered_Examples_Can_Be_Used_By_Importing_Applications() ("generated competing consumers dispatch fulfillment work", competingConsumers.Service.DispatchAsync(new FulfillmentConsumerWork("ORDER-CC", "central")).GetAwaiter().GetResult().Accepted), ("generated pipes and filters publish fulfillment work", pipesAndFilters.Service.ProcessAsync("ORDER-PF").GetAwaiter().GetResult().Value.Published), ("generated message filter screens trusted orders", messageFilter.Service.Screen(new("ORDER-MF", "trusted", 250m, true)).Accepted), + ("generated message store records replayable order events", messageStore.Service.Record(new("ORDER-MS", "Submitted", 125m, false), "MSG-MS", "CHECKOUT-MS").Stored), ("generated wire tap records observability side channels", wireTap.Service.Publish(new("ORDER-WT", "tenant-a", 125m)).InvokedTaps.Count == 2), ("message envelope example tracks first attempt", envelope.Run().Attempt == 1), ("CQRS fluent path matches command writes to query reads", cqrsFluent.QueryMatchedCommand), diff --git a/test/PatternKit.Examples.Tests/Messaging/OrderMessageStoreExampleTests.cs b/test/PatternKit.Examples.Tests/Messaging/OrderMessageStoreExampleTests.cs new file mode 100644 index 0000000..50d46f7 --- /dev/null +++ b/test/PatternKit.Examples.Tests/Messaging/OrderMessageStoreExampleTests.cs @@ -0,0 +1,75 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Examples.DependencyInjection; +using PatternKit.Examples.Messaging; +using PatternKit.Messaging; +using PatternKit.Messaging.Storage; +using TinyBDD; + +namespace PatternKit.Examples.Tests.Messaging; + +public sealed class OrderMessageStoreExampleTests +{ + [Scenario("FluentMessageStore RecordsReplaySafeEvents")] + [Fact] + public void FluentMessageStore_RecordsReplaySafeEvents() + { + var summary = OrderMessageStoreExampleRunner.RunFluent( + new("order-1", "Submitted", 125m, false), + "msg-1", + "checkout-1"); + + ScenarioExpect.True(summary.Stored); + ScenarioExpect.False(summary.Duplicate); + ScenarioExpect.Equal(1, summary.ReplayCount); + ScenarioExpect.Null(summary.RejectionReason); + } + + [Scenario("GeneratedMessageStore MatchesFluentRetention")] + [Fact] + public void GeneratedMessageStore_MatchesFluentRetention() + { + var orderEvent = new OrderMessageStoreEvent("order-1", "PaymentCaptured", 125m, true); + var fluent = OrderMessageStoreExampleRunner.RunFluent(orderEvent, "msg-1", "checkout-1"); + var generated = GeneratedOrderMessageStore.Create().Append( + Message.Create(orderEvent).WithMessageId("msg-1").WithCorrelationId("checkout-1")); + + ScenarioExpect.False(fluent.Stored); + ScenarioExpect.False(generated.Stored); + ScenarioExpect.Equal(fluent.RejectionReason, generated.RejectionReason); + } + + [Scenario("ServiceCollection ImportsMessageStoreExample")] + [Fact] + public void ServiceCollection_ImportsMessageStoreExample() + { + var services = new ServiceCollection(); + services.AddOrderMessageStoreDemo(); + + using var provider = services.BuildServiceProvider(validateScopes: true); + var store = provider.GetRequiredService>(); + var runner = provider.GetRequiredService(); + + var direct = store.Append(Message.Create(new("order-1", "Submitted", 125m, false)).WithMessageId("msg-1")); + var summary = runner.RunGenerated(new("order-1", "Submitted", 125m, false), "msg-2", "checkout-1"); + + ScenarioExpect.True(direct.Stored); + ScenarioExpect.True(summary.Stored); + ScenarioExpect.Equal(1, summary.ReplayCount); + } + + [Scenario("AggregateServiceCollection ImportsMessageStoreExample")] + [Fact] + public void AggregateServiceCollection_ImportsMessageStoreExample() + { + var services = new ServiceCollection(); + services.AddPatternKitExamples(); + + using var provider = services.BuildServiceProvider(validateScopes: true); + var example = provider.GetRequiredService(); + + var summary = example.Service.Record(new("order-1", "Submitted", 125m, false), "msg-1", "checkout-1"); + + ScenarioExpect.True(summary.Stored); + ScenarioExpect.Equal(1, summary.ReplayCount); + } +} diff --git a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs index ea0a0d6..2051b0d 100644 --- a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs +++ b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs @@ -44,6 +44,7 @@ public sealed class PatternKitPatternCatalogTests(ITestOutputHelper output) : Ti "Dead Letter Channel", "Content-Based Router", "Message Filter", + "Message Store", "Wire Tap", "Recipient List", "Competing Consumers", @@ -120,7 +121,7 @@ public Task Catalog_Includes_Enterprise_Integration_And_Architecture_Patterns() ScenarioExpect.Equal(EnterprisePatternAdditions.OrderBy(static x => x), patterns.Select(static p => p.Name).OrderBy(static x => x))) .And("enterprise entries are grouped by integration reliability and architecture families", patterns => { - ScenarioExpect.Equal(17, patterns.Count(static p => p.Family == PatternFamily.EnterpriseIntegration)); + ScenarioExpect.Equal(18, patterns.Count(static p => p.Family == PatternFamily.EnterpriseIntegration)); ScenarioExpect.Equal(3, patterns.Count(static p => p.Family == PatternFamily.MessagingReliability)); ScenarioExpect.Equal(7, patterns.Count(static p => p.Family == PatternFamily.CloudArchitecture)); ScenarioExpect.Equal(15, patterns.Count(static p => p.Family == PatternFamily.ApplicationArchitecture)); diff --git a/test/PatternKit.Generators.Tests/AbstractionsAttributeCoverageTests.cs b/test/PatternKit.Generators.Tests/AbstractionsAttributeCoverageTests.cs index 61fc5a3..d7e7717 100644 --- a/test/PatternKit.Generators.Tests/AbstractionsAttributeCoverageTests.cs +++ b/test/PatternKit.Generators.Tests/AbstractionsAttributeCoverageTests.cs @@ -143,6 +143,9 @@ private enum TestTrigger { typeof(ContentRouteDefaultAttribute), AttributeTargets.Method, false, false }, { typeof(GenerateMessageFilterAttribute), AttributeTargets.Class | AttributeTargets.Struct, false, false }, { typeof(MessageFilterRuleAttribute), AttributeTargets.Method, false, false }, + { typeof(GenerateMessageStoreAttribute), AttributeTargets.Class | AttributeTargets.Struct, false, false }, + { typeof(MessageStoreIdentityAttribute), AttributeTargets.Method, false, false }, + { typeof(MessageStoreRetentionAttribute), AttributeTargets.Method, false, false }, { typeof(GenerateWireTapAttribute), AttributeTargets.Class | AttributeTargets.Struct, false, false }, { typeof(WireTapHandlerAttribute), AttributeTargets.Method, false, false }, { typeof(GenerateClaimCheckAttribute), AttributeTargets.Class | AttributeTargets.Struct, false, false }, @@ -751,6 +754,11 @@ public void Flyweight_Iterator_And_Messaging_Attributes_Expose_Defaults_And_Conf RejectionReason = "manual review" }; var messageFilterRule = new MessageFilterRuleAttribute("trusted", 9); + var messageStore = new GenerateMessageStoreAttribute(typeof(string)) + { + FactoryName = "BuildStore", + StoreName = "order-audit" + }; var wireTap = new GenerateWireTapAttribute(typeof(string)) { FactoryName = "BuildTap", @@ -882,6 +890,9 @@ public void Flyweight_Iterator_And_Messaging_Attributes_Expose_Defaults_And_Conf ScenarioExpect.Equal("manual review", messageFilter.RejectionReason); ScenarioExpect.Equal("trusted", messageFilterRule.Name); ScenarioExpect.Equal(9, messageFilterRule.Order); + ScenarioExpect.Equal(typeof(string), messageStore.PayloadType); + ScenarioExpect.Equal("BuildStore", messageStore.FactoryName); + ScenarioExpect.Equal("order-audit", messageStore.StoreName); ScenarioExpect.Equal(typeof(string), wireTap.PayloadType); ScenarioExpect.Equal("BuildTap", wireTap.FactoryName); ScenarioExpect.Equal("orders-observability", wireTap.TapName); @@ -961,6 +972,9 @@ public void Flyweight_Iterator_And_Messaging_Attributes_Expose_Defaults_And_Conf ScenarioExpect.Throws(() => new ContentRouteAttribute("name", 1, "")); ScenarioExpect.Throws(() => new GenerateMessageFilterAttribute(null!)); ScenarioExpect.Throws(() => new MessageFilterRuleAttribute("", 1)); + ScenarioExpect.Throws(() => new GenerateMessageStoreAttribute(null!)); + ScenarioExpect.IsType(new MessageStoreIdentityAttribute()); + ScenarioExpect.IsType(new MessageStoreRetentionAttribute()); ScenarioExpect.Throws(() => new GenerateWireTapAttribute(null!)); ScenarioExpect.Throws(() => new WireTapHandlerAttribute("", 1)); ScenarioExpect.Throws(() => new GenerateClaimCheckAttribute(null!)); diff --git a/test/PatternKit.Generators.Tests/MessageStoreGeneratorTests.cs b/test/PatternKit.Generators.Tests/MessageStoreGeneratorTests.cs new file mode 100644 index 0000000..f491673 --- /dev/null +++ b/test/PatternKit.Generators.Tests/MessageStoreGeneratorTests.cs @@ -0,0 +1,164 @@ +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using PatternKit.Generators.Messaging; +using TinyBDD; + +namespace PatternKit.Generators.Tests; + +public sealed class MessageStoreGeneratorTests +{ + [Scenario("GeneratesMessageStoreFactory")] + [Fact] + public void GeneratesMessageStoreFactory() + { + var source = """ + using PatternKit.Generators.Messaging; + using PatternKit.Messaging; + using PatternKit.Messaging.Storage; + + namespace MyApp; + + public sealed record Order(string Id, decimal Total); + + [GenerateMessageStore(typeof(Order), FactoryName = "Build", StoreName = "order-audit")] + public static partial class OrderMessageStore + { + [MessageStoreIdentity] + private static string Identity(Message message, MessageContext context) + => message.Headers.MessageId!; + + [MessageStoreRetention] + private static bool Retain(StoredMessage stored) + => stored.Message.Payload.Total <= 500m; + } + + public static class Demo + { + public static bool Run() + => OrderMessageStore.Build().Append(Message.Create(new Order("o-1", 100m)).WithMessageId("m-1")).Stored; + } + """; + + var comp = CreateCompilation(source, nameof(GeneratesMessageStoreFactory)); + var gen = new MessageStoreGenerator(); + _ = RoslynTestHelpers.Run(comp, gen, out var run, out var updated); + + ScenarioExpect.All(run.Results, result => ScenarioExpect.Empty(result.Diagnostics)); + var generated = ScenarioExpect.Single(run.Results.SelectMany(result => result.GeneratedSources)); + ScenarioExpect.Equal("OrderMessageStore.MessageStore.g.cs", generated.HintName); + var text = generated.SourceText.ToString(); + ScenarioExpect.Contains("MessageStore", text); + ScenarioExpect.Contains(".IdentifyBy(Identity)", text); + ScenarioExpect.Contains(".RetainWhen(Retain)", text); + + var emit = updated.Emit(Stream.Null); + ScenarioExpect.True(emit.Success, string.Join("\n", emit.Diagnostics)); + } + + [Scenario("ReportsDiagnosticForNonPartialStore")] + [Fact] + public void ReportsDiagnosticForNonPartialStore() + { + var source = """ + using PatternKit.Generators.Messaging; + + namespace MyApp; + public sealed record Order(string Id); + [GenerateMessageStore(typeof(Order))] + public static class OrderStore; + """; + + var comp = CreateCompilation(source, nameof(ReportsDiagnosticForNonPartialStore)); + var gen = new MessageStoreGenerator(); + _ = RoslynTestHelpers.Run(comp, gen, out var run, out _); + + var diagnostic = ScenarioExpect.Single(run.Results.SelectMany(result => result.Diagnostics)); + ScenarioExpect.Equal("PKMS001", diagnostic.Id); + } + + [Scenario("ReportsDiagnosticForInvalidIdentity")] + [Fact] + public void ReportsDiagnosticForInvalidIdentity() + { + var source = """ + using PatternKit.Generators.Messaging; + using PatternKit.Messaging; + + namespace MyApp; + public sealed record Order(string Id); + [GenerateMessageStore(typeof(Order))] + public static partial class OrderStore + { + [MessageStoreIdentity] + private static int Identity(Message message, MessageContext context) => 1; + } + """; + + var comp = CreateCompilation(source, nameof(ReportsDiagnosticForInvalidIdentity)); + var gen = new MessageStoreGenerator(); + _ = RoslynTestHelpers.Run(comp, gen, out var run, out _); + + var diagnostic = ScenarioExpect.Single(run.Results.SelectMany(result => result.Diagnostics)); + ScenarioExpect.Equal("PKMS002", diagnostic.Id); + } + + [Scenario("ReportsDiagnosticForInvalidRetention")] + [Fact] + public void ReportsDiagnosticForInvalidRetention() + { + var source = """ + using PatternKit.Generators.Messaging; + using PatternKit.Messaging.Storage; + + namespace MyApp; + public sealed record Order(string Id); + [GenerateMessageStore(typeof(Order))] + public static partial class OrderStore + { + [MessageStoreRetention] + private static string Retain(StoredMessage stored) => "yes"; + } + """; + + var comp = CreateCompilation(source, nameof(ReportsDiagnosticForInvalidRetention)); + var gen = new MessageStoreGenerator(); + _ = RoslynTestHelpers.Run(comp, gen, out var run, out _); + + var diagnostic = ScenarioExpect.Single(run.Results.SelectMany(result => result.Diagnostics)); + ScenarioExpect.Equal("PKMS003", diagnostic.Id); + } + + [Scenario("ReportsDiagnosticForDuplicateHooks")] + [Fact] + public void ReportsDiagnosticForDuplicateHooks() + { + var source = """ + using PatternKit.Generators.Messaging; + using PatternKit.Messaging; + + namespace MyApp; + public sealed record Order(string Id); + [GenerateMessageStore(typeof(Order))] + public static partial class OrderStore + { + [MessageStoreIdentity] + private static string One(Message message, MessageContext context) => "1"; + [MessageStoreIdentity] + private static string Two(Message message, MessageContext context) => "2"; + } + """; + + var comp = CreateCompilation(source, nameof(ReportsDiagnosticForDuplicateHooks)); + var gen = new MessageStoreGenerator(); + _ = RoslynTestHelpers.Run(comp, gen, out var run, out _); + + var diagnostic = ScenarioExpect.Single(run.Results.SelectMany(result => result.Diagnostics)); + ScenarioExpect.Equal("PKMS004", diagnostic.Id); + } + + private static CSharpCompilation CreateCompilation(string source, string assemblyName) + => RoslynTestHelpers.CreateCompilation( + source, + assemblyName, + extra: MetadataReference.CreateFromFile(typeof(PatternKit.Messaging.Storage.MessageStore<>).Assembly.Location)); +} diff --git a/test/PatternKit.Tests/Messaging/Storage/MessageStoreTests.cs b/test/PatternKit.Tests/Messaging/Storage/MessageStoreTests.cs new file mode 100644 index 0000000..6c312ff --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Storage/MessageStoreTests.cs @@ -0,0 +1,124 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Storage; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Storage; + +public sealed class MessageStoreTests +{ + [Scenario("Append StoresMessageForLookupAndReplay")] + [Fact] + public void Append_StoresMessageForLookupAndReplay() + { + var now = new DateTimeOffset(2026, 5, 21, 12, 0, 0, TimeSpan.Zero); + var store = MessageStore.Create("order-audit") + .IdentifyBy(static (message, _) => message.Headers.MessageId!) + .UseClock(() => now) + .Build(); + var message = Message.Create(new("order-1", 100m)).WithMessageId("msg-1").WithCorrelationId("checkout-1"); + + var result = store.Append(message); + var stored = store.Get("msg-1"); + var replay = store.Replay(MessageStoreQuery.ForCorrelation("checkout-1")); + + ScenarioExpect.True(result.Stored); + ScenarioExpect.False(result.Duplicate); + ScenarioExpect.Equal("order-audit", result.StoreName); + ScenarioExpect.Equal("msg-1", result.StoredMessage.MessageId); + ScenarioExpect.Equal(now, result.StoredMessage.StoredAtUtc); + ScenarioExpect.NotNull(stored); + ScenarioExpect.Equal(1L, stored!.Sequence); + ScenarioExpect.Single(replay); + ScenarioExpect.Equal("order-1", replay[0].Payload.Id); + } + + [Scenario("Append DetectsDuplicateMessageIds")] + [Fact] + public void Append_DetectsDuplicateMessageIds() + { + var store = MessageStore.Create() + .IdentifyBy(static (message, _) => message.Headers.MessageId!) + .Build(); + var first = Message.Create(new("order-1", 100m)).WithMessageId("msg-1"); + var second = Message.Create(new("order-2", 200m)).WithMessageId("msg-1"); + + _ = store.Append(first); + var duplicate = store.Append(second); + + ScenarioExpect.False(duplicate.Stored); + ScenarioExpect.True(duplicate.Duplicate); + ScenarioExpect.Equal("order-1", duplicate.StoredMessage.Message.Payload.Id); + ScenarioExpect.Single(store.Query()); + } + + [Scenario("RetentionPredicateRejectsMessagesWithoutPersisting")] + [Fact] + public void RetentionPredicate_RejectsMessagesWithoutPersisting() + { + var store = MessageStore.Create() + .IdentifyBy(static (message, _) => message.Headers.MessageId!) + .RetainWhen(static stored => stored.Message.Payload.Total <= 100m) + .Build(); + + var result = store.Append(Message.Create(new("order-1", 250m)).WithMessageId("msg-1")); + + ScenarioExpect.False(result.Stored); + ScenarioExpect.False(result.Duplicate); + ScenarioExpect.Equal("Message did not satisfy retention policy.", result.RejectionReason); + ScenarioExpect.Empty(store.Query()); + } + + [Scenario("QuerySupportsCorrelationTimeWindowAndMaxCount")] + [Fact] + public void Query_SupportsCorrelationTimeWindowAndMaxCount() + { + var ticks = new Queue([ + new(2026, 5, 21, 10, 0, 0, TimeSpan.Zero), + new(2026, 5, 21, 10, 1, 0, TimeSpan.Zero), + new(2026, 5, 21, 10, 2, 0, TimeSpan.Zero) + ]); + var store = MessageStore.Create() + .IdentifyBy(static (message, _) => message.Headers.MessageId!) + .UseClock(() => ticks.Dequeue()) + .Build(); + _ = store.Append(Message.Create(new("order-1", 10m)).WithMessageId("m1").WithCorrelationId("c1")); + _ = store.Append(Message.Create(new("order-2", 20m)).WithMessageId("m2").WithCorrelationId("c1")); + _ = store.Append(Message.Create(new("order-3", 30m)).WithMessageId("m3").WithCorrelationId("c2")); + + var matches = store.Query(new MessageStoreQuery( + "c1", + new DateTimeOffset(2026, 5, 21, 10, 0, 30, TimeSpan.Zero), + null, + 1)); + + ScenarioExpect.Single(matches); + ScenarioExpect.Equal("m2", matches[0].MessageId); + } + + [Scenario("BuilderRejectsInvalidConfiguration")] + [Fact] + public void Builder_RejectsInvalidConfiguration() + { + ScenarioExpect.Throws(() => MessageStore.Create("")); + ScenarioExpect.Throws(() => MessageStore.Create().IdentifyBy(null!)); + ScenarioExpect.Throws(() => MessageStore.Create().RetainWhen(null!)); + ScenarioExpect.Throws(() => MessageStore.Create().UseClock(null!)); + ScenarioExpect.Throws(() => MessageStoreQuery.ForCorrelation("")); + ScenarioExpect.Throws(() => MessageStoreQuery.All.Take(0)); + } + + [Scenario("AppendRejectsNullMessageAndBlankIdentity")] + [Fact] + public void Append_RejectsNullMessageAndBlankIdentity() + { + var store = MessageStore.Create() + .IdentifyBy(static (_, _) => "") + .Build(); + + ScenarioExpect.Throws(() => store.Append(null!)); + ScenarioExpect.Throws(() => store.Append(Message.Create(new("order-1", 10m)))); + ScenarioExpect.Throws(() => store.Get("")); + } + + private sealed record Order(string Id, decimal Total); +}