Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 4.17.0

* Performance: Optimised `AsyncSeq.pairwise` to use a `hasPrev` flag and a direct `mutable` field instead of wrapping the previous element in `Some`. Previously, each iteration allocated a new `'T option` object on the heap; the new implementation eliminates that allocation entirely, reducing GC pressure for long sequences.
* Bug fix: `AsyncSeq.splitAt` and `AsyncSeq.tryTail` now correctly dispose the underlying enumerator when an exception or cancellation occurs during the initial `MoveNext` call. Previously the enumerator could leak if the source sequence threw during the first few steps.

### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.
Expand Down
95 changes: 52 additions & 43 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1306,13 +1306,14 @@
use ie = source.GetEnumerator()
let! v = ie.MoveNext()
let mutable b = v
let mutable prev = None
// Use a flag + mutable field instead of Option to avoid per-element heap allocation
let mutable hasPrev = false
let mutable prev = Unchecked.defaultof<'T>
while b.IsSome do
let v = b.Value
match prev with
| None -> ()
| Some p -> yield (p, v)
prev <- Some v
if hasPrev then yield (prev, v)
hasPrev <- true
prev <- v
let! moven = ie.MoveNext()
b <- moven }

Expand Down Expand Up @@ -2152,51 +2153,59 @@

let tryTail (source: AsyncSeq<'T>) : Async<AsyncSeq<'T> option> = async {
let ie = source.GetEnumerator()
let! first = ie.MoveNext()
match first with
| None ->
try
let! first = ie.MoveNext()
match first with
| None ->
ie.Dispose()
return None
| Some _ ->
return Some (asyncSeq {
try
let! next = ie.MoveNext()
let mutable b = next
while b.IsSome do
yield b.Value
let! moven = ie.MoveNext()
b <- moven
finally
ie.Dispose() })
with ex ->
ie.Dispose()
return None
| Some _ ->
return Some (asyncSeq {
try
let! next = ie.MoveNext()
let mutable b = next
while b.IsSome do
yield b.Value
let! moven = ie.MoveNext()
b <- moven
finally
ie.Dispose() }) }
return raise ex }

/// Splits an async sequence at the given index, returning the first `count` elements as an array
/// and the remaining elements as a new AsyncSeq. The source is enumerated once.
let splitAt (count: int) (source: AsyncSeq<'T>) : Async<'T array * AsyncSeq<'T>> = async {
if count < 0 then invalidArg "count" "must be non-negative"
let ie = source.GetEnumerator()
let ra = ResizeArray<'T>()
let! m = ie.MoveNext()
let mutable b = m
while b.IsSome && ra.Count < count do
ra.Add b.Value
let! next = ie.MoveNext()
b <- next
let first = ra.ToArray()
let rest =
if b.IsNone then
ie.Dispose()
empty<'T>
else
let cur = ref b
asyncSeq {
try
while cur.Value.IsSome do
yield cur.Value.Value
let! next = ie.MoveNext()
cur.Value <- next
finally
ie.Dispose() }
return first, rest }
try
let ra = ResizeArray<'T>()
let! m = ie.MoveNext()
let mutable b = m
while b.IsSome && ra.Count < count do
ra.Add b.Value
let! next = ie.MoveNext()
b <- next
let first = ra.ToArray()
let rest =
if b.IsNone then
ie.Dispose()
empty<'T>
else
let mutable cur = b
asyncSeq {
try
while cur.IsSome do
yield cur.Value
let! next = ie.MoveNext()
cur <- next
finally
ie.Dispose() }
return first, rest
with ex ->
ie.Dispose()
return raise ex }

let toArrayAsync (source : AsyncSeq<'T>) : Async<'T[]> = async {
let ra = (new ResizeArray<_>())
Expand Down Expand Up @@ -2729,7 +2738,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2741 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2741 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
2 changes: 1 addition & 1 deletion src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.6" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.7" />
<PackageReference Include="System.Threading.Channels" Version="*" />
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
</ItemGroup>
Expand Down
34 changes: 34 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -2816,6 +2816,13 @@
let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(1, 2); (2, 3)], result)

[<Test>]
let ``AsyncSeq.pairwise with many elements produces correct pairs`` () =
let source = AsyncSeq.ofSeq [1..10]
let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously
let expected = [(1,2); (2,3); (3,4); (4,5); (5,6); (6,7); (7,8); (8,9); (9,10)]
Assert.AreEqual(expected, result)

[<Test>]
let ``AsyncSeq.windowed empty sequence returns empty`` () =
let result = AsyncSeq.windowed 3 AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
Expand Down Expand Up @@ -3746,6 +3753,19 @@
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.splitAt -1 AsyncSeq.empty<int> |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.splitAt disposes enumerator when source throws during collection`` () =
let mutable disposed = false
let source = asyncSeq {
use _ = { new System.IDisposable with member _.Dispose() = disposed <- true }
yield 1
yield 2
failwith "source error"
}
try AsyncSeq.splitAt 10 source |> Async.RunSynchronously |> ignore
with _ -> ()
Assert.IsTrue(disposed, "enumerator should be disposed after exception during collection")

// ===== removeAt =====

[<Test>]
Expand Down Expand Up @@ -4190,6 +4210,20 @@
let tail = result.Value |> AsyncSeq.toListAsync |> Async.RunSynchronously
Assert.AreEqual([2;3;4;5], tail)

[<Test>]
let ``AsyncSeq.tryTail disposes enumerator when source throws on first MoveNext`` () =
let mutable disposed = false
// Use a pre-failed task so the exception occurs during MoveNext() (async), not during GetEnumerator()
let failedTask = System.Threading.Tasks.Task.FromException<unit>(System.Exception("source error"))
let source = asyncSeq {
use _ = { new System.IDisposable with member _.Dispose() = disposed <- true }
let! _ = failedTask |> Async.AwaitTask
yield 1
}
try AsyncSeq.tryTail source |> Async.RunSynchronously |> ignore
with _ -> ()
Assert.IsTrue(disposed, "enumerator should be disposed after exception on first MoveNext")

[<Test>]
let ``AsyncSeq.where is alias for filter`` () =
let result =
Expand Down Expand Up @@ -4490,7 +4524,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4527 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4503,7 +4537,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4540 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4512,7 +4546,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4549 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
Loading