Skip to content

Commit

Permalink
Add takeWhile, takeWhileInclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 11, 2022
1 parent f0e27ce commit 4f79c0e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Expand Up @@ -253,6 +253,10 @@ module TaskSeq =
let chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source
let filter predicate source = Internal.filter (Predicate predicate) source
let filterAsync predicate source = Internal.filter (PredicateAsync predicate) source
let takeWhile predicate source = Internal.takeWhile (Predicate predicate) source
let takeWhileAsync predicate source = Internal.takeWhile (PredicateAsync predicate) source
let takeWhile predicate source = Internal.takeWhileInclusive (Predicate predicate) source
let takeWhileAsync predicate source = Internal.takeWhileInclusive (PredicateAsync predicate) source
let tryPick chooser source = Internal.tryPick (TryPick chooser) source
let tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
let tryFind predicate source = Internal.tryFind (Predicate predicate) source
Expand Down
30 changes: 30 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Expand Up @@ -365,6 +365,36 @@ module TaskSeq =
/// </summary>
val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileAsync" />.
/// </summary>
val takeWhile: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhile" />.
/// </summary>
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileInclusiveAsync" />.
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhile" />.
/// </summary>
val takeWhileInclusive: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhileInclusive" />.
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhileAsync" />.
/// </summary>
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Returns a new collection containing only the elements of the collection
/// for which the given asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
Expand Down
51 changes: 51 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Expand Up @@ -531,6 +531,57 @@ module internal TaskSeqInternal =
| true -> yield item
| false -> ()
}

let takeWhile predicate (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let! step = e.MoveNextAsync()
let mutable go = step

match predicate with
| Predicate predicate ->
while go do
let value = e.Current
if predicate value then
yield value
let! more = e.MoveNextAsync()
go <- more
else go <- false
| PredicateAsync predicate ->
while go do
let value = e.Current
match! predicate value with
| true ->
yield value
let! more = e.MoveNextAsync()
go <- more
| false -> go <- false
}

let takeWhileInclusive predicate (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let! step = e.MoveNextAsync()
let mutable go = step

match predicate with
| Predicate predicate ->
while go do
let value = e.Current
yield value
if predicate value then
let! more = e.MoveNextAsync()
go <- more
else go <- false
| PredicateAsync predicate ->
while go do
let value = e.Current
yield value
match! predicate value with
| true ->
let! more = e.MoveNextAsync()
go <- more
| false -> go <- false
}

// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
Expand Down

0 comments on commit 4f79c0e

Please sign in to comment.