Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,13 @@ module AsyncSeq =
let! v = f itm
yield v }

let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
let i = ref 0L
for itm in source do
let! v = f i.Value itm
i := i.Value + 1L
yield v }

let chooseAsync f (source : AsyncSeq<'T>) : AsyncSeq<'R> = asyncSeq {
for itm in source do
let! v = f itm
Expand Down Expand Up @@ -636,13 +643,48 @@ module AsyncSeq =
let! moven = ie.MoveNext()
b := moven }

let tryPickAsync f (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! v = ie.MoveNext()
let b = ref v
let res = ref None
while b.Value.IsSome && not res.Value.IsSome do
let! fv = f b.Value.Value
match fv with
| None ->
let! moven = ie.MoveNext()
b := moven
| Some _ as r ->
res := r
return res.Value }

let tryPick f (source : AsyncSeq<'T>) =
tryPickAsync (f >> async.Return) source

let contains value (source : AsyncSeq<'T>) =
source |> tryPick (fun v -> if v = value then Some () else None) |> Async.map Option.isSome

let tryFind f (source : AsyncSeq<'T>) =
source |> tryPick (fun v -> if f v then Some v else None)

let exists f (source : AsyncSeq<'T>) =
source |> tryFind f |> Async.map Option.isSome

let forall f (source : AsyncSeq<'T>) =
source |> exists (f >> not) |> Async.map not

let foldAsync f (state:'State) (source : AsyncSeq<'T>) =
source |> scanAsync f state |> lastOrDefault state

let fold f (state:'State) (source : AsyncSeq<'T>) =
foldAsync (fun st v -> f st v |> async.Return) state source

let length (source : AsyncSeq<'T>) =
fold (fun st _ -> st + 1L) 0L source

let inline sum (source : AsyncSeq<'T>) : Async<'T> =
(LanguagePrimitives.GenericZero, source) ||> fold (+)

let scan f (state:'State) (source : AsyncSeq<'T>) =
scanAsync (fun st v -> f st v |> async.Return) state source

Expand All @@ -669,9 +711,15 @@ module AsyncSeq =
let initInfinite f =
initInfiniteAsync (f >> async.Return)

let mapi f (source : AsyncSeq<'T>) =
mapiAsync (fun i x -> f i x |> async.Return) source

let map f (source : AsyncSeq<'T>) =
mapAsync (f >> async.Return) source

let indexed (source : AsyncSeq<'T>) =
mapi (fun i x -> (i,x)) source

let iter f (source : AsyncSeq<'T>) =
iterAsync (f >> async.Return) source

Expand Down Expand Up @@ -806,10 +854,7 @@ module AsyncSeq =
let zipWith (z:'T1 -> 'T2 -> 'U) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> =
zipWithAsync (fun a b -> z a b |> async.Return) a b

let mapiAsync (f:int -> 'T -> Async<'U>) (source:AsyncSeq<'T>) : AsyncSeq<'U> =
threadStateAsync (fun i a -> f i a |> Async.map (fun b -> b,i + 1)) 0 source

let zipWithIndexAsync (f:int -> 'T -> Async<'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = mapiAsync f s
let zipWithIndexAsync (f:int64 -> 'T -> Async<'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = mapiAsync f s

let zappAsync (fs:AsyncSeq<'T -> Async<'U>>) (s:AsyncSeq<'T>) : AsyncSeq<'U> =
zipWithAsync (|>) s fs
Expand Down
58 changes: 40 additions & 18 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -159,41 +159,63 @@ module AsyncSeq =
/// be asked for the next element after the processing of an element completes).
val scanAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State>

/// Iterates over the input sequence and calls the specified function for
/// every value (to perform some side-effect asynchronously).
///
/// The specified function is asynchronous (and the input sequence will
/// be asked for the next element after the processing of an element completes).
/// Iterates over the input sequence and calls the specified asynchronous function for
/// every value. The input sequence will be asked for the next element after
//// the processing of an element completes.
val iterAsync : action:('T -> Async<unit>) -> source:AsyncSeq<'T> -> Async<unit>

/// Returns an asynchronous sequence that returns pairs containing an element
/// from the input sequence and its predecessor. Empty sequence is returned for
/// singleton input sequence.
val pairwise : source:AsyncSeq<'T> -> AsyncSeq<'T * 'T>

/// Aggregates the elements of the input asynchronous sequence using the
/// specified 'aggregation' function. The result is an asynchronous
/// workflow that returns the final result.
///
/// The aggregation function is asynchronous (and the input sequence will
/// be asked for the next element after the processing of an element completes).
/// Asynchronously aggregate the elements of the input asynchronous sequence using the
/// specified asynchronous 'aggregation' function.
val foldAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>

/// Same as AsyncSeq.foldAsync, but the specified function is synchronous.
/// Asynchronously aggregate the elements of the input asynchronous sequence using the
/// specified 'aggregation' function.
val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State>

/// Asynchronously sum the elements of the input asynchronous sequence using the specified function.
val inline sum : source:AsyncSeq< ^T > -> Async< ^T>
when ^T : (static member ( + ) : ^T * ^T -> ^T)
and ^T : (static member Zero : ^T)

/// Asynchronously determine if the sequence contains the given value
val contains : value:'T -> source:AsyncSeq<'T> -> Async<bool> when 'T : equality

/// Asynchronously pick a value from a sequence
val tryPick : chooser:('T -> 'TResult option) -> source:AsyncSeq<'T> -> Async<'TResult option>

/// Asynchronously find the first value in a sequence for which the predicate returns true
val tryFind : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T option>

/// Asynchronously determine if there is a value in the sequence for which the predicate returns true
val exists : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<bool>

/// Asynchronously determine if the predicate returns true for all values in the sequence
val forall : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<bool>

/// Return an asynhronous sequence which, when iterated, includes an integer indicating the index of each element in the sequence.
val indexed : source:AsyncSeq<'T> -> AsyncSeq<int64 * 'T>

/// Asynchronously determine the number of elements in the sequence
val length : source:AsyncSeq<'T> -> Async<int64>

/// Same as AsyncSeq.scanAsync, but the specified function is synchronous.
val scan : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State>

/// Same as AsyncSeq.mapAsync, but the specified function is synchronous.
val map : folder:('T -> 'U) -> source:AsyncSeq<'T> -> AsyncSeq<'U>

/// Same as AsyncSeq.iterAsync, but the specified function is synchronous
/// and performs the side-effect immediately.
/// Iterates over the input sequence and calls the specified function for
/// every value.
val iter : action:('T -> unit) -> source:AsyncSeq<'T> -> Async<unit>

/// Same as AsyncSeq.chooseAsync, but the specified function is synchronous
/// and processes the input element immediately.
/// Asynchronously iterates over the input sequence and generates 'x' for
/// every input element for which the specified function
/// returned 'Some(x)'
val choose : chooser:('T -> 'U option) -> source:AsyncSeq<'T> -> AsyncSeq<'U>

/// Same as AsyncSeq.filterAsync, but the specified predicate is synchronous
Expand Down Expand Up @@ -250,10 +272,10 @@ module AsyncSeq =
///
/// The specified function is asynchronous (and the input sequence will
/// be asked for the next element after the processing of an element completes).
val mapiAsync : mapping:(int -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U>
val mapiAsync : mapping:(int64 -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U>

[<System.Obsolete("Renamed to mapiAsync") >]
val zipWithIndexAsync : mapping:(int -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U>
val zipWithIndexAsync : mapping:(int64 -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U>

/// Feeds an async sequence of values into an async sequence of async functions.
val zappAsync : functions:AsyncSeq<('T -> Async<'U>)> -> source:AsyncSeq<'T> -> AsyncSeq<'U>
Expand Down
61 changes: 61 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,67 @@ let ``AsyncSeq.concatSeq works``() =
Assert.True(EQ expected actual)

[<Test>]
let ``AsyncSeq.sum works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sum |> Async.RunSynchronously
let expected = ls |> List.sum
Assert.True((expected = actual))


[<Test>]
let ``AsyncSeq.length works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.length |> Async.RunSynchronously |> int32
let expected = ls |> List.length
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.contains works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
for j in [0;i;i+1] do
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.contains j |> Async.RunSynchronously
let expected = ls |> List.exists (fun x -> x = j)
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.tryPick works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
for j in [0;i;i+1] do
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.tryPick (fun x -> if x = j then Some (string (x+1)) else None) |> Async.RunSynchronously
let expected = ls |> Seq.tryPick (fun x -> if x = j then Some (string (x+1)) else None)
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.tryFind works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
for j in [0;i;i+1] do
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.tryFind (fun x -> x = j) |> Async.RunSynchronously
let expected = ls |> Seq.tryFind (fun x -> x = j)
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.exists works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
for j in [0;i;i+1] do
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.exists (fun x -> x = j) |> Async.RunSynchronously
let expected = ls |> Seq.exists (fun x -> x = j)
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.forall works``() =
for i in 0 .. 10 do
let ls = [ 1 .. i ]
for j in [0;i;i+1] do
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.forall (fun x -> x = j) |> Async.RunSynchronously
let expected = ls |> Seq.forall (fun x -> x = j)
Assert.True((expected = actual))
[<Test>]
let ``AsyncSeq.cache works``() =
for n in 0 .. 10 do
let ls = [ for i in 1 .. n do for j in 1 .. i do yield i ]
Expand Down