Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 173 additions & 11 deletions docs/content/library/AsyncSeqExamples.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ open FSharp.Control

(**

### Group By
## Group By

Suppose we would like to consume a stream of events `AsyncSeq<Event>` and perform an operation on each event.
The operation on each event is of type `Event -> Async<unit>`. This can be done as follows:
`AsyncSeq.groupBy` partitions an input sequence into sub-sequences with respect to the specified `projection` function. This operation is the asynchronous analog to `Seq.groupBy`.


## Use Case

Suppose we would like to consume a stream of events `AsyncSeq<Event>` and perform an operation on each event. The operation on each event is of type `Event -> Async<unit>`. This can be done as follows:

*)

Expand Down Expand Up @@ -94,18 +98,84 @@ The above workflow:
4. Buffers elements of each sub-sequence by time and space.
5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially.

---

*)



(**

### Merge
## Merge

`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input sequence produces a value, it emits an element when *both* sequences emit a value. This operation is also in contrast to `AsyncSeq.append` which concatenates two async sequences, emitting all element of one, followed by all elements of the another.

### Example Execution

An example execution can be depicted visually as follows:

-----------------------------------------
| source1 | t0 | | t1 | | | t2 |
| source2 | | u0 | | | u1 | |
| result | t0 | u0 | t1 | | u1 | t2 |
-----------------------------------------

### Use Case

Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is two start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use `AsyncSeq.merge` as follows:

```

/// Represents an stream emitting elements on a specified interval.
let intervalMs (periodMs:int) = asyncSeq {
yield DateTime.UtcNow
while true do
do! Async.Sleep periodMs
yield DateTime.UtcNow }

let either : AsyncSeq<DateTime> =
AsyncSeq.merge (intervalMs 20) (intervalMs 30)

The sequence `either` emits an element every 20ms and every 30ms.

```

---

*)


`AsyncSeq.merge` non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements
whenever *either* input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation
is in contrast to `AsyncSeq.zip` which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when *either* input
sequence produces a value, it emits an element when *both* sequences emit a value.

(**

## Combine Latest


`AsyncSeq.combineLatest` non-deterministically merges two async sequences much like `AsyncSeq.merge`, combining their elements using the specified `combine` function. The resulting async sequence will only contain elements if both of the source sequences produce at least one element. After combining the first elements the source sequences, this operation emits elements when either source sequence emits an element, passing the newly emitted element as one of the arguments to the `combine` function, the other being the previously emitted element of that type.

### Example Execution

An example execution can be depicted visually as follows:

```

----------------------------------------
| source1 | a0 | | | a1 | | a2 |
| source2 | | b0 | b1 | | | |
| result | | c0 | c1 | c2 | | c3 |
----------------------------------------

where

c0 = f a0 b0
c1 = f a0 b1
c2 = f a1 b1
c3 = f a2 b1


```

### Use Case

Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an `AsyncSeq`. To gain intuition for this, consider the [Consul](https://www.consul.io/)
configuration management system. It stores configuration information in a tree-like structure. For this purpose of this discussion, it can be thought of as a key-value store
Expand Down Expand Up @@ -149,12 +219,104 @@ Putting it all together:

*)

let changesOrInterval : AsyncSeq<Choice<Value, DateTime>> =
AsyncSeq.mergeChoice (changes ("myKey", 0L)) (intervalMs (1000 * 60))
let changesOrInterval : AsyncSeq<Value> =
AsyncSeq.combineLatest (fun v _ -> v) (changes ("myKey", 0L)) (intervalMs (1000 * 60))


(**

We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.

*)
---

*)





(**

## Distinct Until Changed

`AsyncSeq.distinctUntilChanged` returns an async sequence which returns every element of the source sequence, skipping elements which equal its predecessor.

## Example

An example execution can be visualized as follows:

-----------------------------------
| source | a | a | b | b | b | a |
| result | a | | b | | | a |
-----------------------------------

## Use Case

Suppose you're polling a resource which returns status information of a background job.

*)

type Status = {
completed : int
finished : bool
result : string
}

/// Gets the status of a job.
let status : Async<Status> =
failwith ""

let statuses : AsyncSeq<Status> =
asyncSeq {
let! s = status
while true do
do! Async.Sleep 1000
let! s = status
yield s }

(**

The async sequence `statuses` will return a status every second. It will return a status regardless of whether the status changed. Assuming the status changes monotonically, we can use `AsyncSeq.distinctUntilChanged` to transform `statuses` into an async sequence of distinct statuses:

*)

let distinctStatuses : AsyncSeq<Status> =
statuses |> AsyncSeq.distinctUntilChanged


(**

Finally, we can create a workflow which prints the status every time a change is detected and terminates when the underlying job reaches the `finished` state:

*)

let result : Async<string> =
distinctStatuses
|> AsyncSeq.pick (fun st ->
printfn "status=%A" st
if st.finished then Some st.result
else None)

(**

---

*)


















76 changes: 64 additions & 12 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -524,23 +524,40 @@ module AsyncSeq =
seq |> iterAsync action

let rec unfoldAsync (f:'State -> Async<('T * 'State) option>) (s:'State) : AsyncSeq<'T> =
asyncSeq {
let! v = f s
match v with
| None -> ()
| Some (v,s2) ->
yield v
yield! unfoldAsync f s2 }
asyncSeq {
let s = ref s
let fin = ref false
while not !fin do
let! next = f !s
match next with
| None ->
fin := true
| Some (a,s') ->
yield a
s := s' }

let replicateInfinite (v:'T) : AsyncSeq<'T> =
asyncSeq {
while true do
yield v }

let replicateInfiniteAsync (v:Async<'T>) : AsyncSeq<'T> =
asyncSeq {
while true do
let! v = v
yield v }

let replicate (count:int) (v:'T) : AsyncSeq<'T> =
asyncSeq {
for i in 1 .. count do
yield v }

let intervalMs (periodMs:int) = asyncSeq {
yield DateTime.UtcNow
while true do
do! Async.Sleep periodMs
yield DateTime.UtcNow }

// --------------------------------------------------------------------------
// Additional combinators (implemented as async/asyncSeq computations)

Expand Down Expand Up @@ -887,7 +904,7 @@ module AsyncSeq =

let zapp (fs:AsyncSeq<'T -> 'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
zipWith (|>) s fs

let takeWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
Expand Down Expand Up @@ -1095,9 +1112,7 @@ module AsyncSeq =
yield! loop None timeoutMs
}

let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
let! move2T = Async.StartChildAsTask (ie2.MoveNext())
let! move = Async.chooseTasks move1T move2T
Expand Down Expand Up @@ -1134,6 +1149,10 @@ module AsyncSeq =
b1 := move1n
| _ -> failwith "unreachable" }

let mergeChoice (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
yield! mergeChoiceEnum ie1 ie2 }

let merge (source1:AsyncSeq<'T>) (source2:AsyncSeq<'T>) : AsyncSeq<'T> =
mergeChoice source1 source2 |> map (function Choice1Of2 x -> x | Choice2Of2 x -> x)
Expand Down Expand Up @@ -1174,6 +1193,40 @@ module AsyncSeq =
fin := fin.Value - 1
}

let combineLatestWithAsync (f:'a -> 'b -> Async<'c>) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> =
asyncSeq {
use en1 = source1.GetEnumerator()
use en2 = source2.GetEnumerator()
let! a = Async.StartChild (en1.MoveNext())
let! b = Async.StartChild (en2.MoveNext())
let! a = a
let! b = b
match a,b with
| Some a, Some b ->
let! c = f a b
yield c
let merged = mergeChoiceEnum en1 en2
use mergedEnum = merged.GetEnumerator()
let rec loop (prevA:'a, prevB:'b) = asyncSeq {
let! next = mergedEnum.MoveNext ()
match next with
| None -> ()
| Some (Choice1Of2 nextA) ->
let! c = f nextA prevB
yield c
yield! loop (nextA,prevB)
| Some (Choice2Of2 nextB) ->
let! c = f prevA nextB
yield c
yield! loop (prevA,nextB) }
yield! loop (a,b)
| _ -> () }

let combineLatestWith (f:'a -> 'b -> 'c) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> =
combineLatestWithAsync (fun a b -> f a b |> async.Return) source1 source2

let combineLatest (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'a * 'b> =
combineLatestWith (fun a b -> a,b) source1 source2

let distinctUntilChangedWithAsync (f:'T -> 'T -> Async<bool>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
use ie = source.GetEnumerator()
Expand Down Expand Up @@ -1242,7 +1295,6 @@ module AsyncSeq =
return Choice1Of2 (asyncSeq { for v in res do yield v })
}


module AsyncSeqSrcImpl =

let private createNode () =
Expand Down
21 changes: 21 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ module AsyncSeq =
/// Creates an infinite async sequence which repeats the specified value.
val replicateInfinite : v:'T -> AsyncSeq<'T>

/// Creates an infinite async sequence which repeatedly evaluates and emits the specified async value.
val replicateInfiniteAsync : v:Async<'T> -> AsyncSeq<'T>

/// Returns an async sequence which emits an element on a specified period.
val intervalMs : periodMs:int -> AsyncSeq<DateTime>

/// Yields all elements of the first asynchronous sequence and then
/// all elements of the second asynchronous sequence.
val append : seq1:AsyncSeq<'T> -> seq2:AsyncSeq<'T> -> AsyncSeq<'T>
Expand Down Expand Up @@ -294,6 +300,21 @@ module AsyncSeq =
/// Feeds an async sequence of values into an async sequence of functions.
val zapp : functions:AsyncSeq<('T -> 'U)> -> source:AsyncSeq<'T> -> AsyncSeq<'U>

/// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either
/// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence.
/// If either of the input sequences is empty, the resulting sequence is empty.
val combineLatestWithAsync : combine:('T -> 'U -> Async<'V>) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V>

/// Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either
/// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence.
/// If either of the input sequences is empty, the resulting sequence is empty.
val combineLatestWith : combine:('T -> 'U -> 'V) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V>

/// Merges two async sequences. The resulting async sequence produces an element when either
/// input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence.
/// If either of the input sequences is empty, the resulting sequence is empty.
val combineLatest : source1:AsyncSeq<'a> -> source2:AsyncSeq<'b> -> AsyncSeq<'a * 'b>

/// Traverses an async sequence an applies to specified function such that if None is returned the traversal short-circuits
/// and None is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Some.
val traverseOptionAsync : mapping:('T -> Async<'U option>) -> source:AsyncSeq<'T> -> Async<AsyncSeq<'U> option>
Expand Down
Loading