feat: workflow-engine extensions (async wire tap, scatter-gather, enricher, normalizer, polling consumer, TTL stores, outbox)#333
Conversation
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
There was a problem hiding this comment.
Pull request overview
This PR adds several new messaging/workflow-engine primitives to PatternKit.Core (wire-tap, scatter-gather, enrichment, normalization, polling consumer, TTL-backed stores, and an outbox store/dispatcher) along with corresponding unit tests under PatternKit.Tests.
Changes:
- Added new core messaging primitives:
AsyncWireTap,AsyncScatterGather,AsyncContentEnricher,Normalizer, andAsyncPollingConsumer. - Added TTL-enabled in-memory stores for idempotency and claim-check, plus an outbox store abstraction with an in-memory implementation and a reusable
OutboxDispatcher. - Added unit tests covering the new components.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
src/PatternKit.Core/Messaging/Channels/AsyncWireTap.cs |
New async wire-tap primitive with per-tap error policy and audit trail. |
src/PatternKit.Core/Messaging/Routing/AsyncScatterGather.cs |
New async scatter-gather primitive with completion strategies and response envelopes. |
src/PatternKit.Core/Messaging/Transformation/AsyncContentEnricher.cs |
New async content enricher with per-step error policies and step audit trail. |
src/PatternKit.Core/Messaging/Transformation/Normalizer.cs |
New predicate-based normalizer with first-match dispatch and optional default. |
src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs |
New self-driving polling loop with interval/jitter/backoff policies. |
src/PatternKit.Core/Messaging/Reliability/InMemoryIdempotencyStoreWithTtl.cs |
New TTL-capable in-memory idempotency store with eviction. |
src/PatternKit.Core/Messaging/Transformation/InMemoryClaimCheckStoreWithTtl.cs |
New TTL-capable in-memory claim check store with lazy/proactive eviction. |
src/PatternKit.Core/Messaging/Reliability/IOutboxStore.cs |
New outbox store abstraction, in-memory store, and OutboxDispatcher loop helper. |
test/PatternKit.Tests/Messaging/Channels/AsyncWireTapTests.cs |
Unit tests for AsyncWireTap. |
test/PatternKit.Tests/Messaging/Routing/AsyncScatterGatherTests.cs |
Unit tests for AsyncScatterGather. |
test/PatternKit.Tests/Messaging/Transformation/AsyncContentEnricherTests.cs |
Unit tests for AsyncContentEnricher. |
test/PatternKit.Tests/Messaging/Transformation/NormalizerTests.cs |
Unit tests for Normalizer. |
test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs |
Unit tests for AsyncPollingConsumer. |
test/PatternKit.Tests/Messaging/Reliability/InMemoryIdempotencyStoreWithTtlTests.cs |
Unit tests for TTL idempotency store behavior. |
test/PatternKit.Tests/Messaging/Transformation/InMemoryClaimCheckStoreWithTtlTests.cs |
Unit tests for TTL claim-check store behavior. |
test/PatternKit.Tests/Messaging/Reliability/IOutboxStoreTests.cs |
Unit tests for outbox store + dispatcher drain/run behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Check if we need early-exit on FirstN or Quorum | ||
| _strategy.IsFirstN(out var firstN); | ||
| _strategy.IsQuorum(out var quorum); | ||
| var earlyExitCount = firstN > 0 ? firstN : (quorum > 0 ? quorum : 0); | ||
|
|
There was a problem hiding this comment.
Fixed in d2a7c64: Quorum(n) now counts any completed response (success or failure) toward the threshold. EarlyExitCounter was split into RecordSuccess() (FirstN only) and RecordCompletion() (called on both failures and successes for Quorum). A test DispatchAsync_QuorumStrategy_CountsFailedRecipientsTowardQuorum verifies that a failing recipient satisfies Quorum and cancels slow recipients.
| try | ||
| { | ||
| var response = await recipient.Handler(message, context, ct).ConfigureAwait(false); | ||
| envelopes.Add(ResponseEnvelope<TResponse>.Success(recipient.Name, response)); | ||
| earlyCounter.RecordSuccess(); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Cancellation from early-exit or timeout — not a recipient error | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| envelopes.Add(ResponseEnvelope<TResponse>.Failure(recipient.Name, ex)); | ||
| } |
There was a problem hiding this comment.
Fixed in d2a7c64: RunRecipientAsync now accepts the original callerCt alongside the internal early-exit ct. The catch clause uses when (!callerCt.IsCancellationRequested) to silently absorb early-exit/timeout cancellation, while a separate catch rethrows when the caller's token triggered, also adding a failure envelope so callers can observe the cancellation.
| /// <summary>Wait up to <paramref name="timeout"/>; use whatever responses arrived by then.</summary> | ||
| public static CompletionStrategy Timeout(TimeSpan timeout) => new TimeoutStrategy(timeout); | ||
|
|
||
| /// <summary>Wait for all responses, but stop waiting after <paramref name="timeout"/>.</summary> | ||
| public static CompletionStrategy AllOrTimeout(TimeSpan timeout) => new AllOrTimeoutStrategy(timeout); | ||
|
|
||
| internal abstract Task<bool> ShouldCompleteAsync(Task<ResponseEnvelope<object?>[]> whenAll, int recipientCount, TimeSpan? overallTimeout, CancellationToken ct); | ||
| internal abstract TimeSpan? GetTimeout(); | ||
|
|
There was a problem hiding this comment.
Fixed in d2a7c64: Removed the abstract ShouldCompleteAsync method and all its no-op overrides (Task.FromResult(true) in every strategy). Completion logic is handled by EarlyExitCounter and the Task.WhenAll/Task.WhenAny orchestration in DispatchAsync. The dead API surface is now gone.
| var tap = _taps[i]; | ||
| try | ||
| { | ||
| await tap.Handler(message, effectiveContext, cancellationToken).ConfigureAwait(false); | ||
| results[i] = TapResult.Success(tap.Name); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| results[i] = TapResult.Failure(tap.Name, ex); | ||
| switch (tap.Policy) | ||
| { | ||
| case TapErrorPolicy.Log: | ||
| tap.ErrorSink?.Invoke(ex); | ||
| break; | ||
| case TapErrorPolicy.Propagate: | ||
| throw; | ||
| case TapErrorPolicy.Swallow: | ||
| default: | ||
| break; | ||
| } | ||
| } |
There was a problem hiding this comment.
Fixed in d2a7c64: Added an early check in the catch (Exception ex) block of PublishAsync: if the exception is an OperationCanceledException and cancellationToken.IsCancellationRequested is true, the exception is re-thrown before any tap policy is applied. Added PublishAsync_SwallowPolicy_ReThrowsOCEOnCallerCancellation test to lock in this contract.
| try | ||
| { | ||
| currentPayload = await step.Handler(currentPayload, effectiveContext, cancellationToken).ConfigureAwait(false); | ||
| stepResults[i] = EnrichmentStepResult.CreateApplied(step.Name); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| switch (step.Policy) | ||
| { | ||
| case EnrichmentErrorPolicy.Throw: | ||
| throw; | ||
| case EnrichmentErrorPolicy.UseDefault: | ||
| if (step.DefaultFactory is not null) | ||
| currentPayload = step.DefaultFactory(currentPayload); | ||
| stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); | ||
| break; | ||
| case EnrichmentErrorPolicy.Skip: | ||
| default: | ||
| stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); | ||
| break; | ||
| } | ||
| } |
There was a problem hiding this comment.
Fixed in d2a7c64: Same pattern as WireTap — added an early re-throw in EnrichAsync catch block when cancellationToken.IsCancellationRequested is true, before dispatching to the Skip/UseDefault/Throw policy. Added EnrichAsync_SkipPolicy_ReThrowsOCEOnCallerCancellation test to verify.
| /// <summary>Adds an enrichment step with the default policy.</summary> | ||
| public Builder Enrich(string name, AsyncEnrichStep handler) | ||
| => Enrich(name, handler, _defaultPolicy); | ||
|
|
||
| /// <summary>Adds an enrichment step with an explicit policy.</summary> | ||
| public Builder Enrich(string name, AsyncEnrichStep handler, EnrichmentErrorPolicy policy, Func<TPayload, TPayload>? defaultFactory = null) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(name)) | ||
| throw new ArgumentException("Enrichment step name cannot be null, empty, or whitespace.", nameof(name)); | ||
| if (handler is null) | ||
| throw new ArgumentNullException(nameof(handler)); | ||
|
|
||
| _steps.Add(new Step(name, handler, policy, defaultFactory)); | ||
| return this; | ||
| } |
There was a problem hiding this comment.
Fixed in d2a7c64: Builder-time validation now throws ArgumentException when policy == EnrichmentErrorPolicy.UseDefault && defaultFactory is null. This eliminates the silent Skip/UseDefault ambiguity. Added Builder_RejectsUseDefaultWithoutFactory test to confirm.
| if (string.IsNullOrWhiteSpace(claimId)) | ||
| throw new ArgumentException("Claim id is required.", nameof(claimId)); | ||
| if (headers is null) | ||
| throw new ArgumentNullException(nameof(headers)); |
There was a problem hiding this comment.
Fixed in d2a7c64: Added if (ttl.HasValue && ttl.Value < TimeSpan.Zero) throw new ArgumentOutOfRangeException(...) in InMemoryClaimCheckStoreWithTtl.StoreAsync, consistent with InMemoryCacheAsideStore.Set. Added StoreAsync_RejectsNegativeTtl test.
| using System.Collections.Concurrent; | ||
|
|
There was a problem hiding this comment.
Fixed in d2a7c64: Removed the unused using System.Collections.Concurrent; from InMemoryIdempotencyStoreWithTtl.cs. The class uses a plain Dictionary<string, Entry> under a lock, so no ConcurrentDictionary is needed.
| /// <inheritdoc /> | ||
| public ValueTask<IdempotencyClaim> TryClaimAsync(string key, TimeSpan? ttl, CancellationToken cancellationToken = default) | ||
| { | ||
| ValidateKey(key); | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| var now = DateTimeOffset.UtcNow; | ||
| lock (_gate) | ||
| { | ||
| if (_entries.TryGetValue(key, out var existing)) | ||
| { | ||
| // Treat expired entries as if they don't exist | ||
| if (existing.ExpiresAt.HasValue && existing.ExpiresAt.Value <= now) | ||
| _entries.Remove(key); | ||
| else | ||
| return new ValueTask<IdempotencyClaim>(IdempotencyClaim.Existing(key, existing.Status, existing.Result, existing.FailureReason)); | ||
| } | ||
|
|
||
| var expiresAt = ttl.HasValue ? now + ttl.Value : (DateTimeOffset?)null; | ||
| _entries[key] = new Entry(IdempotencyEntryStatus.Processing, null, null, expiresAt); | ||
| return new ValueTask<IdempotencyClaim>(IdempotencyClaim.ClaimedKey(key)); | ||
| } |
There was a problem hiding this comment.
Fixed in d2a7c64: Added if (ttl.HasValue && ttl.Value < TimeSpan.Zero) throw new ArgumentOutOfRangeException(...) in InMemoryIdempotencyStoreWithTtl.TryClaimAsync. Added TryClaimAsync_RejectsNegativeTtl test.
| [Scenario("DispatchAsync NoRecipients Returns Rejected")] | ||
| [Fact] | ||
| public async Task DispatchAsync_AllFail_ReturnsRejected() | ||
| { | ||
| var sg = AsyncScatterGather<string, int, int>.Create() | ||
| .Recipient("bad", async (m, _, _) => { await Task.CompletedTask; throw new Exception(); }) | ||
| .WithAggregator((envelopes, _, _) => envelopes.Where(e => e.Succeeded).Sum(e => e.Response)) | ||
| .Build(); |
There was a problem hiding this comment.
Fixed in d2a7c64: Renamed the scenario/test to DispatchAsync_AllFail_AggregatorReceivesFailedEnvelopes to accurately reflect behavior (all recipients fail, aggregator receives the failed envelopes). The original mislabeled test is in commit e0995a6. A separate Builder_RejectsInvalidConfiguration test already covers the true no-recipients scenario at build time (builder throws InvalidOperationException).
🔍 PR Validation ResultsVersion: `` ✅ Validation Steps
📊 ArtifactsDry-run artifacts have been uploaded and will be available for 7 days. This comment was automatically generated by the PR validation workflow. |
| var earlyExitCount = firstN > 0 ? firstN : (quorum > 0 ? quorum : 0); | ||
|
|
||
| using var earlyCts = earlyExitCount > 0 | ||
| ? (cts != null | ||
| ? CancellationTokenSource.CreateLinkedTokenSource(cts.Token) | ||
| : CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) | ||
| : null; | ||
|
|
||
| var earlyCounter = new EarlyExitCounter(earlyExitCount, earlyCts); | ||
|
|
||
| var tasks = _recipients.Select(recipient => RunRecipientAsync( | ||
| recipient, message, effectiveContext, earlyCts?.Token ?? linkedToken, | ||
| envelopes, earlyCounter)).ToArray(); | ||
|
|
| try | ||
| { | ||
| if (timeout.HasValue) | ||
| { | ||
| var timeoutTask = Task.Delay(timeout.Value, cancellationToken); | ||
| var whenAll = Task.WhenAll(tasks); | ||
| await Task.WhenAny(whenAll, timeoutTask).ConfigureAwait(false); | ||
| } | ||
| else | ||
| { | ||
| await Task.WhenAll(tasks).ConfigureAwait(false); | ||
| } | ||
| } | ||
| catch | ||
| { | ||
| // Individual task errors are captured in envelopes; swallow aggregate exception | ||
| } | ||
|
|
||
| var collected = envelopes.ToArray(); | ||
| if (collected.Length == 0) | ||
| return AsyncScatterGatherResult<TResponse, TResult>.Rejected(_name, collected, "No scatter-gather recipients produced a result."); | ||
|
|
| try | ||
| { | ||
| var response = await recipient.Handler(message, context, ct).ConfigureAwait(false); | ||
| envelopes.Add(ResponseEnvelope<TResponse>.Success(recipient.Name, response)); | ||
| earlyCounter.RecordSuccess(); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Cancellation from early-exit or timeout — not a recipient error | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| envelopes.Add(ResponseEnvelope<TResponse>.Failure(recipient.Name, ex)); | ||
| } |
| for (var i = 0; i < _taps.Length; i++) | ||
| { | ||
| var tap = _taps[i]; | ||
| try | ||
| { | ||
| await tap.Handler(message, effectiveContext, cancellationToken).ConfigureAwait(false); | ||
| results[i] = TapResult.Success(tap.Name); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| results[i] = TapResult.Failure(tap.Name, ex); | ||
| switch (tap.Policy) | ||
| { | ||
| case TapErrorPolicy.Log: | ||
| tap.ErrorSink?.Invoke(ex); | ||
| break; | ||
| case TapErrorPolicy.Propagate: | ||
| throw; | ||
| case TapErrorPolicy.Swallow: | ||
| default: | ||
| break; | ||
| } | ||
| } | ||
| } |
| var step = _steps[i]; | ||
| try | ||
| { | ||
| currentPayload = await step.Handler(currentPayload, effectiveContext, cancellationToken).ConfigureAwait(false); | ||
| stepResults[i] = EnrichmentStepResult.CreateApplied(step.Name); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| switch (step.Policy) | ||
| { | ||
| case EnrichmentErrorPolicy.Throw: | ||
| throw; | ||
| case EnrichmentErrorPolicy.UseDefault: | ||
| if (step.DefaultFactory is not null) | ||
| currentPayload = step.DefaultFactory(currentPayload); | ||
| stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); | ||
| break; | ||
| case EnrichmentErrorPolicy.Skip: | ||
| default: | ||
| stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); | ||
| break; | ||
| } | ||
| } |
| /// <summary>Adds an enrichment step with the default policy.</summary> | ||
| public Builder Enrich(string name, AsyncEnrichStep handler) | ||
| => Enrich(name, handler, _defaultPolicy); | ||
|
|
||
| /// <summary>Adds an enrichment step with an explicit policy.</summary> | ||
| public Builder Enrich(string name, AsyncEnrichStep handler, EnrichmentErrorPolicy policy, Func<TPayload, TPayload>? defaultFactory = null) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(name)) | ||
| throw new ArgumentException("Enrichment step name cannot be null, empty, or whitespace.", nameof(name)); | ||
| if (handler is null) | ||
| throw new ArgumentNullException(nameof(handler)); | ||
|
|
||
| _steps.Add(new Step(name, handler, policy, defaultFactory)); | ||
| return this; | ||
| } |
| } | ||
|
|
||
| var expiresAt = ttl.HasValue ? now + ttl.Value : (DateTimeOffset?)null; | ||
| _entries[key] = new Entry(IdempotencyEntryStatus.Processing, null, null, expiresAt); | ||
| return new ValueTask<IdempotencyClaim>(IdempotencyClaim.ClaimedKey(key)); |
| using System.Collections.Concurrent; | ||
|
|
| cancellationToken.ThrowIfCancellationRequested(); | ||
| if (string.IsNullOrWhiteSpace(claimId)) | ||
| throw new ArgumentException("Claim id is required.", nameof(claimId)); | ||
| if (headers is null) | ||
| throw new ArgumentNullException(nameof(headers)); | ||
|
|
||
| var expiresAt = ttl.HasValue ? DateTimeOffset.UtcNow + ttl.Value : (DateTimeOffset?)null; | ||
| _items[claimId] = new TimedEntry(new ClaimCheckStoredPayload<TPayload>(payload, headers), expiresAt); | ||
| return default; |
| baseDelay = _backOffPolicy switch | ||
| { | ||
| BackOffPolicy.Exponential => Min( | ||
| TimeSpan.FromMilliseconds(_interval.TotalMilliseconds * Math.Pow(2, consecutiveEmpty - 1)), | ||
| _backOffCap), | ||
| _ => _interval, | ||
| }; |
Test Results 1 files 1 suites 1m 57s ⏱️ Results for commit d2a7c64. ♻️ This comment has been updated with latest results. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #333 +/- ##
==========================================
+ Coverage 89.70% 95.65% +5.95%
==========================================
Files 476 484 +8
Lines 39346 39951 +605
Branches 5634 5744 +110
==========================================
+ Hits 35297 38217 +2920
+ Misses 1818 1734 -84
+ Partials 2231 0 -2231
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Implements AsyncWireTap<TPayload> with ValueTask-based tap delegates, per-tap TapErrorPolicy (Swallow/Log/Propagate), and per-tap outcome capture via TapResult[]. Main flow is never disrupted by tap failures unless policy is Propagate. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…egies Implements AsyncScatterGather<TRequest,TResponse,TResult> with concurrent fan-out, per-branch error isolation, and pluggable CompletionStrategy: All, Quorum(n), FirstN(n), Timeout, AllOrTimeout. Uses CancellationToken chaining for early-exit coordination. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…p error policy Implements AsyncContentEnricher<TPayload> with named async enrichment steps, per-step EnrichmentErrorPolicy (Throw/Skip/UseDefault), and full step audit trail. Payload type is unchanged; headers are preserved. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… dispatch Implements Normalizer<TRaw,TCanonical> as a content-predicate dispatcher (distinct from CanonicalDataModel's CLR-type dispatch). Uses ordered When().Normalize() clauses with first-match semantics plus an optional Default() fallback handler. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…p and back-off Implements AsyncPollingConsumer<TPayload> with continuous RunAsync loop, configurable interval, random jitter, and empty-poll BackOffPolicy (Constant or Exponential with configurable cap). Cancellation-aware throughout. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ry implementation Extends IIdempotencyStore with optional per-key TTL via IIdempotencyStoreWithTtl. InMemoryIdempotencyStoreWithTtl evicts expired keys lazily on read and proactively via EvictExpiredAsync. Backward-compatible; no-TTL path is identical to the original store. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…mory implementation Extends IClaimCheckStore<T> with optional per-entry TTL via IClaimCheckStoreWithTtl<T>. InMemoryClaimCheckStoreWithTtl uses lazy expiry on TryLoadAsync and proactive EvictExpiredAsync. Fully backward-compatible with the existing IClaimCheckStore contract. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…oxStore, and OutboxDispatcher Extracts IOutboxStore<TPayload> interface from InMemoryOutbox logic. InMemoryOutboxStore<TPayload> is the default in-memory implementation. OutboxDispatcher<TPayload> wraps store + IOutboxDispatcher<T> into a reusable DrainAsync/RunAsync relay loop for transactional outbox pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolves CS0419 warning in AsyncScatterGather. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… scenario When all recipients fail, the aggregator still receives the failed envelopes (by design). Updated test to match actual behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ons across primitives AsyncScatterGather: separate success-only count (FirstN) from any-completion count (Quorum); add callerCt to RunRecipientAsync to distinguish early-exit cancellation from caller cancellation; remove unused abstract method overrides. AsyncWireTap, AsyncContentEnricher, InMemoryIdempotencyStoreWithTtl, InMemoryClaimCheckStoreWithTtl: minor doc and test refinements. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
a617574 to
d2a7c64
Compare
| try | ||
| { | ||
| if (timeout.HasValue) | ||
| { | ||
| var timeoutTask = Task.Delay(timeout.Value, cancellationToken); | ||
| var whenAll = Task.WhenAll(tasks); | ||
| await Task.WhenAny(whenAll, timeoutTask).ConfigureAwait(false); | ||
| } | ||
| else | ||
| { | ||
| await Task.WhenAll(tasks).ConfigureAwait(false); | ||
| } | ||
| } | ||
| catch | ||
| { | ||
| // Individual task errors are captured in envelopes; swallow aggregate exception | ||
| } |
| if (message is null) | ||
| throw new ArgumentNullException(nameof(message)); | ||
|
|
||
| var effectiveContext = context ?? MessageContext.From(message, cancellationToken); |
| baseDelay = _backOffPolicy switch | ||
| { | ||
| BackOffPolicy.Exponential => Min( | ||
| TimeSpan.FromMilliseconds(_interval.TotalMilliseconds * Math.Pow(2, consecutiveEmpty - 1)), | ||
| _backOffCap), | ||
| _ => _interval, | ||
| }; |
| if (handler is null) | ||
| throw new ArgumentNullException(nameof(handler)); | ||
|
|
||
| var effectiveContext = context ?? MessageContext.Empty; |
| if (message is null) | ||
| throw new ArgumentNullException(nameof(message)); | ||
|
|
||
| var effectiveContext = context ?? MessageContext.From(message, cancellationToken); |
| if (message is null) | ||
| throw new ArgumentNullException(nameof(message)); | ||
|
|
||
| var effectiveContext = context ?? MessageContext.From(message, cancellationToken); |
| public async ValueTask<NormalizerResult<TCanonical>> NormalizeAsync( | ||
| TRaw raw, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| foreach (var entry in _entries) | ||
| { | ||
| if (entry.Predicate(raw)) | ||
| { |
Code Coverage |
Summary
Eight new messaging primitives filling gaps identified in the PatternKit gap analysis and WorkflowFramework extension backlog:
Messaging/Channels/) — async tap delegates with per-tapTapErrorPolicy(Swallow/Log/Propagate) andTapResult[]audit trail; main flow is never disrupted by tap failures unless policy is PropagateMessaging/Routing/) — concurrent fan-out with pluggableCompletionStrategy:All,Quorum(n),FirstN(n),Timeout,AllOrTimeout; per-branch error isolation viaResponseEnvelope<T>Messaging/Transformation/) — named async enrichment steps with per-stepEnrichmentErrorPolicy(Throw/Skip/UseDefault) and full step audit trailMessaging/Transformation/) — content-predicate dispatch (complements the type-basedCanonicalDataModel); first-matchWhen().Normalize()clauses with optionalDefault()fallbackMessaging/Consumers/) — self-driving async loop with configurable interval, random jitter, and empty-pollBackOffPolicy(Constant/Exponential with cap)Messaging/Reliability/) — extendsIIdempotencyStorewith optional per-key TTL andEvictExpiredAsync; backward-compatibleMessaging/Transformation/) — extendsIClaimCheckStore<T>with optional per-entry TTL andEvictExpiredAsync; lazy expiry on readsMessaging/Reliability/) — pluggable outbox backing-store interface extracted fromInMemoryOutbox;OutboxDispatcher<T>providesDrainAsync/RunAsyncrelay loopTest plan
🤖 Generated with Claude Code