diff --git a/README.md b/README.md index 8188c0c..2235635 100644 --- a/README.md +++ b/README.md @@ -62,17 +62,266 @@ if (parse.Execute("123", out var n)) Console.WriteLine(n); // 123 ``` +A forkable, lookahead **ReplayableSequence** (Iterator+): + +```csharp +using PatternKit.Behavioral.Iterator; + +var seq = ReplayableSequence.From(Enumerable.Range(1, 5)); +var c = seq.GetCursor(); + +// Look ahead without consuming +var first = c.Lookahead(0).OrDefault(); // 1 +var third = c.Lookahead(2).OrDefault(); // 3 + +// Consume immutably (returns next cursor) +if (c.TryNext(out var v1, out var c2) && c2.TryNext(out var v2, out var c3)) +{ + // v1 = 1, v2 = 2; c3 now points at 3 +} + +// Fork speculative branch +var fork = c3.Fork(); +if (fork.TryNext(out var v3, out fork) && fork.TryNext(out var v4, out fork)) +{ + // fork saw 3,4 while c3 is still at 3 +} + +// LINQ over a cursor (non-destructive to original cursor) +var evens = c3.Where(x => x % 2 == 0).ToList(); // [2,4] using a snapshot enumeration +``` + +### WindowSequence (sliding / striding windows) +```csharp +using PatternKit.Behavioral.Iterator; + +// Full sliding windows (size 3, stride 1) +var slides = Enumerable.Range(1,7) + .Windows(size:3) + .Select(w => string.Join(',', w.ToArray())) + .ToList(); // ["1,2,3","2,3,4","3,4,5","4,5,6","5,6,7"] + +// Stride 2 (skip one between window starts) +var stride = Enumerable.Range(1,9) + .Windows(size:4, stride:2) + .Select(w => string.Join('-', w.ToArray())) + .ToList(); // ["1-2-3-4","3-4-5-6","5-6-7-8"] + +// Include trailing partial +var partial = new[]{1,2,3,4,5} + .Windows(size:3, stride:3, includePartial:true) + .Select(w => (Vals: w.ToArray(), w.IsPartial)) + .ToList(); // [ ([1,2,3], false), ([4,5], true) ] + +// Reuse buffer (zero alloc per full window) – copy if you persist +var reused = Enumerable.Range(1,6) + .Windows(size:3, reuseBuffer:true) + .Select(w => w.ToArray()) // snapshot copy each window + .ToList(); +``` + +--- + +## 📘 Pattern Quick Reference +Tiny, copy‑paste friendly snippets for the most common patterns. Each builds an immutable, hot‑path friendly artifact. + +### ActionChain (middleware style rule pack) +```csharp +using PatternKit.Behavioral.Chain; + +var log = new List(); +var chain = ActionChain.Create() + .When((in r) => r.Path.StartsWith("/admin") && !r.Headers.ContainsKey("Authorization")) + .ThenStop(r => log.Add("deny")) + .When((in r) => r.Headers.ContainsKey("X-Request-Id")) + .ThenContinue(r => log.Add($"req={r.Headers["X-Request-Id"]}")) + .Finally((in r, next) => { log.Add($"{r.Method} {r.Path}"); next(in r); }) + .Build(); + +chain.Execute(new("GET","/health", new Dictionary())); +``` + +### ResultChain (first-match value producer with fallback) +```csharp +using PatternKit.Behavioral.Chain; + +public readonly record struct Request(string Method, string Path); +public readonly record struct Response(int Status, string Body); + +var router = ResultChain.Create() + .When(static (in r) => r.Method == "GET" && r.Path == "/health") + .Then(r => new Response(200, "OK")) + .When(static (in r) => r.Method == "GET" && r.Path.StartsWith("/users/")) + .Then(r => new Response(200, $"user:{r.Path[7..]}")) + // default / not found tail (only runs if no earlier handler produced) + .Finally(static (in _, out Response? res, _) => { res = new Response(404, "not found"); return true; }) + .Build(); + +router.Execute(new Request("GET", "/health"), out var ok); // ok.Status = 200 +router.Execute(new Request("GET", "/users/alice"), out var u); // 200, Body = user:alice +router.Execute(new Request("GET", "/missing"), out var nf); // 404, Body = not found +``` + +### Strategy (first matching branch) +```csharp +using PatternKit.Behavioral.Strategy; +var classify = Strategy.Create() + .When(i => i > 0).Then(_ => "positive") + .When(i => i < 0).Then(_ => "negative") + .Default(_ => "zero") + .Build(); +var result = classify.Execute(-5); // negative +``` + +### TryStrategy (first success wins parsing) +```csharp +var parse = TryStrategy.Create() + .Always((in string s, out int v) => int.TryParse(s, out v)) + .Finally((in string _, out int v) => { v = 0; return true; }) + .Build(); +parse.Execute("42", out var n); // n=42 +``` + +### ActionStrategy (multi-fire side‑effects) +```csharp +using PatternKit.Behavioral.Strategy; +var audit = new List(); +var strat = ActionStrategy.Create() + .When(i => i % 2 == 0).Then(i => audit.Add($"even:{i}")) + .When(i => i > 10).Then(i => audit.Add($"big:{i}")) + .Build(); +strat.Execute(12); // adds even:12, big:12 +``` + +### AsyncStrategy (await external work) +```csharp +var asyncStrat = AsyncStrategy.Create() + .When(s => s.StartsWith("http")) + .Then(async s => await Task.FromResult("url")) + .Default(async s => await Task.FromResult("text")) + .Build(); +var kind = await asyncStrat.Execute("http://localhost"); +``` + +### BranchBuilder (first-match router) +```csharp +using PatternKit.Creational.Builder; + +// Define delegate shapes (predicates take `in` param for struct-friendliness) +public delegate bool IntPred(in int x); +public delegate string IntHandler(in int x); + +var router = BranchBuilder.Create() + .Add(static (in int v) => v < 0, static (in int v) => "neg") + .Add(static (in int v) => v > 0, static (in int v) => "pos") + .Default(static (in int _) => "zero") + .Build( + fallbackDefault: static (in int _) => "zero", + projector: static (preds, handlers, hasDefault, def) => + { + // Project into a single dispatch function + return (Func)(x => + { + for (int i = 0; i < preds.Length; i++) + if (preds[i](in x)) + return handlers[i](in x); + return def(in x); + }); + }); + +var a = router(-5); // "neg" +var b = router(10); // "pos" +var c = router(0); // "zero" +``` + +### ChainBuilder (collect -> project) +```csharp +using PatternKit.Creational.Builder; + +var log = new List(); +var pipeline = ChainBuilder>.Create() + .Add(static s => log.Add($"A:{s}")) + .AddIf(true, static s => log.Add($"B:{s}")) + .Add(static s => log.Add($"C:{s}")) + .Build(actions => (Action)(msg => + { + foreach (var a in actions) a(msg); + })); + +pipeline("run"); +// log => ["A:run", "B:run", "C:run"] +``` + +### Composer (functional state accumulation) +```csharp +using PatternKit.Creational.Builder; + +public readonly record struct PersonState(string? Name, int Age); +public sealed record Person(string Name, int Age); + +var person = Composer + .New(static () => default) + .With(static s => s with { Name = "Ada" }) + .With(static s => s with { Age = 30 }) + .Require(static s => string.IsNullOrWhiteSpace(s.Name) ? "Name required" : null) + .Build(static s => new Person(s.Name!, s.Age)); +``` + +### MutableBuilder (imperative mutations + validation) +```csharp +using PatternKit.Creational.Builder; + +public sealed class Options +{ + public string? Host { get; set; } + public int Port { get; set; } +} + +var opts = MutableBuilder.New(static () => new Options()) + .With(static o => o.Host = "localhost") + .With(static o => o.Port = 8080) + .RequireNotEmpty(static o => o.Host, nameof(Options.Host)) + .RequireRange(static o => o.Port, 1, 65535, nameof(Options.Port)) + .Build(); +``` + +### Prototype (clone + mutate) +```csharp +using PatternKit.Creational.Prototype; + +public sealed class User { public string Role { get; set; } = "user"; public bool Active { get; set; } = true; } + +// Single prototype +var proto = Prototype.Create( + source: new User { Role = "user", Active = true }, + cloner: static (in User u) => new User { Role = u.Role, Active = u.Active }) + .With(static u => u.Active = false) // default mutation applied to every clone + .Build(); + +var admin = proto.Create(u => u.Role = "admin"); // clone + extra mutation + +// Registry of prototypes +var registry = Prototype.Create() + .Map("basic", new User { Role = "user", Active = true }, static (in User u) => new User { Role = u.Role, Active = u.Active }) + .Map("admin", new User { Role = "admin", Active = true }, static (in User u) => new User { Role = u.Role, Active = u.Active }) + .Mutate("admin", static u => u.Active = true) // append mutation to admin family + .Default(new User { Role = "guest", Active = false }, static (in User u) => new User { Role = u.Role, Active = u.Active }) + .Build(); + +var guest = registry.Create("missing-key"); // falls back to default (guest) +``` + --- ## 📦 Patterns (Planned & In Progress) PatternKit will grow to cover **Creational**, **Structural**, and **Behavioral** patterns with fluent, discoverable APIs: -| Category | Patterns ✓ = implemented | -| -------------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Creational** | [Factory](docs/patterns/creational/factory/factory.md) ✓ • [Composer](docs/patterns/creational/builder/composer.md) ✓ • [ChainBuilder](docs/patterns/creational/builder/chainbuilder.md) ✓ • [BranchBuilder](docs/patterns/creational/builder/chainbuilder.md) ✓ • [MutableBuilder](docs/patterns/creational/builder/mutablebuilder.md) ✓ • [Prototype](docs/patterns/creational/prototype/prototype.md) ✓ • [Singleton](docs/patterns/creational/singleton/singleton.md) ✓ | -| **Structural** | [Adapter](docs/patterns/structural/adapter/fluent-adapter.md) ✓ • [Bridge](docs/patterns/structural/bridge/bridge.md) ✓ • [Composite](docs/patterns/structural/composite/composite.md) ✓ • Decorator (planned) • Facade (planned) • Flyweight (planned) • Proxy (planned) | -| **Behavioral** | [Strategy](docs/patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](docs/patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](docs/patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](docs/patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](docs/patterns/behavioral/chain/resultchain.md) ✓ • [Command](docs/patterns/behavioral/command/command.md) ✓ • Iterator (planned) • [Mediator](docs/behavioral/mediator/mediator.md) ✓ • Memento (planned) • Observer (planned) • State (planned) • Template Method (planned) • Visitor (planned) | +| Category | Patterns ✓ = implemented | +| -------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Creational** | [Factory](docs/patterns/creational/factory/factory.md) ✓ • [Composer](docs/patterns/creational/builder/composer.md) ✓ • [ChainBuilder](docs/patterns/creational/builder/chainbuilder.md) ✓ • [BranchBuilder](docs/patterns/creational/builder/chainbuilder.md) ✓ • [MutableBuilder](docs/patterns/creational/builder/mutablebuilder.md) ✓ • [Prototype](docs/patterns/creational/prototype/prototype.md) ✓ • [Singleton](docs/patterns/creational/singleton/singleton.md) ✓ | +| **Structural** | [Adapter](docs/patterns/structural/adapter/fluent-adapter.md) ✓ • [Bridge](docs/patterns/structural/bridge/bridge.md) ✓ • [Composite](docs/patterns/structural/composite/composite.md) ✓ • Decorator (planned) • Facade (planned) • Flyweight (planned) • Proxy (planned) | +| **Behavioral** | [Strategy](docs/patterns/behavioral/strategy/strategy.md) ✓ • [TryStrategy](docs/patterns/behavioral/strategy/trystrategy.md) ✓ • [ActionStrategy](docs/patterns/behavioral/strategy/actionstrategy.md) ✓ • [ActionChain](docs/patterns/behavioral/chain/actionchain.md) ✓ • [ResultChain](docs/patterns/behavioral/chain/resultchain.md) ✓ • [ReplayableSequence](docs/patterns/behavioral/iterator/replayablesequence.md) ✓ • [WindowSequence](docs/patterns/behavioral/iterator/windowsequence.md) ✓ • Command (planned) • Mediator (planned) • Memento (planned) • Observer (planned) • State (planned) • Template Method (planned) • Visitor (planned) | Each pattern will ship with: diff --git a/docs/patterns/behavioral/iterator/asyncflow.md b/docs/patterns/behavioral/iterator/asyncflow.md new file mode 100644 index 0000000..f04f0a0 --- /dev/null +++ b/docs/patterns/behavioral/iterator/asyncflow.md @@ -0,0 +1,158 @@ +# Behavioral.Iterator.AsyncFlow + +`AsyncFlow` is the asynchronous counterpart to [`Flow`](./flow.md): a **lazy, pull-based functional pipeline** over +`IAsyncEnumerable` with replay & branching via `Share()`. It lets you compose async transformations without manual +plumbing, and then optionally create multiple consumers (forks) that each read the same buffered stream exactly once. + +--- +## When to choose AsyncFlow +| Use | Instead of | Why | +|-----|------------|-----| +| Composing async pure transformations | Ad-hoc `await foreach` chains | Fluent, reusable, testable pipelines | +| Multiple independent async readers | Re-enumerating a cold async source | `Share()` buffers once (single upstream pass) | +| Partitioning an async stream | Building two separate filters (double work) | `Branch()` partitions during a single replay | + +If you need synchronous cursors / lookahead: use [`ReplayableSequence`](./replayablesequence.md). If you need sliding +windows: see [`WindowSequence`](./windowsequence.md). For synchronous functional chains: [`Flow`](./flow.md). + +--- +## TL;DR +```csharp +using PatternKit.Behavioral.Iterator; + +var numbers = AsyncFlow.From(ProduceAsync()); + +var processed = numbers + .Map(x => x * 3) + .Filter(x => x % 2 == 0) + .FlatMap(x => RepeatAsync(x, count: 2)) + .Tee(x => Console.WriteLine($"debug:{x}")); + +await foreach (var v in processed) + /* use v */; + +// Share for multi-consumer fan-out (single upstream pass) +var shared = processed.Share(); +var doubled = shared.Fork().Map(x => x * 2); // independent consumer +var evens = shared.Fork().Filter(x => x % 2 == 0); + +var sum = await doubled.FoldAsync(0, (a, v) => a + v); +var firstEven = await evens.FirstOptionAsync(); +``` + +--- +## Core API +```csharp +public sealed class AsyncFlow : IAsyncEnumerable +{ + AsyncFlow Map(Func f); + AsyncFlow Filter(Func pred); + AsyncFlow FlatMap(Func> f); + AsyncFlow Tee(Action sideEffect); + SharedAsyncFlow Share(); +} + +public sealed class SharedAsyncFlow +{ + AsyncFlow Fork(); + (AsyncFlow True, AsyncFlow False) Branch(Func predicate); + // (Map / Filter still available via Fork() chaining) +} + +public static class AsyncFlowExtensions +{ + ValueTask FoldAsync(this AsyncFlow flow, TAcc seed, Func folder, CancellationToken = default); + ValueTask> FirstOptionAsync(this AsyncFlow flow, CancellationToken = default); +} +``` + +### Differences from `Flow` +| Aspect | Flow | AsyncFlow | +|--------|------|-----------| +| Source | `IEnumerable` | `IAsyncEnumerable` | +| FlatMap | `Func>` | `Func>` | +| Replay Engine | `ReplayableSequence` | Custom `AsyncReplayBuffer` | +| Terminal ops | LINQ or Fold/FirstOption | `FoldAsync` / `FirstOptionAsync` | +| Thread-safety | Lock around buffer fill | Lock + waiter coordination (TCS) | + +--- +## Replay & Forking Semantics +Calling `Share()` wraps the upstream async iterator in an *async replay buffer*: +* First fork enumerates and buffers as needed. +* Subsequent forks read already-buffered items up to the furthest consumer position. +* The upstream `MoveNextAsync()` is called **at most once per element** across all forks. + +This enables fan-out patterns without re-triggering expensive I/O or side effects. + +--- +## Branching +```csharp +var shared = AsyncFlow.From(RangeAsync(1, 10)).Share(); +var (evenFlow, oddFlow) = shared.Branch(i => i % 2 == 0); +var evenSum = await evenFlow.FoldAsync(0, (a,v) => a + v); // 2+4+6+8+10 +var firstOdd = await oddFlow.FirstOptionAsync(); // Some(1) +``` +`Branch` performs a *single* pass: partition logic runs while elements are replayed, not by re-enumerating. + +--- +## Cancellation & Backpressure +AsyncFlow respects cancellation tokens in all internal loops (`WithCancellation`). There is *no built-in backpressure* +mechanism—if downstream is slower than upstream, items accumulate in the replay buffer until consumed. + +For high-volume streams consider customizing the replay buffer with a bounded policy or spill strategy. + +--- +## Error Propagation +If upstream throws: +* The replay buffer captures the exception. +* All current and future forks observe the same exception at the appropriate index. +* No partial / inconsistent state is exposed (buffer is append-only). + +--- +## Example: Time-Stamped Processing +```csharp +var processed = AsyncFlow.From(RangeAsync(1, 5)) + .Map(x => (Value: x, Stamp: DateTimeOffset.UtcNow)) + .Filter(x => x.Value != 3) + .FlatMap(tuple => EmitWithDelayAsync(tuple, TimeSpan.FromMilliseconds(25))) + .Share(); + +var (high, low) = processed.Branch(t => t.Value >= 4); +var highCount = await high.FoldAsync(0, (a, _) => a + 1); // 2 +``` + +--- +## Performance Notes +| Factor | Notes | +|--------|-------| +| Memory | Buffer grows with produced elements until completion (no eviction). | +| Locking | Single lock around mutation + waiter coordination; fine for moderate concurrency. | +| Allocation | Delegates + per awaited item; no per-item Task creation (yield state machine only). | +| Fairness | No scheduling policy; forks progress as they pull. | + +--- +## Limitations / Future Ideas +* No async-aware `MapAsync(Func>)` operator yet (can wrap in FlatMap). +* No built-in throttling or rate limiting; pair with external flow control. +* Potential enhancement: Bounded / windowed replay buffer. + +--- +## Gotchas +| Gotcha | Explanation | Mitigation | +|--------|-------------|------------| +| Large unbounded streams | Full retention in buffer | Introduce custom chunking or GC-friendly segmentation | +| Very slow consumer | Memory grows until consumer catches up | Split pipeline earlier or add batching | +| Side effects in Tee | Will not repeat for forks (they happen once upstream) | Place side-effects before `Share()` if you want single execution | + +--- +## Interop & Mixing +* Convert synchronous pipelines: `AsyncFlow.From(synchronousSequence.ToAsync())` (write a small extension). +* Feed into `Flow` by materializing (`await flow.ToListAsync()`) then `Flow.From(list)`. +* Combine with `ReplayableSequence.AsAsyncEnumerable()` for uniform async surface (when targeting modern TFMs). + +--- +## See Also +* [Flow](./flow.md) – synchronous functional pipeline. +* [ReplayableSequence](./replayablesequence.md) – multi-pass / lookahead. +* [WindowSequence](./windowsequence.md) – sliding & striding windows. + diff --git a/docs/patterns/behavioral/iterator/flow.md b/docs/patterns/behavioral/iterator/flow.md new file mode 100644 index 0000000..a117668 --- /dev/null +++ b/docs/patterns/behavioral/iterator/flow.md @@ -0,0 +1,124 @@ +# Behavioral.Iterator.Flow + +`Flow` is a fluent, **pull-based functional pipeline** that chains transformations (Map / Filter / FlatMap / Tee) +and can be upgraded into a replayable, **forkable** / **branchable** stream via `Share()`. It complements +`ReplayableSequence` (cursor-based random access) and `WindowSequence` (batch framing) by focusing on *functional +composition* and *selective multi-consumer fan‑out*. + +Built to feel like a tiny synchronous analogue of an RxJS `pipe`—but intentionally minimal: +no subjects, scheduling, or async—just pure, lazy, allocation-light enumeration. + +--- +## TL;DR +```csharp +using PatternKit.Behavioral.Iterator; + +var flow = Flow.From(Enumerable.Range(1, 8)) + .Map(x => x * 2) // 2,4,6,8,10,12,14,16 + .Filter(x => x % 4 == 0) // 4,8,12,16 + .FlatMap(x => new[]{x, x})// 4,4,8,8,12,12,16,16 + .Tee(x => Console.WriteLine($"debug:{x}")); + +var list = flow.ToList(); +``` + +Fork & branch without re-enumerating upstream: +```csharp +var shared = Flow.From(Enumerable.Range(1, 6)).Share(); +var odds = shared.Branch(x => x % 2 == 0).False; // 1,3,5 +var evens = shared.Fork().Filter(x => x % 2 == 0); // 2,4,6 +``` + +--- +## Core Types +```csharp +public sealed class Flow : IEnumerable +{ + Flow Map(Func f); + Flow Filter(Func pred); + Flow FlatMap(Func> f); + Flow Tee(Action sideEffect); + SharedFlow Share(); // enable replay / forks / branching +} + +public sealed class SharedFlow +{ + Flow Fork(); // new independent reader + Flow[] Fork(int count); // multiple readers + (Flow True, Flow False) Branch(Func predicate); // partition + Flow Map(Func f); // convenience (delegates to Fork) + Flow Filter(Func pred); + Flow AsFlow(); +} + +public static class FlowExtensions +{ + TAcc Fold(this Flow flow, TAcc seed, Func folder); + Option FirstOption(this Flow flow); +} +``` + +--- +## Design Notes +| Concern | Approach | +|---------|----------| +| Laziness | All operators defer until enumeration. | +| Sharing | `Share()` converts pipeline to a `ReplayableSequence` so forks replay without re-running upstream. | +| Fork cost | Fork = cheap cursor snapshot + fresh `Flow` wrapper. | +| Branching | `Branch(predicate)` enumerates once and yields two filtered views. | +| Safety | Single-thread (no internal locking). | +| Allocation | Mostly delegates; shared mode buffers elements once. | + +--- +## Example: Two Independent Projections +```csharp +var shared = Flow.From(new[]{"alpha","beta","gamma"}).Share(); +var lengths = shared.Fork().Map(s => s.Length).ToList(); // [5,4,5] +var upper = shared.Fork().Map(s => s.ToUpperInvariant()).ToList(); +``` + +## Example: Partition & Aggregate +```csharp +var sf = Flow.From(Enumerable.Range(1,10)).Share(); +var (evenFlow, oddFlow) = sf.Branch(i => i % 2 == 0); +var evenSum = evenFlow.Fold(0, (a,x) => a + x); // 2+4+...+10 = 30 +var oddMax = oddFlow.Fold(0, Math.Max); // 9 +``` + +## Example: FirstOption & Early Exit +```csharp +var maybe = Flow.From(Enumerable.Range(50, 5)) + .Filter(x => x > 52) + .FirstOption(); // Some(53) +``` + +--- +## When to Use Which Iterator? +| Need | Use | +|------|-----| +| Multi-pass cursors & lookahead | `ReplayableSequence` | +| Sliding / striding batch windows | `WindowSequence` | +| Functional chain + forks / branches | `Flow` | + +They are complementary; `Flow.Share()` internally *uses* `ReplayableSequence` to ensure upstream is enumerated only once. + +--- +## Gotchas +| Gotcha | Mitigation | +|--------|-----------| +| Forgetting `Share()` before forking | Without `Share()`, re-enumerating the same `Flow` re-runs upstream. Call `Share()` if side-effects/expensive sources need single pass. | +| Mutation inside `Tee` | Keep side-effects idempotent / safe for possible replays (during debugging). | +| Large retained shared flows | Buffered elements remain in memory until GC; dispose references if done early. | + +--- +## Future Ideas +* AsyncFlow (Task/ValueTask aware operators) +* Parallelizing `FlatMap` merges (controlled degree of concurrency) +* Error handling / recovery operators (TryMap, Recover) + +--- +## See Also +* [ReplayableSequence](./replayablesequence.md) +* [WindowSequence](./windowsequence.md) +* Strategy / Chain patterns for higher-level branching or rule evaluation. + diff --git a/docs/patterns/behavioral/iterator/replayablesequence.md b/docs/patterns/behavioral/iterator/replayablesequence.md new file mode 100644 index 0000000..7d8233b --- /dev/null +++ b/docs/patterns/behavioral/iterator/replayablesequence.md @@ -0,0 +1,186 @@ +# Behavioral.Iterator.ReplayableSequence + +`ReplayableSequence` is a fluent, allocation-light helper that lets you treat any forward `IEnumerable` as a +multi-pass, lookahead, forkable stream – **without** pre-materializing everything into an array or repeatedly +re-enumerating the original source. + +It augments the classic Iterator pattern by giving you *struct cursors* that: + +* Advance independently (fork at any position) +* Support `Peek()` and arbitrary positive `Lookahead(offset)` +* Can be turned into (lazy) `IEnumerable` sequences at any time +* Cooperatively fill a shared on-demand buffer (each underlying element is pulled at most once) +* Interop naturally with LINQ (and add a couple of extra fluent helpers like `Batch`) + +--- +## TL;DR + +```csharp +using PatternKit.Behavioral.Iterator; + +var seq = ReplayableSequence.From(Enumerable.Range(1, 5)); +var c1 = seq.GetCursor(); + +// Read two values +c1.TryNext(out var a, out c1); // a = 1 +c1.TryNext(out var b, out c1); // b = 2 + +// Fork (branch) without consuming more of c1 +var c2 = c1.Fork(); + +// c2 can scan ahead independently +var la = c2.Lookahead(0).OrDefault(); // 3 +var lb = c2.Lookahead(1).OrDefault(); // 4 + +// Enumerate remaining from c2 (3,4,5) with LINQ +var evens = c2.Where(x => x % 2 == 0).ToList(); // [4] + +// c1 is still parked at element 3 +c1.TryNext(out var third, out c1); // 3 +``` + +--- +## Why not just use `IEnumerable` directly? + +Typical options when you need speculative / multi-pass logic: + +| Need | Common Approach | Downsides | +|------|-----------------|-----------| +| Lookahead | `Queue` + manual buffering | Manual complexity, error-prone indices | +| Backtracking / fork | Re-enumerate source multiple times | Re-runs expensive producers / I/O | +| Multiple cursors | Materialize to `List` | Upfront cost + full allocation | + +`ReplayableSequence` gives *pay-as-you-go* buffering: only what you actually touch is stored. Perfect for: + +* Tokenizers / lightweight parsers +* Rule engines scanning the same prefix in different ways +* DSL interpreters +* Streaming transforms where limited rewind is handy +* Batch framing or chunked processing with optional lookahead + +--- +## Core API + +```csharp +public sealed class ReplayableSequence +{ + public static ReplayableSequence From(IEnumerable source); + public Cursor GetCursor(); + public IEnumerable AsEnumerable(); + + public readonly struct Cursor + { + int Position { get; } + Cursor Fork(); + bool TryNext(out T value, out Cursor next); // immutable advance + bool Peek(out T value); // no advance + Option Lookahead(int offset); // offset >= 0 + IEnumerable AsEnumerable(); // enumerate from current position (cursor itself not moved) + } +} + +public static class ReplayableSequenceExtensions +{ + IEnumerable Select(Cursor c, Func f); + IEnumerable Where(Cursor c, Func pred); + IEnumerable> Batch(Cursor c, int size); + IEnumerable AsEnumerable(ReplayableSequence seq); // convenience +} +``` + +### Design Notes +* Cursor is a readonly struct → copying / forking is cheap. +* `TryNext` returns a *new* advanced cursor (functional style) to avoid hidden mutation. +* All cursors share a single underlying buffer – thread confinement is assumed (not thread-safe). +* `Lookahead(n)` ensures the buffer contains index `Position + n` (if possible) and returns an `Option`. +* After the source is fully drained the buffer becomes a random-access immutable snapshot for all cursors. + +--- +## Examples + +### 1. Token-style lookahead +```csharp +var letters = ReplayableSequence.From("abcde".ToCharArray()); +var cur = letters.GetCursor(); + +// Need 2-char decision? +if (cur.Lookahead(0).OrDefault() == 'a' && cur.Lookahead(1).OrDefault() == 'b') +{ + cur.TryNext(out _, out cur); // consume 'a' + cur.TryNext(out _, out cur); // consume 'b' + // ... parse AB token +} +``` + +### 2. Fork speculative parse branch +```csharp +var seq = ReplayableSequence.From(new[]{1,2,3,9,9}); +var p = seq.GetCursor(); +var attempt = p.Fork(); + +// Try read three numbers summing to 6 +int sum = 0; int read = 0; +while (read < 3 && attempt.TryNext(out var v, out attempt)) { sum += v; read++; } + +if (sum == 6) // success → commit (just adopt attempt cursor) + p = attempt; // original p now advanced logically +// else: discard attempt (p unchanged) +``` + +### 3. Batch processing (streaming window framing) +```csharp +var seq = ReplayableSequence.From(Enumerable.Range(1, 10)); +var c = seq.GetCursor(); +foreach (var batch in c.Batch(4)) +{ + Console.WriteLine(string.Join(',', batch)); +} +// 1,2,3,4 +// 5,6,7,8 +// 9,10 +``` + +### 4. Mixed LINQ + cursor ops +```csharp +var seq = ReplayableSequence.From(Enumerable.Range(1, 8)); +var c = seq.GetCursor(); + +// Peek without moving +c.Peek(out var first); // 1 + +// Use Where on a cursor (does not move the original beyond enumeration copy) +var odds = c.Where(x => x % 2 == 1).Take(3).ToList(); // [1,3,5] + +// c still at position 0 (functional enumeration) +``` + +--- +## Testing Invariants +| Invariant | Meaning | +|-----------|---------| +| Single production | Underlying source element is produced (MoveNext true) at most once. | +| Idempotent forks | Forking does not mutate either cursor. | +| Safe lookahead | `Lookahead(k)` never advances state. | +| Lazy buffering | No elements buffered until requested. | + +--- +## Gotchas & Tips +* Negative offsets → `ArgumentOutOfRangeException` (failing fast clarifies bugs). +* Avoid very large unbounded lookahead if your source is huge (each requested index must be buffered). +* Not thread-safe: confine a sequence + its cursors to one logical consumer (or add external synchronization). +* `Batch` yields arrays per chunk (copy for immutability); if you need pooled buffers, add a specialized variant. + +--- +## Comparison +| Approach | Pros | Cons | +|----------|------|------| +| Plain re-enumeration | Simple | Re-runs side effects / I/O, duplicate work | +| Materialize `List` | Random access | Upfront full allocation | +| ReplayableSequence | On-demand, multi-cursor, lookahead | Buffer growth unbounded if you read far | + +--- +## See also +* Standard Iterator pattern (this is an *enriched* variant) +* Chain / Strategy patterns when composing behaviors over streamed elements +* `Option` (used for `Lookahead`) for fluent presence/absence handling + diff --git a/docs/patterns/behavioral/iterator/windowsequence.md b/docs/patterns/behavioral/iterator/windowsequence.md new file mode 100644 index 0000000..536f2a2 --- /dev/null +++ b/docs/patterns/behavioral/iterator/windowsequence.md @@ -0,0 +1,124 @@ +# Behavioral.Iterator.WindowSequence + +`WindowSequence.Windows(...)` is a fluent, allocation-light helper that produces **sliding or striding windows** over any `IEnumerable`. +It demonstrates how you can extend the classic Iterator pattern with richer semantics (window size, stride, partial trailing +windows, reusable buffers) while still exposing a standard, lazy, LINQ-friendly API. + +--- +## TL;DR +```csharp +using PatternKit.Behavioral.Iterator; + +var windows = Enumerable.Range(1, 7) + .Windows(size: 3, stride: 1) // slide 1 each time + .Select(w => string.Join(',', w.ToArray())); +// -> ["1,2,3", "2,3,4", "3,4,5", "4,5,6", "5,6,7"] +``` + +Stride 2 (skip elements between starts): +```csharp +var stepped = Enumerable.Range(1, 9) + .Windows(size: 4, stride: 2) + .Select(w => string.Join('-', w.ToArray())); +// -> ["1-2-3-4", "3-4-5-6", "5-6-7-8"] +``` + +Include trailing partial window: +```csharp +var partials = new[]{1,2,3,4,5} + .Windows(size: 3, stride: 3, includePartial: true) + .Select(w => $"[{string.Join(',', w.ToArray())}] (partial={w.IsPartial})"); +// -> ["[1,2,3] (partial=False)", "[4,5] (partial=True)"] +``` + +Reuse a buffer (zero alloc per full window, but you MUST copy if you retain data): +```csharp +var reused = Enumerable.Range(1, 6) + .Windows(size: 3, reuseBuffer: true) + .Select(w => w.ToArray()) // force snapshot copy each time + .ToList(); +// windows: [1,2,3], [2,3,4], [3,4,5], [4,5,6] +``` + +--- +## API Shape +```csharp +public static class WindowSequence +{ + public static IEnumerable> Windows( + this IEnumerable source, + int size, + int stride = 1, + bool includePartial = false, + bool reuseBuffer = false); + + public readonly struct Window + { + public int Count { get; } + public bool IsPartial { get; } + public bool IsBufferReused { get; } + public T this[int index] { get; } + public T[] ToArray(); // always copies + public IEnumerator GetEnumerator(); + } +} +``` + +### Parameters +* `size` – required window length (> 0). +* `stride` – elements to advance between successive window starts (default 1). +* `includePartial` – include trailing window with `Count < size`. +* `reuseBuffer` – reuse a single backing array for *full* windows (partial windows still copy). Call `ToArray()` to snapshot. + +--- +## Semantics & Guarantees +| Aspect | Behavior | +|--------|----------| +| Enumeration | Single pass over source; deferred execution. | +| Overlap | Controlled by `stride` (1 = full overlap sliding). | +| Partial | Disabled by default; enable via `includePartial`. | +| Reuse | When `reuseBuffer=true`, full windows share the same array (treat returned `Window` as ephemeral unless copied). | +| Safety | `ToArray()` always returns an independent copy. | + +--- +## Use Cases +* Batch / micro-batch analytics (moving averages, rolling sums) +* Stream framing (fixed length records with overlap) +* Feature extraction windows (ML preprocessing) +* Temporal rule evaluation (N previous events) + +--- +## Example: Rolling Average +```csharp +double RollingAverage(IEnumerable src, int windowSize) + => src.Windows(windowSize) + .Select(w => w.ToArray().Average()) + .LastOrDefault(); +``` + +## Example: Find First Increasing Triple +```csharp +var triple = nums.Windows(size:3) + .Select(w => w.ToArray()) + .FirstOrDefault(arr => arr[0] < arr[1] && arr[1] < arr[2]); +``` + +--- +## Performance Notes +* Uses a `Queue` internally for clarity – O(size) per full window snapshot when copying. +* For extremely hot paths, a ring buffer variant would reduce copy cost; this implementation prioritizes readability. +* Set `reuseBuffer:true` to avoid per-window allocations (copy yourself if persistent storage is required). + +--- +## Gotchas +| Gotcha | Explanation | +|--------|-------------| +| Buffer reuse surprises | Mutating or retaining reused buffer contents without copying will show later window values. Always call `ToArray()` if you persist. | +| Large stride + partial disabled | You may silently drop tail elements—enable `includePartial` if you need them. | +| size or stride <= 0 | Immediate `ArgumentOutOfRangeException`. | + +--- +## See Also +* [ReplayableSequence](./replayablesequence.md) – multi-cursor, lookahead iteration. +* LINQ standard operators for reference semantics. + diff --git a/docs/patterns/index.md b/docs/patterns/index.md index 7a87099..995ab22 100644 --- a/docs/patterns/index.md +++ b/docs/patterns/index.md @@ -42,6 +42,13 @@ If you’re looking for end-to-end, production-shaped demos, check the **Example * **[Behavioral.Strategy.AsyncStrategy](behavioral/strategy/asyncstrategy.md)** Async sibling for strategies that await external work. +### Iterator + +* **[Behavioral.Iterator.ReplayableSequence](behavioral/iterator/replayablesequence.md)** + Forkable, lookahead, on-demand buffered sequence with immutable struct cursors, speculative forks, and LINQ interop (pay-as-you-go buffering). +* **[Behavioral.Iterator.WindowSequence](behavioral/iterator/windowsequence.md)** + Sliding / striding window iterator with optional partial trailing window and buffer reuse for zero-allocation full windows. + --- ## Creational (Builder) diff --git a/docs/patterns/toc.yml b/docs/patterns/toc.yml index f66c888..76f05b1 100644 --- a/docs/patterns/toc.yml +++ b/docs/patterns/toc.yml @@ -19,6 +19,16 @@ href: behavioral/strategy/actionstrategy.md - name: AsyncStrategy href: behavioral/strategy/asyncstrategy.md + - name: Iterator + items: + - name: ReplayableSequence + href: behavioral/iterator/replayablesequence.md + - name: WindowSequence + href: behavioral/iterator/windowsequence.md + - name: Flow + href: behavioral/iterator/flow.md + - name: AsyncFlow + href: behavioral/iterator/asyncflow.md - name: Mediator href: behavioral/mediator/mediator.md - name: Command diff --git a/src/PatternKit.Core/Behavioral/Iterator/AsyncFlow.cs b/src/PatternKit.Core/Behavioral/Iterator/AsyncFlow.cs new file mode 100644 index 0000000..99a5c5a --- /dev/null +++ b/src/PatternKit.Core/Behavioral/Iterator/AsyncFlow.cs @@ -0,0 +1,252 @@ +#if !NETSTANDARD2_0 +using System.Runtime.CompilerServices; +using PatternKit.Common; + +namespace PatternKit.Behavioral.Iterator; + +/// +/// Async counterpart to built on . +/// Provides Map / Filter / FlatMap / Tee and a replayable that allows +/// multiple forks to enumerate without re-running upstream side-effects. +/// +public sealed class AsyncFlow : IAsyncEnumerable +{ + private readonly Func> _factory; + private AsyncFlow(Func> factory) => _factory = factory; + + public static AsyncFlow From(IAsyncEnumerable source) + { + if (source is null) throw new ArgumentNullException(nameof(source)); + return new AsyncFlow(() => source); + } + + public AsyncFlow Map(Func selector) + { + if (selector is null) throw new ArgumentNullException(nameof(selector)); + return new AsyncFlow(() => Core()); + + async IAsyncEnumerable Core([EnumeratorCancellation] CancellationToken ct = default) + { + await foreach (var v in _factory().WithCancellation(ct).ConfigureAwait(false)) + yield return selector(v); + } + } + + public AsyncFlow Filter(Func predicate) + { + if (predicate is null) throw new ArgumentNullException(nameof(predicate)); + return new AsyncFlow(() => Core()); + + async IAsyncEnumerable Core([EnumeratorCancellation] CancellationToken ct = default) + { + await foreach (var v in _factory().WithCancellation(ct).ConfigureAwait(false)) + if (predicate(v)) yield return v; + } + } + + public AsyncFlow FlatMap(Func> selector) + { + if (selector is null) throw new ArgumentNullException(nameof(selector)); + return new AsyncFlow(() => Core()); + + async IAsyncEnumerable Core([EnumeratorCancellation] CancellationToken ct = default) + { + await foreach (var v in _factory().WithCancellation(ct).ConfigureAwait(false)) + { + await foreach (var inner in selector(v).WithCancellation(ct).ConfigureAwait(false)) + yield return inner; + } + } + } + + public AsyncFlow Tee(Action effect) + { + if (effect is null) throw new ArgumentNullException(nameof(effect)); + return new AsyncFlow(() => Core()); + + async IAsyncEnumerable Core([EnumeratorCancellation] CancellationToken ct = default) + { + await foreach (var v in _factory().WithCancellation(ct).ConfigureAwait(false)) + { + effect(v); + yield return v; + } + } + } + + public SharedAsyncFlow Share() => new(AsyncReplayBuffer.Create(_factory())); + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => _factory().GetAsyncEnumerator(cancellationToken); +} + +/// +/// Thread-safe async replay buffer that enumerates the upstream exactly once and serves buffered elements +/// to any number of concurrent consumers. +/// +internal sealed class AsyncReplayBuffer +{ + private readonly IAsyncEnumerator _source; + private readonly List _buffer = new(64); + private bool _completed; + private Exception? _error; + private readonly object _sync = new(); + private readonly List> _waiters = new(); + + private AsyncReplayBuffer(IAsyncEnumerator source) => _source = source; + + public static AsyncReplayBuffer Create(IAsyncEnumerable source) + => new(source.GetAsyncEnumerator()); + + public async ValueTask TryGetAsync(int index, CancellationToken ct) + { + if (index < 0) return false; + while (true) + { + TaskCompletionSource? waiter = null; + lock (_sync) + { + if (index < _buffer.Count) + return true; + if (_completed) + return false; + waiter = new(TaskCreationOptions.RunContinuationsAsynchronously); + _waiters.Add(waiter); + } + + bool shouldProduce = false; + lock (_sync) + { + if (_waiters.Count == 1 && !_completed) + shouldProduce = true; + } + + if (shouldProduce) + { + try + { + if (await _source.MoveNextAsync().ConfigureAwait(false)) + { + lock (_sync) + { + _buffer.Add(_source.Current); + var ws = _waiters.ToArray(); + _waiters.Clear(); + foreach (var w in ws) w.TrySetResult(true); + } + continue; + } + + await DisposeSourceAsync().ConfigureAwait(false); + lock (_sync) + { + _completed = true; + var ws = _waiters.ToArray(); + _waiters.Clear(); + foreach (var w in ws) w.TrySetResult(false); + } + } + catch (Exception ex) + { + await DisposeSourceAsync().ConfigureAwait(false); + lock (_sync) + { + _completed = true; + _error = ex; + var ws = _waiters.ToArray(); + _waiters.Clear(); + foreach (var w in ws) w.TrySetException(ex); + } + } + } + + using var reg = ct.Register(static s => ((TaskCompletionSource)s!).TrySetCanceled(), waiter); + var signaled = await waiter.Task.ConfigureAwait(false); + if (!signaled) + return false; + } + } + + private async ValueTask DisposeSourceAsync() + { + try + { + await _source.DisposeAsync().ConfigureAwait(false); + } + catch + { + // swallow dispose errors + } + } + + public T Get(int index) + { + lock (_sync) + { + if (index < _buffer.Count) return _buffer[index]; + if (_error is not null) throw _error; + throw new InvalidOperationException("Element not buffered yet."); + } + } +} + +public sealed class SharedAsyncFlow +{ + private readonly AsyncReplayBuffer _buffer; + internal SharedAsyncFlow(AsyncReplayBuffer buffer) => _buffer = buffer; + + public AsyncFlow Fork() + => AsyncFlow.From(Enumerate()); + + public (AsyncFlow True, AsyncFlow False) Branch(Func predicate) + { + if (predicate is null) throw new ArgumentNullException(nameof(predicate)); + return ( + AsyncFlow.From(Filter(true)), + AsyncFlow.From(Filter(false)) + ); + + async IAsyncEnumerable Filter(bool polarity, [EnumeratorCancellation] CancellationToken ct = default) + { + int i = 0; + while (await _buffer.TryGetAsync(i, ct).ConfigureAwait(false)) + { + var v = _buffer.Get(i); + if (predicate(v) == polarity) yield return v; + i++; + } + } + } + + private async IAsyncEnumerable Enumerate([EnumeratorCancellation] CancellationToken ct = default) + { + int i = 0; + while (await _buffer.TryGetAsync(i, ct).ConfigureAwait(false)) + { + yield return _buffer.Get(i); + i++; + } + } +} + +public static class AsyncFlowExtensions +{ + public static async ValueTask FoldAsync(this AsyncFlow flow, TAcc seed, Func folder, CancellationToken ct = default) + { + if (flow is null) throw new ArgumentNullException(nameof(flow)); + if (folder is null) throw new ArgumentNullException(nameof(folder)); + var acc = seed; + await foreach (var v in flow.WithCancellation(ct).ConfigureAwait(false)) + acc = folder(acc, v); + return acc; + } + + public static async ValueTask> FirstOptionAsync(this AsyncFlow flow, CancellationToken ct = default) + { + if (flow is null) throw new ArgumentNullException(nameof(flow)); + await foreach (var v in flow.WithCancellation(ct).ConfigureAwait(false)) + return Option.Some(v); + return Option.None(); + } +} +#endif diff --git a/src/PatternKit.Core/Behavioral/Iterator/Flow.cs b/src/PatternKit.Core/Behavioral/Iterator/Flow.cs new file mode 100644 index 0000000..6aef38b --- /dev/null +++ b/src/PatternKit.Core/Behavioral/Iterator/Flow.cs @@ -0,0 +1,176 @@ +using PatternKit.Common; + +namespace PatternKit.Behavioral.Iterator; + +/// +/// A fluent, functional pipeline ("flow") over an supporting +/// transformation (), filtering (), flattening (), +/// side-effects (), sharing + forking (, ), and logical branching (). +/// +/// +/// +/// composes *lazy* LINQ-style transformations without immediately enumerating the +/// underlying source. Calling turns the pipeline into a , +/// which materializes elements through a only once and enables safe +/// multi-consumer forking + partitioning without re-enumerating upstream. +/// +/// +/// The goal is to demonstrate a custom iterator abstraction similar in spirit to an Rx pipe, but +/// with synchronous, pull-based semantics and explicit replay / branching control. +/// +/// Thread-safety: Flows and shared flows are not thread-safe; confine to a single logical thread. +/// +public sealed class Flow : IEnumerable +{ + private readonly Func> _factory; + + private Flow(Func> factory) => _factory = factory; + + /// Create a flow from an existing source (no defensive copy; deferred). + public static Flow From(IEnumerable source) + { + if (source is null) throw new ArgumentNullException(nameof(source)); + return new Flow(() => source); + } + + /// Map each element via . + public Flow Map(Func selector) + { + if (selector is null) throw new ArgumentNullException(nameof(selector)); + return new Flow(() => _factory().Select(selector)); + } + + /// Filter elements via . + public Flow Filter(Func predicate) + { + if (predicate is null) throw new ArgumentNullException(nameof(predicate)); + return new Flow(() => _factory().Where(predicate)); + } + + /// Flat-map each element to a (possibly empty) inner sequence. + public Flow FlatMap(Func> selector) + { + if (selector is null) throw new ArgumentNullException(nameof(selector)); + return new Flow(() => _factory().SelectMany(selector)); + } + + /// Run a side-effect for each element while preserving the element. + public Flow Tee(Action effect) + { + if (effect is null) throw new ArgumentNullException(nameof(effect)); + return new Flow(Iterate); + + IEnumerable Iterate() + { + foreach (var v in _factory()) + { + effect(v); + yield return v; + } + } + } + + /// Turn this flow into a shared replayable stream for safe forking / branching. + public SharedFlow Share() => new(ReplayableSequence.From(_factory())); + + /// + public IEnumerator GetEnumerator() => _factory().GetEnumerator(); + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); +} + +/// +/// A shared, replayable flow (backed by ) enabling multi-consumer forking, +/// branching (partition), and additional functional transformation while guaranteeing each upstream element is +/// materialized at most once. +/// +public sealed class SharedFlow +{ + private readonly ReplayableSequence _sequence; + + internal SharedFlow(ReplayableSequence sequence) => _sequence = sequence; + + /// Create a new fork (cursor snapshot) that behaves as an independent . + public Flow Fork() + => Flow.From(_sequence.GetCursor().AsEnumerable()); + + /// Create forks at the current root. + public Flow[] Fork(int count) + { + if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count)); + var arr = new Flow[count]; + for (int i = 0; i < count; i++) arr[i] = Fork(); + return arr; + } + + /// + /// Partition the stream into two flows based on without re-enumerating upstream. + /// + public (Flow True, Flow False) Branch(Func predicate) + { + if (predicate is null) throw new ArgumentNullException(nameof(predicate)); + return ( + Flow.From(FilterIter(true)), + Flow.From(FilterIter(false)) + ); + + IEnumerable FilterIter(bool polarity) + { + var cursor = _sequence.GetCursor(); + foreach (var item in cursor.AsEnumerable()) + { + var pass = predicate(item); + if (pass == polarity) yield return item; + } + } + } + + /// Map values in a shared fashion (still uses replay cursor per enumeration). + public Flow Map(Func selector) + => Fork().Map(selector); + + /// Filter values in a shared fashion. + public Flow Filter(Func predicate) + => Fork().Filter(predicate); + + /// Expose the underlying enumeration (replayable) as a regular flow. + public Flow AsFlow() => Fork(); +} + +/// +/// Helper extensions for flow composition sugar / interop. +/// +public static class FlowExtensions +{ + /// + /// Terminal reduce (aggregate) that folds the sequence into a single value. + /// + public static TAcc Fold(this Flow flow, TAcc seed, Func folder) + { + if (flow is null) throw new ArgumentNullException(nameof(flow)); + if (folder is null) throw new ArgumentNullException(nameof(folder)); + var acc = seed; + foreach (var v in flow) acc = folder(acc, v); + return acc; + } + + /// Fold for a shared flow (fork first so the enumeration does not interfere with other consumers). + public static TAcc Fold(this SharedFlow flow, TAcc seed, Func folder) + => flow.AsFlow().Fold(seed, folder); + + /// Return the first value or default. + public static T? FirstOrDefault(this Flow flow, Func? predicate = null) + { + if (flow is null) throw new ArgumentNullException(nameof(flow)); + predicate ??= static _ => true; + foreach (var v in flow) if (predicate(v)) return v; + return default; + } + + /// Convert flow to (first element). + public static Option FirstOption(this Flow flow) + { + if (flow is null) throw new ArgumentNullException(nameof(flow)); + var e = flow.GetEnumerator(); + return e.MoveNext() ? Option.Some(e.Current) : Option.None(); + } +} diff --git a/src/PatternKit.Core/Behavioral/Iterator/ReplayableSequence.cs b/src/PatternKit.Core/Behavioral/Iterator/ReplayableSequence.cs new file mode 100644 index 0000000..5ae94a5 --- /dev/null +++ b/src/PatternKit.Core/Behavioral/Iterator/ReplayableSequence.cs @@ -0,0 +1,210 @@ +using PatternKit.Common; + +namespace PatternKit.Behavioral.Iterator; + +/// +/// A replayable, forkable, lookahead-capable sequence abstraction that augments when +/// you need multi-pass / speculative traversal without re-enumerating or re-materializing the original source. +/// +/// Element type. +/// +/// +/// Why? Standard is single forward pass per enumerator. To implement lookahead, +/// backtracking, or parallel cursors you typically buffer, or you fully materialize into an array/list and index it. +/// sits in between: it buffers on demand as cursors ask for items. +/// Every element is pulled from the underlying source at most once, appended to an internal buffer, then served from +/// that buffer to any number of forks / cursors. +/// +/// +/// Core ideas: +/// +/// +/// Cursor: lightweight value struct holding an index into a shared buffer. +/// On-demand buffering: elements materialize only when first requested (via TryNext, Peek or Lookahead). +/// Fork: create an independent cursor snapshot at the current position. +/// Lookahead: inspect future elements without advancing (parser / tokenizer friendly). +/// LINQ interop: any cursor can be projected to an without changing its own position. +/// +/// Thread-safety: Concurrent readers (cursors / enumeration) are supported; the first-touch buffering +/// operation is synchronized so elements are pulled from the underlying enumerator at most once. Writes only occur +/// during initial enumeration into the internal buffer; after exhaustion the buffer is immutable. +/// +public sealed class ReplayableSequence +{ + private readonly List _buffer = new(); + private IEnumerator? _source; // null when exhausted / disposed + private readonly object _sync = new(); + + private ReplayableSequence(IEnumerable source) => _source = source.GetEnumerator(); + + /// Create a replayable wrapper over . + public static ReplayableSequence From(IEnumerable source) => new(source); + + /// Obtain a fresh cursor at the start (position 0). + public Cursor GetCursor() => new(this, 0); + + /// Enumerates the entire (shared) sequence as an . + /// + /// Each enumeration uses a new starting cursor. Already buffered elements are reused; remaining ones stream in. + /// + public IEnumerable AsEnumerable() + { + var c = GetCursor(); + while (c.TryNext(out var v, out var next)) + { + yield return v; + c = next; + } + } + + /// Exposes the sequence as an (synchronous push under the hood). +#if !NETSTANDARD2_0 + public async IAsyncEnumerable AsAsyncEnumerable([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default) + { + var c = GetCursor(); + while (c.TryNext(out var v, out var next)) + { + ct.ThrowIfCancellationRequested(); + yield return v; + c = next; + await Task.Yield(); // minimal fairness for async pipelines + } + } +#endif + internal bool EnsureBuffered(int index) + { + if (index < _buffer.Count) return true; // already present + if (_source is null) return false; // already exhausted + lock (_sync) + { + if (index < _buffer.Count) return true; // check again under lock + if (_source is null) return false; // check again under lock + + while (index >= _buffer.Count) + { + if (_source.MoveNext()) + { + _buffer.Add(_source.Current); + } + else + { + _source.Dispose(); + _source = null; + return false; + } + } + } + + return true; + } + + /// + /// A lightweight position inside a . Value-type; copying is cheap. + /// + /// + /// Enumerating via a cursor is non-destructive to other cursors; all share the underlying buffer. + /// + public readonly struct Cursor + { + private readonly ReplayableSequence _owner; + private readonly int _index; // current position (next read position) + + internal Cursor(ReplayableSequence owner, int index) => (_owner, _index) = (owner, index); + + /// Current (next) index (0-based) inside the sequence. + public int Position => _index; + + /// Create an independent cursor at the same position. + public Cursor Fork() => new(_owner, _index); + + /// Try to read the next element. Returns an advanced cursor when successful. + /// The next value when present. + /// The advanced cursor (only valid when result is true). + /// when an element was available; otherwise . + public bool TryNext(out T value, out Cursor next) + { + if (_owner.EnsureBuffered(_index)) + { + value = _owner._buffer[_index]; + next = new Cursor(_owner, _index + 1); + return true; + } + + value = default!; + next = this; + return false; + } + + /// Peek the next element without advancing. + public bool Peek(out T value) + { + if (_owner.EnsureBuffered(_index)) + { + value = _owner._buffer[_index]; + return true; + } + + value = default!; + return false; + } + + /// Look ahead (offset >= 0) without advancing. Returns an . + public Option Lookahead(int offset) + { + Throw.IfNegative(offset); + var idx = _index + offset; + return _owner.EnsureBuffered(idx) ? Option.Some(_owner._buffer[idx]) : Option.None(); + } + + /// + /// Enumerate from this position forward as an . The original cursor + /// is not advanced (a copy is enumerated). + /// + public IEnumerable AsEnumerable() + { + var c = this; // snapshot + while (c.TryNext(out var v, out var adv)) + { + yield return v; + c = adv; + } + } + } +} + +/// +/// LINQ-like and utility extensions over . +/// +public static class ReplayableSequenceExtensions +{ + /// Project elements from a cursor using . + public static IEnumerable Select( + this ReplayableSequence.Cursor cursor, + Func selector) + => cursor.AsEnumerable().Select(selector); + + /// Filter elements from a cursor using . + public static IEnumerable Where( + this ReplayableSequence.Cursor cursor, + Func predicate) + => cursor.AsEnumerable().Where(predicate); + + /// Batch elements into fixed-size chunks (last batch may be smaller). + public static IEnumerable> Batch(this ReplayableSequence.Cursor cursor, int size) + { + Throw.IfNegativeOrZero(size); + var batch = new List(size); + foreach (var item in cursor.AsEnumerable()) + { + batch.Add(item); + if (batch.Count != size) + continue; + + yield return batch.ToArray(); // immutable snapshot + batch.Clear(); + } + + if (batch.Count > 0) + yield return batch.ToArray(); + } +} \ No newline at end of file diff --git a/src/PatternKit.Core/Behavioral/Iterator/WindowSequence.cs b/src/PatternKit.Core/Behavioral/Iterator/WindowSequence.cs new file mode 100644 index 0000000..189fe8c --- /dev/null +++ b/src/PatternKit.Core/Behavioral/Iterator/WindowSequence.cs @@ -0,0 +1,131 @@ +namespace PatternKit.Behavioral.Iterator; + +/// +/// Sliding / striding window iterator over an that yields immutable (or buffer-reused) window views. +/// Demonstrates creating a custom enumerator with additional semantics while still presenting standard IEnumerable API. +/// +/// +/// Use to produce fixed-size windows with an optional stride and partial inclusion. +/// Design goals: clarity and extensibility over micro-optimizations; showcases custom enumerator + reusable buffer option. +/// +public static class WindowSequence +{ + /// + /// Produces a sequence of sliding (or striding) windows from . + /// + /// Element type. + /// Underlying source sequence (enumerated exactly once). + /// Window size (> 0). + /// Elements to advance between windows (default 1; must be > 0). + /// When true, a trailing window smaller than is yielded. + /// When true, the same underlying array is reused for each full window (call if you need a snapshot). + /// An of values. + /// Thrown when or are not positive. + /// + /// Reuse buffer trade-off: Setting reduces allocations for hot paths; consumers MUST copy data they intend to keep. + /// The implementation is intentionally allocation-light (queue nodes only) and simple; a ring buffer would be faster but more complex. + /// + public static IEnumerable> Windows( + this IEnumerable source, + int size, + int stride = 1, + bool includePartial = false, + bool reuseBuffer = false) + { + if (source is null) throw new ArgumentNullException(nameof(source)); + if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size)); + if (stride <= 0) throw new ArgumentOutOfRangeException(nameof(stride)); + + return Enumerate(); // deferred + + IEnumerable> Enumerate() + { + using var e = source.GetEnumerator(); + var queue = new Queue(size); + var buffer = reuseBuffer ? new T[size] : null; + + // Prime initial window + while (queue.Count < size && e.MoveNext()) + queue.Enqueue(e.Current); + + if (queue.Count == size) + { + while (true) + { + yield return MakeWindow(queue, size, partial: false); + + // advance by stride (drop elements or empty queue) + for (var i = 0; i < stride && queue.Count > 0; i++) + queue.Dequeue(); + + // refill + while (queue.Count < size && e.MoveNext()) + queue.Enqueue(e.Current); + + if (queue.Count < size) break; // done with full windows + } + } + + if (includePartial && queue.Count > 0) + yield return MakeWindow(queue, queue.Count, partial: true); + + yield break; + + // local factory + Window MakeWindow(Queue q, int count, bool partial) + { + if (reuseBuffer && buffer is not null && !partial && count == size) + { + q.CopyTo(buffer, 0); + return new Window(buffer, count, partial, reusable: true); + } + var arr = new T[count]; + q.CopyTo(arr, 0); + return new Window(arr, count, partial, reusable: false); + } + } + } + + /// + /// Represents a fixed-size (or trailing partial) window of elements. + /// + /// Element type. + public readonly struct Window + { + private readonly T[] _buffer; + /// Number of meaningful elements in the window. + public int Count { get; } + /// True when this is a trailing partial (smaller than requested size). + public bool IsPartial { get; } + /// True when the underlying buffer is reused across windows (caller must copy). + public bool IsBufferReused { get; } + + internal Window(T[] buffer, int count, bool partial, bool reusable) + { + _buffer = buffer; + Count = count; + IsPartial = partial; + IsBufferReused = reusable; + } + + /// Indexed element access (0-based). + public T this[int index] + => (uint)index < (uint)Count ? _buffer[index] : throw new ArgumentOutOfRangeException(nameof(index)); + + /// + /// Materializes the window as a fresh array (always copies to guarantee immutability even when buffer reused). + /// + public T[] ToArray() + { + var copy = new T[Count]; + Array.Copy(_buffer, 0, copy, 0, Count); + return copy; + } + + /// Enumerates the elements in this window (snapshot view—may reflect later changes if buffer is reused). + public IEnumerator GetEnumerator() + { + for (var i = 0; i < Count; i++) yield return _buffer[i]; + } + } +} diff --git a/src/PatternKit.Core/Common/Throw.cs b/src/PatternKit.Core/Common/Throw.cs index 833f32d..46f7de6 100644 --- a/src/PatternKit.Core/Common/Throw.cs +++ b/src/PatternKit.Core/Common/Throw.cs @@ -75,6 +75,33 @@ public static T NoStrategyMatched() => [DoesNotReturn] public static void NoStrategyMatched() => throw new InvalidOperationException("No strategy matched and no default provided."); + + + /// + /// Throws an if is negative. + /// + /// The value to check. + /// The name of the parameter being checked (automatically provided). + /// Thrown if is negative. + public static void IfNegative(int value, [CallerMemberName] string? name = null) + { + if (value < 0) + throw new ArgumentOutOfRangeException(name, value, "Value must be non-negative."); + } + + + /// + /// Throws an if is zero or negative. + /// + /// The value to check. + /// The name of the parameter being checked (automatically provided). + /// Thrown if is zero or negative. + public static void IfNegativeOrZero(int value, [CallerMemberName] string? name = null) + { + if (value <= 0) + throw new ArgumentOutOfRangeException(name, value, "Value must be positive."); + } + public static void ArgumentNullWhenNull([CallerMemberName] object? arg = null) { diff --git a/src/PatternKit.Core/Structural/Composite/Composite.cs b/src/PatternKit.Core/Structural/Composite/Composite.cs index b005422..6bdf999 100644 --- a/src/PatternKit.Core/Structural/Composite/Composite.cs +++ b/src/PatternKit.Core/Structural/Composite/Composite.cs @@ -20,7 +20,7 @@ namespace PatternKit.Structural.Composite; public sealed class Composite { /// - /// Delegate for a leaf operation that produces a from an input. + /// Delegate for a leaf operation that produces a typed response from an input. /// /// Input value (readonly via in). /// The computed result for this leaf. diff --git a/src/PatternKit.Generators/packages.lock.json b/src/PatternKit.Generators/packages.lock.json index 40a2a74..4eb9db9 100644 --- a/src/PatternKit.Generators/packages.lock.json +++ b/src/PatternKit.Generators/packages.lock.json @@ -117,194 +117,6 @@ "System.Runtime.CompilerServices.Unsafe": "4.5.3" } } - }, - ".NETStandard,Version=v2.1": { - "Microsoft.CodeAnalysis.Analyzers": { - "type": "Direct", - "requested": "[3.11.0, )", - "resolved": "3.11.0", - "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" - }, - "Microsoft.CodeAnalysis.CSharp": { - "type": "Direct", - "requested": "[4.14.0, )", - "resolved": "4.14.0", - "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "Microsoft.CodeAnalysis.Common": "[4.14.0]", - "System.Buffers": "4.5.1", - "System.Collections.Immutable": "9.0.0", - "System.Memory": "4.5.5", - "System.Numerics.Vectors": "4.5.0", - "System.Reflection.Metadata": "9.0.0", - "System.Runtime.CompilerServices.Unsafe": "6.0.0", - "System.Text.Encoding.CodePages": "7.0.0", - "System.Threading.Tasks.Extensions": "4.5.4" - } - }, - "System.Collections.Immutable": { - "type": "Direct", - "requested": "[9.0.9, )", - "resolved": "9.0.9", - "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==", - "dependencies": { - "System.Memory": "4.5.5", - "System.Runtime.CompilerServices.Unsafe": "6.0.0" - } - }, - "Microsoft.CodeAnalysis.Common": { - "type": "Transitive", - "resolved": "4.14.0", - "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "System.Buffers": "4.5.1", - "System.Collections.Immutable": "9.0.0", - "System.Memory": "4.5.5", - "System.Numerics.Vectors": "4.5.0", - "System.Reflection.Metadata": "9.0.0", - "System.Runtime.CompilerServices.Unsafe": "6.0.0", - "System.Text.Encoding.CodePages": "7.0.0", - "System.Threading.Tasks.Extensions": "4.5.4" - } - }, - "System.Buffers": { - "type": "Transitive", - "resolved": "4.5.1", - "contentHash": "Rw7ijyl1qqRS0YQD/WycNst8hUUMgrMH4FCn1nNm27M4VxchZ1js3fVjQaANHO5f3sN4isvP4a+Met9Y4YomAg==" - }, - "System.Memory": { - "type": "Transitive", - "resolved": "4.5.5", - "contentHash": "XIWiDvKPXaTveaB7HVganDlOCRoj03l+jrwNvcge/t8vhGYKvqV+dMv6G4SAX2NoNmN0wZfVPTAlFwZcZvVOUw==", - "dependencies": { - "System.Buffers": "4.5.1", - "System.Numerics.Vectors": "4.4.0", - "System.Runtime.CompilerServices.Unsafe": "4.5.3" - } - }, - "System.Numerics.Vectors": { - "type": "Transitive", - "resolved": "4.5.0", - "contentHash": "QQTlPTl06J/iiDbJCiepZ4H//BVraReU4O4EoRw1U02H5TLUIT7xn3GnDp9AXPSlJUDyFs4uWjWafNX6WrAojQ==" - }, - "System.Reflection.Metadata": { - "type": "Transitive", - "resolved": "9.0.0", - "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==", - "dependencies": { - "System.Collections.Immutable": "9.0.0", - "System.Memory": "4.5.5" - } - }, - "System.Runtime.CompilerServices.Unsafe": { - "type": "Transitive", - "resolved": "6.0.0", - "contentHash": "/iUeP3tq1S0XdNNoMz5C9twLSrM/TH+qElHkXWaPvuNOt+99G75NrV0OS2EqHx5wMN7popYjpc8oTjC1y16DLg==" - }, - "System.Text.Encoding.CodePages": { - "type": "Transitive", - "resolved": "7.0.0", - "contentHash": "LSyCblMpvOe0N3E+8e0skHcrIhgV2huaNcjUUEa8hRtgEAm36aGkRoC8Jxlb6Ra6GSfF29ftduPNywin8XolzQ==", - "dependencies": { - "System.Memory": "4.5.5", - "System.Runtime.CompilerServices.Unsafe": "6.0.0" - } - }, - "System.Threading.Tasks.Extensions": { - "type": "Transitive", - "resolved": "4.5.4", - "contentHash": "zteT+G8xuGu6mS+mzDzYXbzS7rd3K6Fjb9RiZlYlJPam2/hU7JCBZBVEcywNuR+oZ1ncTvc/cq0faRr3P01OVg==", - "dependencies": { - "System.Runtime.CompilerServices.Unsafe": "4.5.3" - } - } - }, - "net8.0": { - "Microsoft.CodeAnalysis.Analyzers": { - "type": "Direct", - "requested": "[3.11.0, )", - "resolved": "3.11.0", - "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" - }, - "Microsoft.CodeAnalysis.CSharp": { - "type": "Direct", - "requested": "[4.14.0, )", - "resolved": "4.14.0", - "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "Microsoft.CodeAnalysis.Common": "[4.14.0]", - "System.Collections.Immutable": "9.0.0", - "System.Reflection.Metadata": "9.0.0" - } - }, - "System.Collections.Immutable": { - "type": "Direct", - "requested": "[9.0.9, )", - "resolved": "9.0.9", - "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" - }, - "Microsoft.CodeAnalysis.Common": { - "type": "Transitive", - "resolved": "4.14.0", - "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "System.Collections.Immutable": "9.0.0", - "System.Reflection.Metadata": "9.0.0" - } - }, - "System.Reflection.Metadata": { - "type": "Transitive", - "resolved": "9.0.0", - "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==", - "dependencies": { - "System.Collections.Immutable": "9.0.0" - } - } - }, - "net9.0": { - "Microsoft.CodeAnalysis.Analyzers": { - "type": "Direct", - "requested": "[3.11.0, )", - "resolved": "3.11.0", - "contentHash": "v/EW3UE8/lbEYHoC2Qq7AR/DnmvpgdtAMndfQNmpuIMx/Mto8L5JnuCfdBYtgvalQOtfNCnxFejxuRrryvUTsg==" - }, - "Microsoft.CodeAnalysis.CSharp": { - "type": "Direct", - "requested": "[4.14.0, )", - "resolved": "4.14.0", - "contentHash": "568a6wcTivauIhbeWcCwfWwIn7UV7MeHEBvFB2uzGIpM2OhJ4eM/FZ8KS0yhPoNxnSpjGzz7x7CIjTxhslojQA==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "Microsoft.CodeAnalysis.Common": "[4.14.0]", - "System.Collections.Immutable": "9.0.0", - "System.Reflection.Metadata": "9.0.0" - } - }, - "System.Collections.Immutable": { - "type": "Direct", - "requested": "[9.0.9, )", - "resolved": "9.0.9", - "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" - }, - "Microsoft.CodeAnalysis.Common": { - "type": "Transitive", - "resolved": "4.14.0", - "contentHash": "PC3tuwZYnC+idaPuoC/AZpEdwrtX7qFpmnrfQkgobGIWiYmGi5MCRtl5mx6QrfMGQpK78X2lfIEoZDLg/qnuHg==", - "dependencies": { - "Microsoft.CodeAnalysis.Analyzers": "3.11.0", - "System.Collections.Immutable": "9.0.0", - "System.Reflection.Metadata": "9.0.0" - } - }, - "System.Reflection.Metadata": { - "type": "Transitive", - "resolved": "9.0.0", - "contentHash": "ANiqLu3DxW9kol/hMmTWbt3414t9ftdIuiIU7j80okq2YzAueo120M442xk1kDJWtmZTqWQn7wHDvMRipVOEOQ==" - } } } } \ No newline at end of file diff --git a/test/PatternKit.Tests/Behavioral/Iterator/AsyncFlowTests.cs b/test/PatternKit.Tests/Behavioral/Iterator/AsyncFlowTests.cs new file mode 100644 index 0000000..6cac493 --- /dev/null +++ b/test/PatternKit.Tests/Behavioral/Iterator/AsyncFlowTests.cs @@ -0,0 +1,103 @@ +#if !NETSTANDARD2_0 +using PatternKit.Behavioral.Iterator; +using TinyBDD; +using TinyBDD.Assertions; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Iterator; + +[Feature("AsyncFlow: async functional pipeline with share/fork/branch")] +public sealed class AsyncFlowTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private static async IAsyncEnumerable RangeAsync(int count, int delayMs = 0) + { + for (int i = 1; i <= count; i++) + { + if (delayMs > 0) await Task.Delay(delayMs).ConfigureAwait(false); + yield return i; + } + } + + private static async IAsyncEnumerable DuplicateAsync(int v) + { + yield return v; + yield return v + 1; + await Task.CompletedTask; + } + + // Helper async methods for scenario steps (avoid ambiguous Task/ValueTask inference) + private static async Task<(List Result, List Log)> ComposePipeline(AsyncFlow f) + { + var log = new List(); + var result = new List(); + await foreach (var v in f.Map(x => x * 2) // 2 4 6 8 10 + .Filter(x => x % 4 == 0)// 4 8 + .FlatMap(x => DuplicateAsync(x)) // 4,5,8,9 + .Tee(log.Add)) + result.Add(v); + return (result, log); + } + + private static async Task<(List First, List Second)> RunForks(SharedAsyncFlow shared) + { + var f1Task = shared.Fork().Map(x => x * 10) + .FoldAsync(new List(), (acc, v) => { acc.Add(v); return acc; }); + var f2Task = shared.Fork().Filter(x => x % 2 == 1) + .FoldAsync(new List(), (acc, v) => { acc.Add(v); return acc; }); + await Task.WhenAll(f1Task.AsTask(), f2Task.AsTask()); + return (f1Task.Result, f2Task.Result); + } + + private static async Task<(List even, List odd)> BranchPartition(SharedAsyncFlow sf) + { + var (evenFlow, oddFlow) = sf.Branch(x => x % 2 == 0); + var even = await evenFlow.FoldAsync(new List(), (a, v) => { a.Add(v); return a; }); + var odd = await oddFlow.FoldAsync(new List(), (a, v) => { a.Add(v); return a; }); + return (even, odd); + } + + private static async Task<(PatternKit.Common.Option Some, PatternKit.Common.Option None)> Firsts((AsyncFlow NonEmpty, AsyncFlow Empty) flows) + { + var some = await flows.NonEmpty.FirstOptionAsync(); + var none = await flows.Empty.FirstOptionAsync(); + return (some, none); + } + + [Scenario("Async Map/Filter/FlatMap/Tee composition")] + [Fact] + public Task Composition() + => Given("async flow 1..5", () => AsyncFlow.From(RangeAsync(5))) + .When("compose operations", ComposePipeline) + .Then("result is 4,5,8,9", t => string.Join(',', t.Result) == "4,5,8,9") + .And("tee captured same", t => string.Join(',', t.Log) == "4,5,8,9") + .AssertPassed(); + + [Scenario("Share + concurrent forks enumerate source once")] + [Fact] + public Task ShareForkSingleEnumeration() + => Given("shared async flow over 1..6", () => AsyncFlow.From(RangeAsync(6)).Share()) + .When("two forks consumed concurrently", RunForks) + .Then("first fork is 10,20,30,40,50,60", r => string.Join(',', r.First) == "10,20,30,40,50,60") + .And("second fork is 1,3,5", r => string.Join(',', r.Second) == "1,3,5") + .AssertPassed(); + + [Scenario("Branch partitions even/odd")] + [Fact] + public Task Branching() + => Given("shared async flow 1..7", () => AsyncFlow.From(RangeAsync(7)).Share()) + .When("branch on even", BranchPartition) + .Then("even 2,4,6", r => string.Join(',', r.even) == "2,4,6") + .And("odd 1,3,5,7", r => string.Join(',', r.odd) == "1,3,5,7") + .AssertPassed(); + + [Scenario("FirstOptionAsync returns Some then None")] + [Fact] + public Task FirstOption() + => Given("flows", () => (NonEmpty: AsyncFlow.From(RangeAsync(3)), Empty: AsyncFlow.From(RangeAsync(0)))) + .When("first options", Firsts) + .Then("Some=1", r => r.Some.HasValue && r.Some.ValueOrDefault == 1) + .And("None has no value", r => !r.None.HasValue) + .AssertPassed(); +} +#endif diff --git a/test/PatternKit.Tests/Behavioral/Iterator/FlowTests.cs b/test/PatternKit.Tests/Behavioral/Iterator/FlowTests.cs new file mode 100644 index 0000000..54b6b33 --- /dev/null +++ b/test/PatternKit.Tests/Behavioral/Iterator/FlowTests.cs @@ -0,0 +1,80 @@ +using PatternKit.Behavioral.Iterator; +using TinyBDD; +using TinyBDD.Assertions; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Iterator; + +[Feature("Flow: functional iterator pipeline with share/fork/branch")] +public sealed class FlowTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private sealed class CountingEnumerable : IEnumerable + { + private readonly int _count; + public int MoveNextCalls { get; private set; } + public CountingEnumerable(int count) => _count = count; + public IEnumerator GetEnumerator() + { + for (int i = 0; i < _count; i++) + { + MoveNextCalls++; + yield return i + 1; // 1..n + } + } + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); + } + + [Scenario("Map / Filter / FlatMap / Tee composition produces expected sequence")] + [Fact] + public Task Composition() + => Given("flow over 1..5", () => Flow.From(Enumerable.Range(1,5))) + .When("map *2, filter even, flatMap duplicates, tee collects", f => + { + var log = new List(); + var result = f.Map(x => x * 2) // 2,4,6,8,10 + .Filter(x => x % 4 == 0) // 4,8 + .FlatMap(x => new[]{x, x}) // 4,4,8,8 + .Tee(log.Add) + .ToList(); + return (result, log); + }) + .Then("result is 4,4,8,8", t => Expect.For(string.Join(',', t.result)).ToBe("4,4,8,8")) + .And("tee captured same sequence", t => Expect.For(string.Join(',', t.log)).ToBe("4,4,8,8")) + .AssertPassed(); + + [Scenario("Share + two forks enumerate only once upstream")] + [Fact] + public Task ShareForkSingleEnumeration() + => Given("counting source 1..5", () => new CountingEnumerable(5)) + .When("shared and two forks consumed", src => + { + var shared = Flow.From(src).Map(x => x + 0).Share(); + var fork1 = shared.Fork().Map(x => x * 10).ToList(); + var fork2 = shared.Fork().Filter(x => x % 2 == 1).ToList(); + return (src, fork1, fork2); + }) + .Then("source MoveNextCalls == 5 (single pass)", t => t.src.MoveNextCalls == 5) + .And("fork1 is 10,20,30,40,50", t => string.Join(',', t.fork1) == "10,20,30,40,50") + .And("fork2 is 1,3,5", t => string.Join(',', t.fork2) == "1,3,5") + .AssertPassed(); + + [Scenario("Branch splits into true/false flows")] + [Fact] + public Task Branching() + => Given("shared flow over 1..6", () => Flow.From(Enumerable.Range(1,6)).Share()) + .When("branch is even predicate", sf => sf.Branch(x => x % 2 == 0)) + .Then("true branch has evens", b => string.Join(',', b.True.ToList()) == "2,4,6") + .And("false branch has odds", b => string.Join(',', b.False.ToList()) == "1,3,5") + .AssertPassed(); + + [Scenario("FirstOption returns Some for non-empty flow and None for empty")] + [Fact] + public Task FirstOption() + => Given("flow over numbers", () => Flow.From(new[]{7,8,9})) + .When("first option extracted", f => (Some:f.FirstOption(), None: Flow.From(Array.Empty()).FirstOption())) + .Then("Some has value 7", r => r.Some.HasValue && r.Some.ValueOrDefault == 7) + .And("None has no value", r => !r.None.HasValue) + .AssertPassed(); +} + diff --git a/test/PatternKit.Tests/Behavioral/Iterator/ReplayableSequenceTests.cs b/test/PatternKit.Tests/Behavioral/Iterator/ReplayableSequenceTests.cs new file mode 100644 index 0000000..6ca70e5 --- /dev/null +++ b/test/PatternKit.Tests/Behavioral/Iterator/ReplayableSequenceTests.cs @@ -0,0 +1,188 @@ +using System.Collections; +using PatternKit.Behavioral.Iterator; +using TinyBDD; +using TinyBDD.Assertions; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Iterator; + +[Feature("ReplayableSequence (forkable, lookahead iterator)")] +public sealed class ReplayableSequenceTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + private sealed class CountingEnumerable(int start, int count) : IEnumerable + { + public int MoveNextCalls { get; private set; } + + public IEnumerator GetEnumerator() + { + for (var i = 0; i < count; i++) + { + MoveNextCalls++; + yield return start + i; + } + } + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + } + + private static (ReplayableSequence Seq, ReplayableSequence.Cursor C) BuildSimple() + { + var seq = ReplayableSequence.From(Enumerable.Range(1, 5)); + return (seq, seq.GetCursor()); + } + + [Scenario("TryNext advances via returned cursor and yields all elements in order")] + [Fact] + public Task TryNextYieldsInOrder() + => Given("sequence 1..3", () => ReplayableSequence.From(Enumerable.Range(1, 3)).GetCursor()) + .When("reading sequentially", c => + { + var values = new List(); + while (c.TryNext(out var v, out var next)) + { + values.Add(v); + c = next; + } + + return (values, final: c); + }) + .Then("values are 1|2|3", t => Expect.For(string.Join('|', t.values)).ToBe("1|2|3")) + .AssertPassed(); + + [Scenario("Peek does not advance position")] + [Fact] + public Task PeekDoesNotAdvance() + => Given("cursor at start of 1..5", BuildSimple) + .When("peeking twice", t => + { + t.C.Peek(out var a); + t.C.Peek(out var b); + return (t.Seq, t.C, a, b); + }) + .Then("both peeks see 1", t => Expect.For((t.a, t.b)).ToEqual((1, 1))) + .And("subsequent TryNext still returns 1", t => + { + var c = t.C; + c.TryNext(out var first, out c); + return first == 1; + }) + .AssertPassed(); + + [Scenario("Lookahead does not advance and is stable")] + [Fact] + public Task LookaheadStable() + => Given("cursor at start of 1..5", BuildSimple) + .When("looking ahead 0 & 2 multiple times", t => + { + var a1 = t.C.Lookahead(0).OrDefault(); + var a2 = t.C.Lookahead(0).OrDefault(); + var c1 = t.C.Lookahead(2).OrDefault(); + var c2 = t.C.Lookahead(2).OrDefault(); + return (t.Seq, t.C, a1, a2, c1, c2); + }) + .Then("values are consistent (1,1,3,3)", t => + { + Expect.For(t.a1).ToBe(1); + Expect.For(t.a2).ToBe(1); + Expect.For(t.c1).ToBe(3); + Expect.For(t.c2).ToBe(3); + }) + .And("cursor still at position 0", t => t.C.Position == 0) + .AssertPassed(); + + [Scenario("Forking creates independent cursor")] + [Fact] + public Task ForkIndependence() + => Given("cursor advanced two items", () => + { + var seq = ReplayableSequence.From(Enumerable.Range(10, 5)); + var c = seq.GetCursor(); + c.TryNext(out _, out c); // 10 + c.TryNext(out _, out c); // 11 + return (seq, c); + }) + .When("forking and advancing fork only", t => + { + var fork = t.c.Fork(); + fork.TryNext(out var v, out fork); // should read 12 + return (t.seq, t.c, fork, read: v); + }) + .Then("fork read 12", t => t.read == 12) + .And("original cursor position unchanged (still 2)", t => t.c.Position == 2) + .AssertPassed(); + + [Scenario("Lazy buffering only pulls what is needed")] + [Fact] + public Task LazyBuffering() + => Given("counting enumerable 100..104", () => + { + var src = new CountingEnumerable(100, 5); + var seq = ReplayableSequence.From(src); + return (src, seq, c: seq.GetCursor()); + }) + .When("peeking then lookahead(2)", t => + { + t.c.Peek(out _); // needs first element + t.c.Lookahead(2); // needs up to index 2 (third element) + return t; + }) + .Then("MoveNext calls == 3", t => t.src.MoveNextCalls == 3) + .And("buffer did not pre-fetch beyond requested", t => + { + var before = t.src.MoveNextCalls; + t.c.Lookahead(3); + return t.src.MoveNextCalls == before + 1; + }) + .AssertPassed(); + + [Scenario("Batch groups elements correctly including final partial batch")] + [Fact] + public Task BatchGroups() + => Given("cursor over 1..10", () => ReplayableSequence.From(Enumerable.Range(1, 10)).GetCursor()) + .When("batch size 4 enumerated", c => c.Batch(4).Select(b => string.Join('-', b)).ToList()) + .Then("batches are 1-2-3-4 | 5-6-7-8 | 9-10", batches => + Expect.For(string.Join(" | ", batches)).ToBe("1-2-3-4 | 5-6-7-8 | 9-10")) + .AssertPassed(); + + [Scenario("LINQ Where/Select over cursor does not advance original cursor")] + [Fact] + public Task LinqDoesNotAdvanceOriginal() + => Given("cursor at start 1..6", () => ReplayableSequence.From(Enumerable.Range(1, 6)).GetCursor()) + .When("enumerating transformed projection", c => + { + var projected = c.Where(x => x % 2 == 0).Select(x => x * 10).Take(2).ToList(); + return (c, projected); + }) + .Then("projected is 20|40", t => Expect.For(string.Join('|', t.projected)).ToBe("20|40")) + .And("original cursor still at pos 0", t => t.c.Position == 0) + .AssertPassed(); + + [Scenario("LINQ Select over cursor does advance original cursor")] + [Fact] + public Task LinqSelectDoesNotAdvanceOriginal() + => Given("cursor at start 1..6", () => ReplayableSequence.From(Enumerable.Range(1, 6)).GetCursor()) + .When("enumerating transformed projection", c => + { + var projected = c.Select(x => x * 10).Take(2).ToList(); + return (c, projected); + }) + .Then("projected is 10|20", t => Expect.For(string.Join('|', t.projected)).ToBe("10|20")) + .And("original cursor still at pos 0", t => t.c.Position == 0) + .AssertPassed(); + + + [Scenario("LINQ Select over sequence does not advance cursor")] + [Fact] + public Task LinqSelectOverSequenceDoesNotAdvanceCursor() + => Given("cursor at start 1..6", () => ReplayableSequence.From(Enumerable.Range(1, 6))) + .When("enumerating transformed projection", c => + { + var projected = c.AsEnumerable().Select(x => x * 10).Take(2).ToList(); + return (c, projected); + }) + .Then("projected is 10|20", t => Expect.For(string.Join('|', t.projected)).ToBe("10|20")) + .And("original cursor still at pos 0", t => t.c.GetCursor().Position == 0) + .AssertPassed(); + +} \ No newline at end of file diff --git a/test/PatternKit.Tests/Behavioral/Iterator/WindowSequenceTests.cs b/test/PatternKit.Tests/Behavioral/Iterator/WindowSequenceTests.cs new file mode 100644 index 0000000..61818ce --- /dev/null +++ b/test/PatternKit.Tests/Behavioral/Iterator/WindowSequenceTests.cs @@ -0,0 +1,72 @@ +using PatternKit.Behavioral.Iterator; +using TinyBDD; +using TinyBDD.Assertions; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Tests.Behavioral.Iterator; + +[Feature("WindowSequence (sliding/striding windows)")] +public sealed class WindowSequenceTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + [Scenario("Sliding size=3 stride=1 over 1..5 yields 3 overlapping windows")] + [Fact] + public Task BasicSliding() + => Given("range 1..5", () => Enumerable.Range(1, 5)) + .When("windowed size3 stride1", src => src.Windows(size: 3, stride: 1).Select(w => string.Join('-', w.ToArray())).ToList()) + .Then("windows are 1-2-3 | 2-3-4 | 3-4-5", list => Expect.For(string.Join(" | ", list)).ToBe("1-2-3 | 2-3-4 | 3-4-5")) + .AssertPassed(); + + [Scenario("Stride > 1 drops elements between windows")] + [Fact] + public Task StrideTwo() + => Given("range 1..6", () => Enumerable.Range(1, 6)) + .When("size3 stride2", src => src.Windows(size: 3, stride: 2).Select(w => w.ToArray()).ToList()) + .Then("two windows produced", ws => Expect.For(ws.Count).ToBe(2)) + .And("first is 1,2,3", ws => ws[0][0] == 1 && ws[0][2] == 3) + .And("second is 3,4,5", ws => ws[1][0] == 3 && ws[1][2] == 5) + .AssertPassed(); + + [Scenario("Partial enabled yields trailing partial window")] + [Fact] + public Task PartialTrailing() + => Given("range 1..4", () => Enumerable.Range(1, 4)) + .When("size3 stride2 include partial", + src => src.Windows(size: 3, stride: 2, includePartial: true).Select(w => (Vals: w.ToArray(), w.IsPartial)).ToList()) + .Then("two windows", list => list.Count == 2) + .And("first full, second partial", list => !list[0].IsPartial && list[1].IsPartial) + .And("partial contains 3,4", list => string.Join('-', list[1].Vals) == "3-4") + .AssertPassed(); + + [Scenario("ReuseBuffer reuses underlying array for full windows")] + [Fact] + public Task ReuseBufferBehavior() + => Given("range 1..5", () => Enumerable.Range(1, 5)) + .When("size3 stride1 reuse buffer", src => src.Windows(3, 1, reuseBuffer: true).Take(3).ToList()) + .Then("all report reused buffer", ws => ws.All(w => w.IsBufferReused)) + .And("ToArray snapshots are distinct", ws => + { + var snaps = ws.Select(w => w.ToArray()).ToList(); + return snaps[0] != snaps[1] && snaps[1] != snaps[2]; + }) + .AssertPassed(); + + [Scenario("Argument validation: size<=0 throws")] + [Fact] + public Task SizeValidation() + => Given("invalid size 0", () => 0) + .When("invoking Windows", i => + { + try + { + _ = Enumerable.Range(1, 3).Windows(i).First(); + return false; + } + catch (ArgumentOutOfRangeException) + { + return true; + } + }) + .Then("throws", ok => ok) + .AssertPassed(); +} \ No newline at end of file diff --git a/test/PatternKit.Tests/PatternKit.Tests.csproj b/test/PatternKit.Tests/PatternKit.Tests.csproj index 37fb9e5..f2ba27f 100644 --- a/test/PatternKit.Tests/PatternKit.Tests.csproj +++ b/test/PatternKit.Tests/PatternKit.Tests.csproj @@ -14,6 +14,7 @@ + diff --git a/test/PatternKit.Tests/packages.lock.json b/test/PatternKit.Tests/packages.lock.json index 40c48bc..3874d10 100644 --- a/test/PatternKit.Tests/packages.lock.json +++ b/test/PatternKit.Tests/packages.lock.json @@ -24,6 +24,12 @@ "resolved": "9.0.9", "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" }, + "System.Linq.Async": { + "type": "Direct", + "requested": "[6.0.3, )", + "resolved": "6.0.3", + "contentHash": "hSHiq2m1ky7zUQgTp+/2h1K3lABIQ+GltRixoclHPg/Sc1vnfeS6g/Uy5moOVZKrZJdQiFPFZd6OobBp3tZcFg==" + }, "TinyBDD.Xunit": { "type": "Direct", "requested": "[0.9.0, )", @@ -240,6 +246,12 @@ "resolved": "9.0.9", "contentHash": "/kpkgDxH984e3J3z5v/DIFi+0TWbUJXS8HNKUYBy3YnXtK09JVGs3cw5aOV6fDSw5NxbWLWlGrYjRteu6cjX3w==" }, + "System.Linq.Async": { + "type": "Direct", + "requested": "[6.0.3, )", + "resolved": "6.0.3", + "contentHash": "hSHiq2m1ky7zUQgTp+/2h1K3lABIQ+GltRixoclHPg/Sc1vnfeS6g/Uy5moOVZKrZJdQiFPFZd6OobBp3tZcFg==" + }, "TinyBDD.Xunit": { "type": "Direct", "requested": "[0.9.0, )",