diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index cfd430c..fcfbe4d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,8 @@ +### 4.17.0 + +* Performance: Optimised `AsyncSeq.pairwise` to use a `hasPrev` flag and a direct `mutable` field instead of wrapping the previous element in `Some`. Previously, each iteration allocated a new `'T option` object on the heap; the new implementation eliminates that allocation entirely, reducing GC pressure for long sequences. +* Bug fix: `AsyncSeq.splitAt` and `AsyncSeq.tryTail` now correctly dispose the underlying enumerator when an exception or cancellation occurs during the initial `MoveNext` call. Previously the enumerator could leak if the source sequence threw during the first few steps. + ### 4.16.0 * Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators. diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 06291a0..f0d08b9 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1306,13 +1306,14 @@ module AsyncSeq = use ie = source.GetEnumerator() let! v = ie.MoveNext() let mutable b = v - let mutable prev = None + // Use a flag + mutable field instead of Option to avoid per-element heap allocation + let mutable hasPrev = false + let mutable prev = Unchecked.defaultof<'T> while b.IsSome do let v = b.Value - match prev with - | None -> () - | Some p -> yield (p, v) - prev <- Some v + if hasPrev then yield (prev, v) + hasPrev <- true + prev <- v let! moven = ie.MoveNext() b <- moven } @@ -2152,51 +2153,59 @@ module AsyncSeq = let tryTail (source: AsyncSeq<'T>) : Async option> = async { let ie = source.GetEnumerator() - let! first = ie.MoveNext() - match first with - | None -> + try + let! first = ie.MoveNext() + match first with + | None -> + ie.Dispose() + return None + | Some _ -> + return Some (asyncSeq { + try + let! next = ie.MoveNext() + let mutable b = next + while b.IsSome do + yield b.Value + let! moven = ie.MoveNext() + b <- moven + finally + ie.Dispose() }) + with ex -> ie.Dispose() - return None - | Some _ -> - return Some (asyncSeq { - try - let! next = ie.MoveNext() - let mutable b = next - while b.IsSome do - yield b.Value - let! moven = ie.MoveNext() - b <- moven - finally - ie.Dispose() }) } + return raise ex } /// Splits an async sequence at the given index, returning the first `count` elements as an array /// and the remaining elements as a new AsyncSeq. The source is enumerated once. let splitAt (count: int) (source: AsyncSeq<'T>) : Async<'T array * AsyncSeq<'T>> = async { if count < 0 then invalidArg "count" "must be non-negative" let ie = source.GetEnumerator() - let ra = ResizeArray<'T>() - let! m = ie.MoveNext() - let mutable b = m - while b.IsSome && ra.Count < count do - ra.Add b.Value - let! next = ie.MoveNext() - b <- next - let first = ra.ToArray() - let rest = - if b.IsNone then - ie.Dispose() - empty<'T> - else - let cur = ref b - asyncSeq { - try - while cur.Value.IsSome do - yield cur.Value.Value - let! next = ie.MoveNext() - cur.Value <- next - finally - ie.Dispose() } - return first, rest } + try + let ra = ResizeArray<'T>() + let! m = ie.MoveNext() + let mutable b = m + while b.IsSome && ra.Count < count do + ra.Add b.Value + let! next = ie.MoveNext() + b <- next + let first = ra.ToArray() + let rest = + if b.IsNone then + ie.Dispose() + empty<'T> + else + let mutable cur = b + asyncSeq { + try + while cur.IsSome do + yield cur.Value + let! next = ie.MoveNext() + cur <- next + finally + ie.Dispose() } + return first, rest + with ex -> + ie.Dispose() + return raise ex } let toArrayAsync (source : AsyncSeq<'T>) : Async<'T[]> = async { let ra = (new ResizeArray<_>()) diff --git a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj index 9059760..c109047 100644 --- a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj +++ b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj @@ -24,7 +24,7 @@ - + diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 6c640d9..b8ed9b4 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -2816,6 +2816,13 @@ let ``AsyncSeq.pairwise with three elements should produce two pairs`` () = let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously Assert.AreEqual([(1, 2); (2, 3)], result) +[] +let ``AsyncSeq.pairwise with many elements produces correct pairs`` () = + let source = AsyncSeq.ofSeq [1..10] + let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously + let expected = [(1,2); (2,3); (3,4); (4,5); (5,6); (6,7); (7,8); (8,9); (9,10)] + Assert.AreEqual(expected, result) + [] let ``AsyncSeq.windowed empty sequence returns empty`` () = let result = AsyncSeq.windowed 3 AsyncSeq.empty |> AsyncSeq.toListSynchronously @@ -3746,6 +3753,19 @@ let ``AsyncSeq.splitAt with negative count throws ArgumentException`` () = Assert.Throws(fun () -> AsyncSeq.splitAt -1 AsyncSeq.empty |> Async.RunSynchronously |> ignore) |> ignore +[] +let ``AsyncSeq.splitAt disposes enumerator when source throws during collection`` () = + let mutable disposed = false + let source = asyncSeq { + use _ = { new System.IDisposable with member _.Dispose() = disposed <- true } + yield 1 + yield 2 + failwith "source error" + } + try AsyncSeq.splitAt 10 source |> Async.RunSynchronously |> ignore + with _ -> () + Assert.IsTrue(disposed, "enumerator should be disposed after exception during collection") + // ===== removeAt ===== [] @@ -4190,6 +4210,20 @@ let ``AsyncSeq.tryTail returns all-but-first elements`` () = let tail = result.Value |> AsyncSeq.toListAsync |> Async.RunSynchronously Assert.AreEqual([2;3;4;5], tail) +[] +let ``AsyncSeq.tryTail disposes enumerator when source throws on first MoveNext`` () = + let mutable disposed = false + // Use a pre-failed task so the exception occurs during MoveNext() (async), not during GetEnumerator() + let failedTask = System.Threading.Tasks.Task.FromException(System.Exception("source error")) + let source = asyncSeq { + use _ = { new System.IDisposable with member _.Dispose() = disposed <- true } + let! _ = failedTask |> Async.AwaitTask + yield 1 + } + try AsyncSeq.tryTail source |> Async.RunSynchronously |> ignore + with _ -> () + Assert.IsTrue(disposed, "enumerator should be disposed after exception on first MoveNext") + [] let ``AsyncSeq.where is alias for filter`` () = let result =