From 68e9420cb719b0310901585737d607985055ffa2 Mon Sep 17 00:00:00 2001 From: Eugene Tolmachev Date: Mon, 4 Dec 2017 09:51:11 -0500 Subject: [PATCH] back to async --- .travis.yml | 1 + RELEASE_NOTES.md | 3 + build.cmd | 9 - build.fsx | 4 +- docs/content/RELEASE_NOTES.md | 3 + paket.dependencies | 1 - paket.lock | 3 - samples/Guaranteed/Program.fs | 28 +-- samples/Guaranteed/Topology.fs | 25 ++- samples/WordCount/Topology.fs | 22 ++- src/FsShelter.Tests/FsShelter.Tests.fsproj | 2 +- src/FsShelter.Tests/HostTests.fs | 13 +- src/FsShelter.Tests/IO/CommonTests.fs | 3 +- src/FsShelter.Tests/IO/JsonIOTests.fs | 37 ++-- src/FsShelter.Tests/IO/ProtoIOTests.fs | 55 +++--- src/FsShelter.Tests/TestTopology.fs | 23 ++- src/FsShelter/DSL.fs | 46 ++--- src/FsShelter/Dispatch.fs | 196 +++++++++++---------- src/FsShelter/FsShelter.fsproj | 2 +- src/FsShelter/IO/Common.fs | 15 +- src/FsShelter/IO/JsonIO.fs | 20 +-- src/FsShelter/IO/ProtoIO.fs | 5 +- src/FsShelter/Task.fs | 9 +- src/FsShelter/Topology.fs | 5 +- src/FsShelter/paket.references | 3 +- 25 files changed, 259 insertions(+), 274 deletions(-) diff --git a/.travis.yml b/.travis.yml index ef2d3c0..93e97d2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: csharp sudo: false # use the new container-based Travis infrastructure +dotnet: 2.0 script: - ./build.sh diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f5e9f06..5ef45a6 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +#### 1.0.0-beta-1 - Nov 2017 +* netstandard 2.0 release + #### 0.2.9 - Nov 2017 * Fixing spout timeout when running as a shelled component diff --git a/build.cmd b/build.cmd index 09b44ff..e9933d3 100644 --- a/build.cmd +++ b/build.cmd @@ -1,18 +1,9 @@ @echo off cls -.paket\paket.bootstrapper.exe -if errorlevel 1 ( - exit /b %errorlevel% -) - .paket\paket.exe restore if errorlevel 1 ( exit /b %errorlevel% ) -IF NOT EXIST build.fsx ( - .paket\paket.exe update - packages\build\FAKE\tools\FAKE.exe init.fsx -) packages\build\FAKE\tools\FAKE.exe build.fsx %* diff --git a/build.fsx b/build.fsx index 6f41931..cc48164 100644 --- a/build.fsx +++ b/build.fsx @@ -59,7 +59,7 @@ let release = LoadReleaseNotes "RELEASE_NOTES.md" let assemblyInfo = [ "Description",summary - "Version", release.AssemblyVersion + "Version", release.NugetVersion "Authors", gitOwner "PackageProjectUrl", gitHome "RepositoryUrl", gitHome @@ -133,7 +133,7 @@ Target "Package" (fun _ -> ) Target "PublishNuget" (fun _ -> - runDotnet "src/FsShelter" "nuget publish" + runDotnet "src/FsShelter" (sprintf "nuget push bin/Release/FsShelter.%s.nupkg -s nuget.org -k %s" release.NugetVersion (environVar "nugetkey")) ) diff --git a/docs/content/RELEASE_NOTES.md b/docs/content/RELEASE_NOTES.md index f5e9f06..5ef45a6 100644 --- a/docs/content/RELEASE_NOTES.md +++ b/docs/content/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +#### 1.0.0-beta-1 - Nov 2017 +* netstandard 2.0 release + #### 0.2.9 - Nov 2017 * Fixing spout timeout when running as a shelled component diff --git a/paket.dependencies b/paket.dependencies index e5fef57..1b2904e 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -23,7 +23,6 @@ group Main nuget FsPickler nuget Google.Protobuf nuget apache-thrift-netcore -nuget Hopac nuget newtonsoft.json github prolucid/protoshell src/main/proto/multilang.proto diff --git a/paket.lock b/paket.lock index 098df19..3dd61ef 100644 --- a/paket.lock +++ b/paket.lock @@ -36,9 +36,6 @@ NUGET FSharp.Core (>= 3.1) Google.Protobuf (3.4.1) NETStandard.Library (>= 1.6.1) - Hopac (0.3.23) - FSharp.Core (>= 4.0.1.7-alpha) - NETStandard.Library (>= 1.6) Microsoft.AspNetCore.Hosting.Abstractions (2.0.1) Microsoft.AspNetCore.Hosting.Server.Abstractions (>= 2.0.1) Microsoft.AspNetCore.Http.Abstractions (>= 2.0.1) diff --git a/samples/Guaranteed/Program.fs b/samples/Guaranteed/Program.fs index 009ce32..3540d3d 100644 --- a/samples/Guaranteed/Program.fs +++ b/samples/Guaranteed/Program.fs @@ -26,22 +26,22 @@ let main argv = | ["graph"] -> topology |> DotGraph.writeToConsole -// | ["self-host"] -> -// let stop = -// topology -// // |> Host.runWith (sprintf "self-%d-%d" (System.Diagnostics.Process.GetCurrentProcess().Id) >> Logging.callbackLog) -// |> Host.run -// printf "Running the topology, press ENTER to stop..." -// let sw = System.Diagnostics.Stopwatch.StartNew() -// System.Threading.Thread.Sleep (-1) |> ignore -// stop() -// sw.Stop() -// let (count,_) = Topology.source.PostAndReply Get -// printf "Count: %s, %d/s\n" count (1000L*(int64 count)/sw.ElapsedMilliseconds) + | ["self-host"] -> + let stop = + topology +// |> Host.runWith (sprintf "self-%d-%d" (System.Diagnostics.Process.GetCurrentProcess().Id) >> Logging.callbackLog) + |> Host.run + printf "Running the topology, press ENTER to stop..." + let sw = System.Diagnostics.Stopwatch.StartNew() + System.Console.ReadLine() |> ignore + stop() + sw.Stop() + let (count,_) = Topology.source.PostAndReply Get + printf "Count: %s, %d/s\n" count (1000L*(int64 count)/sw.ElapsedMilliseconds) | _ -> topology |> Task.ofTopology -// |> Task.run ProtoIO.start - |> Task.runWith (string >> Logging.callbackLog) ProtoIO.start // start using a traffic logger + |> Task.run ProtoIO.start +// |> Task.runWith (string >> Logging.callbackLog) ProtoIO.start // start using a traffic logger 0 diff --git a/samples/Guaranteed/Topology.fs b/samples/Guaranteed/Topology.fs index 4b82acf..159a666 100644 --- a/samples/Guaranteed/Topology.fs +++ b/samples/Guaranteed/Topology.fs @@ -1,5 +1,4 @@ module Guaranteed.Topology -open Hopac // data schema for the topology, every case is a unqiue stream type Schema = @@ -9,26 +8,26 @@ type Schema = // numbers spout - produces messages let numbers source = - job { - let! (tupleId,number) = source() |> Job.fromAsync + async { + let! (tupleId,number) = source() return Some(tupleId, Original (number)) } // add 1 bolt - consumes and emits messages to either Even or Odd stream -let addOne (input,(emit:_->Job<_>)) = - job { - do! match input with - | Original x -> - match x % 2 with - | 1 -> Even (x+1) - | _ -> Odd (x+1) - | _ -> failwithf "unexpected input: %A" input - |> emit +let addOne (input,emit) = + async { + match input with + | Original x -> + match x % 2 with + | 1 -> Even (x+1) + | _ -> Odd (x+1) + | _ -> failwithf "unexpected input: %A" input + |> emit } // terminating bolt - consumes messages let logResult (info,input) = - job { + async { match input with | Even x | Odd x -> info (sprintf "Got: %A" input) diff --git a/samples/WordCount/Topology.fs b/samples/WordCount/Topology.fs index 1d83eb1..18aa97e 100644 --- a/samples/WordCount/Topology.fs +++ b/samples/WordCount/Topology.fs @@ -1,7 +1,6 @@ module Sample.Topology open System -open Hopac // data schema for the topology, every case is a unqiue stream type Schema = @@ -10,31 +9,30 @@ type Schema = | WordCount of string*int64 // sentences spout - feeds messages into the topology -let sentences source = job { return source() |> Sentence |> Some } +let sentences source = async { return source() |> Sentence |> Some } // split bolt - consumes sentences and emits words -let splitIntoWords (input, (emit:_->Job<_>)) = - job { +let splitIntoWords (input, emit) = + async { match input with | Sentence s -> - do! s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries) - |> Seq.map (Word >> emit) - |> Job.seqCollect - |> Job.Ignore + s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries) + |> Seq.map Word + |> Seq.iter emit | _ -> failwithf "unexpected input: %A" input } // count words bolt -let countWords (input, increment, (emit:_->Job<_>)) = - job { +let countWords (input, increment, emit) = + async { match input with - | Word word -> do! WordCount (word,increment word) |> emit + | Word word -> WordCount (word,increment word) |> emit | _ -> failwithf "unexpected input: %A" input } // log word count - terminating bolt let logResult (log, input) = - job { + async { match input with | WordCount (word,count) -> log (sprintf "%s: %d" word count) | _ -> failwithf "unexpected input: %A" input diff --git a/src/FsShelter.Tests/FsShelter.Tests.fsproj b/src/FsShelter.Tests/FsShelter.Tests.fsproj index 3e7e587..0ef9824 100644 --- a/src/FsShelter.Tests/FsShelter.Tests.fsproj +++ b/src/FsShelter.Tests/FsShelter.Tests.fsproj @@ -10,7 +10,7 @@ - + diff --git a/src/FsShelter.Tests/HostTests.fs b/src/FsShelter.Tests/HostTests.fs index 16d8572..efe9741 100644 --- a/src/FsShelter.Tests/HostTests.fs +++ b/src/FsShelter.Tests/HostTests.fs @@ -28,12 +28,13 @@ module Topologies = } let printBolt (log,t) = - match t with - | Original _ -> log (sprintf "Init: %A" DateTime.Now) - | MaybeString _ -> log (sprintf "Shutdown: %A" DateTime.Now) - | Tick - | _ -> () //log (sprintf "tuple: %A" t) - |> async.Return + async { + match t with + | Original _ -> log (sprintf "Init: %A" DateTime.Now) + | MaybeString _ -> log (sprintf "Shutdown: %A" DateTime.Now) + | Tick + | _ -> () //log (sprintf "tuple: %A" t) + } let world = {rnd = Random(); count = ref 0L; acked = ref 0L} let t1 = topology "test" { diff --git a/src/FsShelter.Tests/IO/CommonTests.fs b/src/FsShelter.Tests/IO/CommonTests.fs index 5fa3b9e..9e385ba 100644 --- a/src/FsShelter.Tests/IO/CommonTests.fs +++ b/src/FsShelter.Tests/IO/CommonTests.fs @@ -7,11 +7,10 @@ open FsShelter.Multilang open System open System.IO open TupleSchema -open Hopac let toDict (s:seq<_*_>) = System.Linq.Enumerable.ToDictionary(s, fst, snd) -let syncOut (w:unit->unit) = w() |> Job.result +let syncOut (w:unit->unit) = w() type GuidRec = { g : Nullable} diff --git a/src/FsShelter.Tests/IO/JsonIOTests.fs b/src/FsShelter.Tests/IO/JsonIOTests.fs index 06b0ef5..430dfd0 100644 --- a/src/FsShelter.Tests/IO/JsonIOTests.fs +++ b/src/FsShelter.Tests/IO/JsonIOTests.fs @@ -7,7 +7,6 @@ open FsShelter.Multilang open System open JsonIO open CommonTests -open Hopac [] let ``reads handshake``() = @@ -23,7 +22,7 @@ let ``reads handshake``() = Conf ["FsShelter.id",box "Simple-2-1456522507"; "dev.zookeeper.path", box "/tmp/dev-storm-zookeeper"; "topology.tick.tuple.freq.secs", box 30L; "topology.classpath", null], "C:\\Users\\eugene\\storm-local\\workers\\9ee413b6-c7d2-4896-ae4d-d150da988822\\pids", {TaskId=5;ComponentId="SimpleSpout";Components=Map [1,"AddOneBolt"; 2,"AddOneBolt"; 3,"ResultBolt"; 4, "ResultBolt"; 5,"SimpleSpout"; 6,"__acker"]}) - in'() |> run =! expected + in'() |> Async.RunSynchronously =! expected [] @@ -32,7 +31,7 @@ let ``reads next``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Next + in'() |> Async.RunSynchronously =! InCommand.Next [] @@ -41,7 +40,7 @@ let ``reads ack``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Ack "zzz" + in'() |> Async.RunSynchronously =! InCommand.Ack "zzz" [] @@ -50,7 +49,7 @@ let ``reads nack``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Nack "zzz" + in'() |> Async.RunSynchronously =! InCommand.Nack "zzz" [] let ``reads activate``() = @@ -58,7 +57,7 @@ let ``reads activate``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Activate + in'() |> Async.RunSynchronously =! InCommand.Activate [] let ``reads deactivate``() = @@ -66,7 +65,7 @@ let ``reads deactivate``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Deactivate + in'() |> Async.RunSynchronously =! InCommand.Deactivate [] let ``reads tuple``() = @@ -74,7 +73,7 @@ let ``reads tuple``() = let sw = new System.IO.StringWriter() let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore - in'() |> run =! InCommand.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1) + in'() |> Async.RunSynchronously =! InCommand.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1) [] @@ -83,7 +82,7 @@ let ``writes tuple``() = let sw = new System.IO.StringWriter() let (_,out) = JsonIO.startWith (sr,sw) syncOut ignore - out(Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None)) |> run + out(Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None)) Threading.Thread.Sleep(10) sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[62],"anchors":["123"],"stream":"Original","need_task_ids":false}"""+END @@ -94,10 +93,10 @@ let ``rw complex tuple``() = let (in',out) = JsonIO.startWith (sr,sw) syncOut ignore let even = Even({x=62},{str="a"}) - job { - do! out(Emit(even,Some "2651792242051038370",[],"Even",None,None)) + async { + out(Emit(even,Some "2651792242051038370",[],"Even",None,None)) return! in'() - } |> run =! InCommand.Tuple(even,"2651792242051038370","AddOneBolt","Even",1) + } |> Async.RunSynchronously =! InCommand.Tuple(even,"2651792242051038370","AddOneBolt","Even",1) Threading.Thread.Sleep 100 sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[62,"a"],"stream":"Even","need_task_ids":false}"""+END @@ -109,10 +108,10 @@ let ``rw option tuple``() = let (in',out) = JsonIO.startWith (sr,sw) syncOut ignore let t = MaybeString(Some "zzz") - job { - do! out(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None)) + async { + out(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None)) return! in'() - } |> run =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1) + } |> Async.RunSynchronously =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1) sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[{"Case":"Some","Fields":["zzz"]}],"stream":"MaybeString","need_task_ids":false}"""+END @@ -124,13 +123,13 @@ let ``roundtrip throughput``() = let (in',out') = JsonIO.startWith (new IO.StreamReader(mem), new IO.StreamWriter(mem)) syncOut ignore let sw = System.Diagnostics.Stopwatch.StartNew() - job { + async { for i in {1..count} do - do! Emit(justFields,Some "2651792242051038370",[],"JustFields",Some 1,None) |> out' + Emit(justFields,Some "2651792242051038370",[],"JustFields",Some 1,None) |> out' mem.Seek(0L, IO.SeekOrigin.Begin) |> ignore for i in {1..count} do - do! in'() |> Job.Ignore - } |> run + do! in'() |> Async.Ignore + } |> Async.RunSynchronously sw.Stop() printf "[Json] Ellapsed: %dms, %f/s\n" sw.ElapsedMilliseconds ((float count)/sw.Elapsed.TotalSeconds) diff --git a/src/FsShelter.Tests/IO/ProtoIOTests.fs b/src/FsShelter.Tests/IO/ProtoIOTests.fs index beb8496..fef8c2c 100644 --- a/src/FsShelter.Tests/IO/ProtoIOTests.fs +++ b/src/FsShelter.Tests/IO/ProtoIOTests.fs @@ -11,7 +11,6 @@ open Google.Protobuf.WellKnownTypes open Prolucid.ProtoShell open TupleSchema open CommonTests -open Hopac type V = Messages.Variant type VL = WellKnownTypes.Value @@ -31,7 +30,7 @@ let ofStreams (_,memout:#Stream) = memout.Seek(0L, SeekOrigin.Begin) |> ignore Messages.ShellMsg.Parser.ParseDelimitedFrom memout -let reverseIn (memin:#Stream) comp task :unit->Job> = +let reverseIn (memin:#Stream) comp task :unit->Async> = memin.Seek(0L, IO.SeekOrigin.Begin) |> ignore let streamRW = TupleSchema.mapSchema<'t>() |> Map.ofArray @@ -42,7 +41,7 @@ let reverseIn (memin:#Stream) comp task :unit->Job> = let constr = findConstructor msg.Emit.Stream InCommand.Tuple ((ProtoIO.ofFields constr msg.Emit.Tuple)(), msg.Emit.Id, comp, msg.Emit.Stream, task) - fun () -> job { + fun () -> async { let msg = Messages.ShellMsg.Parser.ParseDelimitedFrom memin return toCommand msg } @@ -80,7 +79,7 @@ let ``reads handshake``() = 4, "ResultBolt" 5,"SimpleSpout" 6,"__acker"]}) - in'() |> run =! expected + in'() |> Async.RunSynchronously =! expected [] @@ -91,7 +90,7 @@ let ``reads next``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Next + in'() |> Async.RunSynchronously =! InCommand.Next [] @@ -102,7 +101,7 @@ let ``reads ack``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Ack "zzz" + in'() |> Async.RunSynchronously =! InCommand.Ack "zzz" [] @@ -113,7 +112,7 @@ let ``reads nack``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Nack "zzz" + in'() |> Async.RunSynchronously =! InCommand.Nack "zzz" [] @@ -124,7 +123,7 @@ let ``reads activate``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Activate + in'() |> Async.RunSynchronously =! InCommand.Activate [] @@ -135,7 +134,7 @@ let ``reads deactivate``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Deactivate + in'() |> Async.RunSynchronously =! InCommand.Deactivate [] @@ -154,7 +153,7 @@ let ``reads tuple``() = |> ProtoIO.startWith <|| (syncOut,ignore) - in'() |> run =! InCommand.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1) + in'() |> Async.RunSynchronously =! InCommand.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1) [] @@ -162,7 +161,7 @@ let ``writes tuple``() = let streams = mkStreams() let (_,out) = ProtoIO.startWith streams syncOut ignore - out(Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None)) |> run + out <| Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None) Threading.Thread.Sleep(10) let emit = (ofStreams streams).Emit (emit.Id, emit.Anchors |> List.ofSeq, emit.NeedTaskIds, emit.Stream) =! ("2651792242051038370",["123"], false, "Original") @@ -187,10 +186,10 @@ let ``rw complex tuple``() = let even = Even({x=62},{str="a"}) - job { - do! out'(Emit(even,Some "2651792242051038370",[],"Even",None,None)) + async { + out'(Emit(even,Some "2651792242051038370",[],"Even",None,None)) return! in'() - } |> run =! InCommand.Tuple(even,"2651792242051038370","AddOneBolt","Even",1) + } |> Async.RunSynchronously =! InCommand.Tuple(even,"2651792242051038370","AddOneBolt","Even",1) let emit = (ofStreams streams).Emit (emit.Id, emit.Stream) =! ("2651792242051038370", "Even") emit.Tuple.Count =! 2 @@ -208,12 +207,12 @@ let ``roundtrip nested tuple``() = m = Map [3, Some {x=3}; 0, None] gxs = System.Collections.Generic.List([{x=4}]) d = toDict ["5", Some {x=5}; "0", None]}) - let emitted = job { - do! Emit(nested,Some "2651792242051038370",[],"Nested",None, None) |> out' + let emitted = async { + Emit(nested,Some "2651792242051038370",[],"Nested",None, None) |> out' let in' = reverseIn mem "AddOneBolt" 1 return! in'() - } |> run + } |> Async.RunSynchronously match emitted,nested with | InCommand.Tuple(Nested tuple, _, _, _, _),Nested original -> @@ -242,10 +241,10 @@ let ``rw option tuple``() = let t = MaybeString(Some "zzz") - job { - do! out'(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None)) + async { + out'(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None)) return! in'() - } |> run =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1) + } |> Async.RunSynchronously =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1) let emit = (ofStreams streams).Emit (emit.Id, emit.Stream) =! ("2651792242051038370","MaybeString") emit.Tuple.Count =! 1 @@ -270,10 +269,10 @@ let ``rw Nullable tuple``() = let t = NullableGuid(Nullable guid) - job { - do! out'(Emit(t,Some "2651792242051038370",[],"NullableGuid",None,None)) + async { + out'(Emit(t,Some "2651792242051038370",[],"NullableGuid",None,None)) return! in'() - } |> run =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","NullableGuid",1) + } |> Async.RunSynchronously =! InCommand.Tuple(t,"2651792242051038370","AddOneBolt","NullableGuid",1) let emit = (ofStreams streams).Emit (emit.Id, emit.Stream) =! ("2651792242051038370","NullableGuid") emit.Tuple.Count =! 1 @@ -287,12 +286,12 @@ let ``roundtrip throughput``() = let (_,out') = ProtoIO.startWith (mem,mem) syncOut ignore let sw = System.Diagnostics.Stopwatch.StartNew() - job { + async { for i in 1..count do - do! Emit(justFields,Some "2651792242051038370",[],"JustFields",None,None) |> out' - let in':unit->Job> = reverseIn mem "AddOneBolt" 1 + Emit(justFields,Some "2651792242051038370",[],"JustFields",None,None) |> out' + let in':unit->Async> = reverseIn mem "AddOneBolt" 1 for i in 1..count do - do! in'() |> Job.Ignore - } |> run + do! in'() |> Async.Ignore + } |> Async.RunSynchronously sw.Stop() printf "[Proto] Ellapsed: %dms, %f/s\n" sw.ElapsedMilliseconds ((float count)/sw.Elapsed.TotalSeconds) diff --git a/src/FsShelter.Tests/TestTopology.fs b/src/FsShelter.Tests/TestTopology.fs index 4769551..6eb77f2 100644 --- a/src/FsShelter.Tests/TestTopology.fs +++ b/src/FsShelter.Tests/TestTopology.fs @@ -4,7 +4,6 @@ //defines the spout and bolt open System open System.Collections.Generic -open Hopac type Number = { x : int32 } @@ -30,25 +29,25 @@ type World = /// numbers spout - produces messages let numbers (world : World) = - job { + async { return Some(string(Threading.Interlocked.Increment &world.count.contents), Original { x = world.rnd.Next(0, 100) }) } /// split bolt - consumes and emits messages -let split (input,(emit:_->Job<_>)) = - job { - do! match input with - | Original { x = x } -> - match x % 2 with - | 0 -> Even ({x=x}, {str="even"}) - | _ -> Odd ({x=x}, "odd" ) - | _ -> failwithf "unexpected input: %A" input - |> emit +let split (input,emit) = + async { + match input with + | Original { x = x } -> + match x % 2 with + | 0 -> Even ({x=x}, {str="even"}) + | _ -> Odd ({x=x}, "odd" ) + | _ -> failwithf "unexpected input: %A" input + |> emit } /// terminating bolt - consumes messages let resultBolt (info,input) = - job { + async { match input with | Even ({x = x}, {str=str}) | Odd ({x = x},str) -> info (sprintf "Got %A" input) diff --git a/src/FsShelter/DSL.fs b/src/FsShelter/DSL.fs index a7b38e4..4d98510 100644 --- a/src/FsShelter/DSL.fs +++ b/src/FsShelter/DSL.fs @@ -172,15 +172,15 @@ module private Parsers = [] [] module DSL = - open Hopac + open Multilang open Topology open Dispatch open FSharp.Quotations /// spout function signature - type Next<'a,'t> = 'a->Job<'t option> + type Next<'a,'t> = 'a->Async<'t option> /// bolt function signature - type Consume<'a> = 'a->Job + type Consume<'a> = 'a->Async /// emit signature type Emit<'t> = 't->unit /// ack signature @@ -260,41 +260,41 @@ module DSL = type Shuffle = static member on ([] case:Expr<_->'t>):bool->ComponentId->ComponentId->Stream<'t> = fun anchor src dst -> - { Grouping = Grouping.Shuffle - Src = src - Dst = dst - Anchoring = anchor - Schema = Parsers.findCase case |> TupleSchema.toNames } + {Grouping = Grouping.Shuffle + Src = src + Dst = dst + Anchoring = anchor + Schema = Parsers.findCase case |> TupleSchema.toNames} /// define all grouping type All = static member on ([] case:Expr<_->'t>):bool->ComponentId->ComponentId->Stream<'t> = fun anchor src dst -> - { Grouping = Grouping.All - Src = src - Dst = dst - Anchoring = anchor - Schema = Parsers.findCase case |> TupleSchema.toNames} + {Grouping = Grouping.All + Src = src + Dst = dst + Anchoring = anchor + Schema = Parsers.findCase case |> TupleSchema.toNames} /// define direct grouping type Direct = static member on ([] case:Expr<_->'t>):bool->ComponentId->ComponentId->Stream<'t> = fun anchor src dst -> - { Grouping = Grouping.Direct - Src = src - Dst = dst - Anchoring = anchor - Schema = Parsers.findCase case |> TupleSchema.toNames} + {Grouping = Grouping.Direct + Src = src + Dst = dst + Anchoring = anchor + Schema = Parsers.findCase case |> TupleSchema.toNames} /// define fields grouping type Group = static member by ([] select:Expr<'t->'p>):bool->ComponentId->ComponentId->Stream<'t> = fun anchor src dst -> - { Grouping = Parsers.toGroup select - Src = src - Dst = dst - Anchoring = anchor - Schema = Parsers.findCase select |> TupleSchema.toNames} + {Grouping = Parsers.toGroup select + Src = src + Dst = dst + Anchoring = anchor + Schema = Parsers.findCase select |> TupleSchema.toNames} // let inline (!*>) (anchor:bool) (case:Expr<_->'t>) = // all.on case anchor diff --git a/src/FsShelter/Dispatch.fs b/src/FsShelter/Dispatch.fs index 305b473..33a81f2 100644 --- a/src/FsShelter/Dispatch.fs +++ b/src/FsShelter/Dispatch.fs @@ -4,107 +4,117 @@ module FsShelter.Dispatch open System open FsShelter.Multilang open System.Threading -open Hopac -open Hopac.Infixes let private log out' level msg = Log(msg, level) |> out' -let mkSerialNext (next:_->#Job<_>) args (out:_->Job<_>) toEmit = - let ch = Ch() - ch ^=> fun _ -> job { - do! next args >>= (function Some t -> toEmit t |> out | _ -> Job.result ()) - do! Sync |> out - } - <|> Alt.never () - |> Job.foreverServer |> run - Ch.give ch +let mkSerialNext next args out toEmit = + let mutable err = None + let event = new ManualResetEvent(false) + let rec loop() = + async { + try + let! _ = Async.AwaitWaitHandle event + let! t = next args + event.Reset() |> ignore + t |> Option.iter (toEmit >> out) + Sync |> out + with + ex -> err <- Some ex + return! loop() + } + loop () |> Async.Start + fun () -> + match err with + | Some e -> raise e + | _ -> event.Set() |> ignore /// Dispatch spout commands and handle retries -let reliableSpoutLoop mkArgs mkAcker next getStream (in':unit->Job<_>, out':_->Job<_>) conf = - let args = mkArgs (log out') conf - let ack, nack = mkAcker args - let callNext = mkSerialNext next args out' (fun (tid, tuple) -> Emit(tuple, Some tid, [], (getStream tuple), None, None)) - job { - let! msg = in'() - match msg with - | Next -> - do! callNext() - | Ack tid -> - ack tid - do! Sync |> out' - | Nack tid -> - nack tid - do! Sync |> out' - | Activate | Deactivate -> // ignore for now - do! Sync |> out' - | _ -> failwithf "Unexpected command: %A" msg - } |> Job.foreverServer +let reliableSpoutLoop mkArgs mkAcker next getStream (in', out') conf = + async { + let args = mkArgs (log out') conf + let ack, nack = mkAcker args + let callNext = mkSerialNext next args out' (fun (tid, tuple) -> Emit(tuple, Some tid, [], (getStream tuple), None, None)) + while true do + let! msg = in'() + match msg with + | Next -> + callNext() + | Ack tid -> + ack tid + Sync |> out' + | Nack tid -> + nack tid + Sync |> out' + | Activate | Deactivate -> // ignore for now + Sync |> out' + | _ -> failwithf "Unexpected command: %A" msg + } /// Dispatch commands for spouts that don't provide unique ids to emitted tuples -let unreliableSpoutLoop mkArgs next getStream (in':unit->Job<_>, out':_->Job<_>) conf = - let args = mkArgs (log out') conf - let callNext = mkSerialNext next args out' (fun tuple -> Emit(tuple, None, [], (getStream tuple), None, None)) - job { - let! msg = in'() - match msg with - | Next -> - do! callNext() - | Ack _ | Nack _ - | Activate | Deactivate -> // ignore for now - do! Sync |> out' - | _ -> failwithf "Unexpected command: %A" msg - } |> Job.foreverServer +let unreliableSpoutLoop mkArgs next getStream (in', out') conf = + async { + let args = mkArgs (log out') conf + let callNext = mkSerialNext next args out' (fun tuple -> Emit(tuple, None, [], (getStream tuple), None, None)) + while true do + let! msg = in'() + match msg with + | Next -> + callNext() + | Ack _ | Nack _ + | Activate | Deactivate -> // ignore for now + Sync |> out' + | _ -> failwithf "Unexpected command: %A" msg + } /// Dispatch bolt commands and auto ack/nack handled messages -let autoAckBoltLoop mkArgs consume (getAnchors,act,deact) getStream (in':unit->Job<_>, out':_->Job<_>) conf = - let args = mkArgs (log out') conf - let unanchoredEmit t = Emit(t, None, [], (getStream t), None, None) |> out' - job { - let! msg = in'() - match msg with - | Activate when Option.isNone act -> () - | Deactivate when Option.isNone deact -> () - | Activate -> - let! res = consume (args act.Value unanchoredEmit) |> Job.catch - match res with - | Choice1Of2 _ -> () - | Choice2Of2 ex -> - do! Error("autoBoltRunner: ", ex) |> out' - | Deactivate -> - let! res = consume (args deact.Value unanchoredEmit) |> Job.catch - match res with - | Choice1Of2 _ -> () - | Choice2Of2 ex -> - do! Error("autoBoltRunner: ", ex) |> out' - | Heartbeat -> - do! Sync |> out' - | Tuple(tuple, id, src, stream, task) -> - let emit t = Emit(t, None, getAnchors (src,stream) id, (getStream t), None, None) |> out' - let! res = consume (args tuple emit) |> Job.catch - match res with - | Choice1Of2 _ -> - do! Ok id |> out' - | Choice2Of2 ex -> - do! Fail id |> out' - do! Error("autoBoltRunner: ", ex) |> out' - | _ -> failwithf "Unexpected command: %A" msg - } |> Job.foreverServer +let autoAckBoltLoop mkArgs consume (getAnchors,act,deact) getStream (in', out') conf = + async { + let args = mkArgs (log out') conf + let unanchoredEmit t = Emit(t, None, [], (getStream t), None, None) |> out' + while true do + let! msg = in'() + match msg with + | Activate when Option.isNone act -> () + | Deactivate when Option.isNone deact -> () + | Activate -> + let! res = consume (args act.Value unanchoredEmit) |> Async.Catch + match res with + | Choice1Of2 _ -> () + | Choice2Of2 ex -> + Error("autoBoltRunner: ", ex) |> out' + | Deactivate -> + let! res = consume (args deact.Value unanchoredEmit) |> Async.Catch + match res with + | Choice1Of2 _ -> () + | Choice2Of2 ex -> + Error("autoBoltRunner: ", ex) |> out' + | Heartbeat -> Sync |> out' + | Tuple(tuple, id, src, stream, task) -> + let emit t = Emit(t, None, getAnchors (src,stream) id, (getStream t), None, None) |> out' + let! res = consume (args tuple emit) |> Async.Catch + match res with + | Choice1Of2 _ -> Ok id + | Choice2Of2 ex -> + Fail id |> out' + Error("autoBoltRunner: ", ex) + |> out' + | _ -> failwithf "Unexpected command: %A" msg + } /// Dispatch bolt commands and auto-nack all incoming messages -let autoNackBoltLoop mkArgs consume (in':unit->Job<_>, out':_->Job<_>) conf = - let args = mkArgs (log out') conf - job { - let! msg = in'() - match msg with - | Activate | Deactivate -> () // ignore for now - | Heartbeat -> - do! Sync |> out' - | Tuple(tuple, id, src, stream, task) -> - let! res = consume (args tuple) |> Job.catch - match res with - | Choice1Of2 _ -> () - | Choice2Of2 ex -> - do! Error("autoBoltRunner: ", ex) |> out' - do! Fail id |> out' - | _ -> failwithf "Unexpected command: %A" msg - } |> Job.foreverServer +let autoNackBoltLoop mkArgs consume (in', out') conf = + async { + let args = mkArgs (log out') conf + while true do + let! msg = in'() + match msg with + | Activate | Deactivate -> () // ignore for now + | Heartbeat -> Sync |> out' + | Tuple(tuple, id, src, stream, task) -> + let! res = consume (args tuple) |> Async.Catch + match res with + | Choice1Of2 _ -> () + | Choice2Of2 ex -> Error("autoBoltRunner: ", ex) |> out' + Fail id |> out' + | _ -> failwithf "Unexpected command: %A" msg + } diff --git a/src/FsShelter/FsShelter.fsproj b/src/FsShelter/FsShelter.fsproj index a92363d..a4868a4 100644 --- a/src/FsShelter/FsShelter.fsproj +++ b/src/FsShelter/FsShelter.fsproj @@ -14,7 +14,7 @@ - + diff --git a/src/FsShelter/IO/Common.fs b/src/FsShelter/IO/Common.fs index b296160..23d23a9 100644 --- a/src/FsShelter/IO/Common.fs +++ b/src/FsShelter/IO/Common.fs @@ -1,6 +1,7 @@ namespace FsShelter.IO open System +open System.Reflection open System.Runtime.CompilerServices [] @@ -8,15 +9,15 @@ do() module internal Common = open System.IO - open Hopac let serialOut = - let mb = Mailbox() - job { - let! (write:unit->unit) = Mailbox.take mb - write() - } |> Job.foreverServer |> start - Mailbox.send mb + let mb = MailboxProcessor.Start (fun inbox -> + async { + while true do + let! (write:unit->unit) = inbox.Receive() + write() + }) + mb.Post open MBrace.FsPickler open FSharp.Reflection diff --git a/src/FsShelter/IO/JsonIO.fs b/src/FsShelter/IO/JsonIO.fs index 6040bb1..299a84f 100644 --- a/src/FsShelter/IO/JsonIO.fs +++ b/src/FsShelter/IO/JsonIO.fs @@ -8,7 +8,6 @@ open System.IO open Newtonsoft.Json open TupleSchema open Newtonsoft.Json.Linq -open Hopac [] let internal END = "\nend\n" @@ -151,18 +150,9 @@ let private isMono() = not <| isNull (System.Type.GetType("Mono.Runtime")) let startWith (stdin:TextReader,stdout:TextWriter) syncOut (log:Task.Log) :Topology.IO<'t> = let write (text:string) = log (fun _ -> "> "+text) - syncOut (fun () -> stdout.Write(text.Replace("\n","\\n")) - stdout.Write(END) + syncOut (fun () -> stdout.Write(text.Replace("\n","\\n")+END) stdout.Flush()) - if isMono() then - () //on osx/linux under mono, set env LANG=en_US.UTF-8 - else - if not Console.IsInputRedirected then - Console.InputEncoding <- Encoding.UTF8 - if not Console.IsOutputRedirected then - Console.OutputEncoding <- Encoding.UTF8 - let streamRW = TupleSchema.mapSchema<'t>() |> Map.ofArray let out' cmd = @@ -178,10 +168,10 @@ let startWith (stdin:TextReader,stdout:TextWriter) syncOut (log:Task.Log) :Topol let findConstructor stream = streamRW |> Map.find stream |> fst - let in' ():Job> = - job { - let! msg = stdin.ReadLineAsync |> Job.fromTask - let! term = stdin.ReadLineAsync |> Job.fromTask + let in' ():Async> = + async { + let! msg = stdin.ReadLineAsync() |> Async.AwaitTask + let! term = stdin.ReadLineAsync() |> Async.AwaitTask log (fun _ -> "< "+msg+term) return match msg,term with | msg,"end" when not <| String.IsNullOrEmpty msg -> toCommand findConstructor msg diff --git a/src/FsShelter/IO/ProtoIO.fs b/src/FsShelter/IO/ProtoIO.fs index 6a53b03..63b3add 100644 --- a/src/FsShelter/IO/ProtoIO.fs +++ b/src/FsShelter/IO/ProtoIO.fs @@ -8,7 +8,6 @@ open Google.Protobuf.WellKnownTypes open Prolucid.ProtoShell open TupleSchema open System.IO -open Hopac type internal V = Messages.Variant type internal VL = WellKnownTypes.Value @@ -183,8 +182,8 @@ let startWith (stdin:#Stream,stdout:#Stream) syncOut (log:Task.Log) :Topology.IO let findConstructor stream = streamRW |> Map.find stream |> fst - let in' ():Job> = - job { + let in' ():Async> = + async { let msg = Messages.StormMsg.Parser.ParseDelimitedFrom stdin log (fun _ -> sprintf "< %A" msg) return toCommand log findConstructor msg diff --git a/src/FsShelter/Task.fs b/src/FsShelter/Task.fs index a7ee500..7b6ec01 100644 --- a/src/FsShelter/Task.fs +++ b/src/FsShelter/Task.fs @@ -5,7 +5,6 @@ open System.IO open FsShelter.Multilang open FsShelter.Topology open System -open Hopac /// Logger signature type Log = (unit -> string) -> unit @@ -44,7 +43,7 @@ let ofTopology (t : Topology<'t>) compId = /// Reads the handshake and runs the specified task with a logger let runWith (startLog : int->Log) (io : Log -> IO<'t>) (task : Task<'t>) = - job { + async { let pid = pid() let log = startLog pid let (in', out') = io log @@ -55,19 +54,19 @@ let runWith (startLog : int->Log) (io : Log -> IO<'t>) (task : Task<'t>) = match msg with | Handshake(cfg, pidDir, context) -> pid |> createPid pidDir + Pid pid |> out' (cfg, context.ComponentId) | _ -> failwithf "Expected handshake, got: %A" msg - do! Pid pid |> out' log(fun _ -> sprintf "running %s..." compId) return! task compId (in', out') cfg with ex -> let msg = Exception.toString ex log (fun _ -> msg) - do! Log(msg, LogLevel.Error) |> out' + Log(msg, LogLevel.Error) |> out' Threading.Thread.Sleep 1000 Environment.Exit 1 } - |> run + |> Async.RunSynchronously /// Reads the handshake and runs the specified task let run (io : Log -> IO<'t>) (task : Task<'t>) = runWith (fun _ -> ignore) io task diff --git a/src/FsShelter/Topology.fs b/src/FsShelter/Topology.fs index bc436b9..5715dd9 100644 --- a/src/FsShelter/Topology.fs +++ b/src/FsShelter/Topology.fs @@ -3,7 +3,6 @@ /// Topology data model module Topology = open Multilang - open Hopac /// Tuple id type TupleId = string @@ -14,9 +13,9 @@ module Topology = /// Signature for anchoring implementation type ToAnchors = TupleId->TupleId list /// Signature for pluggable IO implementation - type IO<'t> = (unit->Job>)*(OutCommand<'t>->Job) + type IO<'t> = (unit->Async>)*(OutCommand<'t>->unit) /// Signature for a final runnable component - type Runnable<'t> = IO<'t>->Conf->Job + type Runnable<'t> = IO<'t>->Conf->Async /// Storm Componend abstraction type Component<'t> = diff --git a/src/FsShelter/paket.references b/src/FsShelter/paket.references index 8ed53a1..8d4b7dc 100644 --- a/src/FsShelter/paket.references +++ b/src/FsShelter/paket.references @@ -2,5 +2,4 @@ FSharp.Core newtonsoft.json apache-thrift-netcore Google.Protobuf -FsPickler -Hopac \ No newline at end of file +FsPickler \ No newline at end of file