Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Release notes:

Unreleased
- feat: add `TaskSeq.toChannelAsync` and `TaskSeq.ofChannel` for bidirectional `System.Threading.Channels` integration, closing #415
- test: add SideEffects module and ImmTaskSeq variant tests to TaskSeq.ChunkBy.Tests.fs, improving coverage for chunkBy and chunkByAsync
- fixes: `Async.bind` signature corrected from `(Async<'T> -> Async<'U>)` to `('T -> Async<'U>)` to match standard monadic bind semantics (same as `Task.bind`); the previous signature made the function effectively equivalent to direct application
- refactor: simplify splitAt 'rest' taskSeq to use while!, removing redundant go2 mutable and manual MoveNextAsync pre-advance
Expand Down
70 changes: 70 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module TaskSeq.Tests.``Conversion-To``

open System.Collections.Generic
open System.Threading.Channels

open Xunit
open FsUnit.Xunit
Expand Down Expand Up @@ -186,3 +187,72 @@ module SideEffects =
let (results2: seq<_>) = tq |> TaskSeq.toSeq
results1 |> Seq.toArray |> should equal [| 1..10 |]
results2 |> Seq.toArray |> should equal [| 11..20 |]

module Channel =

[<Fact>]
let ``TaskSeq-toChannelAsync with null writer raises`` () =
assertNullArg
<| fun () ->
TaskSeq.toChannelAsync null (TaskSeq.ofArray [| 1 |])
|> ignore

[<Fact>]
let ``TaskSeq-toChannelAsync with null source raises`` () =
let ch = Channel.CreateUnbounded<int>()

assertNullArg
<| fun () -> TaskSeq.toChannelAsync ch.Writer null |> ignore

[<Fact>]
let ``TaskSeq-ofChannel with null reader raises`` () =
assertNullArg
<| fun () -> TaskSeq.ofChannel<int> null |> ignore

[<Fact>]
let ``TaskSeq-toChannelAsync with empty source completes the channel`` () = task {
let ch = Channel.CreateUnbounded<int>()
do! TaskSeq.toChannelAsync ch.Writer TaskSeq.empty
ch.Reader.Completion.IsCompleted |> should be True
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
results |> should be Empty
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-toChannelAsync writes all elements and completes the channel`` variant = task {
let tq = Gen.getSeqImmutable variant
let ch = Channel.CreateUnbounded<int>()
do! TaskSeq.toChannelAsync ch.Writer tq
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
results |> should equal [| 1..10 |]
// Completion resolves once the channel is marked done and the buffer is drained
do! ch.Reader.Completion
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-ofChannel yields all elements written to the channel`` variant = task {
let tq = Gen.getSeqImmutable variant
let ch = Channel.CreateUnbounded<int>()
do! TaskSeq.toChannelAsync ch.Writer tq
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
results |> should equal [| 1..10 |]
}

[<Fact>]
let ``TaskSeq-ofChannel ends when channel is completed and drained`` () = task {
let ch = Channel.CreateUnbounded<int>()
do! ch.Writer.WriteAsync 42
do! ch.Writer.WriteAsync 99
ch.Writer.Complete()
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
results |> should equal [| 42; 99 |]
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-toChannelAsync executes side effects`` variant = task {
let tq = Gen.getSeqWithSideEffect variant
let ch = Channel.CreateUnbounded<int>()
do! TaskSeq.toChannelAsync ch.Writer tq
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
results |> should equal [| 1..10 |]
}
2 changes: 2 additions & 0 deletions src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,7 @@ Generates optimized IL code through resumable state machines, and comes with a c
<!-- if using "remove unused references", this prevents FSharp.Core from being shown in that list -->
<TreatAsUsed>true</TreatAsUsed>
</PackageReference>
<!-- Provides System.Threading.Channels for netstandard2.1 consumers; built into net5.0+ -->
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>
</Project>
29 changes: 29 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace FSharp.Control

open System.Collections.Generic
open System.Threading
open System.Threading.Channels
open System.Threading.Tasks

// Just for convenience
Expand Down Expand Up @@ -180,6 +181,23 @@ type TaskSeq private () =

static member toIListAsync source = Internal.toResizeArrayAndMapAsync (fun x -> x :> IList<_>) source

static member toChannelAsync (writer: ChannelWriter<'T>) (source: TaskSeq<'T>) : Task =
Internal.checkNonNull (nameof writer) writer
Internal.checkNonNull (nameof source) source

task {
try
use e = source.GetAsyncEnumerator CancellationToken.None

while! e.MoveNextAsync() do
do! writer.WriteAsync e.Current

writer.TryComplete() |> ignore
with exn ->
writer.TryComplete exn |> ignore
}
:> Task

//
// Convert 'OfXXX' functions
//
Expand Down Expand Up @@ -261,6 +279,17 @@ type TaskSeq private () =
yield c
}

static member ofChannel(reader: ChannelReader<'T>) : TaskSeq<'T> =
Internal.checkNonNull (nameof reader) reader

taskSeq {
while! reader.WaitToReadAsync() do
let mutable item = Unchecked.defaultof<_>

while reader.TryRead &item do
yield item
}

static member withCancellation (cancellationToken: CancellationToken) (source: TaskSeq<'T>) =
Internal.checkNonNull (nameof source) source

Expand Down
26 changes: 26 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace FSharp.Control

open System.Collections.Generic
open System.Threading
open System.Threading.Channels
open System.Threading.Tasks

[<AutoOpen>]
Expand Down Expand Up @@ -529,6 +530,20 @@ type TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
static member toIListAsync: source: TaskSeq<'T> -> Task<IList<'T>>

/// <summary>
/// Writes all elements of the input task sequence <paramref name="source" /> to a
/// <see cref="ChannelWriter&lt;'T>" /> and marks the writer as complete when the sequence
/// is exhausted. If an exception is raised during iteration, the writer is completed with
/// that exception so that downstream readers observe it.
/// This function is non-blocking while it writes to the channel.
/// </summary>
///
/// <param name="writer">The channel writer to write elements into.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>A <see cref="Task" /> that completes when all elements have been written.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when <paramref name="writer" /> or <paramref name="source" /> is null.</exception>
static member toChannelAsync: writer: ChannelWriter<'T> -> source: TaskSeq<'T> -> Task

/// <summary>
/// Views the given <see cref="array" /> as a task sequence, that is, as an <see cref="IAsyncEnumerable&lt;'T>" />.
/// </summary>
Expand Down Expand Up @@ -642,6 +657,17 @@ type TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
static member ofAsyncArray: source: Async<'T> array -> TaskSeq<'T>

/// <summary>
/// Views a <see cref="ChannelReader&lt;'T>" /> as a task sequence. Elements are yielded as they
/// become available; the sequence ends when the channel is completed and all buffered elements
/// have been consumed.
/// </summary>
///
/// <param name="reader">The channel reader to read elements from.</param>
/// <returns>A task sequence that yields elements from the channel.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when <paramref name="reader" /> is null.</exception>
static member ofChannel: reader: ChannelReader<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, passes the given <paramref name="cancellationToken" /> to the
/// underlying <see cref="IAsyncEnumerable&lt;'T&gt;" />. This is the equivalent of calling
Expand Down
Loading