Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions docs/examples/observer-demo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# In-Process Event Hub with Observer<TEvent>

This demo shows how to build a minimal, typed event hub using PatternKit’s Observer.

What it demonstrates
- Typed publish/subscribe: `Observer<TEvent>`
- Predicate-based subscriptions (per-subscriber filters)
- Error policies (aggregate, throw-first, swallow) + error sink
- Unsubscribe via `IDisposable`

Where to look
- Code: `src/PatternKit.Examples/ObserverDemo/`
- `SimpleEventHub.cs`: small wrapper and a sample `UserEvent` record
- Tests: `test/PatternKit.Tests/Behavioral/Observer/ObserverTests.cs`

Quick start
```csharp
using PatternKit.Behavioral.Observer;

// Build an observer hub for string events
var hub = Observer<string>.Create()
.OnError(static (ex, in msg) => Console.Error.WriteLine($"handler failed: {ex.Message} on '{msg}'"))
.ThrowAggregate() // default policy
.Build();

// Subscribe all messages
var subAll = hub.Subscribe(static (in string msg) => Console.WriteLine($"ALL: {msg}"));

// Subscribe only warnings
var subWarn = hub.Subscribe(static (in string msg) => msg.StartsWith("warn:", StringComparison.Ordinal),
static (in string msg) => Console.WriteLine($"WARN: {msg}"));

hub.Publish("hello");
hub.Publish("warn: low-disk");

subWarn.Dispose(); // unsubscribe
```

Error handling policies
- ThrowAggregate (default): run everyone; collect exceptions and throw a single `AggregateException` at the end.
- ThrowFirstError: stop at the first exception (later handlers don’t run).
- SwallowErrors: never throw; failures go only to the error sink.

Example: swallow errors so publishing never throws
```csharp
var hub = Observer<int>.Create()
.OnError(static (ex, in n) => Console.Error.WriteLine($"fail({n}): {ex.Message}"))
.SwallowErrors()
.Build();

hub.Subscribe(static (in int _) => throw new InvalidOperationException("boom"));
hub.Subscribe(static (in int n) => Console.WriteLine($"ok:{n}"));

hub.Publish(42); // prints: ok:42, no exception to caller
```

Notes
- Subscriptions are copy-on-write; publishing iterates a snapshot without locks.
- Registration order is preserved.
- Subscribing/unsubscribing during a publish affects the next publish, not the current one.
- Delegates use `in T` for zero-copy struct pass-through.

Run the tests
```bash
# From the repo root
dotnet build PatternKit.slnx -c Debug
dotnet test PatternKit.slnx -c Debug
```

49 changes: 49 additions & 0 deletions docs/examples/reactive-transaction.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Reactive Transaction with Observer

This example builds a non-trivial transaction model that computes totals dynamically from line items, loyalty tier, payment kind, and tax rate.
It uses tiny reactive primitives powered by PatternKit’s `Observer<T>` so everything updates automatically.

What it demonstrates
- Reactive inputs: line items, loyalty tier, payment kind, tax rate
- Computed outputs: subtotal, line-item discounts, loyalty and payment discounts, tax, total
- UI-like dependents: `CanCheckout`, `DiscountBadge`
- Change broadcasting via both typed subscriptions and simple “property name” notifications

Code
```csharp
using PatternKit.Examples.ObserverDemo;

var tx = new ReactiveTransaction();

// Listen for total changes
using var _ = tx.Total.Subscribe((old, @new) => Console.WriteLine($"Total: {old:C} -> {@new:C}"));

// Build cart
tx.AddItem(new LineItem("WIDGET", 2, 19.99m));
tx.AddItem(new LineItem("GADGET", 1, 49.99m, DiscountPct: 0.10m)); // 10% line discount

// Promotions
tx.SetTier(LoyaltyTier.Gold); // ~7%
tx.SetPayment(PaymentKind.StoreCard); // extra 5% after loyalty

// Tax
tx.SetTaxRate(0.07m);

Console.WriteLine($"Subtotal: {tx.Subtotal.Value:C}");
Console.WriteLine($"Discounts: line={tx.LineItemDiscounts.Value:C}, loyalty={tx.LoyaltyDiscount.Value:C}, pay={tx.PaymentDiscount.Value:C}");
Console.WriteLine($"Tax: {tx.Tax.Value:C}");
Console.WriteLine($"Total: {tx.Total.Value:C}");
Console.WriteLine($"CanCheckout: {tx.CanCheckout.Value}");
Console.WriteLine(tx.DiscountBadge.Value ?? "No discounts");
```

