Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ module internal Utils =
let ivar = TaskCompletionSource<_>()
if t.IsFaulted then
ivar.SetException t.Exception
else if t.IsCanceled then
ivar.SetCanceled()
ivar.Task)
|> join
#endif
Expand Down Expand Up @@ -1060,6 +1062,7 @@ module AsyncSeq =
mb.Post (Some b) })
|> Async.map (fun _ -> mb.Post None)
|> Async.StartChildAsTask

return!
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
|> iterAsync id }
Expand Down
42 changes: 40 additions & 2 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ type Assert with
| _ ->
Assert.Fail(message)



[<Test>]
let ``AsyncSeq.never should equal itself`` () =
Assert.AreEqual(AsyncSeq.never<int>, AsyncSeq.never<int>, timeout=100, exnEq=AreCancellationExns)
Expand Down Expand Up @@ -1516,6 +1514,46 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () =
| Choice2Of2 _ -> ()
| Choice1Of2 _ -> Assert.Fail ("error expected")

[<Test>]
let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () =

let handle x = async {
do! Async.Sleep 50
}

let fakeAsync = async {
do! Async.Sleep 500
return "fakeAsync"
}

let makeAsyncSeqBatch () =
let rec loop() = asyncSeq {
let! batch = fakeAsync |> Async.Catch
match batch with
| Choice1Of2 batch ->
if (Seq.isEmpty batch) then
do! Async.Sleep 500
yield! loop()
else
yield batch
yield! loop()
| Choice2Of2 err ->
printfn "Problem getting batch: %A" err
}

loop()

let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle
let exAsync = async {
do! Async.Sleep 2000
failwith "error"
}

let t = [x; exAsync] |> Async.Parallel |> Async.Ignore |> Async.StartAsTask

// should fail after 2 seconds
Assert.Throws<AggregateException>(fun _ -> t.Wait(4000) |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.iterAsyncParallelThrottled should propagate handler exception`` () =

Expand Down