Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ module internal Utils =
module internal Choice =

/// Maps over the left result type.
let mapl (f:'a -> 'b) = function
let mapl (f:'T -> 'U) = function
| Choice1Of2 a -> f a |> Choice1Of2
| Choice2Of2 e -> Choice2Of2 e

/// Maps over the right result type.
let mapr (f:'b -> 'c) = function
| Choice1Of2 a -> Choice1Of2 a
| Choice2Of2 e -> f e |> Choice2Of2

// ----------------------------------------------------------------------------

module internal Observable =
Expand Down Expand Up @@ -75,7 +70,7 @@ module internal Utils =

/// Creates an async computations which runs the specified computations
/// in parallel and returns their results.
static member Parallel(a:Async<'a>, b:Async<'b>) : Async<'a * 'b> = async {
static member Parallel(a:Async<'T>, b:Async<'U>) : Async<'T * 'U> = async {
let! a = a |> Async.StartChild
let! b = b |> Async.StartChild
let! a = a
Expand All @@ -95,10 +90,10 @@ module internal Utils =
/// Creates a computation which produces a tuple consiting of the value produces by the first
/// argument computation to complete and a handle to the other computation. The second computation
/// to complete is memoized.
static member internal chooseBoth (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> =
static member internal chooseBoth (a:Async<'T>) (b:Async<'T>) : Async<'T * Async<'T>> =
Async.FromContinuations <| fun (ok,err,cnc) ->
let state = ref 0
let tcs = TaskCompletionSource<'a>()
let tcs = TaskCompletionSource<'T>()
let inline ok a =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
ok (a, tcs.Task |> Async.AwaitTask)
Expand Down Expand Up @@ -134,7 +129,7 @@ module AsyncSeq =

/// Creates an async sequence which repeats the specified value indefinitely.
let rec replicate (v:'T) : AsyncSeq<'T> =
Cons(v, async.Delay <| fun() -> replicate v) |> async.Return
Cons(v, async.Delay (fun() -> replicate v)) |> async.Return

/// Yields all elements of the first asynchronous sequence and then
/// all elements of the second asynchronous sequence.
Expand Down Expand Up @@ -515,7 +510,7 @@ module AsyncSeq =
// --------------------------------------------------------------------------

/// Threads a state through the mapping over an async sequence using an async function.
let rec threadStateAsync (f:'s -> 'a -> Async<'b * 's>) (st:'s) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
let rec threadStateAsync (f:'State -> 'T -> Async<'U * 'State>) (st:'State) (s:AsyncSeq<'T>) : AsyncSeq<'U> = asyncSeq {
let! s = s
match s with
| Nil -> ()
Expand All @@ -539,7 +534,7 @@ module AsyncSeq =
/// Combines two asynchronous sequences using the specified function.
/// The values from sequences are retrieved in parallel.
/// The resulting sequence stops when either of the argument sequences stop.
let rec zipWithAsync (z:'a -> 'b -> Async<'c>) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> = async {
let rec zipWithAsync (z:'T1 -> 'T2 -> Async<'U>) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> = async {
let! a,b = Async.Parallel(a, b)
match a,b with
| Cons(a, atl), Cons(b, btl) ->
Expand All @@ -550,26 +545,26 @@ module AsyncSeq =
/// Combines two asynchronous sequences using the specified function.
/// The values from sequences are retrieved in parallel.
/// The resulting sequence stops when either of the argument sequences stop.
let inline zipWith (z:'a -> 'b -> 'c) (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<'c> =
let inline zipWith (z:'T1 -> 'T2 -> 'U) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> =
zipWithAsync (fun a b -> z a b |> async.Return) a b

/// Combines two asynchronous sequences using the specified function to which it also passes the index.
/// The values from sequences are retrieved in parallel.
/// The resulting sequence stops when either of the argument sequences stop.
let zipWithIndexAsync (f:int -> 'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
let zipWithIndexAsync (f:int -> 'T -> Async<'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
threadStateAsync (fun i a -> f i a |> Async.map (fun b -> b,i + 1)) 0 s

/// Feeds an async sequence of values into an async sequence of async functions.
let inline zappAsync (fs:AsyncSeq<'a -> Async<'b>>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
let inline zappAsync (fs:AsyncSeq<'T -> Async<'U>>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
zipWithAsync (|>) s fs

/// Feeds an async sequence of values into an async sequence of functions.
let inline zapp (fs:AsyncSeq<'a -> 'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> =
let inline zapp (fs:AsyncSeq<'T -> 'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
zipWith (|>) s fs

/// Traverses an async sequence an applies to specified function such that if None is returned the traversal short-circuits
/// and None is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Some.
let rec traverseOptionAsync (f:'a -> Async<'b option>) (s:AsyncSeq<'a>) : Async<AsyncSeq<'b> option> = async {
let rec traverseOptionAsync (f:'T -> Async<'U option>) (s:AsyncSeq<'T>) : Async<AsyncSeq<'U> option> = async {
let! s = s
match s with
| Nil -> return Some (Nil |> async.Return)
Expand All @@ -583,7 +578,7 @@ module AsyncSeq =

/// Traverses an async sequence an applies to specified function such that if Choice2Of2 is returned the traversal short-circuits
/// and Choice2Of2 is returned as the result. Otherwise, the entire sequence is traversed and the result returned as Choice1Of2.
let rec traverseChoiceAsync (f:'a -> Async<Choice<'b, 'e>>) (s:AsyncSeq<'a>) : Async<Choice<AsyncSeq<'b>, 'e>> = async {
let rec traverseChoiceAsync (f:'T -> Async<Choice<'U, 'e>>) (s:AsyncSeq<'T>) : Async<Choice<AsyncSeq<'U>, 'e>> = async {
let! s = s
match s with
| Nil -> return Choice1Of2 (Nil |> async.Return)
Expand All @@ -609,7 +604,7 @@ module AsyncSeq =

/// Returns elements from the argument async sequence until the specified signal completes or
/// the sequences completes.
let rec takeUntil (signal:Async<unit>) (s:AsyncSeq<'a>) : AsyncSeq<'a> =
let rec takeUntil (signal:Async<unit>) (s:AsyncSeq<'T>) : AsyncSeq<'T> =
Async.chooseBoth (signal |> Async.map Choice1Of2) (s |> Async.map Choice2Of2)
|> Async.map (fun (first,second) ->
match first with
Expand All @@ -632,7 +627,7 @@ module AsyncSeq =
| Nil -> return Nil }

/// Skips elements from an async sequence until the specified signal completes.
let rec skipUntil (signal:Async<unit>) (s:AsyncSeq<'a>) : AsyncSeq<'a> =
let rec skipUntil (signal:Async<unit>) (s:AsyncSeq<'T>) : AsyncSeq<'T> =
Async.chooseBoth (signal |> Async.map Choice1Of2) (s |> Async.map Choice2Of2)
|> Async.bind (fun (first,second) ->
match first with
Expand Down Expand Up @@ -700,13 +695,13 @@ module AsyncSeq =
/// sequences are consumed in lock-step.
let interleave =

let rec left (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<Choice<_,_>> = async {
let rec left (a:AsyncSeq<'T>) (b:AsyncSeq<'U>) : AsyncSeq<Choice<_,_>> = async {
let! a = a
match a with
| Cons (a1, t1) -> return Cons (Choice1Of2 a1, right t1 b)
| Nil -> return! b |> map Choice2Of2 }

and right (a:AsyncSeq<'a>) (b:AsyncSeq<'b>) : AsyncSeq<Choice<_,_>> = async {
and right (a:AsyncSeq<'T>) (b:AsyncSeq<'U>) : AsyncSeq<Choice<_,_>> = async {
let! b = b
match b with
| Cons (a2, t2) -> return Cons (Choice2Of2 a2, left a t2)
Expand Down Expand Up @@ -739,23 +734,23 @@ module AsyncSeq =
}

/// Merges two async sequences into an async sequence non-deterministically.
let rec merge (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) : AsyncSeq<'a> = async {
let rec merge (a:AsyncSeq<'T>) (b:AsyncSeq<'T>) : AsyncSeq<'T> = async {
let! one,other = Async.chooseBoth a b
match one with
| Nil -> return! other
| Cons(hd,tl) ->
return Cons(hd, merge tl other) }

/// Merges all specified async sequences into an async sequence non-deterministically.
let rec mergeAll (ss:AsyncSeq<'a> list) : AsyncSeq<'a> =
let rec mergeAll (ss:AsyncSeq<'T> list) : AsyncSeq<'T> =
match ss with
| [] -> empty
| [s] -> s
| [a;b] -> merge a b
| hd::tl -> merge hd (mergeAll tl)

/// Returns an async sequence which contains no contiguous duplicate elements based on the specified comparison function.
let distinctUntilChangedWithAsync (f:'a -> 'a -> Async<bool>) (s:AsyncSeq<'a>) : AsyncSeq<'a> =
let distinctUntilChangedWithAsync (f:'T -> 'T -> Async<bool>) (s:AsyncSeq<'T>) : AsyncSeq<'T> =

// return the head, if any, then the tail passing the previous element
let rec head s =
Expand All @@ -776,11 +771,11 @@ module AsyncSeq =
head s

/// Returns an async sequence which contains no contiguous duplicate elements based on the specified comparison function.
let distinctUntilChangedWith (f:'a -> 'a -> bool) (s:AsyncSeq<'a>) : AsyncSeq<'a> =
let distinctUntilChangedWith (f:'T -> 'T -> bool) (s:AsyncSeq<'T>) : AsyncSeq<'T> =
distinctUntilChangedWithAsync (fun a b -> f a b |> async.Return) s

/// Returns an async sequence which contains no contiguous duplicate elements.
let distinctUntilChanged (s:AsyncSeq<'a>) : AsyncSeq<'a> =
let distinctUntilChanged (s:AsyncSeq<'T>) : AsyncSeq<'T> =
distinctUntilChangedWith ((=)) s


Expand Down