Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>
<GenerateProgramFile>false</GenerateProgramFile>
<ApplicationIcon>..\..\assets\TaskSeq.ico</ApplicationIcon>
</PropertyGroup>

<ItemGroup>
<Content Include="..\..\assets\TaskSeq.ico" Link="TaskSeq.ico" />
<Compile Include="AssemblyInfo.fs" />
Expand Down Expand Up @@ -47,9 +45,9 @@
<Compile Include="TaskSeq.StateTransitionBug-delayed.Tests.CE.fs" />
<Compile Include="TaskSeq.PocTests.fs" />
<Compile Include="TaskSeq.Realworld.fs" />
<Compile Include="TaskSeq.Extensions.Tests.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<!-- align test project with minimal required version for TaskSeq -->
<!-- we use 6.0.3 here and not 6.0.2 because TaskResult lib requires it-->
Expand All @@ -67,9 +65,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\FSharp.Control.TaskSeq\FSharp.Control.TaskSeq.fsproj" />
</ItemGroup>

</Project>
32 changes: 32 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module TaskSeq.Extenions

open System
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharp.Control

//
// TaskSeq.except
// TaskSeq.exceptOfSeq
//


module TaskBuilder =
open TaskSeq.Tests

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-existsAsync happy path last item of seq`` variant =
task {
let values = Gen.getSeqImmutable variant

let mutable sum = 0
for x in values do
sum <- sum + x

// let! expected =
// (0, values)
// ||> TaskSeq.fold((+))
Assert.Equal(55, sum)
}
143 changes: 143 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

#nowarn "57"

module TaskSeq =
// F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it?
open FSharp.Control.TaskSeqBuilders
Expand Down Expand Up @@ -324,3 +326,144 @@ module TaskSeq =
let fold folder state source = Internal.fold (FolderAction folder) state source

let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source

#nowarn "1204"
#nowarn "3513"


[<AutoOpen>]
module AsyncSeqExtensions =

let rec WhileDynamic
(
sm: byref<TaskStateMachine<'Data>>,
condition: unit -> ValueTask<bool>,
body: TaskCode<'Data, unit>
) : bool =
let vt = condition ()
TaskBuilderBase.BindDynamic(&sm, vt, fun result ->
TaskCode<_,_>(fun sm ->
if result then
if body.Invoke(&sm) then
WhileDynamic(&sm, condition, body)
else
let rf = sm.ResumptionDynamicInfo.ResumptionFunc

sm.ResumptionDynamicInfo.ResumptionFunc <-
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))

false
else
true
)
)


and WhileBodyDynamicAux
(
sm: byref<TaskStateMachine<'Data>>,
condition: unit -> ValueTask<bool>,
body: TaskCode<'Data, unit>,
rf: TaskResumptionFunc<_>
) : bool =
if rf.Invoke(&sm) then
WhileDynamic(&sm, condition, body)
else
let rf = sm.ResumptionDynamicInfo.ResumptionFunc

sm.ResumptionDynamicInfo.ResumptionFunc <-
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))

false
open Microsoft.FSharp.Core.CompilerServices
open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators

// Add asynchronous for loop to the 'async' computation builder
type Microsoft.FSharp.Control.AsyncBuilder with

member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async<unit>) =
tasksq
|> TaskSeq.iterAsync (action >> Async.StartAsTask)
|> Async.AwaitTask

// Add asynchronous for loop to the 'task' computation builder
type Microsoft.FSharp.Control.TaskBuilder with


member inline _.WhileAsync
(
[<InlineIfLambda>] condition: unit -> ValueTask<bool>,
body: TaskCode<_,unit>
) : TaskCode<_,_> =
let mutable condition_res = true

ResumableCode.While(
(fun () -> condition_res),
TaskCode<_, _>(fun sm ->
let mutable __stack_condition_fin = true
let __stack_vtask = condition ()

let mutable awaiter = __stack_vtask.GetAwaiter()
if awaiter.IsCompleted then
// logInfo "at WhileAsync: returning completed task"

__stack_condition_fin <- true
condition_res <- awaiter.GetResult()
else
// logInfo "at WhileAsync: awaiting non-completed task"

// This will yield with __stack_fin = false
// This will resume with __stack_fin = true
let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
__stack_condition_fin <- __stack_yield_fin

if __stack_condition_fin then
condition_res <- awaiter.GetResult()


if __stack_condition_fin then
if condition_res then body.Invoke(&sm) else true
else
sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
false)
)

member inline this.For
(
tasksq: IAsyncEnumerable<'T>,
body: 'T -> TaskCode<_, unit>
) : TaskCode<_, unit> =
// tasksq
// |> TaskSeq.iterAsync (body >> task.Run)
// |> task.ReturnFrom

// task.ReturnFrom <|
// task {
// let mutable continueWhile = true
// use e = tasksq.GetAsyncEnumerator()
// while continueWhile do
// let! next = e.MoveNextAsync()
// if next then
// do! task.Run(body e.Current)
// else
// continueWhile <- false
// }

TaskCode<'TOverall, unit>(fun sm ->

this
.Using(
tasksq.GetAsyncEnumerator(CancellationToken()),
(fun e ->
this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm))))
)
.Invoke(&sm))

let foo () =
task {
let mutable sum = 0
let xs = taskSeq { 1; 2; 3}
for x in xs do
sum <- sum + x
}
34 changes: 34 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace FSharp.Control

#nowarn "1204"

module TaskSeq =
open System.Collections.Generic
open System.Threading.Tasks
Expand Down Expand Up @@ -562,3 +564,35 @@ module TaskSeq =
/// If the accumulator function <paramref name="folder" /> does not need to be asynchronous, consider using <see cref="TaskSeq.fold" />.
/// </summary>
val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State>



[<AutoOpen>]
module AsyncSeqExtensions =

val WhileDynamic:
sm: byref<TaskStateMachine<'Data>> *
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
body: TaskCode<'Data, unit> ->
bool

val WhileBodyDynamicAux:
sm: byref<TaskStateMachine<'Data>> *
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
body: TaskCode<'Data, unit> *
rf: TaskResumptionFunc<'Data> ->
bool

type AsyncBuilder with

member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async<unit>) -> Async<unit>

type TaskBuilder with

member inline WhileAsync:
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) * body: TaskCode<'TOverall, unit> ->
TaskCode<'TOverall, unit>

member inline For:
tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) ->
TaskCode<'TOverall, unit>