Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/examples/order-message-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Order Message Store

The order message-store example shows an importable audit/replay store for order events.

```csharp
services.AddOrderMessageStoreDemo();

var service = provider.GetRequiredService<OrderMessageStoreService>();
var summary = service.Record(
new OrderMessageStoreEvent("ORDER-100", "Submitted", 125m, containsSensitiveData: false),
"MSG-100",
"CHECKOUT-100");
Comment on lines +5 to +12
```

The example includes:

- a fluent `OrderMessageStores.CreateAuditStore()` path,
- a generated `GeneratedOrderMessageStore.Create()` path,
- a retention policy that refuses sensitive payloads,
- `IServiceCollection` registration for standard .NET hosts.
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
- name: Order Message Filter
href: order-message-filter.md

- name: Order Message Store
href: order-message-store.md

- name: Order Wire Tap
href: order-wire-tap.md

Expand Down
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Dead Letter Channel**](dead-letter-channel.md) | Failed-message capture and replay handoff | `[GenerateDeadLetterChannel]` |
| [**Content Router**](messaging.md#generated-content-router) | Content-based message routing factories | `[GenerateContentRouter]` |
| [**Message Filter**](message-filter.md) | Named allow-rule filters for message consumers | `[GenerateMessageFilter]` |
| [**Message Store**](message-store.md) | Message audit, lookup, and replay store factories | `[GenerateMessageStore]` |
| [**Wire Tap**](wire-tap.md) | Side-channel message observability factories | `[GenerateWireTap]` |
| [**Recipient List**](messaging.md#generated-recipient-list) | Recipient fan-out factories | `[GenerateRecipientList]` |
| [**Splitter / Aggregator**](messaging.md#generated-splitter-and-aggregator) | Split/rejoin message routing factories | `[GenerateSplitter]` / `[GenerateAggregator]` |
Expand Down
31 changes: 31 additions & 0 deletions docs/generators/message-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Message Store Generator

`[GenerateMessageStore]` creates a typed `MessageStore<TPayload>` factory for a partial class or struct.

```csharp
[GenerateMessageStore(typeof(OrderSubmitted), FactoryName = "Create", StoreName = "order-audit")]
public static partial class OrderAuditStore
{
[MessageStoreIdentity]
private static string Identity(Message<OrderSubmitted> message, MessageContext context)
=> message.Headers.MessageId!;

[MessageStoreRetention]
private static bool Retain(StoredMessage<OrderSubmitted> stored)
=> !stored.Message.Payload.ContainsSensitiveData;
}
```

The generated factory composes the fluent runtime API:

- `MessageStore<TPayload>.Create(StoreName)`
- optional `.IdentifyBy(...)`
- optional `.RetainWhen(...)`
- `.Build()`

Diagnostics:

- `PKMS001`: host type must be partial.
- `PKMS002`: identity method must be `static string Method(Message<TPayload>, MessageContext)`.
- `PKMS003`: retention method must be `static bool Method(StoredMessage<TPayload>)`.
- `PKMS004`: only one identity hook and one retention hook are allowed.
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@
- name: Message Filter
href: message-filter.md

- name: Message Store
href: message-store.md

- name: Wire Tap
href: wire-tap.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Enterprise Integration | Dead Letter Channel | `DeadLetterChannel<TPayload>` | Dead Letter Channel generator |
| Enterprise Integration | Content-Based Router | `ContentRouter<TPayload, TResult>` | Messaging generator |
| Enterprise Integration | Message Filter | `MessageFilter<TPayload>` | Message Filter generator |
| Enterprise Integration | Message Store | `MessageStore<TPayload>` | Message Store generator |
| Enterprise Integration | Wire Tap | `WireTap<TPayload>` | Wire Tap generator |
| Enterprise Integration | Recipient List | `RecipientList<TPayload>` | Messaging generator |
| Enterprise Integration | Competing Consumers | `CompetingConsumerGroup<TMessage, TResult>` | Competing Consumers generator |
Expand Down
19 changes: 19 additions & 0 deletions docs/patterns/messaging/message-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Message Store

Message Store persists message envelopes so support, replay, and operational audit workflows can inspect what moved through a pipeline.

`MessageStore<TPayload>` provides a fluent runtime path:

```csharp
var store = MessageStore<OrderSubmitted>.Create("order-audit")
.IdentifyBy((message, context) => message.Headers.MessageId!)
.RetainWhen(stored => !stored.Message.Payload.ContainsSensitiveData)
.Build();

var result = store.Append(message.WithMessageId("msg-100").WithCorrelationId("checkout-100"));
var replay = store.Replay(MessageStoreQuery.ForCorrelation("checkout-100"));
```

Use it when an app needs durable lookup semantics around messages without mixing audit/replay concerns into message handlers. In production, the same shape can sit behind a hosted service, ASP.NET Core endpoint, queue worker, or support tool through `IServiceCollection`.

The source-generated path uses `[GenerateMessageStore]`, `[MessageStoreIdentity]`, and `[MessageStoreRetention]` to create the same fluent store factory from annotated static hooks.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@
href: messaging/dead-letter-channel.md
- name: Message Filter
href: messaging/message-filter.md
- name: Message Store
href: messaging/message-store.md
- name: Wire Tap
href: messaging/wire-tap.md
- name: Enterprise Message Routing
Expand Down
256 changes: 256 additions & 0 deletions src/PatternKit.Core/Messaging/Storage/MessageStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
namespace PatternKit.Messaging.Storage;

/// <summary>
/// In-memory message store for audit, replay, and operational lookup workflows.
/// </summary>
public sealed class MessageStore<TPayload>
{
/// <summary>Selects the persisted message identifier.</summary>
public delegate string MessageIdentitySelector(Message<TPayload> message, MessageContext context);

/// <summary>Determines whether a stored message should be retained.</summary>
public delegate bool RetentionPredicate(StoredMessage<TPayload> stored);

private readonly object _gate = new();
private readonly string _name;
private readonly MessageIdentitySelector _identitySelector;
private readonly RetentionPredicate _retentionPredicate;
private readonly Func<DateTimeOffset> _clock;
private readonly Dictionary<string, StoredMessage<TPayload>> _byId = new(StringComparer.Ordinal);
private readonly List<StoredMessage<TPayload>> _ordered = new();
private long _sequence;

private MessageStore(
string name,
MessageIdentitySelector identitySelector,
RetentionPredicate retentionPredicate,
Func<DateTimeOffset> clock)
=> (_name, _identitySelector, _retentionPredicate, _clock) = (name, identitySelector, retentionPredicate, clock);

/// <summary>Appends a message when it is not already present.</summary>
public MessageStoreAppendResult<TPayload> Append(Message<TPayload> message, MessageContext? context = null)
{
if (message is null)
throw new ArgumentNullException(nameof(message));

var effectiveContext = context ?? MessageContext.From(message);
var messageId = _identitySelector(message, effectiveContext);
if (string.IsNullOrWhiteSpace(messageId))
throw new InvalidOperationException("Message store identity selector returned a blank identifier.");

lock (_gate)
{
if (_byId.TryGetValue(messageId, out var existing))
return MessageStoreAppendResult<TPayload>.ForDuplicate(_name, existing);

var stored = new StoredMessage<TPayload>(_name, messageId, ++_sequence, _clock(), message, effectiveContext.Headers);
if (!_retentionPredicate(stored))
return MessageStoreAppendResult<TPayload>.ForRejected(_name, stored, "Message did not satisfy retention policy.");

Comment on lines +46 to +49
_byId.Add(messageId, stored);
_ordered.Add(stored);
return MessageStoreAppendResult<TPayload>.ForStored(_name, stored);
}
}

/// <summary>Returns a stored message by identifier when present.</summary>
public StoredMessage<TPayload>? Get(string messageId)
{
if (string.IsNullOrWhiteSpace(messageId))
throw new ArgumentException("Message identifier cannot be null, empty, or whitespace.", nameof(messageId));

lock (_gate)
return _byId.TryGetValue(messageId, out var stored) ? stored : null;
}

/// <summary>Queries stored messages in append order.</summary>
public IReadOnlyList<StoredMessage<TPayload>> Query(MessageStoreQuery? query = null)
{
var effectiveQuery = query ?? MessageStoreQuery.All;
lock (_gate)
{
IEnumerable<StoredMessage<TPayload>> matches = _ordered;
if (!string.IsNullOrWhiteSpace(effectiveQuery.CorrelationId))
matches = matches.Where(stored => stored.Headers.CorrelationId == effectiveQuery.CorrelationId);
if (effectiveQuery.FromUtc is not null)
matches = matches.Where(stored => stored.StoredAtUtc >= effectiveQuery.FromUtc.Value);
if (effectiveQuery.ToUtc is not null)
matches = matches.Where(stored => stored.StoredAtUtc <= effectiveQuery.ToUtc.Value);
if (effectiveQuery.MaxCount is not null)
matches = matches.Take(effectiveQuery.MaxCount.Value);

return matches.ToArray();
}
}

/// <summary>Returns message envelopes that match a query in replay order.</summary>
public IReadOnlyList<Message<TPayload>> Replay(MessageStoreQuery? query = null)
=> Query(query).Select(static stored => stored.Message).ToArray();

/// <summary>Creates a message-store builder.</summary>
public static Builder Create(string name = "message-store") => new(name);

/// <summary>Fluent builder for <see cref="MessageStore{TPayload}"/>.</summary>
public sealed class Builder
{
private readonly string _name;
private MessageIdentitySelector _identitySelector = static (message, _) =>
message.Headers.MessageId ?? Guid.NewGuid().ToString("N");
private RetentionPredicate _retentionPredicate = static _ => true;
private Func<DateTimeOffset> _clock = static () => DateTimeOffset.UtcNow;

internal Builder(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Message store name cannot be null, empty, or whitespace.", nameof(name));

_name = name;
}

/// <summary>Configures the identity selector used for storage and lookup.</summary>
public Builder IdentifyBy(MessageIdentitySelector selector)
{
_identitySelector = selector ?? throw new ArgumentNullException(nameof(selector));
return this;
}

/// <summary>Configures a retention predicate. Messages returning false are not persisted.</summary>
public Builder RetainWhen(RetentionPredicate predicate)
{
_retentionPredicate = predicate ?? throw new ArgumentNullException(nameof(predicate));
return this;
}

/// <summary>Configures the clock used when storing messages.</summary>
public Builder UseClock(Func<DateTimeOffset> clock)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
return this;
}

/// <summary>Builds the message store.</summary>
public MessageStore<TPayload> Build() => new(_name, _identitySelector, _retentionPredicate, _clock);
}
}

/// <summary>Stored message metadata and envelope.</summary>
public sealed class StoredMessage<TPayload>
{
/// <summary>Creates stored message metadata.</summary>
public StoredMessage(
string storeName,
string messageId,
long sequence,
DateTimeOffset storedAtUtc,
Message<TPayload> message,
MessageHeaders headers)
{
StoreName = storeName;
MessageId = messageId;
Sequence = sequence;
StoredAtUtc = storedAtUtc;
Message = message;
Headers = headers;
}

/// <summary>Name of the message store.</summary>
public string StoreName { get; }

/// <summary>Stable message identifier.</summary>
public string MessageId { get; }

/// <summary>Append sequence assigned by the store.</summary>
public long Sequence { get; }

/// <summary>UTC time the message was stored.</summary>
public DateTimeOffset StoredAtUtc { get; }

/// <summary>Stored message envelope.</summary>
public Message<TPayload> Message { get; }

/// <summary>Headers captured for query and replay metadata.</summary>
public MessageHeaders Headers { get; }
}

/// <summary>Query options for store lookup and replay.</summary>
public sealed class MessageStoreQuery
{
/// <summary>Query that returns every stored message.</summary>
public static MessageStoreQuery All { get; } = new();

/// <summary>Creates a message-store query.</summary>
public MessageStoreQuery(string? correlationId = null, DateTimeOffset? fromUtc = null, DateTimeOffset? toUtc = null, int? maxCount = null)
{
if (maxCount is not null && maxCount <= 0)
throw new ArgumentOutOfRangeException(nameof(maxCount), "Maximum count must be greater than zero.");

CorrelationId = correlationId;
FromUtc = fromUtc;
ToUtc = toUtc;
MaxCount = maxCount;
}

/// <summary>Correlation identifier filter.</summary>
public string? CorrelationId { get; }

/// <summary>Inclusive lower stored-at UTC filter.</summary>
public DateTimeOffset? FromUtc { get; }

/// <summary>Inclusive upper stored-at UTC filter.</summary>
public DateTimeOffset? ToUtc { get; }

/// <summary>Maximum number of messages to return.</summary>
public int? MaxCount { get; }

/// <summary>Creates a query scoped to a correlation identifier.</summary>
public static MessageStoreQuery ForCorrelation(string correlationId)
{
if (string.IsNullOrWhiteSpace(correlationId))
throw new ArgumentException("Correlation identifier cannot be null, empty, or whitespace.", nameof(correlationId));

return new MessageStoreQuery(correlationId);
}

/// <summary>Creates a copy with a maximum result count.</summary>
public MessageStoreQuery Take(int maxCount)
{
if (maxCount <= 0)
throw new ArgumentOutOfRangeException(nameof(maxCount), "Maximum count must be greater than zero.");

return new MessageStoreQuery(CorrelationId, FromUtc, ToUtc, maxCount);
}
}

/// <summary>Result returned when appending to a message store.</summary>
public sealed class MessageStoreAppendResult<TPayload>
{
private MessageStoreAppendResult(string storeName, StoredMessage<TPayload> storedMessage, bool stored, bool duplicate, string? rejectionReason)
=> (StoreName, StoredMessage, Stored, Duplicate, RejectionReason) = (storeName, storedMessage, stored, duplicate, rejectionReason);

/// <summary>Name of the message store.</summary>
public string StoreName { get; }

/// <summary>Message metadata involved in the append attempt.</summary>
public StoredMessage<TPayload> StoredMessage { get; }

/// <summary>True when the message was persisted by this call.</summary>
public bool Stored { get; }

/// <summary>True when the message already existed.</summary>
public bool Duplicate { get; }

/// <summary>Reason the message was rejected, when applicable.</summary>
public string? RejectionReason { get; }

/// <summary>Creates a successful stored result.</summary>
public static MessageStoreAppendResult<TPayload> ForStored(string storeName, StoredMessage<TPayload> storedMessage)
=> new(storeName, storedMessage, true, false, null);

/// <summary>Creates a duplicate result.</summary>
public static MessageStoreAppendResult<TPayload> ForDuplicate(string storeName, StoredMessage<TPayload> storedMessage)
=> new(storeName, storedMessage, false, true, null);

/// <summary>Creates a retention rejection result.</summary>
public static MessageStoreAppendResult<TPayload> ForRejected(string storeName, StoredMessage<TPayload> storedMessage, string rejectionReason)
=> new(storeName, storedMessage, false, false, rejectionReason);
}
Loading
Loading