How it works
- `ObservableList<T>` and `ObservableVar<T>` publish change events.
- `ReactiveTransaction` recomputes whenever any input changes and pushes new values to reactive outputs.
- Consumers subscribe specifically (e.g., `Total`) or to property-name updates via a small `PropertyChangedHub`.

Where to look
- Source: `src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs`
- Reactive primitives: `src/PatternKit.Examples/ObserverDemo/ReactivePrimitives.cs`
- Tests: `test/PatternKit.Examples.Tests/ObserverDemo/ReactiveTransactionTests.cs`

36 changes: 36 additions & 0 deletions docs/examples/reactive-viewmodel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Reactive ViewModel with Observer

This example shows how to build dependent properties and control enablement without `INotifyPropertyChanged`, using PatternKit’s Observer-based primitives.

What it demonstrates
- Dependent properties: `FullName` derived from `FirstName` + `LastName`
- Control enablement: `CanSave` toggles based on input validity
- Lightweight property-changed hub for UI bindings

Code
```csharp
using PatternKit.Examples.ObserverDemo;

var vm = new ProfileViewModel();

// Subscribe to name changes
var unsub = vm.PropertyChanged.Subscribe(name => Console.WriteLine($"Changed: {name}"));

vm.FirstName.Value = "Ada";
vm.LastName.Value = "Lovelace";

Console.WriteLine(vm.FullName.Value); // "Ada Lovelace"
Console.WriteLine(vm.CanSave.Value); // true

unsub.Dispose();
```

How it works
- `ObservableVar<T>` emits a `(old,new)` event on changes.
- The viewmodel subscribes to `FirstName` and `LastName`, recomputes `FullName` and `CanSave`, and raises name-based notifications via a small `PropertyChangedHub`.
- No reflection, no `INotifyPropertyChanged` interface required.

Where to look
- Source: `src/PatternKit.Examples/ObserverDemo/ReactiveTransaction.cs` (contains `ProfileViewModel`)
- Tests: `test/PatternKit.Examples.Tests/ObserverDemo/ReactiveViewModelTests.cs`

9 changes: 9 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,12 @@

- name: Text Editor History (Memento)
href: text-editor-memento.md

- name: Observer — In-Process Event Hub
href: observer-demo.md

- name: Reactive ViewModel (Dependent Properties + Enablement)
href: reactive-viewmodel.md

- name: Reactive Transaction (Dynamic Discounts, Tax, Total)
href: reactive-transaction.md
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ PatternKit will grow to cover **Creational**, **Structural**, and **Behavioral**
| -------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Creational** | [Factory](patterns/creational/factory/factory.md) ✓ • [Composer](patterns/creational/builder/composer.md) ✓ • [ChainBuilder](patterns/creational/builder/chainbuilder.md) ✓ • [BranchBuilder](patterns/creational/builder/chainbuilder.md) ✓ • [MutableBuilder](patterns/creational/builder/mutablebuilder.md) ✓ • [Prototype](patterns/creational/prototype/prototype.md) ✓ • [Singleton](patterns/creational/singleton/singleton.md) ✓ |
| **Structural** | [Adapter](patterns/structural/adapter/fluent-adapter.md) ✓ • [Bridge](patterns/structural/bridge/bridge.md) ✓ • [Composite](patterns/structural/composite/composite.md) ✓ • [Decorator](patterns/structural/decorator/index.md) ✓ • [Facade](patterns/structural/facade/facade.md) ✓ • [Flyweight](patterns/structural/flyweight/index.md) ✓ • [Proxy](patterns/structural/proxy/index.md) ✓ |
| **Behavioral** | [Strategy](patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](patterns/behavioral/chain/resultchain.md) ✓ • [Command](patterns/behavioral/command/command.md) ✓ • [ReplayableSequence](patterns/behavioral/iterator/replayablesequence.md) ✓ • [WindowSequence](patterns/behavioral/iterator/windowsequence.md) ✓ • [Mediator](patterns/behavioral/mediator/mediator.md) ✓ • [Memento](patterns/behavioral/memento/memento.md) ✓ • Observer (planned) • State (planned) • Template Method (planned) • Visitor (planned) |
| **Behavioral** | [Strategy](patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](patterns/behavioral/chain/resultchain.md) ✓ • [Command](patterns/behavioral/command/command.md) ✓ • [ReplayableSequence](patterns/behavioral/iterator/replayablesequence.md) ✓ • [WindowSequence](patterns/behavioral/iterator/windowsequence.md) ✓ • [Mediator](patterns/behavioral/mediator/mediator.md) ✓ • [Memento](patterns/behavioral/memento/memento.md) ✓ • [Observer](patterns/behavioral/observer/observer.md) ✓ • [AsyncObserver](patterns/behavioral/observer/asyncobserver.md) ✓ • State (planned) • Template Method (planned) • Visitor (planned) |

