From 2f937fd58b8d90a9af14423519971c1323e4698e Mon Sep 17 00:00:00 2001 From: Don Syme Date: Thu, 15 Oct 2015 23:10:47 +0100 Subject: [PATCH 1/2] Fix 35 - leak in append --- RELEASE_NOTES.md | 3 + src/FSharp.Control.AsyncSeq/AssemblyInfo.fs | 6 +- src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 271 ++++++++---------- .../AsyncSeqTests.fs | 28 +- 4 files changed, 146 insertions(+), 162 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 78dd353..7e5e06c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +### 2.0.2 - 15.10.2015 +* Fix leak in AsyncSeq.append and other derived generators + ### 2.0.1 - 01.06.2015 * Add AsyncSeq.sum, length, contains, exists, forall, tryPick, tryFind diff --git a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs index edf9482..77c70ac 100644 --- a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs +++ b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs @@ -4,9 +4,9 @@ open System.Reflection [] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = - let [] Version = "2.0.0" + let [] Version = "2.0.1" diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 1ea75c2..dcf361e 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -6,6 +6,7 @@ namespace FSharp.Control open System open System.IO +open System.Collections.Generic open System.Threading open System.Threading.Tasks open System.Runtime.ExceptionServices @@ -102,51 +103,48 @@ module AsyncSeq = return (if res then Some v else None) } member x.Dispose() = () } } - type AppendState = - | NotStarted1 = -1 - | HaveEnumerator1 = 0 - | NotStarted2 = 1 - | HaveEnumerator2 = 2 - | Finished = 3 + type AppendState<'T> = + | NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T> + | HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T> + | NotStarted2 of AsyncSeq<'T> + | HaveEnumerator2 of IAsyncEnumerator<'T> + | Finished let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted1 (inp1, inp2) { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref AppendState.NotStarted1 - let enum = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | AppendState.NotStarted1 -> + | AppendState.NotStarted1 (inp1, inp2) -> return! - (enum := inp1.GetEnumerator() - state := AppendState.HaveEnumerator1 + (let enum1 = inp1.GetEnumerator() + state := AppendState.HaveEnumerator1 (enum1, inp2) x.MoveNext()) - | AppendState.HaveEnumerator1 -> - let e = enum.Value - let! res = e.MoveNext() + | AppendState.HaveEnumerator1 (enum1, inp2) -> + let! res = enum1.MoveNext() match res with | None -> return! - (state := AppendState.NotStarted2 - enum := Unchecked.defaultof<_> - dispose e + (state := AppendState.NotStarted2 inp2 + dispose enum1 x.MoveNext()) | Some _ -> return res - | AppendState.NotStarted2 -> + | AppendState.NotStarted2 inp2 -> return! - (enum := inp2.GetEnumerator() - state := AppendState.HaveEnumerator2 + (let enum2 = inp2.GetEnumerator() + state := AppendState.HaveEnumerator2 enum2 x.MoveNext()) - | AppendState.HaveEnumerator2 -> - let e = enum.Value - let! res = e.MoveNext() + | AppendState.HaveEnumerator2 enum2 -> + let! res = enum2.MoveNext() return (match res with | None -> state := AppendState.Finished - enum := Unchecked.defaultof<_> - dispose e + dispose enum2 None | Some _ -> res) @@ -154,12 +152,10 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | AppendState.HaveEnumerator1 - | AppendState.HaveEnumerator2 -> - let e = enum.Value + | AppendState.HaveEnumerator1 (enum, _) + | AppendState.HaveEnumerator2 enum -> state := AppendState.Finished - enum := Unchecked.defaultof<_> - dispose e + dispose enum | _ -> () } } @@ -168,29 +164,29 @@ module AsyncSeq = member x.GetEnumerator() = f().GetEnumerator() } - type BindState = - | NotStarted = -1 - | HaveEnumerator = 0 - | Finished = 1 + type BindState<'T,'U> = + | NotStarted of Async<'T> + | HaveEnumerator of IAsyncEnumerator<'U> + | Finished let bindAsync (f: 'T -> AsyncSeq<'U>) (inp : Async<'T>) : AsyncSeq<'U> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref BindState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | BindState.NotStarted -> + | BindState.NotStarted inp -> let! v = inp return! (let s = f v let e = s.GetEnumerator() - enum := e - state := BindState.HaveEnumerator + state := BindState.HaveEnumerator e x.MoveNext()) - | BindState.HaveEnumerator -> - let! res = enum.Value.MoveNext() + | BindState.HaveEnumerator e -> + let! res = e.MoveNext() return (match res with | None -> x.Dispose() | Some _ -> () @@ -199,10 +195,8 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | BindState.HaveEnumerator -> - let e = enum.Value + | BindState.HaveEnumerator e -> state := BindState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } @@ -240,22 +234,23 @@ module AsyncSeq = let! moven = ie.MoveNext() b := moven } - type TryWithState = - | NotStarted = -1 - | HaveBodyEnumerator = 0 - | HaveHandlerEnumerator = 1 - | Finished = 2 + type TryWithState<'T> = + | NotStarted of AsyncSeq<'T> + | HaveBodyEnumerator of IAsyncEnumerator<'T> + | HaveHandlerEnumerator of IAsyncEnumerator<'T> + | Finished /// Implements the 'TryWith' functionality for computation builder let tryWith (inp: AsyncSeq<'T>) (handler : exn -> AsyncSeq<'T>) : AsyncSeq<'T> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref TryWithState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | TryWithState.NotStarted -> + | NotStarted inp -> let res = ref Unchecked.defaultof<_> try res := Choice1Of2 (inp.GetEnumerator()) @@ -264,17 +259,15 @@ module AsyncSeq = match res.Value with | Choice1Of2 r -> return! - (enum := r - state := TryWithState.HaveBodyEnumerator + (state := TryWithState.HaveBodyEnumerator r x.MoveNext()) | Choice2Of2 exn -> return! (x.Dispose() - enum := (handler exn).GetEnumerator() - state := TryWithState.HaveHandlerEnumerator + let enum = (handler exn).GetEnumerator() + state := TryWithState.HaveHandlerEnumerator enum x.MoveNext()) - | TryWithState.HaveBodyEnumerator -> - let e = enum.Value + | TryWithState.HaveBodyEnumerator e -> let res = ref Unchecked.defaultof<_> try let! r = e.MoveNext() @@ -291,11 +284,11 @@ module AsyncSeq = | Choice2Of2 exn -> return! (x.Dispose() - enum := (handler exn).GetEnumerator() - state := TryWithState.HaveHandlerEnumerator + let e = (handler exn).GetEnumerator() + state := TryWithState.HaveHandlerEnumerator e x.MoveNext()) - | TryWithState.HaveHandlerEnumerator -> - let! res = enum.Value.MoveNext() + | TryWithState.HaveHandlerEnumerator e -> + let! res = e.MoveNext() return (match res with | Some _ -> res | None -> x.Dispose(); None) @@ -303,37 +296,35 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | TryWithState.HaveBodyEnumerator | TryWithState.HaveHandlerEnumerator -> - let e = enum.Value + | TryWithState.HaveBodyEnumerator e | TryWithState.HaveHandlerEnumerator e -> state := TryWithState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } - type TryFinallyState = - | NotStarted = -1 - | HaveBodyEnumerator = 0 - | Finished = 1 + type TryFinallyState<'T> = + | NotStarted of AsyncSeq<'T> + | HaveBodyEnumerator of IAsyncEnumerator<'T> + | Finished // This pushes the handler through all the async computations // The (synchronous) compensation is run when the Dispose() is called let tryFinally (inp: AsyncSeq<'T>) (compensation : unit -> unit) : AsyncSeq<'T> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref TryFinallyState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | TryFinallyState.NotStarted -> + | TryFinallyState.NotStarted inp -> return! (let e = inp.GetEnumerator() - enum := e - state := TryFinallyState.HaveBodyEnumerator + state := TryFinallyState.HaveBodyEnumerator e x.MoveNext()) - | TryFinallyState.HaveBodyEnumerator -> - let! res = enum.Value.MoveNext() + | TryFinallyState.HaveBodyEnumerator e -> + let! res = e.MoveNext() return (match res with | None -> x.Dispose() @@ -343,52 +334,48 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | TryFinallyState.HaveBodyEnumerator -> - let e = enum.Value + | TryFinallyState.HaveBodyEnumerator e-> state := TryFinallyState.Finished - enum := Unchecked.defaultof<_> dispose e compensation() | _ -> () } } - type CollectState = - | NotStarted = -1 - | HaveInputEnumerator = 0 - | HaveInnerEnumerator = 1 - | Finished = 2 + type CollectState<'T,'U> = + | NotStarted of AsyncSeq<'T> + | HaveInputEnumerator of IAsyncEnumerator<'T> + | HaveInnerEnumerator of IAsyncEnumerator<'T> * IAsyncEnumerator<'U> + | Finished let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref CollectState.NotStarted - let enum1 = ref Unchecked.defaultof> - let enum2 = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | CollectState.NotStarted -> + | NotStarted inp -> return! - (enum1 := inp.GetEnumerator() - state := CollectState.HaveInputEnumerator + (let e1 = inp.GetEnumerator() + state := HaveInputEnumerator e1 x.MoveNext()) - | CollectState.HaveInputEnumerator -> - let! res1 = enum1.Value.MoveNext() + | HaveInputEnumerator e1 -> + let! res1 = e1.MoveNext() return! (match res1 with | Some v1 -> - enum2 := (f v1).GetEnumerator() - state := CollectState.HaveInnerEnumerator + let e2 = (f v1).GetEnumerator() + state := HaveInnerEnumerator (e1, e2) | None -> x.Dispose() x.MoveNext()) - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value + | HaveInnerEnumerator (e1, e2) -> let! res2 = e2.MoveNext() match res2 with | None -> - enum2 := Unchecked.defaultof<_> - state := CollectState.HaveInputEnumerator + state := HaveInputEnumerator e1 dispose e2 return! x.MoveNext() | Some _ -> @@ -397,50 +384,50 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | CollectState.HaveInputEnumerator -> - let e1 = enum1.Value - state := CollectState.Finished - enum1 := Unchecked.defaultof<_> + | HaveInputEnumerator e1 -> + state := Finished dispose e1 - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value - state := CollectState.HaveInputEnumerator + | HaveInnerEnumerator (e1, e2) -> + state := Finished dispose e2 - x.Dispose() + dispose e1 | _ -> () } } + type CollectSeqState<'T,'U> = + | NotStarted of seq<'T> + | HaveInputEnumerator of IEnumerator<'T> + | HaveInnerEnumerator of IEnumerator<'T> * IAsyncEnumerator<'U> + | Finished + // Like collect, but the input is a sequence, where no bind is required on each step of the enumeration let collectSeq (f: 'T -> AsyncSeq<'U>) (inp: seq<'T>) : AsyncSeq<'U> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref CollectState.NotStarted - let enum1 = ref Unchecked.defaultof> - let enum2 = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | CollectState.NotStarted -> + | NotStarted inp -> return! - (enum1 := inp.GetEnumerator() - state := CollectState.HaveInputEnumerator + (let e1 = inp.GetEnumerator() + state := HaveInputEnumerator e1 x.MoveNext()) - | CollectState.HaveInputEnumerator -> + | HaveInputEnumerator e1 -> return! - (let e1 = enum1.Value - if e1.MoveNext() then - enum2 := (f e1.Current).GetEnumerator() - state := CollectState.HaveInnerEnumerator + (if e1.MoveNext() then + let e2 = (f e1.Current).GetEnumerator() + state := HaveInnerEnumerator (e1, e2) else x.Dispose() x.MoveNext()) - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value + | HaveInnerEnumerator (e1, e2)-> let! res2 = e2.MoveNext() match res2 with | None -> return! - (enum2 := Unchecked.defaultof<_> - state := CollectState.HaveInputEnumerator + (state := HaveInputEnumerator e1 dispose e2 x.MoveNext()) | Some _ -> @@ -448,50 +435,46 @@ module AsyncSeq = | _ -> return None} member x.Dispose() = match !state with - | CollectState.HaveInputEnumerator -> - let e1 = enum1.Value - state := CollectState.Finished - enum1 := Unchecked.defaultof<_> + | HaveInputEnumerator e1 -> + state := Finished dispose e1 - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value - state := CollectState.HaveInputEnumerator + | HaveInnerEnumerator (e1, e2) -> + state := Finished dispose e2 + dispose e1 x.Dispose() | _ -> () } } - type MapState = - | NotStarted = -1 - | HaveEnumerator = 0 - | Finished = 1 + type MapState<'T> = + | NotStarted of seq<'T> + | HaveEnumerator of IEnumerator<'T> + | Finished let ofSeq (inp: seq<'T>) : AsyncSeq<'T> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 + let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref MapState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref initialState { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | MapState.NotStarted -> - enum := inp.GetEnumerator() - state := MapState.HaveEnumerator + | NotStarted inp -> + let e = inp.GetEnumerator() + state := MapState.HaveEnumerator e return! x.MoveNext() - | MapState.HaveEnumerator -> - let e1 = enum.Value + | HaveEnumerator e -> return - (if e1.MoveNext() then - Some e1.Current + (if e.MoveNext() then + Some e.Current else x.Dispose() None) | _ -> return None } member x.Dispose() = match !state with - | MapState.HaveEnumerator -> - let e = enum.Value + | HaveEnumerator e -> state := MapState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index d21b996..695bb35 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -945,6 +945,7 @@ let ``AsyncSeq.interleave should fail with Exception if a task fails``() = |> AsyncSeq.toList |> ignore) |> ignore + let perfTest1 n = let empty = async { return () } Seq.init n id @@ -1020,21 +1021,18 @@ let perfTest4 n = //perfTest4 100000 0.362 0.442 //perfTest4 1000000 3.533 4.656 -(* -let FindPackages(_,_,_,_) = [| "package" |] - -type Source = { IsNuget: bool; Url: string } -let DefaultNugetSource = { IsNuget = true; Url = "http://nuget.org" } - -//------------------------------- -let SearchPackagesByName(sources, search) = - let sources = [ yield! sources; yield DefaultNugetSource ] - [ for source in sources -> - asyncSeq { if source.IsNuget then - for p in FindPackages(None, source.Url, search, 1000) do - yield p } ] - |> AsyncSeq.mergeAll -*) +[] +let ``AsyncSeq.unfoldAsync should be iterable in finite resources``() = + let generator state = + async { + if state < 10000 then + return Some ((), state + 1) + else + return None + } + AsyncSeq.unfoldAsync generator 0 + |> AsyncSeq.iter ignore + |> Async.RunSynchronously From 6c3995f2358695c7a48f2b5c800655f4753a6157 Mon Sep 17 00:00:00 2001 From: Don Syme Date: Thu, 15 Oct 2015 23:27:02 +0100 Subject: [PATCH 2/2] cleanup states --- src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 82 ++++++++++++------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index dcf361e..40b7d7b 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -103,6 +103,7 @@ module AsyncSeq = return (if res then Some v else None) } member x.Dispose() = () } } + [] type AppendState<'T> = | NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T> | HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T> @@ -111,11 +112,9 @@ module AsyncSeq = | Finished let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted1 (inp1, inp2) { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (AppendState.NotStarted1 (inp1, inp2) ) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with @@ -164,17 +163,16 @@ module AsyncSeq = member x.GetEnumerator() = f().GetEnumerator() } + [] type BindState<'T,'U> = | NotStarted of Async<'T> | HaveEnumerator of IAsyncEnumerator<'U> | Finished let bindAsync (f: 'T -> AsyncSeq<'U>) (inp : Async<'T>) : AsyncSeq<'U> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (BindState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with @@ -234,6 +232,7 @@ module AsyncSeq = let! moven = ie.MoveNext() b := moven } + [] type TryWithState<'T> = | NotStarted of AsyncSeq<'T> | HaveBodyEnumerator of IAsyncEnumerator<'T> @@ -243,14 +242,13 @@ module AsyncSeq = /// Implements the 'TryWith' functionality for computation builder let tryWith (inp: AsyncSeq<'T>) (handler : exn -> AsyncSeq<'T>) : AsyncSeq<'T> = // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (TryWithState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | NotStarted inp -> + | TryWithState.NotStarted inp -> let res = ref Unchecked.defaultof<_> try res := Choice1Of2 (inp.GetEnumerator()) @@ -302,6 +300,7 @@ module AsyncSeq = | _ -> () } } + [] type TryFinallyState<'T> = | NotStarted of AsyncSeq<'T> | HaveBodyEnumerator of IAsyncEnumerator<'T> @@ -310,11 +309,9 @@ module AsyncSeq = // This pushes the handler through all the async computations // The (synchronous) compensation is run when the Dispose() is called let tryFinally (inp: AsyncSeq<'T>) (compensation : unit -> unit) : AsyncSeq<'T> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (TryFinallyState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with @@ -341,6 +338,7 @@ module AsyncSeq = | _ -> () } } + [] type CollectState<'T,'U> = | NotStarted of AsyncSeq<'T> | HaveInputEnumerator of IAsyncEnumerator<'T> @@ -348,34 +346,32 @@ module AsyncSeq = | Finished let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (CollectState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | NotStarted inp -> + | CollectState.NotStarted inp -> return! (let e1 = inp.GetEnumerator() - state := HaveInputEnumerator e1 + state := CollectState.HaveInputEnumerator e1 x.MoveNext()) - | HaveInputEnumerator e1 -> + | CollectState.HaveInputEnumerator e1 -> let! res1 = e1.MoveNext() return! (match res1 with | Some v1 -> let e2 = (f v1).GetEnumerator() - state := HaveInnerEnumerator (e1, e2) + state := CollectState.HaveInnerEnumerator (e1, e2) | None -> x.Dispose() x.MoveNext()) - | HaveInnerEnumerator (e1, e2) -> + | CollectState.HaveInnerEnumerator (e1, e2) -> let! res2 = e2.MoveNext() match res2 with | None -> - state := HaveInputEnumerator e1 + state := CollectState.HaveInputEnumerator e1 dispose e2 return! x.MoveNext() | Some _ -> @@ -384,15 +380,16 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | HaveInputEnumerator e1 -> - state := Finished + | CollectState.HaveInputEnumerator e1 -> + state := CollectState.Finished dispose e1 - | HaveInnerEnumerator (e1, e2) -> - state := Finished + | CollectState.HaveInnerEnumerator (e1, e2) -> + state := CollectState.Finished dispose e2 dispose e1 | _ -> () } } + [] type CollectSeqState<'T,'U> = | NotStarted of seq<'T> | HaveInputEnumerator of IEnumerator<'T> @@ -401,33 +398,31 @@ module AsyncSeq = // Like collect, but the input is a sequence, where no bind is required on each step of the enumeration let collectSeq (f: 'T -> AsyncSeq<'U>) (inp: seq<'T>) : AsyncSeq<'U> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (CollectSeqState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | NotStarted inp -> + | CollectSeqState.NotStarted inp -> return! (let e1 = inp.GetEnumerator() - state := HaveInputEnumerator e1 + state := CollectSeqState.HaveInputEnumerator e1 x.MoveNext()) - | HaveInputEnumerator e1 -> + | CollectSeqState.HaveInputEnumerator e1 -> return! (if e1.MoveNext() then let e2 = (f e1.Current).GetEnumerator() - state := HaveInnerEnumerator (e1, e2) + state := CollectSeqState.HaveInnerEnumerator (e1, e2) else x.Dispose() x.MoveNext()) - | HaveInnerEnumerator (e1, e2)-> + | CollectSeqState.HaveInnerEnumerator (e1, e2)-> let! res2 = e2.MoveNext() match res2 with | None -> return! - (state := HaveInputEnumerator e1 + (state := CollectSeqState.HaveInputEnumerator e1 dispose e2 x.MoveNext()) | Some _ -> @@ -435,35 +430,34 @@ module AsyncSeq = | _ -> return None} member x.Dispose() = match !state with - | HaveInputEnumerator e1 -> - state := Finished + | CollectSeqState.HaveInputEnumerator e1 -> + state := CollectSeqState.Finished dispose e1 - | HaveInnerEnumerator (e1, e2) -> - state := Finished + | CollectSeqState.HaveInnerEnumerator (e1, e2) -> + state := CollectSeqState.Finished dispose e2 dispose e1 x.Dispose() | _ -> () } } + [] type MapState<'T> = | NotStarted of seq<'T> | HaveEnumerator of IEnumerator<'T> | Finished let ofSeq (inp: seq<'T>) : AsyncSeq<'T> = - // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - let initialState = NotStarted inp { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref initialState + let state = ref (MapState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | NotStarted inp -> + | MapState.NotStarted inp -> let e = inp.GetEnumerator() state := MapState.HaveEnumerator e return! x.MoveNext() - | HaveEnumerator e -> + | MapState.HaveEnumerator e -> return (if e.MoveNext() then Some e.Current @@ -473,7 +467,7 @@ module AsyncSeq = | _ -> return None } member x.Dispose() = match !state with - | HaveEnumerator e -> + | MapState.HaveEnumerator e -> state := MapState.Finished dispose e | _ -> () } }