diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 78dd353..7e5e06c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +### 2.0.2 - 15.10.2015 +* Fix leak in AsyncSeq.append and other derived generators + ### 2.0.1 - 01.06.2015 * Add AsyncSeq.sum, length, contains, exists, forall, tryPick, tryFind diff --git a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs index edf9482..77c70ac 100644 --- a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs +++ b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs @@ -4,9 +4,9 @@ open System.Reflection [] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = - let [] Version = "2.0.0" + let [] Version = "2.0.1" diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 1ea75c2..40b7d7b 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -6,6 +6,7 @@ namespace FSharp.Control open System open System.IO +open System.Collections.Generic open System.Threading open System.Threading.Tasks open System.Runtime.ExceptionServices @@ -102,51 +103,47 @@ module AsyncSeq = return (if res then Some v else None) } member x.Dispose() = () } } - type AppendState = - | NotStarted1 = -1 - | HaveEnumerator1 = 0 - | NotStarted2 = 1 - | HaveEnumerator2 = 2 - | Finished = 3 + [] + type AppendState<'T> = + | NotStarted1 of AsyncSeq<'T> * AsyncSeq<'T> + | HaveEnumerator1 of IAsyncEnumerator<'T> * AsyncSeq<'T> + | NotStarted2 of AsyncSeq<'T> + | HaveEnumerator2 of IAsyncEnumerator<'T> + | Finished let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> = { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref AppendState.NotStarted1 - let enum = ref Unchecked.defaultof> + let state = ref (AppendState.NotStarted1 (inp1, inp2) ) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | AppendState.NotStarted1 -> + | AppendState.NotStarted1 (inp1, inp2) -> return! - (enum := inp1.GetEnumerator() - state := AppendState.HaveEnumerator1 + (let enum1 = inp1.GetEnumerator() + state := AppendState.HaveEnumerator1 (enum1, inp2) x.MoveNext()) - | AppendState.HaveEnumerator1 -> - let e = enum.Value - let! res = e.MoveNext() + | AppendState.HaveEnumerator1 (enum1, inp2) -> + let! res = enum1.MoveNext() match res with | None -> return! - (state := AppendState.NotStarted2 - enum := Unchecked.defaultof<_> - dispose e + (state := AppendState.NotStarted2 inp2 + dispose enum1 x.MoveNext()) | Some _ -> return res - | AppendState.NotStarted2 -> + | AppendState.NotStarted2 inp2 -> return! - (enum := inp2.GetEnumerator() - state := AppendState.HaveEnumerator2 + (let enum2 = inp2.GetEnumerator() + state := AppendState.HaveEnumerator2 enum2 x.MoveNext()) - | AppendState.HaveEnumerator2 -> - let e = enum.Value - let! res = e.MoveNext() + | AppendState.HaveEnumerator2 enum2 -> + let! res = enum2.MoveNext() return (match res with | None -> state := AppendState.Finished - enum := Unchecked.defaultof<_> - dispose e + dispose enum2 None | Some _ -> res) @@ -154,12 +151,10 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | AppendState.HaveEnumerator1 - | AppendState.HaveEnumerator2 -> - let e = enum.Value + | AppendState.HaveEnumerator1 (enum, _) + | AppendState.HaveEnumerator2 enum -> state := AppendState.Finished - enum := Unchecked.defaultof<_> - dispose e + dispose enum | _ -> () } } @@ -168,29 +163,28 @@ module AsyncSeq = member x.GetEnumerator() = f().GetEnumerator() } - type BindState = - | NotStarted = -1 - | HaveEnumerator = 0 - | Finished = 1 + [] + type BindState<'T,'U> = + | NotStarted of Async<'T> + | HaveEnumerator of IAsyncEnumerator<'U> + | Finished let bindAsync (f: 'T -> AsyncSeq<'U>) (inp : Async<'T>) : AsyncSeq<'U> = { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref BindState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref (BindState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | BindState.NotStarted -> + | BindState.NotStarted inp -> let! v = inp return! (let s = f v let e = s.GetEnumerator() - enum := e - state := BindState.HaveEnumerator + state := BindState.HaveEnumerator e x.MoveNext()) - | BindState.HaveEnumerator -> - let! res = enum.Value.MoveNext() + | BindState.HaveEnumerator e -> + let! res = e.MoveNext() return (match res with | None -> x.Dispose() | Some _ -> () @@ -199,10 +193,8 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | BindState.HaveEnumerator -> - let e = enum.Value + | BindState.HaveEnumerator e -> state := BindState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } @@ -240,22 +232,23 @@ module AsyncSeq = let! moven = ie.MoveNext() b := moven } - type TryWithState = - | NotStarted = -1 - | HaveBodyEnumerator = 0 - | HaveHandlerEnumerator = 1 - | Finished = 2 + [] + type TryWithState<'T> = + | NotStarted of AsyncSeq<'T> + | HaveBodyEnumerator of IAsyncEnumerator<'T> + | HaveHandlerEnumerator of IAsyncEnumerator<'T> + | Finished /// Implements the 'TryWith' functionality for computation builder let tryWith (inp: AsyncSeq<'T>) (handler : exn -> AsyncSeq<'T>) : AsyncSeq<'T> = + // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref TryWithState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref (TryWithState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | TryWithState.NotStarted -> + | TryWithState.NotStarted inp -> let res = ref Unchecked.defaultof<_> try res := Choice1Of2 (inp.GetEnumerator()) @@ -264,17 +257,15 @@ module AsyncSeq = match res.Value with | Choice1Of2 r -> return! - (enum := r - state := TryWithState.HaveBodyEnumerator + (state := TryWithState.HaveBodyEnumerator r x.MoveNext()) | Choice2Of2 exn -> return! (x.Dispose() - enum := (handler exn).GetEnumerator() - state := TryWithState.HaveHandlerEnumerator + let enum = (handler exn).GetEnumerator() + state := TryWithState.HaveHandlerEnumerator enum x.MoveNext()) - | TryWithState.HaveBodyEnumerator -> - let e = enum.Value + | TryWithState.HaveBodyEnumerator e -> let res = ref Unchecked.defaultof<_> try let! r = e.MoveNext() @@ -291,11 +282,11 @@ module AsyncSeq = | Choice2Of2 exn -> return! (x.Dispose() - enum := (handler exn).GetEnumerator() - state := TryWithState.HaveHandlerEnumerator + let e = (handler exn).GetEnumerator() + state := TryWithState.HaveHandlerEnumerator e x.MoveNext()) - | TryWithState.HaveHandlerEnumerator -> - let! res = enum.Value.MoveNext() + | TryWithState.HaveHandlerEnumerator e -> + let! res = e.MoveNext() return (match res with | Some _ -> res | None -> x.Dispose(); None) @@ -303,37 +294,34 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | TryWithState.HaveBodyEnumerator | TryWithState.HaveHandlerEnumerator -> - let e = enum.Value + | TryWithState.HaveBodyEnumerator e | TryWithState.HaveHandlerEnumerator e -> state := TryWithState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } - type TryFinallyState = - | NotStarted = -1 - | HaveBodyEnumerator = 0 - | Finished = 1 + [] + type TryFinallyState<'T> = + | NotStarted of AsyncSeq<'T> + | HaveBodyEnumerator of IAsyncEnumerator<'T> + | Finished // This pushes the handler through all the async computations // The (synchronous) compensation is run when the Dispose() is called let tryFinally (inp: AsyncSeq<'T>) (compensation : unit -> unit) : AsyncSeq<'T> = { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref TryFinallyState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref (TryFinallyState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | TryFinallyState.NotStarted -> + | TryFinallyState.NotStarted inp -> return! (let e = inp.GetEnumerator() - enum := e - state := TryFinallyState.HaveBodyEnumerator + state := TryFinallyState.HaveBodyEnumerator e x.MoveNext()) - | TryFinallyState.HaveBodyEnumerator -> - let! res = enum.Value.MoveNext() + | TryFinallyState.HaveBodyEnumerator e -> + let! res = e.MoveNext() return (match res with | None -> x.Dispose() @@ -343,52 +331,47 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | TryFinallyState.HaveBodyEnumerator -> - let e = enum.Value + | TryFinallyState.HaveBodyEnumerator e-> state := TryFinallyState.Finished - enum := Unchecked.defaultof<_> dispose e compensation() | _ -> () } } - type CollectState = - | NotStarted = -1 - | HaveInputEnumerator = 0 - | HaveInnerEnumerator = 1 - | Finished = 2 + [] + type CollectState<'T,'U> = + | NotStarted of AsyncSeq<'T> + | HaveInputEnumerator of IAsyncEnumerator<'T> + | HaveInnerEnumerator of IAsyncEnumerator<'T> * IAsyncEnumerator<'U> + | Finished let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> = { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref CollectState.NotStarted - let enum1 = ref Unchecked.defaultof> - let enum2 = ref Unchecked.defaultof> + let state = ref (CollectState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | CollectState.NotStarted -> + | CollectState.NotStarted inp -> return! - (enum1 := inp.GetEnumerator() - state := CollectState.HaveInputEnumerator + (let e1 = inp.GetEnumerator() + state := CollectState.HaveInputEnumerator e1 x.MoveNext()) - | CollectState.HaveInputEnumerator -> - let! res1 = enum1.Value.MoveNext() + | CollectState.HaveInputEnumerator e1 -> + let! res1 = e1.MoveNext() return! (match res1 with | Some v1 -> - enum2 := (f v1).GetEnumerator() - state := CollectState.HaveInnerEnumerator + let e2 = (f v1).GetEnumerator() + state := CollectState.HaveInnerEnumerator (e1, e2) | None -> x.Dispose() x.MoveNext()) - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value + | CollectState.HaveInnerEnumerator (e1, e2) -> let! res2 = e2.MoveNext() match res2 with | None -> - enum2 := Unchecked.defaultof<_> - state := CollectState.HaveInputEnumerator + state := CollectState.HaveInputEnumerator e1 dispose e2 return! x.MoveNext() | Some _ -> @@ -397,50 +380,49 @@ module AsyncSeq = return None } member x.Dispose() = match !state with - | CollectState.HaveInputEnumerator -> - let e1 = enum1.Value + | CollectState.HaveInputEnumerator e1 -> state := CollectState.Finished - enum1 := Unchecked.defaultof<_> dispose e1 - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value - state := CollectState.HaveInputEnumerator + | CollectState.HaveInnerEnumerator (e1, e2) -> + state := CollectState.Finished dispose e2 - x.Dispose() + dispose e1 | _ -> () } } + [] + type CollectSeqState<'T,'U> = + | NotStarted of seq<'T> + | HaveInputEnumerator of IEnumerator<'T> + | HaveInnerEnumerator of IEnumerator<'T> * IAsyncEnumerator<'U> + | Finished + // Like collect, but the input is a sequence, where no bind is required on each step of the enumeration let collectSeq (f: 'T -> AsyncSeq<'U>) (inp: seq<'T>) : AsyncSeq<'U> = { new IAsyncEnumerable<'U> with member x.GetEnumerator() = - let state = ref CollectState.NotStarted - let enum1 = ref Unchecked.defaultof> - let enum2 = ref Unchecked.defaultof> + let state = ref (CollectSeqState.NotStarted inp) { new IAsyncEnumerator<'U> with member x.MoveNext() = async { match !state with - | CollectState.NotStarted -> + | CollectSeqState.NotStarted inp -> return! - (enum1 := inp.GetEnumerator() - state := CollectState.HaveInputEnumerator + (let e1 = inp.GetEnumerator() + state := CollectSeqState.HaveInputEnumerator e1 x.MoveNext()) - | CollectState.HaveInputEnumerator -> + | CollectSeqState.HaveInputEnumerator e1 -> return! - (let e1 = enum1.Value - if e1.MoveNext() then - enum2 := (f e1.Current).GetEnumerator() - state := CollectState.HaveInnerEnumerator + (if e1.MoveNext() then + let e2 = (f e1.Current).GetEnumerator() + state := CollectSeqState.HaveInnerEnumerator (e1, e2) else x.Dispose() x.MoveNext()) - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value + | CollectSeqState.HaveInnerEnumerator (e1, e2)-> let! res2 = e2.MoveNext() match res2 with | None -> return! - (enum2 := Unchecked.defaultof<_> - state := CollectState.HaveInputEnumerator + (state := CollectSeqState.HaveInputEnumerator e1 dispose e2 x.MoveNext()) | Some _ -> @@ -448,50 +430,45 @@ module AsyncSeq = | _ -> return None} member x.Dispose() = match !state with - | CollectState.HaveInputEnumerator -> - let e1 = enum1.Value - state := CollectState.Finished - enum1 := Unchecked.defaultof<_> + | CollectSeqState.HaveInputEnumerator e1 -> + state := CollectSeqState.Finished dispose e1 - | CollectState.HaveInnerEnumerator -> - let e2 = enum2.Value - state := CollectState.HaveInputEnumerator + | CollectSeqState.HaveInnerEnumerator (e1, e2) -> + state := CollectSeqState.Finished dispose e2 + dispose e1 x.Dispose() | _ -> () } } - type MapState = - | NotStarted = -1 - | HaveEnumerator = 0 - | Finished = 1 + [] + type MapState<'T> = + | NotStarted of seq<'T> + | HaveEnumerator of IEnumerator<'T> + | Finished let ofSeq (inp: seq<'T>) : AsyncSeq<'T> = { new IAsyncEnumerable<'T> with member x.GetEnumerator() = - let state = ref MapState.NotStarted - let enum = ref Unchecked.defaultof> + let state = ref (MapState.NotStarted inp) { new IAsyncEnumerator<'T> with member x.MoveNext() = async { match !state with - | MapState.NotStarted -> - enum := inp.GetEnumerator() - state := MapState.HaveEnumerator + | MapState.NotStarted inp -> + let e = inp.GetEnumerator() + state := MapState.HaveEnumerator e return! x.MoveNext() - | MapState.HaveEnumerator -> - let e1 = enum.Value + | MapState.HaveEnumerator e -> return - (if e1.MoveNext() then - Some e1.Current + (if e.MoveNext() then + Some e.Current else x.Dispose() None) | _ -> return None } member x.Dispose() = match !state with - | MapState.HaveEnumerator -> - let e = enum.Value + | MapState.HaveEnumerator e -> state := MapState.Finished - enum := Unchecked.defaultof<_> dispose e | _ -> () } } diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index d21b996..695bb35 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -945,6 +945,7 @@ let ``AsyncSeq.interleave should fail with Exception if a task fails``() = |> AsyncSeq.toList |> ignore) |> ignore + let perfTest1 n = let empty = async { return () } Seq.init n id @@ -1020,21 +1021,18 @@ let perfTest4 n = //perfTest4 100000 0.362 0.442 //perfTest4 1000000 3.533 4.656 -(* -let FindPackages(_,_,_,_) = [| "package" |] - -type Source = { IsNuget: bool; Url: string } -let DefaultNugetSource = { IsNuget = true; Url = "http://nuget.org" } - -//------------------------------- -let SearchPackagesByName(sources, search) = - let sources = [ yield! sources; yield DefaultNugetSource ] - [ for source in sources -> - asyncSeq { if source.IsNuget then - for p in FindPackages(None, source.Url, search, 1000) do - yield p } ] - |> AsyncSeq.mergeAll -*) +[] +let ``AsyncSeq.unfoldAsync should be iterable in finite resources``() = + let generator state = + async { + if state < 10000 then + return Some ((), state + 1) + else + return None + } + AsyncSeq.unfoldAsync generator 0 + |> AsyncSeq.iter ignore + |> Async.RunSynchronously