diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index a5e80fc7..db336331 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -167,6 +167,8 @@ module internal Utils = let ivar = TaskCompletionSource<_>() if t.IsFaulted then ivar.SetException t.Exception + else if t.IsCanceled then + ivar.SetCanceled() ivar.Task) |> join #endif @@ -1060,6 +1062,7 @@ module AsyncSeq = mb.Post (Some b) }) |> Async.map (fun _ -> mb.Post None) |> Async.StartChildAsTask + return! replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive)) |> iterAsync id } diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index b17355b6..f8433803 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -126,8 +126,6 @@ type Assert with | _ -> Assert.Fail(message) - - [] let ``AsyncSeq.never should equal itself`` () = Assert.AreEqual(AsyncSeq.never, AsyncSeq.never, timeout=100, exnEq=AreCancellationExns) @@ -1516,6 +1514,46 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () = | Choice2Of2 _ -> () | Choice1Of2 _ -> Assert.Fail ("error expected") +[] +let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () = + + let handle x = async { + do! Async.Sleep 50 + } + + let fakeAsync = async { + do! Async.Sleep 500 + return "fakeAsync" + } + + let makeAsyncSeqBatch () = + let rec loop() = asyncSeq { + let! batch = fakeAsync |> Async.Catch + match batch with + | Choice1Of2 batch -> + if (Seq.isEmpty batch) then + do! Async.Sleep 500 + yield! loop() + else + yield batch + yield! loop() + | Choice2Of2 err -> + printfn "Problem getting batch: %A" err + } + + loop() + + let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle + let exAsync = async { + do! Async.Sleep 2000 + failwith "error" + } + + let t = [x; exAsync] |> Async.Parallel |> Async.Ignore |> Async.StartAsTask + + // should fail after 2 seconds + Assert.Throws(fun _ -> t.Wait(4000) |> ignore) |> ignore + [] let ``AsyncSeq.iterAsyncParallelThrottled should propagate handler exception`` () =