From a00cca555a971c3cd6511369ccb0d3199040489e Mon Sep 17 00:00:00 2001 From: "Jerrett D. Davis" Date: Thu, 9 Oct 2025 23:25:39 -0500 Subject: [PATCH 1/2] docs(observer): add full XML documentation for AsyncObserver and demos - PatternKit.Core.Behavioral.Observer.AsyncObserver: added comprehensive XML comments for delegates, members, and Builder, with remarks and exception tags to match Observer standard - PatternKit.Examples.ObserverDemo.EventHub: documented constructor, factory, subscribe methods, and Publish; documented UserEvent record parameters - PatternKit.Examples.ObserverDemo.ReactivePrimitives: documented PropertyChangedHub, ObservableVar, and ObservableList public APIs - PatternKit.Examples.ObserverDemo.ReactiveTransaction: documented enums, LineItem record, ReactiveTransaction and ProfileViewModel public APIs Build and tests verified locally: solution builds successfully; all tests pass. This ensures DocFX picks up consistent API docs across Observer classes and demos. --- docs/examples/observer-demo.md | 0 docs/examples/reactive-transaction.md | 0 docs/examples/reactive-viewmodel.md | 0 .../behavioral/observer/asyncobserver.md | 0 docs/patterns/behavioral/observer/observer.md | 0 .../Behavioral/Observer/AsyncObserver.cs | 285 ++++++++++++++++++ .../Behavioral/Observer/Observer.cs | 0 .../ObserverDemo/ReactivePrimitives.cs | 69 +++++ .../ObserverDemo/ReactiveTransaction.cs | 152 ++++++++++ .../ObserverDemo/SimpleEventHub.cs | 53 ++++ .../ObserverDemo/ReactiveTransactionTests.cs | 0 .../ObserverDemo/ReactiveViewModelTests.cs | 0 .../Behavioral/Observer/AsyncObserverTests.cs | 0 .../Behavioral/Observer/ObserverTests.cs | 0 14 files changed, 559 insertions(+) create mode 100644 docs/examples/observer-demo.md create mode 100644 docs/examples/reactive-transaction.md create mode 100644 docs/examples/reactive-viewmodel.md create mode 100644 docs/patterns/behavioral/observer/asyncobserver.md create mode 100644 docs/patterns/behavioral/observer/observer.md create mode 100644 src/PatternKit.Core/Behavioral/Observer/AsyncObserver.cs create mode 100644 src/PatternKit.Core/Behavioral/Observer/Observer.cs create mode 100644 src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs create mode 100644 src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs create mode 100644 src/PatternKit.Examples/ObserverDemo/SimpleEventHub.cs create mode 100644 test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs create mode 100644 test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs create mode 100644 test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs create mode 100644 test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs diff --git a/docs/examples/observer-demo.md b/docs/examples/observer-demo.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/examples/reactive-transaction.md b/docs/examples/reactive-transaction.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/examples/reactive-viewmodel.md b/docs/examples/reactive-viewmodel.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/patterns/behavioral/observer/asyncobserver.md b/docs/patterns/behavioral/observer/asyncobserver.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/patterns/behavioral/observer/observer.md b/docs/patterns/behavioral/observer/observer.md new file mode 100644 index 0000000..e69de29 diff --git a/src/PatternKit.Core/Behavioral/Observer/AsyncObserver.cs b/src/PatternKit.Core/Behavioral/Observer/AsyncObserver.cs new file mode 100644 index 0000000..646532c --- /dev/null +++ b/src/PatternKit.Core/Behavioral/Observer/AsyncObserver.cs @@ -0,0 +1,285 @@ +using System.Runtime.CompilerServices; + +namespace PatternKit.Behavioral.Observer; + +/// +/// Async Observer (typed, fluent, thread-safe) +/// Asynchronously notifies multiple observers about events of type . +/// +/// +/// +/// This implementation mirrors but for asynchronous handlers. It uses a copy-on-write array +/// for subscriptions, providing lock-free add/remove and snapshot-based iteration during publish. Handlers can optionally be +/// gated by a predicate that is awaited. Error handling behavior is configured via the fluent . +/// +/// After and , the observer instance is immutable and thread-safe. +/// +/// The event payload type to broadcast. +public sealed class AsyncObserver +{ + /// + /// Asynchronous predicate to filter whether a handler should receive the event. + /// + /// The event. + /// + /// A whose result is to invoke the handler; otherwise . + /// + public delegate ValueTask Predicate(TEvent evt); + + /// + /// Asynchronous handler invoked when a published event passes an optional predicate. + /// + /// The event payload. + /// A that completes when handling finishes. + public delegate ValueTask Handler(TEvent evt); + + /// + /// Asynchronous error sink invoked when a handler throws; never throws back into the publisher. + /// + /// The exception from a handler. + /// The event being processed. + /// A representing sink completion. + public delegate ValueTask ErrorSink(Exception ex, TEvent evt); + + private enum ErrorPolicy + { + Swallow, + ThrowFirst, + ThrowAggregate + } + + private readonly ErrorPolicy _errorPolicy; + private readonly ErrorSink? _errorSink; + + private Entry[] _entries = []; // copy-on-write storage + private int _nextId; + + private struct Entry + { + public int Id; + public Predicate? Pred; + public Handler Callback; + } + + private AsyncObserver(ErrorPolicy policy, ErrorSink? sink) + => (_errorPolicy, _errorSink) = (policy, sink); + + /// + /// Current number of active subscriptions (approximate during concurrent updates). + /// + public int SubscriberCount => Volatile.Read(ref _entries).Length; + + /// + /// Publish an event to all matching subscribers asynchronously. Exceptions are handled according to the configured policy. + /// + /// The event value. + /// A token to cancel the publish operation. + /// When policy is ThrowAggregate and one or more handlers threw. + /// When policy is ThrowFirst and a handler threw; the first exception is rethrown. + /// If is canceled. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async ValueTask PublishAsync(TEvent evt, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + var snapshot = Volatile.Read(ref _entries); + List? errors = null; + + foreach (var t in snapshot) + { + cancellationToken.ThrowIfCancellationRequested(); + var e = t; // copy to avoid ref across await + var run = e.Pred is null || await e.Pred(evt).ConfigureAwait(false); + if (!run) continue; + + try + { + await e.Callback(evt).ConfigureAwait(false); + } + catch (Exception ex) + { + var sink = _errorSink; + if (sink is not null) + { + try + { + await sink(ex, evt).ConfigureAwait(false); + } + catch + { + /* swallow */ + } + } + + switch (_errorPolicy) + { + case ErrorPolicy.Swallow: + break; + case ErrorPolicy.ThrowFirst: + throw; + case ErrorPolicy.ThrowAggregate: + (errors ??= []).Add(ex); + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + } + + if (errors is { Count: > 0 }) throw new AggregateException(errors); + } + + // Subscriptions + /// + /// Subscribe an asynchronous handler to receive all events. + /// + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Handler handler) => Subscribe(null, handler); + + /// + /// Subscribe an asynchronous handler with an optional asynchronous predicate filter. + /// + /// The filter to decide whether to deliver the event to the handler (optional). + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Predicate? predicate, Handler handler) + { + var id = Interlocked.Increment(ref _nextId); + while (true) + { + var curr = Volatile.Read(ref _entries); + var next = new Entry[curr.Length + 1]; + Array.Copy(curr, next, curr.Length); + next[curr.Length] = new Entry { Id = id, Pred = predicate, Callback = handler }; + if (Interlocked.CompareExchange(ref _entries, next, curr) == curr) + break; + } + + return new Subscription(this, id); + } + + // Convenience adapters for sync delegates + /// + /// Convenience adapter: subscribe a synchronous to the async observer. + /// + /// The synchronous handler to adapt. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Observer.Handler handler) + => Subscribe(null, e => + { + handler(in e); + return default; + }); + + /// + /// Convenience adapter: subscribe a synchronous predicate and handler to the async observer. + /// + /// The synchronous predicate to filter events. + /// The synchronous handler to adapt. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Observer.Predicate predicate, Observer.Handler handler) + => Subscribe((Predicate)(e => new ValueTask(predicate(in e))), + e => + { + handler(in e); + return default; + }); + + private void Unsubscribe(int id) + { + while (true) + { + var curr = Volatile.Read(ref _entries); + var idx = -1; + for (var i = 0; i < curr.Length; i++) + if (curr[i].Id == id) + { + idx = i; + break; + } + + if (idx < 0) return; + + var next = new Entry[curr.Length - 1]; + if (idx > 0) Array.Copy(curr, 0, next, 0, idx); + if (idx < curr.Length - 1) Array.Copy(curr, idx + 1, next, idx, curr.Length - idx - 1); + if (Interlocked.CompareExchange(ref _entries, next, curr) == curr) + return; + } + } + + private sealed class Subscription : IDisposable + { + private AsyncObserver? _owner; + private readonly int _id; + internal Subscription(AsyncObserver owner, int id) => (_owner, _id) = (owner, id); + + public void Dispose() + { + var o = Interlocked.Exchange(ref _owner, null); + o?.Unsubscribe(_id); + } + } + + /// Create a new fluent builder for . + public static Builder Create() => new(); + + /// Fluent builder for configuring error policy and sinks. + public sealed class Builder + { + private ErrorPolicy _policy = ErrorPolicy.ThrowAggregate; + private ErrorSink? _sink; + + /// Send handler exceptions to the provided asynchronous sink (never throws back). + /// The asynchronous error sink. + /// The current builder. + public Builder OnError(ErrorSink sink) + { + _sink = sink; + return this; + } + + /// + /// Convenience overload for synchronous error sinks. + /// + /// The synchronous error sink. + /// The current builder. + public Builder OnError(Observer.ErrorSink sink) + { + _sink = (ex, e) => + { + sink(ex, in e); + return default; + }; + return this; + } + + /// Swallow handler exceptions (after sending to sink if configured). + /// The current builder. + public Builder SwallowErrors() + { + _policy = ErrorPolicy.Swallow; + return this; + } + + /// Throw the first handler exception immediately. + /// The current builder. + public Builder ThrowFirstError() + { + _policy = ErrorPolicy.ThrowFirst; + return this; + } + + /// Aggregate all handler exceptions and throw at the end of publish (default). + /// The current builder. + public Builder ThrowAggregate() + { + _policy = ErrorPolicy.ThrowAggregate; + return this; + } + + /// Build the immutable, thread-safe async observer. + /// A configured . + public AsyncObserver Build() => new(_policy, _sink); + } +} \ No newline at end of file diff --git a/src/PatternKit.Core/Behavioral/Observer/Observer.cs b/src/PatternKit.Core/Behavioral/Observer/Observer.cs new file mode 100644 index 0000000..e69de29 diff --git a/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs b/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs new file mode 100644 index 0000000..153632c --- /dev/null +++ b/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs @@ -0,0 +1,69 @@ +using PatternKit.Behavioral.Observer; +using System.Collections; + +namespace PatternKit.Examples.ObserverDemo; + +/// Lightweight property change hub (INotifyPropertyChanged-like) built on Observer<string>. +public sealed class PropertyChangedHub +{ + private readonly Observer _obs = Observer.Create().Build(); + public IDisposable Subscribe(Action onProperty) => _obs.Subscribe((in p) => onProperty(p)); + public void Raise(string propertyName) => _obs.Publish(in propertyName); +} + +/// A tiny observable variable that publishes change notifications when its value changes. +public sealed class ObservableVar(T initial = default!) +{ + private readonly Observer<(T Old, T New)> _obs = Observer<(T Old, T New)>.Create().Build(); + private T _value = initial; + + public T Value + { + get => _value; + set + { + var old = _value; + if (EqualityComparer.Default.Equals(old, value)) return; + _value = value; + var ev = (Old: old, New: value); + _obs.Publish(in ev); + } + } + + public IDisposable Subscribe(Action onChange) + => _obs.Subscribe((in e) => onChange(e.Old, e.New)); +} + +/// Observable list wrapper that publishes simple add/remove events. +public sealed class ObservableList : IEnumerable +{ + private readonly List _items = []; + private readonly Observer<(string Action, T Item)> _obs = Observer<(string Action, T Item)>.Create().Build(); + + public int Count => _items.Count; + public IReadOnlyList Snapshot() => _items.ToArray(); + + public void Add(T item) + { + _items.Add(item); + var ev = (Action: "add", Item: item); + _obs.Publish(in ev); + } + + public bool Remove(T item) + { + var ok = _items.Remove(item); + if (!ok) + return ok; + var ev = (Action: "remove", Item: item); + _obs.Publish(in ev); + return ok; + } + + public IDisposable Subscribe(Action onChange) + => _obs.Subscribe((in e) => onChange(e.Action, e.Item)); + + public IEnumerator GetEnumerator() => _items.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); +} + diff --git a/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs b/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs new file mode 100644 index 0000000..55e46f1 --- /dev/null +++ b/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs @@ -0,0 +1,152 @@ +using PatternKit.Behavioral.Observer; + +namespace PatternKit.Examples.ObserverDemo; + +public enum LoyaltyTier { None, Silver, Gold, Platinum } +public enum PaymentKind { None, CreditCard, StoreCard, Cash } + +public readonly record struct LineItem(string Sku, int Qty, decimal UnitPrice, decimal? DiscountPct = null, bool Taxable = true) +{ + public decimal Raw => Qty * UnitPrice; + public decimal LineDiscount => DiscountPct is { } p ? Raw * p : 0m; + public decimal Net => Raw - LineDiscount; +} + +/// +/// Reactive transaction shows dependent, computed properties updated via Observer-based subscriptions. +/// +public sealed class ReactiveTransaction +{ + // Inputs (reactive) + public ObservableList Items { get; } = new(); + public ObservableVar Tier { get; } = new(); + public ObservableVar Payment { get; } = new(); + public ObservableVar TaxRate { get; } = new(0.07m); + + // Outputs (reactive vars so consumers can subscribe to precise changes) + public ObservableVar Subtotal { get; } = new(); + public ObservableVar LineItemDiscounts { get; } = new(); + public ObservableVar LoyaltyDiscount { get; } = new(); + public ObservableVar PaymentDiscount { get; } = new(); + public ObservableVar Tax { get; } = new(); + public ObservableVar Total { get; } = new(); + + // UI-ish dependent properties + public ObservableVar CanCheckout { get; } = new(); + public ObservableVar DiscountBadge { get; } = new(); + + // Fine-grained change notifications for property names, if needed + public PropertyChangedHub PropertyChanged { get; } = new(); + + private readonly Observer.Handler _notify; + + public ReactiveTransaction() + { + _notify = (in p) => PropertyChanged.Raise(p); + + // Recompute on any input change + Items.Subscribe((_, _) => Recompute()); + Tier.Subscribe((_, _) => Recompute()); + Payment.Subscribe((_, _) => Recompute()); + TaxRate.Subscribe((_, _) => Recompute()); + + // Recompute once on construction + Recompute(); + } + + private void Recompute() + { + var list = Items.Snapshot(); + decimal raw = 0m, lineDisc = 0m, taxableNet = 0m; + + foreach (var it in list) + { + raw += it.Raw; + lineDisc += it.LineDiscount; + if (it.Taxable) taxableNet += it.Net; + } + + var tierDiscPct = Tier.Value switch + { + LoyaltyTier.Platinum => 0.10m, + LoyaltyTier.Gold => 0.07m, + LoyaltyTier.Silver => 0.04m, + _ => 0m + }; + + var paymentDiscPct = Payment.Value switch + { + PaymentKind.StoreCard => 0.05m, // in-house card promo + _ => 0m + }; + + var loyaltyDisc = Math.Round((raw - lineDisc) * tierDiscPct, 2, MidpointRounding.AwayFromZero); + var payDisc = Math.Round((raw - lineDisc - loyaltyDisc) * paymentDiscPct, 2, MidpointRounding.AwayFromZero); + var preTax = raw - lineDisc - loyaltyDisc - payDisc; + var tax = Math.Round(taxableNet * TaxRate.Value, 2, MidpointRounding.AwayFromZero); + var total = Math.Round(preTax + tax, 2, MidpointRounding.AwayFromZero); + + // Publish to reactive outputs (triggers subscribers if changed) + Subtotal.Value = raw; + LineItemDiscounts.Value = lineDisc; + LoyaltyDiscount.Value = loyaltyDisc; + PaymentDiscount.Value = payDisc; + Tax.Value = tax; + Total.Value = total; + + // UI-ish dependents + CanCheckout.Value = total > 0 && Payment.Value != PaymentKind.None; + DiscountBadge.Value = (loyaltyDisc + payDisc) > 0 ? $"You saved {(loyaltyDisc + payDisc):C}" : null; + + // Optional name-based notifications for UIs that listen by name + var p1 = nameof(Subtotal); _notify(in p1); + var p2 = nameof(LineItemDiscounts); _notify(in p2); + var p3 = nameof(LoyaltyDiscount); _notify(in p3); + var p4 = nameof(PaymentDiscount); _notify(in p4); + var p5 = nameof(Tax); _notify(in p5); + var p6 = nameof(Total); _notify(in p6); + var p7 = nameof(CanCheckout); _notify(in p7); + var p8 = nameof(DiscountBadge); _notify(in p8); + } + + // Convenience API + public void AddItem(LineItem item) => Items.Add(item); + public bool RemoveItem(LineItem item) => Items.Remove(item); + public void SetTier(LoyaltyTier tier) => Tier.Value = tier; + public void SetPayment(PaymentKind kind) => Payment.Value = kind; + public void SetTaxRate(decimal rate) => TaxRate.Value = rate; +} + +/// +/// Minimal reactive ViewModel sample to demonstrate dependent properties and control enablement. +/// +public sealed class ProfileViewModel +{ + public ObservableVar FirstName { get; } = new(); + public ObservableVar LastName { get; } = new(); + public ObservableVar FullName { get; } = new(string.Empty); + public ObservableVar CanSave { get; } = new(); + public PropertyChangedHub PropertyChanged { get; } = new(); + + private readonly Observer.Handler _notify; + + public ProfileViewModel() + { + _notify = (in p) => PropertyChanged.Raise(p); + + FirstName.Subscribe((_, _) => Recompute()); + LastName.Subscribe((_, _) => Recompute()); + Recompute(); + } + + private void Recompute() + { + var fn = FirstName.Value?.Trim(); + var ln = LastName.Value?.Trim(); + var full = string.Join(' ', new[] { fn, ln }.Where(s => !string.IsNullOrWhiteSpace(s))); + FullName.Value = full; + CanSave.Value = !string.IsNullOrWhiteSpace(fn) && !string.IsNullOrWhiteSpace(ln); + var pFull = nameof(FullName); _notify(in pFull); + var pSave = nameof(CanSave); _notify(in pSave); + } +} diff --git a/src/PatternKit.Examples/ObserverDemo/SimpleEventHub.cs b/src/PatternKit.Examples/ObserverDemo/SimpleEventHub.cs new file mode 100644 index 0000000..e6693b8 --- /dev/null +++ b/src/PatternKit.Examples/ObserverDemo/SimpleEventHub.cs @@ -0,0 +1,53 @@ +namespace PatternKit.Examples.ObserverDemo; + +using Behavioral.Observer; + +/// +/// A tiny demo event hub built on top of for examples and docs. +/// +/// The event payload type to broadcast. +public sealed class EventHub +{ + private readonly Observer _observer; + + /// + /// Create an event hub that wraps the provided . + /// + /// The underlying observer to delegate to. + public EventHub(Observer observer) => _observer = observer; + + /// + /// Create a default event hub using with default error handling (aggregate). + /// + /// A new . + public static EventHub CreateDefault() + => new(Observer.Create().Build()); + + /// + /// Subscribe a handler to receive all events. + /// + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable On(Observer.Handler handler) + => _observer.Subscribe(handler); + + /// + /// Subscribe a handler with an optional predicate filter. + /// + /// The filter to decide whether to deliver the event to the handler. + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable On(Observer.Predicate predicate, Observer.Handler handler) + => _observer.Subscribe(predicate, handler); + + /// + /// Publish an event to all matching subscribers. + /// + /// The event value. + public void Publish(in TEvent evt) => _observer.Publish(in evt); +} + +/// Sample event used in docs and tests for the observer demo. +/// The unique user identifier. +/// The action performed by the user. +public readonly record struct UserEvent(int Id, string Action); diff --git a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs new file mode 100644 index 0000000..e69de29 diff --git a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs new file mode 100644 index 0000000..e69de29 diff --git a/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs b/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs new file mode 100644 index 0000000..e69de29 diff --git a/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs b/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs new file mode 100644 index 0000000..e69de29 From 05e373cab9effd6eeba44a237f88aade42860a8b Mon Sep 17 00:00:00 2001 From: "Jerrett D. Davis" Date: Thu, 9 Oct 2025 23:26:30 -0500 Subject: [PATCH 2/2] feat(observer): implemented fluent observer pattern --- docs/examples/observer-demo.md | 69 ++++++ docs/examples/reactive-transaction.md | 49 ++++ docs/examples/reactive-viewmodel.md | 36 +++ docs/examples/toc.yml | 9 + docs/index.md | 2 +- .../behavioral/observer/asyncobserver.md | 105 +++++++++ docs/patterns/behavioral/observer/observer.md | 103 +++++++++ docs/patterns/toc.yml | 6 + .../Behavioral/Observer/Observer.cs | 209 +++++++++++++++++ .../ObserverDemo/ReactivePrimitives.cs | 58 ++++- .../ObserverDemo/ReactiveTransaction.cs | 64 +++++- src/PatternKit.Generators/packages.lock.json | 188 +++++++++++++++ .../ObserverDemo/ReactiveTransactionTests.cs | 51 +++++ .../ObserverDemo/ReactiveViewModelTests.cs | 38 ++++ .../Behavioral/Observer/AsyncObserverTests.cs | 215 ++++++++++++++++++ .../Behavioral/Observer/ObserverTests.cs | 187 +++++++++++++++ 16 files changed, 1383 insertions(+), 6 deletions(-) diff --git a/docs/examples/observer-demo.md b/docs/examples/observer-demo.md index e69de29..1fd2d1b 100644 --- a/docs/examples/observer-demo.md +++ b/docs/examples/observer-demo.md @@ -0,0 +1,69 @@ +# In-Process Event Hub with Observer + +This demo shows how to build a minimal, typed event hub using PatternKit’s Observer. + +What it demonstrates +- Typed publish/subscribe: `Observer` +- Predicate-based subscriptions (per-subscriber filters) +- Error policies (aggregate, throw-first, swallow) + error sink +- Unsubscribe via `IDisposable` + +Where to look +- Code: `src/PatternKit.Examples/ObserverDemo/` + - `SimpleEventHub.cs`: small wrapper and a sample `UserEvent` record +- Tests: `test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs` + +Quick start +```csharp +using PatternKit.Behavioral.Observer; + +// Build an observer hub for string events +var hub = Observer.Create() + .OnError(static (ex, in msg) => Console.Error.WriteLine($"handler failed: {ex.Message} on '{msg}'")) + .ThrowAggregate() // default policy + .Build(); + +// Subscribe all messages +var subAll = hub.Subscribe(static (in string msg) => Console.WriteLine($"ALL: {msg}")); + +// Subscribe only warnings +var subWarn = hub.Subscribe(static (in string msg) => msg.StartsWith("warn:", StringComparison.Ordinal), + static (in string msg) => Console.WriteLine($"WARN: {msg}")); + +hub.Publish("hello"); +hub.Publish("warn: low-disk"); + +subWarn.Dispose(); // unsubscribe +``` + +Error handling policies +- ThrowAggregate (default): run everyone; collect exceptions and throw a single `AggregateException` at the end. +- ThrowFirstError: stop at the first exception (later handlers don’t run). +- SwallowErrors: never throw; failures go only to the error sink. + +Example: swallow errors so publishing never throws +```csharp +var hub = Observer.Create() + .OnError(static (ex, in n) => Console.Error.WriteLine($"fail({n}): {ex.Message}")) + .SwallowErrors() + .Build(); + +hub.Subscribe(static (in int _) => throw new InvalidOperationException("boom")); +hub.Subscribe(static (in int n) => Console.WriteLine($"ok:{n}")); + +hub.Publish(42); // prints: ok:42, no exception to caller +``` + +Notes +- Subscriptions are copy-on-write; publishing iterates a snapshot without locks. +- Registration order is preserved. +- Subscribing/unsubscribing during a publish affects the next publish, not the current one. +- Delegates use `in T` for zero-copy struct pass-through. + +Run the tests +```bash +# From the repo root +dotnet build PatternKit.slnx -c Debug +dotnet test PatternKit.slnx -c Debug +``` + diff --git a/docs/examples/reactive-transaction.md b/docs/examples/reactive-transaction.md index e69de29..d099842 100644 --- a/docs/examples/reactive-transaction.md +++ b/docs/examples/reactive-transaction.md @@ -0,0 +1,49 @@ +# Reactive Transaction with Observer + +This example builds a non-trivial transaction model that computes totals dynamically from line items, loyalty tier, payment kind, and tax rate. +It uses tiny reactive primitives powered by PatternKit’s `Observer` so everything updates automatically. + +What it demonstrates +- Reactive inputs: line items, loyalty tier, payment kind, tax rate +- Computed outputs: subtotal, line-item discounts, loyalty and payment discounts, tax, total +- UI-like dependents: `CanCheckout`, `DiscountBadge` +- Change broadcasting via both typed subscriptions and simple “property name” notifications + +Code +```csharp +using PatternKit.Examples.ObserverDemo; + +var tx = new ReactiveTransaction(); + +// Listen for total changes +using var _ = tx.Total.Subscribe((old, @new) => Console.WriteLine($"Total: {old:C} -> {@new:C}")); + +// Build cart +tx.AddItem(new LineItem("WIDGET", 2, 19.99m)); +tx.AddItem(new LineItem("GADGET", 1, 49.99m, DiscountPct: 0.10m)); // 10% line discount + +// Promotions +tx.SetTier(LoyaltyTier.Gold); // ~7% +tx.SetPayment(PaymentKind.StoreCard); // extra 5% after loyalty + +// Tax +tx.SetTaxRate(0.07m); + +Console.WriteLine($"Subtotal: {tx.Subtotal.Value:C}"); +Console.WriteLine($"Discounts: line={tx.LineItemDiscounts.Value:C}, loyalty={tx.LoyaltyDiscount.Value:C}, pay={tx.PaymentDiscount.Value:C}"); +Console.WriteLine($"Tax: {tx.Tax.Value:C}"); +Console.WriteLine($"Total: {tx.Total.Value:C}"); +Console.WriteLine($"CanCheckout: {tx.CanCheckout.Value}"); +Console.WriteLine(tx.DiscountBadge.Value ?? "No discounts"); +``` + +How it works +- `ObservableList` and `ObservableVar` publish change events. +- `ReactiveTransaction` recomputes whenever any input changes and pushes new values to reactive outputs. +- Consumers subscribe specifically (e.g., `Total`) or to property-name updates via a small `PropertyChangedHub`. + +Where to look +- Source: `src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs` +- Reactive primitives: `src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs` +- Tests: `test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs` + diff --git a/docs/examples/reactive-viewmodel.md b/docs/examples/reactive-viewmodel.md index e69de29..73f3fcc 100644 --- a/docs/examples/reactive-viewmodel.md +++ b/docs/examples/reactive-viewmodel.md @@ -0,0 +1,36 @@ +# Reactive ViewModel with Observer + +This example shows how to build dependent properties and control enablement without `INotifyPropertyChanged`, using PatternKit’s Observer-based primitives. + +What it demonstrates +- Dependent properties: `FullName` derived from `FirstName` + `LastName` +- Control enablement: `CanSave` toggles based on input validity +- Lightweight property-changed hub for UI bindings + +Code +```csharp +using PatternKit.Examples.ObserverDemo; + +var vm = new ProfileViewModel(); + +// Subscribe to name changes +var unsub = vm.PropertyChanged.Subscribe(name => Console.WriteLine($"Changed: {name}")); + +vm.FirstName.Value = "Ada"; +vm.LastName.Value = "Lovelace"; + +Console.WriteLine(vm.FullName.Value); // "Ada Lovelace" +Console.WriteLine(vm.CanSave.Value); // true + +unsub.Dispose(); +``` + +How it works +- `ObservableVar` emits a `(old,new)` event on changes. +- The viewmodel subscribes to `FirstName` and `LastName`, recomputes `FullName` and `CanSave`, and raises name-based notifications via a small `PropertyChangedHub`. +- No reflection, no `INotifyPropertyChanged` interface required. + +Where to look +- Source: `src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs` (contains `ProfileViewModel`) +- Tests: `test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs` + diff --git a/docs/examples/toc.yml b/docs/examples/toc.yml index 9206c91..77dfe87 100644 --- a/docs/examples/toc.yml +++ b/docs/examples/toc.yml @@ -36,3 +36,12 @@ - name: Text Editor History (Memento) href: text-editor-memento.md + +- name: Observer — In-Process Event Hub + href: observer-demo.md + +- name: Reactive ViewModel (Dependent Properties + Enablement) + href: reactive-viewmodel.md + +- name: Reactive Transaction (Dynamic Discounts, Tax, Total) + href: reactive-transaction.md diff --git a/docs/index.md b/docs/index.md index 6fb45eb..fbd5d81 100644 --- a/docs/index.md +++ b/docs/index.md @@ -66,7 +66,7 @@ PatternKit will grow to cover **Creational**, **Structural**, and **Behavioral** | -------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | **Creational** | [Factory](patterns/creational/factory/factory.md) ✓ • [Composer](patterns/creational/builder/composer.md) ✓ • [ChainBuilder](patterns/creational/builder/chainbuilder.md) ✓ • [BranchBuilder](patterns/creational/builder/chainbuilder.md) ✓ • [MutableBuilder](patterns/creational/builder/mutablebuilder.md) ✓ • [Prototype](patterns/creational/prototype/prototype.md) ✓ • [Singleton](patterns/creational/singleton/singleton.md) ✓ | | **Structural** | [Adapter](patterns/structural/adapter/fluent-adapter.md) ✓ • [Bridge](patterns/structural/bridge/bridge.md) ✓ • [Composite](patterns/structural/composite/composite.md) ✓ • [Decorator](patterns/structural/decorator/index.md) ✓ • [Facade](patterns/structural/facade/facade.md) ✓ • [Flyweight](patterns/structural/flyweight/index.md) ✓ • [Proxy](patterns/structural/proxy/index.md) ✓ | -| **Behavioral** | [Strategy](patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](patterns/behavioral/chain/resultchain.md) ✓ • [Command](patterns/behavioral/command/command.md) ✓ • [ReplayableSequence](patterns/behavioral/iterator/replayablesequence.md) ✓ • [WindowSequence](patterns/behavioral/iterator/windowsequence.md) ✓ • [Mediator](patterns/behavioral/mediator/mediator.md) ✓ • [Memento](patterns/behavioral/memento/memento.md) ✓ • Observer (planned) • State (planned) • Template Method (planned) • Visitor (planned) | +| **Behavioral** | [Strategy](patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](patterns/behavioral/chain/resultchain.md) ✓ • [Command](patterns/behavioral/command/command.md) ✓ • [ReplayableSequence](patterns/behavioral/iterator/replayablesequence.md) ✓ • [WindowSequence](patterns/behavioral/iterator/windowsequence.md) ✓ • [Mediator](patterns/behavioral/mediator/mediator.md) ✓ • [Memento](patterns/behavioral/memento/memento.md) ✓ • [Observer](patterns/behavioral/observer/observer.md) ✓ • [AsyncObserver](patterns/behavioral/observer/asyncobserver.md) ✓ • State (planned) • Template Method (planned) • Visitor (planned) | Each pattern will ship with: diff --git a/docs/patterns/behavioral/observer/asyncobserver.md b/docs/patterns/behavioral/observer/asyncobserver.md index e69de29..3b3a4bd 100644 --- a/docs/patterns/behavioral/observer/asyncobserver.md +++ b/docs/patterns/behavioral/observer/asyncobserver.md @@ -0,0 +1,105 @@ +# AsyncObserver + +An asynchronous, thread-safe event hub for broadcasting events of type `TEvent` to multiple subscribers. Handlers and predicates are `ValueTask`-based, allowing you to await I/O without blocking. + +Use it when your observers perform async work (I/O, timers, pipelines) and you want a clean, fluent API matching the synchronous `Observer`. + +--- + +## What it is + +- Typed: `AsyncObserver` delivers strongly-typed events. +- Async-first: `Predicate` and `Handler` return `ValueTask`. +- Cancellation: `PublishAsync` accepts a `CancellationToken`. +- Lock-free, copy-on-write subscriptions: publish iterates a snapshot without locks. +- Predicate filters: decide per-subscriber whether to run the handler. +- Immutable & thread-safe after `Build()`. + +--- + +## TL;DR example + +```csharp +using PatternKit.Behavioral.Observer; + +var hub = AsyncObserver.Create() + .OnError(static (ex, in msg) => { Console.Error.WriteLine(ex.Message); return ValueTask.CompletedTask; }) + .ThrowAggregate() // default + .Build(); + +hub.Subscribe(async (in string msg) => { await Task.Delay(1); Console.WriteLine($"ALL:{msg}"); }); + +hub.Subscribe( + predicate: static (in string msg) => new ValueTask(msg.StartsWith("warn:", StringComparison.Ordinal)), + handler: async (in string msg) => { await Task.Yield(); Console.WriteLine($"WARN:{msg}"); }); + +await hub.PublishAsync("hello"); +await hub.PublishAsync("warn: disk"); +``` + +--- + +## API shape + +```csharp +var hub = AsyncObserver.Create() + .OnError(static (Exception ex, in TEvent evt) => /* log */ ValueTask.CompletedTask) // optional + .ThrowAggregate() // default + // .ThrowFirstError() + // .SwallowErrors() + .Build(); + +IDisposable s1 = hub.Subscribe(static (in TEvent evt) => /* ValueTask handler */); +IDisposable s2 = hub.Subscribe(static (in TEvent evt) => /* ValueTask filter */, static (in TEvent evt) => /* ValueTask handler */); + +await hub.PublishAsync(evt, cancellationToken); +``` + +- `PublishAsync(TEvent evt, CancellationToken ct = default)` drives the async flow; it does not take `in` because async methods cannot have `in/ref/out` parameters. +- Delegates still use `in TEvent` for zero-copy of large structs. + +--- + +## Error handling policies + +Same as `Observer`: + +- ThrowAggregate (default): run all matching handlers; collect exceptions and throw a single `AggregateException` at the end. Error sink is awaited for each failure. +- ThrowFirstError: throw immediately on the first failing handler; remaining handlers do not run. +- SwallowErrors: never throw from `PublishAsync`; failures go only to the error sink if configured. + +Error sink forms: + +- Async: `.OnError((ex, in evt) => ValueTask)` +- Sync adapter: `.OnError((ex, in evt) => void)` which is adapted to `ValueTask`. + +--- + +## Interop with synchronous Observer + +You can reuse synchronous delegates via adapter overloads: + +```csharp +var asyncHub = AsyncObserver.Create().Build(); +asyncHub.Subscribe(static (in int x) => x > 0, static (in int x) => Console.WriteLine($"+:{x}")); +asyncHub.Subscribe(static (in int x) => Console.WriteLine(x)); +``` + +These overloads wrap sync delegates in `ValueTask` delegates with zero allocations on the fast path. + +--- + +## Notes + +- Ordering: handlers run in registration order. +- Reentrancy: subscribing/unsubscribing during `PublishAsync` affects subsequent publishes. +- Cancellation: `PublishAsync` checks the token between subscribers; a cancellation stops the loop with `OperationCanceledException`. +- Performance: copy-on-write subscriptions keep publish contention-free; avoid heavy per-event allocations in handlers. + +--- + +## See also + +- [Observer](./observer.md) for the synchronous variant +- [ActionChain](../chain/actionchain.md) and [Mediator](../mediator/mediator.md) for other orchestration styles + diff --git a/docs/patterns/behavioral/observer/observer.md b/docs/patterns/behavioral/observer/observer.md index e69de29..9bd5906 100644 --- a/docs/patterns/behavioral/observer/observer.md +++ b/docs/patterns/behavioral/observer/observer.md @@ -0,0 +1,103 @@ +# Observer + +A thread-safe, fluent event hub for broadcasting events of type `TEvent` to multiple subscribers. Observers can opt-in via an optional predicate filter. + +Use it when you need a decoupled, in-process publish/subscribe mechanism: UI events, domain notifications, pipeline hooks, or instrumentation. + +--- + +## What it is + +- Typed: `Observer` delivers strongly-typed events. +- Fluent builder: configure error handling, then `Build()`. +- Lock-free copy-on-write subscriptions: publish reads a snapshot array; subscribe/unsubscribe are atomic. +- Predicate filters: deliver to a handler only when a condition matches. +- Immutable & thread-safe after `Build()`. + +--- + +## TL;DR example + +```csharp +using PatternKit.Behavioral.Observer; + +var hub = Observer.Create() + .OnError(static (ex, in msg) => Console.Error.WriteLine($"handler failed: {ex.Message} for '{msg}'")) + .ThrowAggregate() // default; collect all exceptions and throw one AggregateException + .Build(); + +// subscribe everyone +var subAll = hub.Subscribe(static (in string msg) => Console.WriteLine($"ALL: {msg}")); + +// subscribe with a filter +var subWarn = hub.Subscribe(static (in string msg) => msg.StartsWith("warn:", StringComparison.Ordinal), + static (in string msg) => Console.WriteLine($"WARN: {msg}")); + +hub.Publish("hello"); +hub.Publish("warn: low-disk"); + +subWarn.Dispose(); // unsubscribe +``` + +--- + +## API shape + +```csharp +var hub = Observer.Create() + .OnError(static (Exception ex, in TEvent evt) => /* log */) // optional + .ThrowAggregate() // default + // .ThrowFirstError() // stop at first exception + // .SwallowErrors() // never throw to caller + .Build(); + +IDisposable s1 = hub.Subscribe(static (in TEvent evt) => /* handler */); +IDisposable s2 = hub.Subscribe(static (in TEvent evt) => /* bool filter */, static (in TEvent evt) => /* handler */); + +hub.Publish(in evt); + +s1.Dispose(); // unsubscribe +``` + +Delegates use `in TEvent` to avoid copying large structs. Subscriptions return `IDisposable`; disposing is idempotent. + +--- + +## Error handling policies + +Choose via the builder (defaults to `ThrowAggregate`): + +- ThrowAggregate: run all matching handlers; collect exceptions and throw a single `AggregateException` at the end. The error sink still receives each failure. +- ThrowFirstError: throw immediately on the first failing handler; remaining handlers do not run. +- SwallowErrors: never throw from `Publish`; all exceptions are sent to the error sink if provided. + +You can configure an error sink with `.OnError((ex, in evt) => ...)`. The sink should not throw; if it does, it is swallowed to protect the publisher. + +--- + +## Design notes + +- Copy-on-write subscriptions: publishing is contention-free; subscribe/unsubscribe perform an atomic array swap. +- Ordering: handlers are invoked in registration order. +- Filters: evaluated per-subscriber; if `false`, the handler is skipped. +- Reentrancy: handlers may subscribe/unsubscribe during `Publish`; such changes affect subsequent publishes (not the current one). +- Thread-safety: built instances are safe to share across threads; builder is not thread-safe. + +--- + +## When to use + +- Decouple producers and consumers inside a process. +- Many readers, few writers: telemetry, UI, domain events. +- Need simple filtering at the subscriber side. + +If you need cross-process or durable messaging, integrate with a message bus (RabbitMQ, Azure Service Bus) and consider using Observer as an in-process fan-out. + +--- + +## See also + +- [Mediator](../mediator/mediator.md): coordinate interactions via a central mediator. +- [ActionChain](../chain/actionchain.md): middleware pipeline for conditional processing. +- Examples: [Observer demo](../../../examples/observer-demo.md) + diff --git a/docs/patterns/toc.yml b/docs/patterns/toc.yml index 206550d..a866d8e 100644 --- a/docs/patterns/toc.yml +++ b/docs/patterns/toc.yml @@ -33,6 +33,12 @@ href: behavioral/mediator/mediator.md - name: Command href: behavioral/command/command.md + - name: Observer + items: + - name: Observer + href: behavioral/observer/observer.md + - name: AsyncObserver + href: behavioral/observer/asyncobserver.md - name: Creational items: - name: Builder diff --git a/src/PatternKit.Core/Behavioral/Observer/Observer.cs b/src/PatternKit.Core/Behavioral/Observer/Observer.cs index e69de29..bd954b1 100644 --- a/src/PatternKit.Core/Behavioral/Observer/Observer.cs +++ b/src/PatternKit.Core/Behavioral/Observer/Observer.cs @@ -0,0 +1,209 @@ +using System.Runtime.CompilerServices; + +namespace PatternKit.Behavioral.Observer; + +/// +/// Observer (typed, fluent, thread-safe) +/// Provides a subscription mechanism to notify multiple observers about events of type . +/// +/// +/// +/// This implementation is designed for high read/concurrent publish scenarios using a copy-on-write subscription array. +/// Subscriptions and unsubscriptions are atomic and lock-free; publishing snapshots the current array and iterates it +/// without locks. The builder configures error handling behavior. +/// +/// After and , the observer instance is immutable and thread-safe. +/// +/// The event payload type to broadcast. +public sealed class Observer +{ + /// Predicate to filter whether a handler should receive the event. + /// The event. + /// to invoke the handler; otherwise . + public delegate bool Predicate(in TEvent evt); + + /// Handler invoked when a published event passes an optional predicate. + /// The event payload. + public delegate void Handler(in TEvent evt); + + /// Error sink invoked when a handler throws; never throws back into the publisher. + /// The exception from a handler. + /// The event being processed. + public delegate void ErrorSink(Exception ex, in TEvent evt); + + private enum ErrorPolicy { Swallow, ThrowFirst, ThrowAggregate } + + private readonly ErrorPolicy _errorPolicy; + private readonly ErrorSink? _errorSink; + + private Entry[] _entries = []; // copy-on-write storage + private int _nextId; // subscription id counter + + private struct Entry + { + public int Id; + public Predicate? Pred; + public Handler Callback; + } + + private Observer(ErrorPolicy errorPolicy, ErrorSink? errorSink) + => (_errorPolicy, _errorSink) = (errorPolicy, errorSink); + + /// Current number of active subscriptions (approximate during concurrent updates). + public int SubscriberCount => Volatile.Read(ref _entries).Length; + + /// + /// Publish an event to all matching subscribers. Exceptions are handled according to the configured policy. + /// + /// The event value. + /// When policy is ThrowAggregate and one or more handlers threw. + /// When policy is ThrowFirst and a handler threw; the first exception is rethrown. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Publish(in TEvent evt) + { + var snapshot = Volatile.Read(ref _entries); + List? errors = null; + + foreach (var t in snapshot) + { + ref readonly var e = ref t; + var run = e.Pred is null || e.Pred(in evt); + if (!run) continue; + + try { e.Callback(in evt); } + catch (Exception ex) + { + // Best-effort sink; never throw from sink. + var sink = _errorSink; + if (sink is not null) + { + try { sink(ex, in evt); } catch { /* swallow */ } + } + + switch (_errorPolicy) + { + case ErrorPolicy.Swallow: + break; + case ErrorPolicy.ThrowFirst: + throw; + case ErrorPolicy.ThrowAggregate: + (errors ??= []).Add(ex); + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + } + + if (errors is { Count: > 0 }) throw new AggregateException(errors); + } + + /// + /// Subscribe a handler to receive all events. + /// + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Handler handler) => Subscribe(null, handler); + + /// + /// Subscribe a handler with an optional predicate filter. + /// + /// The filter to decide whether to deliver the event to the handler (optional). + /// The handler to invoke. + /// An that removes the subscription when disposed. + public IDisposable Subscribe(Predicate? predicate, Handler handler) + { + var id = Interlocked.Increment(ref _nextId); + while (true) + { + var curr = Volatile.Read(ref _entries); + var next = new Entry[curr.Length + 1]; + Array.Copy(curr, next, curr.Length); + next[curr.Length] = new Entry { Id = id, Pred = predicate, Callback = handler }; + if (Interlocked.CompareExchange(ref _entries, next, curr) == curr) + break; + } + + return new Subscription(this, id); + } + + private void Unsubscribe(int id) + { + while (true) + { + var curr = Volatile.Read(ref _entries); + var idx = -1; + for (var i = 0; i < curr.Length; i++) + { + if (curr[i].Id != id) continue; + + idx = i; break; + } + + if (idx < 0) return; // already removed + + var next = new Entry[curr.Length - 1]; + if (idx > 0) Array.Copy(curr, 0, next, 0, idx); + if (idx < curr.Length - 1) Array.Copy(curr, idx + 1, next, idx, curr.Length - idx - 1); + + if (Interlocked.CompareExchange(ref _entries, next, curr) == curr) + return; + } + } + + private sealed class Subscription : IDisposable + { + private Observer? _owner; + private readonly int _id; + + internal Subscription(Observer owner, int id) + => (_owner, _id) = (owner, id); + + public void Dispose() + { + var o = Interlocked.Exchange(ref _owner, null); + o?.Unsubscribe(_id); + } + } + + /// Create a new fluent builder for . + public static Builder Create() => new(); + + /// Fluent builder for configuring error policy and sinks. + public sealed class Builder + { + private ErrorPolicy _policy = ErrorPolicy.ThrowAggregate; + private ErrorSink? _sink; + + /// Send handler exceptions to the provided sink (never throws back). + public Builder OnError(ErrorSink sink) + { + _sink = sink; + return this; + } + + /// Swallow handler exceptions (after sending to sink if configured). + public Builder SwallowErrors() + { + _policy = ErrorPolicy.Swallow; + return this; + } + + /// Throw the first handler exception immediately. + public Builder ThrowFirstError() + { + _policy = ErrorPolicy.ThrowFirst; + return this; + } + + /// Aggregate all handler exceptions and throw at the end of publish (default). + public Builder ThrowAggregate() + { + _policy = ErrorPolicy.ThrowAggregate; + return this; + } + + /// Build the immutable, thread-safe observer. + public Observer Build() => new(_policy, _sink); + } +} diff --git a/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs b/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs index 153632c..58ee8ba 100644 --- a/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs +++ b/src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs @@ -3,20 +3,41 @@ namespace PatternKit.Examples.ObserverDemo; -/// Lightweight property change hub (INotifyPropertyChanged-like) built on Observer<string>. +/// +/// Lightweight property change hub (INotifyPropertyChanged-like) built on of . +/// Publishes property names to subscribers. +/// public sealed class PropertyChangedHub { private readonly Observer _obs = Observer.Create().Build(); + + /// + /// Subscribe to property name change notifications. + /// + /// Callback that receives the property name. + /// An that removes the subscription when disposed. public IDisposable Subscribe(Action onProperty) => _obs.Subscribe((in p) => onProperty(p)); + + /// + /// Raise a property change notification. + /// + /// The property name to publish. public void Raise(string propertyName) => _obs.Publish(in propertyName); } -/// A tiny observable variable that publishes change notifications when its value changes. +/// +/// A tiny observable variable that publishes change notifications when its value changes. +/// +/// The value type. +/// Optional initial value. public sealed class ObservableVar(T initial = default!) { private readonly Observer<(T Old, T New)> _obs = Observer<(T Old, T New)>.Create().Build(); private T _value = initial; + /// + /// The current value. Setting this will publish a change event if the value actually changes. + /// public T Value { get => _value; @@ -30,19 +51,36 @@ public T Value } } + /// + /// Subscribe to change notifications. The callback receives the old and new value. + /// + /// Callback invoked when changes. + /// An that removes the subscription when disposed. public IDisposable Subscribe(Action onChange) => _obs.Subscribe((in e) => onChange(e.Old, e.New)); } -/// Observable list wrapper that publishes simple add/remove events. +/// +/// Observable list wrapper that publishes simple add/remove events. +/// +/// The element type. public sealed class ObservableList : IEnumerable { private readonly List _items = []; private readonly Observer<(string Action, T Item)> _obs = Observer<(string Action, T Item)>.Create().Build(); + /// The current number of items. public int Count => _items.Count; + + /// + /// Return a shallow snapshot copy of the list's current items. + /// public IReadOnlyList Snapshot() => _items.ToArray(); + /// + /// Add an item and publish an "add" event. + /// + /// The item to add. public void Add(T item) { _items.Add(item); @@ -50,6 +88,11 @@ public void Add(T item) _obs.Publish(in ev); } + /// + /// Remove an item and publish a "remove" event if the item existed. + /// + /// The item to remove. + /// if the item was removed; otherwise . public bool Remove(T item) { var ok = _items.Remove(item); @@ -60,10 +103,17 @@ public bool Remove(T item) return ok; } + /// + /// Subscribe to add/remove change notifications. + /// + /// Callback receiving the action ("add" or "remove") and the item. + /// An that removes the subscription when disposed. public IDisposable Subscribe(Action onChange) => _obs.Subscribe((in e) => onChange(e.Action, e.Item)); + /// public IEnumerator GetEnumerator() => _items.GetEnumerator(); + + /// IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } - diff --git a/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs b/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs index 55e46f1..8fdeb9a 100644 --- a/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs +++ b/src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs @@ -2,44 +2,86 @@ namespace PatternKit.Examples.ObserverDemo; +/// Customer loyalty tiers used for discount calculation. public enum LoyaltyTier { None, Silver, Gold, Platinum } + +/// Payment methods that may influence discounts or eligibility. public enum PaymentKind { None, CreditCard, StoreCard, Cash } +/// +/// Represents a line item in a transaction. +/// +/// The product SKU. +/// The quantity ordered. +/// The unit price. +/// Optional per-line discount percentage (0..1). +/// Whether the line is taxable. public readonly record struct LineItem(string Sku, int Qty, decimal UnitPrice, decimal? DiscountPct = null, bool Taxable = true) { + /// Total raw amount before discounts. public decimal Raw => Qty * UnitPrice; + + /// Discount amount for this line based on . public decimal LineDiscount => DiscountPct is { } p ? Raw * p : 0m; + + /// Net amount after line discount. public decimal Net => Raw - LineDiscount; } /// /// Reactive transaction shows dependent, computed properties updated via Observer-based subscriptions. +/// It recomputes totals and UI-like flags whenever any input changes. /// public sealed class ReactiveTransaction { // Inputs (reactive) - public ObservableList Items { get; } = new(); + /// Collection of items in the transaction. Publishes add/remove events. + public ObservableList Items { get; } = []; + + /// Current customer loyalty tier. public ObservableVar Tier { get; } = new(); + + /// Selected payment method. public ObservableVar Payment { get; } = new(); + + /// Applicable tax rate for taxable items. public ObservableVar TaxRate { get; } = new(0.07m); // Outputs (reactive vars so consumers can subscribe to precise changes) + /// Subtotal before discounts and tax. public ObservableVar Subtotal { get; } = new(); + + /// Total of all per-line discounts. public ObservableVar LineItemDiscounts { get; } = new(); + + /// Discount based on loyalty tier. public ObservableVar LoyaltyDiscount { get; } = new(); + + /// Discount based on payment method. public ObservableVar PaymentDiscount { get; } = new(); + + /// Calculated tax on taxable net amount. public ObservableVar Tax { get; } = new(); + + /// Final total after all discounts and tax. public ObservableVar Total { get; } = new(); // UI-ish dependent properties + /// Whether the transaction meets basic checkout requirements. public ObservableVar CanCheckout { get; } = new(); + + /// Optional badge text indicating savings. public ObservableVar DiscountBadge { get; } = new(); // Fine-grained change notifications for property names, if needed + /// Name-based change hub for UI bindings listening by property name. public PropertyChangedHub PropertyChanged { get; } = new(); private readonly Observer.Handler _notify; + /// + /// Create the transaction and wire reactive inputs so that any change triggers recomputation of outputs. + /// public ReactiveTransaction() { _notify = (in p) => PropertyChanged.Raise(p); @@ -110,10 +152,20 @@ private void Recompute() } // Convenience API + /// Add a line item. public void AddItem(LineItem item) => Items.Add(item); + + /// Remove a line item. + /// if removed; otherwise . public bool RemoveItem(LineItem item) => Items.Remove(item); + + /// Set the loyalty tier. public void SetTier(LoyaltyTier tier) => Tier.Value = tier; + + /// Set the payment method. public void SetPayment(PaymentKind kind) => Payment.Value = kind; + + /// Set the tax rate. public void SetTaxRate(decimal rate) => TaxRate.Value = rate; } @@ -122,14 +174,24 @@ private void Recompute() /// public sealed class ProfileViewModel { + /// First name input. public ObservableVar FirstName { get; } = new(); + + /// Last name input. public ObservableVar LastName { get; } = new(); + + /// Computed full name. public ObservableVar FullName { get; } = new(string.Empty); + + /// Whether saving is currently allowed. public ObservableVar CanSave { get; } = new(); + + /// Name-based change hub for UI bindings listening by property name. public PropertyChangedHub PropertyChanged { get; } = new(); private readonly Observer.Handler _notify; + /// Create the view model and wire reactive recompute behavior. public ProfileViewModel() { _notify = (in p) => PropertyChanged.Raise(p); diff --git a/src/PatternKit.Generators/packages.lock.json b/src/PatternKit.Generators/packages.lock.json index 4eb9db9..40a2a74 100644 --- a/src/PatternKit.Generators/packages.lock.json +++ b/src/PatternKit.Generators/packages.lock.json @@ -117,6 +117,194 @@ "System.Runtime.CompilerServices.Unsafe": "4.5.3" } } + }, + ".NETStandard,Version=v2.1": { + "Microsoft.CodeAnalysis.Analyzers": { + "type": "Direct", + "requested": "[3.11.0, )", + "resolved": "3.11.0", + "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" + }, + "Microsoft.CodeAnalysis.CSharp": { + "type": "Direct", + "requested": "[4.14.0, )", + "resolved": "4.14.0", + "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "Microsoft.CodeAnalysis.Common": "[4.14.0]", + "System.Buffers": "4.5.1", + "System.Collections.Immutable": "9.0.0", + "System.Memory": "4.5.5", + "System.Numerics.Vectors": "4.5.0", + "System.Reflection.Metadata": "9.0.0", + "System.Runtime.CompilerServices.Unsafe": "6.0.0", + "System.Text.Encoding.CodePages": "7.0.0", + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, + "System.Collections.Immutable": { + "type": "Direct", + "requested": "[9.0.9, )", + "resolved": "9.0.9", + "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==", + "dependencies": { + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0" + } + }, + "Microsoft.CodeAnalysis.Common": { + "type": "Transitive", + "resolved": "4.14.0", + "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "System.Buffers": "4.5.1", + "System.Collections.Immutable": "9.0.0", + "System.Memory": "4.5.5", + "System.Numerics.Vectors": "4.5.0", + "System.Reflection.Metadata": "9.0.0", + "System.Runtime.CompilerServices.Unsafe": "6.0.0", + "System.Text.Encoding.CodePages": "7.0.0", + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, + "System.Buffers": { + "type": "Transitive", + "resolved": "4.5.1", + "contentHash": "Rw7ijyl1qqRS0YQD/WycNst8hUUMgrMH4FCn1nNm27M4VxchZ1js3fVjQaANHO5f3sN4isvP4a+Met9Y4YomAg==" + }, + "System.Memory": { + "type": "Transitive", + "resolved": "4.5.5", + "contentHash": "XIWiDvKPXaTveaB7HVganDlOCRoj03l+jrwNvcge/t8vhGYKvqV+dMv6G4SAX2NoNmN0wZfVPTAlFwZcZvVOUw==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Numerics.Vectors": "4.4.0", + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + }, + "System.Numerics.Vectors": { + "type": "Transitive", + "resolved": "4.5.0", + "contentHash": "QQTlPTl06J/iiDbJCiepZ4H//BVraReU4O4EoRw1U02H5TLUIT7xn3GnDp9AXPSlJUDyFs4uWjWafNX6WrAojQ==" + }, + "System.Reflection.Metadata": { + "type": "Transitive", + "resolved": "9.0.0", + "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==", + "dependencies": { + "System.Collections.Immutable": "9.0.0", + "System.Memory": "4.5.5" + } + }, + "System.Runtime.CompilerServices.Unsafe": { + "type": "Transitive", + "resolved": "6.0.0", + "contentHash": "/iUeP3tq1S0XdNNoMz5C9twLSrM/TH+qElHkXWaPvuNOt+99G75NrV0OS2EqHx5wMN7popYjpc8oTjC1y16DLg==" + }, + "System.Text.Encoding.CodePages": { + "type": "Transitive", + "resolved": "7.0.0", + "contentHash": "LSyCblMpvOe0N3E+8e0skHcrIhgV2huaNcjUUEa8hRtgEAm36aGkRoC8Jxlb6Ra6GSfF29ftduPNywin8XolzQ==", + "dependencies": { + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0" + } + }, + "System.Threading.Tasks.Extensions": { + "type": "Transitive", + "resolved": "4.5.4", + "contentHash": "zteT+G8xuGu6mS+mzDzYXbzS7rd3K6Fjb9RiZlYlJPam2/hU7JCBZBVEcywNuR+oZ1ncTvc/cq0faRr3P01OVg==", + "dependencies": { + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + } + }, + "net8.0": { + "Microsoft.CodeAnalysis.Analyzers": { + "type": "Direct", + "requested": "[3.11.0, )", + "resolved": "3.11.0", + "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" + }, + "Microsoft.CodeAnalysis.CSharp": { + "type": "Direct", + "requested": "[4.14.0, )", + "resolved": "4.14.0", + "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "Microsoft.CodeAnalysis.Common": "[4.14.0]", + "System.Collections.Immutable": "9.0.0", + "System.Reflection.Metadata": "9.0.0" + } + }, + "System.Collections.Immutable": { + "type": "Direct", + "requested": "[9.0.9, )", + "resolved": "9.0.9", + "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" + }, + "Microsoft.CodeAnalysis.Common": { + "type": "Transitive", + "resolved": "4.14.0", + "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "System.Collections.Immutable": "9.0.0", + "System.Reflection.Metadata": "9.0.0" + } + }, + "System.Reflection.Metadata": { + "type": "Transitive", + "resolved": "9.0.0", + "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==", + "dependencies": { + "System.Collections.Immutable": "9.0.0" + } + } + }, + "net9.0": { + "Microsoft.CodeAnalysis.Analyzers": { + "type": "Direct", + "requested": "[3.11.0, )", + "resolved": "3.11.0", + "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" + }, + "Microsoft.CodeAnalysis.CSharp": { + "type": "Direct", + "requested": "[4.14.0, )", + "resolved": "4.14.0", + "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "Microsoft.CodeAnalysis.Common": "[4.14.0]", + "System.Collections.Immutable": "9.0.0", + "System.Reflection.Metadata": "9.0.0" + } + }, + "System.Collections.Immutable": { + "type": "Direct", + "requested": "[9.0.9, )", + "resolved": "9.0.9", + "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" + }, + "Microsoft.CodeAnalysis.Common": { + "type": "Transitive", + "resolved": "4.14.0", + "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", + "dependencies": { + "Microsoft.CodeAnalysis.Analyzers": "3.11.0", + "System.Collections.Immutable": "9.0.0", + "System.Reflection.Metadata": "9.0.0" + } + }, + "System.Reflection.Metadata": { + "type": "Transitive", + "resolved": "9.0.0", + "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==" + } } } } \ No newline at end of file diff --git a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs index e69de29..2b0639a 100644 --- a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs +++ b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs @@ -0,0 +1,51 @@ +using PatternKit.Examples.ObserverDemo; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Examples.Tests.ObserverDemo; + +[Feature("Reactive Transaction (dynamic totals/discounts/tax) using Observer")] +public sealed class ReactiveTransactionTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private sealed record Ctx(ReactiveTransaction Tx, List Totals) + { + public Ctx() : this(new ReactiveTransaction(), new List()) { } + } + + private static Ctx Build() => new(); + + private static Ctx SubscribeTotals(Ctx c) + { + c.Tx.Total.Subscribe((old, @new) => c.Totals.Add(@new)); + return c; + } + + [Scenario("Totals and badges recompute as inputs change")] + [Fact] + public async Task Totals_Recompute() + { + await Given("a fresh transaction", Build) + .When("subscribing to totals", SubscribeTotals) + .When("adding line items", c => + { + c.Tx.AddItem(new LineItem("WIDGET", 2, 19.99m)); + c.Tx.AddItem(new LineItem("GADGET", 1, 49.99m, DiscountPct: 0.10m)); + return c; + }) + .When("setting promotions", c => { c.Tx.SetTier(LoyaltyTier.Gold); c.Tx.SetPayment(PaymentKind.StoreCard); return c; }) + .When("setting tax rate", c => { c.Tx.SetTaxRate(0.07m); return c; }) + .Then("subtotal is 89.97", c => c.Tx.Subtotal.Value == 89.97m) + .And("line item discounts sum to 4.999", c => c.Tx.LineItemDiscounts.Value == 4.999m) + .And("loyalty discount is 5.95", c => c.Tx.LoyaltyDiscount.Value == 5.95m) + .And("payment discount is 3.95", c => c.Tx.PaymentDiscount.Value == 3.95m) + .And("tax is 5.95", c => c.Tx.Tax.Value == 5.95m) + .And("total is 81.02", c => c.Tx.Total.Value == 81.02m) + .And("checkout is enabled", c => c.Tx.CanCheckout.Value) + .And("badge shows savings", c => (c.Tx.DiscountBadge.Value ?? string.Empty).Contains("You saved")) + .When("change tax rate to 10%", c => { c.Tx.SetTaxRate(0.10m); return c; }) + .Then("tax updates", c => c.Tx.Tax.Value == Math.Round((39.98m + (49.99m - 4.999m)) * 0.10m, 2)) + .AssertPassed(); + } +} + diff --git a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs index e69de29..fccce1e 100644 --- a/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs +++ b/test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs @@ -0,0 +1,38 @@ +using PatternKit.Examples.ObserverDemo; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Examples.Tests.ObserverDemo; + +[Feature("Reactive ViewModel (dependent properties + enablement) using Observer")] +public sealed class ReactiveViewModelTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private sealed record Ctx(ProfileViewModel Vm, List Notified) + { + public Ctx() : this(new ProfileViewModel(), new List()) { } + } + + private static Ctx Build() => new(); + + [Scenario("FullName recomputes from FirstName/LastName; CanSave toggles")] + [Fact] + public async Task FullName_And_Enablement() + { + await Given("an empty viewmodel", Build) + .When("subscribing to property changed", c => + { + var list = c.Notified; + c.Vm.PropertyChanged.Subscribe(name => list.Add(name)); + return c; + }) + .When("set names", c => { c.Vm.FirstName.Value = "Ada"; c.Vm.LastName.Value = "Lovelace"; return c; }) + .Then("FullName is composed", c => c.Vm.FullName.Value == "Ada Lovelace") + .And("CanSave is true", c => c.Vm.CanSave.Value) + .And("notifications include FullName and CanSave", c => c.Notified.Contains("FullName") && c.Notified.Contains("CanSave")) + .When("clear last name", c => { c.Vm.LastName.Value = ""; return c; }) + .Then("CanSave is false", c => !c.Vm.CanSave.Value) + .AssertPassed(); + } +} + diff --git a/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs b/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs index e69de29..8e39d3d 100644 --- a/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs +++ b/test/PatternKit.Tests/Behavioral/Observer/AsyncObserverTests.cs @@ -0,0 +1,215 @@ +using PatternKit.Behavioral.Observer; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Observer; + +[Feature("AsyncObserver (typed, fluent, thread-safe async event hub)")] +public sealed class AsyncObserverTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private readonly record struct Evt(int Id, string Name); + + private sealed record Ctx(AsyncObserver Hub, List Log) + { + public Ctx() : this(AsyncObserver.Create().Build(), []) { } + } + + private static Ctx Build_Default() => new(); + + private static Ctx Build_With_Sink_Aggregate(List<(Exception ex, Evt evt)> sink) + => new( + AsyncObserver.Create() + .OnError((ex, e) => { sink.Add((ex, e)); return default; }) + .ThrowAggregate() + .Build(), + []); + + private static Ctx Build_With_Sink_Swallow(List<(Exception ex, Evt evt)> sink) + => new( + AsyncObserver.Create() + .OnError((ex, e) => { sink.Add((ex, e)); return default; }) + .SwallowErrors() + .Build(), + []); + + private static Ctx Build_With_Sink_ThrowFirst(List<(Exception ex, Evt evt)> sink) + => new( + AsyncObserver.Create() + .OnError((ex, e) => { sink.Add((ex, e)); return default; }) + .ThrowFirstError() + .Build(), + []); + + // Helpers + private static ValueTask Sub_All_Async(Ctx c) + { + var log = c.Log; + c.Hub.Subscribe(async e => { await Task.Yield(); log.Add($"all:{e.Id}:{e.Name}"); }); + return ValueTask.FromResult(c); + } + + private static ValueTask Sub_Filtered_Async(Ctx c) + { + var log = c.Log; + c.Hub.Subscribe(async e => { await Task.Yield(); return (e.Id % 2) == 0; }, + async e => { await Task.Delay(1); log.Add($"even:{e.Id}"); }); + c.Hub.Subscribe(async e => { await Task.Yield(); return (e.Id % 2) == 1; }, + async e => { await Task.Delay(1); log.Add($"odd:{e.Id}"); }); + return ValueTask.FromResult(c); + } + + private static async ValueTask PublishAsync(Ctx c, int id, string name) + { + var ev = new Evt(id, name); + await c.Hub.PublishAsync(ev); + return c; + } + + private static async ValueTask AddAllAndFiltered(Ctx c) + { + await Sub_All_Async(c); + await Sub_Filtered_Async(c); + return c; + } + + private static async ValueTask PublishTwo(Ctx c, int id1, string n1, int id2, string n2) + { + await PublishAsync(c, id1, n1); + await PublishAsync(c, id2, n2); + return c; + } + + private static async ValueTask PublishCatch(Ctx c, Evt ev, string marker) where TEx : Exception + { + try { await c.Hub.PublishAsync(ev); } + catch (TEx) { c.Log.Add(marker); } + return c; + } + + private static async ValueTask AddUnsubThenPublishOnce(Ctx c) + { + var log = c.Log; + var sub = c.Hub.Subscribe(async e => { await Task.Yield(); log.Add($"once:{e.Id}"); }); + await PublishAsync(c, 10, "a"); + sub.Dispose(); + return c; + } + + // Scenarios + + [Scenario("PublishAsync delivers to all async subscribers in order")] + [Fact] + public async Task Basic_Subscribe_Publish() + { + await Given("a default async observer", Build_Default) + .When("subscribing two async 'all' handlers", c => + { + var log = c.Log; + c.Hub.Subscribe(async e => { await Task.Delay(1); log.Add($"h1:{e.Id}"); }); + c.Hub.Subscribe(async e => { await Task.Delay(1); log.Add($"h2:{e.Id}"); }); + return ValueTask.FromResult(c); + }) + .When("publishing event #7", c => PublishAsync(c, 7, "x")) + .Then("both handlers ran in order", c => string.Join(',', c.Log) == "h1:7,h2:7") + .AssertPassed(); + } + + [Scenario("Predicate filters (async) route events to matching subscribers only")] + [Fact] + public async Task Predicate_Filtering() + { + await Given("observer with all + filtered async subscribers", Build_Default) + .When("adding an 'all' handler and two filtered handlers", AddAllAndFiltered) + .When("publishing #1 and #2", c => PublishTwo(c, 1, "one", 2, "two")) + .Then("log shows all + matching filtered entries", c => + { + var s = string.Join('|', c.Log); + return s.Contains("all:1:one") && s.Contains("odd:1") && s.Contains("all:2:two") && s.Contains("even:2"); + }) + .AssertPassed(); + } + + [Scenario("Unsubscribing stops delivery (async)")] + [Fact] + public async Task Unsubscribe_Works() + { + await Given("observer and a tracked subscription", Build_Default) + .When("adding an unsubscribable handler", AddUnsubThenPublishOnce) + .When("publishing again after dispose", c => PublishAsync(c, 11, "b")) + .Then("only the first publish is logged", c => string.Join(',', c.Log) == "once:10") + .AssertPassed(); + } + + [Scenario("SwallowErrors: exceptions don't propagate; others continue (async)")] + [Fact] + public async Task SwallowErrors_Allows_Continuation() + { + var sink = new List<(Exception, Evt)>(); + await Given("async observer with swallow policy and sink", () => Build_With_Sink_Swallow(sink)) + .When("adding throwing + normal handlers", c => + { + c.Hub.Subscribe(static _ => throw new InvalidOperationException("boom")); + c.Hub.Subscribe(async e => { await Task.Yield(); c.Log.Add($"ok:{e.Id}"); }); + return ValueTask.FromResult(c); + }) + .When("publishing event", c => PublishAsync(c, 1, "x")) + .Then("no exception escaped and normal ran", c => c.Log.SequenceEqual(["ok:1"])) + .And("sink captured the exception", _ => sink is [{ Item1: InvalidOperationException } _]) + .AssertPassed(); + } + + [Scenario("ThrowFirstError: first exception is rethrown; later handlers don't run (async)")] + [Fact] + public async Task ThrowFirst_Stops_And_Propagates() + { + var sink = new List<(Exception, Evt)>(); + await Given("async observer with throw-first policy", () => Build_With_Sink_ThrowFirst(sink)) + .When("adding throwing then normal", c => + { + c.Hub.Subscribe(static _ => throw new ApplicationException("x")); + c.Hub.Subscribe(async e => { await Task.Yield(); c.Log.Add($"not:{e.Id}"); }); + return ValueTask.FromResult(c); + }) + .When("publishing and catching", c => PublishCatch(c, new Evt(5, "x"), "threw")) + .Then("later handler did not run", c => !c.Log.Contains("not:5")) + .And("sink recorded the exception", _ => sink is [{ Item1: ApplicationException } _]) + .AssertPassed(); + } + + [Scenario("ThrowAggregate: collect multiple exceptions and throw one AggregateException (async)")] + [Fact] + public async Task ThrowAggregate_Collects_All() + { + var sink = new List<(Exception, Evt)>(); + await Given("async observer with aggregate policy", () => Build_With_Sink_Aggregate(sink)) + .When("adding two throwing and one normal", c => + { + c.Hub.Subscribe(static _ => throw new InvalidOperationException("a")); + c.Hub.Subscribe(async e => { await Task.Yield(); c.Log.Add($"ok:{e.Id}"); }); + c.Hub.Subscribe(static _ => throw new ArgumentException("b")); + return ValueTask.FromResult(c); + }) + .When("publishing and catching aggregate", c => PublishCatch(c, new Evt(9, "n"), marker: "agg:2")) + .Then("normal handler ran despite throws", c => c.Log.Contains("ok:9")) + .And("aggregate captured both exceptions", c => c.Log.Contains("agg:2")) + .And("sink saw two exceptions", _ => sink.Count == 2) + .AssertPassed(); + } + + [Scenario("Sync adapter overloads interop with AsyncObserver")] + [Fact] + public async Task Sync_Adapters_Work() + { + await Given("async observer", Build_Default) + .When("adding sync-style subscriptions via adapters", c => + { + c.Hub.Subscribe(static (in e) => e.Id > 0, (in e) => { c.Log.Add($"sync:{e.Id}"); }); + c.Hub.Subscribe((in e) => { c.Log.Add($"all:{e.Id}"); }); + return ValueTask.FromResult(c); + }) + .When("publishing event #3", c => PublishAsync(c, 3, "x")) + .Then("both sync adapters ran", c => c.Log.SequenceEqual(["sync:3", "all:3"])) + .AssertPassed(); + } +} diff --git a/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs b/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs index e69de29..944e56f 100644 --- a/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs +++ b/test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs @@ -0,0 +1,187 @@ +using PatternKit.Behavioral.Observer; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Observer; + +[Feature("Observer (typed, fluent, thread-safe event hub)")] +public sealed class ObserverTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private readonly record struct Evt(int Id, string Name); + + private sealed record Ctx(Observer Hub, List Log) + { + public Ctx() : this(Observer.Create().Build(), []) { } + } + + private static Ctx Build_Default() => new(); + + private static Ctx Build_With_Sink_Aggregate(List<(Exception ex, Evt evt)> sink) + => new( + Observer.Create() + .OnError((ex, in e) => sink.Add((ex, e))) + .ThrowAggregate() + .Build(), + []); + + private static Ctx Build_With_Sink_Swallow(List<(Exception ex, Evt evt)> sink) + => new( + Observer.Create() + .OnError((ex, in e) => sink.Add((ex, e))) + .SwallowErrors() + .Build(), + []); + + private static Ctx Build_With_Sink_ThrowFirst(List<(Exception ex, Evt evt)> sink) + => new( + Observer.Create() + .OnError((ex, in e) => sink.Add((ex, e))) + .ThrowFirstError() + .Build(), + []); + + private static Ctx Sub_All(Ctx c) + { + var log = c.Log; + c.Hub.Subscribe((in e) => log.Add($"all:{e.Id}:{e.Name}")); + return c; + } + + private static Ctx Sub_Filtered(Ctx c) + { + var log = c.Log; + c.Hub.Subscribe(static (in e) => e.Id % 2 == 0, (in e) => log.Add($"even:{e.Id}")); + c.Hub.Subscribe(static (in e) => e.Id % 2 == 1, (in e) => log.Add($"odd:{e.Id}")); + return c; + } + + private static Ctx Publish(Ctx c, int id, string name) + { + var ev = new Evt(id, name); + c.Hub.Publish(in ev); + return c; + } + + [Scenario("Publish delivers to all subscribers in registration order")] + [Fact] + public async Task Basic_Subscribe_Publish() + { + await Given("a default observer and two subscribers", Build_Default) + .When("subscribing two 'all' handlers", c => + { + var log = c.Log; + c.Hub.Subscribe((in e) => log.Add($"h1:{e.Id}")); + c.Hub.Subscribe((in e) => log.Add($"h2:{e.Id}")); + return c; + }) + .When("publishing event #7", c => Publish(c, 7, "x")) + .Then("both handlers ran in order", c => string.Join(',', c.Log) == "h1:7,h2:7") + .AssertPassed(); + } + + [Scenario("Predicate filters route events to matching subscribers only")] + [Fact] + public async Task Predicate_Filtering() + { + await Given("observer with all + filtered subscribers", Build_Default) + .When("adding an 'all' handler and two filtered handlers", c => { Sub_All(c); Sub_Filtered(c); return c; }) + .When("publishing #1 and #2", c => { Publish(c, 1, "one"); Publish(c, 2, "two"); return c; }) + .Then("log shows all + matching filtered entries", c => + { + var s = string.Join('|', c.Log); + return s.Contains("all:1:one") && s.Contains("odd:1") && s.Contains("all:2:two") && s.Contains("even:2"); + }) + .AssertPassed(); + } + + [Scenario("Unsubscribing stops delivery")] + [Fact] + public async Task Unsubscribe_Works() + { + await Given("observer and a tracked subscription", Build_Default) + .When("adding an unsubscribable handler", c => + { + var log = c.Log; + var sub = c.Hub.Subscribe((in e) => log.Add($"once:{e.Id}")); + // publish once + Publish(c, 10, "a"); + sub.Dispose(); + return c; + }) + .When("publishing again after dispose", c => Publish(c, 11, "b")) + .Then("only the first publish is logged", c => string.Join(',', c.Log) == "once:10") + .AssertPassed(); + } + + [Scenario("SwallowErrors: exceptions don't propagate; others continue")] + [Fact] + public async Task SwallowErrors_Allows_Continuation() + { + var sink = new List<(Exception, Evt)>(); + await Given("observer with swallow policy and sink", () => Build_With_Sink_Swallow(sink)) + .When("adding throwing + normal handlers", c => + { + c.Hub.Subscribe(static (in _) => throw new InvalidOperationException("boom")); + c.Hub.Subscribe((in e) => c.Log.Add($"ok:{e.Id}")); + return c; + }) + .When("publishing event", c => Publish(c, 1, "x")) + .Then("no exception escaped and normal ran", c => c.Log.SequenceEqual(["ok:1"])) + .And("sink captured the exception", _ => sink.Count == 1 && sink[0].Item1 is InvalidOperationException) + .AssertPassed(); + } + + [Scenario("ThrowFirstError: first exception is rethrown; later handlers don't run")] + [Fact] + public async Task ThrowFirst_Stops_And_Propagates() + { + var sink = new List<(Exception, Evt)>(); + await Given("observer with throw-first policy", () => Build_With_Sink_ThrowFirst(sink)) + .When("adding throwing then normal", c => + { + c.Hub.Subscribe(static (in _) => throw new ApplicationException("x")); + c.Hub.Subscribe((in e) => c.Log.Add($"not:{e.Id}")); + return c; + }) + .When("publishing and catching", c => + { + var ev = new Evt(5, "x"); + try { c.Hub.Publish(in ev); } + catch (ApplicationException) { c.Log.Add("threw"); } + return c; + }) + .Then("later handler did not run", c => !c.Log.Contains("not:5")) + .And("sink recorded the exception", _ => sink.Count == 1 && sink[0].Item1 is ApplicationException) + .AssertPassed(); + } + + [Scenario("ThrowAggregate: collect multiple exceptions and throw one AggregateException")] + [Fact] + public async Task ThrowAggregate_Collects_All() + { + var sink = new List<(Exception, Evt)>(); + await Given("observer with aggregate policy", () => Build_With_Sink_Aggregate(sink)) + .When("adding two throwing and one normal", c => + { + c.Hub.Subscribe(static (in _) => throw new InvalidOperationException("a")); + c.Hub.Subscribe((in e) => c.Log.Add($"ok:{e.Id}")); + c.Hub.Subscribe(static (in _) => throw new ArgumentException("b")); + return c; + }) + .When("publishing and catching aggregate", c => + { + var ev = new Evt(9, "n"); + try { c.Hub.Publish(in ev); } + catch (AggregateException ae) + { + c.Log.Add($"agg:{ae.InnerExceptions.Count}"); + } + return c; + }) + .Then("normal handler ran despite throws", c => c.Log.Contains("ok:9")) + .And("aggregate captured both exceptions", c => c.Log.Contains("agg:2")) + .And("sink saw two exceptions", _ => sink.Count == 2) + .AssertPassed(); + } +}