diff --git a/docs/content/library/AsyncSeqExamples.fsx b/docs/content/library/AsyncSeqExamples.fsx index 663d9af..9605314 100644 --- a/docs/content/library/AsyncSeqExamples.fsx +++ b/docs/content/library/AsyncSeqExamples.fsx @@ -11,10 +11,14 @@ open FSharp.Control (** -### Group By +## Group By -Suppose we would like to consume a stream of events `AsyncSeq` and perform an operation on each event. -The operation on each event is of type `Event -> Async`. This can be done as follows: +`AsyncSeq.groupBy` partitions an input sequence into sub-sequences with respect to the specified `projection` function. This operation is the asynchronous analog to `Seq.groupBy`. + + +## Use Case + +Suppose we would like to consume a stream of events `AsyncSeq` and perform an operation on each event. The operation on each event is of type `Event -> Async`. This can be done as follows: *) @@ -94,18 +98,84 @@ The above workflow: 4. Buffers elements of each sub-sequence by time and space. 5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially. +--- + *) (** -### Merge +## Merge + +`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input sequence produces a value, it emits an element when *both* sequences emit a value. This operation is also in contrast to `AsyncSeq.append` which concatenates two async sequences, emitting all elements of one, followed by all elements of the other. 
+ +### Example Execution + +An example execution can be depicted visually as follows: + +----------------------------------------- +| source1 | t0 | | t1 | | | t2 | +| source2 | | u0 | | | u1 | | +| result | t0 | u0 | t1 | | u1 | t2 | +----------------------------------------- + +### Use Case + +Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is to start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use `AsyncSeq.merge` as follows: + +``` + +/// Represents a stream emitting elements on a specified interval. +let intervalMs (periodMs:int) = asyncSeq { + yield DateTime.UtcNow + while true do + do! Async.Sleep periodMs + yield DateTime.UtcNow } + +let either : AsyncSeq = + AsyncSeq.merge (intervalMs 20) (intervalMs 30) + +The sequence `either` emits an element every 20ms and every 30ms. + +``` + +--- + +*) + -`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements -whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation -is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input -sequence produces a value, it emits an element when *both* sequences emit a value. + +(** + +## Combine Latest + + +`AsyncSeq.combineLatest` non-deterministically merges two async sequences much like `AsyncSeq.merge`, combining their elements using the specified `combine` function. The resulting async sequence will only contain elements if both of the source sequences produce at least one element. 
After combining the first elements of the source sequences, this operation emits elements when either source sequence emits an element, passing the newly emitted element as one of the arguments to the `combine` function, the other being the previously emitted element of that type. + +### Example Execution + +An example execution can be depicted visually as follows: + +``` + +---------------------------------------- +| source1 | a0 | | | a1 | | a2 | +| source2 | | b0 | b1 | | | | +| result | | c0 | c1 | c2 | | c3 | +---------------------------------------- + +where + +c0 = f a0 b0 +c1 = f a0 b1 +c2 = f a1 b1 +c3 = f a2 b1 + + +``` + +### Use Case Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an `AsyncSeq`. To gain intuition for this, consider the [Consul](https://www.consul.io/) configuration management system. It stores configuration information in a tree-like structure. For this purpose of this discussion, it can be thought of as a key-value store @@ -149,12 +219,104 @@ Putting it all together: *) -let changesOrInterval : AsyncSeq> = - AsyncSeq.mergeChoice (changes ("myKey", 0L)) (intervalMs (1000 * 60)) +let changesOrInterval : AsyncSeq = + AsyncSeq.combineLatest (fun v _ -> v) (changes ("myKey", 0L)) (intervalMs (1000 * 60)) (** We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight. -*) \ No newline at end of file +--- + +*) + + + + + +(** + +## Distinct Until Changed + +`AsyncSeq.distinctUntilChanged` returns an async sequence which returns every element of the source sequence, skipping elements which equal their predecessor. 
+ +## Example + +An example execution can be visualized as follows: + +----------------------------------- +| source | a | a | b | b | b | a | +| result | a | | b | | | a | +----------------------------------- + +## Use Case + +Suppose you're polling a resource which returns status information of a background job. + +*) + +type Status = { + completed : int + finished : bool + result : string +} + +/// Gets the status of a job. +let status : Async = + failwith "" + +let statuses : AsyncSeq = + asyncSeq { + let! s = status + while true do + do! Async.Sleep 1000 + let! s = status + yield s } + +(** + +The async sequence `statuses` will return a status every second. It will return a status regardless of whether the status changed. Assuming the status changes monotonically, we can use `AsyncSeq.distinctUntilChanged` to transform `statuses` into an async sequence of distinct statuses: + +*) + +let distinctStatuses : AsyncSeq = + statuses |> AsyncSeq.distinctUntilChanged + + +(** + +Finally, we can create a workflow which prints the status every time a change is detected and terminates when the underlying job reaches the `finished` state: + +*) + +let result : Async = + distinctStatuses + |> AsyncSeq.pick (fun st -> + printfn "status=%A" st + if st.finished then Some st.result + else None) + +(** + +--- + +*) + + + + + + + + + + + + + + + + + + diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 67fb5f7..bb011b7 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -524,23 +524,40 @@ module AsyncSeq = seq |> iterAsync action let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> = - asyncSeq { - let! v = f s - match v with - | None -> () - | Some (v,s2) -> - yield v - yield! unfoldAsync f s2 } + asyncSeq { + let s = ref s + let fin = ref false + while not !fin do + let! 
next = f !s + match next with + | None -> + fin := true + | Some (a,s') -> + yield a + s := s' } let replicateInfinite (v:'T) : AsyncSeq<'T> = asyncSeq { while true do yield v } + let replicateInfiniteAsync (v:Async<'T>) : AsyncSeq<'T> = + asyncSeq { + while true do + let! v = v + yield v } + let replicate (count:int) (v:'T) : AsyncSeq<'T> = asyncSeq { for i in 1 .. count do yield v } + + let intervalMs (periodMs:int) = asyncSeq { + yield DateTime.UtcNow + while true do + do! Async.Sleep periodMs + yield DateTime.UtcNow } + // -------------------------------------------------------------------------- // Additional combinators (implemented as async/asyncSeq computations) @@ -887,7 +904,7 @@ module AsyncSeq = let zapp (fs:AsyncSeq<'T -> 'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = zipWith (|>) s fs - + let takeWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq { use ie = source.GetEnumerator() let! move = ie.MoveNext() @@ -1095,9 +1112,7 @@ module AsyncSeq = yield! loop None timeoutMs } - let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq> = asyncSeq { - use ie1 = source1.GetEnumerator() - use ie2 = source2.GetEnumerator() + let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq> = asyncSeq { let! move1T = Async.StartChildAsTask (ie1.MoveNext()) let! move2T = Async.StartChildAsTask (ie2.MoveNext()) let! move = Async.chooseTasks move1T move2T @@ -1134,6 +1149,10 @@ module AsyncSeq = b1 := move1n | _ -> failwith "unreachable" } + let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq> = asyncSeq { + use ie1 = source1.GetEnumerator() + use ie2 = source2.GetEnumerator() + yield! 
mergeChoiceEnum ie1 ie2 } let merge (source1:AsyncSeq<'T>) (source2:AsyncSeq<'T>) : AsyncSeq<'T> = mergeChoice source1 source2 |> map (function Choice1Of2 x -> x | Choice2Of2 x -> x) @@ -1174,6 +1193,40 @@ module AsyncSeq = fin := fin.Value - 1 } + let combineLatestWithAsync (f:'a -> 'b -> Async<'c>) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> = + asyncSeq { + use en1 = source1.GetEnumerator() + use en2 = source2.GetEnumerator() + let! a = Async.StartChild (en1.MoveNext()) + let! b = Async.StartChild (en2.MoveNext()) + let! a = a + let! b = b + match a,b with + | Some a, Some b -> + let! c = f a b + yield c + let merged = mergeChoiceEnum en1 en2 + use mergedEnum = merged.GetEnumerator() + let rec loop (prevA:'a, prevB:'b) = asyncSeq { + let! next = mergedEnum.MoveNext () + match next with + | None -> () + | Some (Choice1Of2 nextA) -> + let! c = f nextA prevB + yield c + yield! loop (nextA,prevB) + | Some (Choice2Of2 nextB) -> + let! c = f prevA nextB + yield c + yield! loop (prevA,nextB) } + yield! 
loop (a,b) + | _ -> () } + + let combineLatestWith (f:'a -> 'b -> 'c) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> = + combineLatestWithAsync (fun a b -> f a b |> async.Return) source1 source2 + + let combineLatest (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'a * 'b> = + combineLatestWith (fun a b -> a,b) source1 source2 let distinctUntilChangedWithAsync (f:'T -> 'T -> Async) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { use ie = source.GetEnumerator() @@ -1242,7 +1295,6 @@ module AsyncSeq = return Choice1Of2 (asyncSeq { for v in res do yield v }) } - module AsyncSeqSrcImpl = let private createNode () = diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 315904c..9828ff1 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -50,6 +50,12 @@ module AsyncSeq = /// Creates an infinite async sequence which repeats the specified value. val replicateInfinite : v:'T -> AsyncSeq<'T> + /// Creates an infinite async sequence which repeatedly evaluates and emits the specified async value. + val replicateInfiniteAsync : v:Async<'T> -> AsyncSeq<'T> + + /// Returns an async sequence which emits an element on a specified period. + val intervalMs : periodMs:int -> AsyncSeq + /// Yields all elements of the first asynchronous sequence and then /// all elements of the second asynchronous sequence. val append : seq1:AsyncSeq<'T> -> seq2:AsyncSeq<'T> -> AsyncSeq<'T> @@ -294,6 +300,21 @@ module AsyncSeq = /// Feeds an async sequence of values into an async sequence of functions. val zapp : functions:AsyncSeq<('T -> 'U)> -> source:AsyncSeq<'T> -> AsyncSeq<'U> + /// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either + /// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence. 
+ /// If either of the input sequences is empty, the resulting sequence is empty. + val combineLatestWithAsync : combine:('T -> 'U -> Async<'V>) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V> + + /// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either + /// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence. + /// If either of the input sequences is empty, the resulting sequence is empty. + val combineLatestWith : combine:('T -> 'U -> 'V) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V> + + /// Merges two async sequences. The resulting async sequence produces an element when either + /// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence. + /// If either of the input sequences is empty, the resulting sequence is empty. + val combineLatest : source1:AsyncSeq<'a> -> source2:AsyncSeq<'b> -> AsyncSeq<'a * 'b> + /// Traverses an async sequence an applies to specified function such that if None is returned the traversal short-circuits /// and None is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Some. val traverseOptionAsync : mapping:('T -> Async<'U option>) -> source:AsyncSeq<'T> -> Async option> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 772cf8b..a00f2ca 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -15,14 +15,44 @@ open FSharp.Control open System module AsyncOps = - let unit = async.Return() - let never = Async.Sleep(-1) + let unit = async.Return() + let never = Async.Sleep(-1) + let timeoutMs (timeoutMs:int) (a:Async<'a>) = async { + let! 
a = Async.StartChild(a, timeoutMs) + return! a } + +module AsyncSeq = + [] + let never<'a> : AsyncSeq<'a> = asyncSeq { + do! AsyncOps.never + yield invalidOp "" } + + +let DEFAULT_TIMEOUT_MS = 2000 + +let randomDelayMs (minMs:int) (maxMs:int) (s:AsyncSeq<'a>) = + let rand = new Random(int DateTime.Now.Ticks) + let randSleep = async { do! Async.Sleep(rand.Next(minMs, maxMs)) } + AsyncSeq.zipWith (fun _ a -> a) (AsyncSeq.replicateInfiniteAsync randSleep) s + +let randomDelayDefault (s:AsyncSeq<'a>) = + randomDelayMs 0 50 s let catch (f:'a -> 'b) : 'a -> Choice<'b, exn> = fun a -> try f a |> Choice1Of2 with ex -> ex |> Choice2Of2 +let rec IsCancellationExn (e:exn) = + match e with + | :? OperationCanceledException -> true + | :? TimeoutException -> true + | :? AggregateException as x -> x.InnerExceptions |> Seq.filter (IsCancellationExn) |> Seq.isEmpty |> not + | _ -> false + +let AreCancellationExns (e1:exn) (e2:exn) = + IsCancellationExn e1 && IsCancellationExn e2 + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) = let exp = a |> AsyncSeq.toList @@ -34,20 +64,39 @@ let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) = false type Assert with + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>) = - Assert.AreEqual (expected, actual, 1000, exnEq=(fun _ _ -> true)) + Assert.AreEqual (expected, actual, DEFAULT_TIMEOUT_MS, exnEq=(fun _ _ -> true), message=null) + + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. + static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, message:string) = + Assert.AreEqual (expected, actual, DEFAULT_TIMEOUT_MS, exnEq=(fun _ _ -> true), message=message) + + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. 
+ static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, exnEq) = + Assert.AreEqual (expected, actual, timeout=DEFAULT_TIMEOUT_MS, exnEq=exnEq, message=null) + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout) = - Assert.AreEqual (expected, actual, timeout=timeout, exnEq=(fun _ _ -> true)) + Assert.AreEqual (expected, actual, timeout=timeout, exnEq=(fun _ _ -> true), message=null) + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. - static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout, exnEq:exn -> exn -> bool) = - let exp = expected |> AsyncSeq.toListAsync |> Async.Catch - let exp = Async.RunSynchronously (exp, timeout) - let act = actual |> AsyncSeq.toListAsync |> Async.Catch - let act = Async.RunSynchronously(act, timeout) - let message = sprintf "expected=%A actual=%A" exp act - match exp,act with + static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout, exnEq) = + Assert.AreEqual (expected, actual, timeout=timeout, exnEq=exnEq, message=null) + + /// Determines equality of two async sequences by convering them to lists, ignoring side-effects. + /// Exceptions are caught and compared for equality. + /// Timeouts ensure liveness. 
+ static member AreEqual (expected:AsyncSeq<'a>, actual:AsyncSeq<'a>, timeout, exnEq:exn -> exn -> bool, message:string) = + let expected = expected |> AsyncSeq.toListAsync |> AsyncOps.timeoutMs timeout |> Async.Catch + let expected = Async.RunSynchronously (expected) + let actual = actual |> AsyncSeq.toListAsync |> AsyncOps.timeoutMs timeout |> Async.Catch + let actual = Async.RunSynchronously (actual) + let message = + if message = null then sprintf "expected=%A actual=%A" expected actual + else sprintf "message=%s expected=%A actual=%A" message expected actual + match expected,actual with | Choice1Of2 exp, Choice1Of2 act -> Assert.True((exp = act), message) | Choice2Of2 exp, Choice2Of2 act -> @@ -68,8 +117,10 @@ type Assert with Assert.Fail(message) - +[] +let ``AsyncSeq.never should equal itself`` () = + Assert.AreEqual(AsyncSeq.never, AsyncSeq.never, timeout=100, exnEq=AreCancellationExns) [] let ``AsyncSeq.toArray``() = @@ -99,7 +150,7 @@ let ``AsyncSeq.concatSeq works``() = let ls = [ [1;2] ; [3;4] ] let actual = AsyncSeq.ofSeq ls |> AsyncSeq.concatSeq let expected = ls |> List.concat |> AsyncSeq.ofSeq - Assert.True(EQ expected actual) + Assert.AreEqual(expected, actual) [] let ``AsyncSeq.sum works``() = @@ -653,7 +704,7 @@ let ``AsyncSeq.collect works``() = for c in [0; 1; 10] do let actual = AsyncSeq.collect (fun i -> AsyncSeq.ofSeq [ 0 .. i]) (AsyncSeq.ofSeq [ 0 .. c ]) let expected = [ for i in 0 .. c do yield! [ 0 .. 
i ] ] |> AsyncSeq.ofSeq - Assert.True(EQ expected actual) + Assert.AreEqual(expected, actual) [] @@ -1219,4 +1270,27 @@ let ``AsyncSeq.groupBy should propagate exception and terminate all groups``() = asyncSeq { raise (exn("test")) } |> AsyncSeq.groupBy (fun i -> i % 3) |> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync) - Assert.AreEqual(expected, actual) \ No newline at end of file + Assert.AreEqual(expected, actual) + +[] +let ``AsyncSeq.combineLatest should behave like merge after initial``() = + for n in 0..20 do + for m in 0..10 do + let ls1 = List.init n id + let ls2 = List.init m id + // expect each element to increase combined sum by 1 + // expected count is sum of source counts minus 1 for first result + let expectedCount = + if n = 0 || m = 0 then 0 + else (n + m - 1) + let expected = List.init expectedCount id |> AsyncSeq.ofSeq + let actual = AsyncSeq.combineLatestWith (+) (AsyncSeq.ofSeq ls1 |> randomDelayDefault) (AsyncSeq.ofSeq ls2 |> randomDelayDefault) + Assert.AreEqual(expected, actual, (sprintf "n=%i m=%i" n m)) + +[] +let ``AsyncSeq.combineLatest should be never when either argument is never``() = + let expected = AsyncSeq.never + let actual1 = AsyncSeq.combineLatestWith (fun _ _ -> 0) (AsyncSeq.never) (AsyncSeq.singleton 1) + let actual2 = AsyncSeq.combineLatestWith (fun _ _ -> 0) (AsyncSeq.singleton 1) (AsyncSeq.never) + Assert.AreEqual(expected, actual1, timeout=100, exnEq=AreCancellationExns) + Assert.AreEqual(expected, actual2, timeout=100, exnEq=AreCancellationExns) \ No newline at end of file