diff --git a/release-notes.txt b/release-notes.txt index 72c765f..1c78f5b 100644 --- a/release-notes.txt +++ b/release-notes.txt @@ -2,6 +2,7 @@ Release notes: Unreleased + - feat: add `TaskSeq.toChannelAsync` and `TaskSeq.ofChannel` for bidirectional `System.Threading.Channels` integration, closing #415 - test: add SideEffects module and ImmTaskSeq variant tests to TaskSeq.ChunkBy.Tests.fs, improving coverage for chunkBy and chunkByAsync - fixes: `Async.bind` signature corrected from `(Async<'T> -> Async<'U>)` to `('T -> Async<'U>)` to match standard monadic bind semantics (same as `Task.bind`); the previous signature made the function effectively equivalent to direct application - refactor: simplify splitAt 'rest' taskSeq to use while!, removing redundant go2 mutable and manual MoveNextAsync pre-advance diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs index d43e87e..8f0e503 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs @@ -1,6 +1,7 @@ module TaskSeq.Tests.``Conversion-To`` open System.Collections.Generic +open System.Threading.Channels open Xunit open FsUnit.Xunit @@ -186,3 +187,72 @@ module SideEffects = let (results2: seq<_>) = tq |> TaskSeq.toSeq results1 |> Seq.toArray |> should equal [| 1..10 |] results2 |> Seq.toArray |> should equal [| 11..20 |] + +module Channel = + + [] + let ``TaskSeq-toChannelAsync with null writer raises`` () = + assertNullArg + <| fun () -> + TaskSeq.toChannelAsync null (TaskSeq.ofArray [| 1 |]) + |> ignore + + [] + let ``TaskSeq-toChannelAsync with null source raises`` () = + let ch = Channel.CreateUnbounded() + + assertNullArg + <| fun () -> TaskSeq.toChannelAsync ch.Writer null |> ignore + + [] + let ``TaskSeq-ofChannel with null reader raises`` () = + assertNullArg + <| fun () -> TaskSeq.ofChannel null |> ignore + + [] + let ``TaskSeq-toChannelAsync with empty source completes the channel`` () = task { + let ch = Channel.CreateUnbounded() + do! TaskSeq.toChannelAsync ch.Writer TaskSeq.empty + ch.Reader.Completion.IsCompleted |> should be True + let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync + results |> should be Empty + } + + [)>] + let ``TaskSeq-toChannelAsync writes all elements and completes the channel`` variant = task { + let tq = Gen.getSeqImmutable variant + let ch = Channel.CreateUnbounded() + do! TaskSeq.toChannelAsync ch.Writer tq + let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync + results |> should equal [| 1..10 |] + // Completion resolves once the channel is marked done and the buffer is drained + do! ch.Reader.Completion + } + + [)>] + let ``TaskSeq-ofChannel yields all elements written to the channel`` variant = task { + let tq = Gen.getSeqImmutable variant + let ch = Channel.CreateUnbounded() + do! TaskSeq.toChannelAsync ch.Writer tq + let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync + results |> should equal [| 1..10 |] + } + + [] + let ``TaskSeq-ofChannel ends when channel is completed and drained`` () = task { + let ch = Channel.CreateUnbounded() + do! ch.Writer.WriteAsync 42 + do! ch.Writer.WriteAsync 99 + ch.Writer.Complete() + let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync + results |> should equal [| 42; 99 |] + } + + [)>] + let ``TaskSeq-toChannelAsync executes side effects`` variant = task { + let tq = Gen.getSeqWithSideEffect variant + let ch = Channel.CreateUnbounded() + do! TaskSeq.toChannelAsync ch.Writer tq + let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync + results |> should equal [| 1..10 |] + } diff --git a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj index e3bae0c..4e84c91 100644 --- a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj +++ b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj @@ -74,5 +74,7 @@ Generates optimized IL code through resumable state machines, and comes with a c true + + diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index d3a94f9..e944c44 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -2,6 +2,7 @@ namespace FSharp.Control open System.Collections.Generic open System.Threading +open System.Threading.Channels open System.Threading.Tasks // Just for convenience @@ -180,6 +181,23 @@ type TaskSeq private () = static member toIListAsync source = Internal.toResizeArrayAndMapAsync (fun x -> x :> IList<_>) source + static member toChannelAsync (writer: ChannelWriter<'T>) (source: TaskSeq<'T>) : Task = + Internal.checkNonNull (nameof writer) writer + Internal.checkNonNull (nameof source) source + + task { + try + use e = source.GetAsyncEnumerator CancellationToken.None + + while! e.MoveNextAsync() do + do! writer.WriteAsync e.Current + + writer.TryComplete() |> ignore + with exn -> + writer.TryComplete exn |> ignore + } + :> Task + // // Convert 'OfXXX' functions // @@ -261,6 +279,17 @@ type TaskSeq private () = yield c } + static member ofChannel(reader: ChannelReader<'T>) : TaskSeq<'T> = + Internal.checkNonNull (nameof reader) reader + + taskSeq { + while! reader.WaitToReadAsync() do + let mutable item = Unchecked.defaultof<_> + + while reader.TryRead &item do + yield item + } + static member withCancellation (cancellationToken: CancellationToken) (source: TaskSeq<'T>) = Internal.checkNonNull (nameof source) source diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 1caf826..ac620d7 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -2,6 +2,7 @@ namespace FSharp.Control open System.Collections.Generic open System.Threading +open System.Threading.Channels open System.Threading.Tasks [] @@ -529,6 +530,20 @@ type TaskSeq = /// Thrown when the input sequence is null. static member toIListAsync: source: TaskSeq<'T> -> Task> + /// + /// Writes all elements of the input task sequence to a + /// and marks the writer as complete when the sequence + /// is exhausted. If an exception is raised during iteration, the writer is completed with + /// that exception so that downstream readers observe it. + /// This function is non-blocking while it writes to the channel. + /// + /// + /// The channel writer to write elements into. + /// The input task sequence. + /// A that completes when all elements have been written. + /// Thrown when or is null. + static member toChannelAsync: writer: ChannelWriter<'T> -> source: TaskSeq<'T> -> Task + /// /// Views the given as a task sequence, that is, as an . /// @@ -642,6 +657,17 @@ type TaskSeq = /// Thrown when the input sequence is null. static member ofAsyncArray: source: Async<'T> array -> TaskSeq<'T> + /// + /// Views a as a task sequence. Elements are yielded as they + /// become available; the sequence ends when the channel is completed and all buffered elements + /// have been consumed. + /// + /// + /// The channel reader to read elements from. + /// A task sequence that yields elements from the channel. + /// Thrown when is null. + static member ofChannel: reader: ChannelReader<'T> -> TaskSeq<'T> + /// /// Returns a task sequence that, when iterated, passes the given to the /// underlying . This is the equivalent of calling