Each pattern will ship with:

Expand Down
105 changes: 105 additions & 0 deletions docs/patterns/behavioral/observer/asyncobserver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# AsyncObserver<TEvent>

An asynchronous, thread-safe event hub for broadcasting events of type `TEvent` to multiple subscribers. Handlers and predicates are `ValueTask`-based, allowing you to await I/O without blocking.

Use it when your observers perform async work (I/O, timers, pipelines) and you want a clean, fluent API matching the synchronous `Observer<TEvent>`.

---

## What it is

- Typed: `AsyncObserver<TEvent>` delivers strongly-typed events.
- Async-first: `Predicate` and `Handler` return `ValueTask`.
- Cancellation: `PublishAsync` accepts a `CancellationToken`.
- Lock-free, copy-on-write subscriptions: publish iterates a snapshot without locks.
- Predicate filters: decide per-subscriber whether to run the handler.
- Immutable & thread-safe after `Build()`.

---

## TL;DR example

```csharp
using PatternKit.Behavioral.Observer;

var hub = AsyncObserver<string>.Create()
.OnError(static (ex, in msg) => { Console.Error.WriteLine(ex.Message); return ValueTask.CompletedTask; })
.ThrowAggregate() // default
.Build();

hub.Subscribe(async (in string msg) => { await Task.Delay(1); Console.WriteLine($"ALL:{msg}"); });

hub.Subscribe(
predicate: static (in string msg) => new ValueTask<bool>(msg.StartsWith("warn:", StringComparison.Ordinal)),
handler: async (in string msg) => { await Task.Yield(); Console.WriteLine($"WARN:{msg}"); });

await hub.PublishAsync("hello");
await hub.PublishAsync("warn: disk");
```

---

## API shape

```csharp
var hub = AsyncObserver<TEvent>.Create()
.OnError(static (Exception ex, in TEvent evt) => /* log */ ValueTask.CompletedTask) // optional
.ThrowAggregate() // default
// .ThrowFirstError()
// .SwallowErrors()
.Build();

IDisposable s1 = hub.Subscribe(static (in TEvent evt) => /* ValueTask handler */);
IDisposable s2 = hub.Subscribe(static (in TEvent evt) => /* ValueTask<bool> filter */, static (in TEvent evt) => /* ValueTask handler */);

await hub.PublishAsync(evt, cancellationToken);
```

- `PublishAsync(TEvent evt, CancellationToken ct = default)` drives the async flow; it does not take `in` because async methods cannot have `in/ref/out` parameters.
- Delegates still use `in TEvent` for zero-copy of large structs.

---

## Error handling policies

Same as `Observer<TEvent>`:

- ThrowAggregate (default): run all matching handlers; collect exceptions and throw a single `AggregateException` at the end. Error sink is awaited for each failure.
- ThrowFirstError: throw immediately on the first failing handler; remaining handlers do not run.
- SwallowErrors: never throw from `PublishAsync`; failures go only to the error sink if configured.

Error sink forms:

- Async: `.OnError((ex, in evt) => ValueTask)`
- Sync adapter: `.OnError((ex, in evt) => void)` which is adapted to `ValueTask`.

---

## Interop with synchronous Observer

You can reuse synchronous delegates via adapter overloads:

```csharp
var asyncHub = AsyncObserver<int>.Create().Build();
asyncHub.Subscribe(static (in int x) => x > 0, static (in int x) => Console.WriteLine($"+:{x}"));
asyncHub.Subscribe(static (in int x) => Console.WriteLine(x));
```

These overloads wrap sync delegates in `ValueTask` delegates with zero allocations on the fast path.

---

## Notes

