From c7f1657c7464ceb46efb0bd7ca9ccfd4d684f7ce Mon Sep 17 00:00:00 2001 From: Don Syme Date: Fri, 29 May 2015 21:02:54 +1000 Subject: [PATCH] Add more operators --- src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 53 ++++++++++++++-- src/FSharp.Control.AsyncSeq/AsyncSeq.fsi | 58 ++++++++++++------ .../AsyncSeqTests.fs | 61 +++++++++++++++++++ 3 files changed, 150 insertions(+), 22 deletions(-) diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 7ced86c..a8dc483 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -564,6 +564,13 @@ module AsyncSeq = let! v = f itm yield v } + let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq { + let i = ref 0L + for itm in source do + let! v = f i.Value itm + i := i.Value + 1L + yield v } + let chooseAsync f (source : AsyncSeq<'T>) : AsyncSeq<'R> = asyncSeq { for itm in source do let! v = f itm @@ -636,6 +643,35 @@ module AsyncSeq = let! moven = ie.MoveNext() b := moven } + let tryPickAsync f (source : AsyncSeq<'T>) = async { + use ie = source.GetEnumerator() + let! v = ie.MoveNext() + let b = ref v + let res = ref None + while b.Value.IsSome && not res.Value.IsSome do + let! fv = f b.Value.Value + match fv with + | None -> + let! moven = ie.MoveNext() + b := moven + | Some _ as r -> + res := r + return res.Value } + + let tryPick f (source : AsyncSeq<'T>) = + tryPickAsync (f >> async.Return) source + + let contains value (source : AsyncSeq<'T>) = + source |> tryPick (fun v -> if v = value then Some () else None) |> Async.map Option.isSome + + let tryFind f (source : AsyncSeq<'T>) = + source |> tryPick (fun v -> if f v then Some v else None) + + let exists f (source : AsyncSeq<'T>) = + source |> tryFind f |> Async.map Option.isSome + + let forall f (source : AsyncSeq<'T>) = + source |> exists (f >> not) |> Async.map not let foldAsync f (state:'State) (source : AsyncSeq<'T>) = source |> scanAsync f state |> lastOrDefault state @@ -643,6 +679,12 @@ module AsyncSeq = let fold f (state:'State) (source : AsyncSeq<'T>) = foldAsync (fun st v -> f st v |> async.Return) state source + let length (source : AsyncSeq<'T>) = + fold (fun st _ -> st + 1L) 0L source + + let inline sum (source : AsyncSeq<'T>) : Async<'T> = + (LanguagePrimitives.GenericZero, source) ||> fold (+) + let scan f (state:'State) (source : AsyncSeq<'T>) = scanAsync (fun st v -> f st v |> async.Return) state source @@ -669,9 +711,15 @@ module AsyncSeq = let initInfinite f = initInfiniteAsync (f >> async.Return) + let mapi f (source : AsyncSeq<'T>) = + mapiAsync (fun i x -> f i x |> async.Return) source + let map f (source : AsyncSeq<'T>) = mapAsync (f >> async.Return) source + let indexed (source : AsyncSeq<'T>) = + mapi (fun i x -> (i,x)) source + let iter f (source : AsyncSeq<'T>) = iterAsync (f >> async.Return) source @@ -806,10 +854,7 @@ module AsyncSeq = let zipWith (z:'T1 -> 'T2 -> 'U) (a:AsyncSeq<'T1>) (b:AsyncSeq<'T2>) : AsyncSeq<'U> = zipWithAsync (fun a b -> z a b |> async.Return) a b - let mapiAsync (f:int -> 'T -> Async<'U>) (source:AsyncSeq<'T>) : AsyncSeq<'U> = - threadStateAsync (fun i a -> f i a |> Async.map (fun b -> b,i + 1)) 0 source - - let zipWithIndexAsync (f:int -> 'T -> Async<'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = mapiAsync f s + let zipWithIndexAsync (f:int64 -> 'T -> Async<'U>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = mapiAsync f s let zappAsync (fs:AsyncSeq<'T -> Async<'U>>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = zipWithAsync (|>) s fs diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 4fc058d..c3ec07c 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -159,11 +159,9 @@ module AsyncSeq = /// be asked for the next element after the processing of an element completes). val scanAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State> - /// Iterates over the input sequence and calls the specified function for - /// every value (to perform some side-effect asynchronously). - /// - /// The specified function is asynchronous (and the input sequence will - /// be asked for the next element after the processing of an element completes). + /// Iterates over the input sequence and calls the specified asynchronous function for + /// every value. The input sequence will be asked for the next element after + //// the processing of an element completes. val iterAsync : action:('T -> Async) -> source:AsyncSeq<'T> -> Async /// Returns an asynchronous sequence that returns pairs containing an element @@ -171,29 +169,53 @@ module AsyncSeq = /// singleton input sequence. val pairwise : source:AsyncSeq<'T> -> AsyncSeq<'T * 'T> - /// Aggregates the elements of the input asynchronous sequence using the - /// specified 'aggregation' function. The result is an asynchronous - /// workflow that returns the final result. - /// - /// The aggregation function is asynchronous (and the input sequence will - /// be asked for the next element after the processing of an element completes). + /// Asynchronously aggregate the elements of the input asynchronous sequence using the + /// specified asynchronous 'aggregation' function. val foldAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'State> - /// Same as AsyncSeq.foldAsync, but the specified function is synchronous. + /// Asynchronously aggregate the elements of the input asynchronous sequence using the + /// specified 'aggregation' function. val fold : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'State> + /// Asynchronously sum the elements of the input asynchronous sequence using the specified function. + val inline sum : source:AsyncSeq< ^T > -> Async< ^T> + when ^T : (static member ( + ) : ^T * ^T -> ^T) + and ^T : (static member Zero : ^T) + + /// Asynchronously determine if the sequence contains the given value + val contains : value:'T -> source:AsyncSeq<'T> -> Async when 'T : equality + + /// Asynchronously pick a value from a sequence + val tryPick : chooser:('T -> 'TResult option) -> source:AsyncSeq<'T> -> Async<'TResult option> + + /// Asynchronously find the first value in a sequence for which the predicate returns true + val tryFind : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T option> + + /// Asynchronously determine if there is a value in the sequence for which the predicate returns true + val exists : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async + + /// Asynchronously determine if the predicate returns true for all values in the sequence + val forall : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async + + /// Return an asynhronous sequence which, when iterated, includes an integer indicating the index of each element in the sequence. + val indexed : source:AsyncSeq<'T> -> AsyncSeq + + /// Asynchronously determine the number of elements in the sequence + val length : source:AsyncSeq<'T> -> Async + /// Same as AsyncSeq.scanAsync, but the specified function is synchronous. val scan : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State> /// Same as AsyncSeq.mapAsync, but the specified function is synchronous. val map : folder:('T -> 'U) -> source:AsyncSeq<'T> -> AsyncSeq<'U> - /// Same as AsyncSeq.iterAsync, but the specified function is synchronous - /// and performs the side-effect immediately. + /// Iterates over the input sequence and calls the specified function for + /// every value. val iter : action:('T -> unit) -> source:AsyncSeq<'T> -> Async - /// Same as AsyncSeq.chooseAsync, but the specified function is synchronous - /// and processes the input element immediately. + /// Asynchronously iterates over the input sequence and generates 'x' for + /// every input element for which the specified function + /// returned 'Some(x)' val choose : chooser:('T -> 'U option) -> source:AsyncSeq<'T> -> AsyncSeq<'U> /// Same as AsyncSeq.filterAsync, but the specified predicate is synchronous @@ -250,10 +272,10 @@ module AsyncSeq = /// /// The specified function is asynchronous (and the input sequence will /// be asked for the next element after the processing of an element completes). - val mapiAsync : mapping:(int -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U> + val mapiAsync : mapping:(int64 -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U> [] - val zipWithIndexAsync : mapping:(int -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U> + val zipWithIndexAsync : mapping:(int64 -> 'T -> Async<'U>) -> source:AsyncSeq<'T> -> AsyncSeq<'U> /// Feeds an async sequence of values into an async sequence of async functions. val zappAsync : functions:AsyncSeq<('T -> Async<'U>)> -> source:AsyncSeq<'T> -> AsyncSeq<'U> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index c5837bf..bfd35a5 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -60,6 +60,67 @@ let ``AsyncSeq.concatSeq works``() = Assert.True(EQ expected actual) [] +let ``AsyncSeq.sum works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.sum |> Async.RunSynchronously + let expected = ls |> List.sum + Assert.True((expected = actual)) + + +[] +let ``AsyncSeq.length works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.length |> Async.RunSynchronously |> int32 + let expected = ls |> List.length + Assert.True((expected = actual)) + +[] +let ``AsyncSeq.contains works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + for j in [0;i;i+1] do + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.contains j |> Async.RunSynchronously + let expected = ls |> List.exists (fun x -> x = j) + Assert.True((expected = actual)) + +[] +let ``AsyncSeq.tryPick works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + for j in [0;i;i+1] do + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.tryPick (fun x -> if x = j then Some (string (x+1)) else None) |> Async.RunSynchronously + let expected = ls |> Seq.tryPick (fun x -> if x = j then Some (string (x+1)) else None) + Assert.True((expected = actual)) + +[] +let ``AsyncSeq.tryFind works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + for j in [0;i;i+1] do + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.tryFind (fun x -> x = j) |> Async.RunSynchronously + let expected = ls |> Seq.tryFind (fun x -> x = j) + Assert.True((expected = actual)) + +[] +let ``AsyncSeq.exists works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + for j in [0;i;i+1] do + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.exists (fun x -> x = j) |> Async.RunSynchronously + let expected = ls |> Seq.exists (fun x -> x = j) + Assert.True((expected = actual)) + +[] +let ``AsyncSeq.forall works``() = + for i in 0 .. 10 do + let ls = [ 1 .. i ] + for j in [0;i;i+1] do + let actual = AsyncSeq.ofSeq ls |> AsyncSeq.forall (fun x -> x = j) |> Async.RunSynchronously + let expected = ls |> Seq.forall (fun x -> x = j) + Assert.True((expected = actual)) +[] let ``AsyncSeq.cache works``() = for n in 0 .. 10 do let ls = [ for i in 1 .. n do for j in 1 .. i do yield i ]