- Ordering: handlers run in registration order.
- Reentrancy: subscribing/unsubscribing during `PublishAsync` affects subsequent publishes.
- Cancellation: `PublishAsync` checks the token between subscribers; a cancellation stops the loop with `OperationCanceledException`.
- Performance: copy-on-write subscriptions keep publish contention-free; avoid heavy per-event allocations in handlers.

---

## See also

- [Observer](./observer.md) for the synchronous variant
- [ActionChain](../chain/actionchain.md) and [Mediator](../mediator/mediator.md) for other orchestration styles

103 changes: 103 additions & 0 deletions docs/patterns/behavioral/observer/observer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Observer<TEvent>

A thread-safe, fluent event hub for broadcasting events of type `TEvent` to multiple subscribers. Observers can opt-in via an optional predicate filter.

Use it when you need a decoupled, in-process publish/subscribe mechanism: UI events, domain notifications, pipeline hooks, or instrumentation.

---

## What it is

- Typed: `Observer<TEvent>` delivers strongly-typed events.
- Fluent builder: configure error handling, then `Build()`.
- Lock-free copy-on-write subscriptions: publish reads a snapshot array; subscribe/unsubscribe are atomic.
- Predicate filters: deliver to a handler only when a condition matches.
- Immutable & thread-safe after `Build()`.

---

## TL;DR example

```csharp
using PatternKit.Behavioral.Observer;

var hub = Observer<string>.Create()
.OnError(static (ex, in msg) => Console.Error.WriteLine($"handler failed: {ex.Message} for '{msg}'"))
.ThrowAggregate() // default; collect all exceptions and throw one AggregateException
.Build();

// subscribe everyone
var subAll = hub.Subscribe(static (in string msg) => Console.WriteLine($"ALL: {msg}"));

// subscribe with a filter
var subWarn = hub.Subscribe(static (in string msg) => msg.StartsWith("warn:", StringComparison.Ordinal),
static (in string msg) => Console.WriteLine($"WARN: {msg}"));

hub.Publish("hello");
hub.Publish("warn: low-disk");

subWarn.Dispose(); // unsubscribe
```

---

## API shape

```csharp
var hub = Observer<TEvent>.Create()
.OnError(static (Exception ex, in TEvent evt) => /* log */) // optional
.ThrowAggregate() // default
// .ThrowFirstError() // stop at first exception
// .SwallowErrors() // never throw to caller
.Build();

IDisposable s1 = hub.Subscribe(static (in TEvent evt) => /* handler */);
IDisposable s2 = hub.Subscribe(static (in TEvent evt) => /* bool filter */, static (in TEvent evt) => /* handler */);

hub.Publish(in evt);

s1.Dispose(); // unsubscribe
```

Delegates use `in TEvent` to avoid copying large structs. Subscriptions return `IDisposable`; disposing is idempotent.

---

## Error handling policies

Choose via the builder (defaults to `ThrowAggregate`):

- ThrowAggregate: run all matching handlers; collect exceptions and throw a single `AggregateException` at the end. The error sink still receives each failure.
- ThrowFirstError: throw immediately on the first failing handler; remaining handlers do not run.
- SwallowErrors: never throw from `Publish`; all exceptions are sent to the error sink if provided.

You can configure an error sink with `.OnError((ex, in evt) => ...)`. The sink should not throw; if it does, it is swallowed to protect the publisher.

---

## Design notes

- Copy-on-write subscriptions: publishing is contention-free; subscribe/unsubscribe perform an atomic array swap.
- Ordering: handlers are invoked in registration order.
- Filters: evaluated per-subscriber; if `false`, the handler is skipped.
- Reentrancy: handlers may subscribe/unsubscribe during `Publish`; such changes affect subsequent publishes (not the current one).
- Thread-safety: built instances are safe to share across threads; builder is not thread-safe.

---

## When to use

- Decouple producers and consumers inside a process.
- Many readers, few writers: telemetry, UI, domain events.
- Need simple filtering at the subscriber side.

If you need cross-process or durable messaging, integrate with a message bus (RabbitMQ, Azure Service Bus) and consider using Observer as an in-process fan-out.

---

## See also

- [Mediator](../mediator/mediator.md): coordinate interactions via a central mediator.
- [ActionChain](../chain/actionchain.md): middleware pipeline for conditional processing.
- Examples: [Observer demo](../../../examples/observer-demo.md)

Loading
Loading